Source code for abjadext.nauert.jobhandlers

import abc
import multiprocessing
import pickle
import typing

from .quantizationjob import QuantizationJob


[docs] class JobHandler(abc.ABC): """ Abstact job-handler. ``JobHandlers`` control how ``QuantizationJob`` instances are processed by the ``quantize`` function, either serially or in parallel. """ ### CLASS VARIABLES ### __slots__ = () ### INITIALIZER ### def __init__(self): pass ### SPECIAL METHODS ###
[docs] @abc.abstractmethod def __call__(self, jobs): """ Calls job handler. """ raise NotImplementedError
[docs] class ParallelJobHandlerWorker(multiprocessing.Process): """ Parallel job-handler worker. Worker process which runs ``QuantizationJobs``. Not composer-safe. Used internally by ``ParallelJobHandler``. """ ### INITIALIZER ### def __init__(self, job_queue=None, result_queue=None): multiprocessing.Process.__init__(self) job_queue = job_queue or () result_queue = result_queue or () self.job_queue = job_queue self.result_queue = result_queue ### PUBLIC METHODS ###
[docs] def run(self): """ Runs parallel job handler worker. Returns none. """ while True: job = None if hasattr(self.job_queue, "get"): job = self.job_queue.get() if job is None: # poison pill causes worker shutdown # print '{}: Exiting'.format(process_name) assert hasattr(self.job_queue, "task_done") self.job_queue.task_done() break # print '{}: {!r}'.format(process_name, job) job = pickle.loads(job) job() self.job_queue.task_done() assert hasattr(self.result_queue, "put") self.result_queue.put(pickle.dumps(job, protocol=0)) return
[docs] class ParallelJobHandler(JobHandler): """ Parallel job-handler. Processes ``QuantizationJob`` instances in parallel, based on the number of CPUs available. """ ### CLASS VARIABLES ### __slots__ = () ### SPECIAL METHODS ###
[docs] def __call__(self, jobs): """ Calls parallel job handler. """ finished_jobs = [] job_queue = multiprocessing.JoinableQueue() result_queue = multiprocessing.Queue() workers = [ ParallelJobHandlerWorker(job_queue, result_queue) for i in range(multiprocessing.cpu_count() * 2) ] for worker in workers: worker.start() for job in jobs: job_queue.put(pickle.dumps(job, protocol=0)) for i in range(len(jobs)): finished_jobs.append(pickle.loads(result_queue.get())) for worker in workers: job_queue.put(None) job_queue.join() result_queue.close() job_queue.close() for worker in workers: worker.join() return finished_jobs
[docs] class SerialJobHandler(JobHandler): """ Serial job-handler. """ ### CLASS VARIABLES ### __slots__ = () ### SPECIAL METHODS ###
[docs] def __call__( self, jobs: typing.Sequence[QuantizationJob] ) -> typing.Sequence[QuantizationJob]: """ Calls serial job handler. Returns ``jobs``. """ for job in jobs: job() return jobs