1 # Use Comedi drivers for single-shot analog input/output
8 class sngAioError (Exception) :
9 "Single point Analog IO error"
13 def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1,2,3), aref=0, range=0) :
16 self.filename = filename
19 if (subdevice < 0) : # autodetect an output device
20 self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AI, 0) # 0 is starting subdevice
22 self.subdev = subdevice
23 type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev)
24 if type != self.comedi.COMEDI_SUBD_AI :
25 raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
29 subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev)
31 self.comedi_range = []
32 for chan in self.chan :
33 if int(chan) != chan :
34 raise sngAioError, "Channels must be integers, not %s" % str(chan)
35 if chan >= subdev_n_chan :
36 raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
37 n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
39 raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
40 maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
41 self.maxdata.append(maxdata)
42 comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range)
43 # comrange becomes invalid if device is closed, so make a copy...
44 comrange_copy = self.comedi.comedi_range()
45 comrange_copy.min = comrange.min
46 comrange_copy.max = comrange.max
47 comrange_copy.unit = comrange.unit
48 self.comedi_range.append(comrange_copy)
52 if self.state == "Closed" :
53 self.dev = self.comedi.comedi_open(self.filename)
56 if self.state != "Closed" :
57 rc = self.comedi.comedi_close(self.dev)
59 self.comedi.comedi_perror("comedi_close")
60 raise sngAioError, "Cannot close %s" % self.filename
62 def comedi_to_phys(self, chan_index, comedi) :
63 phys = self.comedi.comedi_to_phys(comedi, self.comedi_range[chan_index], self.maxdata[chan_index])
65 print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
67 def phys_to_comedi(self, chan_index, phys) :
68 comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index])
70 print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
72 def read_chan_index(self, chan_index) :
73 rc, data = self.comedi.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref);
74 if rc != 1 : # the number of samples read
75 raise sngAioError, "comedi_data_read returned %d" % rc
78 out = range(len(self.chan))
79 for i in range(len(self.chan)) :
80 out[i] = self.read_chan_index(i)
81 #print "Read %s, got %s" % (str(self.chan), str(out))
86 print "read ", ai.read()
87 print "read ", ai.read()
88 print "read ", ai.read()
89 print "read ", ai.read()
94 def __init__(self, filename="/dev/comedi0", subdevice=-1, chan=(0,1), aref=0, range=0) :
97 self.filename = filename
100 if (subdevice < 0) : # autodetect an output device
101 self.subdev = self.comedi.comedi_find_subdevice_by_type(self.dev, self.comedi.COMEDI_SUBD_AO, 0) # 0 is starting subdevice
103 self.subdev = subdevice
104 type = self.comedi.comedi_get_subdevice_type(self.dev, self.subdev)
105 if type != self.comedi.COMEDI_SUBD_AO :
106 raise sngAioError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
110 subdev_n_chan = self.comedi.comedi_get_n_channels(self.dev, self.subdev)
112 self.comedi_range = []
113 for chan in self.chan :
114 if chan >= subdev_n_chan :
115 raise sngAioError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
116 n_range = self.comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
118 raise sngAioError, "Range %d > subdevice %d, chan %d's largest range %d" % (range, subdev, chan, n_range-1)
119 maxdata = self.comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
120 self.maxdata.append(maxdata)
121 comrange = self.comedi.comedi_get_range(self.dev, self.subdev, chan, range)
122 # comrange becomes invalid if device is closed, so make a copy...
123 comrange_copy = self.comedi.comedi_range()
124 comrange_copy.min = comrange.min
125 comrange_copy.max = comrange.max
126 comrange_copy.unit = comrange.unit
127 self.comedi_range.append(comrange_copy)
131 if self.state != "Closed" :
132 raise sngAioError, "Invalid state %s" % self.state
133 self.dev = self.comedi.comedi_open(self.filename)
134 self.state = "Opened"
136 if self.state != "Closed" :
137 for i in range(len(self.chan)) :
138 self.write_chan_index(i, self.phys_to_comedi(i, 0))
139 rc = self.comedi.comedi_close(self.dev)
141 self.comedi.comedi_perror("comedi_close")
142 raise sngAioError, "Cannot close %s" % self.filename
143 self.state = "Closed"
144 def comedi_to_phys(self, chan_index, comedi) :
145 phys = self.comedi.comedi_to_phys(int(comedi), self.comedi_range[chan_index], self.maxdata[chan_index])
147 print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
149 def phys_to_comedi(self, chan_index, phys) :
150 comedi = self.comedi.comedi_from_phys(phys, self.comedi_range[chan_index], self.maxdata[chan_index])
152 print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self.comedi_range[chan_index].max, self.comedi_range[chan_index].min, self.maxdata[chan_index])
154 def write_chan_index(self, chan_index, data) :
155 #print "set output on chan %d to %d" % (chan_index, data)
156 rc = self.comedi.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], self.range, self.aref, int(data));
157 if rc != 1 : # the number of samples written
158 raise sngAioError, 'comedi_data_write returned %d' % rc
159 def write(self, data) :
160 if len(data) != len(self.chan) :
161 raise sngAioError, "data length %d != the number of channels (%d)" % (len(data), len(self.chan))
162 for i in range(len(self.chan)) :
163 self.write_chan_index(i, data[i])
168 ao.write([3000,3000])
173 def _fit_with_residual(out_data, in_data) :
174 from scipy.stats import linregress
175 gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
176 print "y = %g + %g x" % (intercept, gradient)
177 print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y)
178 print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data
179 print "err = ", std_err # root mean sqared error of best fit
180 if gradient < .7 or p_value > 0.05 :
181 raise sngAioError, "Out channel 0 != in channel 0"
182 residual = zeros((points,))
183 for i in range(points) :
184 pred_y = intercept + gradient * out_data[i]
185 residual[i] = in_data[i] - pred_y
188 def plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) :
190 from pylab import plot, show, subplot
192 plot(out_data0, in_data0, 'r.-', out_data1, in_data1, 'b.')
194 plot(out_data0, residual0, 'r.', residual1, 'b.')
196 plot(in_data0, 'r.', out_data1, 'b.')
197 show() # if interactive mode is off...
198 #raw_input("Press enter to continue") # otherwise, pause
200 pass # ignore plot erros
203 from scipy.stats import linregress
204 from numpy import linspace, zeros
205 ao = ao_obj(chan=(0,1))
206 ai = ai_obj(chan=(0,1))
207 start = 0.1 * ao.maxdata[0]
208 stop = 0.9 * ao.maxdata[0]
210 out_data0 = linspace(start, stop, points)
211 out_data1 = linspace(stop, start, points)
212 in_data0 = zeros((points,))
213 in_data1 = zeros((points,))
214 for i in range(points) :
215 ao.write([out_data0[i], out_data1[i]])
221 residual0 = _fit_with_residual(out_data0, in_data0)
222 residual1 = _fit_with_residual(out_data1, in_data1)
224 plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1)
225 for i in range(points) :
226 if abs(residual0[i]) > 10 : # HACK, hardcoded maximum nonlinearity
227 raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i])
228 if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity
229 raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i])
231 print "_test_aio success"
238 if __name__ == "__main__" :