"""
Context managers.
"""
import collections
import contextlib
import filecmp
import os
import pathlib
import shutil
import subprocess
import sys
import time
import types
import typing
from . import _updatelib
from . import configuration as _configuration
from . import iterate as _iterate
from . import score as _score
from . import string as _string
_Configuration = _configuration.Configuration()
[docs]
class FilesystemState:
    """
    Filesystem state context manager.
    """
    __slots__ = ("_keep", "_remove")
    _is_abstract = True
    def __init__(
        self,
        *,
        keep: list[pathlib.Path] | None = None,
        remove: list[pathlib.Path] | None = None,
    ):
        keep = keep or []
        keep_tuple = tuple([str(_) for _ in keep])
        self._keep = keep_tuple
        remove = remove or []
        remove_tuple = tuple([str(_) for _ in remove])
        self._remove = remove_tuple
    def __enter__(self) -> "FilesystemState":
        """
        Backs up filesystem paths.
        """
        for path in self.remove():
            assert not os.path.exists(path), repr(path)
        for path in self.keep():
            assert os.path.exists(path), repr(path)
            assert os.path.isfile(path) or os.path.isdir(path), repr(path)
        for path in self.keep():
            backup_path = path + ".backup"
            if os.path.isfile(path):
                shutil.copyfile(path, backup_path)
                assert filecmp.cmp(path, backup_path), repr(path)
            elif os.path.isdir(path):
                shutil.copytree(path, backup_path)
            else:
                raise TypeError(f"neither file nor directory: {path}.")
        return self
    def __exit__(self, exg_type, exc_value, trackeback) -> None:
        """
        Restores filesytem paths and removes backups; also removes paths in
        remove list.
        """
        backup_paths = (_ + ".backup" for _ in self.keep())
        for path in backup_paths:
            assert os.path.exists(path), repr(path)
        for path in self.keep():
            backup_path = path + ".backup"
            assert os.path.exists(backup_path), repr(backup_path)
            if os.path.isfile(backup_path):
                shutil.copyfile(backup_path, path)
                filecmp.cmp(path, backup_path)
                os.remove(backup_path)
            elif os.path.isdir(backup_path):
                if os.path.exists(path):
                    shutil.rmtree(path)
                shutil.copytree(backup_path, path)
                shutil.rmtree(backup_path)
            else:
                raise TypeError(f"neither file nor directory: {path}.")
        for path in self.remove():
            if os.path.exists(path):
                if os.path.isfile(path):
                    os.remove(path)
                elif os.path.isdir(path):
                    shutil.rmtree(path)
                else:
                    raise TypeError(f"neither file nor directory: {path}.")
        for path in self.keep():
            assert os.path.exists(path), repr(path)
        for path in backup_paths:
            assert not os.path.exists(path), repr(path)
[docs]
    def keep(self) -> tuple[str, ...]:
        """
        Gets paths to restore on exit.
        """
        return self._keep 
[docs]
    def remove(self) -> tuple[str, ...]:
        """
        Gets paths to remove on exit.
        """
        return self._remove 
 
[docs]
class ForbidUpdate:
    """
    Forbid update context manager.
    """
    __slots__ = ("_component", "_update_on_exit")
    _is_abstract = True
    def __init__(self, component: _score.Component, *, update_on_exit: bool = False):
        assert isinstance(component, _score.Component), repr(component)
        self._component = component
        assert isinstance(update_on_exit, bool), repr(update_on_exit)
        self._update_on_exit = update_on_exit
    def __enter__(self) -> "ForbidUpdate":
        r"""
        Enters context manager.
        ..  container:: example
            REGRESSION. Indicators need to be updated after swap; context
            manager updates indicators before forbidding further updates:
            >>> staff = abjad.Staff(r"\tuplet 1/1 { c'4 d' }")
            >>> abjad.attach(abjad.Clef("alto"), staff[0][0])
            >>> container = abjad.Container()
            >>> abjad.mutate.swap(staff[0], container)
            >>> with abjad.contextmanagers.ForbidUpdate(staff):
            ...     for note in staff[0]:
            ...         print(note)
            ...         print(abjad.get.effective_indicator(note, abjad.Clef))
            ...
            Note("c'4")
            Clef(name='alto')
            Note("d'4")
            Clef(name='alto')
        """
        for component_ in _iterate.components(self.component()):
            _updatelib._update_now(
                component_, indicators=True, offsets=True, offsets_in_seconds=True
            )
        self.component()._is_forbidden_to_update = True
        return self
    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """
        Exits context manager.
        """
        self.component()._is_forbidden_to_update = False
        if self.update_on_exit() is True:
            for component_ in _iterate.components(self.component()):
                _updatelib._update_now(
                    component_,
                    indicators=True,
                    offsets=True,
                    offsets_in_seconds=True,
                )
        else:
            assert self.update_on_exit() is False
[docs]
    def component(self) -> _score.Component:
        """
        Gets component.
        """
        return self._component 
[docs]
    def update_on_exit(self) -> bool:
        """
        Is true when context manager updates offsets on exit.
        """
        return self._update_on_exit 
 
[docs]
class NullContextManager:
    """
    Null context manager.
    """
    __slots__ = ()
    _is_abstract = True
    def __init__(self):
        pass
    def __enter__(self) -> None:
        """
        Enters context manager and does nothing.
        """
        pass
    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """
        Exits context manager and does nothing.
        """
        pass 
