5 """Use comedi commands for asyncronous input.
7 An example for directly using Comedi commands. Comedi commands
8 are used for asynchronous acquisition, with the timing controlled
9 by on-board timers or external events.
11 Based on David A. Schleef's `comedilib/demo/cmd.c`.
17 import numpy as _numpy
19 from pycomedi import LOG as _LOG
20 import pycomedi.constant as _constant
21 from pycomedi.device import Device as _Device
22 from pycomedi.subdevice import StreamingSubdevice as _StreamingSubdevice
23 from pycomedi.channel import AnalogChannel as _AnalogChannel
24 from pycomedi.chanspec import ChanSpec as _ChanSpec
25 import pycomedi.utility as _utility
def open_channels(device, subdevice, channels, range, aref):
    """Turn a subdevice index and channel indexes into opened objects.

    Converts a subdevice index and a list of channel indexes into a
    ``StreamingSubdevice`` instance and a list of ``AnalogChannel``
    instances.

    Parameters:
        device: object providing ``subdevice()`` and
            ``find_subdevice_by_type()``.
        subdevice: index of the subdevice to open.  When no usable index
            is supplied, a subdevice of type ``SUBDEVICE_TYPE.ai`` is
            looked up instead.
        channels: iterable of channel indexes on that subdevice.
        range: range setting forwarded to every channel.  (The name
            shadows the builtin; it is kept because callers pass it by
            keyword.)
        aref: analog-reference setting forwarded to every channel.

    Returns:
        A ``(subdevice, channels)`` tuple holding the opened subdevice
        and the list of channel objects created on it.
    """
    # NOTE(review): the branch condition was missing from the listing this
    # block was rebuilt from.  ``None`` (the default used by ``read``) and
    # negative indexes are both treated as "not specified" -- confirm
    # against the argument parser.
    if subdevice is not None and subdevice >= 0:
        opened = device.subdevice(subdevice, factory=_StreamingSubdevice)
    else:
        opened = device.find_subdevice_by_type(
            _constant.SUBDEVICE_TYPE.ai, factory=_StreamingSubdevice)
    analog_channels = [
        opened.channel(
            index=i, factory=_AnalogChannel, range=range, aref=aref)
        for i in channels]
    return (opened, analog_channels)
def prepare_command(subdevice, channels, period, num_scans):
    """Create a periodic sampling command.

    Ask comedilib (via ``subdevice.get_cmd_generic_timed``) to create a
    generic timed command, then override the parts we care about: the
    channel list and the stop condition.

    Parameters:
        subdevice: object providing ``get_cmd_generic_timed()``.
        channels: channels to sample; stored as ``command.chanlist``.
        period: scan period, forwarded unchanged as ``scan_period_ns``.
        num_scans: stored as ``command.stop_arg``; together with the
            ``count`` stop source this presumably ends acquisition after
            that many scans.

    Returns:
        The configured command object, ready to be assigned to
        ``subdevice.cmd``.
    """
    command = subdevice.get_cmd_generic_timed(
        len(channels), scan_period_ns=period)
    command.chanlist = channels
    command.stop_src = _constant.TRIG_SRC.count
    command.stop_arg = num_scans
    # The caller assigns the result to ``subdevice.cmd``, so the command
    # must be handed back explicitly.
    return command
55 def test_command(subdevice, num_tests=2):
56 """Adjust a command as necessary to get valid arguments.
58 _LOG.info('command before testing:\n{}'.format(subdevice.cmd))
60 rc = subdevice.command_test()
62 _LOG.info('command is valid')
64 _LOG.info('test {} returned {}\n{}'.format(i, rc, subdevice.cmd))
65 _LOG.error('error preparing command: {}'.format(rc))
67 test_command.__test__ = False # test_command is not a Nose test
def write_data(stream, channels, data, physical=False):
    """Write acquired samples to ``stream`` as tab-separated text.

    One line is written per row of ``data``; the values of a row are
    separated by tabs.

    Parameters:
        stream: object with a ``write(str)`` method.
        channels: channel objects, one per column of ``data``.  Only used
            when ``physical`` is true, in which case each must provide
            ``get_converter()``.
        data: two-dimensional array indexed as ``data[row, column]``.
        physical: when true, every column is first passed through its
            channel's converter (``to_physical``) and the converted
            values are written instead of the raw samples.
    """
    if physical:
        converters = [c.get_converter() for c in channels]
        # ``float32`` must be qualified: there is no bare ``float32`` name
        # in this module.
        physical_data = _numpy.zeros(data.shape, dtype=_numpy.float32)
        for i, converter in enumerate(converters):
            physical_data[:, i] = converter.to_physical(data[:, i])
        data = physical_data
    for row in data:
        stream.write('\t'.join(str(x) for x in row))
        stream.write('\n')
80 def read(device, subdevice=None, channels=[0], range=0, aref=0, period=0,
81 num_scans=2, reader=_utility.Reader, physical=False,
83 """Read ``num_scans`` samples from each specified channel.
85 subdevice,channels = open_channels(
86 device=device, subdevice=subdevice, channels=channels, range=range,
88 subdevice.cmd = prepare_command(
89 subdevice=subdevice, channels=channels, period=period,
91 rc = test_command(subdevice=subdevice)
92 read_buffer = _numpy.zeros(
93 (num_scans, len(channels)),
94 dtype=subdevice.get_dtype())
95 reader = reader(subdevice=subdevice, buffer=read_buffer, name='Reader')
97 _LOG.info('start time: {}'.format(start))
102 _LOG.info('stop time: {}'.format(stop))
103 _LOG.info('time: {}'.format(stop - start))
105 stream=stream, channels=channels, data=read_buffer, physical=physical)
108 def run(filename, **kwargs):
111 >>> stdout = StringIO.StringIO()
112 >>> run(filename='/dev/comedi0', stream=stdout)
113 >>> print(stdout.getvalue()) # doctest: +SKIP
117 >>> stdout.truncate(0)
118 >>> run(filename='/dev/comedi0', reader=_utility.MMapReader, stream=stdout)
119 >>> print(stdout.getvalue()) # doctest: +SKIP
124 device = _Device(filename=filename)
127 read(device=device, **kwargs)
132 if __name__ == '__main__':
133 import pycomedi_demo_args
135 args = pycomedi_demo_args.parse_args(
137 argnames=['filename', 'subdevice', 'channels', 'aref', 'range',
138 'num-scans', 'mmap', 'frequency', 'physical', 'verbose'])
140 _LOG.info(('measuring device={0.filename} subdevice={0.subdevice} '
141 'channels={0.channels} range={0.range} '
142 'analog-reference={0.aref}'
146 reader = _utility.MMapReader
148 reader = _utility.Reader
150 run(filename=args.filename, subdevice=args.subdevice,
151 channels=args.channels, aref=args.aref, range=args.range,
152 num_scans=args.num_scans, reader=reader, period=args.period,
153 physical=args.physical)