Add doc/demo/aout.py example of repeated analog output.
authorW. Trevor King <wking@drexel.edu>
Tue, 17 Apr 2012 16:30:27 +0000 (12:30 -0400)
committerW. Trevor King <wking@drexel.edu>
Tue, 17 Apr 2012 16:39:49 +0000 (12:39 -0400)
This script contains the condensed wisdom of mailing list
discussions between Richard Höchenberger and myself.

Cc: Richard Höchenberger
doc/demo/aout.py [new file with mode: 0755]

diff --git a/doc/demo/aout.py b/doc/demo/aout.py
new file mode 100755 (executable)
index 0000000..b3db7c7
--- /dev/null
@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+#
+# Copyright
+
+"""Output a series of data files using an analog output Comedi subdevice.
+"""
+
+import os.path as _os_path
+
+import numpy as _numpy
+from scipy.io import wavfile as _wavfile
+
+from pycomedi.device import Device as _Device
+from pycomedi.subdevice import StreamingSubdevice as _StreamingSubdevice
+from pycomedi.channel import AnalogChannel as _AnalogChannel
+from pycomedi import constant as _constant
+from pycomedi import utility as _utility
+
+
+NUMPY_FREQ = 8000
+LOADER = {  # frequency,raw_signal = LOADER[extension](filename)
+    '.npy': lambda filename: (NUMPY_FREQ, _numpy.load(filename)),
+    '.wav': _wavfile.read,
+    }
+
+
+def setup_device(filename, subdevice, channels, range, aref):
+    """Open the Comedi device at filename and setup analog output channels.
+    """
+    device = _Device(filename=filename)
+    device.open()
+    if subdevice is None:
+        ao_subdevice = device.find_subdevice_by_type(
+            _constant.SUBDEVICE_TYPE.ao, factory=_StreamingSubdevice)
+    else:
+        ao_subdevice = device.subdevice(subdevice, factory=_StreamingSubdevice)
+    ao_channels = [
+        ao_subdevice.channel(i, factory=_AnalogChannel, range=range, aref=aref)
+        for i in channels]
+    return (device, ao_subdevice, ao_channels)
+
+def load(filename):
+    """Load a date file and return (frequency, unit_output_signal)
+
+    Values in unit_output_signal are scaled to the range [-1,1].
+    """
+    root,ext = _os_path.splitext(filename)
+    loader = LOADER[ext]
+    frequency,raw_signal = loader(filename)
+    iinfo = _numpy.iinfo(raw_signal.dtype)
+    raw_signal_midpoint = (iinfo.max + iinfo.min)/2.
+    raw_signal_range = iinfo.max - raw_signal_midpoint
+    unit_output_signal = (raw_signal - raw_signal_midpoint)/raw_signal_range
+    return (frequency, unit_output_signal)
+
+def generate_output_buffer(ao_subdevice, ao_channels, unit_output_signal):
+    """Setup an output buffer from unit_output_signal
+
+    The output signal in bits is scaled so that -1 in
+    unit_output_signal maps to the minimum output voltage for each
+    channel, and +1 in unit_output_signal maps to the maximum output
+    voltage for each channel.
+    """
+    ao_dtype = ao_subdevice.get_dtype()
+    n_samps,n_chans = unit_output_signal.shape
+    assert n_chans <= len(ao_channels), (
+            'need at least {0} channels but have only {1}'.format(
+                n_chans, ao_channels))
+    ao_buffer = _numpy.zeros((n_samps, n_chans), dtype=ao_dtype)
+    for i in range(n_chans):
+        range_ = ao_channels[i].range
+        midpoint = (range_.max + range_.min)/2
+        v_amp = range_.max - midpoint
+        converter = ao_channels[i].get_converter()
+        volt_output_signal = midpoint + v_amp*unit_output_signal[:,i]
+        ao_buffer[:,i] = converter.from_physical(volt_output_signal)
+    return ao_buffer
+
+def setup_command(ao_subdevice, ao_channels, frequency, output_buffer):
+    """Setup ao_subdevice.cmd to output output_buffer using ao_channels
+    """
+    scan_period_ns = int(1e9 / frequency)
+    n_chan = output_buffer.shape[1]
+    ao_cmd = ao_subdevice.get_cmd_generic_timed(n_chan, scan_period_ns)
+    ao_cmd.start_src = _constant.TRIG_SRC.int
+    ao_cmd.start_arg = 0
+    ao_cmd.stop_src = _constant.TRIG_SRC.count
+    ao_cmd.stop_arg = len(output_buffer)
+    ao_cmd.chanlist = ao_channels[:n_chan]
+    ao_subdevice.cmd = ao_cmd
+
+def run_command(device, ao_subdevice, output_buffer):
+    """Write output_buffer using ao_subdevice
+
+    Blocks until the output is complete.
+    """
+    ao_subdevice.command()
+    writer = _utility.Writer(
+        ao_subdevice, output_buffer,
+        preload=ao_subdevice.get_buffer_size()/output_buffer.itemsize,
+        block_while_running=True)
+    writer.start()
+    device.do_insn(_utility.inttrig_insn(ao_subdevice))
+    writer.join()
+
+def run(filename, subdevice, channels, range, aref, mmap=False, files=[]):
+    """
+    >>> import os
+    >>> import tempfile
+    >>> from numpy import arange, iinfo, int16, pi, save, sin, zeros
+
+    Create temporary files for testing.
+
+    >>> time = arange(NUMPY_FREQ, dtype=float)/NUMPY_FREQ
+    >>> f = 440
+    >>> iint16 = iinfo(int16)
+    >>> a = (iint16.max - iint16.min)/2.
+    >>> one_chan = zeros((NUMPY_FREQ,1), dtype=int16)
+    >>> one_chan[:,0] = a*sin(f*time/(2*pi))
+    >>> fd,one_chan_path = tempfile.mkstemp(prefix='pycomedi-', suffix='.npy')
+    >>> fp = os.fdopen(fd, 'w')
+    >>> save(fp, one_chan)
+    >>> fp.close()
+
+    >>> two_chan = zeros((NUMPY_FREQ,2), dtype=int16)
+    >>> two_chan[:,0] = a*sin(f*time/(2*pi))
+    >>> two_chan[:,1] = a*sin(2*f*time/(2*pi))
+    >>> fd,two_chan_path = tempfile.mkstemp(prefix='pycomedi-', suffix='.npy')
+    >>> fp = os.fdopen(fd, 'w')
+    >>> save(fp, two_chan)
+    >>> fp.close()
+
+    >>> run(filename='/dev/comedi0', subdevice=None,
+    ...     channels=[0,1], range=0, aref=_constant.AREF.ground,
+    ...     files=[one_chan_path, two_chan_path])
+
+    >>> os.remove(one_chan_path)
+    >>> os.remove(two_chan_path)
+    """
+    device,ao_subdevice,ao_channels = setup_device(
+        filename=filename, subdevice=subdevice, channels=channels,
+        range=range, aref=aref)
+    for filename in files:
+        frequency,unit_output_signal = load(filename=filename)
+        output_buffer = generate_output_buffer(
+            ao_subdevice, ao_channels, unit_output_signal)
+        setup_command(ao_subdevice, ao_channels, frequency, output_buffer)
+        run_command(device, ao_subdevice, output_buffer)
+    device.close()
+
+
+if __name__ == '__main__':
+    import pycomedi_demo_args
+
+    pycomedi_demo_args.ARGUMENTS['files'] = (['files'], {'nargs': '+'})
+    args = pycomedi_demo_args.parse_args(
+        description=__doc__,
+        argnames=[
+            'filename', 'subdevice', 'channels', 'range', 'aref', 'mmap',
+            'files', 'verbose'])
+
+    run(filename=args.filename, subdevice=args.subdevice,
+        channels=args.channels, range=args.range, aref=args.aref,
+        mmap=args.mmap, files=args.files)