From 9af0a440ec5c1a95da3254a289454dbb55fa0ed1 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Sat, 4 Oct 2008 18:35:29 -0400 Subject: [PATCH] No changes? --- pycomedi/comedi_simult_aio.py | 400 ---------------------------------- pycomedi/comedi_single_aio.py | 239 -------------------- pycomedi/comedi_single_dio.py | 98 --------- 3 files changed, 737 deletions(-) delete mode 100644 pycomedi/comedi_simult_aio.py delete mode 100644 pycomedi/comedi_single_aio.py delete mode 100644 pycomedi/comedi_single_dio.py diff --git a/pycomedi/comedi_simult_aio.py b/pycomedi/comedi_simult_aio.py deleted file mode 100644 index 303b72b..0000000 --- a/pycomedi/comedi_simult_aio.py +++ /dev/null @@ -1,400 +0,0 @@ -# Simultaneous, finite, buffered analog inpout/output using comedi drivers - -import comedi -from numpy import array, fromstring, uint16, float32, pi, sin -import int16_rw - -# imports for testing -from time import sleep -from scipy.stats import linregress -from os import system - -#VERBOSE = True -VERBOSE = False -AO_TRIGGERS_OFF_AI_START = True -#AO_TRIGGERS_OFF_AI_START = False - -class simAioError (Exception) : - "Simultaneous Analog IO error" - pass - -_example_array = array([0], dtype=uint16) # for typing, since I don't know what type(array) should be - -_cmdtest_message = ["success", - "invalid source", - "source conflict", - "invalid argument", - "argument conflict", - "invalid chanlist"] - -def _print_cmdsrc(source) : - if source & comedi.TRIG_NONE : print "none|", - if source & comedi.TRIG_NOW : print "now|", - if source & comedi.TRIG_FOLLOW : print "follow|", - if source & comedi.TRIG_TIME : print "time|", - if source & comedi.TRIG_TIMER : print "timer|", - if source & comedi.TRIG_COUNT : print "count|", - if source & comedi.TRIG_EXT : print "ext|", - if source & comedi.TRIG_INT : print "int|", - if source & comedi.TRIG_OTHER : print "other|", - -def _print_command(cmd) : - print "subdevice: \t%d" % cmd.subdev - print "flags: \t0x%x" % cmd.flags - print "start: \t", - _print_cmdsrc(cmd.start_src) - print "\t%d" % cmd.start_arg - print "scan_begin:\t", - _print_cmdsrc(cmd.scan_begin_src) - print "\t%d" % cmd.scan_begin_arg - print "convert: \t", - _print_cmdsrc(cmd.convert_src) - print "\t%d" % cmd.convert_arg - print "scan_end: \t", - _print_cmdsrc(cmd.scan_end_src) - print "\t%d" % cmd.scan_end_arg - print "stop: \t", - _print_cmdsrc(cmd.stop_src) - print "\t%d" % cmd.stop_arg - -def _expand_tuple(tup, length) : - "Expand an iterable TUP to a tuple of LENGTH by repeating the last element" - if len(tup) > length : - raise simAioError, "Tuple too long." - elif len(tup) < length : - temp_tup = tup + tuple((tup[-1],)*(length-len(tup))) - tup = temp_tup - return tup - -class aio_obj : - def __init__(self, filename="/dev/comedi0", - in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,), - out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,)) : - self._comedi = comedi - self._filename = filename - self.state = "Closed" - self.open() - - self._iaref = _expand_tuple(in_aref, len(in_chan)) - self._irange = _expand_tuple(in_range, len(in_chan)) - temp = self._check_options(in_subdevice, in_chan, self._iaref, self._irange, output=False) - self._isubdev = temp["subdevice"] - self._ichan_params = temp["chan_params"] - self._ichan = in_chan - self.i_nchan = len(self._ichan) - self._ichanlist = self._comedi.chanlist(self.i_nchan) - for i in range(self.i_nchan) : - self._ichanlist[i] = self._comedi.cr_pack(self._ichan[i], self._irange[i], self._iaref[i]) - - self._oaref = _expand_tuple(out_aref, len(in_chan)) - self._orange = _expand_tuple(out_range, len(in_chan)) - temp = self._check_options(out_subdevice, out_chan, self._oaref, self._orange, output=True) - self._osubdev = temp["subdevice"] - self._ochan_params = temp["chan_params"] - self._ochan = out_chan - self.o_nchan = len(self._ochan) - self._ochanlist = self._comedi.chanlist(self.o_nchan) - for i in range(self.o_nchan) : - self._ochanlist[i] = self._comedi.cr_pack(self._ochan[i], self._orange[i], self._oaref[i]) - - self._gen_rough_output_cmd() - self._gen_rough_input_cmd() - self.state = "Initialized" - def __del__(self) : - self.close() - def close(self) : - if self.state != "Closed" : - self.reset(force=True) - rc = self._comedi.comedi_close(self._dev) - if rc < 0 : - self._comedi.comedi_perror("comedi_close") - raise simAioError, "Cannot close %s" % self._filename - if VERBOSE : - print "Closed %s on fd %d" % (self._filename, self._fd) - self.state = "Closed" - def open(self) : - if self.state != "Closed" : - raise simAioError, "Invalid state %s" % self.state - self._dev = self._comedi.comedi_open(self._filename) - self._fd = self._comedi.comedi_fileno(self._dev) - if VERBOSE : - print "Opened %s on fd %d" % (self._filename, self._fd) - self.state = "Initialized" - def _check_options(self, subdevice, chan, aref, rnge, output=True) : - subdevice = self._check_subdevice(subdevice, output=output) - chan_params = [] - for i in range(len(chan)) : - chan_params.append(self._check_chan(subdevice, chan[i], aref[i], rnge[i])) - if VERBOSE : - if output : - print "Output", - else : - print "Input", - print " subdevice with channels %s is valid" % (str(chan)) - return {"subdevice":subdevice, - "chan_params":chan_params} - def _check_subdevice(self, subdevice, output=True) : - if output == True : - target_type = self._comedi.COMEDI_SUBD_AO - else : - target_type = self._comedi.COMEDI_SUBD_AI - if (subdevice < 0) : # autodetect an input device - subdevice = self._comedi.comedi_find_subdevice_by_type(self._dev, target_type, 0) # 0 is starting subdevice - else : - type = self._comedi.comedi_get_subdevice_type(self._dev, subdevice) - if type != target_type : - raise simAioError, "Comedi subdevice %d has wrong type %d" % (subdevice, type) - return subdevice - def _check_chan(self, subdevice, chan, aref, range) : - subdev_n_chan = self._comedi.comedi_get_n_channels(self._dev, subdevice) - if chan >= subdev_n_chan : - raise simAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, subdevice, subdev_n_chan-1) - n_range = self._comedi.comedi_get_n_ranges(self._dev, subdevice, chan) - if range >= n_range : - raise simAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdevice, chan, n_range-1) - maxdata = self._comedi.comedi_get_maxdata(self._dev, subdevice, chan) - comrange = self._comedi.comedi_get_range(self._dev, subdevice, chan, range) - return {"maxdata":maxdata, "comrange": comrange} - def _gen_rough_output_cmd(self) : - if VERBOSE : - print "generate rough output command" - cmd = self._comedi.comedi_cmd_struct() - cmd.subdev = self._osubdev - cmd.flags = self._comedi.CMDF_WRITE - if AO_TRIGGERS_OFF_AI_START : - cmd.start_src = self._comedi.TRIG_EXT - cmd.start_arg = 18 # AI_START1 internal AI start signal - else : - cmd.start_src = self._comedi.TRIG_INT - cmd.start_arg = 0 - cmd.scan_begin_src = self._comedi.TRIG_TIMER - cmd.scan_begin_arg = 1 # temporary value for now - cmd.convert_src = self._comedi.TRIG_NOW - cmd.convert_arg = 0 - cmd.scan_end_src = self._comedi.TRIG_COUNT - cmd.scan_end_arg = self.o_nchan - cmd.stop_src = self._comedi.TRIG_COUNT - cmd.stop_arg = 1 # temporary value for now - cmd.chanlist = self._ochanlist - cmd.chanlist_len = self.o_nchan - self._test_cmd(cmd, max_passes=3) - self._ocmd = cmd - def _gen_rough_input_cmd(self) : - if VERBOSE : - print "generate rough input command" - cmd = self._comedi.comedi_cmd_struct() - cmd.subdev = self._isubdev - cmd.flags = 0 - cmd.start_src = self._comedi.TRIG_INT - cmd.start_arg = 0 - cmd.scan_begin_src = self._comedi.TRIG_TIMER - cmd.scan_begin_arg = 1 # temporary value for now - cmd.convert_src = self._comedi.TRIG_TIMER - cmd.convert_arg = 1 - cmd.scan_end_src = self._comedi.TRIG_COUNT - cmd.scan_end_arg = self.i_nchan - cmd.stop_src = self._comedi.TRIG_COUNT - cmd.stop_arg = 1 # temporary value for now - cmd.chanlist = self._ichanlist - cmd.chanlist_len = self.i_nchan - self._test_cmd(cmd, max_passes=3) - self._icmd = cmd - def _test_cmd(self, cmd, max_passes=1) : - very_verbose = False - i = 0 - rc = 0 - if very_verbose : - print "Testing command:" - _print_command(cmd) - while i < max_passes : - rc = self._comedi.comedi_command_test(self._dev, cmd) - if (rc == 0) : - break - if VERBOSE or very_verbose : - print "test pass %d, %s" % (i, _cmdtest_message[rc]) - i += 1 - if (VERBOSE or very_verbose) and i < max_passes : - print "Passing command:" - _print_command(cmd) - if i >= max_passes : - print "Failing command:" - _print_command(cmd) - raise simAioError, "Invalid command: %s" % _cmdtest_message[rc] - def setup(self, nsamps, freq, out_buffer) : - if self.state != "Initialized" : - raise simAioError, "Invalid state %s" % self.state - if type(out_buffer) != type(_example_array) : - raise simAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer)) - self._ocmd.scan_begin_arg = int(1e9/freq) - self._ocmd.stop_arg = nsamps - if VERBOSE : - print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.scan_begin_arg, self._ocmd.stop_arg) - self._onremain = nsamps - self._test_cmd(self._ocmd) - rc = self._comedi.comedi_command(self._dev, self._ocmd) - if rc < 0 : - self._comedi.comedi_perror("comedi_command") - raise simAioError, "Error executing output command %d" % rc - self._icmd.scan_begin_arg = int(1e9/freq) - self._icmd.stop_arg = nsamps - self._test_cmd(self._icmd) - self._inremain = nsamps - rc = self._comedi.comedi_command(self._dev, self._icmd) - if rc < 0 : - self._comedi.comedi_perror("comedi_command") - raise simAioError, "Error executing input command" - - if VERBOSE : - print "Write %d output samples to the card" % (nsamps*self.o_nchan) - rc = int16_rw.write_samples(self._fd, nsamps*self.o_nchan, out_buffer, 1) - if rc != nsamps*self.o_nchan : - raise simAioError, "Error %d writing output buffer\n" % rc - rc = int16_rw.write_samples(self._fd, self.o_nchan, out_buffer[-self.o_nchan:], 1) # HACK, add an extra sample for each channel to the output buffer - if rc != self.o_nchan : - raise simAioError, "Error %d writing hack output buffer\n" % rc - # maybe will avoid resetting... - self._nsamps = nsamps - self.state = "Setup" - def arm(self) : - if self.state != "Setup" : - raise simAioError, "Invalid state %s" % self.state - if VERBOSE : - print "Arm the analog ouptut" - self._comedi_internal_trigger(self._osubdev) - self.state = "Armed" - def start_read(self, in_buffer) : - if self.state != "Armed" : - raise simAioError, "Invalid state %s" % self.state - if VERBOSE : - print "Start the run" - self._comedi_internal_trigger(self._isubdev) - if VERBOSE : - print "Read %d input samples from the card" % (self._nsamps*self.i_nchan) - rc = int16_rw.read_samples(self._fd, self._nsamps*self.i_nchan, in_buffer, -1) - if rc != self._nsamps*self.i_nchan : - raise simAioError, "Error %d reading input buffer\n" % rc - self.state = "Read" - def _comedi_internal_trigger(self, subdevice) : - data = self._comedi.chanlist(1) # by luck, data is an array of lsampl_t (unsigned ints), as is chanlist - insn = self._comedi.comedi_insn_struct() - insn.insn = self._comedi.INSN_INTTRIG - insn.subdev = subdevice - insn.data = data - insn.n = 1 - data[0] = 0 - rc = self._comedi.comedi_do_insn(self._dev, insn) - def reset(self, force=False) : - if VERBOSE : - print "Reset the analog subdevices" - # clean up after the read - rc = self._comedi.comedi_cancel(self._dev, self._osubdev) - if rc < 0 : - self._comedi.comedi_perror("comedi_cancel") - raise simAioError, "Error cleaning up output command" - rc = self._comedi.comedi_cancel(self._dev, self._isubdev) - if rc < 0 : - self._comedi.comedi_perror("comedi_cancel") - raise simAioError, "Error cleaning up input command" - self.state = "Initialized" - - -# define the test suite - -def _test_aio_obj(aio=None, start_wait=0, verbose=False) : - if (verbose) : - print "_test_aio_obj(start_wait = %g)" % start_wait - nsamps = 10 - out_data = array([0]*nsamps, dtype=uint16) - in_data = array([0]*nsamps, dtype=uint16) - for i in range(nsamps) : - out_data[i] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps))) - aio.setup(10, 1000, out_data) - aio.arm() - sleep(start_wait) - aio.start_read(in_data) - aio.reset() - if (verbose) : - print "out_data:\n", out_data - print "in_data:\n", in_data - print "residual:\n[", - for i, o in zip(in_data, out_data) : - print int(i)-int(o), - print "]" - return (out_data, in_data) - -def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) : - print "_repeat_aio_test()" - grads = array([0]*num_tests, dtype=float32) - good = 0 - bad = 0 - good_run = 0 - good_run_arr = [] - for i in range(num_tests) : - out_data, in_data = _test_aio_obj(aio, start_wait) - gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data) - grads[i] = gradient - if verbose : - print "wait %2d, run %2d, gradient %g" % (start_wait, i, gradient) - if gradient < .7 : - bad += 1 - good_run_arr.append(good_run) - good_run = 0 - else : - good += 1 - good_run += 1 - good_run_arr.append(good_run) - print "failure rate %g%% in %d runs" % ((float(bad)/float(good+bad))*100.0, num_tests) - call = 'echo "' - for num in good_run_arr : - call += "%d " % num - call += '" | stem_leaf 2' - print "good run stem and leaf:" - system(call) - -def _test_aio_obj_multi_chan(aio=None, start_wait=0, verbose=False) : - if (verbose) : - print "_test_aio_obj_multi_chan(start_wait = %g)" % start_wait - nsamps = 10 - out_data = array([0]*nsamps*aio.o_nchan, dtype=uint16) - in_data = array([0]*nsamps*aio.i_nchan, dtype=uint16) - # set up interleaved data - for i in range(nsamps) : - out_data[i*aio.o_nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps))) - for j in range(1, aio.o_nchan) : - out_data[i*aio.o_nchan + j] = 0 - aio.setup(10, 1000, out_data) - aio.arm() - sleep(start_wait) - aio.start_read(in_data) - aio.reset() - if (verbose) : - print "#", - for j in range(aio.o_nchan) : - print "%s\t" % aio._ochan[j], - for j in range(aio.i_nchan) : - print "%s\t" % aio._ichan[j], - print "" - for i in range(nsamps) : - for j in range(aio.o_nchan) : - print "%s\t" % out_data[i*aio.o_nchan+j], - for j in range(aio.i_nchan) : - print "%s\t" % in_data[i*aio.i_nchan+j], - print "" - return (out_data, in_data) - - - -def test() : - aio = aio_obj() - _test_aio_obj(aio, start_wait = 0, verbose=True) - _test_aio_obj(aio, start_wait = 0.5, verbose=True) - aio.close() - aio.open() - #_repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=False) - aio.close() - - aiom = aio_obj(in_chan=(0,1,2,3), out_chan=(0,1)) - _test_aio_obj_multi_chan(aiom, start_wait = 0, verbose=True) - -if __name__ == "__main__" : - test() diff --git a/pycomedi/comedi_single_aio.py b/pycomedi/comedi_single_aio.py deleted file mode 100644 index 0b270b8..0000000 --- a/pycomedi/comedi_single_aio.py +++ /dev/null @@ -1,239 +0,0 @@ -# Use Comedi drivers for single-shot analog input/output - -import comedi - -VERSION = 0.0 -VERBOSE_DEBUG = True - -class sngAioError (Exception) : - "Single point Analog IO error" - pass - -class ai_obj : - def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1,2,3), aref=0, range=0) : - self.verbose = False - self.comedi = comedi - self.filename = filename - self.state = "Closed" - self.open() - if (subdevice < 0) : # autodetect an output device - self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AI, 0) # 0 is starting subdevice - else : - self.subdev = subdevice - type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev) - if type != self.comedi.COMEDI_SUBD_AI : - raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type) - self.chan = chan - self.aref = aref - self.range = range - subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev) - self.maxdata = [] - self.comedi_range = [] - for chan in self.chan : - if int(chan) != chan : - raise sngAioError, "Channels must be integers, not %s" % str(chan) - if chan >= subdev_n_chan : - raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1) - n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan) - if range > n_range : - raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1) - maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan) - self.maxdata.append(maxdata) - comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range) - # comrange becomes invalid if device is closed, so make a copy... - comrange_copy = self.comedi.comedi_range() - comrange_copy.min = comrange.min - comrange_copy.max = comrange.max - comrange_copy.unit = comrange.unit - self.comedi_range.append(comrange_copy) - def __del__(self) : - self.close() - def open(self) : - if self.state == "Closed" : - self.dev = self.comedi.comedi_open(self.filename) - self.state = "Opened" - def close(self) : - if self.state != "Closed" : - rc = self.comedi.comedi_close(self.dev) - if rc < 0 : - self.comedi.comedi_perror("comedi_close") - raise sngAioError, "Cannot close %s" % self.filename - self.state = "Closed" - def comedi_to_phys(self, chan_index, comedi) : - phys = self.comedi.comedi_to_phys(comedi, self.comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : - print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index]) - return phys - def phys_to_comedi(self, chan_index, phys) : - comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : - print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index]) - return comedi - def read_chan_index(self, chan_index) : - rc, data = self.comedi.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref); - if rc != 1 : # the number of samples read - raise sngAioError, "comedi_data_read returned %d" % rc - return data - def read(self) : - out = range(len(self.chan)) - for i in range(len(self.chan)) : - out[i] = self.read_chan_index(i) - #print "Read %s, got %s" % (str(self.chan), str(out)) - return out - -def _test_ai_obj() : - ai = ai_obj() - print "read ", ai.read() - print "read ", ai.read() - print "read ", ai.read() - print "read ", ai.read() - ai.close() - print "ai success" - -class ao_obj : - def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1), aref=0, range=0) : - self.verbose = False - self.comedi = comedi - self.filename = filename - self.state = "Closed" - self.open() - if (subdevice < 0) : # autodetect an output device - self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AO, 0) # 0 is starting subdevice - else : - self.subdev = subdevice - type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev) - if type != self.comedi.COMEDI_SUBD_AO : - raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type) - self.chan = chan - self.aref = aref - self.range = range - subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev) - self.maxdata = [] - self.comedi_range = [] - for chan in self.chan : - if chan >= subdev_n_chan : - raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1) - n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan) - if range > n_range : - raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1) - maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan) - self.maxdata.append(maxdata) - comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range) - # comrange becomes invalid if device is closed, so make a copy... - comrange_copy = self.comedi.comedi_range() - comrange_copy.min = comrange.min - comrange_copy.max = comrange.max - comrange_copy.unit = comrange.unit - self.comedi_range.append(comrange_copy) - def __del__(self) : - self.close() - def open(self) : - if self.state != "Closed" : - raise sngAioError, "Invalid state %s" % self.state - self.dev = self.comedi.comedi_open(self.filename) - self.state = "Opened" - def close(self) : - if self.state != "Closed" : - for i in range(len(self.chan)) : - self.write_chan_index(i, self.phys_to_comedi(i, 0)) - rc = self.comedi.comedi_close(self.dev) - if rc < 0 : - self.comedi.comedi_perror("comedi_close") - raise sngAioError, "Cannot close %s" % self.filename - self.state = "Closed" - def comedi_to_phys(self, chan_index, comedi) : - phys = self.comedi.comedi_to_phys(int(comedi), self.comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : - print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index]) - return phys - def phys_to_comedi(self, chan_index, phys) : - comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : - print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index]) - return comedi - def write_chan_index(self, chan_index, data) : - #print "set output on chan %d to %d" % (chan_index, data) - rc = self.comedi.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, int(data)); - if rc != 1 : # the number of samples written - raise sngAioError, 'comedi_data_write returned %d' % rc - def write(self, data) : - if len(data) != len(self.chan) : - raise sngAioError, "data length %d != the number of channels (%d)" % (len(data), len(self.chan)) - for i in range(len(self.chan)) : - self.write_chan_index(i, data[i]) - -def _test_ao_obj() : - ao = ao_obj() - ao.write([0,0]) - ao.write([3000,3000]) - ao.write([0,0]) - ao.close() - print "ao success" - -def _fit_with_residual(out_data, in_data) : - from scipy.stats import linregress - gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data) - print "y = %g + %g x" % (intercept, gradient) - print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y) - print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data - print "err = ", std_err # root mean sqared error of best fit - if gradient < .7 or p_value > 0.05 : - raise sngAioError, "Out channel 0 != in channel 0" - residual = zeros((points,)) - for i in range(points) : - pred_y = intercept + gradient * out_data[i] - residual[i] = in_data[i] - pred_y - return residual - -def plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) : - try : - from pylab import plot, show, subplot - subplot(311) - plot(out_data0, in_data0, 'r.-', out_data1, in_data1, 'b.') - subplot(312) - plot(out_data0, residual0, 'r.', residual1, 'b.') - subplot(313) - plot(in_data0, 'r.', out_data1, 'b.') - show() # if interactive mode is off... - #raw_input("Press enter to continue") # otherwise, pause - except ImportError : - pass # ignore plot erros - -def _test_aio() : - from scipy.stats import linregress - from numpy import linspace, zeros - ao = ao_obj(chan=(0,1)) - ai = ai_obj(chan=(0,1)) - start = 0.1 * ao.maxdata[0] - stop = 0.9 * ao.maxdata[0] - points = 10 - out_data0 = linspace(start, stop, points) - out_data1 = linspace(stop, start, points) - in_data0 = zeros((points,)) - in_data1 = zeros((points,)) - for i in range(points) : - ao.write([out_data0[i], out_data1[i]]) - id = ai.read() - in_data0[i] = id[0] - in_data1[i] = id[1] - ai.close() - ao.close() - residual0 = _fit_with_residual(out_data0, in_data0) - residual1 = _fit_with_residual(out_data1, in_data1) - if VERBOSE_DEBUG : - plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) - for i in range(points) : - if abs(residual0[i]) > 10 : # HACK, hardcoded maximum nonlinearity - raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i]) - if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity - raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i]) - - print "_test_aio success" - -def test() : - _test_ai_obj() - _test_ao_obj() - _test_aio() - -if __name__ == "__main__" : - test() diff --git a/pycomedi/comedi_single_dio.py b/pycomedi/comedi_single_dio.py deleted file mode 100644 index e3fd9c0..0000000 --- a/pycomedi/comedi_single_dio.py +++ /dev/null @@ -1,98 +0,0 @@ -"""Use Comedi drivers for single-shot digital input/output - -Being single-shot implementations, read/writes will be software timed, -so this module would not be a good choice if you need millisecond -resolution. However, it does provide a simple and robust way to -generate/aquire signals at 1 second and greater timescales. -""" - -import comedi as c - -VERSION = 0.0 - -class dioError (Exception) : - "Digital IO error" - pass - -class dio_obj : - def __init__(self, filename="/dev/comedi0", subdevice=2, chan=(0,1,2,3), aref=0, range=0, output=True) : - self.filename = filename - self.subdev = subdevice - self.chan = chan - self.aref = aref - self.range = range - self.output = output - self.dev = c.comedi_open(filename) - if self.dev < 0 : - raise dioError, "Cannot open %s" % self.filename - type = c.comedi_get_subdevice_type(self.dev, self.subdev) - if type != c.COMEDI_SUBD_DIO : - raise dioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type) - if self.output : - self.set_to_output() - else : - self.set_to_input() - def set_to_output(self) : - for chan in self.chan : - rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_OUTPUT) - if rc != 1 : # yes, comedi_dio_config returns 1 on success, -1 on failure, as of comedilib-0.8.1 - raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_OUTPUT, rc) - self.output = True - def set_to_input(self) : - for chan in self.chan : - rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_INPUT) - if rc != 1 : - raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_INPUT, rc) - self.output = False - def write_chan_index(self, chan_index, data) : - if self.output != True : - raise dioError, "Must be an output to write" - rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, data); - if rc != 1 : # the number of samples written - raise dioError, "comedi_data_write returned %d" % rc - def read_chan_index(self, chan_index) : - if self.output == True : - raise dioError, "Must be an input to read" - rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref); - if rc != 1 : # the number of samples read - raise dioError, "comedi_data_read returned %d" % rc - return data - def write_port(self, data) : - "Channel significance increases with array index" - for i in range(len(self.chan)) : - self.write_chan_index(i, (data >> i) % 2) - def read_port(self) : - data = 0 - for i in range(len(self.chan)) : - data += self.read_chan_index(i) << i - return data - -class write_dig_port (dio_obj) : - def __call__(self, data) : - self.write_port(data) - -def _test_dio_obj() : - d = dio_obj() - d.set_to_output() - d.write_chan_index(0, 1) - d.write_chan_index(0, 0) - d.write_port(7) - d.set_to_input() - data = d.read_chan_index(0) - print "channel %d is %d" % (d.chan[0], data) - data = d.read_port() - print "port value is %d" % data - print "dio_obj success" - -def _test_write_dig_port() : - p = write_dig_port() - for data in [0, 1, 2, 3, 4, 5, 6, 7] : - p(data) - print "write_dig_port success" - -def test() : - _test_dio_obj() - _test_write_dig_port() - -if __name__ == "__main__" : - test() -- 2.26.2