"Error in pycomedi.common"
pass
+def _expand_tuple(tup, length) :
+ "Expand an iterable TUP to a tuple of LENGTH by repeating the last element"
+ if len(tup) > length :
+ raise simAioError, "Tuple too long."
+ elif len(tup) < length :
+ temp_tup = tup + tuple((tup[-1],)*(length-len(tup)))
+ tup = temp_tup
+ return tup
+
class PyComediIO (object) :
"Base class for Comedi IO operations"
- def __init__(self, filename="/dev/comedi0", subdevice=-1, devtype=c.COMEDI_SUBD_AI, chan=(0,1,2,3), aref=c.AREF_GROUND, range=0, output=False) :
+ def __init__(self, filename="/dev/comedi0", subdevice=-1, devtype=c.COMEDI_SUBD_AI, chan=(0,1,2,3), aref=(c.AREF_GROUND,), range=(0,), output=False, dev=None) :
"""inputs:
- filename: comedi device file for your device ("/dev/comedi0").
- subdevice: the analog output subdevice (-1 for autodetect)
- devtype: the devoce type (c.COMEDI_SUBD_AI)
+ filename: comedi device file for your device ["/dev/comedi0"].
+ subdevice: the IO subdevice [-1 for autodetect]
+ devtype: the device type [c.COMEDI_SUBD_AI]
values include
comedi.COMEDI_SUBD_DI
comedi.COMEDI_SUBD_DO
comedi.COMEDI_SUBD_DIO
comedi.COMEDI_SUBD_AI
comedi.COMEDI_SUBD_AO
- chan: an iterable of the channels you wish to control ((0,1,2,3))
- aref: the analog reference for these channels (comedi.AREF_GROUND)
+ chan: an iterable of the channels you wish to control [(0,1,2,3)]
+ aref: the analog references for these channels [(comedi.AREF_GROUND,)]
values include
comedi.AREF_GROUND
comedi.AREF_COMMON
comedi.AREF_DIFF
comedi.AREF_OTHER
- range: the range for these channels (0)
+ range: the ranges for these channels [(0,)]
output: whether to use the lines as output (vs input) (False)
+ dev: if you've already opened the device file, pass in the
+ open device (None for internal open)
+ Note that neither aref nor range need to be the same length as
+ the channel tuple. If they are shorter, their last entry will
+ be repeated as needed. If they are longer, pycomediError will
+ be raised (was: their extra entries will not be used).
"""
self.verbose = False
self._comedi = c # keep a local copy around
# local reference here.
self.filename = filename
self.state = "Closed"
- self.open()
+ if dev == None :
+ self.open()
+ else :
+ self.fakeOpen(dev)
self._setup_device_type(subdevice, devtype)
self._setup_channels(chan, aref, range)
- self._output = output
+ self.output = output
def __del__(self) :
self.close()
def open(self) :
if self.state == "Closed" :
- self._dev = self._comedi.comedi_open(self.filename)
- if self.dev < 0 :
+ dev = self._comedi.comedi_open(self.filename)
+ if dev < 0 :
self._comedi.comedi_perror("comedi_open")
raise pycomediError, "Cannot open %s" % self.filename
- self.state = "Opened"
+ self.fakeOpen(dev)
+ def fakeOpen(self, dev):
+ """fake open: if you open the comedi device file yourself, use this
+ method to pass in the opened file descriptor and declare the
+ port "Open".
+ """
+ if dev < 0 :
+ raise pycomediError, "Invalid file descriptor %d" % dev
+ self.dev = dev
+ self.state = "Open"
def close(self) :
if self.state != "Closed" :
- rc = self._comedi.comedi_close(self._dev)
+ rc = self._comedi.comedi_close(self.dev)
if rc < 0 :
self._comedi.comedi_perror("comedi_close")
raise pycomediError, "Cannot close %s" % self.filename
- self.state = "Closed"
+ self.fakeClose()
+ def fakeClose(self):
+ """fake close: if you want to close the comedi device file yourself,
+ use this method to let the port know it has been "Closed".
+ """
+ self.dev = None
+ self.state = "Closed"
def _setup_device_type(self, subdevice, devtype) :
"""check that the specified subdevice exists,
searching for an appropriate subdevice if subdevice == -1
"""
self._devtype = devtype
if (subdevice < 0) : # autodetect an output device
- self._subdev = self._comedi.comedi_find_subdevice_by_type(self._dev, self._devtype, 0) # 0 is starting subdevice
- if self._subdev < 0 :
+ self.subdev = self._comedi.comedi_find_subdevice_by_type(self.dev, self._devtype, 0) # 0 is starting subdevice
+ if self.subdev < 0 :
self._comedi.comedi_perror("comedi_find_subdevice_by_type")
raise pycomediError, "Could not find a %d device" % (self._devtype)
else :
- self._subdev = subdevice
- type = self._comedi.comedi_get_subdevice_type(self._dev, self._subdev)
+ self.subdev = subdevice
+ type = self._comedi.comedi_get_subdevice_type(self.dev, self.subdev)
if type != self._devtype :
if type < 0 :
self._comedi.comedi_perror("comedi_get_subdevice_type")
- raise pycomediError, "Comedi subdevice %d has wrong type %d" % (self._subdev, type)
- def _setup_channels(self, chan, aref, range) :
+ raise pycomediError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
+ def _setup_channels(self, chan, aref, rng) :
"""check that the specified channels exists, and that the arefs and
ranges are legal for those channels. Also allocate a range
item for each channel, to allow converting between physical
units and comedi units even when the device is not open.
inputs:
- chan: an iterable of the channels you wish to control ((0,1,2,3))
- aref: the analog reference for these channels (comedi.AREF_GROUND)
+ chan: an iterable of the channels you wish to control [(0,1,2,3)]
+ aref: the analog references for these channels [(comedi.AREF_GROUND,)]
values include
comedi.AREF_GROUND
comedi.AREF_COMMON
comedi.AREF_DIFF
comedi.AREF_OTHER
- range: the range for these channels (0)
+ rng: the ranges for these channels [(0,)]
+ Note that neither aref nor rng need to be the same length as
+ the channel tuple. If they are shorter, their last entry will
+ be repeated as needed. If they are longer, pycomediError will
+ be raised (was: their extra entries will not be used).
"""
- self._chan = chan
- self._aref = aref
- self._range = range
- subdev_n_chan = self._comedi.comedi_get_n_channels(self._dev, self._subdev)
- self._maxdata = []
+ self.chan = chan
+ self.nchan = len(self.chan)
+ self._aref = _expand_tuple(aref, self.nchan)
+ self._range = _expand_tuple(rng, self.nchan)
+ self.maxdata = []
self._comedi_range = []
- for chan in self._chan :
+ subdev_n_chan = self._comedi.comedi_get_n_channels(self.dev, self.subdev)
+ self.cr_chan = self._comedi.chanlist(self.nchan)
+ for i,chan,aref,rng in zip(range(self.nchan), self.chan, self._aref, self._range) :
if int(chan) != chan :
raise pycomediError, "Channels must be integers, not %s" % str(chan)
if chan >= subdev_n_chan :
- raise pycomediError, "Channel %d > subdevice %d's largest chan %d" % (chan, self._subdev, subdev_n_chan-1)
- n_range = self._comedi.comedi_get_n_ranges(self._dev, self._subdev, chan)
- if range > n_range :
- raise pycomediError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
- maxdata = self._comedi.comedi_get_maxdata(self._dev, self._subdev, chan)
- self._maxdata.append(maxdata)
- comrange = self._comedi.comedi_get_range(self._dev, self._subdev, chan, range)
+ raise pycomediError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
+ n_range = self._comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
+ if rng > n_range :
+ raise pycomediError, "Range %d > subdevice %d, chan %d's largest range %d" % (rng, subdev, chan, n_range-1)
+ maxdata = self._comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
+ self.maxdata.append(maxdata)
+ comrange = self._comedi.comedi_get_range(self.dev, self.subdev, chan, rng)
# comrange becomes invalid if device is closed, so make a copy...
comrange_copy = self._comedi.comedi_range()
comrange_copy.min = comrange.min
comrange_copy.max = comrange.max
comrange_copy.unit = comrange.unit
self._comedi_range.append(comrange_copy)
+ self.cr_chan[i] = self._comedi.cr_pack(chan, rng, aref)
def comedi_to_phys(self, chan_index, comedi) :
- phys = self._comedi.comedi_to_phys(comedi, self._comedi_range[chan_index], self._maxdata[chan_index])
+ phys = self._comedi.comedi_to_phys(comedi, self._comedi_range[chan_index], self.maxdata[chan_index])
if self.verbose :
- print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self._subdev, self._chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self._maxdata[chan_index])
+ print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self.maxdata[chan_index])
return phys
def phys_to_comedi(self, chan_index, phys) :
- comedi = self._comedi.comedi_from_phys(phys, self._comedi_range[chan_index], self._maxdata[chan_index])
+ comedi = self._comedi.comedi_from_phys(phys, self._comedi_range[chan_index], self.maxdata[chan_index])
if self.verbose :
- print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self._subdev, self._chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self._maxdata[chan_index])
+ print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self.maxdata[chan_index])
return comedi
class PyComediSingleIO (PyComediIO) :
"Software timed single-point input/ouput"
def __init__(self, **kwargs) :
"""inputs:
- filename: comedi device file for your device ("/dev/comedi0").
- subdevice: the digital IO subdevice (-1 for autodetect)
- devtype: the devoce type
+ filename: comedi device file for your device ["/dev/comedi0"].
+ subdevice: the analog output subdevice [-1 for autodetect]
+ devtype: the device type [c.COMEDI_SUBD_AI]
values include
comedi.COMEDI_SUBD_DI
comedi.COMEDI_SUBD_DO
comedi.COMEDI_SUBD_DIO
comedi.COMEDI_SUBD_AI
comedi.COMEDI_SUBD_AO
- chan: an iterable of the channels you wish to control ((0,1,2,3))
- aref: the analog reference for these channels (comedi.AREF_GROUND)
+ chan: an iterable of the channels you wish to control [(0,1,2,3)]
+ aref: the analog references for these channels [(comedi.AREF_GROUND,)]
values include
comedi.AREF_GROUND
comedi.AREF_COMMON
comedi.AREF_DIFF
comedi.AREF_OTHER
- range: the range for these channels (0)
+ range: the ranges for these channels [(0,)]
+ output: whether to use the lines as output (vs input) (False)
+ dev: if you've already opened the device file, pass in the
+ open device (None for internal open)
+ Note that neither aref nor range need to be the same length as
+ the channel tuple. If they are shorter, their last entry will
+ be repeated as needed. If they are longer, pycomediError will
+ be raised (was: their extra entries will not be used).
"""
- common.PyComediIO.__init__(self, **kwargs)
+ PyComediIO.__init__(self, **kwargs)
def write_chan_index(self, chan_index, data) :
"""inputs:
chan_index: the channel you wish to write to
data: the value you wish to write to that channel
"""
- if self._output != True :
+ if self.output != True :
raise pycomediError, "Must be an output to write"
- rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, data);
+ rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self._range[chan_index], self._aref[chan_index], data);
if rc != 1 : # the number of samples written
self._comedi.comedi_perror("comedi_data_write")
raise pycomediError, "comedi_data_write returned %d" % rc
outputs:
data: the value read from that channel
"""
- if self._output == True :
+ if self.output != False :
raise pycomediError, "Must be an input to read"
- rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref);
+ rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self._range[chan_index], self._aref[chan_index]);
if rc != 1 : # the number of samples read
self._comedi.comedi_perror("comedi_data_read")
raise pycomediError, "comedi_data_read returned %d" % rc
# Simultaneous, finite, buffered analog inpout/output using comedi drivers
-import comedi
-from numpy import array, fromstring, uint16, float32, pi, sin
+import comedi as c
+import common
+from numpy import array, fromstring, float32, pi, sin
import int16_rw
# imports for testing
from scipy.stats import linregress
from os import system
+VERSION = common.VERSION
#VERBOSE = True
VERBOSE = False
AO_TRIGGERS_OFF_AI_START = True
#AO_TRIGGERS_OFF_AI_START = False
-class simAioError (Exception) :
+class simAioError (common.pycomediError) :
"Simultaneous Analog IO error"
pass
-_example_array = array([0], dtype=uint16) # for typing, since I don't know what type(array) should be
+_example_array = array([0], dtype=int16_rw.DATA_T) # for typing, since I don't know what type(array) should be
_cmdtest_message = ["success",
"invalid source",
"argument conflict",
"invalid chanlist"]
-def _print_cmdsrc(source) :
- if source & comedi.TRIG_NONE : print "none|",
- if source & comedi.TRIG_NOW : print "now|",
- if source & comedi.TRIG_FOLLOW : print "follow|",
- if source & comedi.TRIG_TIME : print "time|",
- if source & comedi.TRIG_TIMER : print "timer|",
- if source & comedi.TRIG_COUNT : print "count|",
- if source & comedi.TRIG_EXT : print "ext|",
- if source & comedi.TRIG_INT : print "int|",
- if source & comedi.TRIG_OTHER : print "other|",
+class cmd (object) :
+ """Wrap a comedi command in more Pythonic trappings.
-def _print_command(cmd) :
- print "subdevice: \t%d" % cmd.subdev
- print "flags: \t0x%x" % cmd.flags
- print "start: \t",
- _print_cmdsrc(cmd.start_src)
- print "\t%d" % cmd.start_arg
- print "scan_begin:\t",
- _print_cmdsrc(cmd.scan_begin_src)
- print "\t%d" % cmd.scan_begin_arg
- print "convert: \t",
- _print_cmdsrc(cmd.convert_src)
- print "\t%d" % cmd.convert_arg
- print "scan_end: \t",
- _print_cmdsrc(cmd.scan_end_src)
- print "\t%d" % cmd.scan_end_arg
- print "stop: \t",
- _print_cmdsrc(cmd.stop_src)
- print "\t%d" % cmd.stop_arg
+ Due to my limited needs, this class currently only supports
+ software triggered runs (possibly with output triggering off the
+ input trigger) for a finite number of output samples where all of
+ the scan and sample timing is internal and as fast as possible.
-def _expand_tuple(tup, length) :
- "Expand an iterable TUP to a tuple of LENGTH by repeating the last element"
- if len(tup) > length :
- raise simAioError, "Tuple too long."
- elif len(tup) < length :
- temp_tup = tup + tuple((tup[-1],)*(length-len(tup)))
- tup = temp_tup
- return tup
+ See http://www.comedi.org/doc/x621.html#COMEDICMDSTRUCTURE
+ for more details/possibilities.
+ """
+ def __init__(self, IO) :
+ """input:
+ IO : an initialized common.PyComediIO object
+ """
+ self.IO = IO
+ if self.IO.output == True :
+ self.cmdTypeString = "output"
+ else :
+ self.cmdTypeString = "input"
+ self.generate_rough_command()
+ def generate_rough_command(self) :
+ if VERBOSE :
+ print "generate rough %s command" % self.cmdTypeString
+ cmd = self.IO._comedi.comedi_cmd_struct()
+ cmd.subdev = self.IO.subdev
+ if self.IO.output :
+ cmd.flags = self.IO._comedi.CMDF_WRITE
+ else :
+ cmd.flags = 0
+ # decide how to trigger a multi-scan run
+ if self.IO.output and AO_TRIGGERS_OFF_AI_START :
+ cmd.start_src = self.IO._comedi.TRIG_EXT
+ cmd.start_arg = 18 # AI_START1 internal AI start signal
+ else :
+ cmd.start_src = self.IO._comedi.TRIG_INT
+ cmd.start_arg = 0
+ # decide how to trigger a multi-channel scan
+ cmd.scan_begin_src = self.IO._comedi.TRIG_TIMER
+ cmd.scan_begin_arg = 1 # temporary value for now
+ # decide how to trigger a single channel's aquisition
+ if self.IO.output :
+ cmd.convert_src = self.IO._comedi.TRIG_NOW # convert simultaneously (each output has it's own DAC)
+ cmd.convert_arg = 0
+ else :
+ cmd.convert_src = self.IO._comedi.TRIG_TIMER # convert sequentially (all inputs share single ADC)
+ cmd.convert_arg = 1 # time between channels in ns, 1 to convert ASAP
+ # decide when a scan is complete
+ cmd.scan_end_src = self.IO._comedi.TRIG_COUNT
+ cmd.scan_end_arg = self.IO.nchan
+ cmd.stop_src = self.IO._comedi.TRIG_COUNT
+ cmd.stop_arg = 1 # temporary value for now
+ cmd.chanlist = self.IO.cr_chan
+ cmd.chanlist_len = self.IO.nchan
+ self.cmd = cmd
+ self.test_cmd(max_passes=3)
+ def test_cmd(self, max_passes=1) :
+ very_verbose = False
+ i = 0
+ rc = 0
+ if very_verbose :
+ print "Testing command:"
+ _print_command(self.cmd)
+ while i < max_passes :
+ rc = self.IO._comedi.comedi_command_test(self.IO.dev, self.cmd)
+ if (rc == 0) :
+ break
+ if VERBOSE or very_verbose :
+ print "test pass %d, %s" % (i, _cmdtest_message[rc])
+ i += 1
+ if (VERBOSE or very_verbose) and i < max_passes :
+ print "Passing command:\n", self
+ if i >= max_passes :
+ print "Failing command (%d):\n" % rc, self
+ raise simAioError, "Invalid command: %s" % _cmdtest_message[rc]
+ def execute(self) :
+ if VERBOSE :
+ print "Loading %s command" % self.cmdTypeString
+ rc = self.IO._comedi.comedi_command(self.IO.dev, self.cmd)
+ if rc < 0 :
+ self.IO._comedi.comedi_perror("comedi_command")
+ raise simAioError, "Error executing %s command %d" % (self.cmdTypeString, rc)
+ def _cmdsrc(self, source) :
+ str = ""
+ if source & c.TRIG_NONE : str += "none|"
+ if source & c.TRIG_NOW : str += "now|"
+ if source & c.TRIG_FOLLOW : str += "follow|"
+ if source & c.TRIG_TIME : str += "time|"
+ if source & c.TRIG_TIMER : str += "timer|"
+ if source & c.TRIG_COUNT : str += "count|"
+ if source & c.TRIG_EXT : str += "ext|"
+ if source & c.TRIG_INT : str += "int|"
+ if source & c.TRIG_OTHER : str += "other|"
+ return str
+ def __str__(self) :
+ str = "Command on %s (%s):\n" % (self.IO, self.cmdTypeString)
+ str += "subdevice: \t%d\n" % self.cmd.subdev
+ str += "flags: \t0x%x\n" % self.cmd.flags
+ str += "start: \t"
+ str += self._cmdsrc(self.cmd.start_src)
+ str += "\t%d\n" % self.cmd.start_arg
+ str += "scan_begin:\t"
+ str += self._cmdsrc(self.cmd.scan_begin_src)
+ str += "\t%d\n" % self.cmd.scan_begin_arg
+ str += "convert: \t"
+ str += self._cmdsrc(self.cmd.convert_src)
+ str += "\t%d\n" % self.cmd.convert_arg
+ str += "scan_end: \t"
+ str += self._cmdsrc(self.cmd.scan_end_src)
+ str += "\t%d\n" % self.cmd.scan_end_arg
+ str += "stop: \t"
+ str += self._cmdsrc(self.cmd.stop_src)
+ str += "\t%d" % self.cmd.stop_arg
+ return str
+
+class AIO (object) :
+ """Control a simultaneous analog input/output (AIO) device using
+ Comedi drivers.
+
+ The AIO device is modeled as being in one of the following states:
-class aio_obj :
+ Open Device file has been opened, various one-off setup
+ tasks completed.
+ Initialized Any previous activity is complete, ready for a new
+ task
+ Setup New task assigned.
+ Armed The output task is "triggered" (see below)
+ Read The input task is triggered, and input read in
+ Closed
+ Transitions between these states may be achieved with class methods
+ open, __init__ - through Open to Initialized
+ close Any to Closed
+ setup Initialized to Setup
+ arm Setup to Armed
+ start_read Armed to Read
+ reset Setup, Armed, or Read to Initialized
+
+ There are two triggering methods set by the module global
+ AO_TRIGGERS_OFF_AI_START
+ When this global is true, the output and input will start on the
+ exactly the same clock tick (in this case the output "trigger"
+ when "Arming" just primes the output to start when the input start
+ is signaled). However, this functionality at the moment depends
+ on your having a National Instruments card with a DAQ-STC module
+ controling the timing (e.g. E series) and a patched version of
+ ni_mio_common.c in your Comedi kernel. If you do not have an
+ appropriate card, you will either have to implement an appropriate
+ method for your card, or set the global to false, in which case
+ the IO synchronicity depends on the synchronicity of the AO and AI
+ software triggers.
+ """
def __init__(self, filename="/dev/comedi0",
in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,),
out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,)) :
- self._comedi = comedi
+ """inputs:
+ filename: comedi device file for your device ("/dev/comedi0").
+ And then for both input and output (in_* and out_*):
+ subdevice: the analog output subdevice (-1 for autodetect)
+ values include
+ comedi.COMEDI_SUBD_DI
+ comedi.COMEDI_SUBD_DO
+ comedi.COMEDI_SUBD_DIO
+ comedi.COMEDI_SUBD_AI
+ comedi.COMEDI_SUBD_AO
+ chan: an iterable of the channels you wish to control ((0,1,2,3))
+ aref: the analog reference for these channels (comedi.AREF_GROUND)
+ values include
+ comedi.AREF_GROUND
+ comedi.AREF_COMMON
+ comedi.AREF_DIFF
+ comedi.AREF_OTHER
+ range: the range for these channels (0)
+ """
+ self._comedi = c
self._filename = filename
- self.state = "Closed"
- self.open()
-
- self._iaref = _expand_tuple(in_aref, len(in_chan))
- self._irange = _expand_tuple(in_range, len(in_chan))
- temp = self._check_options(in_subdevice, in_chan, self._iaref, self._irange, output=False)
- self._isubdev = temp["subdevice"]
- self._ichan_params = temp["chan_params"]
- self._ichan = in_chan
- self.i_nchan = len(self._ichan)
- self._ichanlist = self._comedi.chanlist(self.i_nchan)
- for i in range(self.i_nchan) :
- self._ichanlist[i] = self._comedi.cr_pack(self._ichan[i], self._irange[i], self._iaref[i])
-
- self._oaref = _expand_tuple(out_aref, len(in_chan))
- self._orange = _expand_tuple(out_range, len(in_chan))
- temp = self._check_options(out_subdevice, out_chan, self._oaref, self._orange, output=True)
- self._osubdev = temp["subdevice"]
- self._ochan_params = temp["chan_params"]
- self._ochan = out_chan
- self.o_nchan = len(self._ochan)
- self._ochanlist = self._comedi.chanlist(self.o_nchan)
- for i in range(self.o_nchan) :
- self._ochanlist[i] = self._comedi.cr_pack(self._ochan[i], self._orange[i], self._oaref[i])
-
- self._gen_rough_output_cmd()
- self._gen_rough_input_cmd()
+ # the next section is much like the open() method below,
+ # but in this one we set up all the extra details associated
+ # with the AO and AI structures
+ self.dev = self._comedi.comedi_open(self._filename)
+ self._fd = self._comedi.comedi_fileno(self.dev)
+ if VERBOSE :
+ print "Opened %s on fd %d" % (self._filename, self._fd)
+ self.AI = common.PyComediIO(filename=self._filename, subdevice=in_subdevice, devtype=c.COMEDI_SUBD_AI, chan=in_chan, aref=in_aref, range=in_range, output=False, dev=self.dev)
+ self.AO = common.PyComediIO(filename=self._filename, subdevice=out_subdevice, devtype=c.COMEDI_SUBD_AO, chan=out_chan, aref=out_aref, range=out_range, output=True, dev=self.dev)
+ self.state = "Open"
+ self._icmd = cmd(self.AI)
+ self._ocmd = cmd(self.AO)
self.state = "Initialized"
def __del__(self) :
self.close()
def close(self) :
if self.state != "Closed" :
self.reset(force=True)
- rc = self._comedi.comedi_close(self._dev)
+ rc = self._comedi.comedi_close(self.dev)
if rc < 0 :
self._comedi.comedi_perror("comedi_close")
raise simAioError, "Cannot close %s" % self._filename
if VERBOSE :
print "Closed %s on fd %d" % (self._filename, self._fd)
+ self.AI.fakeClose()
+ self.AO.fakeClose()
+ self.dev = None
+ self._fd = None
self.state = "Closed"
def open(self) :
if self.state != "Closed" :
raise simAioError, "Invalid state %s" % self.state
- self._dev = self._comedi.comedi_open(self._filename)
- self._fd = self._comedi.comedi_fileno(self._dev)
+ self.dev = self._comedi.comedi_open(self._filename)
+ self._fd = self._comedi.comedi_fileno(self.dev)
if VERBOSE :
print "Opened %s on fd %d" % (self._filename, self._fd)
+ self.AI.fakeOpen(self.dev)
+ self.AO.fakeOpen(self.dev)
+ self.state = "Open"
+ self._icmd = cmd(self.AI)
+ self._ocmd = cmd(self.AO)
self.state = "Initialized"
- def _check_options(self, subdevice, chan, aref, rnge, output=True) :
- subdevice = self._check_subdevice(subdevice, output=output)
- chan_params = []
- for i in range(len(chan)) :
- chan_params.append(self._check_chan(subdevice, chan[i], aref[i], rnge[i]))
- if VERBOSE :
- if output :
- print "Output",
- else :
- print "Input",
- print " subdevice with channels %s is valid" % (str(chan))
- return {"subdevice":subdevice,
- "chan_params":chan_params}
- def _check_subdevice(self, subdevice, output=True) :
- if output == True :
- target_type = self._comedi.COMEDI_SUBD_AO
- else :
- target_type = self._comedi.COMEDI_SUBD_AI
- if (subdevice < 0) : # autodetect an input device
- subdevice = self._comedi.comedi_find_subdevice_by_type(self._dev, target_type, 0) # 0 is starting subdevice
- else :
- type = self._comedi.comedi_get_subdevice_type(self._dev, subdevice)
- if type != target_type :
- raise simAioError, "Comedi subdevice %d has wrong type %d" % (subdevice, type)
- return subdevice
- def _check_chan(self, subdevice, chan, aref, range) :
- subdev_n_chan = self._comedi.comedi_get_n_channels(self._dev, subdevice)
- if chan >= subdev_n_chan :
- raise simAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, subdevice, subdev_n_chan-1)
- n_range = self._comedi.comedi_get_n_ranges(self._dev, subdevice, chan)
- if range >= n_range :
- raise simAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdevice, chan, n_range-1)
- maxdata = self._comedi.comedi_get_maxdata(self._dev, subdevice, chan)
- comrange = self._comedi.comedi_get_range(self._dev, subdevice, chan, range)
- return {"maxdata":maxdata, "comrange": comrange}
- def _gen_rough_output_cmd(self) :
- if VERBOSE :
- print "generate rough output command"
- cmd = self._comedi.comedi_cmd_struct()
- cmd.subdev = self._osubdev
- cmd.flags = self._comedi.CMDF_WRITE
- if AO_TRIGGERS_OFF_AI_START :
- cmd.start_src = self._comedi.TRIG_EXT
- cmd.start_arg = 18 # AI_START1 internal AI start signal
- else :
- cmd.start_src = self._comedi.TRIG_INT
- cmd.start_arg = 0
- cmd.scan_begin_src = self._comedi.TRIG_TIMER
- cmd.scan_begin_arg = 1 # temporary value for now
- cmd.convert_src = self._comedi.TRIG_NOW
- cmd.convert_arg = 0
- cmd.scan_end_src = self._comedi.TRIG_COUNT
- cmd.scan_end_arg = self.o_nchan
- cmd.stop_src = self._comedi.TRIG_COUNT
- cmd.stop_arg = 1 # temporary value for now
- cmd.chanlist = self._ochanlist
- cmd.chanlist_len = self.o_nchan
- self._test_cmd(cmd, max_passes=3)
- self._ocmd = cmd
- def _gen_rough_input_cmd(self) :
- if VERBOSE :
- print "generate rough input command"
- cmd = self._comedi.comedi_cmd_struct()
- cmd.subdev = self._isubdev
- cmd.flags = 0
- cmd.start_src = self._comedi.TRIG_INT
- cmd.start_arg = 0
- cmd.scan_begin_src = self._comedi.TRIG_TIMER
- cmd.scan_begin_arg = 1 # temporary value for now
- cmd.convert_src = self._comedi.TRIG_TIMER
- cmd.convert_arg = 1
- cmd.scan_end_src = self._comedi.TRIG_COUNT
- cmd.scan_end_arg = self.i_nchan
- cmd.stop_src = self._comedi.TRIG_COUNT
- cmd.stop_arg = 1 # temporary value for now
- cmd.chanlist = self._ichanlist
- cmd.chanlist_len = self.i_nchan
- self._test_cmd(cmd, max_passes=3)
- self._icmd = cmd
- def _test_cmd(self, cmd, max_passes=1) :
- very_verbose = False
- i = 0
- rc = 0
- if very_verbose :
- print "Testing command:"
- _print_command(cmd)
- while i < max_passes :
- rc = self._comedi.comedi_command_test(self._dev, cmd)
- if (rc == 0) :
- break
- if VERBOSE or very_verbose :
- print "test pass %d, %s" % (i, _cmdtest_message[rc])
- i += 1
- if (VERBOSE or very_verbose) and i < max_passes :
- print "Passing command:"
- _print_command(cmd)
- if i >= max_passes :
- print "Failing command:"
- _print_command(cmd)
- raise simAioError, "Invalid command: %s" % _cmdtest_message[rc]
def setup(self, nsamps, freq, out_buffer) :
if self.state != "Initialized" :
raise simAioError, "Invalid state %s" % self.state
if type(out_buffer) != type(_example_array) :
raise simAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer))
- self._ocmd.scan_begin_arg = int(1e9/freq)
- self._ocmd.stop_arg = nsamps
+ self._ocmd.cmd.scan_begin_arg = int(1e9/freq)
+ self._ocmd.cmd.stop_arg = nsamps
if VERBOSE :
- print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.scan_begin_arg, self._ocmd.stop_arg)
+ print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.cmd.scan_begin_arg, self._ocmd.cmd.stop_arg)
self._onremain = nsamps
- self._test_cmd(self._ocmd)
- rc = self._comedi.comedi_command(self._dev, self._ocmd)
- if rc < 0 :
- self._comedi.comedi_perror("comedi_command")
- raise simAioError, "Error executing output command %d" % rc
- self._icmd.scan_begin_arg = int(1e9/freq)
- self._icmd.stop_arg = nsamps
- self._test_cmd(self._icmd)
+ self._ocmd.test_cmd()
+ self._ocmd.execute()
+ self._icmd.cmd.scan_begin_arg = int(1e9/freq)
+ self._icmd.cmd.stop_arg = nsamps
+ self._icmd.test_cmd()
self._inremain = nsamps
- rc = self._comedi.comedi_command(self._dev, self._icmd)
- if rc < 0 :
- self._comedi.comedi_perror("comedi_command")
- raise simAioError, "Error executing input command"
-
+ self._icmd.execute()
if VERBOSE :
- print "Write %d output samples to the card" % (nsamps*self.o_nchan)
- rc = int16_rw.write_samples(self._fd, nsamps*self.o_nchan, out_buffer, 1)
- if rc != nsamps*self.o_nchan :
+ print "Write %d output samples to the card" % (nsamps*self.AO.nchan)
+ rc = int16_rw.write_samples(self._fd, out_buffer, 0, nsamps*self.AO.nchan, 1)
+ if rc != nsamps*self.AO.nchan :
raise simAioError, "Error %d writing output buffer\n" % rc
- rc = int16_rw.write_samples(self._fd, self.o_nchan, out_buffer[-self.o_nchan:], 1) # HACK, add an extra sample for each channel to the output buffer
- if rc != self.o_nchan :
+ if VERBOSE :
+ print "Writing extra output"
+ rc = int16_rw.write_samples(self._fd, out_buffer, (nsamps-1)*self.AO.nchan, self.AO.nchan, 1) # HACK, add an extra sample for each channel to the output buffer
+ if rc != self.AO.nchan :
raise simAioError, "Error %d writing hack output buffer\n" % rc
- # maybe will avoid resetting...
+ # Without the hack, output jumps back to 0V after the command completes
self._nsamps = nsamps
self.state = "Setup"
def arm(self) :
raise simAioError, "Invalid state %s" % self.state
if VERBOSE :
print "Arm the analog ouptut"
- self._comedi_internal_trigger(self._osubdev)
+ self._comedi_internal_trigger(self.AO.subdev)
self.state = "Armed"
def start_read(self, in_buffer) :
if self.state != "Armed" :
raise simAioError, "Invalid state %s" % self.state
if VERBOSE :
print "Start the run"
- self._comedi_internal_trigger(self._isubdev)
+ self._comedi_internal_trigger(self.AI.subdev)
if VERBOSE :
- print "Read %d input samples from the card" % (self._nsamps*self.i_nchan)
- rc = int16_rw.read_samples(self._fd, self._nsamps*self.i_nchan, in_buffer, -1)
- if rc != self._nsamps*self.i_nchan :
+ print "Read %d input samples from the card" % (self._nsamps*self.AI.nchan)
+ rc = int16_rw.read_samples(self._fd, in_buffer, 0, self._nsamps*self.AI.nchan, -1)
+ if rc != self._nsamps*self.AI.nchan :
raise simAioError, "Error %d reading input buffer\n" % rc
self.state = "Read"
def _comedi_internal_trigger(self, subdevice) :
insn.data = data
insn.n = 1
data[0] = 0
- rc = self._comedi.comedi_do_insn(self._dev, insn)
+ rc = self._comedi.comedi_do_insn(self.dev, insn)
def reset(self, force=False) :
if VERBOSE :
print "Reset the analog subdevices"
# clean up after the read
- rc = self._comedi.comedi_cancel(self._dev, self._osubdev)
+ rc = self._comedi.comedi_cancel(self.dev, self.AO.subdev)
if rc < 0 :
self._comedi.comedi_perror("comedi_cancel")
raise simAioError, "Error cleaning up output command"
- rc = self._comedi.comedi_cancel(self._dev, self._isubdev)
+ rc = self._comedi.comedi_cancel(self.dev, self.AI.subdev)
if rc < 0 :
self._comedi.comedi_perror("comedi_cancel")
raise simAioError, "Error cleaning up input command"
# define the test suite
-def _test_aio_obj(aio=None, start_wait=0, verbose=False) :
+def _test_AIO(aio=None, start_wait=0, verbose=False) :
if (verbose) :
- print "_test_aio_obj(start_wait = %g)" % start_wait
+ print "_test_AIO(start_wait = %g)" % start_wait
nsamps = 10
- out_data = array([0]*nsamps, dtype=uint16)
- in_data = array([0]*nsamps, dtype=uint16)
+ out_data = array([0]*nsamps, dtype=int16_rw.DATA_T)
+ in_data = array([0]*nsamps, dtype=int16_rw.DATA_T)
for i in range(nsamps) :
out_data[i] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
aio.setup(10, 1000, out_data)
good_run = 0
good_run_arr = []
for i in range(num_tests) :
- out_data, in_data = _test_aio_obj(aio, start_wait)
+ out_data, in_data = _test_AIO(aio, start_wait)
gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
grads[i] = gradient
if verbose :
print "good run stem and leaf:"
system(call)
-def _test_aio_obj_multi_chan(aio=None, start_wait=0, verbose=False) :
+def _test_AIO_multi_chan(aio=None, start_wait=0, verbose=False) :
if (verbose) :
- print "_test_aio_obj_multi_chan(start_wait = %g)" % start_wait
+ print "_test_AIO_multi_chan(start_wait = %g)" % start_wait
nsamps = 10
- out_data = array([0]*nsamps*aio.o_nchan, dtype=uint16)
- in_data = array([0]*nsamps*aio.i_nchan, dtype=uint16)
+ out_data = array([0]*nsamps*aio.AO.nchan, dtype=int16_rw.DATA_T)
+ in_data = array([0]*nsamps*aio.AI.nchan, dtype=int16_rw.DATA_T)
# set up interleaved data
for i in range(nsamps) :
- out_data[i*aio.o_nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
- for j in range(1, aio.o_nchan) :
- out_data[i*aio.o_nchan + j] = 0
+ out_data[i*aio.AO.nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
+ for j in range(1, aio.AO.nchan) :
+ out_data[i*aio.AO.nchan + j] = 0
aio.setup(10, 1000, out_data)
aio.arm()
sleep(start_wait)
aio.reset()
if (verbose) :
print "#",
- for j in range(aio.o_nchan) :
- print "%s\t" % aio._ochan[j],
- for j in range(aio.i_nchan) :
- print "%s\t" % aio._ichan[j],
+ for j in range(aio.AO.nchan) :
+ print "%s\t" % aio.AO.chan[j],
+ for j in range(aio.AI.nchan) :
+ print "%s\t" % aio.AI.chan[j],
print ""
for i in range(nsamps) :
- for j in range(aio.o_nchan) :
- print "%s\t" % out_data[i*aio.o_nchan+j],
- for j in range(aio.i_nchan) :
- print "%s\t" % in_data[i*aio.i_nchan+j],
+ for j in range(aio.AO.nchan) :
+ print "%s\t" % out_data[i*aio.AO.nchan+j],
+ for j in range(aio.AI.nchan) :
+ print "%s\t" % in_data[i*aio.AI.nchan+j],
print ""
return (out_data, in_data)
def test() :
- aio = aio_obj()
- _test_aio_obj(aio, start_wait = 0, verbose=True)
- _test_aio_obj(aio, start_wait = 0.5, verbose=True)
+ aio = AIO(in_chan=(0,), out_chan=(0,))
+ _test_AIO(aio, start_wait = 0, verbose=True)
+ _test_AIO(aio, start_wait = 0.5, verbose=True)
aio.close()
aio.open()
- #_repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=False)
+ _repeat_aio_test(aio, num_tests=500, start_wait=0, verbose=False)
aio.close()
- aiom = aio_obj(in_chan=(0,1,2,3), out_chan=(0,1))
- _test_aio_obj_multi_chan(aiom, start_wait = 0, verbose=True)
+ aiom = AIO(in_chan=(0,1,2,3), out_chan=(0,1))
+ _test_AIO_multi_chan(aiom, start_wait = 0, verbose=True)
if __name__ == "__main__" :
test()
class AI (common.PyComediSingleIO) :
def __init__(self, **kwargs) :
"""inputs:
- filename: comedi device file for your device ("/dev/comedi0").
- subdevice: the analog output subdevice (-1 for autodetect)
- chan: an iterable of the channels you wish to control ((0,1,2,3))
- aref: the analog reference for these channels (comedi.AREF_GROUND)
+ filename: comedi device file for your device ["/dev/comedi0"].
+ subdevice: the analog input subdevice (-1 for autodetect)
+ chan: an iterable of the channels you wish to control [(0,1,2,3)]
+ aref: the analog references for these channels [(comedi.AREF_GROUND,)]
values include:
comedi.AREF_GROUND
comedi.AREF_COMMON
comedi.AREF_DIFF
comedi.AREF_OTHER
- range: the range for these channels (0)
- output: whether to use the lines as output (vs input) (True)
+ range: the ranges for these channels [(0,)]
"""
- common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AI, **kwargs)
+ common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AI, output=False, **kwargs)
def read(self) :
"""outputs:
data: a list of read data values in Comedi units
"""
- data = range(len(self.chan))
- for i in range(len(self.chan)) :
+ data = range(self.nchan)
+ for i in range(self.nchan) :
data[i] = self.read_chan_index(i)
#print "Read %s, got %s" % (str(self.chan), str(data))
return data
-def _test_ai_obj() :
+def _test_AI() :
ai = AI()
print "read ", ai.read()
print "read ", ai.read()
class AO (common.PyComediSingleIO) :
def __init__(self, **kwargs) :
"""inputs:
- filename: comedi device file for your device ("/dev/comedi0").
- subdevice: the analog output subdevice (-1 for autodetect)
- chan: an iterable of the channels you wish to control ((0,1,2,3))
- aref: the analog reference for these channels (comedi.AREF_GROUND)
+ filename: comedi device file for your device ["/dev/comedi0"].
+ subdevice: the analog output subdevice [-1 for autodetect]
+ chan: an iterable of the channels you wish to control [(0,1,2,3)]
+ aref: the analog references for these channels [(comedi.AREF_GROUND,)]
values include:
comedi.AREF_GROUND
comedi.AREF_COMMON
comedi.AREF_DIFF
comedi.AREF_OTHER
- range: the range for these channels (0)
- output: whether to use the lines as output (vs input) (True)
+ range: the ranges for these channels [(0,)]
"""
- common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AO, **kwargs)
+ common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AO, output=True, **kwargs)
def write(self, data) :
- if len(data) != len(self.chan) :
- raise sngAioError, "data length %d != the number of channels (%d)" % (len(data), len(self.chan))
- for i in range(len(self.chan)) :
+ if len(data) != self.nchan :
+ raise sngAioError, "data length %d != the number of channels (%d)" % (len(data), self.nchan)
+ for i in range(self.nchan) :
self.write_chan_index(i, data[i])
-def _test_ao_obj() :
- ao = AO()
+def _test_AO() :
+ ao = AO(chan=(0,1))
ao.write([0,0])
ao.write([3000,3000])
ao.write([0,0])
def _fit_with_residual(out_data, in_data, channel) :
"Fit in_data(out_data) to a straight line & return residual"
from scipy.stats import linregress
+ from numpy import zeros
gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
print "y = %g + %g x" % (intercept, gradient)
print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y)
print "err = ", std_err # root mean sqared error of best fit
if gradient < .7 or p_value > 0.05 :
raise sngAioError, "Out channel %d != in channel %d" % (channel, channel)
- residual = zeros((points,))
- for i in range(points) :
+ residual = zeros((len(out_data),))
+ for i in range(len(out_data)) :
pred_y = intercept + gradient * out_data[i]
residual[i] = in_data[i] - pred_y
return residual
except ImportError :
pass # ignore plot erros
-def _test_aio() :
+def _test_AIO() :
"Test AO and AI by cabling AO0 into AI0 and sweeping voltage"
from scipy.stats import linregress
from numpy import linspace, zeros
- ao = ao_obj(chan=(0,1))
- ai = ai_obj(chan=(0,1))
+ ao = AO(chan=(0,1))
+ ai = AI(chan=(0,1))
start = 0.1 * ao.maxdata[0]
stop = 0.9 * ao.maxdata[0]
points = 10
in_data0 = zeros((points,))
in_data1 = zeros((points,))
for i in range(points) :
- ao.write([out_data0[i], out_data1[i]])
+ ao.write([int(out_data0[i]), int(out_data1[i])])
id = ai.read()
in_data0[i] = id[0]
in_data1[i] = id[1]
raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i])
if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity
raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i])
- print "_test_aio success"
+ print "aio success"
def test() :
- _test_ai_obj()
- _test_ao_obj()
- _test_aio()
+ _test_AI()
+ _test_AO()
+ _test_AIO()
if __name__ == "__main__" :
test()