From ce9d4a45f84e081206cd171d03ec09c26a3d6821 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Tue, 11 Nov 2008 10:26:35 -0500 Subject: [PATCH] Added subdevice flag checking ability and reworked some tests. I was hoping SDF_BUSY would let me know when the output job was complete, but for some reason it doesn't seem to. See simult_aio.AIO.reset(). --- pycomedi/__init__.py | 1 + pycomedi/common.py | 181 ++++++++++++++++++++++++++- pycomedi/simult_aio.py | 273 ++++++++++++++++++++++++++++++++--------- 3 files changed, 396 insertions(+), 59 deletions(-) diff --git a/pycomedi/__init__.py b/pycomedi/__init__.py index e69de29..c90c662 100644 --- a/pycomedi/__init__.py +++ b/pycomedi/__init__.py @@ -0,0 +1 @@ +from common import VERSION diff --git a/pycomedi/common.py b/pycomedi/common.py index ca8a844..c3b8c95 100644 --- a/pycomedi/common.py +++ b/pycomedi/common.py @@ -1,6 +1,7 @@ """Some Comedi operations common to analog and digital IO""" import comedi as c +import time VERSION = 0.1 @@ -17,6 +18,173 @@ def _expand_tuple(tup, length) : tup = temp_tup return tup +class SubdeviceFlags (object) : + """ + Descriptions from http://www.comedi.org/doc/r5153.html + (the comedi_get_subdevice_flags documentation) + + SDF_BUSY: + The subdevice is busy performing an asynchronous command. A + subdevice being "busy" is slightly different from the "running" + state flagged by SDF_RUNNING. A "running" subdevice is always + "busy", but a "busy" subdevice is not necessarily "running". For + example, suppose an analog input command has been completed by the + hardware, but there are still samples in Comedi's buffer waiting + to be read out. In this case, the subdevice is not "running", but + is still "busy" until all the samples are read out or + comedi_cancel() is called. + + SDF_BUSY_OWNER: + The subdevice is "Busy", and the command it is running was started + by the current process. + + SDF_LOCKED: + The subdevice has been locked by comedi_lock(). + + SDF_LOCK_OWNER: + The subdevice is locked, and was locked by the current process. + + SDF_MAXDATA: + The maximum data value for the subdevice depends on the channel. + + SDF_FLAGS: + The subdevice flags depend on the channel (unfinished/broken + support in library). + + SDF_RANGETYPE: + The range type depends on the channel. + + SDF_CMD: + The subdevice supports asynchronous commands. + + SDF_SOFT_CALIBRATED: + The subdevice relies on the host to do calibration in software. + Software calibration coefficients are determined by the + comedi_soft_calibrate utility. See the description of the + comedi_get_softcal_converter() function for more information. + + SDF_READABLE: + The subdevice can be read (e.g. analog input). + + SDF_WRITABLE: + The subdevice can be written to (e.g. analog output). + + SDF_INTERNAL: + The subdevice does not have externally visible lines. + + SDF_GROUND: + The subdevice supports AREF_GROUND. + + SDF_COMMON: + The subdevice supports AREF_COMMON. + + SDF_DIFF: + The subdevice supports AREF_DIFF. + + SDF_OTHER: + The subdevice supports AREF_OTHER + + SDF_DITHER: + The subdevice supports dithering (via the CR_ALT_FILTER chanspec + flag). + + SDF_DEGLITCH: + The subdevice supports deglitching (via the CR_ALT_FILTER chanspec + flag). + + SDF_RUNNING: + An asynchronous command is running. You can use this flag to poll + for the completion of an output command. + + SDF_LSAMPL: + The subdevice uses the 32 bit lsampl_t type instead of the 16 bit + sampl_t for asynchronous command data. + + SDF_PACKED: + The subdevice uses bitfield samples for asynchronous command data, + one bit per channel (otherwise it uses one sampl_t or lsampl_t per + channel). Commonly used for digital subdevices. + """ + def __init__(self, comedi) : + self.lastUpdate = None + self._flags = None + self._comedi = comedi + def update(self, dev, subdev) : + "Must be called on an open device" + self._flags = self._comedi.comedi_get_subdevice_flags(dev, subdev) + self.lastupdate = time.time() + def _check(self, sdf_flag) : + assert self._flags != None, "Cannot return status of uninitialized flag" + if self._flags & sdf_flag : + return True + return False + def busy(self) : + return(self._check(self._comedi.SDF_BUSY)) + def busyOwner(self) : + return(self._check(self._comedi.SDF_BUSY_OWNER)) + def locked(self) : + return(self._check(self._comedi.SDF_LOCKED)) + def lockOwner(self) : + return(self._check(self._comedi.SDF_LOCK_OWNER)) + def maxdata(self) : + return(self._check(self._comedi.SDF_MAXDATA)) + def flags(self) : + return(self._check(self._comedi.SDF_FLAGS)) + def rangetype(self) : + return(self._check(self._comedi.SDF_RANGETYPE)) + def cmd(self) : + return(self._check(self._comedi.SDF_CMD)) + def softCalibrated(self) : + return(self._check(self._comedi.SDF_SOFT_CALIBRATED)) + def readable(self) : + return(self._check(self._comedi.SDF_READABLE)) + def writable(self) : + return(self._check(self._comedi.SDF_WRITABLE)) + def internal(self) : + return(self._check(self._comedi.SDF_INTERNAL)) + def ground(self) : + return(self._check(self._comedi.SDF_GROUND)) + def common(self) : + return(self._check(self._comedi.SDF_COMMON)) + def diff(self) : + return(self._check(self._comedi.SDF_DIFF)) + def other(self) : + return(self._check(self._comedi.SDF_OTHER)) + def dither(self) : + return(self._check(self._comedi.SDF_DITHER)) + def deglitch(self) : + return(self._check(self._comedi.SDF_DEGLITCH)) + def running(self) : + return(self._check(self._comedi.SDF_RUNNING)) + def lsampl(self) : + return(self._check(self._comedi.SDF_LSAMPL)) + def packed(self) : + return(self._check(self._comedi.SDF_PACKED)) + def __str__(self) : + s = "" + s += "busy : %s\n" % self.busy() + s += "busyOwner : %s\n" % self.busyOwner() + s += "locked : %s\n" % self.locked() + s += "lockedOwner : %s\n" % self.lockOwner() + s += "maxdata : %s\n" % self.maxdata() + s += "flags : %s\n" % self.flags() + s += "rangetype : %s\n" % self.rangetype() + s += "cmd : %s\n" % self.cmd() + s += "softCalibrated : %s\n" % self.softCalibrated() + s += "readable : %s\n" % self.readable() + s += "writable : %s\n" % self.writable() + s += "internal : %s\n" % self.internal() + s += "ground : %s\n" % self.ground() + s += "common : %s\n" % self.common() + s += "diff : %s\n" % self.diff() + s += "other : %s\n" % self.other() + s += "dither : %s\n" % self.dither() + s += "deglitch : %s\n" % self.deglitch() + s += "running : %s\n" % self.running() + s += "lsampl : %s\n" % self.lsampl() + s += "packed : %s" % self.packed() + return s + class PyComediIO (object) : "Base class for Comedi IO operations" def __init__(self, filename="/dev/comedi0", subdevice=-1, devtype=c.COMEDI_SUBD_AI, chan=(0,1,2,3), aref=(c.AREF_GROUND,), range=(0,), output=False, dev=None) : @@ -118,6 +286,10 @@ class PyComediIO (object) : if type < 0 : self._comedi.comedi_perror("comedi_get_subdevice_type") raise pycomediError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type) + self.flags = SubdeviceFlags(self._comedi) + self.updateFlags() + def updateFlags(self) : + self.flags.update(self.dev, self.subdev) def _setup_channels(self, chan, aref, rng) : """check that the specified channels exists, and that the arefs and ranges are legal for those channels. Also allocate a range @@ -163,9 +335,14 @@ class PyComediIO (object) : comrange_copy.unit = comrange.unit self._comedi_range.append(comrange_copy) self.cr_chan[i] = self._comedi.cr_pack(chan, rng, aref) - def comedi_to_phys(self, chan_index, comedi) : + def comedi_to_phys(self, chan_index, comedi, useNAN=True) : + if useNAN == True : + oor = self._comedi.COMEDI_OOR_NAN + else : + oor = self._comedi.COMEDI_OOR_NUMBER + self._comedi.comedi_set_global_oor_behavior(oor) phys = self._comedi.comedi_to_phys(comedi, self._comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : + if self.verbose : print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self.maxdata[chan_index]) return phys def phys_to_comedi(self, chan_index, phys) : diff --git a/pycomedi/simult_aio.py b/pycomedi/simult_aio.py index 50ef5b2..ec4ea4e 100644 --- a/pycomedi/simult_aio.py +++ b/pycomedi/simult_aio.py @@ -4,6 +4,7 @@ import comedi as c import common from numpy import array, fromstring, float32, pi, sin import int16_rw +import time # imports for testing from time import sleep @@ -16,7 +17,13 @@ VERBOSE = False AO_TRIGGERS_OFF_AI_START = True #AO_TRIGGERS_OFF_AI_START = False -class simAioError (common.pycomediError) : + +# HACK! outputting last point can cause jumps to random positions +# This is probably due to some clocking issue when we trigger AO off of +# the AI Start signal. +DONT_OUTPUT_LAST_SAMPLE_HACK = True + +class simAioError (Exception) : "Simultaneous Analog IO error" pass @@ -182,7 +189,8 @@ class AIO (object) : """ def __init__(self, filename="/dev/comedi0", in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,), - out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,)) : + out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,), + buffsize=32768) : """inputs: filename: comedi device file for your device ("/dev/comedi0"). And then for both input and output (in_* and out_*): @@ -204,6 +212,9 @@ class AIO (object) : """ self._comedi = c self._filename = filename + assert buffsize > 0 + assert buffsize % 2 == 0, "buffsize = %d is not even" % buffsize + self.buffsize = buffsize # the next section is much like the open() method below, # but in this one we set up all the extra details associated # with the AO and AI structures @@ -246,15 +257,34 @@ class AIO (object) : self._icmd = cmd(self.AI) self._ocmd = cmd(self.AO) self.state = "Initialized" - def setup(self, nsamps, freq, out_buffer) : + def genBuffer(self, nsamp, nchan=1, value=0) : + return array([value]*nsamp*nchan, dtype=int16_rw.DATA_T) + def setup(self, nsamps=None, freq=1.0, out_buffer=None) : if self.state != "Initialized" : raise simAioError, "Invalid state %s" % self.state + if out_buffer == None : # read-only command + assert self.AO == None + assert nsamps > 1 + if nsamps == None : + assert len(out_buffer) % self.AO.nchan == 0 + nsamps = int(len(out_buffer) / self.AO.nchan) if type(out_buffer) != type(_example_array) : raise simAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer)) + if DONT_OUTPUT_LAST_SAMPLE_HACK : + for i in range(1, self.AO.nchan+1) : # i in [1, ... ,nchan] + if out_buffer[-i] != out_buffer[-self.AO.nchan-i] : + raise simAioError, """To ensure that you are not suprised by the effects of the +DONT_OUTPUT_LAST_SAMPLE_HACK flag, please ensure that the last two +values samples output on each channel are the same.""" + onsamps = nsamps - 1 + else : + onsamps = nsamps + self._nsamps = nsamps self._ocmd.cmd.scan_begin_arg = int(1e9/freq) - self._ocmd.cmd.stop_arg = nsamps + self._ocmd.cmd.stop_arg = onsamps if VERBOSE : - print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.cmd.scan_begin_arg, self._ocmd.cmd.stop_arg) + print "Configure the board (%d ns per scan, %d samps)" % (self._icmd.cmd.scan_begin_arg, self._icmd.cmd.stop_arg) + self._obuffer = out_buffer self._onremain = nsamps self._ocmd.test_cmd() self._ocmd.execute() @@ -263,18 +293,14 @@ class AIO (object) : self._icmd.test_cmd() self._inremain = nsamps self._icmd.execute() + nsamps = min(self._onremain, self.buffsize) + offset = 0 if VERBOSE : print "Write %d output samples to the card" % (nsamps*self.AO.nchan) - rc = int16_rw.write_samples(self._fd, out_buffer, 0, nsamps*self.AO.nchan, 1) + rc = int16_rw.write_samples(self._fd, out_buffer, offset, nsamps*self.AO.nchan, 1) if rc != nsamps*self.AO.nchan : raise simAioError, "Error %d writing output buffer\n" % rc - if VERBOSE : - print "Writing extra output" - rc = int16_rw.write_samples(self._fd, out_buffer, (nsamps-1)*self.AO.nchan, self.AO.nchan, 1) # HACK, add an extra sample for each channel to the output buffer - if rc != self.AO.nchan : - raise simAioError, "Error %d writing hack output buffer\n" % rc - # Without the hack, output jumps back to 0V after the command completes - self._nsamps = nsamps + self._onremain -= nsamps self.state = "Setup" def arm(self) : if self.state != "Setup" : @@ -284,17 +310,37 @@ class AIO (object) : self._comedi_internal_trigger(self.AO.subdev) self.state = "Armed" def start_read(self, in_buffer) : + print "_nsamps", self._nsamps if self.state != "Armed" : raise simAioError, "Invalid state %s" % self.state + if len(in_buffer) < self._nsamps * self.AI.nchan : + raise simAioError, "in_buffer not long enough (size %d < required %d)" \ + % (len(in_buffer), self._nsamps * self.AI.nchan) + if VERBOSE : print "Start the run" self._comedi_internal_trigger(self.AI.subdev) - if VERBOSE : - print "Read %d input samples from the card" % (self._nsamps*self.AI.nchan) - rc = int16_rw.read_samples(self._fd, in_buffer, 0, self._nsamps*self.AI.nchan, -1) - if rc != self._nsamps*self.AI.nchan : - raise simAioError, "Error %d reading input buffer\n" % rc self.state = "Read" + while self._inremain > 0 : + # read half a buffer + nsamps = min(self._inremain, self.buffsize/2) + offset = (self._nsamps - self._inremain) * self.AI.nchan + if VERBOSE : + print "Read %d input samples from the card" % (nsamps*self.i_nchan) + rc = int16_rw.read_samples(self._fd, in_buffer, offset, nsamps*self.AI.nchan, 20) + if rc != nsamps*self.AI.nchan : + raise simAioError, "Error %d reading input buffer\n" % rc + self._inremain -= nsamps + # write half a buffer + nsamps = min(self._onremain, self.buffsize/2) + if nsamps > 0 : + offset = (self._nsamps - self._onremain) * self.AO.nchan + if VERBOSE : + print "Write %d output samples to the card" % (nsamps*self.AO.nchan) + rc = int16_rw.write_samples(self._fd, self._obuffer, offset, nsamps*self.AO.nchan, 20) + if rc != nsamps*self.AO.nchan : + raise simAioError, "Error %d writing output buffer\n" % rc + self._onremain -= nsamps def _comedi_internal_trigger(self, subdevice) : data = self._comedi.chanlist(1) # by luck, data is an array of lsampl_t (unsigned ints), as is chanlist insn = self._comedi.comedi_insn_struct() @@ -308,6 +354,11 @@ class AIO (object) : if VERBOSE : print "Reset the analog subdevices" # clean up after the read + self.AO.updateFlags() + self.AI.updateFlags() + # I would expect self.AO.flags.busy() to be False by this point, + # but after a write that does not seem to be the case. + # It doesn't seem to cause any harm to cancel things anyway... rc = self._comedi.comedi_cancel(self.dev, self.AO.subdev) if rc < 0 : self._comedi.comedi_perror("comedi_cancel") @@ -320,21 +371,30 @@ class AIO (object) : # define the test suite +# verbose +# 0 - no output +# 1 - print test names +# 2 - print test results +# 3 - print some details +# 4 - print lots of details -def _test_AIO(aio=None, start_wait=0, verbose=False) : - if (verbose) : +def _test_AIO(aio=None, start_wait=0, verbose=0) : + if verbose >= 1 : print "_test_AIO(start_wait = %g)" % start_wait - nsamps = 10 - out_data = array([0]*nsamps, dtype=int16_rw.DATA_T) - in_data = array([0]*nsamps, dtype=int16_rw.DATA_T) + nsamps = 20 + out_data = aio.genBuffer(nsamps) + in_data = aio.genBuffer(nsamps) + midpoint = int(aio.AO.maxdata[0]/2) + bitrange = float(midpoint/2) for i in range(nsamps) : - out_data[i] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps))) - aio.setup(10, 1000, out_data) + out_data[i] = int(midpoint+bitrange*sin(2*pi*i/float(nsamps))) + out_data[-2] = out_data[-1] = midpoint + aio.setup(freq=1000, out_buffer=out_data) aio.arm() sleep(start_wait) aio.start_read(in_data) aio.reset() - if (verbose) : + if verbose >= 4 : print "out_data:\n", out_data print "in_data:\n", in_data print "residual:\n[", @@ -343,9 +403,10 @@ def _test_AIO(aio=None, start_wait=0, verbose=False) : print "]" return (out_data, in_data) -def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) : - print "_repeat_aio_test()" - grads = array([0]*num_tests, dtype=float32) +def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=0) : + if verbose >= 1 : + print "_repeat_aio_test()" + grads = array([0]*num_tests, dtype=float32) # test input with `wrong' type good = 0 bad = 0 good_run = 0 @@ -354,7 +415,7 @@ def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) : out_data, in_data = _test_AIO(aio, start_wait) gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data) grads[i] = gradient - if verbose : + if verbose >= 4 : print "wait %2d, run %2d, gradient %g" % (start_wait, i, gradient) if gradient < .7 : bad += 1 @@ -364,58 +425,156 @@ def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) : good += 1 good_run += 1 good_run_arr.append(good_run) - print "failure rate %g%% in %d runs" % ((float(bad)/float(good+bad))*100.0, num_tests) - call = 'echo "' - for num in good_run_arr : - call += "%d " % num - call += '" | stem_leaf 2' - print "good run stem and leaf:" - system(call) + fail_rate = (float(bad)/float(good+bad))*100.0 + if verbose >= 2 : + print "failure rate %g%% in %d runs" % (fail_rate, num_tests) + call = 'echo "' + for num in good_run_arr : + call += "%d " % num + call += '" | stem_leaf 2' + print "good run stem and leaf:" + system(call) + return fail_rate -def _test_AIO_multi_chan(aio=None, start_wait=0, verbose=False) : - if (verbose) : +def _test_AIO_multi_chan(aio=None, start_wait=0, verbose=0) : + from sys import stdout + if verbose >= 1 : print "_test_AIO_multi_chan(start_wait = %g)" % start_wait - nsamps = 10 - out_data = array([0]*nsamps*aio.AO.nchan, dtype=int16_rw.DATA_T) - in_data = array([0]*nsamps*aio.AI.nchan, dtype=int16_rw.DATA_T) + nsamps = 100 + out_data = aio.genBuffer(nsamps, aio.AO.nchan) + in_data = aio.genBuffer(nsamps, aio.AI.nchan) # set up interleaved data + midpoint = int(aio.AO.maxdata[0]/2) + bitrange = float(midpoint/2) for i in range(nsamps) : - out_data[i*aio.AO.nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps))) + out_data[i*aio.AO.nchan] = int(midpoint+bitrange*sin(2*pi*i/float(nsamps))) for j in range(1, aio.AO.nchan) : out_data[i*aio.AO.nchan + j] = 0 - aio.setup(10, 1000, out_data) + if DONT_OUTPUT_LAST_SAMPLE_HACK : + for ind in [-1,-1-aio.AO.nchan] : + for chan in range(aio.AO.nchan) : + out_data[ind-chan] = midpoint + aio.setup(freq=1000, out_buffer=out_data) aio.arm() sleep(start_wait) aio.start_read(in_data) aio.reset() - if (verbose) : - print "#", + #fid = file('/tmp/comedi_test.o', 'w') + fid = stdout + if verbose >= 4 : + print >> fid, "#", for j in range(aio.AO.nchan) : - print "%s\t" % aio.AO.chan[j], + print >> fid, "%s\t" % aio.AO.chan[j], for j in range(aio.AI.nchan) : - print "%s\t" % aio.AI.chan[j], + print >> fid, "%s\t" % aio.AI.chan[j], print "" for i in range(nsamps) : for j in range(aio.AO.nchan) : - print "%s\t" % out_data[i*aio.AO.nchan+j], + print >> fid, "%s\t" % out_data[i*aio.AO.nchan+j], for j in range(aio.AI.nchan) : - print "%s\t" % in_data[i*aio.AI.nchan+j], - print "" + print >> fid, "%s\t" % in_data[i*aio.AI.nchan+j], + print >> fid, "" return (out_data, in_data) +def _test_big_bufs(aio=None, freq=100e3, verbose=False) : + if verbose >= 1 : + print "_test_big_bufs()" + if aio == None : + our_aio = True + aio = AIO(in_chan=(1,), out_chan=(0,)) + else : + our_aio = False + nsamps = int(100e3) + out_data = aio.genBuffer(nsamps, aio.AO.nchan) + midpoint = int(aio.AO.maxdata[0]/2) + bitrange = float(midpoint/2) + for i in range(nsamps) : + out_data[i] = int(sin(2*pi*i/float(nsamps))*bitrange) + midpoint + if DONT_OUTPUT_LAST_SAMPLE_HACK : + out_data[-2] = out_data[-1] = midpoint + in_data = aio.genBuffer(nsamps, aio.AO.nchan) + aio.setup(freq=freq, out_buffer=out_data) + aio.arm() + aio.start_read(in_data) + aio.reset() + if our_aio : + aio.close() + del(aio) + +def _test_output_persistence(freq=100, verbose=False) : + import single_aio + if verbose >= 1 : + print "_test_output_persistence()" + aio = AIO(in_chan=(0,), out_chan=(0,)) + aio.close() + ai = single_aio.AI(chan=(0,)) + ai.close() + + def simult_set_voltage(aio, range_fraction=None, physical_value=None, freq=freq, verbose=False) : + "use either range_fraction or physical_value, not both." + aio.open() + if range_fraction != None : + assert physical_value == None + assert range_fraction >= 0 and range_fraction <= 1 + out_val = int(range_fraction*aio.AO.maxdata[0]) + physical_value = aio.AO.comedi_to_phys(chan_index=0, comedi=out_val, useNAN=False) + else : + assert physical_value != None + out_val = aio.AO.phys_to_comedi(chan_index=0, phys=physical_value) + if verbose >= 3 : + print "Output : %d = %g V" % (out_val, physical_value) + nsamps = int(int(freq)) + out_data = aio.genBuffer(nsamps, aio.AO.nchan, value=out_val) + in_data = aio.genBuffer(nsamps, aio.AO.nchan) + aio.setup(freq=freq, out_buffer=out_data) + aio.arm() + aio.start_read(in_data) + aio.reset() + # by this point the output returns to 0 V when + # DONT_OUTPUT_LATH_SAMPLE_HACK == False + time.sleep(5) + aio.close() + if verbose >= 4 : + print "Output complete" + print in_data + return physical_value + def single_get_voltage(ai, verbose=False) : + ai.open() + in_val = ai.read()[0] + ai.close() + in_phys = ai.comedi_to_phys(chan_index=0, comedi=in_val, useNAN=False) + if verbose >= 3 : + print "Input : %d = %g V" % (in_val, in_phys) + return in_phys + def tp(aio, ai, range_fraction, verbose=False) : + out_phys = simult_set_voltage(aio, range_fraction=range_fraction, verbose=verbose) + + # It helps me to play a sound so I know where the test is + # while confirming the output on an oscilliscope. + #system("aplay /home/wking/Music/system/sonar.wav") + time.sleep(1) + in_phys = single_get_voltage(ai, verbose) + assert abs((in_phys-out_phys)/out_phys) < 0.1, "Output %g V, but input %g V" % (out_phys, in_phys) + + tp(aio,ai,0,verbose) + tp(aio,ai,1,verbose) + simult_set_voltage(aio,physical_value=0.0,verbose=verbose) -def test() : +def test(verbose=2) : aio = AIO(in_chan=(0,), out_chan=(0,)) - _test_AIO(aio, start_wait = 0, verbose=True) - _test_AIO(aio, start_wait = 0.5, verbose=True) + _test_AIO(aio, start_wait = 0, verbose=verbose) + _test_AIO(aio, start_wait = 0.5, verbose=verbose) aio.close() aio.open() - _repeat_aio_test(aio, num_tests=500, start_wait=0, verbose=False) + _repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=verbose) + _test_big_bufs(aio, verbose=verbose) aio.close() - + aiom = AIO(in_chan=(0,1,2,3), out_chan=(0,1)) - _test_AIO_multi_chan(aiom, start_wait = 0, verbose=True) + _test_AIO_multi_chan(aiom, start_wait = 0, verbose=verbose) + del(aiom) + _test_output_persistence(verbose=verbose) if __name__ == "__main__" : test() -- 2.26.2