1 # Simultaneous, finite, buffered analog input/output using comedi drivers
4 from numpy import array, fromstring, uint16, float32, pi, sin
9 from scipy.stats import linregress
14 AO_TRIGGERS_OFF_AI_START = True
15 #AO_TRIGGERS_OFF_AI_START = False
# Single exception type raised by this module for invalid arguments, invalid
# object state, and failed comedi calls.
class simAioError(Exception):
    """Simultaneous Analog IO error"""
21 _example_array = array([0], dtype=uint16) # for typing, since I don't know what type(array) should be
23 _cmdtest_message = ["success",
30 def _print_cmdsrc(source) :
31 if source & comedi.TRIG_NONE : print "none|",
32 if source & comedi.TRIG_NOW : print "now|",
33 if source & comedi.TRIG_FOLLOW : print "follow|",
34 if source & comedi.TRIG_TIME : print "time|",
35 if source & comedi.TRIG_TIMER : print "timer|",
36 if source & comedi.TRIG_COUNT : print "count|",
37 if source & comedi.TRIG_EXT : print "ext|",
38 if source & comedi.TRIG_INT : print "int|",
39 if source & comedi.TRIG_OTHER : print "other|",
# Dump the comedi command CMD to stdout: the subdevice, the flags (in hex), and then,
# for each of the start, scan_begin, convert, scan_end and stop stages, the trigger
# source names (via _print_cmdsrc) followed by the stage's numeric argument.
# NOTE(review): only the "scan_begin:" label print is visible in this excerpt; the
# labels for the other four stages are presumably printed on lines not shown -- confirm.
41 def _print_command(cmd) :
42 print "subdevice: \t%d" % cmd.subdev
43 print "flags: \t0x%x" % cmd.flags
# start stage: trigger sources, then argument
45 _print_cmdsrc(cmd.start_src)
46 print "\t%d" % cmd.start_arg
# scan_begin stage (trailing comma keeps the label and the sources on one line)
47 print "scan_begin:\t",
48 _print_cmdsrc(cmd.scan_begin_src)
49 print "\t%d" % cmd.scan_begin_arg
# convert stage
51 _print_cmdsrc(cmd.convert_src)
52 print "\t%d" % cmd.convert_arg
# scan_end stage
54 _print_cmdsrc(cmd.scan_end_src)
55 print "\t%d" % cmd.scan_end_arg
# stop stage
57 _print_cmdsrc(cmd.stop_src)
58 print "\t%d" % cmd.stop_arg
# Pad TUP out to LENGTH entries by repeating its last element.
# Raises simAioError("Tuple too long.") when TUP already has more than LENGTH entries.
# NOTE(review): the remainder of this function (handling of the equal-length case and
# the return statement) is not visible in this excerpt.
60 def _expand_tuple(tup, length) :
61 "Expand an iterable TUP to a tuple of LENGTH by repeating the last element"
62 if len(tup) > length :
63 raise simAioError, "Tuple too long."
64 elif len(tup) < length :
# `tup + tuple(...)` only works when TUP is itself a tuple (a list here raises
# TypeError), and tup[-1] raises IndexError for an empty TUP.
65 temp_tup = tup + tuple((tup[-1],)*(length-len(tup)))
70 def __init__(self, filename="/dev/comedi0",
71 in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,),
72 out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,)) :
74 self._filename = filename
78 self._iaref = _expand_tuple(in_aref, len(in_chan))
79 self._irange = _expand_tuple(in_range, len(in_chan))
80 temp = self._check_options(in_subdevice, in_chan, self._iaref, self._irange, output=False)
81 self._isubdev = temp["subdevice"]
82 self._ichan_params = temp["chan_params"]
84 self.i_nchan = len(self._ichan)
85 self._ichanlist = self._comedi.chanlist(self.i_nchan)
86 for i in range(self.i_nchan) :
87 self._ichanlist[i] = self._comedi.cr_pack(self._ichan[i], self._irange[i], self._iaref[i])
89 self._oaref = _expand_tuple(out_aref, len(in_chan))
90 self._orange = _expand_tuple(out_range, len(in_chan))
91 temp = self._check_options(out_subdevice, out_chan, self._oaref, self._orange, output=True)
92 self._osubdev = temp["subdevice"]
93 self._ochan_params = temp["chan_params"]
94 self._ochan = out_chan
95 self.o_nchan = len(self._ochan)
96 self._ochanlist = self._comedi.chanlist(self.o_nchan)
97 for i in range(self.o_nchan) :
98 self._ochanlist[i] = self._comedi.cr_pack(self._ochan[i], self._orange[i], self._oaref[i])
100 self._gen_rough_output_cmd()
101 self._gen_rough_input_cmd()
102 self.state = "Initialized"
106 if self.state != "Closed" :
107 self.reset(force=True)
108 rc = self._comedi.comedi_close(self._dev)
110 self._comedi.comedi_perror("comedi_close")
111 raise simAioError, "Cannot close %s" % self._filename
113 print "Closed %s on fd %d" % (self._filename, self._fd)
114 self.state = "Closed"
116 if self.state != "Closed" :
117 raise simAioError, "Invalid state %s" % self.state
118 self._dev = self._comedi.comedi_open(self._filename)
119 self._fd = self._comedi.comedi_fileno(self._dev)
121 print "Opened %s on fd %d" % (self._filename, self._fd)
122 self.state = "Initialized"
# Validate one direction's (input or output) configuration.
# Resolves/verifies the subdevice with _check_subdevice, then runs _check_chan on
# every (chan[i], aref[i], rnge[i]) triple; AREF and RNGE are indexed in step with
# CHAN, so each must have at least len(chan) entries.
# Returns a dict: "subdevice" -> the value returned by _check_subdevice,
#                 "chan_params" -> the per-channel results of _check_chan.
# NOTE(review): the initialisation of chan_params and some lines after the loop are
# not visible in this excerpt.
123 def _check_options(self, subdevice, chan, aref, rnge, output=True) :
124 subdevice = self._check_subdevice(subdevice, output=output)
126 for i in range(len(chan)) :
127 chan_params.append(self._check_chan(subdevice, chan[i], aref[i], rnge[i]))
133 print " subdevice with channels %s is valid" % (str(chan))
134 return {"subdevice":subdevice,
135 "chan_params":chan_params}
# Check that SUBDEVICE is of the wanted comedi type (COMEDI_SUBD_AO or COMEDI_SUBD_AI),
# autodetecting a subdevice of that type when SUBDEVICE is negative.
# Raises simAioError when the subdevice's reported type differs from the target type.
# NOTE(review): the if/else lines that choose between AO and AI (presumably keyed on
# OUTPUT) and the return statement are not visible in this excerpt -- confirm.
136 def _check_subdevice(self, subdevice, output=True) :
138 target_type = self._comedi.COMEDI_SUBD_AO
140 target_type = self._comedi.COMEDI_SUBD_AI
141 if (subdevice < 0) : # negative index: autodetect a subdevice of target_type (input or output)
142 subdevice = self._comedi.comedi_find_subdevice_by_type(self._dev, target_type, 0) # 0 is starting subdevice
# note: the local name `type` shadows the builtin for the rest of this method
144 type = self._comedi.comedi_get_subdevice_type(self._dev, subdevice)
145 if type != target_type :
146 raise simAioError, "Comedi subdevice %d has wrong type %d" % (subdevice, type)
def _check_chan(self, subdevice, chan, aref, range) :
    """Validate CHAN and RANGE on SUBDEVICE and look up the channel's parameters.

    CHAN must be below the subdevice's channel count and RANGE below the
    number of ranges that channel offers; otherwise simAioError is raised.
    AREF is accepted but not examined here.

    Returns a dict with
      "maxdata"  : result of comedi_get_maxdata for the channel
      "comrange" : result of comedi_get_range for the channel and RANGE
    """
    # NOTE: the parameter name `range` shadows the builtin inside this method;
    # it is kept because it is part of the method's signature.
    n_chan = self._comedi.comedi_get_n_channels(self._dev, subdevice)
    if chan >= n_chan :
        # raise E(msg) is equivalent to the old `raise E, msg` form on Python 2
        # and, unlike it, is also valid Python 3 syntax.
        raise simAioError("Channel %d > subdevice %d's largest chan %d"
                          % (chan, subdevice, n_chan-1))
    n_range = self._comedi.comedi_get_n_ranges(self._dev, subdevice, chan)
    if range >= n_range :
        raise simAioError("Range %d > subdevice %d, chan %d's largest range %d"
                          % (range, subdevice, chan, n_range-1))
    maxdata = self._comedi.comedi_get_maxdata(self._dev, subdevice, chan)
    comrange = self._comedi.comedi_get_range(self._dev, subdevice, chan, range)
    return {"maxdata":maxdata, "comrange": comrange}
