# ignore the following autogenerated files:
-accel_k.c
-accel_k.h
-global.h
-k_model.c
-k_model.h
-k_model_utils
-k_model_utils.c
-list.c
-list.h
+*.pyc
+bin/
+build/
+doc/
Makefile
-parse.c
-parse.h
-sawsim
-sawsim.aux
-sawsim.bbl
-sawsim.blg
-sawsim.c
-sawsim.log
-sawsim.out
-sawsim.pdf
-sawsim.tex
-string_eval.c
-string_eval.h
-tension_balance.c
-tension_balance.h
-tension_model.c
-tension_model.h
-tension_model_utils
-tension_model_utils.c
Just go crazy with doctests and unittests; nose_ will find them.
"""
+
+import logging
+import logging.handlers
+import sys
+
+
+__version__ = '0.10' # match sawsim version
+
+def log():
+ return logging.getLogger('pysawsim')
+
+_log = log()
+_log.setLevel(logging.DEBUG)
+_console = logging.StreamHandler()
+_formatter = logging.Formatter('%(name)-8s: %(levelname)-6s %(message)s')
+_console.setFormatter(_formatter)
+_log.addHandler(_console)
+del(_log)
+del(_console)
+del(_formatter)
self.stdout = stdout
self.stderr = stderr
-def invoke(cmd_string, stdin=None, expect=(0,), cwd=None, verbose=True):
+def invoke(cmd_string, stdin=None, expect=(0,), cwd=None, verbose=False):
"""
expect should be a tuple of allowed exit codes. cwd should be
the directory from which the command will be executed.
q = Popen(cmd_string, stdin=PIPE, stdout=PIPE, stderr=PIPE,
shell=True, cwd=cwd)
except OSError, e :
- raise CommandError(args, status=e.args[0], stdout="", stderr=e)
- output,error = q.communicate(input=stdin)
+ raise CommandError(cmd_string, status=e.args[0], stdout="", stderr=e)
+ stdout,stderr = q.communicate(input=stdin)
status = q.wait()
if verbose == True:
- print >> sys.stderr, "%d\n%s%s" % (status, output, error)
+ print >> sys.stderr, "%d\n%s%s" % (status, stdout, stderr)
if status not in expect:
- raise CommandError(args, status, output, error)
- return status, output, error
+ raise CommandError(cmd_string, status, stdout, stderr)
+ return status, stdout, stderr
--- /dev/null
+# Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at <wking@drexel.edu> on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Functions for running external commands on other hosts.
+"""
+
+from .. import invoke as invoke
+
+
+class Job (object):
+ """
+ >>> import copy
+
+ Jobs execute a Python function (through an interface similar to
+ threading.Thread).
+
+ >>> def job(*args, **kwargs):
+ ... print 'executing job with %s and %s.' % (args, kwargs)
+ ... return [4, 5]
+ >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
+ >>> j2 = copy.deepcopy(j)
+
+ Job status is initially `None`.
+
+ >>> print j.status
+ None
+
+ >>> j.run()
+ executing job with (1, 2) and {'kwargA': 3}.
+
+ After execution, `status` and `data` are set.
+
+ >>> j.status
+ 0
+ >>> j.data
+ [4, 5]
+
+ You can copy these attributes over to another job with `copy_onto()`.
+
+ >>> j.copy_onto(j2)
+ >>> j2.status
+ 0
+ >>> j2.data
+ [4, 5]
+
+ String representations are nice and simple.
+
+ >>> str(j)
+ '<Job 3>'
+ >>> repr(j)
+ '<Job 3>'
+
+ If `target` raises an exception, it is stored in `status` (for
+ successful runs, the status value is `0`).
+
+ >>> def job():
+ ... raise Exception('error running job')
+ >>> j = Job(id=3, target=job)
+ >>> j.run()
+ >>> j.status
+ Exception('error running job',)
+ >>> print j.data
+ None
+ """
+ def __init__(self, id, target, blocks_on=None, args=None, kwargs=None):
+ if blocks_on == None:
+ blocks_on = []
+ if args == None:
+ args = []
+ if kwargs == None:
+ kwargs = {}
+ self.id = id
+ self.target = target
+ self.blocks_on = blocks_on
+ self.args = args
+ self.kwargs = kwargs
+ self.status = None
+ self.data = None
+
+ def __str__(self):
+ return '<%s %d>' % (self.__class__.__name__, self.id)
+
+ def __repr__(self):
+ return self.__str__()
+
+ def run(self):
+ try:
+ self.data = self.target(*self.args, **self.kwargs)
+ except Exception, e:
+ self.status = e
+ else:
+ self.status = 0
+
+ def copy_onto(self, other):
+ """Merge results of run onto initial job-request instance."""
+ for attr in ['status', 'data']:
+ setattr(other, attr, getattr(self, attr))
+
+
+class InvokeJob (Job):
+ """Job subclass which `invoke()`\s a command line function.
+
+ >>> q = 'What is the meaning of life, the universe, and everything?'
+ >>> j = InvokeJob(id=3, target='echo "%s"' % q)
+ >>> j.run()
+ >>> j.status
+ 0
+ >>> j.data['stdout']
+ 'What is the meaning of life, the universe, and everything?\\n'
+ >>> j.data['stderr']
+ ''
+
+ >>> j = InvokeJob(id=3, target='missing_command')
+ >>> j.run()
+ >>> print j.status
+ Command failed (127):
+ /bin/sh: missing_command: command not found
+ <BLANKLINE>
+ <BLANKLINE>
+ while executing
+ missing_command
+ >>> j.data['stdout']
+ ''
+ >>> j.data['stderr']
+ '/bin/sh: missing_command: command not found\\n'
+ """
+ def run(self):
+ try:
+ self.status,stdout,stderr = invoke.invoke(
+ self.target, *self.args, **self.kwargs)
+ self.data = {'stdout':stdout, 'stderr':stderr}
+ except invoke.CommandError, e:
+ self.status = e
+ self.data = {'stdout':e.stdout, 'stderr':e.stderr}
+
+
+class JobManager (object):
+ def __init__(self):
+ self._jobs = {}
+ self.next_id = 0
+
+ def teardown(self):
+ pass
+
+ def async_invoke(self, cmd_string, blocks_on=None, *args, **kwargs):
+ id = self.next_id
+ self.next_id += 1
+ job = Job(id, cmd_string, *args, **kwargs)
+ self._jobs[id] = job
+ self._spawn_job(job)
+ return job
+
+ def _spawn_job(self, job):
+ raise NotImplementedError
+
+ def wait(self, ids=None):
+ if ids == None:
+ ids = self._jobs.keys()
+ jobs = {}
+ for id in list(ids): # get already completed jobs
+ if self._jobs[id] != None:
+ jobs[id] = self._jobs.pop(id)
+ ids.remove(id)
+ while len(ids) > 0: # wait for outstanding jobs
+ job = self._receive_job()
+ job.copy_onto(self._jobs[job.id])
+ if job.id in ids:
+ jobs[job.id] = self._jobs.pop(job.id)
+ ids.remove(id)
+ return jobs
+
+ def _receive_job(self):
+ raise NotImplementedError
--- /dev/null
+# Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at <wking@drexel.edu> on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Functions for running external commands on other hosts.
+"""
+
+from Queue import Queue
+import threading
+
+from .. import log
+from . import Job, JobManager
+
+
+CLOSE_MESSAGE = "close"
+
+
+class WorkerThread (threading.Thread):
+ def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
+ super(WorkerThread, self).__init__(*args, **kwargs)
+ self.spawn_queue = spawn_queue
+ self.receive_queue = receive_queue
+
+ def run(self):
+ while True:
+ msg = self.spawn_queue.get()
+ if msg == CLOSE_MESSAGE:
+ break
+ assert isinstance(msg, Job), msg
+ msg.run()
+ self.receive_queue.put(msg)
+
+
+class ThreadManager (JobManager):
+ """
+ >>> t = ThreadManager()
+ >>> group_A = []
+ >>> for i in range(10):
+ ... group_A.append(t.async_invoke('echo "%d"' % i))
+ >>> group_B = []
+ >>> for i in range(10):
+ ... group_B.append(t.async_invoke('echo "%d"' % i,
+ ... blocks_on=[j.id for j in group_A]))
+ >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
+ >>> print sorted(jobs.values(), key=lambda j: j.id)
+ [<Job 5>, <Job 6>, <Job 7>]
+ >>> jobs = t.wait()
+ >>> print sorted(jobs.values(), key=lambda j: j.id)
+ ... # doctest: +NORMALIZE_WHITESPACE
+ [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
+ <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
+ <Job 18>, <Job 19>]
+ >>> t.teardown()
+ """
+ def __init__(self, worker_pool=5):
+ super(ThreadManager, self).__init__()
+ self._blocked = []
+ self._spawn_queue = Queue()
+ self._receive_queue = Queue()
+ self._workers = []
+ for i in range(worker_pool):
+ worker = WorkerThread(spawn_queue=self._spawn_queue,
+ receive_queue=self._receive_queue,
+ name='worker-%d' % i)
+ log().debug('start thread %s' % worker.name)
+ worker.start()
+ self._workers.append(worker)
+
+ def teardown(self):
+ for worker in self._workers:
+ self._spawn_queue.put(CLOSE_MESSAGE)
+ for worker in self._workers:
+ log().debug('join thread %s' % worker.name)
+ worker.join()
+ super(ThreadManager, self).teardown()
+
+ def _job_is_blocked(self, job):
+ for id in job.blocks_on:
+ if id in self._jobs and self.jobs[id].status == None:
+ return True
+ return False
+
+ def _spawn_job(self, job):
+ if self._job_is_blocked(job):
+ self._blocked.append(job)
+ log().debug('queue job %s' % job)
+ self._spawn_queue.put(job)
+
+ def _receive_job(self):
+ job = self._receive_queue.get()
+ for j in self._blocked:
+ if job.id in j.blocks_on:
+ if not self._job_is_blocked(j):
+ self._blocked.remove(j)
+ log().debug('queue job %s' % j)
+ self._spawn_queue.put(j)
+ log().debug('receive job %s' % job)
+ return job