1 # Copyright (C) 2011-2012 W. Trevor King <wking@tremily.us>
3 # This file is part of pycomedi.
5 # pycomedi is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 2 of the License, or (at your option) any later
10 # pycomedi is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License along with
15 # pycomedi. If not, see <http://www.gnu.org/licenses/>.
17 "Useful utility functions and classes"
19 import array as _array
22 import threading as _threading
25 import numpy as _numpy
27 from . import LOG as _LOG
28 from . import constant as _constant
33 lsampl = _numpy.uint32
37 def _subdevice_dtype(subdevice):
38 "Return the appropriate `numpy.dtype` based on subdevice flags"
39 if subdevice.get_flags().lsampl:
43 def _subdevice_typecode(subdevice):
44 "Return the appropriate `array` type based on subdevice flags"
45 if subdevice.get_flags().lsampl:
46 return lsampl_typecode
49 def inttrig_insn(subdevice):
50 """Setup an internal trigger for a given `subdevice`
52 From the Comedi docs `section 4.4`_ (Instruction for internal
55 This special instruction has `INSN_INTTRIG` as the insn flag in
56 its instruction data structure. Its execution causes an internal
57 triggering event. This event can, for example, cause the device
58 driver to start a conversion, or to stop an ongoing
59 acquisition. The exact meaning of the triggering depends on the
60 card and its particular driver.
62 The `data[0]` field of the `INSN_INTTRIG` instruction is
63 reserved for future use, and should be set to `0`.
65 From the comedi source (`comedi.comedi_fops.c:parse_insn()`), we
66 see that the `chanspec` attribute is ignored for `INSN_INTTRIG`,
67 so we don't bother setting it here.
69 .. _section 4.4: http://www.comedi.org/doc/x621.html
71 insn = subdevice.insn()
72 insn.insn = _constant.INSN.inttrig.value
77 def _builtin_array(array):
78 "`array` is an array from the builtin :mod:`array` module"
79 return isinstance(array, _array.array)
82 class _ReadWriteThread (_threading.Thread):
83 "Base class for all reader/writer threads"
84 def __init__(self, subdevice, buffer, name=None,
85 block_while_running=False):
87 name = '<%s subdevice %d>' % (
88 self.__class__.__name__, subdevice.index)
89 self.subdevice = subdevice
91 self.block_while_running = block_while_running
93 super(_ReadWriteThread, self).__init__(name=name)
95 def _setup_buffer(self):
96 "Currently just a hook for an MMapWriter hack."
100 """File for reading/writing data to `.subdevice`
102 This file may use the internal comedi fileno, so do not close
103 it when you are finished. The file will eventually be closed
104 when the backing `Device` instance is closed.
106 return self.subdevice.device.file
109 while self.subdevice.get_flags().running:
111 self.subdevice.cancel() # become unbusy
114 class Reader (_ReadWriteThread):
115 """`read()`-based reader
120 Setup a temporary data file for testing.
122 >>> from os import close, remove
123 >>> from tempfile import mkstemp
124 >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
125 >>> f = _os.fdopen(fd, 'r+')
126 >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
129 Override the default `Reader` methods for our dummy subdevice.
131 >>> class TestReader (Reader):
138 >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest')
142 The input buffer is updated in place, and is also available as the
143 reader's `buffer` attribute.
148 [ 2, 12]], dtype=uint16)
152 [ 2, 12]], dtype=uint16)
154 While `numpy` arrays make multi-channel indexing easy, they do
155 require an external library. For single-channel input, the
156 `array` module is sufficient.
159 >>> rbuf = _array.array('H', [0]*buf.size)
160 >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest')
164 array('H', [0, 10, 1, 11, 2, 12])
166 array('H', [0, 10, 1, 11, 2, 12])
168 Cleanup the temporary data file.
170 >>> f.close() # no need for `close(fd)`
174 builtin_array = _builtin_array(self.buffer)
177 # TODO: read into already allocated memory (somehow)
178 size = len(self.buffer)
179 a = _array.array(self.buffer.typecode)
182 else: # numpy.ndarray
183 # TODO: read into already allocated memory (somehow)
184 buf = _numpy.fromfile(
185 f, dtype=self.buffer.dtype, count=self.buffer.size)
187 shape=self.buffer.shape, dtype=self.buffer.dtype,
190 if self.block_while_running:
194 class CallbackReader (Reader):
195 """`read()`-based reader with callbacks
200 Setup a temporary data file for testing.
202 >>> from os import close, remove
203 >>> from sys import stdout
204 >>> from tempfile import mkstemp
205 >>> from time import sleep
206 >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
207 >>> f = _os.fdopen(fd, 'rb+')
208 >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
211 Override the default `Reader` methods for our dummy subdevice.
213 >>> class TestReader (CallbackReader):
217 Define a callback function.
219 >>> def callback(data):
220 ... sleep(0.1) # for consistent output spacing
221 ... print('got: {0}'.format(repr(data)))
226 >>> rbuf = _numpy.zeros((buf.shape[1],), dtype=_numpy.uint16)
227 >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest',
228 ... callback=callback, count=buf.shape[0])
231 got: array([ 0, 10], dtype=uint16)
232 got: array([ 1, 11], dtype=uint16)
234 got: array([ 2, 12], dtype=uint16)
236 While `numpy` arrays make multi-channel indexing easy, they do
237 require an external library. For single-channel input, the
238 `array` module is sufficient.
241 >>> rbuf = _array.array('H', [0])
242 >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest',
243 ... callback=callback, count=buf.size)
247 got: array('H', [10])
250 got: array('H', [11])
252 got: array('H', [12])
254 Cleanup the temporary data file.
256 >>> f.close() # no need for `close(fd)`
259 def __init__(self, callback=None, count=None, **kwargs):
260 self.callback = callback
262 super(CallbackReader, self).__init__(**kwargs)
266 block_while_running = self.block_while_running
267 while count is None or count > 0:
268 if count is not None:
271 self.block_while_running = False
272 super(CallbackReader, self).run()
274 self.block_while_running = block_while_running
276 self.callback(self.buffer)
277 if self.block_while_running:
281 class Writer (_ReadWriteThread):
282 """`write()`-based writer
287 Setup a temporary data file for testing.
289 >>> from os import close, remove
290 >>> from tempfile import mkstemp
291 >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
292 >>> f = _os.fdopen(fd, 'r+')
293 >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
295 Override the default `Writer` methods for our dummy subdevice.
297 >>> class TestWriter (Writer):
304 >>> w = TestWriter(subdevice=None, buffer=buf, name='Writer-doctest',
306 >>> a = _array.array('H')
307 >>> a.fromfile(open(t, 'rb'), preload)
309 array('H', [0, 10, 1])
312 >>> a = _array.array('H')
313 >>> a.fromfile(open(t, 'rb'), buf.size)
315 array('H', [0, 10, 1, 11, 2, 12])
317 While `numpy` arrays make multi-channel indexing easy, they do
318 require an external library. For single-channel input, the
319 `array` module is sufficient.
322 >>> buf = _array.array('H', [2*x for x in buf.flat])
323 >>> w = TestWriter(subdevice=None, buffer=buf, name='Writer-doctest',
325 >>> a = _array.array('H')
326 >>> a.fromfile(open(t, 'rb'), preload)
328 array('H', [0, 20, 2])
331 >>> a = _array.array('H')
332 >>> a.fromfile(open(t, 'rb'), len(buf))
334 array('H', [0, 20, 2, 22, 4, 24])
336 Cleanup the temporary data file.
338 >>> f.close() # no need for `close(fd)`
341 def __init__(self, *args, **kwargs):
342 preload = kwargs.pop('preload', 0)
343 super(Writer, self).__init__(*args, **kwargs)
344 if not _builtin_array(self.buffer): # numpy.ndarray
345 self.buffer = self.buffer.flat
346 preload_buffer = self.buffer[:preload]
347 self._preload_setup = {'remaining_buffer': self.buffer[preload:]}
349 preload_buffer.tofile(f)
353 remaining_buffer = self._preload_setup['remaining_buffer']
354 del(self._preload_setup)
357 remaining_buffer.tofile(f)
359 if self.block_while_running:
363 class _MMapReadWriteThread (_ReadWriteThread):
364 "`mmap()`-based reader/writer"
365 def __init__(self, *args, **kwargs):
366 preload = kwargs.pop('preload', 0)
367 access = kwargs.pop('access')
368 super(_MMapReadWriteThread, self).__init__(*args, **kwargs)
370 # all sizes measured in bytes
371 builtin_array = _builtin_array(self.buffer)
372 mmap_size = int(self._mmap_size())
373 mmap = _mmap.mmap(self._fileno(), mmap_size, access=access)
375 remaining = self._buffer_bytes(builtin_array)
376 action,mmap_offset = self._initial_action(
377 mmap, buffer_offset, remaining, mmap_size, action_bytes=mmap_size,
378 builtin_array=builtin_array)
379 buffer_offset += action
381 self._preload_setup = {
382 'builtin_array': builtin_array,
383 'mmap_size': mmap_size,
385 'mmap_offset': mmap_offset,
386 'buffer_offset': buffer_offset,
387 'remaining': remaining,
390 def _sleep_time(self, mmap_size):
391 "Expected seconds needed to write a tenth of the mmap buffer"
395 builtin_array = self._preload_setup['builtin_array']
396 mmap_size = self._preload_setup['mmap_size']
397 mmap = self._preload_setup['mmap']
398 mmap_offset = self._preload_setup['mmap_offset']
399 buffer_offset = self._preload_setup['buffer_offset']
400 remaining = self._preload_setup['remaining']
401 del(self._preload_setup)
403 sleep_time = self._sleep_time(mmap_size)
405 action_bytes = self._action_bytes()
407 action,mmap_offset = self._act(
408 mmap, mmap_offset, buffer_offset, remaining, mmap_size,
409 action_bytes=action_bytes, builtin_array=builtin_array)
410 buffer_offset += action
413 _time.sleep(sleep_time)
414 if self.block_while_running:
417 def _act(self, mmap, mmap_offset, buffer_offset, remaining, mmap_size,
418 action_bytes=None, builtin_array=None):
419 if action_bytes == None:
420 action_bytes = self.subdevice.get_buffer_contents()
421 if mmap_offset + action_bytes >= mmap_size - 1:
422 action_bytes = mmap_size - mmap_offset
426 action_size = min(action_bytes, remaining, mmap_size-mmap_offset)
427 self._mmap_action(mmap, buffer_offset, action_size, builtin_array)
428 mmap.flush() # (offset, size), necessary? calls msync?
429 self._mark_action(action_size)
433 return action_size, mmap_offset
435 # pull out subdevice calls for easier testing
437 def _mmap_size(self):
438 return self.subdevice.get_buffer_size()
441 return self.subdevice.device.fileno()
443 def _action_bytes(self):
444 return self.subdevice.get_buffer_contents()
446 # hooks for subclasses
448 def _buffer_bytes(self, builtin_array):
450 return len(self.buffer)*self.buffer.itemsize
452 return self.buffer.size*self.buffer.itemsize
454 def _initial_action(self, mmap, buffer_offset, remaining, mmap_size,
455 action_bytes, builtin_array):
458 def _mmap_action(self, mmap, offset, size):
459 raise NotImplementedError()
461 def _mark_action(self, size):
462 raise NotImplementedError()
465 # MMap classes have more subdevice-based methods to override
466 _mmap_docstring_overrides = '\n ... '.join([
467 'def _mmap_size(self):',
468 ' from os.path import getsize',
469 ' return getsize(t)',
470 'def _fileno(self):',
472 'def _action_bytes(self):',
474 'def _mark_action(self, size):',
480 class MMapReader (_MMapReadWriteThread):
481 __doc__ = Reader.__doc__
483 # convert class and function names
484 ('`read()`', '`mmap()`'),
485 ('Reader', 'MMapReader'),
486 ('def _file', _mmap_docstring_overrides)]:
487 __doc__ = __doc__.replace(_from, _to)
489 def __init__(self, *args, **kwargs):
490 assert 'access' not in kwargs
491 kwargs['access'] = _mmap.ACCESS_READ
492 super(MMapReader, self).__init__(*args, **kwargs)
494 def _mmap_action(self, mmap, offset, size, builtin_array):
495 offset /= self.buffer.itemsize
496 s = size / self.buffer.itemsize
498 # TODO: read into already allocated memory (somehow)
499 a = _array.array(self.buffer.typecode)
500 a.fromstring(mmap.read(size))
501 self.buffer[offset:offset+s] = a
502 else: # numpy.ndarray
503 # TODO: read into already allocated memory (somehow)
504 a = _numpy.fromstring(mmap.read(size), dtype=self.buffer.dtype)
505 self.buffer.flat[offset:offset+s] = a
507 def _mark_action(self, size):
508 self.subdevice.mark_buffer_read(size)
511 class MMapWriter (_MMapReadWriteThread):
512 __doc__ = Writer.__doc__
514 ('`write()`', '`mmap()`'),
515 ('Writer', 'MMapWriter'),
516 ('def _file', _mmap_docstring_overrides),
517 ("f = _os.fdopen(fd, 'r+')",
518 "f = _os.fdopen(fd, 'r+'); f.write(6*'\\x00'); f.flush(); f.seek(0)"),
519 ("a.fromfile(open(t, 'rb'), buf.size)",
520 "a.fromfile(open(t, 'rb'), w._mmap_size()/a.itemsize)"),
521 ("a.fromfile(open(t, 'rb'), len(buf))",
522 "a.fromfile(open(t, 'rb'), w._mmap_size()/a.itemsize)"),
523 ("array('H', [0, 10, 1, 11, 2, 12])", "array('H', [11, 2, 12])"),
524 ("array('H', [0, 20, 2, 22, 4, 24])", "array('H', [22, 4, 24])")]:
526 __doc__ = __doc__.replace(_from, _to)
528 def __init__(self, *args, **kwargs):
529 assert 'access' not in kwargs
530 kwargs['access'] = _mmap.ACCESS_WRITE
531 super(MMapWriter, self).__init__(*args, **kwargs)
533 def _setup_buffer(self):
534 self.buffer = buffer(self.buffer)
536 def _buffer_bytes(self, builtin_array):
537 return len(self.buffer) # because of buffer() in _setup_buffer
539 def _initial_action(self, mmap, buffer_offset, remaining, mmap_size,
540 action_bytes, builtin_array):
541 action_size = min(action_bytes, remaining, mmap_size)
542 self._mmap_action(mmap, buffer_offset, action_size, builtin_array)
543 if action_size == mmap_size:
547 mmap_offset = action_size
548 return (action_size, mmap_offset)
550 def _mmap_action(self, mmap, offset, size, builtin_array):
551 mmap.write(self.buffer[offset:offset+size])
554 def _mark_action(self, size):
555 self.subdevice.mark_buffer_written(size)
558 del _mmap_docstring_overrides