1 # Copyright (C) 2010 W. Trevor King
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 "Useful utility functions and classes."
18 import array as _array
21 import threading as _threading
24 import comedi as _comedi
25 import numpy as _numpy
27 from . import LOG as _LOG
28 from . import classes as _classes
29 from . import constants as _constants
34 lsampl = _numpy.uint32
def subdevice_dtype(subdevice):
    """Pick the `numpy.dtype` matching the subdevice's sample width

    Returns `lsampl` when the `lsampl` flag reported by
    `subdevice.get_flags()` is set, and `sampl` otherwise.
    """
    uses_long_samples = subdevice.get_flags().lsampl
    return lsampl if uses_long_samples else sampl
def subdevice_typecode(subdevice):
    """Pick the `array` typecode matching the subdevice's sample width

    Returns `lsampl_typecode` when the `lsampl` flag reported by
    `subdevice.get_flags()` is set, and `sampl_typecode` otherwise.
    """
    uses_long_samples = subdevice.get_flags().lsampl
    return lsampl_typecode if uses_long_samples else sampl_typecode
def sampl_array(list):
    """Copy a Python list/tuple into a freshly allocated `sampl` array

    A `_comedi.sampl_array` of ``len(list)`` elements is created and
    filled item by item, in order.
    """
    samples = _comedi.sampl_array(len(list))
    for position, value in enumerate(list):
        samples[position] = value
    return samples
    # Alternative kept for reference:
    #return _array.array(sampl_typecode, list)
def lsampl_array(list):
    """Copy a Python list/tuple into a freshly allocated `lsampl` array

    A `_comedi.lsampl_array` of ``len(list)`` elements is created and
    filled item by item, in order.
    """
    samples = _comedi.lsampl_array(len(list))
    for position, value in enumerate(list):
        samples[position] = value
    return samples
    # Alternative kept for reference:
    #return _array.array(lsampl_typecode, list)
def set_insn_chanspec(insn, channels):
    """Set `insn.chanspec` from the list `channels`

    `channels` is wrapped in a `_classes.Chanlist`, and the result of
    its `chanlist()` method is stored on the instruction.
    """
    insn.chanspec = _classes.Chanlist(channels).chanlist()
def set_insn_data(insn, data):
    """Configure `insn.data` and `insn.n` from the iterable `data`

    `insn.n` receives the number of items and `insn.data` an `lsampl`
    array holding them.  Inputs without a length (generators and
    other one-shot iterables) are copied into a list first, because
    both the count and the array conversion need `len()`.
    """
    if not hasattr(data, '__len__'):
        # Materialize once so the items can be both counted and copied.
        data = list(data)
    insn.n = len(data)
    insn.data = lsampl_array(data)
def set_cmd_chanlist(cmd, channels):
    """Set `cmd.chanlist` and `cmd.chanlist_len` from the list `channels`

    `channels` is wrapped in a `_classes.Chanlist`; the command gets
    the result of its `chanlist()` method and the wrapper's length.
    """
    wrapped = _classes.Chanlist(channels)
    cmd.chanlist = wrapped.chanlist()
    cmd.chanlist_len = len(wrapped)
def set_cmd_data(cmd, data):
    """Configure `cmd.data` and `cmd.data_len` from the iterable `data`

    `cmd.data` receives a `sampl` array holding the items and
    `cmd.data_len` their count.  Inputs without a length (generators
    and other one-shot iterables) are copied into a list first,
    because both the array conversion and the count need `len()`.
    """
    if not hasattr(data, '__len__'):
        # Materialize once so the items can be both copied and counted.
        data = list(data)
    cmd.data = sampl_array(data)
    cmd.data_len = len(data)
def inttrig_insn(subdevice):
    """Build an internal-trigger instruction for a given `subdevice`

    From the Comedi docs `section 4.4`_ (Instruction for internal
    triggering):

      This special instruction has `INSN_INTTRIG` as the insn flag in
      its instruction data structure.  Its execution causes an internal
      triggering event.  This event can, for example, cause the device
      driver to start a conversion, or to stop an ongoing
      acquisition.  The exact meaning of the triggering depends on the
      card and its particular driver.

      The `data[0]` field of the `INSN_INTTRIG` instruction is
      reserved for future use, and should be set to `0`.

    From the comedi source (`comedi.comedi_fops.c:parse_insn()`), we
    see that the `chanspec` attribute is ignored for `INSN_INTTRIG`,
    so we don't bother setting it here.

    .. _section 4.4: http://www.comedi.org/doc/x621.html
    """
    trigger = subdevice.insn()
    trigger.insn = _constants.INSN.inttrig.value
    # Single data word, set to 0 as the quoted docs require.
    set_insn_data(trigger, [0])
    return trigger