# Build a template comedi command for the output subdevice and run it through
# _test_cmd (up to 3 passes). scan_begin_arg and stop_arg are placeholders here;
# setup() later assigns real values to those fields on self._ocmd.
# NOTE(review): the line that stores the command (presumably as self._ocmd) and a
# few argument assignments are not visible in this excerpt.
158 def _gen_rough_output_cmd(self) :
160 print "generate rough output command"
161 cmd = self._comedi.comedi_cmd_struct()
162 cmd.subdev = self._osubdev
163 cmd.flags = self._comedi.CMDF_WRITE
# start trigger: external signal 18 when the module-level switch
# AO_TRIGGERS_OFF_AI_START is set ...
164 if AO_TRIGGERS_OFF_AI_START :
165 cmd.start_src = self._comedi.TRIG_EXT
166 cmd.start_arg = 18 # AI_START1 internal AI start signal
# ... NOTE(review): presumably the else branch (its `else:` line is not visible):
# start on an internal trigger instead
168 cmd.start_src = self._comedi.TRIG_INT
# one scan per timer tick
170 cmd.scan_begin_src = self._comedi.TRIG_TIMER
171 cmd.scan_begin_arg = 1 # temporary value for now
172 cmd.convert_src = self._comedi.TRIG_NOW
# a scan ends after one conversion per output channel
174 cmd.scan_end_src = self._comedi.TRIG_COUNT
175 cmd.scan_end_arg = self.o_nchan
# stop after a fixed number of scans
176 cmd.stop_src = self._comedi.TRIG_COUNT
177 cmd.stop_arg = 1 # temporary value for now
178 cmd.chanlist = self._ochanlist
179 cmd.chanlist_len = self.o_nchan
180 self._test_cmd(cmd, max_passes=3)
# Build a template comedi command for the input subdevice and run it through
# _test_cmd (up to 3 passes). scan_begin_arg and stop_arg are placeholders here;
# setup() later assigns real values to those fields on self._icmd.
# NOTE(review): the line that stores the command (presumably as self._icmd) and a
# few argument assignments are not visible in this excerpt.
182 def _gen_rough_input_cmd(self) :
184 print "generate rough input command"
185 cmd = self._comedi.comedi_cmd_struct()
186 cmd.subdev = self._isubdev
# acquisition starts on an internal trigger
188 cmd.start_src = self._comedi.TRIG_INT
# scans and the conversions within a scan are both timer-driven
190 cmd.scan_begin_src = self._comedi.TRIG_TIMER
191 cmd.scan_begin_arg = 1 # temporary value for now
192 cmd.convert_src = self._comedi.TRIG_TIMER
# a scan ends after one conversion per input channel
194 cmd.scan_end_src = self._comedi.TRIG_COUNT
195 cmd.scan_end_arg = self.i_nchan
# stop after a fixed number of scans
196 cmd.stop_src = self._comedi.TRIG_COUNT
197 cmd.stop_arg = 1 # temporary value for now
198 cmd.chanlist = self._ichanlist
199 cmd.chanlist_len = self.i_nchan
200 self._test_cmd(cmd, max_passes=3)
# Run comedi_command_test on CMD, repeating while the pass counter is below
# MAX_PASSES, and raise simAioError("Invalid command: ...") naming the last test
# result when the command is rejected. _cmdtest_message maps the integer result of
# comedi_command_test to a human-readable string.
# NOTE(review): many lines of this method are not visible in this excerpt (counter
# initialisation/increment, loop exit, and the branch headers around the
# "Passing"/"Failing" prints), so the exact accept/reject rule cannot be read here.
202 def _test_cmd(self, cmd, max_passes=1) :
207 print "Testing command:"
209 while i < max_passes :
210 rc = self._comedi.comedi_command_test(self._dev, cmd)
213 if VERBOSE or very_verbose :
214 print "test pass %d, %s" % (i, _cmdtest_message[rc])
216 if (VERBOSE or very_verbose) and i < max_passes :
217 print "Passing command:"
220 print "Failing command:"
222 raise simAioError, "Invalid command: %s" % _cmdtest_message[rc]
# Configure both subdevices for a run of NSAMPS scans at FREQ scans per second and
# preload the output data.
#   nsamps     : number of scans; used as stop_arg for both commands
#   freq       : scan rate; converted to a scan period of int(1e9/freq) nanoseconds
#   out_buffer : numpy array of output samples; nsamps*o_nchan samples are written
# Raises simAioError on a wrong state, a non-array out_buffer, a rejected or failed
# command, or a short write.
# NOTE(review): the `if` lines guarding the comedi_perror/raise pairs, and the state
# change at the end of the method, are not visible in this excerpt.
223 def setup(self, nsamps, freq, out_buffer) :
224 if self.state != "Initialized" :
225 raise simAioError, "Invalid state %s" % self.state
# exact type comparison: instances of array subclasses are rejected as well
226 if type(out_buffer) != type(_example_array) :
227 raise simAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer))
# fill in the real timing on the output command, re-test it, then issue it
228 self._ocmd.scan_begin_arg = int(1e9/freq)
229 self._ocmd.stop_arg = nsamps
231 print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.scan_begin_arg, self._ocmd.stop_arg)
232 self._onremain = nsamps
233 self._test_cmd(self._ocmd)
234 rc = self._comedi.comedi_command(self._dev, self._ocmd)
236 self._comedi.comedi_perror("comedi_command")
237 raise simAioError, "Error executing output command %d" % rc
# same for the input command
238 self._icmd.scan_begin_arg = int(1e9/freq)
239 self._icmd.stop_arg = nsamps
240 self._test_cmd(self._icmd)
241 self._inremain = nsamps
242 rc = self._comedi.comedi_command(self._dev, self._icmd)
244 self._comedi.comedi_perror("comedi_command")
245 raise simAioError, "Error executing input command"
# preload the output samples through the device file descriptor; write_samples'
# return value is compared against the requested sample count
248 print "Write %d output samples to the card" % (nsamps*self.o_nchan)
249 rc = int16_rw.write_samples(self._fd, nsamps*self.o_nchan, out_buffer, 1)
250 if rc != nsamps*self.o_nchan :
251 raise simAioError, "Error %d writing output buffer\n" % rc
# the final scan (last o_nchan samples of out_buffer) is written a second time
252 rc = int16_rw.write_samples(self._fd, self.o_nchan, out_buffer[-self.o_nchan:], 1) # HACK, add an extra sample for each channel to the output buffer
253 if rc != self.o_nchan :
254 raise simAioError, "Error %d writing hack output buffer\n" % rc
255 # maybe will avoid resetting...
# remembered so start_read() knows how many samples to read back
256 self._nsamps = nsamps
259 if self.state != "Setup" :
260 raise simAioError, "Invalid state %s" % self.state
262 print "Arm the analog ouptut"
263 self._comedi_internal_trigger(self._osubdev)
# Start the acquisition and read the results into IN_BUFFER.
# Requires self.state == "Armed". Fires the internal trigger on the input
# subdevice, then reads self._nsamps*self.i_nchan samples from the device file
# descriptor with int16_rw.read_samples. Raises simAioError on a wrong state or
# when the number of samples read differs from the number requested.
# NOTE(review): a few lines of this method (including any state change after the
# read) are not visible in this excerpt.
265 def start_read(self, in_buffer) :
266 if self.state != "Armed" :
267 raise simAioError, "Invalid state %s" % self.state
269 print "Start the run"
270 self._comedi_internal_trigger(self._isubdev)
272 print "Read %d input samples from the card" % (self._nsamps*self.i_nchan)
273 rc = int16_rw.read_samples(self._fd, self._nsamps*self.i_nchan, in_buffer, -1)
274 if rc != self._nsamps*self.i_nchan :
275 raise simAioError, "Error %d reading input buffer\n" % rc
# Send an INSN_INTTRIG instruction to SUBDEVICE through comedi_do_insn.
# NOTE(review): the lines that fill in the remaining instruction fields (between
# insn.subdev and the comedi_do_insn call) are not visible in this excerpt, and no
# check of `rc` is visible after the call -- confirm whether failures are handled.
277 def _comedi_internal_trigger(self, subdevice) :
# a one-element chanlist is reused as the instruction's data array
278 data = self._comedi.chanlist(1) # by luck, data is an array of lsampl_t (unsigned ints), as is chanlist
279 insn = self._comedi.comedi_insn_struct()
280 insn.insn = self._comedi.INSN_INTTRIG
281 insn.subdev = subdevice
285 rc = self._comedi.comedi_do_insn(self._dev, insn)
# Cancel any running command on the output and then the input subdevice
# (comedi_cancel) and return the object to the "Initialized" state.
# Raises simAioError when a cancel fails.
# NOTE(review): the `if` lines guarding the comedi_perror/raise pairs and any use
# of FORCE are not visible in this excerpt.
286 def reset(self, force=False) :
288 print "Reset the analog subdevices"
289 # clean up after the read
290 rc = self._comedi.comedi_cancel(self._dev, self._osubdev)
292 self._comedi.comedi_perror("comedi_cancel")
293 raise simAioError, "Error cleaning up output command"
294 rc = self._comedi.comedi_cancel(self._dev, self._isubdev)
296 self._comedi.comedi_perror("comedi_cancel")
297 raise simAioError, "Error cleaning up input command"
298 self.state = "Initialized"
301 # define the test suite
# Single-channel loop-back style test: write one period of a sine wave through AIO
# and read the input back. Returns the tuple (out_data, in_data).
# NOTE(review): several lines are not visible in this excerpt, including the
# definition of nsamps, the calls between setup() and start_read(), and the body of
# the residual loop. setup() is called with a hard-coded 10 scans, so nsamps is
# presumably 10 -- confirm.
303 def _test_aio_obj(aio=None, start_wait=0, verbose=False) :
305 print "_test_aio_obj(start_wait = %g)" % start_wait
307 out_data = array([0]*nsamps, dtype=uint16)
308 in_data = array([0]*nsamps, dtype=uint16)
# one full sine period across the buffer, offset 30000 and amplitude 3000 in raw
# uint16 sample values
309 for i in range(nsamps) :
310 out_data[i] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
# 10 scans at a scan rate of 1000
311 aio.setup(10, 1000, out_data)
314 aio.start_read(in_data)
317 print "out_data:\n", out_data
318 print "in_data:\n", in_data
319 print "residual:\n[",
320 for i, o in zip(in_data, out_data) :
323 return (out_data, in_data)
# Run _test_aio_obj NUM_TESTS times, fit a straight line of in_data against
# out_data for each run (scipy.stats.linregress), print each run's gradient, and
# finish by printing a failure rate and a "good run" summary.
# NOTE(review): most of the bookkeeping is not visible in this excerpt: how a run
# is classed good or bad, how good/bad/good_run/good_run_arr/call are initialised
# and updated, and how the `call` string (which ends in a pipe to `stem_leaf 2`) is
# executed.
325 def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) :
326 print "_repeat_aio_test()"
327 grads = array([0]*num_tests, dtype=float32)
332 for i in range(num_tests) :
333 out_data, in_data = _test_aio_obj(aio, start_wait)
334 gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
337 print "wait %2d, run %2d, gradient %g" % (start_wait, i, gradient)
340 good_run_arr.append(good_run)
345 good_run_arr.append(good_run)
# float(good+bad) is zero if no run was counted, which would raise ZeroDivisionError
346 print "failure rate %g%% in %d runs" % ((float(bad)/float(good+bad))*100.0, num_tests)
348 for num in good_run_arr :
350 call += '" | stem_leaf 2'
351 print "good run stem and leaf:"
# Multi-channel variant of the single-run test: drive the first output channel with
# one sine period (all other output channels with 0), read all input channels back,
# and print a table of output and input samples per scan.
# Returns the tuple (out_data, in_data); both buffers are interleaved by channel
# (sample for scan i, channel j is at index i*nchan + j).
# NOTE(review): several lines are not visible in this excerpt, including the
# definition of nsamps and the calls between setup() and start_read(). setup() is
# called with a hard-coded 10 scans, so nsamps is presumably 10 -- confirm.
354 def _test_aio_obj_multi_chan(aio=None, start_wait=0, verbose=False) :
356 print "_test_aio_obj_multi_chan(start_wait = %g)" % start_wait
358 out_data = array([0]*nsamps*aio.o_nchan, dtype=uint16)
359 in_data = array([0]*nsamps*aio.i_nchan, dtype=uint16)
360 # set up interleaved data
361 for i in range(nsamps) :
# first output channel: sine, offset 30000 and amplitude 3000 in raw uint16 values
362 out_data[i*aio.o_nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
# remaining output channels: constant 0
363 for j in range(1, aio.o_nchan) :
364 out_data[i*aio.o_nchan + j] = 0
365 aio.setup(10, 1000, out_data)
368 aio.start_read(in_data)
# table header: output channel numbers, then input channel numbers
372 for j in range(aio.o_nchan) :
373 print "%s\t" % aio._ochan[j],
374 for j in range(aio.i_nchan) :
375 print "%s\t" % aio._ichan[j],
# table rows: per scan, the output samples followed by the input samples
377 for i in range(nsamps) :
378 for j in range(aio.o_nchan) :
379 print "%s\t" % out_data[i*aio.o_nchan+j],
380 for j in range(aio.i_nchan) :
381 print "%s\t" % in_data[i*aio.i_nchan+j],
383 return (out_data, in_data)
389 _test_aio_obj(aio, start_wait = 0, verbose=True)
390 _test_aio_obj(aio, start_wait = 0.5, verbose=True)
393 #_repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=False)
396 aiom = aio_obj(in_chan=(0,1,2,3), out_chan=(0,1))
397 _test_aio_obj_multi_chan(aiom, start_wait = 0, verbose=True)
399 if __name__ == "__main__" :