From 54d3c4daf189ec2534e0ec2dc337080222efe9fb Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Tue, 19 Oct 2010 08:42:54 -0400 Subject: [PATCH] Add pysawsim.manager and pysawsim.manager.thread for running asynchronous jobs. Also: * simplify .gitignore and add *.pyc. * add log() and __version__ to pysawsim. * fix exception handling in pysawsim.invoke.invoke(). --- .gitignore | 32 +----- pysawsim/__init__.py | 20 ++++ pysawsim/invoke.py | 12 +-- pysawsim/manager/__init__.py | 189 +++++++++++++++++++++++++++++++++++ pysawsim/manager/thread.py | 113 +++++++++++++++++++++ 5 files changed, 332 insertions(+), 34 deletions(-) create mode 100644 pysawsim/manager/__init__.py create mode 100644 pysawsim/manager/thread.py diff --git a/.gitignore b/.gitignore index 81f15fd..24f504a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,31 +1,7 @@ # ignore the following autogenerated files: -accel_k.c -accel_k.h -global.h -k_model.c -k_model.h -k_model_utils -k_model_utils.c -list.c -list.h +*.pyc +bin/ +build/ +doc/ Makefile -parse.c -parse.h -sawsim -sawsim.aux -sawsim.bbl -sawsim.blg -sawsim.c -sawsim.log -sawsim.out -sawsim.pdf -sawsim.tex -string_eval.c -string_eval.h -tension_balance.c -tension_balance.h -tension_model.c -tension_model.h -tension_model_utils -tension_model_utils.c diff --git a/pysawsim/__init__.py b/pysawsim/__init__.py index cc179d9..f5b7e64 100644 --- a/pysawsim/__init__.py +++ b/pysawsim/__init__.py @@ -44,3 +44,23 @@ Adding tests to modules Just go crazy with doctests and unittests; nose_ will find them. 
""" + +import logging +import logging.handlers +import sys + + +__version__ = '0.10' # match sawsim version + +def log(): + return logging.getLogger('pysawsim') + +_log = log() +_log.setLevel(logging.DEBUG) +_console = logging.StreamHandler() +_formatter = logging.Formatter('%(name)-8s: %(levelname)-6s %(message)s') +_console.setFormatter(_formatter) +_log.addHandler(_console) +del(_log) +del(_console) +del(_formatter) diff --git a/pysawsim/invoke.py b/pysawsim/invoke.py index d7c10b5..bb051d7 100644 --- a/pysawsim/invoke.py +++ b/pysawsim/invoke.py @@ -34,7 +34,7 @@ class CommandError(Exception): self.stdout = stdout self.stderr = stderr -def invoke(cmd_string, stdin=None, expect=(0,), cwd=None, verbose=True): +def invoke(cmd_string, stdin=None, expect=(0,), cwd=None, verbose=False): """ expect should be a tuple of allowed exit codes. cwd should be the directory from which the command will be executed. @@ -51,11 +51,11 @@ def invoke(cmd_string, stdin=None, expect=(0,), cwd=None, verbose=True): q = Popen(cmd_string, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True, cwd=cwd) except OSError, e : - raise CommandError(args, status=e.args[0], stdout="", stderr=e) - output,error = q.communicate(input=stdin) + raise CommandError(cmd_string, status=e.args[0], stdout="", stderr=e) + stdout,stderr = q.communicate(input=stdin) status = q.wait() if verbose == True: - print >> sys.stderr, "%d\n%s%s" % (status, output, error) + print >> sys.stderr, "%d\n%s%s" % (status, stdout, stderr) if status not in expect: - raise CommandError(args, status, output, error) - return status, output, error + raise CommandError(cmd_string, status, stdout, stderr) + return status, stdout, stderr diff --git a/pysawsim/manager/__init__.py b/pysawsim/manager/__init__.py new file mode 100644 index 0000000..5deb701 --- /dev/null +++ b/pysawsim/manager/__init__.py @@ -0,0 +1,189 @@ +# Copyright (C) 2010 W. 
Trevor King
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Jobs and the abstract `JobManager` used to run them asynchronously.
+
+`Job` wraps a Python callable and `InvokeJob` wraps a shell command run
+through `invoke()`.  `JobManager` assigns job ids and collects finished
+jobs; subclasses supply `_spawn_job()` and `_receive_job()`.
+"""
+
+from .. import invoke as invoke
+
+
+class Job (object):
+    """Run a Python callable and record how it went.
+
+    The interface is similar to `threading.Thread`: construct the job
+    with a `target` plus optional `args` and `kwargs`, then call `run()`.
+    `blocks_on` lists the ids of jobs which should finish before this
+    one starts; `Job` only stores it, enforcing it is up to the manager.
+
+    >>> import copy
+    >>> def job(*args, **kwargs):
+    ...     print 'executing job with %s and %s.' % (args, kwargs)
+    ...     return [4, 5]
+    >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
+    >>> j2 = copy.deepcopy(j)
+
+    Job status is initially `None`.
+
+    >>> print j.status
+    None
+
+    >>> j.run()
+    executing job with (1, 2) and {'kwargA': 3}.
+
+    After execution, `status` and `data` are set.
+
+    >>> j.status
+    0
+    >>> j.data
+    [4, 5]
+
+    You can copy these attributes over to another job with `copy_onto()`.
+
+    >>> j.copy_onto(j2)
+    >>> j2.status
+    0
+    >>> j2.data
+    [4, 5]
+
+    String representations are nice and simple.
+
+    >>> str(j)
+    '<Job 3>'
+    >>> repr(j)
+    '<Job 3>'
+
+    If `target` raises an exception, it is stored in `status` (for
+    successful runs, the status value is `0`).
+
+    >>> def job():
+    ...     raise Exception('error running job')
+    >>> j = Job(id=3, target=job)
+    >>> j.run()
+    >>> j.status
+    Exception('error running job',)
+    >>> print j.data
+    None
+    """
+    def __init__(self, id, target, blocks_on=None, args=None, kwargs=None):
+        # Fresh containers per instance; never share mutable defaults.
+        if blocks_on is None:
+            blocks_on = []
+        if args is None:
+            args = []
+        if kwargs is None:
+            kwargs = {}
+        self.id = id
+        self.target = target
+        self.blocks_on = blocks_on
+        self.args = args
+        self.kwargs = kwargs
+        self.status = None  # None until run() has finished
+        self.data = None
+
+    def __str__(self):
+        return '<%s %d>' % (self.__class__.__name__, self.id)
+
+    def __repr__(self):
+        return self.__str__()
+
+    def run(self):
+        """Call `target` and store its return value in `data`.
+
+        `status` becomes `0` on success.  If `target` raises an
+        `Exception`, the exception is stored in `status` instead and
+        `data` is left unchanged.
+        """
+        try:
+            self.data = self.target(*self.args, **self.kwargs)
+        except Exception as e:
+            self.status = e
+        else:
+            self.status = 0
+
+    def copy_onto(self, other):
+        """Merge results of run onto initial job-request instance."""
+        for attr in ['status', 'data']:
+            setattr(other, attr, getattr(self, attr))
+
+
+class InvokeJob (Job):
+    """Job subclass which runs a shell command through `invoke()`.
+
+    `target` is the command string; `args` and `kwargs` are passed on to
+    `invoke()`.  After `run()`, `data` is a dict holding the command's
+    `stdout` and `stderr`, and `status` is the exit status or, if the
+    command failed, the `CommandError` raised by `invoke()`.
+
+    >>> q = 'What is the meaning of life, the universe, and everything?'
+    >>> j = InvokeJob(id=3, target='echo "%s"' % q)
+    >>> j.run()
+    >>> j.status
+    0
+    >>> j.data['stdout']
+    'What is the meaning of life, the universe, and everything?\\n'
+    >>> j.data['stderr']
+    ''
+
+    >>> j = InvokeJob(id=3, target='missing_command')
+    >>> j.run()
+    >>> print j.status  # doctest: +NORMALIZE_WHITESPACE
+    Command failed (127):
+      /bin/sh: missing_command: command not found
+    <BLANKLINE>
+    <BLANKLINE>
+    while executing
+      missing_command
+    >>> j.data['stdout']
+    ''
+    >>> j.data['stderr']
+    '/bin/sh: missing_command: command not found\\n'
+    """
+    def run(self):
+        """Invoke the command, recording exit status and output."""
+        try:
+            self.status,stdout,stderr = invoke.invoke(
+                self.target, *self.args, **self.kwargs)
+            self.data = {'stdout':stdout, 'stderr':stderr}
+        except invoke.CommandError as e:
+            # Keep whatever output the failed command produced.
+            self.status = e
+            self.data = {'stdout':e.stdout, 'stderr':e.stderr}
+
+
+class JobManager (object):
+    """Base class for managers which run jobs asynchronously.
+
+    The base class assigns job ids and tracks jobs until `wait()`
+    returns them.  Subclasses implement `_spawn_job()`, which starts a
+    job running, and `_receive_job()`, which blocks until some spawned
+    job has finished and returns it.
+
+    >>> class SerialManager (JobManager):
+    ...     def __init__(self):
+    ...         super(SerialManager, self).__init__()
+    ...         self._done = []
+    ...     def _spawn_job(self, job):
+    ...         job.run()
+    ...         self._done.append(job)
+    ...     def _receive_job(self):
+    ...         return self._done.pop(0)
+    >>> m = SerialManager()
+    >>> jobs = [m.async_invoke('echo "%d"' % i) for i in range(3)]
+    >>> sorted(m.wait(ids=[jobs[1].id]).keys())
+    [1]
+    >>> sorted(m.wait().keys())
+    [0, 2]
+    >>> jobs[2].status
+    0
+    >>> m.teardown()
+    """
+    def __init__(self):
+        self._jobs = {}  # jobs not yet returned by wait(), keyed by id
+        self.next_id = 0
+
+    def teardown(self):
+        """Release resources held by the manager (nothing to do here)."""
+        pass
+
+    def async_invoke(self, cmd_string, blocks_on=None, *args, **kwargs):
+        """Schedule the shell command `cmd_string` and return its job.
+
+        `blocks_on` is an optional list of job ids, stored on the new
+        job so the manager can hold it back until those jobs finish.
+        Any further positional and keyword arguments are passed on to
+        `invoke()` when the job runs.
+        """
+        id = self.next_id
+        self.next_id += 1
+        # The target is a command line, so it needs an InvokeJob; a
+        # plain Job would try to call the string.
+        job = InvokeJob(id=id, target=cmd_string, blocks_on=blocks_on,
+                        args=args, kwargs=kwargs)
+        self._jobs[id] = job
+        self._spawn_job(job)
+        return job
+
+    def _spawn_job(self, job):
+        """Start running `job`; must be implemented by subclasses."""
+        raise NotImplementedError
+
+    def wait(self, ids=None):
+        """Block until the requested jobs finish, then return them.
+
+        `ids` is an iterable of job ids; by default, wait for every job
+        which has not been returned yet.  The caller's `ids` object is
+        not modified.  Returns a dict mapping id to finished job; the
+        returned jobs are forgotten by the manager.
+        """
+        if ids is None:
+            ids = self._jobs.keys()
+        ids = list(ids)  # private copy; entries are removed below
+        jobs = {}
+        for id in list(ids):  # get already completed jobs
+            if self._jobs[id].status is not None:
+                jobs[id] = self._jobs.pop(id)
+                ids.remove(id)
+        while len(ids) > 0:  # wait for outstanding jobs
+            job = self._receive_job()
+            # A manager which shares job objects with its workers may
+            # report a job which was already returned above; skip it.
+            if job.id in self._jobs:
+                job.copy_onto(self._jobs[job.id])
+            if job.id in ids:
+                jobs[job.id] = self._jobs.pop(job.id)
+                ids.remove(job.id)
+        return jobs
+
+    def _receive_job(self):
+        """Block until a spawned job finishes and return it.
+
+        Must be implemented by subclasses.
+        """
+        raise NotImplementedError
diff --git a/pysawsim/manager/thread.py b/pysawsim/manager/thread.py
new file mode 100644
index 0000000..cc5b05d
--- /dev/null
+++ b/pysawsim/manager/thread.py
@@ -0,0 +1,129 @@
+# Copyright (C) 2010 W. Trevor King
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Run jobs asynchronously in a pool of local worker threads.
+"""
+
+from Queue import Queue
+import threading
+
+from .. import log
+from . import Job, JobManager
+
+
+# Sentinel placed on the spawn queue to tell one worker to exit.
+CLOSE_MESSAGE = "close"
+
+
+class WorkerThread (threading.Thread):
+    """Thread which runs jobs pulled from a shared queue.
+
+    Jobs arrive on `spawn_queue`; each finished job is put on
+    `receive_queue`.  The thread exits when it reads `CLOSE_MESSAGE`.
+    """
+    def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
+        super(WorkerThread, self).__init__(*args, **kwargs)
+        self.spawn_queue = spawn_queue
+        self.receive_queue = receive_queue
+
+    def run(self):
+        while True:
+            msg = self.spawn_queue.get()
+            if msg == CLOSE_MESSAGE:
+                break
+            assert isinstance(msg, Job), msg
+            msg.run()
+            self.receive_queue.put(msg)
+
+
+class ThreadManager (JobManager):
+    """Manage asynchronous `Job` execution with a pool of worker threads.
+
+    `worker_pool` sets the number of threads.  A job whose `blocks_on`
+    jobs have not all finished is held back; it is re-checked whenever
+    one of those jobs is received and queued once none are pending.
+
+    >>> t = ThreadManager()
+    >>> group_A = []
+    >>> for i in range(10):
+    ...     group_A.append(t.async_invoke('echo "%d"' % i))
+    >>> group_B = []
+    >>> for i in range(10):
+    ...     group_B.append(t.async_invoke('echo "%d"' % i,
+    ...                                   blocks_on=[j.id for j in group_A]))
+    >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
+    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    [<InvokeJob 5>, <InvokeJob 6>, <InvokeJob 7>]
+    >>> jobs = t.wait()
+    >>> print sorted(jobs.keys())
+    [0, 1, 2, 3, 4, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
+    >>> [j.status for j in group_B[:3]]
+    [0, 0, 0]
+    >>> t.teardown()
+    """
+    def __init__(self, worker_pool=5):
+        super(ThreadManager, self).__init__()
+        self._blocked = []
+        self._spawn_queue = Queue()
+        self._receive_queue = Queue()
+        self._workers = []
+        for i in range(worker_pool):
+            worker = WorkerThread(spawn_queue=self._spawn_queue,
+                                  receive_queue=self._receive_queue,
+                                  name='worker-%d' % i)
+            log().debug('start thread %s' % worker.name)
+            worker.start()
+            self._workers.append(worker)
+
+    def teardown(self):
+        """Stop and join every worker thread."""
+        for worker in self._workers:  # one close message per worker
+            self._spawn_queue.put(CLOSE_MESSAGE)
+        for worker in self._workers:
+            log().debug('join thread %s' % worker.name)
+            worker.join()
+        super(ThreadManager, self).teardown()
+
+    def _job_is_blocked(self, job):
+        """Return True while a job listed in `job.blocks_on` is unfinished."""
+        for id in job.blocks_on:
+            if id in self._jobs and self._jobs[id].status is None:
+                return True
+        return False
+
+    def _spawn_job(self, job):
+        """Queue `job` for the workers, or park it if it is blocked."""
+        if self._job_is_blocked(job):
+            log().debug('block job %s' % job)
+            self._blocked.append(job)
+            return
+        log().debug('queue job %s' % job)
+        self._spawn_queue.put(job)
+
+    def _receive_job(self):
+        """Block until a job finishes, release unblocked jobs, return it."""
+        job = self._receive_queue.get()
+        # Iterate over a copy: released jobs are removed from the list.
+        for j in list(self._blocked):
+            if job.id in j.blocks_on and not self._job_is_blocked(j):
+                self._blocked.remove(j)
+                log().debug('queue job %s' % j)
+                self._spawn_queue.put(j)
+        log().debug('receive job %s' % job)
+        return job
-- 
2.26.2