--- /dev/null
+"""Some Comedi operations common to analog and digital IO"""
+
+import comedi as c
+
+VERSION = 0.1
+
+class pycomediError (Exception) :
+ "Error in pycomedi.common"
+ pass
+
+class PyComediIO (object) :
+ "Base class for Comedi IO operations"
+ def __init__(self, filename="/dev/comedi0", subdevice=-1, devtype=c.COMEDI_SUBD_AI, chan=(0,1,2,3), aref=c.AREF_GROUND, range=0, output=False) :
+ """inputs:
+ filename: comedi device file for your device ("/dev/comedi0").
+ subdevice: the analog output subdevice (-1 for autodetect)
+ devtype: the devoce type (c.COMEDI_SUBD_AI)
+ values include
+ comedi.COMEDI_SUBD_DI
+ comedi.COMEDI_SUBD_DO
+ comedi.COMEDI_SUBD_DIO
+ comedi.COMEDI_SUBD_AI
+ comedi.COMEDI_SUBD_AO
+ chan: an iterable of the channels you wish to control ((0,1,2,3))
+ aref: the analog reference for these channels (comedi.AREF_GROUND)
+ values include
+ comedi.AREF_GROUND
+ comedi.AREF_COMMON
+ comedi.AREF_DIFF
+ comedi.AREF_OTHER
+ range: the range for these channels (0)
+ output: whether to use the lines as output (vs input) (False)
+ """
+ self.verbose = False
+ self._comedi = c # keep a local copy around
+ # sometimes I got errors on exitting python, which looked like
+ # the imported comedi package was unset before my IO had a
+ # chance to call comedi_close(). We avoid that by keeping a
+ # local reference here.
+ self.filename = filename
+ self.state = "Closed"
+ self.open()
+ self._setup_device_type(subdevice, devtype)
+ self._setup_channels(chan, aref, range)
+ self._output = output
+ def __del__(self) :
+ self.close()
+ def open(self) :
+ if self.state == "Closed" :
+ self._dev = self._comedi.comedi_open(self.filename)
+ if self.dev < 0 :
+ self._comedi.comedi_perror("comedi_open")
+ raise pycomediError, "Cannot open %s" % self.filename
+ self.state = "Opened"
+ def close(self) :
+ if self.state != "Closed" :
+ rc = self._comedi.comedi_close(self._dev)
+ if rc < 0 :
+ self._comedi.comedi_perror("comedi_close")
+ raise pycomediError, "Cannot close %s" % self.filename
+ self.state = "Closed"
+ def _setup_device_type(self, subdevice, devtype) :
+ """check that the specified subdevice exists,
+ searching for an appropriate subdevice if subdevice == -1
+ inputs:
+ subdevice: the analog output subdevice (-1 for autodetect)
+ devtype: the devoce type
+ values include
+ comedi.COMEDI_SUBD_DI
+ comedi.COMEDI_SUBD_DO
+ comedi.COMEDI_SUBD_DIO
+ comedi.COMEDI_SUBD_AI
+ comedi.COMEDI_SUBD_AO
+ """
+ self._devtype = devtype
+ if (subdevice < 0) : # autodetect an output device
+ self._subdev = self._comedi.comedi_find_subdevice_by_type(self._dev, self._devtype, 0) # 0 is starting subdevice
+ if self._subdev < 0 :
+ self._comedi.comedi_perror("comedi_find_subdevice_by_type")
+ raise pycomediError, "Could not find a %d device" % (self._devtype)
+ else :
+ self._subdev = subdevice
+ type = self._comedi.comedi_get_subdevice_type(self._dev, self._subdev)
+ if type != self._devtype :
+ if type < 0 :
+ self._comedi.comedi_perror("comedi_get_subdevice_type")
+ raise pycomediError, "Comedi subdevice %d has wrong type %d" % (self._subdev, type)
+ def _setup_channels(self, chan, aref, range) :
+ """check that the specified channels exists, and that the arefs and
+ ranges are legal for those channels. Also allocate a range
+ item for each channel, to allow converting between physical
+ units and comedi units even when the device is not open.
+ inputs:
+ chan: an iterable of the channels you wish to control ((0,1,2,3))
+ aref: the analog reference for these channels (comedi.AREF_GROUND)
+ values include
+ comedi.AREF_GROUND
+ comedi.AREF_COMMON
+ comedi.AREF_DIFF
+ comedi.AREF_OTHER
+ range: the range for these channels (0)
+ """
+ self._chan = chan
+ self._aref = aref
+ self._range = range
+ subdev_n_chan = self._comedi.comedi_get_n_channels(self._dev, self._subdev)
+ self._maxdata = []
+ self._comedi_range = []
+ for chan in self._chan :
+ if int(chan) != chan :
+ raise pycomediError, "Channels must be integers, not %s" % str(chan)
+ if chan >= subdev_n_chan :
+ raise pycomediError, "Channel %d > subdevice %d's largest chan %d" % (chan, self._subdev, subdev_n_chan-1)
+ n_range = self._comedi.comedi_get_n_ranges(self._dev, self._subdev, chan)
+ if range > n_range :
+ raise pycomediError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
+ maxdata = self._comedi.comedi_get_maxdata(self._dev, self._subdev, chan)
+ self._maxdata.append(maxdata)
+ comrange = self._comedi.comedi_get_range(self._dev, self._subdev, chan, range)
+ # comrange becomes invalid if device is closed, so make a copy...
+ comrange_copy = self._comedi.comedi_range()
+ comrange_copy.min = comrange.min
+ comrange_copy.max = comrange.max
+ comrange_copy.unit = comrange.unit
+ self._comedi_range.append(comrange_copy)
+ def comedi_to_phys(self, chan_index, comedi) :
+ phys = self._comedi.comedi_to_phys(comedi, self._comedi_range[chan_index], self._maxdata[chan_index])
+ if self.verbose :
+ print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self._subdev, self._chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self._maxdata[chan_index])
+ return phys
+ def phys_to_comedi(self, chan_index, phys) :
+ comedi = self._comedi.comedi_from_phys(phys, self._comedi_range[chan_index], self._maxdata[chan_index])
+ if self.verbose :
+ print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self._subdev, self._chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self._maxdata[chan_index])
+ return comedi
+
+class PyComediSingleIO (PyComediIO) :
+ "Software timed single-point input/ouput"
+ def __init__(self, **kwargs) :
+ """inputs:
+ filename: comedi device file for your device ("/dev/comedi0").
+ subdevice: the digital IO subdevice (-1 for autodetect)
+ devtype: the devoce type
+ values include
+ comedi.COMEDI_SUBD_DI
+ comedi.COMEDI_SUBD_DO
+ comedi.COMEDI_SUBD_DIO
+ comedi.COMEDI_SUBD_AI
+ comedi.COMEDI_SUBD_AO
+ chan: an iterable of the channels you wish to control ((0,1,2,3))
+ aref: the analog reference for these channels (comedi.AREF_GROUND)
+ values include
+ comedi.AREF_GROUND
+ comedi.AREF_COMMON
+ comedi.AREF_DIFF
+ comedi.AREF_OTHER
+ range: the range for these channels (0)
+ """
+ common.PyComediIO.__init__(self, **kwargs)
+ def write_chan_index(self, chan_index, data) :
+ """inputs:
+ chan_index: the channel you wish to write to
+ data: the value you wish to write to that channel
+ """
+ if self._output != True :
+ raise pycomediError, "Must be an output to write"
+ rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, data);
+ if rc != 1 : # the number of samples written
+ self._comedi.comedi_perror("comedi_data_write")
+ raise pycomediError, "comedi_data_write returned %d" % rc
+ def read_chan_index(self, chan_index) :
+ """inputs:
+ chan_index: the channel you wish to read from
+ outputs:
+ data: the value read from that channel
+ """
+ if self._output == True :
+ raise pycomediError, "Must be an input to read"
+ rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref);
+ if rc != 1 : # the number of samples read
+ self._comedi.comedi_perror("comedi_data_read")
+ raise pycomediError, "comedi_data_read returned %d" % rc
+ return data
+
-# Use Comedi drivers for single-shot analog input/output
+"""Use Comedi drivers for single-shot analog input/output
-import comedi
+Being single-shot implementations, read/writes will be software timed,
+so this module would not be a good choice if you need millisecond
+resolution. However, it does provide a simple and robust way to
+generate/aquire signals at 1 second and greater timescales.
+"""
-VERSION = 0.0
+import comedi as c
+import common
+
+VERSION = common.VERSION
VERBOSE_DEBUG = True
-class sngAioError (Exception) :
+class sngAioError (common.pycomediError) :
"Single point Analog IO error"
pass
-class ai_obj :
- def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1,2,3), aref=0, range=0) :
- self.verbose = False
- self.comedi = comedi
- self.filename = filename
- self.state = "Closed"
- self.open()
- if (subdevice < 0) : # autodetect an output device
- self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AI, 0) # 0 is starting subdevice
- else :
- self.subdev = subdevice
- type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev)
- if type != self.comedi.COMEDI_SUBD_AI :
- raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
- self.chan = chan
- self.aref = aref
- self.range = range
- subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev)
- self.maxdata = []
- self.comedi_range = []
- for chan in self.chan :
- if int(chan) != chan :
- raise sngAioError, "Channels must be integers, not %s" % str(chan)
- if chan >= subdev_n_chan :
- raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
- n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
- if range > n_range :
- raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
- maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
- self.maxdata.append(maxdata)
- comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range)
- # comrange becomes invalid if device is closed, so make a copy...
- comrange_copy = self.comedi.comedi_range()
- comrange_copy.min = comrange.min
- comrange_copy.max = comrange.max
- comrange_copy.unit = comrange.unit
- self.comedi_range.append(comrange_copy)
- def __del__(self) :
- self.close()
- def open(self) :
- if self.state == "Closed" :
- self.dev = self.comedi.comedi_open(self.filename)
- self.state = "Opened"
- def close(self) :
- if self.state != "Closed" :
- rc = self.comedi.comedi_close(self.dev)
- if rc < 0 :
- self.comedi.comedi_perror("comedi_close")
- raise sngAioError, "Cannot close %s" % self.filename
- self.state = "Closed"
- def comedi_to_phys(self, chan_index, comedi) :
- phys = self.comedi.comedi_to_phys(comedi, self.comedi_range[chan_index], self.maxdata[chan_index])
- if self.verbose :
- print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
- return phys
- def phys_to_comedi(self, chan_index, phys) :
- comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index])
- if self.verbose :
- print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
- return comedi
- def read_chan_index(self, chan_index) :
- rc, data = self.comedi.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref);
- if rc != 1 : # the number of samples read
- raise sngAioError, "comedi_data_read returned %d" % rc
- return data
+class AI (common.PyComediSingleIO) :
+ def __init__(self, **kwargs) :
+ """inputs:
+ filename: comedi device file for your device ("/dev/comedi0").
+ subdevice: the analog output subdevice (-1 for autodetect)
+ chan: an iterable of the channels you wish to control ((0,1,2,3))
+ aref: the analog reference for these channels (comedi.AREF_GROUND)
+ values include:
+ comedi.AREF_GROUND
+ comedi.AREF_COMMON
+ comedi.AREF_DIFF
+ comedi.AREF_OTHER
+ range: the range for these channels (0)
+ output: whether to use the lines as output (vs input) (True)
+ """
+ common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AI, **kwargs)
def read(self) :
- out = range(len(self.chan))
+ """outputs:
+ data: a list of read data values in Comedi units
+ """
+ data = range(len(self.chan))
for i in range(len(self.chan)) :
- out[i] = self.read_chan_index(i)
- #print "Read %s, got %s" % (str(self.chan), str(out))
- return out
+ data[i] = self.read_chan_index(i)
+ #print "Read %s, got %s" % (str(self.chan), str(data))
+ return data
def _test_ai_obj() :
- ai = ai_obj()
+ ai = AI()
print "read ", ai.read()
print "read ", ai.read()
print "read ", ai.read()
ai.close()
print "ai success"
-class ao_obj :
- def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1), aref=0, range=0) :
- self.verbose = False
- self.comedi = comedi
- self.filename = filename
- self.state = "Closed"
- self.open()
- if (subdevice < 0) : # autodetect an output device
- self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AO, 0) # 0 is starting subdevice
- else :
- self.subdev = subdevice
- type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev)
- if type != self.comedi.COMEDI_SUBD_AO :
- raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
- self.chan = chan
- self.aref = aref
- self.range = range
- subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev)
- self.maxdata = []
- self.comedi_range = []
- for chan in self.chan :
- if chan >= subdev_n_chan :
- raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
- n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
- if range > n_range :
- raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
- maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
- self.maxdata.append(maxdata)
- comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range)
- # comrange becomes invalid if device is closed, so make a copy...
- comrange_copy = self.comedi.comedi_range()
- comrange_copy.min = comrange.min
- comrange_copy.max = comrange.max
- comrange_copy.unit = comrange.unit
- self.comedi_range.append(comrange_copy)
- def __del__(self) :
- self.close()
- def open(self) :
- if self.state != "Closed" :
- raise sngAioError, "Invalid state %s" % self.state
- self.dev = self.comedi.comedi_open(self.filename)
- self.state = "Opened"
- def close(self) :
- if self.state != "Closed" :
- for i in range(len(self.chan)) :
- self.write_chan_index(i, self.phys_to_comedi(i, 0))
- rc = self.comedi.comedi_close(self.dev)
- if rc < 0 :
- self.comedi.comedi_perror("comedi_close")
- raise sngAioError, "Cannot close %s" % self.filename
- self.state = "Closed"
- def comedi_to_phys(self, chan_index, comedi) :
- phys = self.comedi.comedi_to_phys(int(comedi), self.comedi_range[chan_index], self.maxdata[chan_index])
- if self.verbose :
- print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
- return phys
- def phys_to_comedi(self, chan_index, phys) :
- comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index])
- if self.verbose :
- print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
- return comedi
- def write_chan_index(self, chan_index, data) :
- #print "set output on chan %d to %d" % (chan_index, data)
- rc = self.comedi.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, int(data));
- if rc != 1 : # the number of samples written
- raise sngAioError, 'comedi_data_write returned %d' % rc
+class AO (common.PyComediSingleIO) :
+ def __init__(self, **kwargs) :
+ """inputs:
+ filename: comedi device file for your device ("/dev/comedi0").
+ subdevice: the analog output subdevice (-1 for autodetect)
+ chan: an iterable of the channels you wish to control ((0,1,2,3))
+ aref: the analog reference for these channels (comedi.AREF_GROUND)
+ values include:
+ comedi.AREF_GROUND
+ comedi.AREF_COMMON
+ comedi.AREF_DIFF
+ comedi.AREF_OTHER
+ range: the range for these channels (0)
+ output: whether to use the lines as output (vs input) (True)
+ """
+ common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AO, **kwargs)
def write(self, data) :
if len(data) != len(self.chan) :
raise sngAioError, "data length %d != the number of channels (%d)" % (len(data), len(self.chan))
self.write_chan_index(i, data[i])
def _test_ao_obj() :
- ao = ao_obj()
+ ao = AO()
ao.write([0,0])
ao.write([3000,3000])
ao.write([0,0])
ao.close()
print "ao success"
-def _fit_with_residual(out_data, in_data) :
+def _fit_with_residual(out_data, in_data, channel) :
+ "Fit in_data(out_data) to a straight line & return residual"
from scipy.stats import linregress
gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
print "y = %g + %g x" % (intercept, gradient)
print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data
print "err = ", std_err # root mean sqared error of best fit
if gradient < .7 or p_value > 0.05 :
- raise sngAioError, "Out channel 0 != in channel 0"
+ raise sngAioError, "Out channel %d != in channel %d" % (channel, channel)
residual = zeros((points,))
for i in range(points) :
pred_y = intercept + gradient * out_data[i]
pass # ignore plot erros
def _test_aio() :
+ "Test AO and AI by cabling AO0 into AI0 and sweeping voltage"
from scipy.stats import linregress
from numpy import linspace, zeros
ao = ao_obj(chan=(0,1))
in_data1[i] = id[1]
ai.close()
ao.close()
- residual0 = _fit_with_residual(out_data0, in_data0)
- residual1 = _fit_with_residual(out_data1, in_data1)
+ residual0 = _fit_with_residual(out_data0, in_data0, 0)
+ residual1 = _fit_with_residual(out_data1, in_data1, 1)
if VERBOSE_DEBUG :
plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1)
for i in range(points) :
raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i])
if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity
raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i])
-
print "_test_aio success"
def test() :
"""
import comedi as c
+import common
-VERSION = 0.1
+VERSION = common.VERSION
-class dioError (Exception) :
+class dioError (common.pycomediError):
"Digital IO error"
pass
-class DIO_port :
- def __init__(self, filename="/dev/comedi0", subdevice=2, chan=(0,1,2,3), aref=0, range=0, output=True) :
+class DIO_port (common.PyComediSingleIO) :
+ def __init__(self, output=True, **kwargs) :
"""inputs:
filename: comedi device file for your device ("/dev/comedi0").
- subdevice: the digital IO subdevice (2)
+ subdevice: the digital IO subdevice (-1 for autodetect)
chan: an iterable of the channels you wish to control ((0,1,2,3))
- aref: the analog reference for these channels (0)
+ aref: the analog reference for these channels (comedi.AREF_GROUND)
range: the range for these channels (0)
output: whether to use the lines as output (vs input) (True)
"""
- self.filename = filename
- self.subdev = subdevice
- self.chan = chan
- self.aref = aref
- self.range = range
- self.output = output
- self.dev = c.comedi_open(filename)
- if self.dev < 0 :
- raise dioError, "Cannot open %s" % self.filename
- type = c.comedi_get_subdevice_type(self.dev, self.subdev)
- if type != c.COMEDI_SUBD_DIO :
- raise dioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
- if self.output :
+ common.PyComediSingleIO.__init__(self, devtype=c.COMEDI_SUBD_DIO, output=output, **kwargs)
+ if self._output :
self.set_to_output()
else :
self.set_to_input()
for chan in self.chan :
rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_OUTPUT)
if rc != 1 : # yes, comedi_dio_config returns 1 on success, -1 on failure, as of comedilib-0.8.1
+ self._comedi.comedi_perror("comedi_dio_config")
raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_OUTPUT, rc)
- self.output = True
+ self._output = True
def set_to_input(self) :
"switch all the channels associated with this object to be inputs"
for chan in self.chan :
rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_INPUT)
if rc != 1 :
+ self._comedi.comedi_perror("comedi_dio_config")
raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_INPUT, rc)
- self.output = False
- def write_chan_index(self, chan_index, data) :
- """inputs:
- chan_index: the channel you wish to write to
- data: the value you wish to write to that channel
- """
- if self.output != True :
- raise dioError, "Must be an output to write"
- rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, data);
- if rc != 1 : # the number of samples written
- raise dioError, "comedi_data_write returned %d" % rc
- def read_chan_index(self, chan_index) :
- """inputs:
- chan_index: the channel you wish to read from
- outputs:
- data: the value read from that channel
- """
- if self.output == True :
- raise dioError, "Must be an input to read"
- rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref);
- if rc != 1 : # the number of samples read
- raise dioError, "comedi_data_read returned %d" % rc
- return data
+ self._output = False
def write_port(self, data) :
"""inputs:
data: decimal number representing data to write
data = d.read_port()
print "port value is %d" % data
print "dio_obj success"
+ d.close()
def _test_DO_port() :
p = DO_port
for data in [0, 1, 2, 3, 4, 5, 6, 7] :
p(data)
+ p.close()
print "write_dig_port success"
def test() :