import collections
import filecmp
import os
import pathlib
import shutil
import subprocess
import sys
import tempfile
import time
from . import _updatelib
from . import configuration as _configuration
from . import iterate as _iterate
from . import string as _string
configuration = _configuration.Configuration()
[docs]
class ContextManager:
"""
An abstract context manager class.
"""
__slots__ = ()
_is_abstract = True
[docs]
def __repr__(self) -> str:
"""
Gets repr.
"""
return f"<{type(self).__name__}()>"
[docs]
class FilesystemState(ContextManager):
"""
Filesystem state context manager.
"""
__documentation_section__ = "Context managers"
__slots__ = ("_keep", "_remove")
def __init__(self, keep=None, remove=None):
keep = keep or []
assert isinstance(keep, collections.abc.Iterable), repr(keep)
keep = tuple([str(_) for _ in keep])
self._keep = keep
remove = remove or []
assert isinstance(remove, collections.abc.Iterable), repr(remove)
remove = tuple([str(_) for _ in remove])
self._remove = remove
[docs]
def __enter__(self) -> "FilesystemState":
"""
Backs up filesystem assets.
"""
for path in self.remove:
assert not os.path.exists(path), repr(path)
for path in self.keep:
assert os.path.exists(path), repr(path)
assert os.path.isfile(path) or os.path.isdir(path), repr(path)
for path in self.keep:
backup_path = path + ".backup"
if os.path.isfile(path):
shutil.copyfile(path, backup_path)
assert filecmp.cmp(path, backup_path), repr(path)
elif os.path.isdir(path):
shutil.copytree(path, backup_path)
else:
raise TypeError(f"neither file nor directory: {path}.")
return self
[docs]
def __exit__(self, exg_type, exc_value, trackeback) -> None:
"""
Restores filesytem assets and removes backups; also removes paths in remove list.
"""
backup_paths = (_ + ".backup" for _ in self.keep)
for path in backup_paths:
assert os.path.exists(path), repr(path)
for path in self.keep:
backup_path = path + ".backup"
assert os.path.exists(backup_path), repr(backup_path)
if os.path.isfile(backup_path):
shutil.copyfile(backup_path, path)
filecmp.cmp(path, backup_path)
os.remove(backup_path)
elif os.path.isdir(backup_path):
if os.path.exists(path):
shutil.rmtree(path)
shutil.copytree(backup_path, path)
shutil.rmtree(backup_path)
else:
raise TypeError(f"neither file nor directory: {path}.")
for path in self.remove:
if os.path.exists(path):
if os.path.isfile(path):
os.remove(path)
elif os.path.isdir(path):
shutil.rmtree(path)
else:
raise TypeError(f"neither file nor directory: {path}.")
for path in self.keep:
assert os.path.exists(path), repr(path)
for path in backup_paths:
assert not os.path.exists(path), repr(path)
@property
def keep(self):
"""
Gets asset paths to restore on exit.
Returns tuple.
"""
return self._keep
@property
def remove(self):
"""
Gets paths to remove on exit.
Returns tuple.
"""
return self._remove
[docs]
class ForbidUpdate(ContextManager):
r"""
A context manager for forbidding score updates.
.. container:: example
>>> staff = abjad.Staff("c'8 d'8 ~ d'2 e'4")
>>> with abjad.ForbidUpdate(component=staff):
... for note in staff[:]:
... pitch_1 = note.written_pitch
... pitch_2 = pitch_1 + abjad.NamedInterval('M3')
... pitches = [pitch_1, pitch_2]
... chord = abjad.Chord(pitches, note.written_duration)
... abjad.mutate.replace(note, chord)
...
>>> abjad.wf.wellformed(staff)
True
>>> abjad.show(staff) # doctest: +SKIP
.. docs::
>>> string = abjad.lilypond(staff)
>>> print(string)
\new Staff
{
<c' e'>8
<d' fs'>8
<d' fs'>2
<e' gs'>4
}
"""
### CLASS VARIABLES ###
__documentation_section__ = "Context managers"
__slots__ = ("_component", "_update_on_enter", "_update_on_exit")
### INITIALIZER ###
def __init__(self, component=None, update_on_enter=True, update_on_exit=None):
if component is not None:
assert hasattr(component, "_timespan"), repr(component)
self._component = component
if update_on_enter is not None:
update_on_enter = bool(update_on_enter)
self._update_on_enter = update_on_enter
if update_on_exit is not None:
update_on_exit = bool(update_on_exit)
self._update_on_exit = update_on_exit
### SPECIAL METHODS ###
[docs]
def __enter__(self) -> "ForbidUpdate":
r"""
Enters context manager.
.. container:: example
REGRESSION. Indicators need to be updated after swap; context
manager updates indicators before forbidding further updates:
>>> staff = abjad.Staff(r"\times 1/1 { c'4 d' }")
>>> abjad.attach(abjad.Clef("alto"), staff[0][0])
>>> container = abjad.Container()
>>> abjad.mutate.swap(staff[0], container)
>>> with abjad.ForbidUpdate(staff):
... for note in staff[0]:
... print(note)
... print(abjad.get.effective(note, abjad.Clef))
...
Note("c'4")
Clef(name='alto', hide=False)
Note("d'4")
Clef(name='alto', hide=False)
"""
if self.component is not None:
for component_ in _iterate.components(self.component):
_updatelib._update_now(
component_, indicators=True, offsets=True, offsets_in_seconds=True
)
self.component._is_forbidden_to_update = True
return self
[docs]
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""
Exits context manager.
"""
if self.component is not None:
self.component._is_forbidden_to_update = False
if self.update_on_exit:
for component_ in _iterate.components(self.component):
_updatelib._update_now(
component_,
indicators=True,
offsets=True,
offsets_in_seconds=True,
)
### PUBLIC PROPERTIES ###
@property
def component(self):
"""
Gets component.
Set to component or none.
Returns component or none.
"""
return self._component
@property
def update_on_enter(self) -> bool | None:
"""
Is true when context manager should update offsets on enter.
Set to true, false or none.
"""
return self._update_on_enter
@property
def update_on_exit(self) -> bool | None:
"""
Is true when context manager should update offsets on exit.
Set to true, false or none.
"""
return self._update_on_exit
[docs]
class NullContextManager(ContextManager):
"""
A context manager that does nothing.
"""
__documentation_section__ = "Context managers"
__slots__ = ()
def __init__(self):
pass
[docs]
def __enter__(self) -> None:
"""
Enters context manager and does nothing.
"""
pass
[docs]
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""
Exits context manager and does nothing.
"""
pass
[docs]
class ProgressIndicator(ContextManager):
"""
A context manager for printing progress indications.
"""
### CLASS VARIABLES ###
__documentation_section__ = "Context managers"
__slots__ = ("_is_warning", "_message", "_progress", "_total", "_verbose")
RED = "\033[91m"
END = "\033[0m"
### INITIALIZER ###
def __init__(self, message="", total=None, verbose=True, is_warning=None):
self._message = message
self._progress = 0
self._total = total
self._verbose = bool(verbose)
self._is_warning = bool(is_warning)
### SPECIAL METHODS ###
[docs]
def __enter__(self) -> "ProgressIndicator":
"""
Enters progress indicator.
"""
self._print()
return self
[docs]
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""
Exits progress indicator.
"""
if self.verbose:
print()
[docs]
def __repr__(self) -> str:
"""
Gets interpreter representation of context manager.
.. container:: example
>>> context_manager = abjad.ProgressIndicator()
>>> context_manager
<ProgressIndicator()>
"""
return f"<{type(self).__name__}()>"
### PRIVATE METHODS ###
def _print(self):
if not self.verbose:
return
message = self.message or "Progress"
if self.total is not None:
message = f"{message}: {self.progress} / {self.total}"
else:
message = f"{message}: {self.progress}"
if self.is_warning and self.progress:
message = self.RED + message + self.END
print(message, end="")
### PUBLIC METHODS ###
[docs]
def advance(self):
"""
Advances the progress indicator's progress count. Overwrites
the current terminal line with the progress indicators message and new
count.
"""
self._progress += 1
if self.verbose:
sys.stdout.flush()
print("\r", end="")
self._print()
### PUBLIC PROPERTIES ###
@property
def is_warning(self) -> bool:
"""
Is true if progress indicator prints in red when its progress goes above zero.
"""
return self._is_warning
@property
def message(self) -> str:
"""
Gets message of progress indicator.
"""
return self._message
@property
def progress(self) -> int:
"""
Gets progress.
"""
return self._progress
@property
def total(self) -> int | None:
"""
Gets total count.
"""
return self._total
@property
def verbose(self) -> bool:
"""
Is true if progress indicator prints status.
"""
return self._verbose
[docs]
class RedirectedStreams(ContextManager):
"""
A context manager for capturing stdout and stderr output.
.. container:: example
>>> abjad.RedirectedStreams()
<RedirectedStreams()>
>>> from io import StringIO
>>> string_io = StringIO()
>>> with abjad.RedirectedStreams(stdout=string_io):
... print("hello, world!")
...
>>> result = string_io.getvalue()
>>> string_io.close()
>>> print(result)
hello, world!
<BLANKLINE>
"""
### CLASS VARIABLES ###
__documentation_section__ = "Context managers"
__slots__ = ("_stdout", "_stderr", "_old_stderr", "_old_stdout")
### INITIALIZER ###
def __init__(self, stdout=None, stderr=None):
self._stdout = stdout or sys.stdout
self._stderr = stderr or sys.stderr
### SPECIAL METHODS ###
[docs]
def __enter__(self) -> "RedirectedStreams":
"""
Enters redirected streams context manager.
"""
self._old_stdout, self._old_stderr = sys.stdout, sys.stderr
self._old_stdout.flush()
self._old_stderr.flush()
sys.stdout, sys.stderr = self._stdout, self._stderr
return self
[docs]
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""
Exits redirected streams context manager.
"""
try:
self._stdout.flush()
self._stderr.flush()
except Exception:
pass
sys.stdout = self._old_stdout
sys.stderr = self._old_stderr
[docs]
def __repr__(self) -> str:
"""
Gets interpreter representation of context manager.
.. container:: example
>>> context_manager = abjad.RedirectedStreams()
>>> context_manager
<RedirectedStreams()>
"""
return super().__repr__()
@property
def stderr(self):
"""
Gets stderr of context manager.
"""
return self._stderr
@property
def stdout(self):
"""
Gets stdout of context manager.
"""
return self._stdout
[docs]
class TemporaryDirectory(ContextManager):
"""
A temporary directory context manager.
"""
### CLASS VARIABLES ###
__documentation_section__ = "Context managers"
__slots__ = ("_parent_directory", "_temporary_directory")
### INITIALIZER ###
def __init__(self, parent_directory=None):
self._parent_directory = parent_directory
self._temporary_directory = None
### SPECIAL METHODS ###
[docs]
def __enter__(self):
"""
Enters context manager.
Creates and returns path to a temporary directory.
"""
self._temporary_directory = tempfile.mkdtemp(dir=self.parent_directory)
return self._temporary_directory
[docs]
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""
Exits context manager.
Deletes previously created temporary directory.
"""
shutil.rmtree(self._temporary_directory)
### PUBLIC PROPERTIES ###
@property
def parent_directory(self) -> str:
"""
Gets parent directory.
"""
return self._parent_directory
@property
def temporary_directory(self) -> str:
"""
Gets temporary directory.
"""
return self._temporary_directory
[docs]
class TemporaryDirectoryChange(ContextManager):
"""
A context manager for temporarily changing the current working directory.
"""
### CLASS VARIABLES ###
__documentation_section__ = "Context managers"
__slots__ = ("_directory", "_original_directory", "_verbose")
### INITIALIZER ###
def __init__(self, directory=None, verbose=None):
if directory is None:
pass
elif isinstance(directory, pathlib.Path):
directory = str(directory)
elif os.path.isdir(directory):
pass
elif os.path.isfile(directory):
directory = os.path.dirname(directory)
self._directory = directory
self._original_directory = None
if verbose is not None:
verbose = bool(verbose)
self._verbose = bool(verbose)
### SPECIAL METHODS ###
[docs]
def __enter__(self) -> "TemporaryDirectoryChange":
"""
Enters context manager and changes to ``directory``.
"""
self._original_directory = os.getcwd()
if self._directory is not None:
os.chdir(self.directory)
if self.verbose:
message = f"Changing directory to {self.directory} ..."
print(message)
return self
[docs]
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""
Exits context manager and returns to original working directory.
"""
if self._directory is not None:
os.chdir(self._original_directory)
if self.verbose:
message = f"Returning to {self.original_directory} ..."
print(message)
self._original_directory = None
[docs]
def __repr__(self) -> str:
"""
Gets interpreter representation of context manager.
"""
return f"<{type(self).__name__}()>"
### PUBLIC PROPERTIES ###
@property
def directory(self) -> str:
"""
Gets temporary directory of context manager.
"""
return self._directory
@property
def original_directory(self) -> str:
"""
Gets original directory of context manager.
"""
return self._original_directory
@property
def verbose(self) -> bool:
"""
Is true if context manager prints verbose messages on entrance and exit.
"""
return self._verbose
[docs]
class Timer(ContextManager):
"""
A timing context manager.
.. container:: example
>>> timer = abjad.Timer()
>>> with timer:
... for _ in range(1000000):
... x = 1 + 1
...
>>> timer.elapsed_time # doctest: +SKIP
0.092742919921875
The timer can also be accessed from within the ``with`` block:
>>> with abjad.Timer() as timer: # doctest: +SKIP
... for _ in range(5):
... for _ in range(1000000):
... x = 1 + 1
... print(timer.elapsed_time)
...
0.101150989532
0.203935861588
0.304930925369
0.4057970047
0.50649189949
Timers can be reused between ``with`` blocks. They will reset their clock
on entering any ``with`` block.
"""
### CLASS VARIABLES ###
__documentation_section__ = "Context managers"
__slots__ = (
"_enter_message",
"_exit_message",
"_print_continuously_from_background",
"_process",
"_start_time",
"_stop_time",
"_timer_process",
"_verbose",
)
### INITIALIZER ###
def __init__(
self,
exit_message=None,
enter_message=None,
print_continuously_from_background=False,
verbose=True,
):
if enter_message is not None:
enter_message = str(enter_message)
self._enter_message = enter_message
if exit_message is not None:
exit_message = str(exit_message)
self._exit_message = exit_message
self._print_continuously_from_background = print_continuously_from_background
self._process = None
self._timer_process = None
self._start_time = None
self._stop_time = None
self._verbose = bool(verbose)
### SPECIAL METHODS ###
[docs]
def __enter__(self) -> "Timer":
"""
Enters context manager.
"""
if self.enter_message and self.verbose:
print(self.enter_message)
self._stop_time = None
self._start_time = time.time()
if self.print_continuously_from_background:
path = configuration.abjad_directory.parent / "scr" / "timer"
interval = str(int(self.print_continuously_from_background))
process = subprocess.Popen([path, interval], shell=False)
self._process = process
return self
[docs]
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""
Exit context manager.
"""
self._stop_time = time.time()
if self._process is not None:
self._process.kill()
if self.exit_message and self.verbose:
print(self.exit_message, self.elapsed_time)
### PUBLIC PROPERTIES ###
@property
def elapsed_time(self) -> float | None:
"""
Elapsed time.
"""
if self.start_time is not None:
if self.stop_time is not None:
return self.stop_time - self.start_time
return time.time() - self.start_time
return None
@property
def enter_message(self) -> str:
"""
Timer enter message.
"""
return self._enter_message
@property
def exit_message(self) -> str:
"""
Timer exit message.
"""
return self._exit_message
@property
def print_continuously_from_background(self) -> bool:
"""
Is true when timer should print continuously from background.
"""
return self._print_continuously_from_background
@property
def start_time(self):
"""
Start time of timer.
Returns time.
"""
return self._start_time
@property
def stop_time(self):
"""
Stop time of timer.
Returns time.
"""
return self._stop_time
@property
def total_time_message(self) -> str:
"""
Gets total time message.
Truncated to the nearest second.
"""
assert self.elapsed_time is not None
identifier = _string.pluralize("second", int(self.elapsed_time))
message = f"total time {int(self.elapsed_time)} {identifier} ..."
return message
@property
def verbose(self) -> bool:
"""
Is true if timer should print messages.
"""
return self._verbose