Ran update-copyright.py.
[pycomedi.git] / doc / demo / aout.py
1 #!/usr/bin/env python
2 #
3 # Copyright (C) 2012 W. Trevor King <wking@tremily.us>
4 #
5 # This file is part of pycomedi.
6 #
7 # pycomedi is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 2 of the License, or (at your option) any later
10 # version.
11 #
12 # pycomedi is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along with
17 # pycomedi.  If not, see <http://www.gnu.org/licenses/>.
18
19 """Output a series of data files using an analog output Comedi subdevice.
20 """
21
22 import os.path as _os_path
23
24 import numpy as _numpy
25 from scipy.io import wavfile as _wavfile
26
27 from pycomedi.device import Device as _Device
28 from pycomedi.subdevice import StreamingSubdevice as _StreamingSubdevice
29 from pycomedi.channel import AnalogChannel as _AnalogChannel
30 from pycomedi import constant as _constant
31 from pycomedi import utility as _utility
32
33
34 NUMPY_FREQ = 8000
35 LOADER = {  # frequency,raw_signal = LOADER[extension](filename)
36     '.npy': lambda filename: (NUMPY_FREQ, _numpy.load(filename)),
37     '.wav': _wavfile.read,
38     }
39
40
41 def setup_device(filename, subdevice, channels, range, aref):
42     """Open the Comedi device at filename and setup analog output channels.
43     """
44     device = _Device(filename=filename)
45     device.open()
46     if subdevice is None:
47         ao_subdevice = device.find_subdevice_by_type(
48             _constant.SUBDEVICE_TYPE.ao, factory=_StreamingSubdevice)
49     else:
50         ao_subdevice = device.subdevice(subdevice, factory=_StreamingSubdevice)
51     ao_channels = [
52         ao_subdevice.channel(i, factory=_AnalogChannel, range=range, aref=aref)
53         for i in channels]
54     return (device, ao_subdevice, ao_channels)
55
56 def load(filename):
57     """Load a date file and return (frequency, unit_output_signal)
58
59     Values in unit_output_signal are scaled to the range [-1,1].
60     """
61     root,ext = _os_path.splitext(filename)
62     loader = LOADER[ext]
63     frequency,raw_signal = loader(filename)
64     iinfo = _numpy.iinfo(raw_signal.dtype)
65     raw_signal_midpoint = (iinfo.max + iinfo.min)/2.
66     raw_signal_range = iinfo.max - raw_signal_midpoint
67     unit_output_signal = (raw_signal - raw_signal_midpoint)/raw_signal_range
68     return (frequency, unit_output_signal)
69
70 def generate_output_buffer(ao_subdevice, ao_channels, unit_output_signal):
71     """Setup an output buffer from unit_output_signal
72
73     The output signal in bits is scaled so that -1 in
74     unit_output_signal maps to the minimum output voltage for each
75     channel, and +1 in unit_output_signal maps to the maximum output
76     voltage for each channel.
77     """
78     ao_dtype = ao_subdevice.get_dtype()
79     n_samps,n_chans = unit_output_signal.shape
80     assert n_chans <= len(ao_channels), (
81             'need at least {0} channels but have only {1}'.format(
82                 n_chans, ao_channels))
83     ao_buffer = _numpy.zeros((n_samps, n_chans), dtype=ao_dtype)
84     for i in range(n_chans):
85         range_ = ao_channels[i].range
86         midpoint = (range_.max + range_.min)/2
87         v_amp = range_.max - midpoint
88         converter = ao_channels[i].get_converter()
89         volt_output_signal = midpoint + v_amp*unit_output_signal[:,i]
90         ao_buffer[:,i] = converter.from_physical(volt_output_signal)
91     return ao_buffer
92
93 def setup_command(ao_subdevice, ao_channels, frequency, output_buffer):
94     """Setup ao_subdevice.cmd to output output_buffer using ao_channels
95     """
96     scan_period_ns = int(1e9 / frequency)
97     n_chan = output_buffer.shape[1]
98     ao_cmd = ao_subdevice.get_cmd_generic_timed(n_chan, scan_period_ns)
99     ao_cmd.start_src = _constant.TRIG_SRC.int
100     ao_cmd.start_arg = 0
101     ao_cmd.stop_src = _constant.TRIG_SRC.count
102     ao_cmd.stop_arg = len(output_buffer)
103     ao_cmd.chanlist = ao_channels[:n_chan]
104     ao_subdevice.cmd = ao_cmd
105
106 def run_command(device, ao_subdevice, output_buffer):
107     """Write output_buffer using ao_subdevice
108
109     Blocks until the output is complete.
110     """
111     ao_subdevice.command()
112     writer = _utility.Writer(
113         ao_subdevice, output_buffer,
114         preload=ao_subdevice.get_buffer_size()/output_buffer.itemsize,
115         block_while_running=True)
116     writer.start()
117     device.do_insn(_utility.inttrig_insn(ao_subdevice))
118     writer.join()
119
120 def run(filename, subdevice, channels, range, aref, mmap=False, files=[]):
121     """
122     >>> import os
123     >>> import tempfile
124     >>> from numpy import arange, iinfo, int16, pi, save, sin, zeros
125
126     Create temporary files for testing.
127
128     >>> time = arange(NUMPY_FREQ, dtype=float)/NUMPY_FREQ
129     >>> f = 440
130     >>> iint16 = iinfo(int16)
131     >>> a = (iint16.max - iint16.min)/2.
132     >>> one_chan = zeros((NUMPY_FREQ,1), dtype=int16)
133     >>> one_chan[:,0] = a*sin(f*time/(2*pi))
134     >>> fd,one_chan_path = tempfile.mkstemp(prefix='pycomedi-', suffix='.npy')
135     >>> fp = os.fdopen(fd, 'w')
136     >>> save(fp, one_chan)
137     >>> fp.close()
138
139     >>> two_chan = zeros((NUMPY_FREQ,2), dtype=int16)
140     >>> two_chan[:,0] = a*sin(f*time/(2*pi))
141     >>> two_chan[:,1] = a*sin(2*f*time/(2*pi))
142     >>> fd,two_chan_path = tempfile.mkstemp(prefix='pycomedi-', suffix='.npy')
143     >>> fp = os.fdopen(fd, 'w')
144     >>> save(fp, two_chan)
145     >>> fp.close()
146
147     >>> run(filename='/dev/comedi0', subdevice=None,
148     ...     channels=[0,1], range=0, aref=_constant.AREF.ground,
149     ...     files=[one_chan_path, two_chan_path])
150
151     >>> os.remove(one_chan_path)
152     >>> os.remove(two_chan_path)
153     """
154     device,ao_subdevice,ao_channels = setup_device(
155         filename=filename, subdevice=subdevice, channels=channels,
156         range=range, aref=aref)
157     for filename in files:
158         frequency,unit_output_signal = load(filename=filename)
159         output_buffer = generate_output_buffer(
160             ao_subdevice, ao_channels, unit_output_signal)
161         setup_command(ao_subdevice, ao_channels, frequency, output_buffer)
162         run_command(device, ao_subdevice, output_buffer)
163     device.close()
164
165
166 if __name__ == '__main__':
167     import pycomedi_demo_args
168
169     pycomedi_demo_args.ARGUMENTS['files'] = (['files'], {'nargs': '+'})
170     args = pycomedi_demo_args.parse_args(
171         description=__doc__,
172         argnames=[
173             'filename', 'subdevice', 'channels', 'range', 'aref', 'mmap',
174             'files', 'verbose'])
175
176     run(filename=args.filename, subdevice=args.subdevice,
177         channels=args.channels, range=args.range, aref=args.aref,
178         mmap=args.mmap, files=args.files)