Source code for abjad.contextmanagers

"""
Context managers.
"""

import collections
import contextlib
import filecmp
import os
import pathlib
import shutil
import subprocess
import sys
import time
import types
import typing

from . import _updatelib
from . import configuration as _configuration
from . import iterate as _iterate
from . import score as _score
from . import string as _string

_Configuration = _configuration.Configuration()


[docs] class FilesystemState: """ Filesystem state context manager. """ __slots__ = ("_keep", "_remove") _is_abstract = True def __init__( self, *, keep: list[pathlib.Path] | None = None, remove: list[pathlib.Path] | None = None, ): keep = keep or [] keep_tuple = tuple([str(_) for _ in keep]) self._keep = keep_tuple remove = remove or [] remove_tuple = tuple([str(_) for _ in remove]) self._remove = remove_tuple def __enter__(self) -> "FilesystemState": """ Backs up filesystem paths. """ for path in self.remove(): assert not os.path.exists(path), repr(path) for path in self.keep(): assert os.path.exists(path), repr(path) assert os.path.isfile(path) or os.path.isdir(path), repr(path) for path in self.keep(): backup_path = path + ".backup" if os.path.isfile(path): shutil.copyfile(path, backup_path) assert filecmp.cmp(path, backup_path), repr(path) elif os.path.isdir(path): shutil.copytree(path, backup_path) else: raise TypeError(f"neither file nor directory: {path}.") return self def __exit__(self, exg_type, exc_value, trackeback) -> None: """ Restores filesytem paths and removes backups; also removes paths in remove list. """ backup_paths = (_ + ".backup" for _ in self.keep()) for path in backup_paths: assert os.path.exists(path), repr(path) for path in self.keep(): backup_path = path + ".backup" assert os.path.exists(backup_path), repr(backup_path) if os.path.isfile(backup_path): shutil.copyfile(backup_path, path) filecmp.cmp(path, backup_path) os.remove(backup_path) elif os.path.isdir(backup_path): if os.path.exists(path): shutil.rmtree(path) shutil.copytree(backup_path, path) shutil.rmtree(backup_path) else: raise TypeError(f"neither file nor directory: {path}.") for path in self.remove(): if os.path.exists(path): if os.path.isfile(path): os.remove(path) elif os.path.isdir(path): shutil.rmtree(path) else: raise TypeError(f"neither file nor directory: {path}.") for path in self.keep(): assert os.path.exists(path), repr(path) for path in backup_paths: assert not os.path.exists(path), repr(path)
[docs] def keep(self) -> tuple[str, ...]: """ Gets paths to restore on exit. """ return self._keep
[docs] def remove(self) -> tuple[str, ...]: """ Gets paths to remove on exit. """ return self._remove
[docs] class ForbidUpdate: """ Forbid update context manager. """ __slots__ = ("_component", "_update_on_exit") _is_abstract = True def __init__(self, component: _score.Component, *, update_on_exit: bool = False): assert isinstance(component, _score.Component), repr(component) self._component = component assert isinstance(update_on_exit, bool), repr(update_on_exit) self._update_on_exit = update_on_exit def __enter__(self) -> "ForbidUpdate": r""" Enters context manager. .. container:: example REGRESSION. Indicators need to be updated after swap; context manager updates indicators before forbidding further updates: >>> staff = abjad.Staff(r"\tuplet 1/1 { c'4 d' }") >>> abjad.attach(abjad.Clef("alto"), staff[0][0]) >>> container = abjad.Container() >>> abjad.mutate.swap(staff[0], container) >>> with abjad.contextmanagers.ForbidUpdate(staff): ... for note in staff[0]: ... print(note) ... print(abjad.get.effective_indicator(note, abjad.Clef)) ... Note("c'4") Clef(name='alto') Note("d'4") Clef(name='alto') """ for component_ in _iterate.components(self.component()): _updatelib._update_now( component_, indicators=True, offsets=True, offsets_in_seconds=True ) self.component()._is_forbidden_to_update = True return self def __exit__(self, exc_type, exc_value, traceback) -> None: """ Exits context manager. """ self.component()._is_forbidden_to_update = False if self.update_on_exit() is True: for component_ in _iterate.components(self.component()): _updatelib._update_now( component_, indicators=True, offsets=True, offsets_in_seconds=True, ) else: assert self.update_on_exit() is False
[docs] def component(self) -> _score.Component: """ Gets component. """ return self._component
[docs] def update_on_exit(self) -> bool: """ Is true when context manager updates offsets on exit. """ return self._update_on_exit
[docs] class NullContextManager: """ Null context manager. """ __slots__ = () _is_abstract = True def __init__(self): pass def __enter__(self) -> None: """ Enters context manager and does nothing. """ pass def __exit__(self, exc_type, exc_value, traceback) -> None: """ Exits context manager and does nothing. """ pass
[docs] class RedirectedStreams: """ Redirected streams context manager. .. container:: example Captures stdout and stderr output: >>> import io >>> string_io = io.StringIO() >>> with abjad.contextmanagers.RedirectedStreams(stdout=string_io): ... print("hello, world!") ... >>> result = string_io.getvalue() >>> string_io.close() >>> print(result) hello, world! <BLANKLINE> """ __slots__ = ("_stdout", "_stderr", "_old_stderr", "_old_stdout") _is_abstract = True def __init__( self, *, stdout: typing.TextIO | None = None, stderr: typing.TextIO | None = None, ): if stdout is None: self._stdout = sys.stdout else: self._stdout = stdout if stderr is None: self._stderr = sys.stderr else: self._stderr = stderr def __enter__(self) -> "RedirectedStreams": """ Enters redirected streams context manager. """ self._old_stdout, self._old_stderr = sys.stdout, sys.stderr self._old_stdout.flush() self._old_stderr.flush() sys.stdout, sys.stderr = self.stdout(), self.stderr() return self def __exit__( self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: types.TracebackType | None, ) -> None: """ Exits redirected streams context manager. """ try: self.stdout().flush() self.stderr().flush() except Exception: pass sys.stdout = self._old_stdout sys.stderr = self._old_stderr
[docs] def stderr(self) -> typing.TextIO: """ Gets stderr of context manager. """ return self._stderr
[docs] def stdout(self) -> typing.TextIO: """ Gets stdout of context manager. """ return self._stdout
[docs] @contextlib.contextmanager def temporary_directory_change( directory: str | os.PathLike, ) -> collections.abc.Iterator[None]: """ Temporary directory change context manager. """ original_directory = os.getcwd() os.chdir(directory) try: yield finally: os.chdir(original_directory)
[docs] class Timer: """ Timer context manager. .. container:: example Prints elapsed time after timer finishes: >>> timer = abjad.contextmanagers.Timer() >>> with timer: ... for _ in range(1000000): ... x = 1 + 1 ... >>> timer.elapsed_time() # doctest: +SKIP 0.092742919921875 .. container:: example Prints elapsed time while timer is running; timers can be reused between with-blocks: >>> with abjad.contextmanagers.Timer() as timer: # doctest: +SKIP ... for _ in range(5): ... for _ in range(1000000): ... x = 1 + 1 ... print(timer.elapsed_time()) ... 0.101150989532 0.203935861588 0.304930925369 0.4057970047 0.50649189949 """ __slots__ = ( "_enter_message", "_exit_message", "_print_continuously_from_background", "_process", "_start_time", "_stop_time", "_timer_process", "_verbose", ) _is_abstract = True def __init__( self, *, enter_message: str = "", exit_message: str = "", print_continuously_from_background: bool = False, verbose: bool = True, ): assert isinstance(enter_message, str), repr(enter_message) self._enter_message = enter_message assert isinstance(exit_message, str), repr(exit_message) self._exit_message = exit_message self._print_continuously_from_background = print_continuously_from_background self._process: subprocess.Popen | None = None self._timer_process = None self._start_time: float | None = None self._stop_time: float | None = None assert isinstance(verbose, bool), repr(verbose) self._verbose = bool(verbose) def __enter__(self) -> "Timer": """ Enters context manager. """ if self.enter_message() and self.verbose(): print(self.enter_message()) self._stop_time = None self._start_time = time.time() if self.print_continuously_from_background(): path = ( _Configuration.abjad_install_directory().parent / "scripts" / "timer.py" ) interval = str(int(self.print_continuously_from_background())) process = subprocess.Popen([path, interval], shell=False) self._process = process return self def __exit__(self, exc_type, exc_value, traceback) -> None: """ Exits context manager. """ self._stop_time = time.time() if self._process is not None: self._process.kill() if self.exit_message() and self.verbose(): print(self.exit_message(), self.total_time_message())
[docs] def elapsed_time(self) -> float: """ Gets elapsed time of timer. """ start_time, stop_time = self.start_time(), self.stop_time() if start_time is not None: if stop_time is not None: return stop_time - start_time return time.time() - start_time return 0
[docs] def enter_message(self) -> str: """ Gets timer enter message. """ return self._enter_message
[docs] def exit_message(self) -> str: """ Gets timer exit message. """ return self._exit_message
[docs] def print_continuously_from_background(self) -> bool: """ Is true when timer prints continuously from background. """ return self._print_continuously_from_background
[docs] def start_time(self) -> float | None: """ Gets start time of timer. """ return self._start_time
[docs] def stop_time(self) -> float | None: """ Gets stop time of timer. """ return self._stop_time
[docs] def total_time_message(self) -> str: """ Gets total time message, truncated to nearest second. """ identifier = _string.pluralize("second", int(self.elapsed_time())) message = f"total time {int(self.elapsed_time())} {identifier} ..." return message
[docs] def verbose(self) -> bool: """ Is true when timer prints messages. """ return self._verbose