No changes?
authorW. Trevor King <wking@drexel.edu>
Sat, 4 Oct 2008 22:35:29 +0000 (18:35 -0400)
committerW. Trevor King <wking@drexel.edu>
Sat, 4 Oct 2008 22:35:29 +0000 (18:35 -0400)
pycomedi/comedi_simult_aio.py [deleted file]
pycomedi/comedi_single_aio.py [deleted file]
pycomedi/comedi_single_dio.py [deleted file]

diff --git a/pycomedi/comedi_simult_aio.py b/pycomedi/comedi_simult_aio.py
deleted file mode 100644 (file)
index 303b72b..0000000
+++ /dev/null
@@ -1,400 +0,0 @@
-# Simultaneous, finite, buffered analog inpout/output using comedi drivers
-
-import comedi
-from numpy import array, fromstring, uint16, float32, pi, sin
-import int16_rw
-
-# imports for testing
-from time import sleep
-from scipy.stats import linregress
-from os import system
-
-#VERBOSE = True
-VERBOSE = False
-AO_TRIGGERS_OFF_AI_START = True
-#AO_TRIGGERS_OFF_AI_START = False
-
-class simAioError (Exception) :
-    "Simultaneous Analog IO error"
-    pass
-
-_example_array = array([0], dtype=uint16) # for typing, since I don't know what type(array) should be
-
-_cmdtest_message = ["success",
-                     "invalid source",
-                     "source conflict",
-                     "invalid argument",
-                     "argument conflict",
-                     "invalid chanlist"]
-
-def _print_cmdsrc(source) :
-    if source & comedi.TRIG_NONE : print "none|",
-    if source & comedi.TRIG_NOW : print "now|",
-    if source & comedi.TRIG_FOLLOW : print "follow|",
-    if source & comedi.TRIG_TIME : print "time|",
-    if source & comedi.TRIG_TIMER : print "timer|",
-    if source & comedi.TRIG_COUNT : print "count|",
-    if source & comedi.TRIG_EXT : print "ext|",
-    if source & comedi.TRIG_INT : print "int|",
-    if source & comedi.TRIG_OTHER : print "other|",
-
-def _print_command(cmd) :
-    print "subdevice: \t%d" % cmd.subdev
-    print "flags:     \t0x%x" % cmd.flags
-    print "start:     \t",
-    _print_cmdsrc(cmd.start_src)
-    print "\t%d" % cmd.start_arg
-    print "scan_begin:\t",
-    _print_cmdsrc(cmd.scan_begin_src)
-    print "\t%d" % cmd.scan_begin_arg
-    print "convert:   \t",
-    _print_cmdsrc(cmd.convert_src)
-    print "\t%d" % cmd.convert_arg
-    print "scan_end:  \t",
-    _print_cmdsrc(cmd.scan_end_src)
-    print "\t%d" % cmd.scan_end_arg
-    print "stop:      \t",
-    _print_cmdsrc(cmd.stop_src)
-    print "\t%d" % cmd.stop_arg
-
-def _expand_tuple(tup, length) : 
-    "Expand an iterable TUP to a tuple of LENGTH by repeating the last element"
-    if len(tup) > length :
-        raise simAioError, "Tuple too long."
-    elif len(tup) < length :
-        temp_tup = tup + tuple((tup[-1],)*(length-len(tup)))
-        tup = temp_tup
-    return tup
-
-class aio_obj :
-    def __init__(self, filename="/dev/comedi0",
-                in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,),
-                out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,)) :
-        self._comedi = comedi
-        self._filename = filename
-        self.state = "Closed"
-        self.open()
-
-        self._iaref = _expand_tuple(in_aref, len(in_chan))
-        self._irange = _expand_tuple(in_range, len(in_chan))
-        temp = self._check_options(in_subdevice, in_chan, self._iaref, self._irange, output=False)
-        self._isubdev = temp["subdevice"]
-        self._ichan_params = temp["chan_params"]
-        self._ichan = in_chan
-        self.i_nchan = len(self._ichan)
-        self._ichanlist = self._comedi.chanlist(self.i_nchan)
-        for i in range(self.i_nchan) :
-            self._ichanlist[i] = self._comedi.cr_pack(self._ichan[i], self._irange[i], self._iaref[i])
-
-        self._oaref = _expand_tuple(out_aref, len(in_chan))
-        self._orange = _expand_tuple(out_range, len(in_chan))
-        temp = self._check_options(out_subdevice, out_chan, self._oaref, self._orange, output=True)
-        self._osubdev = temp["subdevice"]
-        self._ochan_params = temp["chan_params"]
-        self._ochan = out_chan
-        self.o_nchan = len(self._ochan)
-        self._ochanlist = self._comedi.chanlist(self.o_nchan)
-        for i in range(self.o_nchan) :
-            self._ochanlist[i] = self._comedi.cr_pack(self._ochan[i], self._orange[i], self._oaref[i])
-
-        self._gen_rough_output_cmd()
-        self._gen_rough_input_cmd()
-        self.state = "Initialized"
-    def __del__(self) :
-        self.close()
-    def close(self) :
-        if self.state != "Closed" :
-            self.reset(force=True)
-            rc = self._comedi.comedi_close(self._dev)
-            if rc < 0 :
-                self._comedi.comedi_perror("comedi_close")
-                raise simAioError, "Cannot close %s" % self._filename
-            if VERBOSE :
-                print "Closed %s on fd %d" % (self._filename, self._fd)
-            self.state = "Closed"
-    def open(self) :
-        if self.state != "Closed" :
-            raise simAioError, "Invalid state %s" % self.state
-        self._dev = self._comedi.comedi_open(self._filename)
-        self._fd = self._comedi.comedi_fileno(self._dev)
-        if VERBOSE :
-            print "Opened %s on fd %d" % (self._filename, self._fd)
-        self.state = "Initialized"
-    def _check_options(self, subdevice, chan, aref, rnge, output=True) :
-        subdevice = self._check_subdevice(subdevice, output=output)
-        chan_params = []
-        for i in range(len(chan)) :
-            chan_params.append(self._check_chan(subdevice, chan[i], aref[i], rnge[i]))
-        if VERBOSE :
-            if output :
-                print "Output",
-            else :
-                print "Input",
-            print " subdevice with channels %s is valid" % (str(chan))
-        return {"subdevice":subdevice,
-                "chan_params":chan_params}
-    def _check_subdevice(self, subdevice, output=True) :
-        if output == True :
-            target_type = self._comedi.COMEDI_SUBD_AO
-        else :
-            target_type = self._comedi.COMEDI_SUBD_AI
-        if (subdevice < 0) : # autodetect an input device
-            subdevice = self._comedi.comedi_find_subdevice_by_type(self._dev, target_type, 0) # 0 is starting subdevice
-        else :
-            type = self._comedi.comedi_get_subdevice_type(self._dev, subdevice)
-            if type != target_type :
-                raise simAioError, "Comedi subdevice %d has wrong type %d" % (subdevice, type)
-        return subdevice
-    def _check_chan(self, subdevice, chan, aref, range) :
-        subdev_n_chan = self._comedi.comedi_get_n_channels(self._dev, subdevice)
-        if chan >= subdev_n_chan :
-            raise simAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, subdevice, subdev_n_chan-1)
-        n_range = self._comedi.comedi_get_n_ranges(self._dev, subdevice, chan)
-        if range >= n_range :
-            raise simAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdevice, chan, n_range-1)
-        maxdata = self._comedi.comedi_get_maxdata(self._dev, subdevice, chan)
-        comrange = self._comedi.comedi_get_range(self._dev, subdevice, chan, range)
-        return {"maxdata":maxdata, "comrange": comrange}
-    def _gen_rough_output_cmd(self) :
-        if VERBOSE :
-            print "generate rough output command"
-        cmd = self._comedi.comedi_cmd_struct()
-        cmd.subdev = self._osubdev
-        cmd.flags = self._comedi.CMDF_WRITE
-        if AO_TRIGGERS_OFF_AI_START :
-            cmd.start_src = self._comedi.TRIG_EXT
-            cmd.start_arg = 18 # AI_START1 internal AI start signal
-        else :
-            cmd.start_src = self._comedi.TRIG_INT
-            cmd.start_arg = 0
-        cmd.scan_begin_src = self._comedi.TRIG_TIMER
-        cmd.scan_begin_arg = 1 # temporary value for now
-        cmd.convert_src = self._comedi.TRIG_NOW
-        cmd.convert_arg = 0
-        cmd.scan_end_src = self._comedi.TRIG_COUNT
-        cmd.scan_end_arg = self.o_nchan
-        cmd.stop_src = self._comedi.TRIG_COUNT
-        cmd.stop_arg = 1 # temporary value for now
-        cmd.chanlist = self._ochanlist
-        cmd.chanlist_len = self.o_nchan
-        self._test_cmd(cmd, max_passes=3)
-        self._ocmd = cmd
-    def _gen_rough_input_cmd(self) :
-        if VERBOSE :
-            print "generate rough input command"
-        cmd = self._comedi.comedi_cmd_struct()
-        cmd.subdev = self._isubdev
-        cmd.flags = 0
-        cmd.start_src = self._comedi.TRIG_INT
-        cmd.start_arg = 0
-        cmd.scan_begin_src = self._comedi.TRIG_TIMER
-        cmd.scan_begin_arg = 1 # temporary value for now
-        cmd.convert_src = self._comedi.TRIG_TIMER
-        cmd.convert_arg = 1
-        cmd.scan_end_src = self._comedi.TRIG_COUNT
-        cmd.scan_end_arg = self.i_nchan
-        cmd.stop_src = self._comedi.TRIG_COUNT
-        cmd.stop_arg = 1 # temporary value for now
-        cmd.chanlist = self._ichanlist
-        cmd.chanlist_len = self.i_nchan
-        self._test_cmd(cmd, max_passes=3)
-        self._icmd = cmd
-    def _test_cmd(self, cmd, max_passes=1) :
-        very_verbose = False
-        i = 0
-        rc = 0
-        if  very_verbose : 
-            print "Testing command:"
-            _print_command(cmd)
-        while i < max_passes :
-            rc = self._comedi.comedi_command_test(self._dev, cmd)
-            if (rc == 0) :
-                break
-            if VERBOSE or very_verbose :
-                print "test pass %d, %s" % (i, _cmdtest_message[rc])
-            i += 1
-        if (VERBOSE or very_verbose) and i < max_passes :
-            print "Passing command:"
-            _print_command(cmd)
-        if i >= max_passes :
-            print "Failing command:"
-            _print_command(cmd)
-            raise simAioError, "Invalid command: %s" % _cmdtest_message[rc]
-    def setup(self, nsamps, freq, out_buffer) :
-        if self.state != "Initialized" :
-            raise simAioError, "Invalid state %s" % self.state
-        if type(out_buffer) != type(_example_array) :
-            raise simAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer))
-        self._ocmd.scan_begin_arg = int(1e9/freq)
-        self._ocmd.stop_arg = nsamps
-        if VERBOSE :
-            print "Configure the board (%d ns per scan, %d samps)" % (self._ocmd.scan_begin_arg, self._ocmd.stop_arg)
-        self._onremain = nsamps
-        self._test_cmd(self._ocmd)
-        rc = self._comedi.comedi_command(self._dev, self._ocmd)
-        if rc < 0 :
-            self._comedi.comedi_perror("comedi_command")
-            raise simAioError, "Error executing output command %d" % rc
-        self._icmd.scan_begin_arg = int(1e9/freq)
-        self._icmd.stop_arg = nsamps
-        self._test_cmd(self._icmd)
-        self._inremain = nsamps
-        rc = self._comedi.comedi_command(self._dev, self._icmd)
-        if rc < 0 :
-            self._comedi.comedi_perror("comedi_command")
-            raise simAioError, "Error executing input command"
-
-        if VERBOSE :
-            print "Write %d output samples to the card" % (nsamps*self.o_nchan)
-        rc = int16_rw.write_samples(self._fd, nsamps*self.o_nchan, out_buffer, 1)
-        if rc != nsamps*self.o_nchan :
-            raise simAioError, "Error %d writing output buffer\n" % rc
-        rc = int16_rw.write_samples(self._fd, self.o_nchan, out_buffer[-self.o_nchan:], 1) # HACK, add an extra sample for each channel to the output buffer
-        if rc != self.o_nchan :
-            raise simAioError, "Error %d writing hack output buffer\n" % rc
-        # maybe will avoid resetting...
-        self._nsamps = nsamps
-        self.state = "Setup"
-    def arm(self) :
-        if self.state != "Setup" :
-            raise simAioError, "Invalid state %s" % self.state 
-        if VERBOSE :
-            print "Arm the analog ouptut"
-        self._comedi_internal_trigger(self._osubdev)
-        self.state = "Armed"
-    def start_read(self, in_buffer) :
-        if self.state != "Armed" :
-            raise simAioError, "Invalid state %s" % self.state
-        if VERBOSE :
-            print "Start the run"
-        self._comedi_internal_trigger(self._isubdev)
-        if VERBOSE :
-            print "Read %d input samples from the card" % (self._nsamps*self.i_nchan)
-        rc = int16_rw.read_samples(self._fd, self._nsamps*self.i_nchan, in_buffer, -1)
-        if rc != self._nsamps*self.i_nchan :
-            raise simAioError, "Error %d reading input buffer\n" % rc
-        self.state = "Read"
-    def _comedi_internal_trigger(self, subdevice) :
-        data = self._comedi.chanlist(1) # by luck, data is an array of lsampl_t (unsigned ints), as is chanlist
-        insn = self._comedi.comedi_insn_struct()
-        insn.insn = self._comedi.INSN_INTTRIG
-        insn.subdev = subdevice
-        insn.data = data
-        insn.n = 1
-        data[0] = 0
-        rc = self._comedi.comedi_do_insn(self._dev, insn)
-    def reset(self, force=False) :
-        if VERBOSE :
-            print "Reset the analog subdevices"
-        # clean up after the read
-        rc = self._comedi.comedi_cancel(self._dev, self._osubdev)
-        if rc < 0 :
-            self._comedi.comedi_perror("comedi_cancel")
-            raise simAioError, "Error cleaning up output command"
-        rc = self._comedi.comedi_cancel(self._dev, self._isubdev)
-        if rc < 0 :
-            self._comedi.comedi_perror("comedi_cancel")
-            raise simAioError, "Error cleaning up input command"
-        self.state = "Initialized"
-
-
-# define the test suite
-
-def _test_aio_obj(aio=None, start_wait=0, verbose=False) :
-    if (verbose) :
-        print "_test_aio_obj(start_wait = %g)" % start_wait
-    nsamps = 10
-    out_data = array([0]*nsamps, dtype=uint16)
-    in_data =  array([0]*nsamps, dtype=uint16)
-    for i in range(nsamps) :
-        out_data[i] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
-    aio.setup(10, 1000, out_data)
-    aio.arm()
-    sleep(start_wait)
-    aio.start_read(in_data)
-    aio.reset()
-    if (verbose) :
-        print "out_data:\n", out_data
-        print "in_data:\n", in_data
-        print "residual:\n[",
-        for i, o in zip(in_data, out_data) :
-            print int(i)-int(o),
-        print "]"
-    return (out_data, in_data)
-
-def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=False) :
-    print "_repeat_aio_test()"
-    grads = array([0]*num_tests, dtype=float32)
-    good = 0
-    bad = 0
-    good_run = 0
-    good_run_arr = []
-    for i in range(num_tests) :
-        out_data, in_data = _test_aio_obj(aio, start_wait)
-        gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
-        grads[i] = gradient
-        if verbose :
-            print "wait %2d, run %2d, gradient %g" % (start_wait, i, gradient)
-        if gradient < .7 :
-            bad += 1
-            good_run_arr.append(good_run)
-            good_run = 0
-        else :
-            good += 1
-            good_run += 1
-    good_run_arr.append(good_run)
-    print "failure rate %g%% in %d runs" % ((float(bad)/float(good+bad))*100.0, num_tests)
-    call = 'echo "'
-    for num in good_run_arr :
-        call += "%d " % num
-    call += '" | stem_leaf 2'
-    print "good run stem and leaf:"
-    system(call)
-
-def _test_aio_obj_multi_chan(aio=None, start_wait=0, verbose=False) :
-    if (verbose) :
-        print "_test_aio_obj_multi_chan(start_wait = %g)" % start_wait
-    nsamps = 10
-    out_data = array([0]*nsamps*aio.o_nchan, dtype=uint16)
-    in_data =  array([0]*nsamps*aio.i_nchan, dtype=uint16)
-    # set up interleaved data
-    for i in range(nsamps) :
-        out_data[i*aio.o_nchan] = int(30000.0+3000.0*sin(2*pi*i/float(nsamps)))
-        for j in range(1, aio.o_nchan) :
-            out_data[i*aio.o_nchan + j] = 0
-    aio.setup(10, 1000, out_data)
-    aio.arm()
-    sleep(start_wait)
-    aio.start_read(in_data)
-    aio.reset()
-    if (verbose) :
-        print "#",
-        for j in range(aio.o_nchan) :
-            print "%s\t" % aio._ochan[j],
-        for j in range(aio.i_nchan) :
-            print "%s\t" % aio._ichan[j],
-        print ""
-        for i in range(nsamps) :
-            for j in range(aio.o_nchan) :
-                print "%s\t" % out_data[i*aio.o_nchan+j],
-            for j in range(aio.i_nchan) :
-                print "%s\t" % in_data[i*aio.i_nchan+j],
-            print ""
-    return (out_data, in_data)
-
-
-
-def test() :
-    aio = aio_obj()
-    _test_aio_obj(aio, start_wait = 0, verbose=True)
-    _test_aio_obj(aio, start_wait = 0.5, verbose=True)
-    aio.close()
-    aio.open()
-    #_repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=False)
-    aio.close()
-
-    aiom = aio_obj(in_chan=(0,1,2,3), out_chan=(0,1))
-    _test_aio_obj_multi_chan(aiom, start_wait = 0, verbose=True)
-
-if __name__ == "__main__" :
-    test()
diff --git a/pycomedi/comedi_single_aio.py b/pycomedi/comedi_single_aio.py
deleted file mode 100644 (file)
index 0b270b8..0000000
+++ /dev/null
@@ -1,239 +0,0 @@
-# Use Comedi drivers for single-shot analog input/output
-
-import comedi
-
-VERSION = 0.0
-VERBOSE_DEBUG = True
-
-class sngAioError (Exception) :
-    "Single point Analog IO error"
-    pass
-
-class ai_obj :
-    def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1,2,3), aref=0, range=0) :
-        self.verbose = False
-        self.comedi = comedi
-        self.filename = filename
-        self.state = "Closed"
-        self.open()
-        if (subdevice < 0) : # autodetect an output device
-            self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AI, 0) # 0 is starting subdevice
-        else :
-            self.subdev = subdevice
-            type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev)
-            if type != self.comedi.COMEDI_SUBD_AI :
-                raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
-        self.chan = chan
-        self.aref = aref
-        self.range = range
-        subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev)
-        self.maxdata = []
-        self.comedi_range = []
-        for chan in self.chan :
-            if int(chan) != chan :
-                raise sngAioError, "Channels must be integers, not %s" % str(chan)
-            if chan >= subdev_n_chan :
-                raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
-            n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
-            if range > n_range :
-                raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
-            maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
-            self.maxdata.append(maxdata)
-            comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range)
-            # comrange becomes invalid if device is closed, so make a copy...
-            comrange_copy = self.comedi.comedi_range()
-            comrange_copy.min = comrange.min
-            comrange_copy.max = comrange.max
-            comrange_copy.unit = comrange.unit
-            self.comedi_range.append(comrange_copy)
-    def __del__(self) :
-        self.close()
-    def open(self) :
-        if self.state == "Closed" :
-            self.dev = self.comedi.comedi_open(self.filename)
-            self.state = "Opened"
-    def close(self) :
-        if self.state != "Closed" :
-            rc = self.comedi.comedi_close(self.dev)
-            if rc < 0 :
-                self.comedi.comedi_perror("comedi_close")
-                raise sngAioError, "Cannot close %s" % self.filename
-            self.state = "Closed"
-    def comedi_to_phys(self, chan_index, comedi) :
-        phys = self.comedi.comedi_to_phys(comedi, self.comedi_range[chan_index], self.maxdata[chan_index])
-        if self.verbose : 
-            print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
-        return phys
-    def phys_to_comedi(self, chan_index, phys) :
-        comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index])
-        if self.verbose : 
-            print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
-        return comedi
-    def read_chan_index(self, chan_index) :
-        rc, data = self.comedi.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref);
-        if rc != 1 : # the number of samples read
-            raise sngAioError, "comedi_data_read returned %d" % rc
-        return data
-    def read(self) :
-        out = range(len(self.chan))
-        for i in range(len(self.chan)) :
-            out[i] = self.read_chan_index(i)
-        #print "Read %s, got %s" % (str(self.chan), str(out))
-        return out
-
-def _test_ai_obj() :
-    ai = ai_obj()
-    print "read ", ai.read()
-    print "read ", ai.read()
-    print "read ", ai.read()
-    print "read ", ai.read()
-    ai.close()
-    print "ai success"
-
-class ao_obj :
-    def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1), aref=0, range=0) :
-        self.verbose = False
-        self.comedi = comedi
-        self.filename = filename
-        self.state = "Closed"
-        self.open()
-        if (subdevice < 0) : # autodetect an output device
-            self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AO, 0) # 0 is starting subdevice
-        else :
-            self.subdev = subdevice
-            type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev)
-            if type != self.comedi.COMEDI_SUBD_AO :
-                raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
-        self.chan = chan
-        self.aref = aref
-        self.range = range
-        subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev)
-        self.maxdata = []
-        self.comedi_range = []
-        for chan in self.chan :
-            if chan >= subdev_n_chan :
-                raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
-            n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
-            if range > n_range :
-                raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
-            maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
-            self.maxdata.append(maxdata)
-            comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range)
-            # comrange becomes invalid if device is closed, so make a copy...
-            comrange_copy = self.comedi.comedi_range()
-            comrange_copy.min = comrange.min
-            comrange_copy.max = comrange.max
-            comrange_copy.unit = comrange.unit
-            self.comedi_range.append(comrange_copy)
-    def __del__(self) :
-        self.close()
-    def open(self) :
-        if self.state != "Closed" :
-            raise sngAioError, "Invalid state %s" % self.state
-        self.dev = self.comedi.comedi_open(self.filename)
-        self.state = "Opened"
-    def close(self) :
-        if self.state != "Closed" :
-            for i in range(len(self.chan)) : 
-                self.write_chan_index(i, self.phys_to_comedi(i, 0))
-            rc = self.comedi.comedi_close(self.dev)
-            if rc < 0 :
-                self.comedi.comedi_perror("comedi_close")
-                raise sngAioError, "Cannot close %s" % self.filename
-            self.state = "Closed"
-    def comedi_to_phys(self, chan_index, comedi) :
-        phys = self.comedi.comedi_to_phys(int(comedi), self.comedi_range[chan_index], self.maxdata[chan_index])
-        if self.verbose : 
-            print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
-        return phys
-    def phys_to_comedi(self, chan_index, phys) :
-        comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index])
-        if self.verbose : 
-            print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
-        return comedi
-    def write_chan_index(self, chan_index, data) :
-        #print "set output on chan %d to %d" % (chan_index, data)
-        rc = self.comedi.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, int(data));
-        if rc != 1 : # the number of samples written
-            raise sngAioError, 'comedi_data_write returned %d' % rc
-    def write(self, data) :
-        if len(data) != len(self.chan) :
-            raise sngAioError,  "data length %d != the number of channels (%d)" % (len(data), len(self.chan))
-        for i in range(len(self.chan)) :
-            self.write_chan_index(i, data[i])
-
-def _test_ao_obj() :
-    ao = ao_obj()
-    ao.write([0,0])
-    ao.write([3000,3000])
-    ao.write([0,0])
-    ao.close()
-    print "ao success"
-
-def _fit_with_residual(out_data, in_data) :
-    from scipy.stats import linregress
-    gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
-    print "y = %g + %g x" % (intercept, gradient)
-    print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y)
-    print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data
-    print "err = ", std_err # root mean sqared error of best fit
-    if gradient < .7 or p_value > 0.05 :
-        raise sngAioError, "Out channel 0 != in channel 0"
-    residual = zeros((points,))
-    for i in range(points) :
-        pred_y = intercept + gradient * out_data[i]
-        residual[i] = in_data[i] - pred_y
-    return residual
-
-def plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) :
-    try :
-        from pylab import plot, show, subplot
-        subplot(311)
-        plot(out_data0, in_data0, 'r.-', out_data1, in_data1, 'b.')
-        subplot(312)
-        plot(out_data0, residual0, 'r.', residual1, 'b.')
-        subplot(313)
-        plot(in_data0, 'r.', out_data1, 'b.')
-        show() # if interactive mode is off...
-        #raw_input("Press enter to continue") # otherwise, pause
-    except ImportError :
-        pass # ignore plot erros
-
-def _test_aio() :
-    from scipy.stats import linregress
-    from numpy import linspace, zeros
-    ao = ao_obj(chan=(0,1))
-    ai = ai_obj(chan=(0,1))
-    start = 0.1 * ao.maxdata[0]
-    stop = 0.9 * ao.maxdata[0]
-    points = 10
-    out_data0 = linspace(start, stop, points)
-    out_data1 = linspace(stop, start, points)
-    in_data0 = zeros((points,))
-    in_data1 = zeros((points,))
-    for i in range(points) :
-        ao.write([out_data0[i], out_data1[i]])
-        id = ai.read()
-        in_data0[i] = id[0]
-        in_data1[i] = id[1]
-    ai.close()
-    ao.close()
-    residual0 = _fit_with_residual(out_data0, in_data0)
-    residual1 = _fit_with_residual(out_data1, in_data1)
-    if VERBOSE_DEBUG :
-        plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1)
-    for i in range(points) :
-        if abs(residual0[i]) > 10 : # HACK, hardcoded maximum nonlinearity
-            raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i])
-        if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity
-            raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i])
-
-    print "_test_aio success"
-
-def test() :
-    _test_ai_obj()
-    _test_ao_obj()
-    _test_aio()
-
-if __name__ == "__main__" :
-    test()
diff --git a/pycomedi/comedi_single_dio.py b/pycomedi/comedi_single_dio.py
deleted file mode 100644 (file)
index e3fd9c0..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-"""Use Comedi drivers for single-shot digital input/output
-
-Being single-shot implementations, read/writes will be software timed,
-so this module would not be a good choice if you need millisecond
-resolution.  However, it does provide a simple and robust way to
-generate/aquire signals at 1 second and greater timescales.
-"""
-
-import comedi as c
-
-VERSION = 0.0
-
-class dioError (Exception) :
-    "Digital IO error"
-    pass
-
-class dio_obj :
-    def __init__(self, filename="/dev/comedi0", subdevice=2, chan=(0,1,2,3), aref=0, range=0, output=True) :
-        self.filename = filename
-        self.subdev = subdevice
-        self.chan = chan
-        self.aref = aref
-        self.range = range
-        self.output = output
-        self.dev = c.comedi_open(filename)
-        if self.dev < 0 :
-            raise dioError, "Cannot open %s" % self.filename
-        type = c.comedi_get_subdevice_type(self.dev, self.subdev)
-        if type != c.COMEDI_SUBD_DIO :
-            raise dioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
-        if self.output :
-            self.set_to_output()
-        else :
-            self.set_to_input()
-    def set_to_output(self) :
-        for chan in self.chan :
-            rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_OUTPUT)
-            if rc != 1 : # yes, comedi_dio_config returns 1 on success, -1 on failure, as of comedilib-0.8.1
-                raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_OUTPUT, rc)
-        self.output = True
-    def set_to_input(self) :
-        for chan in self.chan :
-            rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_INPUT)
-            if rc != 1 :
-                raise dioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_INPUT, rc)
-        self.output = False
-    def write_chan_index(self, chan_index, data) :
-        if self.output != True :
-            raise dioError, "Must be an output to write"
-        rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, data);
-        if rc != 1 : # the number of samples written
-            raise dioError, "comedi_data_write returned %d" % rc
-    def read_chan_index(self, chan_index) :
-        if self.output == True :
-            raise dioError, "Must be an input to read"
-        rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref);
-        if rc != 1 : # the number of samples read
-            raise dioError, "comedi_data_read returned %d" % rc
-        return data
-    def write_port(self, data) :
-        "Channel significance increases with array index"
-        for i in range(len(self.chan)) :
-            self.write_chan_index(i, (data >> i) % 2)
-    def read_port(self) :
-        data = 0
-        for i in range(len(self.chan)) :
-            data += self.read_chan_index(i) << i
-        return data
-
-class write_dig_port (dio_obj) :
-    def __call__(self, data) :
-        self.write_port(data)
-
-def _test_dio_obj() :
-    d = dio_obj()
-    d.set_to_output()
-    d.write_chan_index(0, 1)
-    d.write_chan_index(0, 0)
-    d.write_port(7)
-    d.set_to_input()
-    data = d.read_chan_index(0)
-    print "channel %d is %d" % (d.chan[0], data)
-    data = d.read_port()
-    print "port value is %d" % data
-    print "dio_obj success"
-
-def _test_write_dig_port() :
-    p = write_dig_port()
-    for data in [0, 1, 2, 3, 4, 5, 6, 7] :
-        p(data)
-    print "write_dig_port success"
-
-def test() :
-    _test_dio_obj()
-    _test_write_dig_port()
-
-if __name__ == "__main__" :
-    test()