"""Some Comedi operations common to analog and digital IO"""
import comedi as c
+import time
VERSION = 0.1
tup = temp_tup
return tup
+class SubdeviceFlags (object) :
+ """
+ Descriptions from http://www.comedi.org/doc/r5153.html
+ (the comedi_get_subdevice_flags documentation)
+
+ SDF_BUSY:
+ The subdevice is busy performing an asynchronous command. A
+ subdevice being "busy" is slightly different from the "running"
+ state flagged by SDF_RUNNING. A "running" subdevice is always
+ "busy", but a "busy" subdevice is not necessarily "running". For
+ example, suppose an analog input command has been completed by the
+ hardware, but there are still samples in Comedi's buffer waiting
+ to be read out. In this case, the subdevice is not "running", but
+ is still "busy" until all the samples are read out or
+ comedi_cancel() is called.
+
+ SDF_BUSY_OWNER:
+ The subdevice is "Busy", and the command it is running was started
+ by the current process.
+
+ SDF_LOCKED:
+ The subdevice has been locked by comedi_lock().
+
+ SDF_LOCK_OWNER:
+ The subdevice is locked, and was locked by the current process.
+
+ SDF_MAXDATA:
+ The maximum data value for the subdevice depends on the channel.
+
+ SDF_FLAGS:
+ The subdevice flags depend on the channel (unfinished/broken
+ support in library).
+
+ SDF_RANGETYPE:
+ The range type depends on the channel.
+
+ SDF_CMD:
+ The subdevice supports asynchronous commands.
+
+ SDF_SOFT_CALIBRATED:
+ The subdevice relies on the host to do calibration in software.
+ Software calibration coefficients are determined by the
+ comedi_soft_calibrate utility. See the description of the
+ comedi_get_softcal_converter() function for more information.
+
+ SDF_READABLE:
+ The subdevice can be read (e.g. analog input).
+
+ SDF_WRITABLE:
+ The subdevice can be written to (e.g. analog output).
+
+ SDF_INTERNAL:
+ The subdevice does not have externally visible lines.
+
+ SDF_GROUND:
+ The subdevice supports AREF_GROUND.
+
+ SDF_COMMON:
+ The subdevice supports AREF_COMMON.
+
+ SDF_DIFF:
+ The subdevice supports AREF_DIFF.
+
+ SDF_OTHER:
+ The subdevice supports AREF_OTHER
+
+ SDF_DITHER:
+ The subdevice supports dithering (via the CR_ALT_FILTER chanspec
+ flag).
+
+ SDF_DEGLITCH:
+ The subdevice supports deglitching (via the CR_ALT_FILTER chanspec
+ flag).
+
+ SDF_RUNNING:
+ An asynchronous command is running. You can use this flag to poll
+ for the completion of an output command.
+
+ SDF_LSAMPL:
+ The subdevice uses the 32 bit lsampl_t type instead of the 16 bit
+ sampl_t for asynchronous command data.
+
+ SDF_PACKED:
+ The subdevice uses bitfield samples for asynchronous command data,
+ one bit per channel (otherwise it uses one sampl_t or lsampl_t per
+ channel). Commonly used for digital subdevices.
+ """
+ def __init__(self, comedi) :
+ self.lastUpdate = None
+ self._flags = None
+ self._comedi = comedi
+ def update(self, dev, subdev) :
+ "Must be called on an open device"
+ self._flags = self._comedi.comedi_get_subdevice_flags(dev, subdev)
+ self.lastupdate = time.time()
+ def _check(self, sdf_flag) :
+ assert self._flags != None, "Cannot return status of uninitialized flag"
+ if self._flags & sdf_flag :
+ return True
+ return False
+ def busy(self) :
+ return(self._check(self._comedi.SDF_BUSY))
+ def busyOwner(self) :
+ return(self._check(self._comedi.SDF_BUSY_OWNER))
+ def locked(self) :
+ return(self._check(self._comedi.SDF_LOCKED))
+ def lockOwner(self) :
+ return(self._check(self._comedi.SDF_LOCK_OWNER))
+ def maxdata(self) :
+ return(self._check(self._comedi.SDF_MAXDATA))
+ def flags(self) :
+ return(self._check(self._comedi.SDF_FLAGS))
+ def rangetype(self) :
+ return(self._check(self._comedi.SDF_RANGETYPE))
+ def cmd(self) :
+ return(self._check(self._comedi.SDF_CMD))
+ def softCalibrated(self) :
+ return(self._check(self._comedi.SDF_SOFT_CALIBRATED))
+ def readable(self) :
+ return(self._check(self._comedi.SDF_READABLE))
+ def writable(self) :
+ return(self._check(self._comedi.SDF_WRITABLE))
+ def internal(self) :
+ return(self._check(self._comedi.SDF_INTERNAL))
+ def ground(self) :
+ return(self._check(self._comedi.SDF_GROUND))
+ def common(self) :
+ return(self._check(self._comedi.SDF_COMMON))
+ def diff(self) :
+ return(self._check(self._comedi.SDF_DIFF))
+ def other(self) :
+ return(self._check(self._comedi.SDF_OTHER))
+ def dither(self) :
+ return(self._check(self._comedi.SDF_DITHER))
+ def deglitch(self) :
+ return(self._check(self._comedi.SDF_DEGLITCH))
+ def running(self) :
+ return(self._check(self._comedi.SDF_RUNNING))
+ def lsampl(self) :
+ return(self._check(self._comedi.SDF_LSAMPL))
+ def packed(self) :
+ return(self._check(self._comedi.SDF_PACKED))
+ def __str__(self) :
+ s = ""
+ s += "busy : %s\n" % self.busy()
+ s += "busyOwner : %s\n" % self.busyOwner()
+ s += "locked : %s\n" % self.locked()
+ s += "lockedOwner : %s\n" % self.lockOwner()
+ s += "maxdata : %s\n" % self.maxdata()
+ s += "flags : %s\n" % self.flags()
+ s += "rangetype : %s\n" % self.rangetype()
+ s += "cmd : %s\n" % self.cmd()
+ s += "softCalibrated : %s\n" % self.softCalibrated()
+ s += "readable : %s\n" % self.readable()
+ s += "writable : %s\n" % self.writable()
+ s += "internal : %s\n" % self.internal()
+ s += "ground : %s\n" % self.ground()
+ s += "common : %s\n" % self.common()
+ s += "diff : %s\n" % self.diff()
+ s += "other : %s\n" % self.other()
+ s += "dither : %s\n" % self.dither()
+ s += "deglitch : %s\n" % self.deglitch()
+ s += "running : %s\n" % self.running()
+ s += "lsampl : %s\n" % self.lsampl()
+ s += "packed : %s" % self.packed()
+ return s
+
class PyComediIO (object) :
"Base class for Comedi IO operations"
def __init__(self, filename="/dev/comedi0", subdevice=-1, devtype=c.COMEDI_SUBD_AI, chan=(0,1,2,3), aref=(c.AREF_GROUND,), range=(0,), output=False, dev=None) :
if type < 0 :
self._comedi.comedi_perror("comedi_get_subdevice_type")
raise pycomediError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
+ self.flags = SubdeviceFlags(self._comedi)
+ self.updateFlags()
+ def updateFlags(self) :
+ self.flags.update(self.dev, self.subdev)
def _setup_channels(self, chan, aref, rng) :
"""check that the specified channels exists, and that the arefs and
ranges are legal for those channels. Also allocate a range
comrange_copy.unit = comrange.unit
self._comedi_range.append(comrange_copy)
self.cr_chan[i] = self._comedi.cr_pack(chan, rng, aref)
- def comedi_to_phys(self, chan_index, comedi) :
+ def comedi_to_phys(self, chan_index, comedi, useNAN=True) :
+ if useNAN == True :
+ oor = self._comedi.COMEDI_OOR_NAN
+ else :
+ oor = self._comedi.COMEDI_OOR_NUMBER
+ self._comedi.comedi_set_global_oor_behavior(oor)
phys = self._comedi.comedi_to_phys(comedi, self._comedi_range[chan_index], self.maxdata[chan_index])
- if self.verbose :
+ if self.verbose :
print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self.maxdata[chan_index])
return phys
def phys_to_comedi(self, chan_index, phys) :
import common
from numpy import array, fromstring, float32, pi, sin
import int16_rw
+import time
# imports for testing
from time import sleep
AO_TRIGGERS_OFF_AI_START = True
#AO_TRIGGERS_OFF_AI_START = False
-class simAioError (common.pycomediError) :
+
+# HACK! outputting last point can cause jumps to random positions
+# This is probably due to some clocking issue when we trigger AO off of
+# the AI Start signal.
+DONT_OUTPUT_LAST_SAMPLE_HACK = True
+
+class simAioError (Exception) :
"Simultaneous Analog IO error"
pass
"""
def __init__(self, filename="/dev/comedi0",
in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,),
- out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,)) :
+ out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,),
+ buffsize=32768) :
"""inputs:
filename: comedi device file for your device ("/dev/comedi0").
And then for both input and output (in_* and out_*):
"""
self._comedi = c
self._filename = filename
+ assert buffsize > 0
+ assert buffsize % 2 == 0, "buffsize = %d is not even" % buffsize
+ self.buffsize = buffsize
# the next section is much like the open() method below,
# but in this one we set up all the extra details associated
# with the AO and AI structures
self._icmd = cmd(self.AI)
self._ocmd = cmd(self.AO)
self.state = "Initialized"
- def setup(self, nsamps, freq, out_buffer) :
+ def genBuffer(self, nsamp, nchan=1, value=0) :
+ return array([value]*nsamp*nchan, dtype=int16_rw.DATA_T)
+ def setup(self, nsamps=None, freq=1.0, out_buffer=None) :
if self.state != "Initialized" :
raise simAioError, "Invalid state %s" % self.state
+ if out_buffer == None : # read-only command
+ assert self.AO == None
+ assert nsamps > 1
+ if nsamps == None :
+ assert len(out_buffer) % self.AO.nchan == 0
+ nsamps = int(len(out_buffer) / self.AO.nchan)
if type(out_buffer) != type(_example_array) :
raise simAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer))
+ if DONT_OUTPUT_LAST_SAMPLE_HACK :
+ for i in range(1, self.AO.nchan+1) : # i in [1, ... ,nchan]
+ if out_buffer[-i] != out_buffer[-self.AO.nchan-i] :
+ raise simAioError, """To ensure that you are not suprised by the effects of the
+DONT_OUTPUT_LAST_SAMPLE_HACK flag, please ensure that the last two
+values samples output on each channel are the same."""
+ onsamps = nsamps - 1
+ else :
+ onsamps = nsamps
+ self._nsamps = nsamps
self._ocmd.cmd.scan_begin_arg = int(1e9/freq)
- self._ocmd.cmd.stop_arg = nsamps
+ self._ocmd.cmd.stop_arg = onsamps
if VERBOSE :
- print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.cmd.scan_begin_arg, self._ocmd.cmd.stop_arg)
+ print "Configure the board (%d ns per scan, %d samps)" % (self._icmd.cmd.scan_begin_arg, self._icmd.cmd.stop_arg)
+ self._obuffer = out_buffer
self._onremain = nsamps
self._ocmd.test_cmd()
self._ocmd.execute()
self._icmd.test_cmd()
self._inremain = nsamps
self._icmd.execute()
+ nsamps = min(self._onremain, self.buffsize)
+ offset = 0
if VERBOSE :
print "Write %d output samples to the card" % (nsamps*self.AO.nchan)
- rc = int16_rw.write_samples(self._fd, out_buffer, 0, nsamps*self.AO.nchan, 1)
+ rc = int16_rw.write_samples(self._fd, out_buffer, offset, nsamps*self.AO.nchan, 1)
if rc != nsamps*self.AO.nchan :
raise simAioError, "Error %d writing output buffer\n" % rc
- if VERBOSE :
- print "Writing extra output"
- rc = int16_rw.write_samples(self._fd, out_buffer, (nsamps-1)*self.AO.nchan, self.AO.nchan, 1) # HACK, add an extra sample for each channel to the output buffer
- if rc != self.AO.nchan :
- raise simAioError, "Error %d writing hack output buffer\n" % rc
- # Without the hack, output jumps back to 0V after the command completes
- self._nsamps = nsamps
+ self._onremain -= nsamps
self.state = "Setup"
def arm(self) :
if self.state != "Setup" :
self._comedi_internal_trigger(self.AO.subdev)
self.state = "Armed"
def start_read(self, in_buffer) :
+ print "_nsamps", self._nsamps
if self.state != "Armed" :
raise simAioError, "Invalid state %s" % self.state
+ if len(in_buffer) < self._nsamps * self.AI.nchan :
+ raise simAioError, "in_buffer not long enough (size %d < required %d)" \
+ % (len(in_buffer), self._nsamps * self.AI.nchan)
+
if VERBOSE :
print "Start the run"
self._comedi_internal_trigger(self.AI.subdev)
- if VERBOSE :
- print "Read %d input samples from the card" % (self._nsamps*self.AI.nchan)
- rc = int16_rw.read_samples(self._fd, in_buffer, 0, self._nsamps*self.AI.nchan, -1)
- if rc != self._nsamps*self.AI.nchan :
- raise simAioError, "Error %d reading input buffer\n" % rc
self.state = "Read"
+ while self._inremain > 0 :
+ # read half a buffer
+ nsamps = min(self._inremain, self.buffsize/2)
+ offset = (self._nsamps - self._inremain) * self.AI.nchan
+ if VERBOSE :
+ print "Read %d input samples from the card" % (nsamps*self.i_nchan)
+ rc = int16_rw.read_samples(self._fd, in_buffer, offset, nsamps*self.AI.nchan, 20)
+ if rc != nsamps*self.AI.nchan :
+ raise simAioError, "Error %d reading input buffer\n" % rc
+ self._inremain -= nsamps
+ # write half a buffer
+ nsamps = min(self._onremain, self.buffsize/2)
+ if nsamps > 0 :
+ offset = (self._nsamps - self._onremain) * self.AO.nchan
+ if VERBOSE :
+ print "Write %d output samples to the card" % (nsamps*self.AO.nchan)
+ rc = int16_rw.write_samples(self._fd, self._obuffer, offset, nsamps*self.AO.nchan, 20)
+ if rc != nsamps*self.AO.nchan :
+ raise simAioError, "Error %d writing output buffer\n" % rc
+ self._onremain -= nsamps
def _comedi_internal_trigger(self, subdevice) :
data = self._comedi.chanlist(1) # by luck, data is an array of lsampl_t (unsigned ints), as is chanlist
insn = self._comedi.comedi_insn_struct()
if VERBOSE :
print "Reset the analog subdevices"
# clean up after the read
+ self.AO.updateFlags()
+ self.AI.updateFlags()
+ # I would expect self.AO.flags.busy() to be False by this point,
+ # but after a write that does not seem to be the case.
+ # It doesn't seem to cause any harm to cancel things anyway...
rc = self._comedi.comedi_cancel(self.dev, self.AO.subdev)
if rc < 0 :
self._comedi.comedi_perror("comedi_cancel")
# define the test suite
+# verbose
+# 0 - no output
+# 1 - print test names
+# 2 - print test results
+# 3 - print some details
+# 4 - print lots of details
-def _test_AIO(aio=None, start_wait=0, verbose=False) :
- if (verbose) :
+def _test_AIO(aio=None, start_wait=0, verbose=0) :
+ if verbose >= 1 :
print "_test_AIO(start_wait = %g)" % start_wait
- nsamps = 10
- out_data = array([0]*nsamps, dtype=int16_rw.DATA_T)
- in_data = array([0]*nsamps, dtype=int16_rw.DATA_T)
+ nsamps = 20
+ out_data = aio.genBuffer(nsamps)
+ in_data = aio.genBuffer(nsamps)
+ midpoint = int(aio.AO.maxdata[0]/2)
+ bitrange = float(midpoint/2)
for i in range(nsamps) :
- out_data[i] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
- aio.setup(10, 1000, out_data)
+ out_data[i] = int(midpoint+bitrange*sin(2*pi*i/float(nsamps)))
+ out_data[-2] = out_data[-1] = midpoint
+ aio.setup(freq=1000, out_buffer=out_data)
aio.arm()
sleep(start_wait)
aio.start_read(in_data)
aio.reset()
- if (verbose) :
+ if verbose >= 4 :
print "out_data:\n", out_data
print "in_data:\n", in_data
print "residual:\n[",
print "]"
return (out_data, in_data)
-def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) :
- print "_repeat_aio_test()"
- grads = array([0]*num_tests, dtype=float32)
+def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=0) :
+ if verbose >= 1 :
+ print "_repeat_aio_test()"
+ grads = array([0]*num_tests, dtype=float32) # test input with `wrong' type
good = 0
bad = 0
good_run = 0
out_data, in_data = _test_AIO(aio, start_wait)
gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
grads[i] = gradient
- if verbose :
+ if verbose >= 4 :
print "wait %2d, run %2d, gradient %g" % (start_wait, i, gradient)
if gradient < .7 :
bad += 1
good += 1
good_run += 1
good_run_arr.append(good_run)
- print "failure rate %g%% in %d runs" % ((float(bad)/float(good+bad))*100.0, num_tests)
- call = 'echo "'
- for num in good_run_arr :
- call += "%d " % num
- call += '" | stem_leaf 2'
- print "good run stem and leaf:"
- system(call)
+ fail_rate = (float(bad)/float(good+bad))*100.0
+ if verbose >= 2 :
+ print "failure rate %g%% in %d runs" % (fail_rate, num_tests)
+ call = 'echo "'
+ for num in good_run_arr :
+ call += "%d " % num
+ call += '" | stem_leaf 2'
+ print "good run stem and leaf:"
+ system(call)
+ return fail_rate
-def _test_AIO_multi_chan(aio=None, start_wait=0, verbose=False) :
- if (verbose) :
+def _test_AIO_multi_chan(aio=None, start_wait=0, verbose=0) :
+ from sys import stdout
+ if verbose >= 1 :
print "_test_AIO_multi_chan(start_wait = %g)" % start_wait
- nsamps = 10
- out_data = array([0]*nsamps*aio.AO.nchan, dtype=int16_rw.DATA_T)
- in_data = array([0]*nsamps*aio.AI.nchan, dtype=int16_rw.DATA_T)
+ nsamps = 100
+ out_data = aio.genBuffer(nsamps, aio.AO.nchan)
+ in_data = aio.genBuffer(nsamps, aio.AI.nchan)
# set up interleaved data
+ midpoint = int(aio.AO.maxdata[0]/2)
+ bitrange = float(midpoint/2)
for i in range(nsamps) :
- out_data[i*aio.AO.nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
+ out_data[i*aio.AO.nchan] = int(midpoint+bitrange*sin(2*pi*i/float(nsamps)))
for j in range(1, aio.AO.nchan) :
out_data[i*aio.AO.nchan + j] = 0
- aio.setup(10, 1000, out_data)
+ if DONT_OUTPUT_LAST_SAMPLE_HACK :
+ for ind in [-1,-1-aio.AO.nchan] :
+ for chan in range(aio.AO.nchan) :
+ out_data[ind-chan] = midpoint
+ aio.setup(freq=1000, out_buffer=out_data)
aio.arm()
sleep(start_wait)
aio.start_read(in_data)
aio.reset()
- if (verbose) :
- print "#",
+ #fid = file('/tmp/comedi_test.o', 'w')
+ fid = stdout
+ if verbose >= 4 :
+ print >> fid, "#",
for j in range(aio.AO.nchan) :
- print "%s\t" % aio.AO.chan[j],
+ print >> fid, "%s\t" % aio.AO.chan[j],
for j in range(aio.AI.nchan) :
- print "%s\t" % aio.AI.chan[j],
+ print >> fid, "%s\t" % aio.AI.chan[j],
print ""
for i in range(nsamps) :
for j in range(aio.AO.nchan) :
- print "%s\t" % out_data[i*aio.AO.nchan+j],
+ print >> fid, "%s\t" % out_data[i*aio.AO.nchan+j],
for j in range(aio.AI.nchan) :
- print "%s\t" % in_data[i*aio.AI.nchan+j],
- print ""
+ print >> fid, "%s\t" % in_data[i*aio.AI.nchan+j],
+ print >> fid, ""
return (out_data, in_data)
+def _test_big_bufs(aio=None, freq=100e3, verbose=False) :
+ if verbose >= 1 :
+ print "_test_big_bufs()"
+ if aio == None :
+ our_aio = True
+ aio = AIO(in_chan=(1,), out_chan=(0,))
+ else :
+ our_aio = False
+ nsamps = int(100e3)
+ out_data = aio.genBuffer(nsamps, aio.AO.nchan)
+ midpoint = int(aio.AO.maxdata[0]/2)
+ bitrange = float(midpoint/2)
+ for i in range(nsamps) :
+ out_data[i] = int(sin(2*pi*i/float(nsamps))*bitrange) + midpoint
+ if DONT_OUTPUT_LAST_SAMPLE_HACK :
+ out_data[-2] = out_data[-1] = midpoint
+ in_data = aio.genBuffer(nsamps, aio.AO.nchan)
+ aio.setup(freq=freq, out_buffer=out_data)
+ aio.arm()
+ aio.start_read(in_data)
+ aio.reset()
+ if our_aio :
+ aio.close()
+ del(aio)
+
+def _test_output_persistence(freq=100, verbose=False) :
+ import single_aio
+ if verbose >= 1 :
+ print "_test_output_persistence()"
+ aio = AIO(in_chan=(0,), out_chan=(0,))
+ aio.close()
+ ai = single_aio.AI(chan=(0,))
+ ai.close()
+
+ def simult_set_voltage(aio, range_fraction=None, physical_value=None, freq=freq, verbose=False) :
+ "use either range_fraction or physical_value, not both."
+ aio.open()
+ if range_fraction != None :
+ assert physical_value == None
+ assert range_fraction >= 0 and range_fraction <= 1
+ out_val = int(range_fraction*aio.AO.maxdata[0])
+ physical_value = aio.AO.comedi_to_phys(chan_index=0, comedi=out_val, useNAN=False)
+ else :
+ assert physical_value != None
+ out_val = aio.AO.phys_to_comedi(chan_index=0, phys=physical_value)
+ if verbose >= 3 :
+ print "Output : %d = %g V" % (out_val, physical_value)
+ nsamps = int(int(freq))
+ out_data = aio.genBuffer(nsamps, aio.AO.nchan, value=out_val)
+ in_data = aio.genBuffer(nsamps, aio.AO.nchan)
+ aio.setup(freq=freq, out_buffer=out_data)
+ aio.arm()
+ aio.start_read(in_data)
+ aio.reset()
+ # by this point the output returns to 0 V when
+ # DONT_OUTPUT_LATH_SAMPLE_HACK == False
+ time.sleep(5)
+ aio.close()
+ if verbose >= 4 :
+ print "Output complete"
+ print in_data
+ return physical_value
+ def single_get_voltage(ai, verbose=False) :
+ ai.open()
+ in_val = ai.read()[0]
+ ai.close()
+ in_phys = ai.comedi_to_phys(chan_index=0, comedi=in_val, useNAN=False)
+ if verbose >= 3 :
+ print "Input : %d = %g V" % (in_val, in_phys)
+ return in_phys
+ def tp(aio, ai, range_fraction, verbose=False) :
+ out_phys = simult_set_voltage(aio, range_fraction=range_fraction, verbose=verbose)
+
+ # It helps me to play a sound so I know where the test is
+ # while confirming the output on an oscilliscope.
+ #system("aplay /home/wking/Music/system/sonar.wav")
+ time.sleep(1)
+ in_phys = single_get_voltage(ai, verbose)
+ assert abs((in_phys-out_phys)/out_phys) < 0.1, "Output %g V, but input %g V" % (out_phys, in_phys)
+
+ tp(aio,ai,0,verbose)
+ tp(aio,ai,1,verbose)
+ simult_set_voltage(aio,physical_value=0.0,verbose=verbose)
-def test() :
+def test(verbose=2) :
aio = AIO(in_chan=(0,), out_chan=(0,))
- _test_AIO(aio, start_wait = 0, verbose=True)
- _test_AIO(aio, start_wait = 0.5, verbose=True)
+ _test_AIO(aio, start_wait = 0, verbose=verbose)
+ _test_AIO(aio, start_wait = 0.5, verbose=verbose)
aio.close()
aio.open()
- _repeat_aio_test(aio, num_tests=500, start_wait=0, verbose=False)
+ _repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=verbose)
+ _test_big_bufs(aio, verbose=verbose)
aio.close()
-
+
aiom = AIO(in_chan=(0,1,2,3), out_chan=(0,1))
- _test_AIO_multi_chan(aiom, start_wait = 0, verbose=True)
+ _test_AIO_multi_chan(aiom, start_wait = 0, verbose=verbose)
+ del(aiom)
+ _test_output_persistence(verbose=verbose)
if __name__ == "__main__" :
test()