3 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
5 # This file is part of pycomedi.
7 # pycomedi is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 2 of the License, or (at your option) any later version.
12 # pycomedi is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License along with
17 # pycomedi. If not, see <http://www.gnu.org/licenses/>.
19 """Use comedi commands for asynchronous input.
21 An example for directly using Comedi commands. Comedi commands
22 are used for asynchronous acquisition, with the timing controlled
23 by on-board timers or external events.
25 Based on David A. Schleef's `comedilib/demo/cmd.c`.
31 import numpy as _numpy
33 from pycomedi import LOG as _LOG
34 import pycomedi.constant as _constant
35 from pycomedi.device import Device as _Device
36 from pycomedi.subdevice import StreamingSubdevice as _StreamingSubdevice
37 from pycomedi.channel import AnalogChannel as _AnalogChannel
38 from pycomedi.chanspec import ChanSpec as _ChanSpec
39 import pycomedi.utility as _utility
42 def open_channels(device, subdevice, channels, range, aref):
43 """Subdevice index and list of channel indexes
44 to ``Subdevice`` instance and list of ``AnalogChannel`` instances
47 subdevice = device.subdevice(subdevice, factory=_StreamingSubdevice)
49 subdevice = device.find_subdevice_by_type(
50 _constant.SUBDEVICE_TYPE.ai, factory=_StreamingSubdevice)
51 channels = [subdevice.channel(
52 index=i, factory=_AnalogChannel, range=range, aref=aref)
54 return(subdevice, channels)
56 def prepare_command(subdevice, channels, period, num_scans):
57 """Create a periodic sampling command.
59 Ask comedilib to create a generic sampling command and then modify
62 command = subdevice.get_cmd_generic_timed(
63 len(channels), scan_period_ns=period*1e9)
64 command.chanlist = channels
65 command.stop_src = _constant.TRIG_SRC.count
66 command.stop_arg = num_scans
69 def test_command(subdevice, num_tests=2):
70 """Adjust a command as necessary to get valid arguments.
72 _LOG.info('command before testing:\n{}'.format(subdevice.cmd))
74 rc = subdevice.command_test()
76 _LOG.info('command is valid')
78 _LOG.info('test {} returned {}\n{}'.format(i, rc, subdevice.cmd))
79 _LOG.error('error preparing command: {}'.format(rc))
81 test_command.__test__ = False # test_command is not a Nose test
83 def write_data(stream, channels, data, physical=False):
85 converters = [c.get_converter() for c in channels]
86 physical_data = _numpy.zeros(data.shape, dtype=_numpy.float32)
87 for i,c in enumerate(converters):
88 physical_data[:,i] = c.to_physical(data[:,i])
90 for row in range(data.shape[0]):
91 stream.write('\t'.join(str(x) for x in data[row,:]))
94 class DataWriter (object):
95 def __init__(self, stream, channels, physical=False):
97 self.channels = channels
98 self.physical = physical
100 def __call__(self, data):
101 d = _numpy.ndarray(shape=(1,data.size), dtype=data.dtype, buffer=data)
103 stream=self.stream, channels=self.channels, data=d,
104 physical=self.physical)
107 def read(device, subdevice=None, channels=[0], range=0, aref=0, period=0,
108 num_scans=2, reader=_utility.Reader, physical=False,
110 """Read ``num_scans`` samples from each specified channel.
112 subdevice,channels = open_channels(
113 device=device, subdevice=subdevice, channels=channels, range=range,
115 subdevice.cmd = prepare_command(
116 subdevice=subdevice, channels=channels, period=period,
118 rc = test_command(subdevice=subdevice)
120 if reader == _utility.CallbackReader:
121 read_buffer_shape = (len(channels),)
123 'callback': DataWriter(
124 stream=stream, channels=channels, physical=physical),
128 read_buffer_shape = (num_scans, len(channels))
129 read_buffer = _numpy.zeros(read_buffer_shape, dtype=subdevice.get_dtype())
131 subdevice=subdevice, buffer=read_buffer, name='Reader', **kwargs)
133 _LOG.info('start time: {}'.format(start))
138 _LOG.info('stop time: {}'.format(stop))
139 _LOG.info('time: {}'.format(stop - start))
140 if not isinstance(reader, _utility.CallbackReader):
142 stream=stream, channels=channels, data=read_buffer,
146 def run(filename, **kwargs):
149 >>> stdout = StringIO.StringIO()
150 >>> run(filename='/dev/comedi0', stream=stdout)
151 >>> print(stdout.getvalue()) # doctest: +SKIP
155 >>> stdout.truncate(0)
156 >>> run(filename='/dev/comedi0', reader=_utility.MMapReader, stream=stdout)
157 >>> print(stdout.getvalue()) # doctest: +SKIP
162 device = _Device(filename=filename)
165 read(device=device, **kwargs)
170 if __name__ == '__main__':
171 import pycomedi_demo_args
173 args = pycomedi_demo_args.parse_args(
175 argnames=['filename', 'subdevice', 'channels', 'aref', 'range',
176 'num-scans', 'mmap', 'callback', 'frequency', 'physical',
179 _LOG.info(('measuring device={0.filename} subdevice={0.subdevice} '
180 'channels={0.channels} range={0.range} '
181 'analog-reference={0.aref}'
185 reader = _utility.CallbackReader
187 reader = _utility.MMapReader
189 reader = _utility.Reader
191 run(filename=args.filename, subdevice=args.subdevice,
192 channels=args.channels, aref=args.aref, range=args.range,
193 num_scans=args.num_scans, reader=reader, period=args.period,
194 physical=args.physical)