3 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
5 # This file is part of pycomedi.
7 # pycomedi is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 2 of the License, or (at your option) any later
12 # pycomedi is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License along with
17 # pycomedi. If not, see <http://www.gnu.org/licenses/>.
19 """Output a series of data files using an analog output Comedi subdevice.
22 import os.path as _os_path
24 import numpy as _numpy
25 from scipy.io import wavfile as _wavfile
27 from pycomedi.device import Device as _Device
28 from pycomedi.subdevice import StreamingSubdevice as _StreamingSubdevice
29 from pycomedi.channel import AnalogChannel as _AnalogChannel
30 from pycomedi import constant as _constant
31 from pycomedi import utility as _utility
def _load_npy(filename):
    """Load a NumPy ``.npy`` file and return (frequency, raw_signal).

    A ``.npy`` file stores only the array, so the module-level
    NUMPY_FREQ constant (defined outside this block) is reported as
    the sampling frequency.
    """
    return (NUMPY_FREQ, _numpy.load(filename))


# Map a file extension to the callable that loads it:
#   frequency,raw_signal = LOADER[extension](filename)
LOADER = {
    '.npy': _load_npy,
    '.wav': _wavfile.read,
    }
def setup_device(filename, subdevice, channels, range, aref):
    """Open the Comedi device at filename and setup analog output channels.

    Arguments:

    filename  -- path of the Comedi device to open
    subdevice -- index of the analog output subdevice, or None to look
                 one up by subdevice type
    channels  -- iterable of channel indexes to configure
    range     -- range setting handed to every channel (the name
                 shadows the builtin inside this function)
    aref      -- analog reference handed to every channel

    Returns a (device, ao_subdevice, ao_channels) tuple, where
    ao_channels holds one channel object per entry of channels, in the
    same order.
    """
    device = _Device(filename=filename)
    # NOTE(review): pycomedi devices are expected to need an explicit
    # open() before subdevices can be queried -- confirm against Device.
    device.open()
    if subdevice is None:
        # No explicit index requested: search for an analog output
        # subdevice by type.
        ao_subdevice = device.find_subdevice_by_type(
            _constant.SUBDEVICE_TYPE.ao, factory=_StreamingSubdevice)
    else:
        ao_subdevice = device.subdevice(subdevice, factory=_StreamingSubdevice)
    ao_channels = [
        ao_subdevice.channel(i, factory=_AnalogChannel, range=range, aref=aref)
        for i in channels]
    return (device, ao_subdevice, ao_channels)
def load(filename):
    """Load a data file and return (frequency, unit_output_signal)

    The loader is selected from LOADER by the file extension (an
    exact match is tried first, then a lower-cased match).

    Values in unit_output_signal are scaled to the range [-1,1].
    Integer data is scaled so that the minimum and maximum of its
    dtype map to -1 and +1.  Floating point data is passed through
    unscaled; it is assumed to already lie in [-1,1] (the convention
    for floating point WAV files) -- verify this for other sources.

    Raises KeyError if no loader is registered for the extension.
    """
    root,ext = _os_path.splitext(filename)
    loader = LOADER.get(ext, LOADER.get(ext.lower()))
    if loader is None:
        raise KeyError(
            'no loader for extension {0!r} of {1!r} (supported: {2})'.format(
                ext, filename, ', '.join(sorted(LOADER))))
    frequency,raw_signal = loader(filename)
    raw_signal = _numpy.asarray(raw_signal)
    if _numpy.issubdtype(raw_signal.dtype, _numpy.floating):
        # numpy.iinfo() rejects floating point dtypes, so there is no
        # integer full-scale to normalize against.
        return (frequency, raw_signal.astype(float))
    iinfo = _numpy.iinfo(raw_signal.dtype)
    # Center on the middle of the dtype's span so that unsigned data
    # (e.g. 8-bit WAV) ends up symmetric around zero as well.
    raw_signal_midpoint = (iinfo.max + iinfo.min)/2.
    raw_signal_range = iinfo.max - raw_signal_midpoint
    unit_output_signal = (raw_signal - raw_signal_midpoint)/raw_signal_range
    return (frequency, unit_output_signal)
def generate_output_buffer(ao_subdevice, ao_channels, unit_output_signal):
    """Setup an output buffer from unit_output_signal

    The output signal in bits is scaled so that -1 in
    unit_output_signal maps to the minimum output voltage for each
    channel, and +1 in unit_output_signal maps to the maximum output
    voltage for each channel.

    unit_output_signal is an array of shape (n_samps, n_chans); a 1-D
    array is treated as a single channel.  Column i is written using
    ao_channels[i], so at least n_chans channels must be supplied.

    Returns an (n_samps, n_chans) array whose dtype is taken from
    ao_subdevice.get_dtype().
    """
    signal = _numpy.asarray(unit_output_signal)
    if signal.ndim == 1:
        # Single-channel data may arrive without a channel axis.
        signal = signal[:, _numpy.newaxis]
    n_samps,n_chans = signal.shape
    assert n_chans <= len(ao_channels), (
        'need at least {0} channels but have only {1}'.format(
            n_chans, len(ao_channels)))
    ao_buffer = _numpy.zeros((n_samps, n_chans), dtype=ao_subdevice.get_dtype())
    for i,channel in enumerate(ao_channels[:n_chans]):
        range_ = channel.range
        # Map [-1,1] linearly onto [range_.min, range_.max].
        midpoint = (range_.max + range_.min)/2.
        v_amp = range_.max - midpoint
        converter = channel.get_converter()
        volt_output_signal = midpoint + v_amp*signal[:,i]
        ao_buffer[:,i] = converter.from_physical(volt_output_signal)
    return ao_buffer
def setup_command(ao_subdevice, ao_channels, frequency, output_buffer):
    """Setup ao_subdevice.cmd to output output_buffer using ao_channels

    frequency is the scan rate in Hz; it is converted to a scan period
    in whole nanoseconds (truncated).  One scan is emitted per row of
    output_buffer, using the first output_buffer.shape[1] entries of
    ao_channels.  The command is configured to start on an internal
    trigger and to stop after len(output_buffer) scans.

    Raises ValueError if frequency is not positive.
    """
    if frequency <= 0:
        # Avoid a ZeroDivisionError or a negative scan period below.
        raise ValueError(
            'frequency must be positive, got {0!r}'.format(frequency))
    scan_period_ns = int(1e9 / frequency)
    n_chan = output_buffer.shape[1]
    ao_cmd = ao_subdevice.get_cmd_generic_timed(n_chan, scan_period_ns)
    ao_cmd.start_src = _constant.TRIG_SRC.int
    # NOTE(review): with an internal trigger start_arg is presumably the
    # trigger number; set explicitly rather than relying on the default.
    ao_cmd.start_arg = 0
    ao_cmd.stop_src = _constant.TRIG_SRC.count
    ao_cmd.stop_arg = len(output_buffer)
    ao_cmd.chanlist = ao_channels[:n_chan]
    ao_subdevice.cmd = ao_cmd
def run_command(device, ao_subdevice, output_buffer):
    """Write output_buffer using ao_subdevice

    Blocks until the output is complete.

    The command previously stored on ao_subdevice is submitted, a
    writer is started to feed output_buffer to the subdevice, and the
    output is then released with an internal-trigger instruction.
    """
    ao_subdevice.command()
    writer = _utility.Writer(
        ao_subdevice, output_buffer,
        # Floor division keeps the preload count an integer on Python 3.
        preload=ao_subdevice.get_buffer_size()//output_buffer.itemsize,
        block_while_running=True)
    # NOTE(review): assumes Writer exposes thread-style start()/join();
    # confirm against pycomedi.utility.Writer.
    writer.start()
    device.do_insn(_utility.inttrig_insn(ao_subdevice))
    # Wait for the writer to finish so the call really blocks.
    writer.join()
def run(filename, subdevice, channels, range, aref, mmap=False, files=()):
    """Output each data file in files on an analog output subdevice.

    filename, subdevice, channels, range and aref are passed to
    setup_device().  Each entry of files is then loaded, converted to
    an output buffer, and written in turn; the device is closed when
    all files have been written or an error occurs.  mmap is accepted
    for command-line compatibility but is not used here.

    >>> import os
    >>> import tempfile
    >>> from numpy import arange, iinfo, int16, pi, save, sin, zeros

    Create temporary files for testing.

    >>> time = arange(NUMPY_FREQ, dtype=float)/NUMPY_FREQ
    >>> f = 440
    >>> iint16 = iinfo(int16)
    >>> a = (iint16.max - iint16.min)/2.
    >>> one_chan = zeros((NUMPY_FREQ,1), dtype=int16)
    >>> one_chan[:,0] = a*sin(2*pi*f*time)
    >>> fd,one_chan_path = tempfile.mkstemp(prefix='pycomedi-', suffix='.npy')
    >>> fp = os.fdopen(fd, 'wb')
    >>> save(fp, one_chan)
    >>> fp.close()

    >>> two_chan = zeros((NUMPY_FREQ,2), dtype=int16)
    >>> two_chan[:,0] = a*sin(2*pi*f*time)
    >>> two_chan[:,1] = a*sin(2*pi*2*f*time)
    >>> fd,two_chan_path = tempfile.mkstemp(prefix='pycomedi-', suffix='.npy')
    >>> fp = os.fdopen(fd, 'wb')
    >>> save(fp, two_chan)
    >>> fp.close()

    >>> run(filename='/dev/comedi0', subdevice=None,
    ...     channels=[0,1], range=0, aref=_constant.AREF.ground,
    ...     files=[one_chan_path, two_chan_path])

    >>> os.remove(one_chan_path)
    >>> os.remove(two_chan_path)
    """
    device,ao_subdevice,ao_channels = setup_device(
        filename=filename, subdevice=subdevice, channels=channels,
        range=range, aref=aref)
    try:
        # Use a distinct loop name so the device path in `filename` is
        # not overwritten by the data file paths.
        for data_path in files:
            frequency,unit_output_signal = load(filename=data_path)
            output_buffer = generate_output_buffer(
                ao_subdevice, ao_channels, unit_output_signal)
            setup_command(ao_subdevice, ao_channels, frequency, output_buffer)
            run_command(device, ao_subdevice, output_buffer)
    finally:
        # Release the device even if loading or output fails.
        device.close()
if __name__ == '__main__':
    import pycomedi_demo_args

    # Register a 'files' positional argument (one or more paths) with
    # the shared demo argument table before parsing.
    pycomedi_demo_args.ARGUMENTS['files'] = (['files'], {'nargs': '+'})
    # NOTE(review): the keyword names passed to parse_args() are
    # reconstructed; confirm them against pycomedi_demo_args.parse_args.
    args = pycomedi_demo_args.parse_args(
        description=__doc__,
        argnames=[
            'filename', 'subdevice', 'channels', 'range', 'aref', 'mmap',
            'files'])

    run(filename=args.filename, subdevice=args.subdevice,
        channels=args.channels, range=args.range, aref=args.aref,
        mmap=args.mmap, files=args.files)