def _builtin_array(array):
    """Tell whether `array` comes from the builtin :mod:`array` module

    Returns `True` for `array.array` instances and `False` for
    anything else.
    """
    builtin_type = _array.array
    return isinstance(array, builtin_type)
class _ReadWriteThread (_threading.Thread):
    """Common base for the reader/writer threads

    Keeps the target `subdevice` and the data `buffer` on the
    instance.  When no `name` is supplied, one is generated from the
    class name and `subdevice._index`.
    """
    def __init__(self, subdevice, buffer, name=None):
        if name is None:
            # Only touch `subdevice._index` when a name must be built.
            name = '<%s subdevice %d>' % (
                type(self).__name__, subdevice._index)
        self.subdevice = subdevice
        self.buffer = buffer
        super(_ReadWriteThread, self).__init__(name=name)

    def _file(self):
        """File for reading/writing data to `.subdevice`

        This file may use the internal comedi fileno, so do not close
        it when you are finished.  The file will eventually be closed
        when the backing `Device` instance is closed.
        """
        device = self.subdevice._device
        return device.file
140 class Reader (_ReadWriteThread):
141 """`read()`-based reader
146 Setup a temporary data file for testing.
148 >>> from os import close, remove
149 >>> from tempfile import mkstemp
150 >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
151 >>> f = _os.fdopen(fd, 'r+')
152 >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
155 Override the default `Reader` methods for our dummy subdevice.
157 >>> class TestReader (Reader):
164 >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest')
168 The input buffer is updated in place, and is also available as the
169 reader's `buffer` attribute.
174 [ 2, 12]], dtype=uint16)
178 [ 2, 12]], dtype=uint16)
180 While `numpy` arrays make multi-channel indexing easy, they do
181 require an external library. For single-channel input, the
182 `array` module is sufficient.
185 >>> rbuf = _array.array('H', [0]*buf.size)
186 >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest')
190 array('H', [0, 10, 1, 11, 2, 12])
192 array('H', [0, 10, 1, 11, 2, 12])
194 Cleanup the temporary data file.
196 >>> f.close() # no need for `close(fd)`
200 builtin_array = _builtin_array(self.buffer)
203 # TODO: read into already allocated memory (somehow)
204 size = len(self.buffer)
205 a = _array.array(self.buffer.typecode)
208 else: # numpy.ndarray
209 # TODO: read into already allocated memory (somehow)
210 buf = _numpy.fromfile(
211 f, dtype=self.buffer.dtype, count=self.buffer.size)
213 shape=self.buffer.shape, dtype=self.buffer.dtype,
218 class Writer (_ReadWriteThread):
219 """`write()`-based writer
224 Setup a temporary data file for testing.
226 >>> from os import close, remove
227 >>> from tempfile import mkstemp
228 >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
229 >>> f = _os.fdopen(fd, 'r+')
230 >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
232 Override the default `Writer` methods for our dummy subdevice.
234 >>> class TestWriter (Writer):
240 >>> r = TestWriter(subdevice=None, buffer=buf, name='Writer-doctest')
243 >>> a = _array.array('H')
244 >>> a.fromfile(open(t, 'rb'), buf.size)
246 array('H', [0, 10, 1, 11, 2, 12])
248 While `numpy` arrays make multi-channel indexing easy, they do
249 require an external library. For single-channel input, the
250 `array` module is sufficient.
253 >>> buf = _array.array('H', [2*x for x in buf.flat])
254 >>> r = TestWriter(subdevice=None, buffer=buf, name='Writer-doctest')
257 >>> a = _array.array('H')
258 >>> a.fromfile(open(t, 'rb'), len(buf))
260 array('H', [0, 20, 2, 22, 4, 24])
262 Cleanup the temporary data file.
264 >>> f.close() # no need for `close(fd)`
269 self.buffer.tofile(f)
273 class _MMapReadWriteThread (_ReadWriteThread):
274 "`mmap()`-based reader/writer"
275 def __init__(self, *args, **kwargs):
276 super(_MMapReadWriteThread, self).__init__(*args, **kwargs)
279 def _sleep_time(self, mmap_size):
280 "Expected seconds needed to write a tenth of the mmap buffer"
284 # all sizes measured in bytes
285 builtin_array = _builtin_array(self.buffer)
286 mmap_size = self._mmap_size()
287 sleep_time = self._sleep_time(mmap_size)
289 self._fileno(), mmap_size, self._prot, _mmap.MAP_SHARED)
293 remaining = len(self.buffer)
294 else: # numpy.ndarray
295 remaining = self.buffer.size
296 remaining *= self.buffer.itemsize
297 action = self._initial_action(
298 mmap, buffer_offset, remaining,
299 mmap_size, action_bytes=mmap_size)
300 buffer_offset += action
304 action_bytes = self._action_bytes()
306 action,mmap_offset = self._act(
307 mmap, mmap_offset, buffer_offset, remaining,
308 mmap_size, action_bytes=action_bytes,
309 builtin_array=builtin_array)
310 buffer_offset += action
313 _time.sleep(sleep_time)
315 def _act(self, mmap, mmap_offset, buffer_offset, remaining, mmap_size,
316 action_bytes=None, builtin_array=None):
317 if action_bytes == None:
318 action_bytes = self.subdevice.get_buffer_contents()
319 if mmap_offset + action_bytes >= mmap_size - 1:
320 action_bytes = mmap_size - mmap_offset
324 action_size = min(action_bytes, remaining)
325 self._mmap_action(mmap, buffer_offset, action_size, builtin_array)
326 mmap.flush() # (offset, size), necessary? calls msync?
327 self._mark_action(action_size)
331 return action_size, mmap_offset
333 # pull out subdevice calls for easier testing
335 def _mmap_size(self):
336 return self.subdevice.get_buffer_size()
339 return self.subdevice._device.fileno()
341 def _action_bytes(self):
342 return self.subdevice.get_buffer_contents()
344 # hooks for subclasses
346 def _initial_action(self, mmap, buffer_offset, remaining, mmap_size,
def _mmap_action(self, mmap, offset, size, builtin_array=None):
    """Hook: transfer `size` bytes between `mmap` and `self.buffer`

    `_act` invokes this hook as
    ``self._mmap_action(mmap, buffer_offset, action_size, builtin_array)``,
    so the signature accepts `builtin_array` as well; it defaults to
    `None` to stay compatible with three-argument calls.  Subclasses
    must override this method.

    Raises `NotImplementedError` unconditionally.
    """
    raise NotImplementedError()
353 def _mark_action(self, size):
354 raise NotImplementedError()
357 # MMap classes have more subdevice-based methods to override
358 _mmap_docstring_overrides = '\n ... '.join([
359 'def _mmap_size(self):',
360 ' from os.path import getsize',
361 ' return getsize(t)',
362 'def _fileno(self):',
364 'def _action_bytes(self):',
366 'def _mark_action(self, size):',
372 class MMapReader (_MMapReadWriteThread):
373 __doc__ = Reader.__doc__
375 # convert class and function names
376 ('`read()`', '`mmap()`'),
377 ('Writer', 'MMapWriter'),
378 ('def _file', _mmap_docstring_overrides)]:
379 __doc__ = __doc__.replace(_from, _to)
381 def __init__(self, *args, **kwargs):
382 super(MMapReader, self).__init__(*args, **kwargs)
383 self._prot = _mmap.PROT_READ
def _mmap_action(self, mmap, offset, size, builtin_array):
    """Copy `size` bytes read from `mmap` into `self.buffer`

    `offset` and `size` are given in bytes; they are converted to
    item counts using `self.buffer.itemsize`.  `builtin_array`
    selects the builtin `array.array` path when true and the
    `numpy.ndarray` path (assignment through `.flat`) otherwise.
    """
    itemsize = self.buffer.itemsize
    # Floor division keeps the slice bounds integral on Python 3 too.
    start = offset // itemsize
    stop = start + size // itemsize
    data = mmap.read(size)
    if builtin_array:
        # TODO: read into already allocated memory (somehow)
        # The array constructor accepts raw bytes on Python 2 and 3,
        # unlike `fromstring()`, which newer Pythons no longer provide.
        self.buffer[start:stop] = _array.array(self.buffer.typecode, data)
    else:  # numpy.ndarray
        # TODO: read into already allocated memory (somehow)
        # `frombuffer` replaces the deprecated binary `fromstring`.
        self.buffer.flat[start:stop] = _numpy.frombuffer(
            data, dtype=self.buffer.dtype)
398 def _mark_action(self, size):
399 self.subdevice.mark_buffer_read(size)
402 class MMapWriter (_MMapReadWriteThread):
403 __doc__ = Writer.__doc__
405 ('`write()`', '`mmap()`'),
406 ('Reader', 'MMapReader'),
407 ('def _file', _mmap_docstring_overrides)]:
408 __doc__ = __doc__.replace(_from, _to)
410 def __init__(self, *args, **kwargs):
411 super(MMapWriter, self).__init__(*args, **kwargs)
412 self._prot = _mmap.PROT_WRITE
413 self.buffer = buffer(self.buffer)
415 def _mmap_action(self, mmap, offset, size, builtin_array):
416 mmap.write(self.buffer[offset:offset+size])
418 def _mark_action(self, size):
419 self.subdevice.mark_buffer_written(size)
422 del _mmap_docstring_overrides