Add pysawsim.manager and pysawsim.manager.thread for running asynchronous jobs.
author W. Trevor King <wking@drexel.edu>
Tue, 19 Oct 2010 12:42:54 +0000 (08:42 -0400)
committer W. Trevor King <wking@drexel.edu>
Tue, 19 Oct 2010 12:42:54 +0000 (08:42 -0400)
Also:
* simplify .gitignore and add *.pyc.
* add log() and __version__ to pysawsim.
* fix exception handling in pysawsim.invoke.invoke().

.gitignore
pysawsim/__init__.py
pysawsim/invoke.py
pysawsim/manager/__init__.py [new file with mode: 0644]
pysawsim/manager/thread.py [new file with mode: 0644]

index 81f15fdf98cbc96a11a85c391b3805380a0de271..24f504a4cb66015e7082007a7ac1fdd144a4267d 100644 (file)
@@ -1,31 +1,7 @@
 # ignore the following autogenerated files:
 
-accel_k.c
-accel_k.h
-global.h
-k_model.c
-k_model.h
-k_model_utils
-k_model_utils.c
-list.c
-list.h
+*.pyc
+bin/
+build/
+doc/
 Makefile
-parse.c
-parse.h
-sawsim
-sawsim.aux
-sawsim.bbl
-sawsim.blg
-sawsim.c
-sawsim.log
-sawsim.out
-sawsim.pdf
-sawsim.tex
-string_eval.c
-string_eval.h
-tension_balance.c
-tension_balance.h
-tension_model.c
-tension_model.h
-tension_model_utils
-tension_model_utils.c
index cc179d9f09eba199fbac1088d436404e5d882bbb..f5b7e64447e260673fac11302ec1c43a8050382f 100644 (file)
@@ -44,3 +44,23 @@ Adding tests to modules
 
 Just go crazy with doctests and unittests; nose_ will find them.
 """
+
+import logging
+import logging.handlers
+import sys
+
+
+__version__ = '0.10'  # match sawsim version
+
+def log():
+    """Return the package-wide logger, registered as 'pysawsim'."""
+    logger = logging.getLogger('pysawsim')
+    return logger
+
+# Send every record (DEBUG and up) from the package logger to a
+# console handler, then drop the temporary names so they do not leak
+# into the package namespace.
+_console = logging.StreamHandler()
+_console.setFormatter(
+    logging.Formatter('%(name)-8s: %(levelname)-6s %(message)s'))
+_log = log()
+_log.setLevel(logging.DEBUG)
+_log.addHandler(_console)
+del _log, _console
index d7c10b5c778f9d38118be0cd2093fd9b17a01557..bb051d7ad8c36e17090d188dc2763800401a0915 100644 (file)
@@ -34,7 +34,7 @@ class CommandError(Exception):
         self.stdout = stdout
         self.stderr = stderr
 
-def invoke(cmd_string, stdin=None, expect=(0,), cwd=None, verbose=True):
+def invoke(cmd_string, stdin=None, expect=(0,), cwd=None, verbose=False):
     """
     expect should be a tuple of allowed exit codes.  cwd should be
     the directory from which the command will be executed.
@@ -51,11 +51,11 @@ def invoke(cmd_string, stdin=None, expect=(0,), cwd=None, verbose=True):
             q = Popen(cmd_string, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                       shell=True, cwd=cwd)
     except OSError, e :
-        raise CommandError(args, status=e.args[0], stdout="", stderr=e)
-    output,error = q.communicate(input=stdin)
+        raise CommandError(cmd_string, status=e.args[0], stdout="", stderr=e)
+    stdout,stderr = q.communicate(input=stdin)
     status = q.wait()
     if verbose == True:
-        print >> sys.stderr, "%d\n%s%s" % (status, output, error)
+        print >> sys.stderr, "%d\n%s%s" % (status, stdout, stderr)
     if status not in expect:
