Versioning started.
[pycomedi.git] / pycomedi / comedi_single_aio.py
1 # Use Comedi drivers for single-shot analog input/output
2
3 import comedi
4
5 VERSION = 0.0
6 VERBOSE_DEBUG = True
7
8 class sngAioError (Exception) :
9     "Single point Analog IO error"
10     pass
11
12 class ai_obj :
13     def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1,2,3), aref=0, range=0) :
14         self.verbose = False
15         self.comedi = comedi
16         self.filename = filename
17         self.state = "Closed"
18         self.open()
19         if (subdevice < 0) : # autodetect an output device
20             self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AI, 0) # 0 is starting subdevice
21         else :
22             self.subdev = subdevice
23             type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev)
24             if type != self.comedi.COMEDI_SUBD_AI :
25                 raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
26         self.chan = chan
27         self.aref = aref
28         self.range = range
29         subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev)
30         self.maxdata = []
31         self.comedi_range = []
32         for chan in self.chan :
33             if int(chan) != chan :
34                 raise sngAioError, "Channels must be integers, not %s" % str(chan)
35             if chan >= subdev_n_chan :
36                 raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
37             n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
38             if range > n_range :
39                 raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
40             maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
41             self.maxdata.append(maxdata)
42             comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range)
43             # comrange becomes invalid if device is closed, so make a copy...
44             comrange_copy = self.comedi.comedi_range()
45             comrange_copy.min = comrange.min
46             comrange_copy.max = comrange.max
47             comrange_copy.unit = comrange.unit
48             self.comedi_range.append(comrange_copy)
49     def __del__(self) :
50         self.close()
51     def open(self) :
52         if self.state == "Closed" :
53             self.dev = self.comedi.comedi_open(self.filename)
54             self.state = "Opened"
55     def close(self) :
56         if self.state != "Closed" :
57             rc = self.comedi.comedi_close(self.dev)
58             if rc < 0 :
59                 self.comedi.comedi_perror("comedi_close")
60                 raise sngAioError, "Cannot close %s" % self.filename
61             self.state = "Closed"
62     def comedi_to_phys(self, chan_index, comedi) :
63         phys = self.comedi.comedi_to_phys(comedi, self.comedi_range[chan_index], self.maxdata[chan_index])
64         if self.verbose : 
65             print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
66         return phys
67     def phys_to_comedi(self, chan_index, phys) :
68         comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index])
69         if self.verbose : 
70             print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
71         return comedi
72     def read_chan_index(self, chan_index) :
73         rc, data = self.comedi.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref);
74         if rc != 1 : # the number of samples read
75             raise sngAioError, "comedi_data_read returned %d" % rc
76         return data
77     def read(self) :
78         out = range(len(self.chan))
79         for i in range(len(self.chan)) :
80             out[i] = self.read_chan_index(i)
81         #print "Read %s, got %s" % (str(self.chan), str(out))
82         return out
83
84 def _test_ai_obj() :
85     ai = ai_obj()
86     print "read ", ai.read()
87     print "read ", ai.read()
88     print "read ", ai.read()
89     print "read ", ai.read()
90     ai.close()
91     print "ai success"
92
93 class ao_obj :
94     def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1), aref=0, range=0) :
95         self.verbose = False
96         self.comedi = comedi
97         self.filename = filename
98         self.state = "Closed"
99         self.open()
100         if (subdevice < 0) : # autodetect an output device
101             self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AO, 0) # 0 is starting subdevice
102         else :
103             self.subdev = subdevice
104             type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev)
105             if type != self.comedi.COMEDI_SUBD_AO :
106                 raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
107         self.chan = chan
108         self.aref = aref
109         self.range = range
110         subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev)
111         self.maxdata = []
112         self.comedi_range = []
113         for chan in self.chan :
114             if chan >= subdev_n_chan :
115                 raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
116             n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
117             if range > n_range :
118                 raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
119             maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
120             self.maxdata.append(maxdata)
121             comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range)
122             # comrange becomes invalid if device is closed, so make a copy...
123             comrange_copy = self.comedi.comedi_range()
124             comrange_copy.min = comrange.min
125             comrange_copy.max = comrange.max
126             comrange_copy.unit = comrange.unit
127             self.comedi_range.append(comrange_copy)
128     def __del__(self) :
129         self.close()
130     def open(self) :
131         if self.state != "Closed" :
132             raise sngAioError, "Invalid state %s" % self.state
133         self.dev = self.comedi.comedi_open(self.filename)
134         self.state = "Opened"
135     def close(self) :
136         if self.state != "Closed" :
137             for i in range(len(self.chan)) : 
138                 self.write_chan_index(i, self.phys_to_comedi(i, 0))
139             rc = self.comedi.comedi_close(self.dev)
140             if rc < 0 :
141                 self.comedi.comedi_perror("comedi_close")
142                 raise sngAioError, "Cannot close %s" % self.filename
143             self.state = "Closed"
144     def comedi_to_phys(self, chan_index, comedi) :
145         phys = self.comedi.comedi_to_phys(int(comedi), self.comedi_range[chan_index], self.maxdata[chan_index])
146         if self.verbose : 
147             print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
148         return phys
149     def phys_to_comedi(self, chan_index, phys) :
150         comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index])
151         if self.verbose : 
152             print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
153         return comedi
154     def write_chan_index(self, chan_index, data) :
155         #print "set output on chan %d to %d" % (chan_index, data)
156         rc = self.comedi.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, int(data));
157         if rc != 1 : # the number of samples written
158             raise sngAioError, 'comedi_data_write returned %d' % rc
159     def write(self, data) :
160         if len(data) != len(self.chan) :
161             raise sngAioError,  "data length %d != the number of channels (%d)" % (len(data), len(self.chan))
162         for i in range(len(self.chan)) :
163             self.write_chan_index(i, data[i])
164
165 def _test_ao_obj() :
166     ao = ao_obj()
167     ao.write([0,0])
168     ao.write([3000,3000])
169     ao.write([0,0])
170     ao.close()
171     print "ao success"
172
173 def _fit_with_residual(out_data, in_data) :
174     from scipy.stats import linregress
175     gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
176     print "y = %g + %g x" % (intercept, gradient)
177     print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y)
178     print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data
179     print "err = ", std_err # root mean sqared error of best fit
180     if gradient < .7 or p_value > 0.05 :
181         raise sngAioError, "Out channel 0 != in channel 0"
182     residual = zeros((points,))
183     for i in range(points) :
184         pred_y = intercept + gradient * out_data[i]
185         residual[i] = in_data[i] - pred_y
186     return residual
187
188 def plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) :
189     try :
190         from pylab import plot, show, subplot
191         subplot(311)
192         plot(out_data0, in_data0, 'r.-', out_data1, in_data1, 'b.')
193         subplot(312)
194         plot(out_data0, residual0, 'r.', residual1, 'b.')
195         subplot(313)
196         plot(in_data0, 'r.', out_data1, 'b.')
197         show() # if interactive mode is off...
198         #raw_input("Press enter to continue") # otherwise, pause
199     except ImportError :
200         pass # ignore plot erros
201
202 def _test_aio() :
203     from scipy.stats import linregress
204     from numpy import linspace, zeros
205     ao = ao_obj(chan=(0,1))
206     ai = ai_obj(chan=(0,1))
207     start = 0.1 * ao.maxdata[0]
208     stop = 0.9 * ao.maxdata[0]
209     points = 10
210     out_data0 = linspace(start, stop, points)
211     out_data1 = linspace(stop, start, points)
212     in_data0 = zeros((points,))
213     in_data1 = zeros((points,))
214     for i in range(points) :
215         ao.write([out_data0[i], out_data1[i]])
216         id = ai.read()
217         in_data0[i] = id[0]
218         in_data1[i] = id[1]
219     ai.close()
220     ao.close()
221     residual0 = _fit_with_residual(out_data0, in_data0)
222     residual1 = _fit_with_residual(out_data1, in_data1)
223     if VERBOSE_DEBUG :
224         plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1)
225     for i in range(points) :
226         if abs(residual0[i]) > 10 : # HACK, hardcoded maximum nonlinearity
227             raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i])
228         if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity
229             raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i])
230
231     print "_test_aio success"
232
233 def test() :
234     _test_ai_obj()
235     _test_ao_obj()
236     _test_aio()
237
238 if __name__ == "__main__" :
239     test()