From: W. Trevor King Date: Sun, 5 Oct 2008 04:07:02 +0000 (-0400) Subject: Broke out common functionality into common.py X-Git-Tag: 0.2~11 X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=a48370216b3ea3cb1c0a6e804f839c410a63a84c;p=pycomedi.git Broke out common functionality into common.py Still need to go through and clean up the simultaneous code. --- diff --git a/pycomedi/common.py b/pycomedi/common.py new file mode 100644 index 0000000..e565e03 --- /dev/null +++ b/pycomedi/common.py @@ -0,0 +1,184 @@ +"""Some Comedi operations common to analog and digital IO""" + +import comedi as c + +VERSION = 0.1 + +class pycomediError (Exception) : + "Error in pycomedi.common" + pass + +class PyComediIO (object) : + "Base class for Comedi IO operations" + def __init__(self, filename="/dev/comedi0", subdevice=-1, devtype=c.COMEDI_SUBD_AI, chan=(0,1,2,3), aref=c.AREF_GROUND, range=0, output=False) : + """inputs: + filename: comedi device file for your device ("/dev/comedi0"). + subdevice: the analog output subdevice (-1 for autodetect) + devtype: the devoce type (c.COMEDI_SUBD_AI) + values include + comedi.COMEDI_SUBD_DI + comedi.COMEDI_SUBD_DO + comedi.COMEDI_SUBD_DIO + comedi.COMEDI_SUBD_AI + comedi.COMEDI_SUBD_AO + chan: an iterable of the channels you wish to control ((0,1,2,3)) + aref: the analog reference for these channels (comedi.AREF_GROUND) + values include + comedi.AREF_GROUND + comedi.AREF_COMMON + comedi.AREF_DIFF + comedi.AREF_OTHER + range: the range for these channels (0) + output: whether to use the lines as output (vs input) (False) + """ + self.verbose = False + self._comedi = c # keep a local copy around + # sometimes I got errors on exitting python, which looked like + # the imported comedi package was unset before my IO had a + # chance to call comedi_close(). We avoid that by keeping a + # local reference here. + self.filename = filename + self.state = "Closed" + self.open() + self._setup_device_type(subdevice, devtype) + self._setup_channels(chan, aref, range) + self._output = output + def __del__(self) : + self.close() + def open(self) : + if self.state == "Closed" : + self._dev = self._comedi.comedi_open(self.filename) + if self.dev < 0 : + self._comedi.comedi_perror("comedi_open") + raise pycomediError, "Cannot open %s" % self.filename + self.state = "Opened" + def close(self) : + if self.state != "Closed" : + rc = self._comedi.comedi_close(self._dev) + if rc < 0 : + self._comedi.comedi_perror("comedi_close") + raise pycomediError, "Cannot close %s" % self.filename + self.state = "Closed" + def _setup_device_type(self, subdevice, devtype) : + """check that the specified subdevice exists, + searching for an appropriate subdevice if subdevice == -1 + inputs: + subdevice: the analog output subdevice (-1 for autodetect) + devtype: the devoce type + values include + comedi.COMEDI_SUBD_DI + comedi.COMEDI_SUBD_DO + comedi.COMEDI_SUBD_DIO + comedi.COMEDI_SUBD_AI + comedi.COMEDI_SUBD_AO + """ + self._devtype = devtype + if (subdevice < 0) : # autodetect an output device + self._subdev = self._comedi.comedi_find_subdevice_by_type(self._dev, self._devtype, 0) # 0 is starting subdevice + if self._subdev < 0 : + self._comedi.comedi_perror("comedi_find_subdevice_by_type") + raise pycomediError, "Could not find a %d device" % (self._devtype) + else : + self._subdev = subdevice + type = self._comedi.comedi_get_subdevice_type(self._dev, self._subdev) + if type != self._devtype : + if type < 0 : + self._comedi.comedi_perror("comedi_get_subdevice_type") + raise pycomediError, "Comedi subdevice %d has wrong type %d" % (self._subdev, type) + def _setup_channels(self, chan, aref, range) : + """check that the specified channels exists, and that the arefs and + ranges are legal for those channels. Also allocate a range + item for each channel, to allow converting between physical + units and comedi units even when the device is not open. + inputs: + chan: an iterable of the channels you wish to control ((0,1,2,3)) + aref: the analog reference for these channels (comedi.AREF_GROUND) + values include + comedi.AREF_GROUND + comedi.AREF_COMMON + comedi.AREF_DIFF + comedi.AREF_OTHER + range: the range for these channels (0) + """ + self._chan = chan + self._aref = aref + self._range = range + subdev_n_chan = self._comedi.comedi_get_n_channels(self._dev, self._subdev) + self._maxdata = [] + self._comedi_range = [] + for chan in self._chan : + if int(chan) != chan : + raise pycomediError, "Channels must be integers, not %s" % str(chan) + if chan >= subdev_n_chan : + raise pycomediError, "Channel %d > subdevice %d's largest chan %d" % (chan, self._subdev, subdev_n_chan-1) + n_range = self._comedi.comedi_get_n_ranges(self._dev, self._subdev, chan) + if range > n_range : + raise pycomediError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1) + maxdata = self._comedi.comedi_get_maxdata(self._dev, self._subdev, chan) + self._maxdata.append(maxdata) + comrange = self._comedi.comedi_get_range(self._dev, self._subdev, chan, range) + # comrange becomes invalid if device is closed, so make a copy... + comrange_copy = self._comedi.comedi_range() + comrange_copy.min = comrange.min + comrange_copy.max = comrange.max + comrange_copy.unit = comrange.unit + self._comedi_range.append(comrange_copy) + def comedi_to_phys(self, chan_index, comedi) : + phys = self._comedi.comedi_to_phys(comedi, self._comedi_range[chan_index], self._maxdata[chan_index]) + if self.verbose : + print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self._subdev, self._chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self._maxdata[chan_index]) + return phys + def phys_to_comedi(self, chan_index, phys) : + comedi = self._comedi.comedi_from_phys(phys, self._comedi_range[chan_index], self._maxdata[chan_index]) + if self.verbose : + print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self._subdev, self._chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self._maxdata[chan_index]) + return comedi + +class PyComediSingleIO (PyComediIO) : + "Software timed single-point input/ouput" + def __init__(self, **kwargs) : + """inputs: + filename: comedi device file for your device ("/dev/comedi0"). + subdevice: the digital IO subdevice (-1 for autodetect) + devtype: the devoce type + values include + comedi.COMEDI_SUBD_DI + comedi.COMEDI_SUBD_DO + comedi.COMEDI_SUBD_DIO + comedi.COMEDI_SUBD_AI + comedi.COMEDI_SUBD_AO + chan: an iterable of the channels you wish to control ((0,1,2,3)) + aref: the analog reference for these channels (comedi.AREF_GROUND) + values include + comedi.AREF_GROUND + comedi.AREF_COMMON + comedi.AREF_DIFF + comedi.AREF_OTHER + range: the range for these channels (0) + """ + common.PyComediIO.__init__(self, **kwargs) + def write_chan_index(self, chan_index, data) : + """inputs: + chan_index: the channel you wish to write to + data: the value you wish to write to that channel + """ + if self._output != True : + raise pycomediError, "Must be an output to write" + rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, data); + if rc != 1 : # the number of samples written + self._comedi.comedi_perror("comedi_data_write") + raise pycomediError, "comedi_data_write returned %d" % rc + def read_chan_index(self, chan_index) : + """inputs: + chan_index: the channel you wish to read from + outputs: + data: the value read from that channel + """ + if self._output == True : + raise pycomediError, "Must be an input to read" + rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref); + if rc != 1 : # the number of samples read + self._comedi.comedi_perror("comedi_data_read") + raise pycomediError, "comedi_data_read returned %d" % rc + return data + diff --git a/pycomedi/single_aio.py b/pycomedi/single_aio.py index 0b270b8..cb6d99b 100644 --- a/pycomedi/single_aio.py +++ b/pycomedi/single_aio.py @@ -1,88 +1,49 @@ -# Use Comedi drivers for single-shot analog input/output +"""Use Comedi drivers for single-shot analog input/output -import comedi +Being single-shot implementations, read/writes will be software timed, +so this module would not be a good choice if you need millisecond +resolution. However, it does provide a simple and robust way to +generate/aquire signals at 1 second and greater timescales. +""" -VERSION = 0.0 +import comedi as c +import common + +VERSION = common.VERSION VERBOSE_DEBUG = True -class sngAioError (Exception) : +class sngAioError (common.pycomediError) : "Single point Analog IO error" pass -class ai_obj : - def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1,2,3), aref=0, range=0) : - self.verbose = False - self.comedi = comedi - self.filename = filename - self.state = "Closed" - self.open() - if (subdevice < 0) : # autodetect an output device - self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AI, 0) # 0 is starting subdevice - else : - self.subdev = subdevice - type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev) - if type != self.comedi.COMEDI_SUBD_AI : - raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type) - self.chan = chan - self.aref = aref - self.range = range - subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev) - self.maxdata = [] - self.comedi_range = [] - for chan in self.chan : - if int(chan) != chan : - raise sngAioError, "Channels must be integers, not %s" % str(chan) - if chan >= subdev_n_chan : - raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1) - n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan) - if range > n_range : - raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1) - maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan) - self.maxdata.append(maxdata) - comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range) - # comrange becomes invalid if device is closed, so make a copy... - comrange_copy = self.comedi.comedi_range() - comrange_copy.min = comrange.min - comrange_copy.max = comrange.max - comrange_copy.unit = comrange.unit - self.comedi_range.append(comrange_copy) - def __del__(self) : - self.close() - def open(self) : - if self.state == "Closed" : - self.dev = self.comedi.comedi_open(self.filename) - self.state = "Opened" - def close(self) : - if self.state != "Closed" : - rc = self.comedi.comedi_close(self.dev) - if rc < 0 : - self.comedi.comedi_perror("comedi_close") - raise sngAioError, "Cannot close %s" % self.filename - self.state = "Closed" - def comedi_to_phys(self, chan_index, comedi) : - phys = self.comedi.comedi_to_phys(comedi, self.comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : - print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index]) - return phys - def phys_to_comedi(self, chan_index, phys) : - comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : - print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index]) - return comedi - def read_chan_index(self, chan_index) : - rc, data = self.comedi.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref); - if rc != 1 : # the number of samples read - raise sngAioError, "comedi_data_read returned %d" % rc - return data +class AI (common.PyComediSingleIO) : + def __init__(self, **kwargs) : + """inputs: + filename: comedi device file for your device ("/dev/comedi0"). + subdevice: the analog output subdevice (-1 for autodetect) + chan: an iterable of the channels you wish to control ((0,1,2,3)) + aref: the analog reference for these channels (comedi.AREF_GROUND) + values include: + comedi.AREF_GROUND + comedi.AREF_COMMON + comedi.AREF_DIFF + comedi.AREF_OTHER + range: the range for these channels (0) + output: whether to use the lines as output (vs input) (True) + """ + common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AI, **kwargs) def read(self) : - out = range(len(self.chan)) + """outputs: + data: a list of read data values in Comedi units + """ + data = range(len(self.chan)) for i in range(len(self.chan)) : - out[i] = self.read_chan_index(i) - #print "Read %s, got %s" % (str(self.chan), str(out)) - return out + data[i] = self.read_chan_index(i) + #print "Read %s, got %s" % (str(self.chan), str(data)) + return data def _test_ai_obj() : - ai = ai_obj() + ai = AI() print "read ", ai.read() print "read ", ai.read() print "read ", ai.read() @@ -90,72 +51,22 @@ def _test_ai_obj() : ai.close() print "ai success" -class ao_obj : - def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1), aref=0, range=0) : - self.verbose = False - self.comedi = comedi - self.filename = filename - self.state = "Closed" - self.open() - if (subdevice < 0) : # autodetect an output device - self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AO, 0) # 0 is starting subdevice - else : - self.subdev = subdevice - type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev) - if type != self.comedi.COMEDI_SUBD_AO : - raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type) - self.chan = chan - self.aref = aref - self.range = range - subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev) - self.maxdata = [] - self.comedi_range = [] - for chan in self.chan : - if chan >= subdev_n_chan : - raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1) - n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan) - if range > n_range : - raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1) - maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan) - self.maxdata.append(maxdata) - comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range) - # comrange becomes invalid if device is closed, so make a copy... - comrange_copy = self.comedi.comedi_range() - comrange_copy.min = comrange.min - comrange_copy.max = comrange.max - comrange_copy.unit = comrange.unit - self.comedi_range.append(comrange_copy) - def __del__(self) : - self.close() - def open(self) : - if self.state != "Closed" : - raise sngAioError, "Invalid state %s" % self.state - self.dev = self.comedi.comedi_open(self.filename) - self.state = "Opened" - def close(self) : - if self.state != "Closed" : - for i in range(len(self.chan)) : - self.write_chan_index(i, self.phys_to_comedi(i, 0)) - rc = self.comedi.comedi_close(self.dev) - if rc < 0 : - self.comedi.comedi_perror("comedi_close") - raise sngAioError, "Cannot close %s" % self.filename - self.state = "Closed" - def comedi_to_phys(self, chan_index, comedi) : - phys = self.comedi.comedi_to_phys(int(comedi), self.comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : - print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index]) - return phys - def phys_to_comedi(self, chan_index, phys) : - comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : - print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index]) - return comedi - def write_chan_index(self, chan_index, data) : - #print "set output on chan %d to %d" % (chan_index, data) - rc = self.comedi.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, int(data)); - if rc != 1 : # the number of samples written - raise sngAioError, 'comedi_data_write returned %d' % rc +class AO (common.PyComediSingleIO) : + def __init__(self, **kwargs) : + """inputs: + filename: comedi device file for your device ("/dev/comedi0"). + subdevice: the analog output subdevice (-1 for autodetect) + chan: an iterable of the channels you wish to control ((0,1,2,3)) + aref: the analog reference for these channels (comedi.AREF_GROUND) + values include: + comedi.AREF_GROUND + comedi.AREF_COMMON + comedi.AREF_DIFF + comedi.AREF_OTHER + range: the range for these channels (0) + output: whether to use the lines as output (vs input) (True) + """ + common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AO, **kwargs) def write(self, data) : if len(data) != len(self.chan) : raise sngAioError, "data length %d != the number of channels (%d)" % (len(data), len(self.chan)) @@ -163,14 +74,15 @@ class ao_obj : self.write_chan_index(i, data[i]) def _test_ao_obj() : - ao = ao_obj() + ao = AO() ao.write([0,0]) ao.write([3000,3000]) ao.write([0,0]) ao.close() print "ao success" -def _fit_with_residual(out_data, in_data) : +def _fit_with_residual(out_data, in_data, channel) : + "Fit in_data(out_data) to a straight line & return residual" from scipy.stats import linregress gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data) print "y = %g + %g x" % (intercept, gradient) @@ -178,7 +90,7 @@ def _fit_with_residual(out_data, in_data) : print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data print "err = ", std_err # root mean sqared error of best fit if gradient < .7 or p_value > 0.05 : - raise sngAioError, "Out channel 0 != in channel 0" + raise sngAioError, "Out channel %d != in channel %d" % (channel, channel) residual = zeros((points,)) for i in range(points) : pred_y = intercept + gradient * out_data[i] @@ -200,6 +112,7 @@ def plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) : pass # ignore plot erros def _test_aio() : + "Test AO and AI by cabling AO0 into AI0 and sweeping voltage" from scipy.stats import linregress from numpy import linspace, zeros ao = ao_obj(chan=(0,1)) @@ -218,8 +131,8 @@ def _test_aio() : in_data1[i] = id[1] ai.close() ao.close() - residual0 = _fit_with_residual(out_data0, in_data0) - residual1 = _fit_with_residual(out_data1, in_data1) + residual0 = _fit_with_residual(out_data0, in_data0, 0) + residual1 = _fit_with_residual(out_data1, in_data1, 1) if VERBOSE_DEBUG : plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) for i in range(points) : @@ -227,7 +140,6 @@ def _test_aio() : raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i]) if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i]) - print "_test_aio success" def test() : diff --git a/pycomedi/single_dio.py b/pycomedi/single_dio.py index 3a0cea8..a185810 100644 --- a/pycomedi/single_dio.py +++ b/pycomedi/single_dio.py @@ -7,36 +7,26 @@ generate/aquire signals at 1 second and greater timescales. """ import comedi as c +import common -VERSION = 0.1 +VERSION = common.VERSION -class dioError (Exception) : +class dioError (common.pycomediError): "Digital IO error" pass -class DIO_port : - def __init__(self, filename="/dev/comedi0", subdevice=2, chan=(0,1,2,3), aref=0, range=0, output=True) : +class DIO_port (common.PyComediSingleIO) : + def __init__(self, output=True, **kwargs) : """inputs: filename: comedi device file for your device ("/dev/comedi0"). - subdevice: the digital IO subdevice (2) + subdevice: the digital IO subdevice (-1 for autodetect) chan: an iterable of the channels you wish to control ((0,1,2,3)) - aref: the analog reference for these channels (0) + aref: the analog reference for these channels (comedi.AREF_GROUND) range: the range for these channels (0) output: whether to use the lines as output (vs input) (True) """ - self.filename = filename - self.subdev = subdevice - self.chan = chan - self.aref = aref - self.range = range - self.output = output - self.dev = c.comedi_open(filename) - if self.dev < 0 : - raise dioError, "Cannot open %s" % self.filename - type = c.comedi_get_subdevice_type(self.dev, self.subdev) - if type != c.COMEDI_SUBD_DIO : - raise dioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type) - if self.output : + common.PyComediSingleIO.__init__(self, devtype=c.COMEDI_SUBD_DIO, output=output, **kwargs) + if self._output : self.set_to_output() else : self.set_to_input() @@ -45,37 +35,17 @@ class DIO_port : for chan in self.chan : rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_OUTPUT) if rc != 1 : # yes, comedi_dio_config returns 1 on success, -1 on failure, as of comedilib-0.8.1 + self._comedi.comedi_perror("comedi_dio_config") raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_OUTPUT, rc) - self.output = True + self._output = True def set_to_input(self) : "switch all the channels associated with this object to be inputs" for chan in self.chan : rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_INPUT) if rc != 1 : + self._comedi.comedi_perror("comedi_dio_config") raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_INPUT, rc) - self.output = False - def write_chan_index(self, chan_index, data) : - """inputs: - chan_index: the channel you wish to write to - data: the value you wish to write to that channel - """ - if self.output != True : - raise dioError, "Must be an output to write" - rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, data); - if rc != 1 : # the number of samples written - raise dioError, "comedi_data_write returned %d" % rc - def read_chan_index(self, chan_index) : - """inputs: - chan_index: the channel you wish to read from - outputs: - data: the value read from that channel - """ - if self.output == True : - raise dioError, "Must be an input to read" - rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref); - if rc != 1 : # the number of samples read - raise dioError, "comedi_data_read returned %d" % rc - return data + self._output = False def write_port(self, data) : """inputs: data: decimal number representing data to write @@ -118,11 +88,13 @@ def _test_DIO_port() : data = d.read_port() print "port value is %d" % data print "dio_obj success" + d.close() def _test_DO_port() : p = DO_port for data in [0, 1, 2, 3, 4, 5, 6, 7] : p(data) + p.close() print "write_dig_port success" def test() :