Add pycomedi.utility.CallbackReader for handling read data as it comes in.
authorW. Trevor King <wking@tremily.us>
Thu, 26 Apr 2012 16:56:43 +0000 (12:56 -0400)
committerW. Trevor King <wking@tremily.us>
Thu, 26 Apr 2012 17:00:52 +0000 (13:00 -0400)
pycomedi/utility.py

index e3f0b0926e7c8b764b4717b23d76e75ef1b706d3..3ce8842929948b266b179ed95acd93610b4b2030 100644 (file)
@@ -191,6 +191,93 @@ class Reader (_ReadWriteThread):
             self.block()
 
 
             self.block()
 
 
+class CallbackReader (Reader):
+    """`read()`-based reader with callbacks
+
+    Examples
+    --------
+
+    Setup a temporary data file for testing.
+
+    >>> from os import close, remove
+    >>> from sys import stdout
+    >>> from tempfile import mkstemp
+    >>> from time import sleep
+    >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
+    >>> f = _os.fdopen(fd, 'rb+')
+    >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
+    >>> buf.tofile(t)
+
+    Override the default `Reader` methods for our dummy subdevice.
+
+    >>> class TestReader (CallbackReader):
+    ...     def _file(self):
+    ...         return f
+
+    Define a callback function.
+
+    >>> def callback(data):
+    ...     sleep(0.1)  # for consistent output spacing
+    ...     print('got: {0}'.format(repr(data)))
+    ...     stdout.flush()
+
+    Run the test reader.
+
+    >>> rbuf = _numpy.zeros((buf.shape[1],), dtype=_numpy.uint16)
+    >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest',
+    ...     callback=callback, count=buf.shape[0])
+    >>> r.start()
+    >>> sleep(0.25)
+    got: array([ 0, 10], dtype=uint16)
+    got: array([ 1, 11], dtype=uint16)
+    >>> r.join()
+    got: array([ 2, 12], dtype=uint16)
+
+    While `numpy` arrays make multi-channel indexing easy, they do
+    require an external library.  For single-channel input, the
+    `array` module is sufficient.
+
+    >>> f.seek(0)
+    >>> rbuf = _array.array('H', [0])
+    >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest',
+    ...     callback=callback, count=buf.size)
+    >>> r.start()
+    >>> sleep(0.35)
+    got: array('H', [0])
+    got: array('H', [10])
+    got: array('H', [1])
+    >>> r.join()
+    got: array('H', [11])
+    got: array('H', [2])
+    got: array('H', [12])
+
+    Cleanup the temporary data file.
+
+    >>> f.close()  # no need for `close(fd)`
+    >>> remove(t)
+    """
+    def __init__(self, callback=None, count=None, **kwargs):
+        self.callback = callback
+        self.count = count
+        super(CallbackReader, self).__init__(**kwargs)
+
+    def run(self):
+        count = self.count
+        block_while_running = self.block_while_running
+        while count is None or count > 0:
+            if count is not None:
+                count -= 1
+            try:
+                self.block_while_running = False
+                super(CallbackReader, self).run()
+            finally:
+                self.block_while_running = block_while_running
+            if self.callback:
+                self.callback(self.buffer)
+        if self.block_while_running:
+            self.block()
+
+
 class Writer (_ReadWriteThread):
     """`write()`-based writer
 
 class Writer (_ReadWriteThread):
     """`write()`-based writer