-        raise CommandError(args, status, output, error)
-    return status, output, error
+        raise CommandError(cmd_string, status, stdout, stderr)
+    return status, stdout, stderr
diff --git a/pysawsim/manager/__init__.py b/pysawsim/manager/__init__.py
new file mode 100644 (file)
index 0000000..5deb701
--- /dev/null
@@ -0,0 +1,189 @@
+# Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at <wking@drexel.edu> on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Jobs and job managers for running tasks asynchronously.
+"""
+
+from .. import invoke as invoke
+
+
+class Job (object):
+    """A unit of work: a Python callable plus the arguments to call it with.
+
+    Parameters:
+
+    * `id`: integer identifying the job (formatted with `%d`).
+    * `target`: callable executed by `run()` as
+      `target(*args, **kwargs)`.
+    * `blocks_on`: list of job ids stored on the job (defaults to a
+      new empty list); this class does not interpret it.
+    * `args`, `kwargs`: arguments for `target` (default to a new
+      empty list / dict).
+
+    Attributes set by `run()`:
+
+    * `status`: `None` before the job runs, `0` after a successful
+      run, or the exception instance raised by `target`.
+    * `data`: the value returned by `target` (`None` until a run
+      succeeds).
+
+    >>> import copy
+
+    Jobs execute a Python function (through an interface similar to
+    threading.Thread).
+
+    >>> def job(*args, **kwargs):
+    ...     print 'executing job with %s and %s.' % (args, kwargs)
+    ...     return [4, 5]
+    >>> j = Job(id=3, target=job, args=[1,2], kwargs={'kwargA':3})
+    >>> j2 = copy.deepcopy(j)
+
+    Job status is initially `None`.
+
+    >>> print j.status
+    None
+
+    >>> j.run()
+    executing job with (1, 2) and {'kwargA': 3}.
+
+    After execution, `status` and `data` are set.
+
+    >>> j.status
+    0
+    >>> j.data
+    [4, 5]
+
+    You can copy these attributes over to another job with `copy_onto()`.
+
+    >>> j.copy_onto(j2)
+    >>> j2.status
+    0
+    >>> j2.data
+    [4, 5]
+
+    String representations are nice and simple.
+
+    >>> str(j)
+    '<Job 3>'
+    >>> repr(j)
+    '<Job 3>'
+
+    If `target` raises an exception, it is stored in `status` (for
+    successful runs, the status value is `0`).
+
+    >>> def job():
+    ...     raise Exception('error running job')
+    >>> j = Job(id=3, target=job)
+    >>> j.run()
+    >>> j.status
+    Exception('error running job',)
+    >>> print j.data
+    None
+    """
+    def __init__(self, id, target, blocks_on=None, args=None, kwargs=None):
+        # Identity checks: only a literal `None` selects the default, so
+        # an argument with an unusual `__eq__` is never replaced.  Fresh
+        # containers are built per instance so jobs never share them.
+        if blocks_on is None:
+            blocks_on = []
+        if args is None:
+            args = []
+        if kwargs is None:
+            kwargs = {}
+        self.id = id
+        self.target = target
+        self.blocks_on = blocks_on
+        self.args = args
+        self.kwargs = kwargs
+        self.status = None
+        self.data = None
+
+    def __str__(self):
+        return '<%s %d>' % (self.__class__.__name__, self.id)
+
+    def __repr__(self):
+        return self.__str__()
+
+    def run(self):
+        """Call `target`, recording the outcome in `status` and `data`.
+
+        Exceptions derived from `Exception` are not propagated; the
+        exception instance is stored in `status` instead.
+        """
+        try:
+            self.data = self.target(*self.args, **self.kwargs)
+        except Exception as e:
+            self.status = e
+        else:
+            self.status = 0
+
+    def copy_onto(self, other):
+        """Merge results of run onto initial job-request instance."""
+        for attr in ['status', 'data']:
+            setattr(other, attr, getattr(self, attr))
+
+
+class InvokeJob (Job):
+    """Job whose `target` is a command line executed through `invoke()`.
+
+    After `run()`, `status` holds the command's exit status (or the
+    `CommandError` raised by `invoke()`), and `data` is a dict with
+    the command's 'stdout' and 'stderr'.
+
+    >>> q = 'What is the meaning of life, the universe, and everything?'
+    >>> j = InvokeJob(id=3, target='echo "%s"' % q)
+    >>> j.run()
+    >>> j.status
+    0
+    >>> j.data['stdout']
+    'What is the meaning of life, the universe, and everything?\\n'
+    >>> j.data['stderr']
+    ''
+
+    >>> j = InvokeJob(id=3, target='missing_command')
+    >>> j.run()
+    >>> print j.status
+    Command failed (127):
+      /bin/sh: missing_command: command not found
+    <BLANKLINE>
+    <BLANKLINE>
+    while executing
+      missing_command
+    >>> j.data['stdout']
+    ''
+    >>> j.data['stderr']
+    '/bin/sh: missing_command: command not found\\n'
+    """
+    def run(self):
+        try:
+            result = invoke.invoke(self.target, *self.args, **self.kwargs)
+        except invoke.CommandError as failure:
+            # The failure itself becomes the status; its captured
+            # output is still reported through `data`.
+            self.status = failure
+            out, err = failure.stdout, failure.stderr
+        else:
+            self.status, out, err = result
+        self.data = {'stdout': out, 'stderr': err}
+
+
+class JobManager (object):
+    """Base class for managers that dispatch jobs and collect results.
+
+    Subclasses must implement `_spawn_job()` (hand a job over for
+    execution) and `_receive_job()` (return a job that has finished
+    running).
+    """
+    def __init__(self):
+        self._jobs = {}   # jobs not yet returned by wait(), keyed by id
+        self.next_id = 0  # id assigned to the next job created
+
+    def teardown(self):
+        """Release resources held by the manager (nothing to do here)."""
+        pass
+
+    def async_invoke(self, cmd_string, blocks_on=None, *args, **kwargs):
+        """Create a job for `cmd_string`, dispatch it, and return it.
+
+        The job is given the next free id, remembered by the manager
+        until `wait()` hands it back, and passed to `_spawn_job()`.
+        Extra positional and keyword arguments go to the `Job`
+        constructor.
+        """
+        id = self.next_id
+        self.next_id += 1
+        # NOTE(review): `blocks_on` is accepted here but is not
+        # forwarded to the job, so the job's own `blocks_on` comes only
+        # from `*args`/`**kwargs`.  Left unchanged because forwarding it
+        # would switch on dependency handling in subclasses -- confirm
+        # the intended behaviour before changing this call.
+        job = Job(id, cmd_string, *args, **kwargs)
+        self._jobs[id] = job
+        self._spawn_job(job)
+        return job
+
+    def _spawn_job(self, job):
+        raise NotImplementedError
+
+    def wait(self, ids=None):
+        """Block until the requested jobs have finished and return them.
+
+        `ids` is an iterable of job ids; if it is `None`, every job
+        still held by the manager is waited for.  Returns a dict
+        mapping id to job.  Returned jobs are removed from the
+        manager.  Raises `KeyError` for an id the manager does not
+        hold.
+        """
+        # Work on a private list so the caller's sequence is untouched.
+        if ids is None:
+            ids = list(self._jobs)
+        else:
+            ids = list(ids)
+        jobs = {}
+        for id in list(ids):  # get already completed jobs
+            # A job is complete once it has a status; the job object
+            # itself is never `None`.
+            if self._jobs[id].status is not None:
+                jobs[id] = self._jobs.pop(id)
+                ids.remove(id)
+        while len(ids) > 0:  # wait for outstanding jobs
+            job = self._receive_job()
+            if job.id not in self._jobs:
+                # Already collected (e.g. found complete above or by
+                # an earlier call); nothing left to update.
+                continue
+            job.copy_onto(self._jobs[job.id])
+            if job.id in ids:
+                jobs[job.id] = self._jobs.pop(job.id)
+                ids.remove(job.id)
+        return jobs
+
+    def _receive_job(self):
+        raise NotImplementedError
diff --git a/pysawsim/manager/thread.py b/pysawsim/manager/thread.py
new file mode 100644 (file)
index 0000000..cc5b05d
--- /dev/null
@@ -0,0 +1,113 @@
+# Copyright (C) 2010  W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# The author may be contacted at <wking@drexel.edu> on the Internet, or
+# write to Trevor King, Drexel University, Physics Dept., 3141 Chestnut St.,
+# Philadelphia PA 19104, USA.
+
+"""Job manager that runs jobs on a pool of worker threads.
+"""
+
+from Queue import Queue
+import threading
+
+from .. import log
+from . import Job, JobManager
+
+
+CLOSE_MESSAGE = "close"
+
+
+class WorkerThread (threading.Thread):
+    """Thread that runs jobs from one queue and reports them on another.
+
+    Jobs are taken from `spawn_queue`, run, and then put on
+    `receive_queue`.  Getting `CLOSE_MESSAGE` from `spawn_queue` ends
+    the thread.  Remaining arguments are passed on to
+    `threading.Thread`.
+    """
+    def __init__(self, spawn_queue, receive_queue, *args, **kwargs):
+        super(WorkerThread, self).__init__(*args, **kwargs)
+        self.receive_queue = receive_queue
+        self.spawn_queue = spawn_queue
+
+    def run(self):
+        # iter(callable, sentinel) keeps calling get() until it
+        # returns CLOSE_MESSAGE.
+        for task in iter(self.spawn_queue.get, CLOSE_MESSAGE):
+            assert isinstance(task, Job), task
+            task.run()
+            self.receive_queue.put(task)
+
+
+class ThreadManager (JobManager):
+    """Manage asynchronous job execution via a pool of worker threads.
+
+    `worker_pool` worker threads are started on construction.  Jobs
+    reach them through a shared spawn queue and come back, after
+    running, through a shared receive queue.  A job whose `blocks_on`
+    lists jobs that are still unfinished is parked until the jobs it
+    depends on have been received.  Call `teardown()` to stop and
+    join the workers.
+
+    >>> t = ThreadManager()
+    >>> group_A = []
+    >>> for i in range(10):
+    ...     group_A.append(t.async_invoke('echo "%d"' % i))
+    >>> group_B = []
+    >>> for i in range(10):
+    ...     group_B.append(t.async_invoke('echo "%d"' % i,
+    ...                                   blocks_on=[j.id for j in group_A]))
+    >>> jobs = t.wait(ids=[j.id for j in group_A[5:8]])
+    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    [<Job 5>, <Job 6>, <Job 7>]
+    >>> jobs = t.wait()
+    >>> print sorted(jobs.values(), key=lambda j: j.id)
+    ... # doctest: +NORMALIZE_WHITESPACE
+    [<Job 0>, <Job 1>, <Job 2>, <Job 3>, <Job 4>, <Job 8>, <Job 9>, <Job 10>,
+     <Job 11>, <Job 12>, <Job 13>, <Job 14>, <Job 15>, <Job 16>, <Job 17>,
+     <Job 18>, <Job 19>]
+    >>> t.teardown()
+    """
+    def __init__(self, worker_pool=5):
+        super(ThreadManager, self).__init__()
+        self._blocked = []  # jobs held back until their dependencies finish
+        self._spawn_queue = Queue()
+        self._receive_queue = Queue()
+        self._workers = []
+        for i in range(worker_pool):
+            worker = WorkerThread(spawn_queue=self._spawn_queue,
+                                  receive_queue=self._receive_queue,
+                                  name='worker-%d' % i)
+            log().debug('start thread %s' % worker.name)
+            worker.start()
+            self._workers.append(worker)
+
+    def teardown(self):
+        """Stop every worker thread and wait for it to exit."""
+        # One close message per worker: each worker consumes exactly
+        # one before leaving its loop.
+        for worker in self._workers:
+            self._spawn_queue.put(CLOSE_MESSAGE)
+        for worker in self._workers:
+            log().debug('join thread %s' % worker.name)
+            worker.join()
+        super(ThreadManager, self).teardown()
+
+    def _job_is_blocked(self, job):
+        """Return True if `job` depends on a held job with no status yet."""
+        for id in job.blocks_on:
+            if id in self._jobs and self._jobs[id].status is None:
+                return True
+        return False
+
+    def _spawn_job(self, job):
+        """Queue `job` for the workers, or park it if it is blocked."""
+        if self._job_is_blocked(job):
+            # Parked jobs are queued later by _receive_job(); queueing
+            # them now would run them before their dependencies.
+            self._blocked.append(job)
+            return
+        log().debug('queue job %s' % job)
+        self._spawn_queue.put(job)
+
+    def _receive_job(self):
+        """Return the next finished job, releasing jobs it unblocked."""
+        job = self._receive_queue.get()
+        # Iterate over a copy: released jobs are removed from
+        # `self._blocked` inside the loop.
+        for j in list(self._blocked):
+            if job.id in j.blocks_on and not self._job_is_blocked(j):
+                self._blocked.remove(j)
+                log().debug('queue job %s' % j)
+                self._spawn_queue.put(j)
+        log().debug('receive job %s' % job)
+        return job