From 3554ea120ae300847210c5f79ff00870a637c887 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Thu, 23 Oct 2008 08:49:39 -0400 Subject: [PATCH] Fairly large rewrite for more object-oriented pycomedi. test.py should never have been versioned; it was simply an example of the test logic for simult_aio._repeat_aio_test(). --- pycomedi/common.py | 149 +++++++----- pycomedi/linreg.py | 14 -- pycomedi/simult_aio.py | 477 ++++++++++++++++++++------------------- pycomedi/single_aio.py | 63 +++--- pycomedi/single_dio.py | 22 +- pycomedi/test.py | 59 ----- pycomedi/zero_initial.py | 14 -- 7 files changed, 389 insertions(+), 409 deletions(-) delete mode 100755 pycomedi/linreg.py delete mode 100755 pycomedi/test.py delete mode 100755 pycomedi/zero_initial.py diff --git a/pycomedi/common.py b/pycomedi/common.py index e565e03..ca8a844 100644 --- a/pycomedi/common.py +++ b/pycomedi/common.py @@ -8,28 +8,43 @@ class pycomediError (Exception) : "Error in pycomedi.common" pass +def _expand_tuple(tup, length) : + "Expand an iterable TUP to a tuple of LENGTH by repeating the last element" + if len(tup) > length : + raise simAioError, "Tuple too long." + elif len(tup) < length : + temp_tup = tup + tuple((tup[-1],)*(length-len(tup))) + tup = temp_tup + return tup + class PyComediIO (object) : "Base class for Comedi IO operations" - def __init__(self, filename="/dev/comedi0", subdevice=-1, devtype=c.COMEDI_SUBD_AI, chan=(0,1,2,3), aref=c.AREF_GROUND, range=0, output=False) : + def __init__(self, filename="/dev/comedi0", subdevice=-1, devtype=c.COMEDI_SUBD_AI, chan=(0,1,2,3), aref=(c.AREF_GROUND,), range=(0,), output=False, dev=None) : """inputs: - filename: comedi device file for your device ("/dev/comedi0"). - subdevice: the analog output subdevice (-1 for autodetect) - devtype: the devoce type (c.COMEDI_SUBD_AI) + filename: comedi device file for your device ["/dev/comedi0"]. + subdevice: the IO subdevice [-1 for autodetect] + devtype: the device type [c.COMEDI_SUBD_AI] values include comedi.COMEDI_SUBD_DI comedi.COMEDI_SUBD_DO comedi.COMEDI_SUBD_DIO comedi.COMEDI_SUBD_AI comedi.COMEDI_SUBD_AO - chan: an iterable of the channels you wish to control ((0,1,2,3)) - aref: the analog reference for these channels (comedi.AREF_GROUND) + chan: an iterable of the channels you wish to control [(0,1,2,3)] + aref: the analog references for these channels [(comedi.AREF_GROUND,)] values include comedi.AREF_GROUND comedi.AREF_COMMON comedi.AREF_DIFF comedi.AREF_OTHER - range: the range for these channels (0) + range: the ranges for these channels [(0,)] output: whether to use the lines as output (vs input) (False) + dev: if you've already opened the device file, pass in the + open device (None for internal open) + Note that neither aref nor range need to be the same length as + the channel tuple. If they are shorter, their last entry will + be repeated as needed. If they are longer, pycomediError will + be raised (was: their extra entries will not be used). """ self.verbose = False self._comedi = c # keep a local copy around @@ -39,26 +54,44 @@ class PyComediIO (object) : # local reference here. self.filename = filename self.state = "Closed" - self.open() + if dev == None : + self.open() + else : + self.fakeOpen(dev) self._setup_device_type(subdevice, devtype) self._setup_channels(chan, aref, range) - self._output = output + self.output = output def __del__(self) : self.close() def open(self) : if self.state == "Closed" : - self._dev = self._comedi.comedi_open(self.filename) - if self.dev < 0 : + dev = self._comedi.comedi_open(self.filename) + if dev < 0 : self._comedi.comedi_perror("comedi_open") raise pycomediError, "Cannot open %s" % self.filename - self.state = "Opened" + self.fakeOpen(dev) + def fakeOpen(self, dev): + """fake open: if you open the comedi device file yourself, use this + method to pass in the opened file descriptor and declare the + port "Open". + """ + if dev < 0 : + raise pycomediError, "Invalid file descriptor %d" % dev + self.dev = dev + self.state = "Open" def close(self) : if self.state != "Closed" : - rc = self._comedi.comedi_close(self._dev) + rc = self._comedi.comedi_close(self.dev) if rc < 0 : self._comedi.comedi_perror("comedi_close") raise pycomediError, "Cannot close %s" % self.filename - self.state = "Closed" + self.fakeClose() + def fakeClose(self): + """fake close: if you want to close the comedi device file yourself, + use this method to let the port know it has been "Closed". + """ + self.dev = None + self.state = "Closed" def _setup_device_type(self, subdevice, devtype) : """check that the specified subdevice exists, searching for an appropriate subdevice if subdevice == -1 @@ -74,97 +107,111 @@ class PyComediIO (object) : """ self._devtype = devtype if (subdevice < 0) : # autodetect an output device - self._subdev = self._comedi.comedi_find_subdevice_by_type(self._dev, self._devtype, 0) # 0 is starting subdevice - if self._subdev < 0 : + self.subdev = self._comedi.comedi_find_subdevice_by_type(self.dev, self._devtype, 0) # 0 is starting subdevice + if self.subdev < 0 : self._comedi.comedi_perror("comedi_find_subdevice_by_type") raise pycomediError, "Could not find a %d device" % (self._devtype) else : - self._subdev = subdevice - type = self._comedi.comedi_get_subdevice_type(self._dev, self._subdev) + self.subdev = subdevice + type = self._comedi.comedi_get_subdevice_type(self.dev, self.subdev) if type != self._devtype : if type < 0 : self._comedi.comedi_perror("comedi_get_subdevice_type") - raise pycomediError, "Comedi subdevice %d has wrong type %d" % (self._subdev, type) - def _setup_channels(self, chan, aref, range) : + raise pycomediError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type) + def _setup_channels(self, chan, aref, rng) : """check that the specified channels exists, and that the arefs and ranges are legal for those channels. Also allocate a range item for each channel, to allow converting between physical units and comedi units even when the device is not open. inputs: - chan: an iterable of the channels you wish to control ((0,1,2,3)) - aref: the analog reference for these channels (comedi.AREF_GROUND) + chan: an iterable of the channels you wish to control [(0,1,2,3)] + aref: the analog references for these channels [(comedi.AREF_GROUND,)] values include comedi.AREF_GROUND comedi.AREF_COMMON comedi.AREF_DIFF comedi.AREF_OTHER - range: the range for these channels (0) + rng: the ranges for these channels [(0,)] + Note that neither aref nor rng need to be the same length as + the channel tuple. If they are shorter, their last entry will + be repeated as needed. If they are longer, pycomediError will + be raised (was: their extra entries will not be used). """ - self._chan = chan - self._aref = aref - self._range = range - subdev_n_chan = self._comedi.comedi_get_n_channels(self._dev, self._subdev) - self._maxdata = [] + self.chan = chan + self.nchan = len(self.chan) + self._aref = _expand_tuple(aref, self.nchan) + self._range = _expand_tuple(rng, self.nchan) + self.maxdata = [] self._comedi_range = [] - for chan in self._chan : + subdev_n_chan = self._comedi.comedi_get_n_channels(self.dev, self.subdev) + self.cr_chan = self._comedi.chanlist(self.nchan) + for i,chan,aref,rng in zip(range(self.nchan), self.chan, self._aref, self._range) : if int(chan) != chan : raise pycomediError, "Channels must be integers, not %s" % str(chan) if chan >= subdev_n_chan : - raise pycomediError, "Channel %d > subdevice %d's largest chan %d" % (chan, self._subdev, subdev_n_chan-1) - n_range = self._comedi.comedi_get_n_ranges(self._dev, self._subdev, chan) - if range > n_range : - raise pycomediError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1) - maxdata = self._comedi.comedi_get_maxdata(self._dev, self._subdev, chan) - self._maxdata.append(maxdata) - comrange = self._comedi.comedi_get_range(self._dev, self._subdev, chan, range) + raise pycomediError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1) + n_range = self._comedi.comedi_get_n_ranges(self.dev, self.subdev, chan) + if rng > n_range : + raise pycomediError, "Range %d > subdevice %d, chan %d's largest range %d" % (rng, subdev, chan, n_range-1) + maxdata = self._comedi.comedi_get_maxdata(self.dev, self.subdev, chan) + self.maxdata.append(maxdata) + comrange = self._comedi.comedi_get_range(self.dev, self.subdev, chan, rng) # comrange becomes invalid if device is closed, so make a copy... comrange_copy = self._comedi.comedi_range() comrange_copy.min = comrange.min comrange_copy.max = comrange.max comrange_copy.unit = comrange.unit self._comedi_range.append(comrange_copy) + self.cr_chan[i] = self._comedi.cr_pack(chan, rng, aref) def comedi_to_phys(self, chan_index, comedi) : - phys = self._comedi.comedi_to_phys(comedi, self._comedi_range[chan_index], self._maxdata[chan_index]) + phys = self._comedi.comedi_to_phys(comedi, self._comedi_range[chan_index], self.maxdata[chan_index]) if self.verbose : - print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self._subdev, self._chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self._maxdata[chan_index]) + print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self.maxdata[chan_index]) return phys def phys_to_comedi(self, chan_index, phys) : - comedi = self._comedi.comedi_from_phys(phys, self._comedi_range[chan_index], self._maxdata[chan_index]) + comedi = self._comedi.comedi_from_phys(phys, self._comedi_range[chan_index], self.maxdata[chan_index]) if self.verbose : - print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self._subdev, self._chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self._maxdata[chan_index]) + print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self.maxdata[chan_index]) return comedi class PyComediSingleIO (PyComediIO) : "Software timed single-point input/ouput" def __init__(self, **kwargs) : """inputs: - filename: comedi device file for your device ("/dev/comedi0"). - subdevice: the digital IO subdevice (-1 for autodetect) - devtype: the devoce type + filename: comedi device file for your device ["/dev/comedi0"]. + subdevice: the analog output subdevice [-1 for autodetect] + devtype: the device type [c.COMEDI_SUBD_AI] values include comedi.COMEDI_SUBD_DI comedi.COMEDI_SUBD_DO comedi.COMEDI_SUBD_DIO comedi.COMEDI_SUBD_AI comedi.COMEDI_SUBD_AO - chan: an iterable of the channels you wish to control ((0,1,2,3)) - aref: the analog reference for these channels (comedi.AREF_GROUND) + chan: an iterable of the channels you wish to control [(0,1,2,3)] + aref: the analog references for these channels [(comedi.AREF_GROUND,)] values include comedi.AREF_GROUND comedi.AREF_COMMON comedi.AREF_DIFF comedi.AREF_OTHER - range: the range for these channels (0) + range: the ranges for these channels [(0,)] + output: whether to use the lines as output (vs input) (False) + dev: if you've already opened the device file, pass in the + open device (None for internal open) + Note that neither aref nor range need to be the same length as + the channel tuple. If they are shorter, their last entry will + be repeated as needed. If they are longer, pycomediError will + be raised (was: their extra entries will not be used). """ - common.PyComediIO.__init__(self, **kwargs) + PyComediIO.__init__(self, **kwargs) def write_chan_index(self, chan_index, data) : """inputs: chan_index: the channel you wish to write to data: the value you wish to write to that channel """ - if self._output != True : + if self.output != True : raise pycomediError, "Must be an output to write" - rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, data); + rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self._range[chan_index], self._aref[chan_index], data); if rc != 1 : # the number of samples written self._comedi.comedi_perror("comedi_data_write") raise pycomediError, "comedi_data_write returned %d" % rc @@ -174,9 +221,9 @@ class PyComediSingleIO (PyComediIO) : outputs: data: the value read from that channel """ - if self._output == True : + if self.output != False : raise pycomediError, "Must be an input to read" - rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref); + rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self._range[chan_index], self._aref[chan_index]); if rc != 1 : # the number of samples read self._comedi.comedi_perror("comedi_data_read") raise pycomediError, "comedi_data_read returned %d" % rc diff --git a/pycomedi/linreg.py b/pycomedi/linreg.py deleted file mode 100755 index 1caa198..0000000 --- a/pycomedi/linreg.py +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/python - -from scipy.stats import linregress -from scipy.io import read_array, write_array -import sys - -if __name__ == "__main__" : - data = read_array(sys.argv[1]) #, atype='Integer' numpy.typecodes - gradient, intercept, r_value, p_value, std_err = linregress(data) - print "y = %g + %g x" % (intercept, gradient) - print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y) - print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data - print "err = ", std_err # root mean sqared error of best fit - diff --git a/pycomedi/simult_aio.py b/pycomedi/simult_aio.py index 303b72b..50ef5b2 100644 --- a/pycomedi/simult_aio.py +++ b/pycomedi/simult_aio.py @@ -1,7 +1,8 @@ # Simultaneous, finite, buffered analog inpout/output using comedi drivers -import comedi -from numpy import array, fromstring, uint16, float32, pi, sin +import comedi as c +import common +from numpy import array, fromstring, float32, pi, sin import int16_rw # imports for testing @@ -9,16 +10,17 @@ from time import sleep from scipy.stats import linregress from os import system +VERSION = common.VERSION #VERBOSE = True VERBOSE = False AO_TRIGGERS_OFF_AI_START = True #AO_TRIGGERS_OFF_AI_START = False -class simAioError (Exception) : +class simAioError (common.pycomediError) : "Simultaneous Analog IO error" pass -_example_array = array([0], dtype=uint16) # for typing, since I don't know what type(array) should be +_example_array = array([0], dtype=int16_rw.DATA_T) # for typing, since I don't know what type(array) should be _cmdtest_message = ["success", "invalid source", @@ -27,232 +29,251 @@ _cmdtest_message = ["success", "argument conflict", "invalid chanlist"] -def _print_cmdsrc(source) : - if source & comedi.TRIG_NONE : print "none|", - if source & comedi.TRIG_NOW : print "now|", - if source & comedi.TRIG_FOLLOW : print "follow|", - if source & comedi.TRIG_TIME : print "time|", - if source & comedi.TRIG_TIMER : print "timer|", - if source & comedi.TRIG_COUNT : print "count|", - if source & comedi.TRIG_EXT : print "ext|", - if source & comedi.TRIG_INT : print "int|", - if source & comedi.TRIG_OTHER : print "other|", +class cmd (object) : + """Wrap a comedi command in more Pythonic trappings. -def _print_command(cmd) : - print "subdevice: \t%d" % cmd.subdev - print "flags: \t0x%x" % cmd.flags - print "start: \t", - _print_cmdsrc(cmd.start_src) - print "\t%d" % cmd.start_arg - print "scan_begin:\t", - _print_cmdsrc(cmd.scan_begin_src) - print "\t%d" % cmd.scan_begin_arg - print "convert: \t", - _print_cmdsrc(cmd.convert_src) - print "\t%d" % cmd.convert_arg - print "scan_end: \t", - _print_cmdsrc(cmd.scan_end_src) - print "\t%d" % cmd.scan_end_arg - print "stop: \t", - _print_cmdsrc(cmd.stop_src) - print "\t%d" % cmd.stop_arg + Due to my limited needs, this class currently only supports + software triggered runs (possibly with output triggering off the + input trigger) for a finite number of output samples where all of + the scan and sample timing is internal and as fast as possible. -def _expand_tuple(tup, length) : - "Expand an iterable TUP to a tuple of LENGTH by repeating the last element" - if len(tup) > length : - raise simAioError, "Tuple too long." - elif len(tup) < length : - temp_tup = tup + tuple((tup[-1],)*(length-len(tup))) - tup = temp_tup - return tup + See http://www.comedi.org/doc/x621.html#COMEDICMDSTRUCTURE + for more details/possibilities. + """ + def __init__(self, IO) : + """input: + IO : an initialized common.PyComediIO object + """ + self.IO = IO + if self.IO.output == True : + self.cmdTypeString = "output" + else : + self.cmdTypeString = "input" + self.generate_rough_command() + def generate_rough_command(self) : + if VERBOSE : + print "generate rough %s command" % self.cmdTypeString + cmd = self.IO._comedi.comedi_cmd_struct() + cmd.subdev = self.IO.subdev + if self.IO.output : + cmd.flags = self.IO._comedi.CMDF_WRITE + else : + cmd.flags = 0 + # decide how to trigger a multi-scan run + if self.IO.output and AO_TRIGGERS_OFF_AI_START : + cmd.start_src = self.IO._comedi.TRIG_EXT + cmd.start_arg = 18 # AI_START1 internal AI start signal + else : + cmd.start_src = self.IO._comedi.TRIG_INT + cmd.start_arg = 0 + # decide how to trigger a multi-channel scan + cmd.scan_begin_src = self.IO._comedi.TRIG_TIMER + cmd.scan_begin_arg = 1 # temporary value for now + # decide how to trigger a single channel's aquisition + if self.IO.output : + cmd.convert_src = self.IO._comedi.TRIG_NOW # convert simultaneously (each output has it's own DAC) + cmd.convert_arg = 0 + else : + cmd.convert_src = self.IO._comedi.TRIG_TIMER # convert sequentially (all inputs share single ADC) + cmd.convert_arg = 1 # time between channels in ns, 1 to convert ASAP + # decide when a scan is complete + cmd.scan_end_src = self.IO._comedi.TRIG_COUNT + cmd.scan_end_arg = self.IO.nchan + cmd.stop_src = self.IO._comedi.TRIG_COUNT + cmd.stop_arg = 1 # temporary value for now + cmd.chanlist = self.IO.cr_chan + cmd.chanlist_len = self.IO.nchan + self.cmd = cmd + self.test_cmd(max_passes=3) + def test_cmd(self, max_passes=1) : + very_verbose = False + i = 0 + rc = 0 + if very_verbose : + print "Testing command:" + _print_command(self.cmd) + while i < max_passes : + rc = self.IO._comedi.comedi_command_test(self.IO.dev, self.cmd) + if (rc == 0) : + break + if VERBOSE or very_verbose : + print "test pass %d, %s" % (i, _cmdtest_message[rc]) + i += 1 + if (VERBOSE or very_verbose) and i < max_passes : + print "Passing command:\n", self + if i >= max_passes : + print "Failing command (%d):\n" % rc, self + raise simAioError, "Invalid command: %s" % _cmdtest_message[rc] + def execute(self) : + if VERBOSE : + print "Loading %s command" % self.cmdTypeString + rc = self.IO._comedi.comedi_command(self.IO.dev, self.cmd) + if rc < 0 : + self.IO._comedi.comedi_perror("comedi_command") + raise simAioError, "Error executing %s command %d" % (self.cmdTypeString, rc) + def _cmdsrc(self, source) : + str = "" + if source & c.TRIG_NONE : str += "none|" + if source & c.TRIG_NOW : str += "now|" + if source & c.TRIG_FOLLOW : str += "follow|" + if source & c.TRIG_TIME : str += "time|" + if source & c.TRIG_TIMER : str += "timer|" + if source & c.TRIG_COUNT : str += "count|" + if source & c.TRIG_EXT : str += "ext|" + if source & c.TRIG_INT : str += "int|" + if source & c.TRIG_OTHER : str += "other|" + return str + def __str__(self) : + str = "Command on %s (%s):\n" % (self.IO, self.cmdTypeString) + str += "subdevice: \t%d\n" % self.cmd.subdev + str += "flags: \t0x%x\n" % self.cmd.flags + str += "start: \t" + str += self._cmdsrc(self.cmd.start_src) + str += "\t%d\n" % self.cmd.start_arg + str += "scan_begin:\t" + str += self._cmdsrc(self.cmd.scan_begin_src) + str += "\t%d\n" % self.cmd.scan_begin_arg + str += "convert: \t" + str += self._cmdsrc(self.cmd.convert_src) + str += "\t%d\n" % self.cmd.convert_arg + str += "scan_end: \t" + str += self._cmdsrc(self.cmd.scan_end_src) + str += "\t%d\n" % self.cmd.scan_end_arg + str += "stop: \t" + str += self._cmdsrc(self.cmd.stop_src) + str += "\t%d" % self.cmd.stop_arg + return str + +class AIO (object) : + """Control a simultaneous analog input/output (AIO) device using + Comedi drivers. + + The AIO device is modeled as being in one of the following states: -class aio_obj : + Open Device file has been opened, various one-off setup + tasks completed. + Initialized Any previous activity is complete, ready for a new + task + Setup New task assigned. + Armed The output task is "triggered" (see below) + Read The input task is triggered, and input read in + Closed + Transitions between these states may be achieved with class methods + open, __init__ - through Open to Initialized + close Any to Closed + setup Initialized to Setup + arm Setup to Armed + start_read Armed to Read + reset Setup, Armed, or Read to Initialized + + There are two triggering methods set by the module global + AO_TRIGGERS_OFF_AI_START + When this global is true, the output and input will start on the + exactly the same clock tick (in this case the output "trigger" + when "Arming" just primes the output to start when the input start + is signaled). However, this functionality at the moment depends + on your having a National Instruments card with a DAQ-STC module + controling the timing (e.g. E series) and a patched version of + ni_mio_common.c in your Comedi kernel. If you do not have an + appropriate card, you will either have to implement an appropriate + method for your card, or set the global to false, in which case + the IO synchronicity depends on the synchronicity of the AO and AI + software triggers. + """ def __init__(self, filename="/dev/comedi0", in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,), out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,)) : - self._comedi = comedi + """inputs: + filename: comedi device file for your device ("/dev/comedi0"). + And then for both input and output (in_* and out_*): + subdevice: the analog output subdevice (-1 for autodetect) + values include + comedi.COMEDI_SUBD_DI + comedi.COMEDI_SUBD_DO + comedi.COMEDI_SUBD_DIO + comedi.COMEDI_SUBD_AI + comedi.COMEDI_SUBD_AO + chan: an iterable of the channels you wish to control ((0,1,2,3)) + aref: the analog reference for these channels (comedi.AREF_GROUND) + values include + comedi.AREF_GROUND + comedi.AREF_COMMON + comedi.AREF_DIFF + comedi.AREF_OTHER + range: the range for these channels (0) + """ + self._comedi = c self._filename = filename - self.state = "Closed" - self.open() - - self._iaref = _expand_tuple(in_aref, len(in_chan)) - self._irange = _expand_tuple(in_range, len(in_chan)) - temp = self._check_options(in_subdevice, in_chan, self._iaref, self._irange, output=False) - self._isubdev = temp["subdevice"] - self._ichan_params = temp["chan_params"] - self._ichan = in_chan - self.i_nchan = len(self._ichan) - self._ichanlist = self._comedi.chanlist(self.i_nchan) - for i in range(self.i_nchan) : - self._ichanlist[i] = self._comedi.cr_pack(self._ichan[i], self._irange[i], self._iaref[i]) - - self._oaref = _expand_tuple(out_aref, len(in_chan)) - self._orange = _expand_tuple(out_range, len(in_chan)) - temp = self._check_options(out_subdevice, out_chan, self._oaref, self._orange, output=True) - self._osubdev = temp["subdevice"] - self._ochan_params = temp["chan_params"] - self._ochan = out_chan - self.o_nchan = len(self._ochan) - self._ochanlist = self._comedi.chanlist(self.o_nchan) - for i in range(self.o_nchan) : - self._ochanlist[i] = self._comedi.cr_pack(self._ochan[i], self._orange[i], self._oaref[i]) - - self._gen_rough_output_cmd() - self._gen_rough_input_cmd() + # the next section is much like the open() method below, + # but in this one we set up all the extra details associated + # with the AO and AI structures + self.dev = self._comedi.comedi_open(self._filename) + self._fd = self._comedi.comedi_fileno(self.dev) + if VERBOSE : + print "Opened %s on fd %d" % (self._filename, self._fd) + self.AI = common.PyComediIO(filename=self._filename, subdevice=in_subdevice, devtype=c.COMEDI_SUBD_AI, chan=in_chan, aref=in_aref, range=in_range, output=False, dev=self.dev) + self.AO = common.PyComediIO(filename=self._filename, subdevice=out_subdevice, devtype=c.COMEDI_SUBD_AO, chan=out_chan, aref=out_aref, range=out_range, output=True, dev=self.dev) + self.state = "Open" + self._icmd = cmd(self.AI) + self._ocmd = cmd(self.AO) self.state = "Initialized" def __del__(self) : self.close() def close(self) : if self.state != "Closed" : self.reset(force=True) - rc = self._comedi.comedi_close(self._dev) + rc = self._comedi.comedi_close(self.dev) if rc < 0 : self._comedi.comedi_perror("comedi_close") raise simAioError, "Cannot close %s" % self._filename if VERBOSE : print "Closed %s on fd %d" % (self._filename, self._fd) + self.AI.fakeClose() + self.AO.fakeClose() + self.dev = None + self._fd = None self.state = "Closed" def open(self) : if self.state != "Closed" : raise simAioError, "Invalid state %s" % self.state - self._dev = self._comedi.comedi_open(self._filename) - self._fd = self._comedi.comedi_fileno(self._dev) + self.dev = self._comedi.comedi_open(self._filename) + self._fd = self._comedi.comedi_fileno(self.dev) if VERBOSE : print "Opened %s on fd %d" % (self._filename, self._fd) + self.AI.fakeOpen(self.dev) + self.AO.fakeOpen(self.dev) + self.state = "Open" + self._icmd = cmd(self.AI) + self._ocmd = cmd(self.AO) self.state = "Initialized" - def _check_options(self, subdevice, chan, aref, rnge, output=True) : - subdevice = self._check_subdevice(subdevice, output=output) - chan_params = [] - for i in range(len(chan)) : - chan_params.append(self._check_chan(subdevice, chan[i], aref[i], rnge[i])) - if VERBOSE : - if output : - print "Output", - else : - print "Input", - print " subdevice with channels %s is valid" % (str(chan)) - return {"subdevice":subdevice, - "chan_params":chan_params} - def _check_subdevice(self, subdevice, output=True) : - if output == True : - target_type = self._comedi.COMEDI_SUBD_AO - else : - target_type = self._comedi.COMEDI_SUBD_AI - if (subdevice < 0) : # autodetect an input device - subdevice = self._comedi.comedi_find_subdevice_by_type(self._dev, target_type, 0) # 0 is starting subdevice - else : - type = self._comedi.comedi_get_subdevice_type(self._dev, subdevice) - if type != target_type : - raise simAioError, "Comedi subdevice %d has wrong type %d" % (subdevice, type) - return subdevice - def _check_chan(self, subdevice, chan, aref, range) : - subdev_n_chan = self._comedi.comedi_get_n_channels(self._dev, subdevice) - if chan >= subdev_n_chan : - raise simAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, subdevice, subdev_n_chan-1) - n_range = self._comedi.comedi_get_n_ranges(self._dev, subdevice, chan) - if range >= n_range : - raise simAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdevice, chan, n_range-1) - maxdata = self._comedi.comedi_get_maxdata(self._dev, subdevice, chan) - comrange = self._comedi.comedi_get_range(self._dev, subdevice, chan, range) - return {"maxdata":maxdata, "comrange": comrange} - def _gen_rough_output_cmd(self) : - if VERBOSE : - print "generate rough output command" - cmd = self._comedi.comedi_cmd_struct() - cmd.subdev = self._osubdev - cmd.flags = self._comedi.CMDF_WRITE - if AO_TRIGGERS_OFF_AI_START : - cmd.start_src = self._comedi.TRIG_EXT - cmd.start_arg = 18 # AI_START1 internal AI start signal - else : - cmd.start_src = self._comedi.TRIG_INT - cmd.start_arg = 0 - cmd.scan_begin_src = self._comedi.TRIG_TIMER - cmd.scan_begin_arg = 1 # temporary value for now - cmd.convert_src = self._comedi.TRIG_NOW - cmd.convert_arg = 0 - cmd.scan_end_src = self._comedi.TRIG_COUNT - cmd.scan_end_arg = self.o_nchan - cmd.stop_src = self._comedi.TRIG_COUNT - cmd.stop_arg = 1 # temporary value for now - cmd.chanlist = self._ochanlist - cmd.chanlist_len = self.o_nchan - self._test_cmd(cmd, max_passes=3) - self._ocmd = cmd - def _gen_rough_input_cmd(self) : - if VERBOSE : - print "generate rough input command" - cmd = self._comedi.comedi_cmd_struct() - cmd.subdev = self._isubdev - cmd.flags = 0 - cmd.start_src = self._comedi.TRIG_INT - cmd.start_arg = 0 - cmd.scan_begin_src = self._comedi.TRIG_TIMER - cmd.scan_begin_arg = 1 # temporary value for now - cmd.convert_src = self._comedi.TRIG_TIMER - cmd.convert_arg = 1 - cmd.scan_end_src = self._comedi.TRIG_COUNT - cmd.scan_end_arg = self.i_nchan - cmd.stop_src = self._comedi.TRIG_COUNT - cmd.stop_arg = 1 # temporary value for now - cmd.chanlist = self._ichanlist - cmd.chanlist_len = self.i_nchan - self._test_cmd(cmd, max_passes=3) - self._icmd = cmd - def _test_cmd(self, cmd, max_passes=1) : - very_verbose = False - i = 0 - rc = 0 - if very_verbose : - print "Testing command:" - _print_command(cmd) - while i < max_passes : - rc = self._comedi.comedi_command_test(self._dev, cmd) - if (rc == 0) : - break - if VERBOSE or very_verbose : - print "test pass %d, %s" % (i, _cmdtest_message[rc]) - i += 1 - if (VERBOSE or very_verbose) and i < max_passes : - print "Passing command:" - _print_command(cmd) - if i >= max_passes : - print "Failing command:" - _print_command(cmd) - raise simAioError, "Invalid command: %s" % _cmdtest_message[rc] def setup(self, nsamps, freq, out_buffer) : if self.state != "Initialized" : raise simAioError, "Invalid state %s" % self.state if type(out_buffer) != type(_example_array) : raise simAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer)) - self._ocmd.scan_begin_arg = int(1e9/freq) - self._ocmd.stop_arg = nsamps + self._ocmd.cmd.scan_begin_arg = int(1e9/freq) + self._ocmd.cmd.stop_arg = nsamps if VERBOSE : - print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.scan_begin_arg, self._ocmd.stop_arg) + print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.cmd.scan_begin_arg, self._ocmd.cmd.stop_arg) self._onremain = nsamps - self._test_cmd(self._ocmd) - rc = self._comedi.comedi_command(self._dev, self._ocmd) - if rc < 0 : - self._comedi.comedi_perror("comedi_command") - raise simAioError, "Error executing output command %d" % rc - self._icmd.scan_begin_arg = int(1e9/freq) - self._icmd.stop_arg = nsamps - self._test_cmd(self._icmd) + self._ocmd.test_cmd() + self._ocmd.execute() + self._icmd.cmd.scan_begin_arg = int(1e9/freq) + self._icmd.cmd.stop_arg = nsamps + self._icmd.test_cmd() self._inremain = nsamps - rc = self._comedi.comedi_command(self._dev, self._icmd) - if rc < 0 : - self._comedi.comedi_perror("comedi_command") - raise simAioError, "Error executing input command" - + self._icmd.execute() if VERBOSE : - print "Write %d output samples to the card" % (nsamps*self.o_nchan) - rc = int16_rw.write_samples(self._fd, nsamps*self.o_nchan, out_buffer, 1) - if rc != nsamps*self.o_nchan : + print "Write %d output samples to the card" % (nsamps*self.AO.nchan) + rc = int16_rw.write_samples(self._fd, out_buffer, 0, nsamps*self.AO.nchan, 1) + if rc != nsamps*self.AO.nchan : raise simAioError, "Error %d writing output buffer\n" % rc - rc = int16_rw.write_samples(self._fd, self.o_nchan, out_buffer[-self.o_nchan:], 1) # HACK, add an extra sample for each channel to the output buffer - if rc != self.o_nchan : + if VERBOSE : + print "Writing extra output" + rc = int16_rw.write_samples(self._fd, out_buffer, (nsamps-1)*self.AO.nchan, self.AO.nchan, 1) # HACK, add an extra sample for each channel to the output buffer + if rc != self.AO.nchan : raise simAioError, "Error %d writing hack output buffer\n" % rc - # maybe will avoid resetting... + # Without the hack, output jumps back to 0V after the command completes self._nsamps = nsamps self.state = "Setup" def arm(self) : @@ -260,18 +281,18 @@ class aio_obj : raise simAioError, "Invalid state %s" % self.state if VERBOSE : print "Arm the analog ouptut" - self._comedi_internal_trigger(self._osubdev) + self._comedi_internal_trigger(self.AO.subdev) self.state = "Armed" def start_read(self, in_buffer) : if self.state != "Armed" : raise simAioError, "Invalid state %s" % self.state if VERBOSE : print "Start the run" - self._comedi_internal_trigger(self._isubdev) + self._comedi_internal_trigger(self.AI.subdev) if VERBOSE : - print "Read %d input samples from the card" % (self._nsamps*self.i_nchan) - rc = int16_rw.read_samples(self._fd, self._nsamps*self.i_nchan, in_buffer, -1) - if rc != self._nsamps*self.i_nchan : + print "Read %d input samples from the card" % (self._nsamps*self.AI.nchan) + rc = int16_rw.read_samples(self._fd, in_buffer, 0, self._nsamps*self.AI.nchan, -1) + if rc != self._nsamps*self.AI.nchan : raise simAioError, "Error %d reading input buffer\n" % rc self.state = "Read" def _comedi_internal_trigger(self, subdevice) : @@ -282,16 +303,16 @@ class aio_obj : insn.data = data insn.n = 1 data[0] = 0 - rc = self._comedi.comedi_do_insn(self._dev, insn) + rc = self._comedi.comedi_do_insn(self.dev, insn) def reset(self, force=False) : if VERBOSE : print "Reset the analog subdevices" # clean up after the read - rc = self._comedi.comedi_cancel(self._dev, self._osubdev) + rc = self._comedi.comedi_cancel(self.dev, self.AO.subdev) if rc < 0 : self._comedi.comedi_perror("comedi_cancel") raise simAioError, "Error cleaning up output command" - rc = self._comedi.comedi_cancel(self._dev, self._isubdev) + rc = self._comedi.comedi_cancel(self.dev, self.AI.subdev) if rc < 0 : self._comedi.comedi_perror("comedi_cancel") raise simAioError, "Error cleaning up input command" @@ -300,12 +321,12 @@ class aio_obj : # define the test suite -def _test_aio_obj(aio=None, start_wait=0, verbose=False) : +def _test_AIO(aio=None, start_wait=0, verbose=False) : if (verbose) : - print "_test_aio_obj(start_wait = %g)" % start_wait + print "_test_AIO(start_wait = %g)" % start_wait nsamps = 10 - out_data = array([0]*nsamps, dtype=uint16) - in_data = array([0]*nsamps, dtype=uint16) + out_data = array([0]*nsamps, dtype=int16_rw.DATA_T) + in_data = array([0]*nsamps, dtype=int16_rw.DATA_T) for i in range(nsamps) : out_data[i] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps))) aio.setup(10, 1000, out_data) @@ -330,7 +351,7 @@ def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) : good_run = 0 good_run_arr = [] for i in range(num_tests) : - out_data, in_data = _test_aio_obj(aio, start_wait) + out_data, in_data = _test_AIO(aio, start_wait) gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data) grads[i] = gradient if verbose : @@ -351,17 +372,17 @@ def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) : print "good run stem and leaf:" system(call) -def _test_aio_obj_multi_chan(aio=None, start_wait=0, verbose=False) : +def _test_AIO_multi_chan(aio=None, start_wait=0, verbose=False) : if (verbose) : - print "_test_aio_obj_multi_chan(start_wait = %g)" % start_wait + print "_test_AIO_multi_chan(start_wait = %g)" % start_wait nsamps = 10 - out_data = array([0]*nsamps*aio.o_nchan, dtype=uint16) - in_data = array([0]*nsamps*aio.i_nchan, dtype=uint16) + out_data = array([0]*nsamps*aio.AO.nchan, dtype=int16_rw.DATA_T) + in_data = array([0]*nsamps*aio.AI.nchan, dtype=int16_rw.DATA_T) # set up interleaved data for i in range(nsamps) : - out_data[i*aio.o_nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps))) - for j in range(1, aio.o_nchan) : - out_data[i*aio.o_nchan + j] = 0 + out_data[i*aio.AO.nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps))) + for j in range(1, aio.AO.nchan) : + out_data[i*aio.AO.nchan + j] = 0 aio.setup(10, 1000, out_data) aio.arm() sleep(start_wait) @@ -369,32 +390,32 @@ def _test_aio_obj_multi_chan(aio=None, start_wait=0, verbose=False) : aio.reset() if (verbose) : print "#", - for j in range(aio.o_nchan) : - print "%s\t" % aio._ochan[j], - for j in range(aio.i_nchan) : - print "%s\t" % aio._ichan[j], + for j in range(aio.AO.nchan) : + print "%s\t" % aio.AO.chan[j], + for j in range(aio.AI.nchan) : + print "%s\t" % aio.AI.chan[j], print "" for i in range(nsamps) : - for j in range(aio.o_nchan) : - print "%s\t" % out_data[i*aio.o_nchan+j], - for j in range(aio.i_nchan) : - print "%s\t" % in_data[i*aio.i_nchan+j], + for j in range(aio.AO.nchan) : + print "%s\t" % out_data[i*aio.AO.nchan+j], + for j in range(aio.AI.nchan) : + print "%s\t" % in_data[i*aio.AI.nchan+j], print "" return (out_data, in_data) def test() : - aio = aio_obj() - _test_aio_obj(aio, start_wait = 0, verbose=True) - _test_aio_obj(aio, start_wait = 0.5, verbose=True) + aio = AIO(in_chan=(0,), out_chan=(0,)) + _test_AIO(aio, start_wait = 0, verbose=True) + _test_AIO(aio, start_wait = 0.5, verbose=True) aio.close() aio.open() - #_repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=False) + _repeat_aio_test(aio, num_tests=500, start_wait=0, verbose=False) aio.close() - aiom = aio_obj(in_chan=(0,1,2,3), out_chan=(0,1)) - _test_aio_obj_multi_chan(aiom, start_wait = 0, verbose=True) + aiom = AIO(in_chan=(0,1,2,3), out_chan=(0,1)) + _test_AIO_multi_chan(aiom, start_wait = 0, verbose=True) if __name__ == "__main__" : test() diff --git a/pycomedi/single_aio.py b/pycomedi/single_aio.py index cb6d99b..ac344b6 100644 --- a/pycomedi/single_aio.py +++ b/pycomedi/single_aio.py @@ -19,30 +19,29 @@ class sngAioError (common.pycomediError) : class AI (common.PyComediSingleIO) : def __init__(self, **kwargs) : """inputs: - filename: comedi device file for your device ("/dev/comedi0"). - subdevice: the analog output subdevice (-1 for autodetect) - chan: an iterable of the channels you wish to control ((0,1,2,3)) - aref: the analog reference for these channels (comedi.AREF_GROUND) + filename: comedi device file for your device ["/dev/comedi0"]. + subdevice: the analog input subdevice (-1 for autodetect) + chan: an iterable of the channels you wish to control [(0,1,2,3)] + aref: the analog references for these channels [(comedi.AREF_GROUND,)] values include: comedi.AREF_GROUND comedi.AREF_COMMON comedi.AREF_DIFF comedi.AREF_OTHER - range: the range for these channels (0) - output: whether to use the lines as output (vs input) (True) + range: the ranges for these channels [(0,)] """ - common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AI, **kwargs) + common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AI, output=False, **kwargs) def read(self) : """outputs: data: a list of read data values in Comedi units """ - data = range(len(self.chan)) - for i in range(len(self.chan)) : + data = range(self.nchan) + for i in range(self.nchan) : data[i] = self.read_chan_index(i) #print "Read %s, got %s" % (str(self.chan), str(data)) return data -def _test_ai_obj() : +def _test_AI() : ai = AI() print "read ", ai.read() print "read ", ai.read() @@ -54,27 +53,26 @@ def _test_ai_obj() : class AO (common.PyComediSingleIO) : def __init__(self, **kwargs) : """inputs: - filename: comedi device file for your device ("/dev/comedi0"). - subdevice: the analog output subdevice (-1 for autodetect) - chan: an iterable of the channels you wish to control ((0,1,2,3)) - aref: the analog reference for these channels (comedi.AREF_GROUND) + filename: comedi device file for your device ["/dev/comedi0"]. + subdevice: the analog output subdevice [-1 for autodetect] + chan: an iterable of the channels you wish to control [(0,1,2,3)] + aref: the analog references for these channels [(comedi.AREF_GROUND,)] values include: comedi.AREF_GROUND comedi.AREF_COMMON comedi.AREF_DIFF comedi.AREF_OTHER - range: the range for these channels (0) - output: whether to use the lines as output (vs input) (True) + range: the ranges for these channels [(0,)] """ - common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AO, **kwargs) + common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AO, output=True, **kwargs) def write(self, data) : - if len(data) != len(self.chan) : - raise sngAioError, "data length %d != the number of channels (%d)" % (len(data), len(self.chan)) - for i in range(len(self.chan)) : + if len(data) != self.nchan : + raise sngAioError, "data length %d != the number of channels (%d)" % (len(data), self.nchan) + for i in range(self.nchan) : self.write_chan_index(i, data[i]) -def _test_ao_obj() : - ao = AO() +def _test_AO() : + ao = AO(chan=(0,1)) ao.write([0,0]) ao.write([3000,3000]) ao.write([0,0]) @@ -84,6 +82,7 @@ def _test_ao_obj() : def _fit_with_residual(out_data, in_data, channel) : "Fit in_data(out_data) to a straight line & return residual" from scipy.stats import linregress + from numpy import zeros gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data) print "y = %g + %g x" % (intercept, gradient) print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y) @@ -91,8 +90,8 @@ def _fit_with_residual(out_data, in_data, channel) : print "err = ", std_err # root mean sqared error of best fit if gradient < .7 or p_value > 0.05 : raise sngAioError, "Out channel %d != in channel %d" % (channel, channel) - residual = zeros((points,)) - for i in range(points) : + residual = zeros((len(out_data),)) + for i in range(len(out_data)) : pred_y = intercept + gradient * out_data[i] residual[i] = in_data[i] - pred_y return residual @@ -111,12 +110,12 @@ def plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) : except ImportError : pass # ignore plot erros -def _test_aio() : +def _test_AIO() : "Test AO and AI by cabling AO0 into AI0 and sweeping voltage" from scipy.stats import linregress from numpy import linspace, zeros - ao = ao_obj(chan=(0,1)) - ai = ai_obj(chan=(0,1)) + ao = AO(chan=(0,1)) + ai = AI(chan=(0,1)) start = 0.1 * ao.maxdata[0] stop = 0.9 * ao.maxdata[0] points = 10 @@ -125,7 +124,7 @@ def _test_aio() : in_data0 = zeros((points,)) in_data1 = zeros((points,)) for i in range(points) : - ao.write([out_data0[i], out_data1[i]]) + ao.write([int(out_data0[i]), int(out_data1[i])]) id = ai.read() in_data0[i] = id[0] in_data1[i] = id[1] @@ -140,12 +139,12 @@ def _test_aio() : raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i]) if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i]) - print "_test_aio success" + print "aio success" def test() : - _test_ai_obj() - _test_ao_obj() - _test_aio() + _test_AI() + _test_AO() + _test_AIO() if __name__ == "__main__" : test() diff --git a/pycomedi/single_dio.py b/pycomedi/single_dio.py index a185810..3af6638 100644 --- a/pycomedi/single_dio.py +++ b/pycomedi/single_dio.py @@ -18,15 +18,15 @@ class dioError (common.pycomediError): class DIO_port (common.PyComediSingleIO) : def __init__(self, output=True, **kwargs) : """inputs: - filename: comedi device file for your device ("/dev/comedi0"). - subdevice: the digital IO subdevice (-1 for autodetect) - chan: an iterable of the channels you wish to control ((0,1,2,3)) - aref: the analog reference for these channels (comedi.AREF_GROUND) - range: the range for these channels (0) + filename: comedi device file for your device ["/dev/comedi0"]. + subdevice: the digital IO subdevice [-1 for autodetect] + chan: an iterable of the channels you wish to control [(0,1,2,3)] + aref: the analog references for these channels [(comedi.AREF_GROUND,)] + range: the ranges for these channels [(0,)] output: whether to use the lines as output (vs input) (True) """ common.PyComediSingleIO.__init__(self, devtype=c.COMEDI_SUBD_DIO, output=output, **kwargs) - if self._output : + if self.output : self.set_to_output() else : self.set_to_input() @@ -37,7 +37,7 @@ class DIO_port (common.PyComediSingleIO) : if rc != 1 : # yes, comedi_dio_config returns 1 on success, -1 on failure, as of comedilib-0.8.1 self._comedi.comedi_perror("comedi_dio_config") raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_OUTPUT, rc) - self._output = True + self.output = True def set_to_input(self) : "switch all the channels associated with this object to be inputs" for chan in self.chan : @@ -45,7 +45,7 @@ class DIO_port (common.PyComediSingleIO) : if rc != 1 : self._comedi.comedi_perror("comedi_dio_config") raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_INPUT, rc) - self._output = False + self.output = False def write_port(self, data) : """inputs: data: decimal number representing data to write @@ -55,7 +55,7 @@ class DIO_port (common.PyComediSingleIO) : 1 to chan[2] 0 to higher channels... """ - for i in range(len(self.chan)) : + for i in range(self.nchan) : self.write_chan_index(i, (data >> i) % 2) def read_port(self) : """outputs: @@ -67,7 +67,7 @@ class DIO_port (common.PyComediSingleIO) : 0 on higher channels... """ data = 0 - for i in range(len(self.chan)) : + for i in range(self.nchan) : data += self.read_chan_index(i) << i return data @@ -91,7 +91,7 @@ def _test_DIO_port() : d.close() def _test_DO_port() : - p = DO_port + p = DO_port() for data in [0, 1, 2, 3, 4, 5, 6, 7] : p(data) p.close() diff --git a/pycomedi/test.py b/pycomedi/test.py deleted file mode 100755 index c77a5a8..0000000 --- a/pycomedi/test.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/python -# -# With analog output 0 cabled directly to analog input 0 -# output a single period of a 1000kHz sine wave -# and perform a linear regression between the output and input data. -# If the input is closely correlated to the output (gradient > 0.7) -# consider the synchronized input/output a success. -# Prints the success rate for num_runs - -from scipy.stats import linregress -from scipy.io import read_array, write_array -from os import system -from numpy import zeros - -# I had been checking at different waits between AO arming and AI triggering, -# thinking that the problem might be due to my patch. -# But I get failure rates of ~ 20% regardless of the wait time (-t option) -# So currently only use waits of 0 seconds to get more data. - -def test() : - #waits = range(5, -1, -1) - waits = [0] - - num_runs = 200 - runs = range(0, num_runs) - results = zeros((1, num_runs)) - good_run_arr = [] - - fails = 0 - successes = 0 - good_run = 0 - for wait in waits : - for run in runs : - call = './simult_aio -n 50 -F 50000 -w 1000 -t%d' % wait - if system(call) != 0 : - return 1 - if system('int16s_to_ascii_array out in > data') != 0 : - return 1 - data = read_array('data') - gradient, intercept, r_value, p_value, std_err = linregress(data) - results[wait,run] = gradient - print "wait %2d, run %2d, gradient %g" % (wait, run, gradient) - if gradient < .7 : - fails += 1 - good_run_arr.append(good_run) - good_run = 0 - else : - successes += 1 - good_run += 1 - print "failure rate ", (float(fails)/float(fails+successes)) - call = 'echo "' - for num in good_run_arr : - call += "%d " % num - call += '" | stem_leaf 2' - print "good runs:" - system(call) - -if __name__ == "__main__" : - test() diff --git a/pycomedi/zero_initial.py b/pycomedi/zero_initial.py deleted file mode 100755 index a0abd55..0000000 --- a/pycomedi/zero_initial.py +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/python - -from scipy.io import read_array, write_array -import sys - -data = read_array(sys.argv[1]) #, atype='Integer' numpy.typecodes - -start_vals = data[0] -for point in data : - x = point-start_vals - for val in x : - print val, "\t", - print "" - -- 2.26.2