1 # Copyright (C) 2011-2012 W. Trevor King <wking@drexel.edu>
3 # This file is part of pycomedi.
5 # pycomedi is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 2 of the License, or (at your option) any later
10 # pycomedi is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License along with
15 # pycomedi. If not, see <http://www.gnu.org/licenses/>.
17 "Useful utility functions and classes"
19 import array as _array
22 import threading as _threading
25 import numpy as _numpy
27 from . import LOG as _LOG
28 from . import constant as _constant
33 lsampl = _numpy.uint32
37 def _subdevice_dtype(subdevice):
38 "Return the appropriate `numpy.dtype` based on subdevice flags"
39 if subdevice.get_flags().lsampl:
43 def _subdevice_typecode(subdevice):
44 "Return the appropriate `array` type based on subdevice flags"
45 if subdevice.get_flags().lsampl:
46 return lsampl_typecode
49 def inttrig_insn(subdevice):
50 """Setup an internal trigger for a given `subdevice`
52 From the Comedi docs `section 4.4`_ (Instruction for internal
55 This special instruction has `INSN_INTTRIG` as the insn flag in
56 its instruction data structure. Its execution causes an internal
57 triggering event. This event can, for example, cause the device
58 driver to start a conversion, or to stop an ongoing
59 acquisition. The exact meaning of the triggering depends on the
60 card and its particular driver.
62 The `data[0]` field of the `INSN_INTTRIG` instruction is
63 reserved for future use, and should be set to `0`.
65 From the comedi source (`comedi.comedi_fops.c:parse_insn()`), we
66 see that the `chanspec` attribute is ignored for `INSN_INTTRIG`,
67 so we don't bother setting it here.
69 .. _section 4.4: http://www.comedi.org/doc/x621.html
71 insn = subdevice.insn()
72 insn.insn = _constant.INSN.inttrig.value
77 def _builtin_array(array):
78 "`array` is an array from the builtin :mod:`array` module"
79 return isinstance(array, _array.array)
82 class _ReadWriteThread (_threading.Thread):
83 "Base class for all reader/writer threads"
84 def __init__(self, subdevice, buffer, name=None):
86 name = '<%s subdevice %d>' % (
87 self.__class__.__name__, subdevice.index)
88 self.subdevice = subdevice
91 super(_ReadWriteThread, self).__init__(name=name)
93 def _setup_buffer(self):
94 "Currently just a hook for an MMapWriter hack."
98 """File for reading/writing data to `.subdevice`
100 This file may use the internal comedi fileno, so do not close
101 it when you are finished. The file will eventually be closed
102 when the backing `Device` instance is closed.
104 return self.subdevice.device.file
107 class Reader (_ReadWriteThread):
108 """`read()`-based reader
113 Setup a temporary data file for testing.
115 >>> from os import close, remove
116 >>> from tempfile import mkstemp
117 >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
118 >>> f = _os.fdopen(fd, 'r+')
119 >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
122 Override the default `Reader` methods for our dummy subdevice.
124 >>> class TestReader (Reader):
131 >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest')
135 The input buffer is updated in place, and is also available as the
136 reader's `buffer` attribute.
141 [ 2, 12]], dtype=uint16)
145 [ 2, 12]], dtype=uint16)
147 While `numpy` arrays make multi-channel indexing easy, they do
148 require an external library. For single-channel input, the
149 `array` module is sufficient.
152 >>> rbuf = _array.array('H', [0]*buf.size)
153 >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest')
157 array('H', [0, 10, 1, 11, 2, 12])
159 array('H', [0, 10, 1, 11, 2, 12])
161 Cleanup the temporary data file.
163 >>> f.close() # no need for `close(fd)`
167 builtin_array = _builtin_array(self.buffer)
170 # TODO: read into already allocated memory (somehow)
171 size = len(self.buffer)
172 a = _array.array(self.buffer.typecode)
175 else: # numpy.ndarray
176 # TODO: read into already allocated memory (somehow)
177 buf = _numpy.fromfile(
178 f, dtype=self.buffer.dtype, count=self.buffer.size)
180 shape=self.buffer.shape, dtype=self.buffer.dtype,
183 #_LOG.critical('ai running? %s' % self.subdevice.get_flags().running)
184 #while self.subdevice.get_flags().running:
185 ##_LOG.critical('ai running? %s' % self.subdevice.get_flags().running)
187 #_LOG.critical('ai running? %s' % self.subdevice.get_flags().running)
191 class Writer (_ReadWriteThread):
192 """`write()`-based writer
197 Setup a temporary data file for testing.
199 >>> from os import close, remove
200 >>> from tempfile import mkstemp
201 >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
202 >>> f = _os.fdopen(fd, 'r+')
203 >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
205 Override the default `Writer` methods for our dummy subdevice.
207 >>> class TestWriter (Writer):
214 >>> w = TestWriter(subdevice=None, buffer=buf, name='Writer-doctest',
216 >>> a = _array.array('H')
217 >>> a.fromfile(open(t, 'rb'), preload)
219 array('H', [0, 10, 1])
222 >>> a = _array.array('H')
223 >>> a.fromfile(open(t, 'rb'), buf.size)
225 array('H', [0, 10, 1, 11, 2, 12])
227 While `numpy` arrays make multi-channel indexing easy, they do
228 require an external library. For single-channel input, the
229 `array` module is sufficient.
232 >>> buf = _array.array('H', [2*x for x in buf.flat])
233 >>> w = TestWriter(subdevice=None, buffer=buf, name='Writer-doctest',
235 >>> a = _array.array('H')
236 >>> a.fromfile(open(t, 'rb'), preload)
238 array('H', [0, 20, 2])
241 >>> a = _array.array('H')
242 >>> a.fromfile(open(t, 'rb'), len(buf))
244 array('H', [0, 20, 2, 22, 4, 24])
246 Cleanup the temporary data file.
248 >>> f.close() # no need for `close(fd)`
251 def __init__(self, *args, **kwargs):
252 preload = kwargs.pop('preload', 0)
253 super(Writer, self).__init__(*args, **kwargs)
254 if not _builtin_array(self.buffer): # numpy.ndarray
255 self.buffer = self.buffer.flat
256 preload_buffer = self.buffer[:preload]
257 self._preload_setup = {'remaining_buffer': self.buffer[preload:]}
259 preload_buffer.tofile(f)
263 remaining_buffer = self._preload_setup['remaining_buffer']
264 del(self._preload_setup)
267 remaining_buffer.tofile(f)
269 #_LOG.critical('ao running? %s' % self.subdevice.get_flags().running)
270 #while self.subdevice.get_flags().running:
271 ##_LOG.critical('ao running? %s' % self.subdevice.get_flags().running)
273 #_LOG.critical('ao running? %s' % self.subdevice.get_flags().running)
277 class _MMapReadWriteThread (_ReadWriteThread):
278 "`mmap()`-based reader/writer"
279 def __init__(self, *args, **kwargs):
280 preload = kwargs.pop('preload', 0)
281 access = kwargs.pop('access')
282 super(_MMapReadWriteThread, self).__init__(*args, **kwargs)
284 # all sizes measured in bytes
285 builtin_array = _builtin_array(self.buffer)
286 mmap_size = int(self._mmap_size())
287 mmap = _mmap.mmap(self._fileno(), mmap_size, access=access)
289 remaining = self._buffer_bytes(builtin_array)
290 action,mmap_offset = self._initial_action(
291 mmap, buffer_offset, remaining, mmap_size, action_bytes=mmap_size,
292 builtin_array=builtin_array)
293 buffer_offset += action
295 self._preload_setup = {
296 'builtin_array': builtin_array,
297 'mmap_size': mmap_size,
299 'mmap_offset': mmap_offset,
300 'buffer_offset': buffer_offset,
301 'remaining': remaining,
304 def _sleep_time(self, mmap_size):
305 "Expected seconds needed to write a tenth of the mmap buffer"
309 builtin_array = self._preload_setup['builtin_array']
310 mmap_size = self._preload_setup['mmap_size']
311 mmap = self._preload_setup['mmap']
312 mmap_offset = self._preload_setup['mmap_offset']
313 buffer_offset = self._preload_setup['buffer_offset']
314 remaining = self._preload_setup['remaining']
315 del(self._preload_setup)
317 sleep_time = self._sleep_time(mmap_size)
319 action_bytes = self._action_bytes()
321 action,mmap_offset = self._act(
322 mmap, mmap_offset, buffer_offset, remaining, mmap_size,
323 action_bytes=action_bytes, builtin_array=builtin_array)
324 buffer_offset += action
327 _time.sleep(sleep_time)
329 def _act(self, mmap, mmap_offset, buffer_offset, remaining, mmap_size,
330 action_bytes=None, builtin_array=None):
331 if action_bytes == None:
332 action_bytes = self.subdevice.get_buffer_contents()
333 if mmap_offset + action_bytes >= mmap_size - 1:
334 action_bytes = mmap_size - mmap_offset
338 action_size = min(action_bytes, remaining, mmap_size-mmap_offset)
339 self._mmap_action(mmap, buffer_offset, action_size, builtin_array)
340 mmap.flush() # (offset, size), necessary? calls msync?
341 self._mark_action(action_size)
345 return action_size, mmap_offset
347 # pull out subdevice calls for easier testing
349 def _mmap_size(self):
350 return self.subdevice.get_buffer_size()
353 return self.subdevice._device.fileno()
355 def _action_bytes(self):
356 return self.subdevice.get_buffer_contents()
358 # hooks for subclasses
360 def _buffer_bytes(self, builtin_array):
362 return len(self.buffer)*self.buffer.itemsize
364 return self.buffer.size*self.buffer.itemsize
366 def _initial_action(self, mmap, buffer_offset, remaining, mmap_size,
367 action_bytes, builtin_array):
370 def _mmap_action(self, mmap, offset, size):
371 raise NotImplementedError()
373 def _mark_action(self, size):
374 raise NotImplementedError()
377 # MMap classes have more subdevice-based methods to override
378 _mmap_docstring_overrides = '\n ... '.join([
379 'def _mmap_size(self):',
380 ' from os.path import getsize',
381 ' return getsize(t)',
382 'def _fileno(self):',
384 'def _action_bytes(self):',
386 'def _mark_action(self, size):',
392 class MMapReader (_MMapReadWriteThread):
393 __doc__ = Reader.__doc__
395 # convert class and function names
396 ('`read()`', '`mmap()`'),
397 ('Reader', 'MMapReader'),
398 ('def _file', _mmap_docstring_overrides)]:
399 __doc__ = __doc__.replace(_from, _to)
def __init__(self, *args, **kwargs):
    """Create the reader thread with `mmap.ACCESS_READ` access.

    The access mode is fixed here, so callers must not supply an
    `access` keyword themselves.  All other arguments are forwarded
    to the parent constructor unchanged.
    """
    assert 'access' not in kwargs
    forwarded = dict(kwargs, access=_mmap.ACCESS_READ)
    super(MMapReader, self).__init__(*args, **forwarded)
406 def _mmap_action(self, mmap, offset, size, builtin_array):
407 offset /= self.buffer.itemsize
408 s = size / self.buffer.itemsize
410 # TODO: read into already allocated memory (somehow)
411 a = _array.array(self.buffer.typecode)
412 a.fromstring(mmap.read(size))
413 self.buffer[offset:offset+s] = a
414 else: # numpy.ndarray
415 # TODO: read into already allocated memory (somehow)
416 a = _numpy.fromstring(mmap.read(size), dtype=self.buffer.dtype)
417 self.buffer.flat[offset:offset+s] = a
419 def _mark_action(self, size):
420 self.subdevice.mark_buffer_read(size)
423 class MMapWriter (_MMapReadWriteThread):
424 __doc__ = Writer.__doc__
426 ('`write()`', '`mmap()`'),
427 ('Writer', 'MMapWriter'),
428 ('def _file', _mmap_docstring_overrides),
429 ("f = _os.fdopen(fd, 'r+')",
430 "f = _os.fdopen(fd, 'r+'); f.write(6*'\\x00'); f.flush(); f.seek(0)"),
431 ("a.fromfile(open(t, 'rb'), buf.size)",
432 "a.fromfile(open(t, 'rb'), w._mmap_size()/a.itemsize)"),
433 ("a.fromfile(open(t, 'rb'), len(buf))",
434 "a.fromfile(open(t, 'rb'), w._mmap_size()/a.itemsize)"),
435 ("array('H', [0, 10, 1, 11, 2, 12])", "array('H', [11, 2, 12])"),
436 ("array('H', [0, 20, 2, 22, 4, 24])", "array('H', [22, 4, 24])")]:
438 __doc__ = __doc__.replace(_from, _to)
def __init__(self, *args, **kwargs):
    """Create the writer thread with `mmap.ACCESS_WRITE` access.

    The access mode is fixed here, so callers must not supply an
    `access` keyword themselves.  All other arguments are forwarded
    to the parent constructor unchanged.
    """
    assert 'access' not in kwargs
    forwarded = dict(kwargs, access=_mmap.ACCESS_WRITE)
    super(MMapWriter, self).__init__(*args, **forwarded)
def _setup_buffer(self):
    """Replace `self.buffer` with a `buffer()` view of itself.

    NOTE: `buffer()` is a Python 2 builtin.
    """
    view = buffer(self.buffer)
    self.buffer = view
448 def _buffer_bytes(self, builtin_array):
449 return len(self.buffer) # because of buffer() in _setup_buffer
451 def _initial_action(self, mmap, buffer_offset, remaining, mmap_size,
452 action_bytes, builtin_array):
453 action_size = min(action_bytes, remaining, mmap_size)
454 self._mmap_action(mmap, buffer_offset, action_size, builtin_array)
455 if action_size == mmap_size:
459 mmap_offset = action_size
460 return (action_size, mmap_offset)
462 def _mmap_action(self, mmap, offset, size, builtin_array):
463 mmap.write(self.buffer[offset:offset+size])
466 def _mark_action(self, size):
467 self.subdevice.mark_buffer_written(size)
470 del _mmap_docstring_overrides