Time to compile this heap-o-stuff into a real package.
--- /dev/null
+# Simultaneous, finite, buffered analog inpout/output using comedi drivers
+
+import comedi
+from numpy import array, fromstring, uint16, float32, pi, sin
+import int16_rw
+
+# imports for testing
+from time import sleep
+from scipy.stats import linregress
+from os import system
+
+#VERBOSE = True
+VERBOSE = False
+AO_TRIGGERS_OFF_AI_START = True
+#AO_TRIGGERS_OFF_AI_START = False
+
+class simAioError (Exception) :
+ "Simultaneous Analog IO error"
+ pass
+
+_example_array = array([0], dtype=uint16) # for typing, since I don't know what type(array) should be
+
+_cmdtest_message = ["success",
+ "invalid source",
+ "source conflict",
+ "invalid argument",
+ "argument conflict",
+ "invalid chanlist"]
+
+def _print_cmdsrc(source) :
+ if source & comedi.TRIG_NONE : print "none|",
+ if source & comedi.TRIG_NOW : print "now|",
+ if source & comedi.TRIG_FOLLOW : print "follow|",
+ if source & comedi.TRIG_TIME : print "time|",
+ if source & comedi.TRIG_TIMER : print "timer|",
+ if source & comedi.TRIG_COUNT : print "count|",
+ if source & comedi.TRIG_EXT : print "ext|",
+ if source & comedi.TRIG_INT : print "int|",
+ if source & comedi.TRIG_OTHER : print "other|",
+
+def _print_command(cmd) :
+ print "subdevice: \t%d" % cmd.subdev
+ print "flags: \t0x%x" % cmd.flags
+ print "start: \t",
+ _print_cmdsrc(cmd.start_src)
+ print "\t%d" % cmd.start_arg
+ print "scan_begin:\t",
+ _print_cmdsrc(cmd.scan_begin_src)
+ print "\t%d" % cmd.scan_begin_arg
+ print "convert: \t",
+ _print_cmdsrc(cmd.convert_src)
+ print "\t%d" % cmd.convert_arg
+ print "scan_end: \t",
+ _print_cmdsrc(cmd.scan_end_src)
+ print "\t%d" % cmd.scan_end_arg
+ print "stop: \t",
+ _print_cmdsrc(cmd.stop_src)
+ print "\t%d" % cmd.stop_arg
+
+def _expand_tuple(tup, length) :
+ "Expand an iterable TUP to a tuple of LENGTH by repeating the last element"
+ if len(tup) > length :
+ raise simAioError, "Tuple too long."
+ elif len(tup) < length :
+ temp_tup = tup + tuple((tup[-1],)*(length-len(tup)))
+ tup = temp_tup
+ return tup
+
+class aio_obj :
+ def __init__(self, filename="/dev/comedi0",
+ in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,),
+ out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,)) :
+ self._comedi = comedi
+ self._filename = filename
+ self.state = "Closed"
+ self.open()
+
+ self._iaref = _expand_tuple(in_aref, len(in_chan))
+ self._irange = _expand_tuple(in_range, len(in_chan))
+ temp = self._check_options(in_subdevice, in_chan, self._iaref, self._irange, output=False)
+ self._isubdev = temp["subdevice"]
+ self._ichan_params = temp["chan_params"]
+ self._ichan = in_chan
+ self.i_nchan = len(self._ichan)
+ self._ichanlist = self._comedi.chanlist(self.i_nchan)
+ for i in range(self.i_nchan) :
+ self._ichanlist[i] = self._comedi.cr_pack(self._ichan[i], self._irange[i], self._iaref[i])
+
+ self._oaref = _expand_tuple(out_aref, len(in_chan))
+ self._orange = _expand_tuple(out_range, len(in_chan))
+ temp = self._check_options(out_subdevice, out_chan, self._oaref, self._orange, output=True)
+ self._osubdev = temp["subdevice"]
+ self._ochan_params = temp["chan_params"]
+ self._ochan = out_chan
+ self.o_nchan = len(self._ochan)
+ self._ochanlist = self._comedi.chanlist(self.o_nchan)
+ for i in range(self.o_nchan) :
+ self._ochanlist[i] = self._comedi.cr_pack(self._ochan[i], self._orange[i], self._oaref[i])
+
+ self._gen_rough_output_cmd()
+ self._gen_rough_input_cmd()
+ self.state = "Initialized"
+ def __del__(self) :
+ self.close()
+ def close(self) :
+ if self.state != "Closed" :
+ self.reset(force=True)
+ rc = self._comedi.comedi_close(self._dev)
+ if rc < 0 :
+ self._comedi.comedi_perror("comedi_close")
+ raise simAioError, "Cannot close %s" % self._filename
+ if VERBOSE :
+ print "Closed %s on fd %d" % (self._filename, self._fd)
+ self.state = "Closed"
+ def open(self) :
+ if self.state != "Closed" :
+ raise simAioError, "Invalid state %s" % self.state
+ self._dev = self._comedi.comedi_open(self._filename)
+ self._fd = self._comedi.comedi_fileno(self._dev)
+ if VERBOSE :
+ print "Opened %s on fd %d" % (self._filename, self._fd)
+ self.state = "Initialized"
+ def _check_options(self, subdevice, chan, aref, rnge, output=True) :
+ subdevice = self._check_subdevice(subdevice, output=output)
+ chan_params = []
+ for i in range(len(chan)) :
+ chan_params.append(self._check_chan(subdevice, chan[i], aref[i], rnge[i]))
+ if VERBOSE :
+ if output :
+ print "Output",
+ else :
+ print "Input",
+ print " subdevice with channels %s is valid" % (str(chan))
+ return {"subdevice":subdevice,
+ "chan_params":chan_params}
+ def _check_subdevice(self, subdevice, output=True) :
+ if output == True :
+ target_type = self._comedi.COMEDI_SUBD_AO
+ else :
+ target_type = self._comedi.COMEDI_SUBD_AI
+ if (subdevice < 0) : # autodetect an input device
+ subdevice = self._comedi.comedi_find_subdevice_by_type(self._dev, target_type, 0) # 0 is starting subdevice
+ else :
+ type = self._comedi.comedi_get_subdevice_type(self._dev, subdevice)
+ if type != target_type :
+ raise simAioError, "Comedi subdevice %d has wrong type %d" % (subdevice, type)
+ return subdevice
+ def _check_chan(self, subdevice, chan, aref, range) :
+ subdev_n_chan = self._comedi.comedi_get_n_channels(self._dev, subdevice)
+ if chan >= subdev_n_chan :
+ raise simAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, subdevice, subdev_n_chan-1)
+ n_range = self._comedi.comedi_get_n_ranges(self._dev, subdevice, chan)
+ if range >= n_range :
+ raise simAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdevice, chan, n_range-1)
+ maxdata = self._comedi.comedi_get_maxdata(self._dev, subdevice, chan)
+ comrange = self._comedi.comedi_get_range(self._dev, subdevice, chan, range)
+ return {"maxdata":maxdata, "comrange": comrange}
+ def _gen_rough_output_cmd(self) :
+ if VERBOSE :
+ print "generate rough output command"
+ cmd = self._comedi.comedi_cmd_struct()
+ cmd.subdev = self._osubdev
+ cmd.flags = self._comedi.CMDF_WRITE
+ if AO_TRIGGERS_OFF_AI_START :
+ cmd.start_src = self._comedi.TRIG_EXT
+ cmd.start_arg = 18 # AI_START1 internal AI start signal
+ else :
+ cmd.start_src = self._comedi.TRIG_INT
+ cmd.start_arg = 0
+ cmd.scan_begin_src = self._comedi.TRIG_TIMER
+ cmd.scan_begin_arg = 1 # temporary value for now
+ cmd.convert_src = self._comedi.TRIG_NOW
+ cmd.convert_arg = 0
+ cmd.scan_end_src = self._comedi.TRIG_COUNT
+ cmd.scan_end_arg = self.o_nchan
+ cmd.stop_src = self._comedi.TRIG_COUNT
+ cmd.stop_arg = 1 # temporary value for now
+ cmd.chanlist = self._ochanlist
+ cmd.chanlist_len = self.o_nchan
+ self._test_cmd(cmd, max_passes=3)
+ self._ocmd = cmd
+ def _gen_rough_input_cmd(self) :
+ if VERBOSE :
+ print "generate rough input command"
+ cmd = self._comedi.comedi_cmd_struct()
+ cmd.subdev = self._isubdev
+ cmd.flags = 0
+ cmd.start_src = self._comedi.TRIG_INT
+ cmd.start_arg = 0
+ cmd.scan_begin_src = self._comedi.TRIG_TIMER
+ cmd.scan_begin_arg = 1 # temporary value for now
+ cmd.convert_src = self._comedi.TRIG_TIMER
+ cmd.convert_arg = 1
+ cmd.scan_end_src = self._comedi.TRIG_COUNT
+ cmd.scan_end_arg = self.i_nchan
+ cmd.stop_src = self._comedi.TRIG_COUNT
+ cmd.stop_arg = 1 # temporary value for now
+ cmd.chanlist = self._ichanlist
+ cmd.chanlist_len = self.i_nchan
+ self._test_cmd(cmd, max_passes=3)
+ self._icmd = cmd
+ def _test_cmd(self, cmd, max_passes=1) :
+ very_verbose = False
+ i = 0
+ rc = 0
+ if very_verbose :
+ print "Testing command:"
+ _print_command(cmd)
+ while i < max_passes :
+ rc = self._comedi.comedi_command_test(self._dev, cmd)
+ if (rc == 0) :
+ break
+ if VERBOSE or very_verbose :
+ print "test pass %d, %s" % (i, _cmdtest_message[rc])
+ i += 1
+ if (VERBOSE or very_verbose) and i < max_passes :
+ print "Passing command:"
+ _print_command(cmd)
+ if i >= max_passes :
+ print "Failing command:"
+ _print_command(cmd)
+ raise simAioError, "Invalid command: %s" % _cmdtest_message[rc]
+ def setup(self, nsamps, freq, out_buffer) :
+ if self.state != "Initialized" :
+ raise simAioError, "Invalid state %s" % self.state
+ if type(out_buffer) != type(_example_array) :
+ raise simAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer))
+ self._ocmd.scan_begin_arg = int(1e9/freq)
+ self._ocmd.stop_arg = nsamps
+ if VERBOSE :
+ print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.scan_begin_arg, self._ocmd.stop_arg)
+ self._onremain = nsamps
+ self._test_cmd(self._ocmd)
+ rc = self._comedi.comedi_command(self._dev, self._ocmd)
+ if rc < 0 :
+ self._comedi.comedi_perror("comedi_command")
+ raise simAioError, "Error executing output command %d" % rc
+ self._icmd.scan_begin_arg = int(1e9/freq)
+ self._icmd.stop_arg = nsamps
+ self._test_cmd(self._icmd)
+ self._inremain = nsamps
+ rc = self._comedi.comedi_command(self._dev, self._icmd)
+ if rc < 0 :
+ self._comedi.comedi_perror("comedi_command")
+ raise simAioError, "Error executing input command"
+
+ if VERBOSE :
+ print "Write %d output samples to the card" % (nsamps*self.o_nchan)
+ rc = int16_rw.write_samples(self._fd, nsamps*self.o_nchan, out_buffer, 1)
+ if rc != nsamps*self.o_nchan :
+ raise simAioError, "Error %d writing output buffer\n" % rc
+ rc = int16_rw.write_samples(self._fd, self.o_nchan, out_buffer[-self.o_nchan:], 1) # HACK, add an extra sample for each channel to the output buffer
+ if rc != self.o_nchan :
+ raise simAioError, "Error %d writing hack output buffer\n" % rc
+ # maybe will avoid resetting...
+ self._nsamps = nsamps
+ self.state = "Setup"
+ def arm(self) :
+ if self.state != "Setup" :
+ raise simAioError, "Invalid state %s" % self.state
+ if VERBOSE :
+ print "Arm the analog ouptut"
+ self._comedi_internal_trigger(self._osubdev)
+ self.state = "Armed"
+ def start_read(self, in_buffer) :
+ if self.state != "Armed" :
+ raise simAioError, "Invalid state %s" % self.state
+ if VERBOSE :
+ print "Start the run"
+ self._comedi_internal_trigger(self._isubdev)
+ if VERBOSE :
+ print "Read %d input samples from the card" % (self._nsamps*self.i_nchan)
+ rc = int16_rw.read_samples(self._fd, self._nsamps*self.i_nchan, in_buffer, -1)
+ if rc != self._nsamps*self.i_nchan :
+ raise simAioError, "Error %d reading input buffer\n" % rc
+ self.state = "Read"
+ def _comedi_internal_trigger(self, subdevice) :
+ data = self._comedi.chanlist(1) # by luck, data is an array of lsampl_t (unsigned ints), as is chanlist
+ insn = self._comedi.comedi_insn_struct()
+ insn.insn = self._comedi.INSN_INTTRIG
+ insn.subdev = subdevice
+ insn.data = data
+ insn.n = 1
+ data[0] = 0
+ rc = self._comedi.comedi_do_insn(self._dev, insn)
+ def reset(self, force=False) :
+ if VERBOSE :
+ print "Reset the analog subdevices"
+ # clean up after the read
+ rc = self._comedi.comedi_cancel(self._dev, self._osubdev)
+ if rc < 0 :
+ self._comedi.comedi_perror("comedi_cancel")
+ raise simAioError, "Error cleaning up output command"
+ rc = self._comedi.comedi_cancel(self._dev, self._isubdev)
+ if rc < 0 :
+ self._comedi.comedi_perror("comedi_cancel")
+ raise simAioError, "Error cleaning up input command"
+ self.state = "Initialized"
+
+
+# define the test suite
+
+def _test_aio_obj(aio=None, start_wait=0, verbose=False) :
+ if (verbose) :
+ print "_test_aio_obj(start_wait = %g)" % start_wait
+ nsamps = 10
+ out_data = array([0]*nsamps, dtype=uint16)
+ in_data = array([0]*nsamps, dtype=uint16)
+ for i in range(nsamps) :
+ out_data[i] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
+ aio.setup(10, 1000, out_data)
+ aio.arm()
+ sleep(start_wait)
+ aio.start_read(in_data)
+ aio.reset()
+ if (verbose) :
+ print "out_data:\n", out_data
+ print "in_data:\n", in_data
+ print "residual:\n[",
+ for i, o in zip(in_data, out_data) :
+ print int(i)-int(o),
+ print "]"
+ return (out_data, in_data)
+
+def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) :
+ print "_repeat_aio_test()"
+ grads = array([0]*num_tests, dtype=float32)
+ good = 0
+ bad = 0
+ good_run = 0
+ good_run_arr = []
+ for i in range(num_tests) :
+ out_data, in_data = _test_aio_obj(aio, start_wait)
+ gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
+ grads[i] = gradient
+ if verbose :
+ print "wait %2d, run %2d, gradient %g" % (start_wait, i, gradient)
+ if gradient < .7 :
+ bad += 1
+ good_run_arr.append(good_run)
+ good_run = 0
+ else :
+ good += 1
+ good_run += 1
+ good_run_arr.append(good_run)
+ print "failure rate %g%% in %d runs" % ((float(bad)/float(good+bad))*100.0, num_tests)
+ call = 'echo "'
+ for num in good_run_arr :
+ call += "%d " % num
+ call += '" | stem_leaf 2'
+ print "good run stem and leaf:"
+ system(call)
+
+def _test_aio_obj_multi_chan(aio=None, start_wait=0, verbose=False) :
+ if (verbose) :
+ print "_test_aio_obj_multi_chan(start_wait = %g)" % start_wait
+ nsamps = 10
+ out_data = array([0]*nsamps*aio.o_nchan, dtype=uint16)
+ in_data = array([0]*nsamps*aio.i_nchan, dtype=uint16)
+ # set up interleaved data
+ for i in range(nsamps) :
+ out_data[i*aio.o_nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
+ for j in range(1, aio.o_nchan) :
+ out_data[i*aio.o_nchan + j] = 0
+ aio.setup(10, 1000, out_data)
+ aio.arm()
+ sleep(start_wait)
+ aio.start_read(in_data)
+ aio.reset()
+ if (verbose) :
+ print "#",
+ for j in range(aio.o_nchan) :
+ print "%s\t" % aio._ochan[j],
+ for j in range(aio.i_nchan) :
+ print "%s\t" % aio._ichan[j],
+ print ""
+ for i in range(nsamps) :
+ for j in range(aio.o_nchan) :
+ print "%s\t" % out_data[i*aio.o_nchan+j],
+ for j in range(aio.i_nchan) :
+ print "%s\t" % in_data[i*aio.i_nchan+j],
+ print ""
+ return (out_data, in_data)
+
+
+
+def test() :
+ aio = aio_obj()
+ _test_aio_obj(aio, start_wait = 0, verbose=True)
+ _test_aio_obj(aio, start_wait = 0.5, verbose=True)
+ aio.close()
+ aio.open()
+ #_repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=False)
+ aio.close()
+
+ aiom = aio_obj(in_chan=(0,1,2,3), out_chan=(0,1))
+ _test_aio_obj_multi_chan(aiom, start_wait = 0, verbose=True)
+
+if __name__ == "__main__" :
+ test()
--- /dev/null
+# Use Comedi drivers for single-shot analog input/output
+
+import comedi
+
+VERSION = 0.0
+VERBOSE_DEBUG = True
+
+class sngAioError (Exception) :
+ "Single point Analog IO error"
+ pass
+
+class ai_obj :
+ def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1,2,3), aref=0, range=0) :
+ self.verbose = False
+ self.comedi = comedi
+ self.filename = filename
+ self.state = "Closed"
+ self.open()
+ if (subdevice < 0) : # autodetect an output device
+ self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AI, 0) # 0 is starting subdevice
+ else :
+ self.subdev = subdevice
+ type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev)
+ if type != self.comedi.COMEDI_SUBD_AI :
+ raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
+ self.chan = chan
+ self.aref = aref
+ self.range = range
+ subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev)
+ self.maxdata = []
+ self.comedi_range = []
+ for chan in self.chan :
+ if int(chan) != chan :
+ raise sngAioError, "Channels must be integers, not %s" % str(chan)
+ if chan >= subdev_n_chan :
+ raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
+ n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
+ if range > n_range :
+ raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
+ maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
+ self.maxdata.append(maxdata)
+ comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range)
+ # comrange becomes invalid if device is closed, so make a copy...
+ comrange_copy = self.comedi.comedi_range()
+ comrange_copy.min = comrange.min
+ comrange_copy.max = comrange.max
+ comrange_copy.unit = comrange.unit
+ self.comedi_range.append(comrange_copy)
+ def __del__(self) :
+ self.close()
+ def open(self) :
+ if self.state == "Closed" :
+ self.dev = self.comedi.comedi_open(self.filename)
+ self.state = "Opened"
+ def close(self) :
+ if self.state != "Closed" :
+ rc = self.comedi.comedi_close(self.dev)
+ if rc < 0 :
+ self.comedi.comedi_perror("comedi_close")
+ raise sngAioError, "Cannot close %s" % self.filename
+ self.state = "Closed"
+ def comedi_to_phys(self, chan_index, comedi) :
+ phys = self.comedi.comedi_to_phys(comedi, self.comedi_range[chan_index], self.maxdata[chan_index])
+ if self.verbose :
+ print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
+ return phys
+ def phys_to_comedi(self, chan_index, phys) :
+ comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index])
+ if self.verbose :
+ print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
+ return comedi
+ def read_chan_index(self, chan_index) :
+ rc, data = self.comedi.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref);
+ if rc != 1 : # the number of samples read
+ raise sngAioError, "comedi_data_read returned %d" % rc
+ return data
+ def read(self) :
+ out = range(len(self.chan))
+ for i in range(len(self.chan)) :
+ out[i] = self.read_chan_index(i)
+ #print "Read %s, got %s" % (str(self.chan), str(out))
+ return out
+
+def _test_ai_obj() :
+ ai = ai_obj()
+ print "read ", ai.read()
+ print "read ", ai.read()
+ print "read ", ai.read()
+ print "read ", ai.read()
+ ai.close()
+ print "ai success"
+
+class ao_obj :
+ def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1), aref=0, range=0) :
+ self.verbose = False
+ self.comedi = comedi
+ self.filename = filename
+ self.state = "Closed"
+ self.open()
+ if (subdevice < 0) : # autodetect an output device
+ self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AO, 0) # 0 is starting subdevice
+ else :
+ self.subdev = subdevice
+ type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev)
+ if type != self.comedi.COMEDI_SUBD_AO :
+ raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
+ self.chan = chan
+ self.aref = aref
+ self.range = range
+ subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev)
+ self.maxdata = []
+ self.comedi_range = []
+ for chan in self.chan :
+ if chan >= subdev_n_chan :
+ raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
+ n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
+ if range > n_range :
+ raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
+ maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
+ self.maxdata.append(maxdata)
+ comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range)
+ # comrange becomes invalid if device is closed, so make a copy...
+ comrange_copy = self.comedi.comedi_range()
+ comrange_copy.min = comrange.min
+ comrange_copy.max = comrange.max
+ comrange_copy.unit = comrange.unit
+ self.comedi_range.append(comrange_copy)
+ def __del__(self) :
+ self.close()
+ def open(self) :
+ if self.state != "Closed" :
+ raise sngAioError, "Invalid state %s" % self.state
+ self.dev = self.comedi.comedi_open(self.filename)
+ self.state = "Opened"
+ def close(self) :
+ if self.state != "Closed" :
+ for i in range(len(self.chan)) :
+ self.write_chan_index(i, self.phys_to_comedi(i, 0))
+ rc = self.comedi.comedi_close(self.dev)
+ if rc < 0 :
+ self.comedi.comedi_perror("comedi_close")
+ raise sngAioError, "Cannot close %s" % self.filename
+ self.state = "Closed"
+ def comedi_to_phys(self, chan_index, comedi) :
+ phys = self.comedi.comedi_to_phys(int(comedi), self.comedi_range[chan_index], self.maxdata[chan_index])
+ if self.verbose :
+ print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
+ return phys
+ def phys_to_comedi(self, chan_index, phys) :
+ comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index])
+ if self.verbose :
+ print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
+ return comedi
+ def write_chan_index(self, chan_index, data) :
+ #print "set output on chan %d to %d" % (chan_index, data)
+ rc = self.comedi.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, int(data));
+ if rc != 1 : # the number of samples written
+ raise sngAioError, 'comedi_data_write returned %d' % rc
+ def write(self, data) :
+ if len(data) != len(self.chan) :
+ raise sngAioError, "data length %d != the number of channels (%d)" % (len(data), len(self.chan))
+ for i in range(len(self.chan)) :
+ self.write_chan_index(i, data[i])
+
+def _test_ao_obj() :
+ ao = ao_obj()
+ ao.write([0,0])
+ ao.write([3000,3000])
+ ao.write([0,0])
+ ao.close()
+ print "ao success"
+
+def _fit_with_residual(out_data, in_data) :
+ from scipy.stats import linregress
+ gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
+ print "y = %g + %g x" % (intercept, gradient)
+ print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y)
+ print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data
+ print "err = ", std_err # root mean sqared error of best fit
+ if gradient < .7 or p_value > 0.05 :
+ raise sngAioError, "Out channel 0 != in channel 0"
+ residual = zeros((points,))
+ for i in range(points) :
+ pred_y = intercept + gradient * out_data[i]
+ residual[i] = in_data[i] - pred_y
+ return residual
+
+def plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) :
+ try :
+ from pylab import plot, show, subplot
+ subplot(311)
+ plot(out_data0, in_data0, 'r.-', out_data1, in_data1, 'b.')
+ subplot(312)
+ plot(out_data0, residual0, 'r.', residual1, 'b.')
+ subplot(313)
+ plot(in_data0, 'r.', out_data1, 'b.')
+ show() # if interactive mode is off...
+ #raw_input("Press enter to continue") # otherwise, pause
+ except ImportError :
+ pass # ignore plot erros
+
+def _test_aio() :
+ from scipy.stats import linregress
+ from numpy import linspace, zeros
+ ao = ao_obj(chan=(0,1))
+ ai = ai_obj(chan=(0,1))
+ start = 0.1 * ao.maxdata[0]
+ stop = 0.9 * ao.maxdata[0]
+ points = 10
+ out_data0 = linspace(start, stop, points)
+ out_data1 = linspace(stop, start, points)
+ in_data0 = zeros((points,))
+ in_data1 = zeros((points,))
+ for i in range(points) :
+ ao.write([out_data0[i], out_data1[i]])
+ id = ai.read()
+ in_data0[i] = id[0]
+ in_data1[i] = id[1]
+ ai.close()
+ ao.close()
+ residual0 = _fit_with_residual(out_data0, in_data0)
+ residual1 = _fit_with_residual(out_data1, in_data1)
+ if VERBOSE_DEBUG :
+ plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1)
+ for i in range(points) :
+ if abs(residual0[i]) > 10 : # HACK, hardcoded maximum nonlinearity
+ raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i])
+ if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity
+ raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i])
+
+ print "_test_aio success"
+
+def test() :
+ _test_ai_obj()
+ _test_ao_obj()
+ _test_aio()
+
+if __name__ == "__main__" :
+ test()
--- /dev/null
+"""Use Comedi drivers for single-shot digital input/output
+
+Being single-shot implementations, read/writes will be software timed,
+so this module would not be a good choice if you need millisecond
+resolution. However, it does provide a simple and robust way to
+generate/aquire signals at 1 second and greater timescales.
+"""
+
+import comedi as c
+
+VERSION = 0.0
+
+class dioError (Exception) :
+ "Digital IO error"
+ pass
+
+class dio_obj :
+ def __init__(self, filename="/dev/comedi0", subdevice=2, chan=(0,1,2,3), aref=0, range=0, output=True) :
+ self.filename = filename
+ self.subdev = subdevice
+ self.chan = chan
+ self.aref = aref
+ self.range = range
+ self.output = output
+ self.dev = c.comedi_open(filename)
+ if self.dev < 0 :
+ raise dioError, "Cannot open %s" % self.filename
+ type = c.comedi_get_subdevice_type(self.dev, self.subdev)
+ if type != c.COMEDI_SUBD_DIO :
+ raise dioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
+ if self.output :
+ self.set_to_output()
+ else :
+ self.set_to_input()
+ def set_to_output(self) :
+ for chan in self.chan :
+ rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_OUTPUT)
+ if rc != 1 : # yes, comedi_dio_config returns 1 on success, -1 on failure, as of comedilib-0.8.1
+ raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_OUTPUT, rc)
+ self.output = True
+ def set_to_input(self) :
+ for chan in self.chan :
+ rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_INPUT)
+ if rc != 1 :
+ raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_INPUT, rc)
+ self.output = False
+ def write_chan_index(self, chan_index, data) :
+ if self.output != True :
+ raise dioError, "Must be an output to write"
+ rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, data);
+ if rc != 1 : # the number of samples written
+ raise dioError, "comedi_data_write returned %d" % rc
+ def read_chan_index(self, chan_index) :
+ if self.output == True :
+ raise dioError, "Must be an input to read"
+ rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref);
+ if rc != 1 : # the number of samples read
+ raise dioError, "comedi_data_read returned %d" % rc
+ return data
+ def write_port(self, data) :
+ "Channel significance increases with array index"
+ for i in range(len(self.chan)) :
+ self.write_chan_index(i, (data >> i) % 2)
+ def read_port(self) :
+ data = 0
+ for i in range(len(self.chan)) :
+ data += self.read_chan_index(i) << i
+ return data
+
+class write_dig_port (dio_obj) :
+ def __call__(self, data) :
+ self.write_port(data)
+
+def _test_dio_obj() :
+ d = dio_obj()
+ d.set_to_output()
+ d.write_chan_index(0, 1)
+ d.write_chan_index(0, 0)
+ d.write_port(7)
+ d.set_to_input()
+ data = d.read_chan_index(0)
+ print "channel %d is %d" % (d.chan[0], data)
+ data = d.read_port()
+ print "port value is %d" % data
+ print "dio_obj success"
+
+def _test_write_dig_port() :
+ p = write_dig_port()
+ for data in [0, 1, 2, 3, 4, 5, 6, 7] :
+ p(data)
+ print "write_dig_port success"
+
+def test() :
+ _test_dio_obj()
+ _test_write_dig_port()
+
+if __name__ == "__main__" :
+ test()
--- /dev/null
+#!/usr/bin/python
+
+from scipy.stats import linregress
+from scipy.io import read_array, write_array
+import sys
+
+if __name__ == "__main__" :
+ data = read_array(sys.argv[1]) #, atype='Integer' numpy.typecodes
+ gradient, intercept, r_value, p_value, std_err = linregress(data)
+ print "y = %g + %g x" % (intercept, gradient)
+ print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y)
+ print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data
+ print "err = ", std_err # root mean sqared error of best fit
+
--- /dev/null
+#!/usr/bin/python
+#
+# With analog output 0 cabled directly to analog input 0
+# output a single period of a 1000kHz sine wave
+# and perform a linear regression between the output and input data.
+# If the input is closely correlated to the output (gradient > 0.7)
+# consider the synchronized input/output a success.
+# Prints the success rate for num_runs
+
+from scipy.stats import linregress
+from scipy.io import read_array, write_array
+from os import system
+from numpy import zeros
+
+# I had been checking at different waits between AO arming and AI triggering,
+# thinking that the problem might be due to my patch.
+# But I get failure rates of ~ 20% regardless of the wait time (-t option)
+# So currently only use waits of 0 seconds to get more data.
+
+def test() :
+ #waits = range(5, -1, -1)
+ waits = [0]
+
+ num_runs = 200
+ runs = range(0, num_runs)
+ results = zeros((1, num_runs))
+ good_run_arr = []
+
+ fails = 0
+ successes = 0
+ good_run = 0
+ for wait in waits :
+ for run in runs :
+ call = './simult_aio -n 50 -F 50000 -w 1000 -t%d' % wait
+ if system(call) != 0 :
+ return 1
+ if system('int16s_to_ascii_array out in > data') != 0 :
+ return 1
+ data = read_array('data')
+ gradient, intercept, r_value, p_value, std_err = linregress(data)
+ results[wait,run] = gradient
+ print "wait %2d, run %2d, gradient %g" % (wait, run, gradient)
+ if gradient < .7 :
+ fails += 1
+ good_run_arr.append(good_run)
+ good_run = 0
+ else :
+ successes += 1
+ good_run += 1
+ print "failure rate ", (float(fails)/float(fails+successes))
+ call = 'echo "'
+ for num in good_run_arr :
+ call += "%d " % num
+ call += '" | stem_leaf 2'
+ print "good runs:"
+ system(call)
+
+if __name__ == "__main__" :
+ test()
--- /dev/null
+#A test-application to demonstrate using the Comedilib API
+# and streaming acquisition in particular, from Python.
+#This script is completely untested!
+# author bryan.cole@teraview.co.uk
+
+#set the paths so python can find the comedi module
+import sys, os, string
+sys.path.append('./build/lib.linux-i686-2.2')
+
+import comedi as c
+
+#open a comedi device
+dev=c.comedi_open('/dev/comedi0')
+if not dev: raise Exception, "Error openning Comedi device"
+
+#get a file-descriptor for use later
+fd = c.comedi_fileno(dev)
+
+nscans=100 #specify total number of scans
+#1000
+#three lists containing the chans, gains and referencing
+#the lists must all have the same length
+chans=[0,2,3]
+gains=[1,1,1]
+aref =[c.AREF_GROUND, c.AREF_GROUND, c.AREF_GROUND]
+
+nchans = len(chans) #number of channels
+
+#wrappers include a "chanlist" object (just an Unsigned Int array) for holding the chanlist information
+mylist = c.chanlist(nchans) #create a chanlist of length nchans
+
+#now pack the channel, gain and reference information into the chanlist object
+#N.B. the CR_PACK and other comedi macros are now python functions
+for index in range(nchans):
+ mylist[index]=c.cr_pack(chans[index], gains[index], aref[index])
+
+#construct a comedi command manually
+cmd = c.comedi_cmd_struct()
+cmd.subdev = 0
+cmd.flags = 0
+cmd.start_src = c.TRIG_NOW
+cmd.sart_arg = 0
+cmd.scan_begin_src = c.TRIG_TIMER
+cmd.scan_begin_arg = int(1.0e9/100000)
+cmd.convert_src = c.TRIG_TIMER
+cmd.convert_arg = 1
+cmd.scan_end_src = c.TRIG_COUNT
+cmd.scan_end_arg = nchans
+cmd.stop_src = c.TRIG_COUNT
+cmd.stop_arg = nscans
+cmd.chanlist = mylist
+cmd.chanlist_len = nchans
+
+#test our comedi command a few times.
+ret = c.comedi_command_test(dev,cmd)
+print "first cmd test returns ", ret
+ret = c.comedi_command_test(dev,cmd)
+print "second test returns ", ret
+ret = c.comedi_command_test(dev,cmd)
+if ret: raise Exception, "Error %d testing comedi command" % ret
+
+#execute the command!
+ret = c.comedi_command(dev,cmd)
+
+#I think this will work but it's completely untested!
+datalist=[]
+buffersize=1000 # max bytes to acquire per read attempt
+data=os.read(fd, buffersize)
+while len(data)>0:
+ print "read %d bytes" % len(data)
+ datalist.append(data)
+ data=os.read(fd, buffersize)
+if len(datalist) == 0 :
+ raise Exeption, "Didn't get any data..."
+
+ret = c.comedi_close(dev)
+
+print datalist
+datastr = string.join(datalist,'')
+
+print datastr #heres your data, as a single string!
+
+#if you've got Numeric installed you can convert data into a flat Numpy array
+# with:
+# dataarray = Numeric.fromstring(data, Int16)
+try :
+ import numpy
+ dataarray = numpy.fromstring(data, dtype=numpy.int16)
+ print "size: ", dataarray.size
+ print dataarray
+except ImportError :
+ print "Install numpy to read the data into Python"
--- /dev/null
+#!/usr/bin/python
+
+from scipy.io import read_array, write_array
+import sys
+
+data = read_array(sys.argv[1]) #, atype='Integer' numpy.typecodes
+
+start_vals = data[0]
+for point in data :
+ x = point-start_vals
+ for val in x :
+ print val, "\t",
+ print ""
+