[docs]
class RedirectedStreams:
    """
    Redirected streams context manager.
    ..  container:: example
        Captures stdout and stderr output:
        >>> import io
        >>> string_io = io.StringIO()
        >>> with abjad.contextmanagers.RedirectedStreams(stdout=string_io):
        ...     print("hello, world!")
        ...
        >>> result = string_io.getvalue()
        >>> string_io.close()
        >>> print(result)
        hello, world!
        <BLANKLINE>
    """
    __slots__ = ("_stdout", "_stderr", "_old_stderr", "_old_stdout")
    _is_abstract = True
    def __init__(
        self,
        *,
        stdout: typing.TextIO | None = None,
        stderr: typing.TextIO | None = None,
    ):
        if stdout is None:
            self._stdout = sys.stdout
        else:
            self._stdout = stdout
        if stderr is None:
            self._stderr = sys.stderr
        else:
            self._stderr = stderr
    def __enter__(self) -> "RedirectedStreams":
        """
        Enters redirected streams context manager.
        """
        self._old_stdout, self._old_stderr = sys.stdout, sys.stderr
        self._old_stdout.flush()
        self._old_stderr.flush()
        sys.stdout, sys.stderr = self.stdout(), self.stderr()
        return self
    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: types.TracebackType | None,
    ) -> None:
        """
        Exits redirected streams context manager.
        """
        try:
            self.stdout().flush()
            self.stderr().flush()
        except Exception:
            pass
        sys.stdout = self._old_stdout
        sys.stderr = self._old_stderr
[docs]
    def stderr(self) -> typing.TextIO:
        """
        Gets stderr of context manager.
        """
        return self._stderr 
[docs]
    def stdout(self) -> typing.TextIO:
        """
        Gets stdout of context manager.
        """
        return self._stdout 
 
[docs]
@contextlib.contextmanager
def temporary_directory_change(
    directory: str | os.PathLike,
) -> collections.abc.Iterator[None]:
    """
    Temporary directory change context manager.
    """
    original_directory = os.getcwd()
    os.chdir(directory)
    try:
        yield
    finally:
        os.chdir(original_directory) 
[docs]
class Timer:
    """
    Timer context manager.
    ..  container:: example
        Prints elapsed time after timer finishes:
        >>> timer = abjad.contextmanagers.Timer()
        >>> with timer:
        ...     for _ in range(1000000):
        ...         x = 1 + 1
        ...
        >>> timer.elapsed_time() # doctest: +SKIP
        0.092742919921875
    ..  container:: example
        Prints elapsed time while timer is running; timers can be reused
        between with-blocks:
        >>> with abjad.contextmanagers.Timer() as timer: # doctest: +SKIP
        ...     for _ in range(5):
        ...         for _ in range(1000000):
        ...             x = 1 + 1
        ...         print(timer.elapsed_time())
        ...
        0.101150989532
        0.203935861588
        0.304930925369
        0.4057970047
        0.50649189949
    """
    __slots__ = (
        "_enter_message",
        "_exit_message",
        "_print_continuously_from_background",
        "_process",
        "_start_time",
        "_stop_time",
        "_timer_process",
        "_verbose",
    )
    _is_abstract = True
    def __init__(
        self,
        *,
        enter_message: str = "",
        exit_message: str = "",
        print_continuously_from_background: bool = False,
        verbose: bool = True,
    ):
        assert isinstance(enter_message, str), repr(enter_message)
        self._enter_message = enter_message
        assert isinstance(exit_message, str), repr(exit_message)
        self._exit_message = exit_message
        self._print_continuously_from_background = print_continuously_from_background
        self._process: subprocess.Popen | None = None
        self._timer_process = None
        self._start_time: float | None = None
        self._stop_time: float | None = None
        assert isinstance(verbose, bool), repr(verbose)
        self._verbose = bool(verbose)
    def __enter__(self) -> "Timer":
        """
        Enters context manager.
        """
        if self.enter_message() and self.verbose():
            print(self.enter_message())
        self._stop_time = None
        self._start_time = time.time()
        if self.print_continuously_from_background():
            path = (
                _Configuration.abjad_install_directory().parent / "scripts" / "timer.py"
            )
            interval = str(int(self.print_continuously_from_background()))
            process = subprocess.Popen([path, interval], shell=False)
            self._process = process
        return self
    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """
        Exits context manager.
        """
        self._stop_time = time.time()
        if self._process is not None:
            self._process.kill()
        if self.exit_message() and self.verbose():
            print(self.exit_message(), self.total_time_message())
[docs]
    def elapsed_time(self) -> float:
        """
        Gets elapsed time of timer.
        """
        start_time, stop_time = self.start_time(), self.stop_time()
        if start_time is not None:
            if stop_time is not None:
                return stop_time - start_time
            return time.time() - start_time
        return 0 
[docs]
    def enter_message(self) -> str:
        """
        Gets timer enter message.
        """
        return self._enter_message 
[docs]
    def exit_message(self) -> str:
        """
        Gets timer exit message.
        """
        return self._exit_message 
[docs]
    def print_continuously_from_background(self) -> bool:
        """
        Is true when timer prints continuously from background.
        """
        return self._print_continuously_from_background 
[docs]
    def start_time(self) -> float | None:
        """
        Gets start time of timer.
        """
        return self._start_time 
[docs]
    def stop_time(self) -> float | None:
        """
        Gets stop time of timer.
        """
        return self._stop_time 
[docs]
    def total_time_message(self) -> str:
        """
        Gets total time message, truncated to nearest second.
        """
        identifier = _string.pluralize("second", int(self.elapsed_time()))
        message = f"total time {int(self.elapsed_time())} {identifier} ..."
        return message 
[docs]
    def verbose(self) -> bool:
        """
        Is true when timer prints messages.
        """
        return self._verbose