Source code for abjad.contextmanagers

"""
Context managers.
"""

import contextlib
import filecmp
import os
import pathlib
import shutil
import subprocess
import sys
import time
import types
import typing

from . import _updatelib
from . import configuration as _configuration
from . import iterate as _iterate
from . import score as _score
from . import string as _string

# Module-level configuration instance. Timer.__enter__ uses it to locate
# "scripts/timer.py" next to the Abjad install directory.
_Configuration = _configuration.Configuration()


[docs] class FilesystemState: """ Filesystem state context manager. """ __slots__ = ("_keep", "_remove") _is_abstract = True def __init__( self, *, keep: list[pathlib.Path] | None = None, remove: list[pathlib.Path] | None = None, ): keep = keep or [] keep_tuple = tuple([str(_) for _ in keep]) self._keep = keep_tuple remove = remove or [] remove_tuple = tuple([str(_) for _ in remove]) self._remove = remove_tuple def __enter__(self) -> "FilesystemState": """ Backs up filesystem paths. """ for path in self.remove(): assert not os.path.exists(path), repr(path) for path in self.keep(): assert os.path.exists(path), repr(path) assert os.path.isfile(path) or os.path.isdir(path), repr(path) for path in self.keep(): backup_path = path + ".backup" if os.path.isfile(path): shutil.copyfile(path, backup_path) assert filecmp.cmp(path, backup_path), repr(path) elif os.path.isdir(path): shutil.copytree(path, backup_path) else: raise TypeError(f"neither file nor directory: {path}.") return self def __exit__(self, exg_type, exc_value, trackeback) -> None: """ Restores filesytem paths and removes backups; also removes paths in remove list. 
""" backup_paths = (_ + ".backup" for _ in self.keep()) for path in backup_paths: assert os.path.exists(path), repr(path) for path in self.keep(): backup_path = path + ".backup" assert os.path.exists(backup_path), repr(backup_path) if os.path.isfile(backup_path): shutil.copyfile(backup_path, path) filecmp.cmp(path, backup_path) os.remove(backup_path) elif os.path.isdir(backup_path): if os.path.exists(path): shutil.rmtree(path) shutil.copytree(backup_path, path) shutil.rmtree(backup_path) else: raise TypeError(f"neither file nor directory: {path}.") for path in self.remove(): if os.path.exists(path): if os.path.isfile(path): os.remove(path) elif os.path.isdir(path): shutil.rmtree(path) else: raise TypeError(f"neither file nor directory: {path}.") for path in self.keep(): assert os.path.exists(path), repr(path) for path in backup_paths: assert not os.path.exists(path), repr(path)
[docs] def keep(self) -> tuple[str, ...]: """ Gets paths to restore on exit. """ return self._keep
[docs] def remove(self) -> tuple[str, ...]: """ Gets paths to remove on exit. """ return self._remove
class ForbidUpdate:
    """
    Forbid update context manager.

    Updates every component under ``component`` on entry and then marks
    ``component`` as forbidden to update until the with-block exits.
    """

    __slots__ = ("_component", "_update_on_exit")

    _is_abstract = True

    def __init__(self, component: _score.Component, *, update_on_exit: bool = False):
        assert isinstance(component, _score.Component), repr(component)
        self._component = component
        assert isinstance(update_on_exit, bool), repr(update_on_exit)
        self._update_on_exit = update_on_exit

    def _update_all_components(self) -> None:
        # Updates indicators and both kinds of offsets for every component
        # iterated from the managed component.
        for component_ in _iterate.components(self.component()):
            _updatelib._update_now(
                component_,
                indicators=True,
                offsets=True,
                offsets_in_seconds=True,
            )

    def __enter__(self) -> "ForbidUpdate":
        r"""
        Enters context manager.

        ..  container:: example

            REGRESSION. Indicators need to be updated after swap; context
            manager updates indicators before forbidding further updates:

            >>> staff = abjad.Staff(r"\tuplet 1/1 { c'4 d' }")
            >>> abjad.attach(abjad.Clef("alto"), staff[0][0])
            >>> container = abjad.Container()
            >>> abjad.mutate.swap(staff[0], container)
            >>> with abjad.contextmanagers.ForbidUpdate(staff):
            ...     for note in staff[0]:
            ...         print(note)
            ...         print(abjad.get.effective_indicator(note, abjad.Clef))
            ...
            Note("c'4")
            Clef(name='alto')
            Note("d'4")
            Clef(name='alto')

        """
        self._update_all_components()
        self.component()._is_forbidden_to_update = True
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """
        Exits context manager.

        Lifts the update ban, then updates all components again when
        ``update_on_exit`` is true.
        """
        self.component()._is_forbidden_to_update = False
        should_update = self.update_on_exit()
        if should_update is not True:
            assert should_update is False
            return
        self._update_all_components()

    def component(self) -> _score.Component:
        """
        Gets component.
        """
        return self._component

    def update_on_exit(self) -> bool:
        """
        Is true when context manager updates offsets on exit.
        """
        return self._update_on_exit
class NullContextManager:
    """
    Null context manager.

    Does nothing on entry or exit; the ``as`` target of a with-statement is
    bound to ``None``.
    """

    __slots__ = ()

    _is_abstract = True

    def __init__(self):
        return None

    def __enter__(self) -> None:
        """
        Enters context manager and does nothing.
        """
        return None

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """
        Exits context manager and does nothing.
        """
        return None
