+class CallbackReader (Reader):
+ """`read()`-based reader with callbacks
+ Examples
+ --------
+ Setup a temporary data file for testing.
+ >>> from os import close, remove
+ >>> from sys import stdout
+ >>> from tempfile import mkstemp
+ >>> from time import sleep
+ >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
+ >>> f = _os.fdopen(fd, 'rb+')
+ >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
+ >>> buf.tofile(t)
+ Override the default `Reader` methods for our dummy subdevice.
+ >>> class TestReader (CallbackReader):
+ ... def _file(self):
+ ... return f
+ Define a callback function.
+ >>> def callback(data):
+ ... sleep(0.1) # for consistent output spacing
+ ... print('got: {0}'.format(repr(data)))
+ ... stdout.flush()
+ Run the test reader.
+ >>> rbuf = _numpy.zeros((buf.shape[1],), dtype=_numpy.uint16)
+ >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest',
+ ... callback=callback, count=buf.shape[0])
+ >>> r.start()
+ >>> sleep(0.25)
+ got: array([ 0, 10], dtype=uint16)
+ got: array([ 1, 11], dtype=uint16)
+ >>> r.join()
+ got: array([ 2, 12], dtype=uint16)
+ While `numpy` arrays make multi-channel indexing easy, they do
+ require an external library. For single-channel input, the
+ `array` module is sufficient.
+ >>>
+ >>> rbuf = _array.array('H', [0])
+ >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest',
+ ... callback=callback, count=buf.size)
+ >>> r.start()
+ >>> sleep(0.35)
+ got: array('H', [0])
+ got: array('H', [10])
+ got: array('H', [1])
+ >>> r.join()
+ got: array('H', [11])
+ got: array('H', [2])
+ got: array('H', [12])
+ Cleanup the temporary data file.
+ >>> f.close() # no need for `close(fd)`
+ >>> remove(t)
+ """
+ def __init__(self, callback=None, count=None, **kwargs):
+ self.callback = callback
+ self.count = count
+ super(CallbackReader, self).__init__(**kwargs)
+ def run(self):
+ count = self.count
+ block_while_running = self.block_while_running
+ while count is None or count > 0:
+ if count is not None:
+ count -= 1
+ try:
+ self.block_while_running = False
+ super(CallbackReader, self).run()
+ finally:
+ self.block_while_running = block_while_running
+ if self.callback:
+ self.callback(self.buffer)
+ if self.block_while_running:
+ self.block()
class Writer (_ReadWriteThread):
"""`write()`-based writer