Versioning started.
[pycomedi.git] / pycomedi / comedi_simult_aio.py
1 # Simultaneous, finite, buffered analog inpout/output using comedi drivers
2
3 import comedi
4 from numpy import array, fromstring, uint16, float32, pi, sin
5 import int16_rw
6
7 # imports for testing
8 from time import sleep
9 from scipy.stats import linregress
10 from os import system
11
12 #VERBOSE = True
13 VERBOSE = False
14 AO_TRIGGERS_OFF_AI_START = True
15 #AO_TRIGGERS_OFF_AI_START = False
16
17 class simAioError (Exception) :
18     "Simultaneous Analog IO error"
19     pass
20
21 _example_array = array([0], dtype=uint16) # for typing, since I don't know what type(array) should be
22
23 _cmdtest_message = ["success",
24                      "invalid source",
25                      "source conflict",
26                      "invalid argument",
27                      "argument conflict",
28                      "invalid chanlist"]
29
30 def _print_cmdsrc(source) :
31     if source & comedi.TRIG_NONE : print "none|",
32     if source & comedi.TRIG_NOW : print "now|",
33     if source & comedi.TRIG_FOLLOW : print "follow|",
34     if source & comedi.TRIG_TIME : print "time|",
35     if source & comedi.TRIG_TIMER : print "timer|",
36     if source & comedi.TRIG_COUNT : print "count|",
37     if source & comedi.TRIG_EXT : print "ext|",
38     if source & comedi.TRIG_INT : print "int|",
39     if source & comedi.TRIG_OTHER : print "other|",
40
41 def _print_command(cmd) :
42     print "subdevice: \t%d" % cmd.subdev
43     print "flags:     \t0x%x" % cmd.flags
44     print "start:     \t",
45     _print_cmdsrc(cmd.start_src)
46     print "\t%d" % cmd.start_arg
47     print "scan_begin:\t",
48     _print_cmdsrc(cmd.scan_begin_src)
49     print "\t%d" % cmd.scan_begin_arg
50     print "convert:   \t",
51     _print_cmdsrc(cmd.convert_src)
52     print "\t%d" % cmd.convert_arg
53     print "scan_end:  \t",
54     _print_cmdsrc(cmd.scan_end_src)
55     print "\t%d" % cmd.scan_end_arg
56     print "stop:      \t",
57     _print_cmdsrc(cmd.stop_src)
58     print "\t%d" % cmd.stop_arg
59
60 def _expand_tuple(tup, length) : 
61     "Expand an iterable TUP to a tuple of LENGTH by repeating the last element"
62     if len(tup) > length :
63         raise simAioError, "Tuple too long."
64     elif len(tup) < length :
65         temp_tup = tup + tuple((tup[-1],)*(length-len(tup)))
66         tup = temp_tup
67     return tup
68
69 class aio_obj :
70     def __init__(self, filename="/dev/comedi0",
71                  in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,),
72                  out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,)) :
73         self._comedi = comedi
74         self._filename = filename
75         self.state = "Closed"
76         self.open()
77
78         self._iaref = _expand_tuple(in_aref, len(in_chan))
79         self._irange = _expand_tuple(in_range, len(in_chan))
80         temp = self._check_options(in_subdevice, in_chan, self._iaref, self._irange, output=False)
81         self._isubdev = temp["subdevice"]
82         self._ichan_params = temp["chan_params"]
83         self._ichan = in_chan
84         self.i_nchan = len(self._ichan)
85         self._ichanlist = self._comedi.chanlist(self.i_nchan)
86         for i in range(self.i_nchan) :
87             self._ichanlist[i] = self._comedi.cr_pack(self._ichan[i], self._irange[i], self._iaref[i])
88
89         self._oaref = _expand_tuple(out_aref, len(in_chan))
90         self._orange = _expand_tuple(out_range, len(in_chan))
91         temp = self._check_options(out_subdevice, out_chan, self._oaref, self._orange, output=True)
92         self._osubdev = temp["subdevice"]
93         self._ochan_params = temp["chan_params"]
94         self._ochan = out_chan
95         self.o_nchan = len(self._ochan)
96         self._ochanlist = self._comedi.chanlist(self.o_nchan)
97         for i in range(self.o_nchan) :
98             self._ochanlist[i] = self._comedi.cr_pack(self._ochan[i], self._orange[i], self._oaref[i])
99
100         self._gen_rough_output_cmd()
101         self._gen_rough_input_cmd()
102         self.state = "Initialized"
103     def __del__(self) :
104         self.close()
105     def close(self) :
106         if self.state != "Closed" :
107             self.reset(force=True)
108             rc = self._comedi.comedi_close(self._dev)
109             if rc < 0 :
110                 self._comedi.comedi_perror("comedi_close")
111                 raise simAioError, "Cannot close %s" % self._filename
112             if VERBOSE :
113                 print "Closed %s on fd %d" % (self._filename, self._fd)
114             self.state = "Closed"
115     def open(self) :
116         if self.state != "Closed" :
117             raise simAioError, "Invalid state %s" % self.state
118         self._dev = self._comedi.comedi_open(self._filename)
119         self._fd = self._comedi.comedi_fileno(self._dev)
120         if VERBOSE :
121             print "Opened %s on fd %d" % (self._filename, self._fd)
122         self.state = "Initialized"
123     def _check_options(self, subdevice, chan, aref, rnge, output=True) :
124         subdevice = self._check_subdevice(subdevice, output=output)
125         chan_params = []
126         for i in range(len(chan)) :
127             chan_params.append(self._check_chan(subdevice, chan[i], aref[i], rnge[i]))
128         if VERBOSE :
129             if output :
130                 print "Output",
131             else :
132                 print "Input",
133             print " subdevice with channels %s is valid" % (str(chan))
134         return {"subdevice":subdevice,
135                 "chan_params":chan_params}
136     def _check_subdevice(self, subdevice, output=True) :
137         if output == True :
138             target_type = self._comedi.COMEDI_SUBD_AO
139         else :
140             target_type = self._comedi.COMEDI_SUBD_AI
141         if (subdevice < 0) : # autodetect an input device
142             subdevice = self._comedi.comedi_find_subdevice_by_type(self._dev, target_type, 0) # 0 is starting subdevice
143         else :
144             type = self._comedi.comedi_get_subdevice_type(self._dev, subdevice)
145             if type != target_type :
146                 raise simAioError, "Comedi subdevice %d has wrong type %d" % (subdevice, type)
147         return subdevice
148     def _check_chan(self, subdevice, chan, aref, range) :
149         subdev_n_chan = self._comedi.comedi_get_n_channels(self._dev, subdevice)
150         if chan >= subdev_n_chan :
151             raise simAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, subdevice, subdev_n_chan-1)
152         n_range = self._comedi.comedi_get_n_ranges(self._dev, subdevice, chan)
153         if range >= n_range :
154             raise simAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdevice, chan, n_range-1)
155         maxdata = self._comedi.comedi_get_maxdata(self._dev, subdevice, chan)
156         comrange = self._comedi.comedi_get_range(self._dev, subdevice, chan, range)
157         return {"maxdata":maxdata, "comrange": comrange}
158     def _gen_rough_output_cmd(self) :
159         if VERBOSE :
160             print "generate rough output command"
161         cmd = self._comedi.comedi_cmd_struct()
162         cmd.subdev = self._osubdev
163         cmd.flags = self._comedi.CMDF_WRITE
164         if AO_TRIGGERS_OFF_AI_START :
165             cmd.start_src = self._comedi.TRIG_EXT
166             cmd.start_arg = 18 # AI_START1 internal AI start signal
167         else :
168             cmd.start_src = self._comedi.TRIG_INT
169             cmd.start_arg = 0
170         cmd.scan_begin_src = self._comedi.TRIG_TIMER
171         cmd.scan_begin_arg = 1 # temporary value for now
172         cmd.convert_src = self._comedi.TRIG_NOW
173         cmd.convert_arg = 0
174         cmd.scan_end_src = self._comedi.TRIG_COUNT
175         cmd.scan_end_arg = self.o_nchan
176         cmd.stop_src = self._comedi.TRIG_COUNT
177         cmd.stop_arg = 1 # temporary value for now
178         cmd.chanlist = self._ochanlist
179         cmd.chanlist_len = self.o_nchan
180         self._test_cmd(cmd, max_passes=3)
181         self._ocmd = cmd
182     def _gen_rough_input_cmd(self) :
183         if VERBOSE :
184             print "generate rough input command"
185         cmd = self._comedi.comedi_cmd_struct()
186         cmd.subdev = self._isubdev
187         cmd.flags = 0
188         cmd.start_src = self._comedi.TRIG_INT
189         cmd.start_arg = 0
190         cmd.scan_begin_src = self._comedi.TRIG_TIMER
191         cmd.scan_begin_arg = 1 # temporary value for now
192         cmd.convert_src = self._comedi.TRIG_TIMER
193         cmd.convert_arg = 1
194         cmd.scan_end_src = self._comedi.TRIG_COUNT
195         cmd.scan_end_arg = self.i_nchan
196         cmd.stop_src = self._comedi.TRIG_COUNT
197         cmd.stop_arg = 1 # temporary value for now
198         cmd.chanlist = self._ichanlist
199         cmd.chanlist_len = self.i_nchan
200         self._test_cmd(cmd, max_passes=3)
201         self._icmd = cmd
202     def _test_cmd(self, cmd, max_passes=1) :
203         very_verbose = False
204         i = 0
205         rc = 0
206         if  very_verbose : 
207             print "Testing command:"
208             _print_command(cmd)
209         while i < max_passes :
210             rc = self._comedi.comedi_command_test(self._dev, cmd)
211             if (rc == 0) :
212                 break
213             if VERBOSE or very_verbose :
214                 print "test pass %d, %s" % (i, _cmdtest_message[rc])
215             i += 1
216         if (VERBOSE or very_verbose) and i < max_passes :
217             print "Passing command:"
218             _print_command(cmd)
219         if i >= max_passes :
220             print "Failing command:"
221             _print_command(cmd)
222             raise simAioError, "Invalid command: %s" % _cmdtest_message[rc]
223     def setup(self, nsamps, freq, out_buffer) :
224         if self.state != "Initialized" :
225             raise simAioError, "Invalid state %s" % self.state
226         if type(out_buffer) != type(_example_array) :
227             raise simAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer))
228         self._ocmd.scan_begin_arg = int(1e9/freq)
229         self._ocmd.stop_arg = nsamps
230         if VERBOSE :
231             print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.scan_begin_arg, self._ocmd.stop_arg)
232         self._onremain = nsamps
233         self._test_cmd(self._ocmd)
234         rc = self._comedi.comedi_command(self._dev, self._ocmd)
235         if rc < 0 :
236             self._comedi.comedi_perror("comedi_command")
237             raise simAioError, "Error executing output command %d" % rc
238         self._icmd.scan_begin_arg = int(1e9/freq)
239         self._icmd.stop_arg = nsamps
240         self._test_cmd(self._icmd)
241         self._inremain = nsamps
242         rc = self._comedi.comedi_command(self._dev, self._icmd)
243         if rc < 0 :
244             self._comedi.comedi_perror("comedi_command")
245             raise simAioError, "Error executing input command"
246
247         if VERBOSE :
248             print "Write %d output samples to the card" % (nsamps*self.o_nchan)
249         rc = int16_rw.write_samples(self._fd, nsamps*self.o_nchan, out_buffer, 1)
250         if rc != nsamps*self.o_nchan :
251             raise simAioError, "Error %d writing output buffer\n" % rc
252         rc = int16_rw.write_samples(self._fd, self.o_nchan, out_buffer[-self.o_nchan:], 1) # HACK, add an extra sample for each channel to the output buffer
253         if rc != self.o_nchan :
254             raise simAioError, "Error %d writing hack output buffer\n" % rc
255         # maybe will avoid resetting...
256         self._nsamps = nsamps
257         self.state = "Setup"
258     def arm(self) :
259         if self.state != "Setup" :
260             raise simAioError, "Invalid state %s" % self.state 
261         if VERBOSE :
262             print "Arm the analog ouptut"
263         self._comedi_internal_trigger(self._osubdev)
264         self.state = "Armed"
265     def start_read(self, in_buffer) :
266         if self.state != "Armed" :
267             raise simAioError, "Invalid state %s" % self.state
268         if VERBOSE :
269             print "Start the run"
270         self._comedi_internal_trigger(self._isubdev)
271         if VERBOSE :
272             print "Read %d input samples from the card" % (self._nsamps*self.i_nchan)
273         rc = int16_rw.read_samples(self._fd, self._nsamps*self.i_nchan, in_buffer, -1)
274         if rc != self._nsamps*self.i_nchan :
275             raise simAioError, "Error %d reading input buffer\n" % rc
276         self.state = "Read"
277     def _comedi_internal_trigger(self, subdevice) :
278         data = self._comedi.chanlist(1) # by luck, data is an array of lsampl_t (unsigned ints), as is chanlist
279         insn = self._comedi.comedi_insn_struct()
280         insn.insn = self._comedi.INSN_INTTRIG
281         insn.subdev = subdevice
282         insn.data = data
283         insn.n = 1
284         data[0] = 0
285         rc = self._comedi.comedi_do_insn(self._dev, insn)
286     def reset(self, force=False) :
287         if VERBOSE :
288             print "Reset the analog subdevices"
289         # clean up after the read
290         rc = self._comedi.comedi_cancel(self._dev, self._osubdev)
291         if rc < 0 :
292             self._comedi.comedi_perror("comedi_cancel")
293             raise simAioError, "Error cleaning up output command"
294         rc = self._comedi.comedi_cancel(self._dev, self._isubdev)
295         if rc < 0 :
296             self._comedi.comedi_perror("comedi_cancel")
297             raise simAioError, "Error cleaning up input command"
298         self.state = "Initialized"
299
300
301 # define the test suite
302
303 def _test_aio_obj(aio=None, start_wait=0, verbose=False) :
304     if (verbose) :
305         print "_test_aio_obj(start_wait = %g)" % start_wait
306     nsamps = 10
307     out_data = array([0]*nsamps, dtype=uint16)
308     in_data =  array([0]*nsamps, dtype=uint16)
309     for i in range(nsamps) :
310         out_data[i] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
311     aio.setup(10, 1000, out_data)
312     aio.arm()
313     sleep(start_wait)
314     aio.start_read(in_data)
315     aio.reset()
316     if (verbose) :
317         print "out_data:\n", out_data
318         print "in_data:\n", in_data
319         print "residual:\n[",
320         for i, o in zip(in_data, out_data) :
321             print int(i)-int(o),
322         print "]"
323     return (out_data, in_data)
324
325 def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) :
326     print "_repeat_aio_test()"
327     grads = array([0]*num_tests, dtype=float32)
328     good = 0
329     bad = 0
330     good_run = 0
331     good_run_arr = []
332     for i in range(num_tests) :
333         out_data, in_data = _test_aio_obj(aio, start_wait)
334         gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
335         grads[i] = gradient
336         if verbose :
337             print "wait %2d, run %2d, gradient %g" % (start_wait, i, gradient)
338         if gradient < .7 :
339             bad += 1
340             good_run_arr.append(good_run)
341             good_run = 0
342         else :
343             good += 1
344             good_run += 1
345     good_run_arr.append(good_run)
346     print "failure rate %g%% in %d runs" % ((float(bad)/float(good+bad))*100.0, num_tests)
347     call = 'echo "'
348     for num in good_run_arr :
349         call += "%d " % num
350     call += '" | stem_leaf 2'
351     print "good run stem and leaf:"
352     system(call)
353
354 def _test_aio_obj_multi_chan(aio=None, start_wait=0, verbose=False) :
355     if (verbose) :
356         print "_test_aio_obj_multi_chan(start_wait = %g)" % start_wait
357     nsamps = 10
358     out_data = array([0]*nsamps*aio.o_nchan, dtype=uint16)
359     in_data =  array([0]*nsamps*aio.i_nchan, dtype=uint16)
360     # set up interleaved data
361     for i in range(nsamps) :
362         out_data[i*aio.o_nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
363         for j in range(1, aio.o_nchan) :
364             out_data[i*aio.o_nchan + j] = 0
365     aio.setup(10, 1000, out_data)
366     aio.arm()
367     sleep(start_wait)
368     aio.start_read(in_data)
369     aio.reset()
370     if (verbose) :
371         print "#",
372         for j in range(aio.o_nchan) :
373             print "%s\t" % aio._ochan[j],
374         for j in range(aio.i_nchan) :
375             print "%s\t" % aio._ichan[j],
376         print ""
377         for i in range(nsamps) :
378             for j in range(aio.o_nchan) :
379                 print "%s\t" % out_data[i*aio.o_nchan+j],
380             for j in range(aio.i_nchan) :
381                 print "%s\t" % in_data[i*aio.i_nchan+j],
382             print ""
383     return (out_data, in_data)
384
385
386
387 def test() :
388     aio = aio_obj()
389     _test_aio_obj(aio, start_wait = 0, verbose=True)
390     _test_aio_obj(aio, start_wait = 0.5, verbose=True)
391     aio.close()
392     aio.open()
393     #_repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=False)
394     aio.close()
395
396     aiom = aio_obj(in_chan=(0,1,2,3), out_chan=(0,1))
397     _test_aio_obj_multi_chan(aiom, start_wait = 0, verbose=True)
398
399 if __name__ == "__main__" :
400     test()