[docs] class RedirectedStreams: """ Redirected streams context manager. .. container:: example Captures stdout and stderr output: >>> import io >>> string_io = io.StringIO() >>> with abjad.contextmanagers.RedirectedStreams(stdout=string_io): ... print("hello, world!") ... >>> result = string_io.getvalue() >>> string_io.close() >>> print(result) hello, world! <BLANKLINE> """ __slots__ = ("_stdout", "_stderr", "_old_stderr", "_old_stdout") _is_abstract = True def __init__( self, *, stdout: typing.TextIO | None = None, stderr: typing.TextIO | None = None, ): if stdout is None: self._stdout = sys.stdout else: self._stdout = stdout if stderr is None: self._stderr = sys.stderr else: self._stderr = stderr def __enter__(self) -> "RedirectedStreams": """ Enters redirected streams context manager. """ self._old_stdout, self._old_stderr = sys.stdout, sys.stderr self._old_stdout.flush() self._old_stderr.flush() sys.stdout, sys.stderr = self.stdout(), self.stderr() return self def __exit__( self, exc_type: typing.Type[BaseException] | None, exc_value: BaseException | None, traceback: types.TracebackType | None, ) -> None: """ Exits redirected streams context manager. """ try: self.stdout().flush() self.stderr().flush() except Exception: pass sys.stdout = self._old_stdout sys.stderr = self._old_stderr
[docs] def stderr(self) -> typing.TextIO: """ Gets stderr of context manager. """ return self._stderr
[docs] def stdout(self) -> typing.TextIO: """ Gets stdout of context manager. """ return self._stdout
[docs] @contextlib.contextmanager def temporary_directory_change(directory: str | os.PathLike) -> typing.Iterator[None]: """ Temporary directory change context manager. """ original_directory = os.getcwd() os.chdir(directory) try: yield finally: os.chdir(original_directory)
[docs] class Timer: """ Timer context manager. .. container:: example Prints elapsed time after timer finishes: >>> timer = abjad.contextmanagers.Timer() >>> with timer: ... for _ in range(1000000): ... x = 1 + 1 ... >>> timer.elapsed_time() # doctest: +SKIP 0.092742919921875 .. container:: example Prints elapsed time while timer is running; timers can be reused between with-blocks: >>> with abjad.contextmanagers.Timer() as timer: # doctest: +SKIP ... for _ in range(5): ... for _ in range(1000000): ... x = 1 + 1 ... print(timer.elapsed_time()) ... 0.101150989532 0.203935861588 0.304930925369 0.4057970047 0.50649189949 """ __slots__ = ( "_enter_message", "_exit_message", "_print_continuously_from_background", "_process", "_start_time", "_stop_time", "_timer_process", "_verbose", ) _is_abstract = True def __init__( self, *, enter_message: str = "", exit_message: str = "", print_continuously_from_background: bool = False, verbose: bool = True, ): assert isinstance(enter_message, str), repr(enter_message) self._enter_message = enter_message assert isinstance(exit_message, str), repr(exit_message) self._exit_message = exit_message self._print_continuously_from_background = print_continuously_from_background self._process: subprocess.Popen | None = None self._timer_process = None self._start_time: float | None = None self._stop_time: float | None = None assert isinstance(verbose, bool), repr(verbose) self._verbose = bool(verbose) def __enter__(self) -> "Timer": """ Enters context manager. 
""" if self.enter_message() and self.verbose(): print(self.enter_message()) self._stop_time = None self._start_time = time.time() if self.print_continuously_from_background(): path = ( _Configuration.abjad_install_directory().parent / "scripts" / "timer.py" ) interval = str(int(self.print_continuously_from_background())) process = subprocess.Popen([path, interval], shell=False) self._process = process return self def __exit__(self, exc_type, exc_value, traceback) -> None: """ Exits context manager. """ self._stop_time = time.time() if self._process is not None: self._process.kill() if self.exit_message() and self.verbose(): print(self.exit_message(), self.total_time_message())
[docs] def elapsed_time(self) -> float: """ Gets elapsed time of timer. """ start_time, stop_time = self.start_time(), self.stop_time() if start_time is not None: if stop_time is not None: return stop_time - start_time return time.time() - start_time return 0
[docs] def enter_message(self) -> str: """ Gets timer enter message. """ return self._enter_message
[docs] def exit_message(self) -> str: """ Gets timer exit message. """ return self._exit_message
[docs] def print_continuously_from_background(self) -> bool: """ Is true when timer prints continuously from background. """ return self._print_continuously_from_background
[docs] def start_time(self) -> float | None: """ Gets start time of timer. """ return self._start_time
[docs] def stop_time(self) -> float | None: """ Gets stop time of timer. """ return self._stop_time
[docs] def total_time_message(self) -> str: """ Gets total time message, truncated to nearest second. """ identifier = _string.pluralize("second", int(self.elapsed_time())) message = f"total time {int(self.elapsed_time())} {identifier} ..." return message
[docs] def verbose(self) -> bool: """ Is true when timer prints messages. """ return self._verbose