From 1302cd7d02bc7de750035edc09cdf6a97c496d7f Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Wed, 9 Feb 2011 09:22:32 -0500 Subject: [PATCH] Convert to more generic Pythonic wrapper. Not everything works yet. --- LICENSE => COPYING | 0 MANIFEST.in | 2 +- Makefile | 19 - README | 137 ++++-- doc/software_timed_analog_IO.txt | 74 +++ doc/software_timed_digital_IO.txt | 40 ++ doc/synchronized_analog_IO.txt | 139 ++++++ pycomedi/__init__.py | 25 +- pycomedi/classes.py | 775 ++++++++++++++++++++++++++++++ pycomedi/common.py | 424 ---------------- pycomedi/constants.py | 344 +++++++++++++ pycomedi/simult_aio.py | 592 ----------------------- pycomedi/single_aio.py | 173 ------- pycomedi/single_dio.py | 119 ----- pycomedi/utility.py | 422 ++++++++++++++++ setup.py | 18 +- 16 files changed, 1915 insertions(+), 1388 deletions(-) rename LICENSE => COPYING (100%) delete mode 100644 Makefile create mode 100644 doc/software_timed_analog_IO.txt create mode 100644 doc/software_timed_digital_IO.txt create mode 100644 doc/synchronized_analog_IO.txt create mode 100644 pycomedi/classes.py delete mode 100644 pycomedi/common.py create mode 100644 pycomedi/constants.py delete mode 100644 pycomedi/simult_aio.py delete mode 100644 pycomedi/single_aio.py delete mode 100644 pycomedi/single_dio.py create mode 100644 pycomedi/utility.py diff --git a/LICENSE b/COPYING similarity index 100% rename from LICENSE rename to COPYING diff --git a/MANIFEST.in b/MANIFEST.in index 1aba38f..eb762f3 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -include LICENSE +include COPYING diff --git a/Makefile b/Makefile deleted file mode 100644 index 5d044c6..0000000 --- a/Makefile +++ /dev/null @@ -1,19 +0,0 @@ -.PHONY : all check dist clean - -all : dummy_py - -dummy_py : setup.py pycomedi/*.py - python setup.py install --home=~ - echo "dummy for Makefile dependencies" > $@ - -check : all - python pycomedi.py - -dist : - python setup.py sdist - scp dist/pycomedi*tar.gz einstein:public_html/code/python/ - -clean : - python setup.py clean - rm -rf build dist pycomedi.egg-info - rm -f dummy_py *.pyc diff --git a/README b/README index b90010a..c6eccf6 100644 --- a/README +++ b/README @@ -1,70 +1,113 @@ -This package provides an object-oriented interface to the Comedi +This package provides an object-oriented interface to the Comedi_ drivers. The standard Python interface bundled with Comedilib is a -simple SWIG clone of the C interface. In PyComedi, we wrap that basic -interface into one which is more transparent. There are simple -wrappers for common tasks, such as single point digital and analog -input/output (the single_dio and single_aio modules), as well as -synchronized, multipoint analog input/output (ther simult_aio module). +simple SWIG clone of the C interface. In pycomedi, we convert the +functions into class methods (see `pycomedi.classes`), so you don't +have to worry about dragging around opaque types like `comedi_t *` +device pointers. We also bundle related constants together in +`_Enums` and `_Flags` (see `pycomedi.constants`), to make handling +common operations like flag manipulations simpler. Finally, there are +a number of utility classes (see `pycomedi.utility`) to make common +tasks like creating instructions or reading hardware-timed analog +input easier. -== Installation == -Non-Python dependencies (Debian packagename): - easy_install (python-setuptools) - Numpy source (python-numpy-dev) - Comedi +Installation +============ -PyComedi uses `setuptools' for installation. Setuptools is basically -an extension of the standard Python distutils package which supports -automatic package dependency tracking. The installation procedure -should be (on Debian-esque systems) - # apt-get intall python-setuptools python-numpy-dev - # easy_install -f http://www.physics.drexel.edu/~wking/code/python/ pycomedi +Packages +-------- -There is a Debian package of Comedi, but in order for the -simultaneous, synchronized input/output to work you need to have both -a DAQ card that supports it (obviously), and a Comedi driver that -supports it. As of 2008-10-09, CVS versions of Comedi contain a patch -supporting this behavior for National Instruments (ni.com) cards based -on the DAQ-STC timing chip. I don't know the status of other cards or -drivers. The Comedi modules must be compiled against your kernel -headers, and the process is described in the Comedi section below. +Gentoo +~~~~~~ -There is one speedbump you might run into: - * an outdated version of easy_install (see ez_setup.py section) +I've packaged pycomedi for Gentoo. You need layman_ and my `wtk +overlay`_. Install with:: -** ez_setup.py + # emerge -av app-portage/layman + # layman --add wtk + # emerge -av dev-python/pycomedi -This package bundles - http://peak.telecommunity.com/dist/ez_setup.py -to bootstrap setuputils installation on your machine (if neccessary). -If the bootstrapping doesn't work, you may need to install a current version -of setuptools. On Debian-based systems `apt-get install python-setuptools'. -Once you have *some* version of setuptools, upgrade with - easy_install -U setuptools +Dependencies +------------ -For more information see - http://peak.telecommunity.com/DevCenter/EasyInstall - http://peak.telecommunity.com/DevCenter/setuptools#what-your-users-should-know +If you're installing by hand or packaging pycomedi for another +distribution, you'll need the following dependencies: -** Comedi +======= =================== ================ ========================== +Package Purpose Debian_ Gentoo_ +======= =================== ================ ========================== +NumPy_ ? python-numpy dev-python/numpy +SciPy_ testing python-scipy sci-libs/scipy +Comedi_ Comedilib interface python-comedilib sci-libs/comedilib [#clb]_ +nose_ testing python-nose dev-python/nose +Cython_ Comedilib interface cython dev-python/cython +======= =================== ================ ========================== -TODO (write up source installation) +.. [#clb] In the `wtk overlay`_. +TODO: Migrate from SWIG bindings to direct calls on comedilib via Cython. -== Usage == -See the tests in the various modules for simple examples. +Installing by hand +------------------ +Pycomedi is available as a Git_ repository:: -== Licence == + $ git clone http://www.physics.drexel.edu/~wking/code/git/pycomedi.git -This project is distributed under the GNU Public License Version 3 or greater. -http://www.gnu.org/licenses/gpl.txt +See the homepage_ for details. To install the checkout, run the +standard:: -== Author == + $ python setup.py install + + +Usage +===== + +See the examples in the `doc` directory. + + +Testing +======= + +Integration tests with:: + + $ nosetests --with-doctest --doctest-extension=txt doc + +Internal unit tests with:: + + $ nosetests --with-doctest pycomedi + + +Licence +======= + +This project is distributed under the `GNU General Public License +Version 3`_ or greater. + + +Author +====== W. Trevor King wking@drexel.edu -Copyright 2007, 2008 +Copyright 2007-2011 + + +.. _Comedi: http://www.comedi.org/ +.. _layman: http://layman.sourceforge.net/ +.. _wtk overlay: + http://www.physics.drexel.edu/~wking/unfolding-disasters/posts/Gentoo_overlay +.. _science overlay: http://overlays.gentoo.org/proj/science/wiki/en +.. _Debian: http://www.debian.org/ +.. _Gentoo: http://www.gentoo.org/ +.. _NumPy: http://numpy.scipy.org/ +.. _SciPy: http://www.scipy.org/ +.. _nose: http://somethingaboutorange.com/mrl/projects/nose/ +.. _Cython: http://www.cython.org/ +.. _Git: http://git-scm.com/ +.. _homepage: + http://www.physics.drexel.edu/~wking/unfolding-disasters/posts/pycomedi/ +.. _GNU General Public License Version 3: http://www.gnu.org/licenses/gpl.txt diff --git a/doc/software_timed_analog_IO.txt b/doc/software_timed_analog_IO.txt new file mode 100644 index 0000000..2b325c3 --- /dev/null +++ b/doc/software_timed_analog_IO.txt @@ -0,0 +1,74 @@ +Import required modules. + +>>> from pycomedi.classes import Device, DataChannel +>>> from pycomedi.constants import SUBDEVICE_TYPE, AREF, UNIT + +Open a device. + +>>> device = Device('/dev/comedi0') +>>> device.open() + +Get your I/O subdevice (alternatively, use `device.subdevice()` to +select the subdevice directly by index). + +>>> subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.ai) + +Generate a list of channels you wish to control. + +>>> channels = [subdevice.channel(i, factory=DataChannel, aref=AREF.diff) +... for i in (0, 1, 2, 3)] + +Configure the channels. + +>>> for chan in channels: +... chan.range = chan.find_range(unit=UNIT.volt, min=0, max=5) + +Read/write sequentially. + +>>> value = [c.data_read_delayed(nano_sec=1e3) for c in channels] +>>> value # doctest: +SKIP +[0, 9634, 0, 15083] + +TODO: convert to physical values. + +Close the device when you're done. + +>>> device.close() + + + +As a more elaborate test, we can cable AO0 into AI0 and sweep the +voltage. + +>>> from numpy import linspace +>>> from scipy.stats import linregress +>>> device.open() +>>> ai_subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.ai) +>>> ao_subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.ao) +>>> ai_channel = ai_subdevice.channel(0, factory=DataChannel, aref=AREF.diff) +>>> ao_channel = ao_subdevice.channel(0, factory=DataChannel, aref=AREF.diff) +>>> ai_channel.range = ai_channel.find_range(unit=UNIT.volt, min=0, max=5) +>>> ao_channel.range = ao_channel.find_range(unit=UNIT.volt, min=0, max=5) +>>> ao_maxdata = ao_channel.get_maxdata() +>>> ai_maxdata = ai_channel.get_maxdata() +>>> ao_start = 0.3 * ao_maxdata +>>> ao_stop = 0.7 * ao_maxdata +>>> points = 10 +>>> ao_data = linspace(ao_start, ao_stop, points) +>>> ai_data = [] +>>> for i in range(points): +... written = ao_channel.data_write(ao_data[i]) +... assert written == 1, written +... ai_data.append(ai_channel.data_read_delayed(nano_sec=1e3)) +>>> ao_data +>>> ai_data +>>> scaled_ao_data = [d/float(ao_maxdata) for d in ao_data] +>>> scaled_ai_data = [d/float(ai_maxdata) for d in ai_data] +>>> gradient,intercept,r_value,p_value,std_err = linregress( +... scaled_ao_data, scaled_ai_data) +>>> gradient +>>> intercept +>>> r_value +>>> p_value +>>> std_err +>>> device.close() diff --git a/doc/software_timed_digital_IO.txt b/doc/software_timed_digital_IO.txt new file mode 100644 index 0000000..b764638 --- /dev/null +++ b/doc/software_timed_digital_IO.txt @@ -0,0 +1,40 @@ +Import required modules. + +>>> from pycomedi.classes import Device, DataChannel +>>> from pycomedi.constants import SUBDEVICE_TYPE, IO_DIRECTION + +Open a device. + +>>> device = Device('/dev/comedi0') +>>> device.open() + +Get your DIO subdevice (alternatively, use `device.subdevice()` to +select the subdevice directly by index). + +>>> subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.dio) + +Generate a list of channels you wish to control. + +>>> channels = [subdevice.channel(i, factory=DataChannel) +... for i in (0, 1, 2, 3)] + +Configure the channels. + +>>> for chan in channels: +... chan.dio_config(IO_DIRECTION.input) + +Read/write either as a block... + +>>> value = subdevice.dio_bitfield(base_channel=0) +>>> value # doctest: +SKIP +7 + +... or sequentially. + +>>> value = [c.dio_read() for c in channels] +>>> value # doctest: +SKIP +[1, 1, 1, 0] + +Close the device when you're done. + +>>> device.close() diff --git a/doc/synchronized_analog_IO.txt b/doc/synchronized_analog_IO.txt new file mode 100644 index 0000000..823427b --- /dev/null +++ b/doc/synchronized_analog_IO.txt @@ -0,0 +1,139 @@ +Import required modules. + +>>> from numpy import arange, linspace, zeros, sin, pi +>>> from pycomedi.classes import (Device, StreamingSubdevice, DataChannel, +... Chanlist) +>>> from pycomedi.constants import (AREF, CMDF, INSN, SUBDEVICE_TYPE, TRIG_SRC, +... UNIT) +>>> from pycomedi.utility import inttrig_insn, subdevice_dtype, Reader, Writer + +Open a device. + +>>> device = Device('/dev/comedi0') +>>> device.open() + +Get your I/O subdevices (alternatively, use `device.subdevice()` to +select the subdevice directly by index). + +>>> ai_subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.ai, +... factory=StreamingSubdevice) +>>> ao_subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.ao, +... factory=StreamingSubdevice) + +Generate a list of channels you wish to control. + +>>> ai_channels = Chanlist( +... [ai_subdevice.channel(i, factory=DataChannel, aref=AREF.diff) +... for i in (0, 1, 2, 3)]) +>>> ao_channels = Chanlist( +... [ao_subdevice.channel(i, factory=DataChannel, aref=AREF.diff) +... for i in (0, 1)]) + +Configure the channels. + +>>> for chan in ai_channels + ao_channels: +... chan.range = chan.find_range(unit=UNIT.volt, min=-10, max=10) + +Use the subdevice flags to determine data types. + +>>> ai_dtype = subdevice_dtype(ai_subdevice) +>>> ao_dtype = subdevice_dtype(ao_subdevice) + +Allocate buffers (with channel data interleaved). + +>>> n_samps = 100 +>>> ai_buffer = zeros((n_samps, len(ai_channels)), dtype=ai_dtype) +>>> ao_buffer = zeros((n_samps, len(ao_channels)), dtype=ao_dtype) + +Initialize output buffer. + +>>> midpoint = int(ao_channels[0].get_maxdata()/2) +>>> bitrange = float(midpoint/2) +>>> ao_buffer[:,0] = midpoint+bitrange*sin(2*pi*arange(n_samps)/(n_samps-1.0)) +>>> ao_buffer[:,1] = linspace(0, midpoint, n_samps) +>>> ao_buffer[-1,:] = midpoint + +Get rough commands. + +>>> frequency = 1000.0 # Hz +>>> scan_period_ns = int(1e9 / frequency) # nanosecond period +>>> ai_subdevice.get_cmd_generic_timed(len(ai_channels), scan_period_ns) +>>> ai_cmd = ai_subdevice.cmd +>>> ao_subdevice.get_cmd_generic_timed(len(ao_channels), scan_period_ns) +>>> ao_cmd = ao_subdevice.cmd + +Setup multi-scan run triggering. + +>>> ai_cmd.start_src = TRIG_SRC.int +>>> ai_cmd.start_arg = 0 +>>> ai_cmd.stop_src = TRIG_SRC.count +>>> ai_cmd.stop_arg = n_samps +>>> ao_cmd.start_src = TRIG_SRC.ext +>>> ao_cmd.start_arg = 18 # NI card AI_START1 internal AI start signal +>>> ao_cmd.stop_src = TRIG_SRC.count +>>> ao_cmd.stop_arg = n_samps + +Setup multi-channel scan triggering (handled by get_cmd_generic_timed?). + +ai_cmd.scan_begin_src = TRIG_SRC.timer +ai_cmd.scan_begin_arg +ai_cmd.convert_src +ai_cmd.convert_arg +ai_cmd.scan_end_src +ai_cmd.scan_end_arg + +Add channel lists. + +>>> ai_cmd.chanlist = ai_channels +>>> ao_cmd.chanlist = ao_channels + +Test the commands. + +>>> ai_subdevice.cmd = ai_cmd +>>> ao_subdevice.cmd = ao_cmd +>>> for i in range(3): +... rc = ai_subdevice.command_test() +... if rc == None: break +... print i, rc +>>> for i in range(3): +... rc = ao_subdevice.command_test() +... if rc == None: break +... print i, rc + +Start the commands. + +>>> ao_subdevice.command() +>>> ai_subdevice.command() + +Start the reader and writer threads. + +>>> writer = Writer(ao_subdevice, ao_buffer) +>>> writer.start() +>>> reader = Reader(ai_subdevice, ai_buffer) +>>> reader.start() + +Arm the AO command. + +>>> device.do_insn(inttrig_insn(ao_subdevice)) +1 + +Trigger AI aquisition, which in turn triggers the AO. + +>>> device.do_insn(inttrig_insn(ai_subdevice)) +1 + +Join the reader and writer threads when they finish. + +>>> writer.join() +>>> reader.join() + +>>> ai_buffer # doctest: +SKIP +array([[32342, 31572, 30745, 31926], + [33376, 31797, 30904, 31761], +... + [31308, 24246, 22215, 21824], + [32343, 25044, 22659, 21959]], dtype=uint16) + +Close the device when you're done. + +>>> device.close() diff --git a/pycomedi/__init__.py b/pycomedi/__init__.py index a16f436..9b746a5 100644 --- a/pycomedi/__init__.py +++ b/pycomedi/__init__.py @@ -1,5 +1,4 @@ -# PyComedi provides an object-oriented interface to the Comedi drivers. -# Copyright (C) 2008-2010 W. Trevor King +# Copyright (C) 2008-2011 W. Trevor King # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,4 +13,24 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -__version__ = '0.2' +import logging as _logging + + +__version__ = '0.3' + + +LOG = _logging.getLogger('pycomedi') +"Pycomedi logger" + +LOG.setLevel(_logging.DEBUG) +h = _logging.StreamHandler() +h.setLevel(_logging.WARN) +f = _logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +h.setFormatter(f) +LOG.addHandler(h) +del h, f + + +class PyComediError (Exception): + "Error in pycomedi" + pass diff --git a/pycomedi/classes.py b/pycomedi/classes.py new file mode 100644 index 0000000..7a5c14d --- /dev/null +++ b/pycomedi/classes.py @@ -0,0 +1,775 @@ +# Copyright (C) 2010-2011 W. Trevor King +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"Object oriented wrappers around the comedi module." + +import os as _os + +import comedi as _comedi +import numpy as _numpy + +from . import LOG as _LOG +from . import PyComediError as _PyComediError +from . import constants as _constants + + +def _comedi_arg(arg): + "Replace arguments with their comedilib value." + if isinstance(arg, _constants._BitwiseOperator): + return arg.value + elif isinstance(arg, Command): + _LOG.debug(str(arg)) + return arg.cmd + elif isinstance(arg, Chanlist): + return arg.chanlist() + return arg + +def _comedi_getter(name, is_invalid): + def comedi_get(function_name, *args, **kwargs): + if 'error_msg' in kwargs: + error_msg = kwargs.pop('error_msg') + else: + error_msg = 'error while running %s with %s and %s' % ( + function_name, args, kwargs) + fn = getattr(_comedi, function_name) + + _LOG.debug('calling %s with %s %s' % (function_name, args, kwargs)) + + args = list(args) + for i,arg in enumerate(args): + args[i] = _comedi_arg(arg) + for key,value in kwargs.iteritems(): + kwargs[key] = _comedi_arg(value) + + ret = fn(*args, **kwargs) + _LOG.debug(' call to %s returned %s' % (function_name, ret)) + if is_invalid(ret): + errno = _comedi.comedi_errno() + comedi_msg = _comedi.comedi_strerror(errno) + _comedi.comedi_perror(function_name) + raise _PyComediError('%s: %s (%s)' % (error_msg, comedi_msg, ret)) + return ret + comedi_get.__name__ = name + comedi_get.__doc__ = ( + 'Execute `comedi.(*args, **kwargs)` safely.') + return comedi_get + +_comedi_int = _comedi_getter('comedi_int', lambda ret: ret < 0) +_comedi_ptr = _comedi_getter('comedi_ptr', lambda ret: ret == None) +_comedi_tup = _comedi_getter('comedi_tup', lambda ret: ret[0] < 0) + + +def _cache(method): + def wrapper(self, *args, **kwargs): + key = (method.__name__, args, str(kwargs)) + if key not in self._cache: + self._cache[key] = method(self, *args, **kwargs) + return self._cache[key] + wrapper.__name__ = method.__name__ + wrapper.__doc__ = method.__doc__ + return wrapper + + +class CacheObject (object): + """An object that caches return values for decoreated merthods. + + >>> class A (CacheObject): + ... @_cache + ... def double(self, x): + ... print 'calculating 2*%d' % x + ... return x*2 + >>> a = A() + >>> a.double(2) + calculating 2*2 + 4 + >>> a.double(2) + 4 + >>> a.double(3) + calculating 2*3 + 6 + >>> a.double(3) + 6 + >>> print sorted(a._cache.keys()) + [('double', (2,), '{}'), ('double', (3,), '{}')] + """ + def __init__(self): + self.clear_cache() + + def clear_cache(self): + self._cache = {} + + +class Channel (CacheObject): + def __init__(self, subdevice, index): + super(Channel, self).__init__() + self._subdevice = subdevice + self._index = index + + @_cache + def get_maxdata(self): + return _comedi_int( + 'comedi_get_maxdata', self._subdevice._device._device, + self._subdevice._index, self._index) + + @_cache + def get_n_ranges(self): + return _comedi_int( + 'comedi_get_n_ranges', self._subdevice._device._device, + self._subdevice._index, self._index) + + def get_range(self, index): + r = _comedi_ptr( + 'comedi_get_range', self._subdevice._device._device, + self._subdevice._index, self._index, index) + return Range(index=index, range=r) + + @_cache + def _find_range(self, unit, min, max): + "Search for range" + return _comedi_int( + 'comedi_find_range', self._subdevice._device._device, + self._subdevice._index, self._index, unit.value, min, max) + + def find_range(self, unit, min, max): + """Search for range + + `unit` should be an item from `constants.UNIT`. + """ + return self.get_range(self._find_range(unit, min, max)) + + +class Subdevice (CacheObject): + def __init__(self, device, index): + super(Subdevice, self).__init__() + self._device = device + self._index = index + + @_cache + def get_type(self): + "Type of subdevice (from `SUBDEVICE_TYPE`)" + _type = _comedi_int( + 'comedi_get_subdevice_type', self._device._device, self._index) + return _constants.SUBDEVICE_TYPE.index_by_value(_type) + + @_cache + def _get_flags(self): + "Subdevice flags" + return _comedi_int( + 'comedi_get_subdevice_flags', self._device._device, self._index) + + def get_flags(self): + "Subdevice flags (an `SDF` `FlagValue`)" + return _constants.FlagValue( + _constants.SDF, self._get_flags()) + + @_cache + def n_channels(self): + "Number of subdevice channels" + return _comedi_int( + 'comedi_get_n_channels', self._device._device, self._index) + + @_cache + def range_is_chan_specific(self): + return _comedi_int( + 'comedi_range_is_chan_specific', self._device._device, self._index) + + @_cache + def maxdata_is_chan_specific(self): + return _comedi_int( + 'comedi_maxdata_is_chan_specific', + self._device._device, self._index) + + def lock(self): + "Reserve the subdevice" + _comedi_int('comedi_lock', self._device._device, self._index) + + def unlock(self): + "Release the subdevice" + _comedi_int('comedi_unlock', self._device._device, self._index) + + def dio_bitfield(self, bits=0, write_mask=0, base_channel=0): + """Read/write multiple digital channels. + + `bits` and `write_mask` are bit fields with the least + significant bit representing channel `base_channel`. + + Returns a bit field containing the read value of all input + channels and the last written value of all output channels. + """ + rc,bits = _comedi_tup( + 'comedi_dio_bitfield2', self._device._device, + self._index, write_mask, bits, base_channel) + return bits + + # extensions to make a more idomatic Python interface + + def insn(self): + insn = self._device.insn() + insn.subdev = self._index + return insn + + def channel(self, index, factory=Channel, **kwargs): + "`Channel` instance for the `index`\ed channel." + return factory(subdevice=self, index=index, **kwargs) + + +class Device (CacheObject): + "Class bundling device-related functions." + def __init__(self, filename): + super(Device, self).__init__() + self.filename = filename + self._device = None + self.file = None + + def open(self): + "Open device" + self._device = _comedi_ptr('comedi_open', self.filename) + self.file = _os.fdopen(self.fileno(), 'r+') + self.clear_cache() + + def close(self): + "Close device" + self.file.flush() + self.file.close() + _comedi_int('comedi_close', self._device) + self._device = None + self.file = None + self.clear_cache() + + @_cache + def fileno(self): + "File descriptor for this device" + return _comedi_int('comedi_fileno', self._device) + + @_cache + def get_n_subdevices(self): + "Number of subdevices" + self._cache + return _comedi_int('comedi_get_n_subdevices', self._device) + + @_cache + def get_version_code(self): + """Comedi version code. + + This is a kernel-module level property, but a valid device is + necessary to communicate with the kernel module. + + Returns a tuple of version numbers, e.g. `(0, 7, 61)`. + """ + version = _comedi_int('comedi_get_version_code', self._device) + ret = [] + for i in range(3): + ret.insert(0, version & (2**8-1)) + version >>= 2**8 # shift over 8 bits + return tuple(ret) + + @_cache + def get_driver_name(self): + "Comedi driver name" + return _comedi_ptr('get_driver_name', self._device) + + @_cache + def get_board_name(self): + "Comedi board name" + return _comedi_ptr('get_board_name', self._device) + + @_cache + def _get_read_subdevice(self): + "Find streaming input subdevice index" + return _comedi_int('comedi_get_read_subdevice', self._device) + + def get_read_subdevice(self, **kwargs): + "Find streaming input subdevice" + return self.subdevice(self._get_read_subdevice(), **kwargs) + + @_cache + def _get_write_subdevice(self): + "Find streaming output subdevice index" + return _comedi_int('comedi_get_write_subdevice', self._device) + + def _get_write_subdevice(self, **kwargs): + "Find streaming output subdevice" + return self.subdevice(self._get_write_subdevice(), **kwargs) + + @_cache + def _find_subdevice_by_type(self, subdevice_type): + "Search for a subdevice index for type `subdevice_type`)." + return _comedi_int( + 'comedi_find_subdevice_by_type', + self._device, subdevice_type.value, 0) # 0 is starting subdevice + + def find_subdevice_by_type(self, subdevice_type, **kwargs): + """Search for a subdevice by type `subdevice_type`)." + + `subdevice_type` should be an item from `constants.SUBDEVICE_TYPE`. + """ + return self.subdevice( + self._find_subdevice_by_type(subdevice_type), **kwargs) + + def do_insnlist(self, insnlist): + """Perform multiple instructions + + Returns the number of successfully completed instructions. + """ + return _comedi_int('comedi_do_insn', self._device, insn) + + def do_insn(self, insn): + """Preform a single instruction. + + Returns an instruction-specific integer. + """ + return _comedi_int('comedi_do_insn', self._device, insn) + + def get_default_calibration_path(self): + "The default calibration path for this device" + assert self._device != None, ( + 'Must call get_default_calibration_path on an open device.') + return _comedi_ptr('comedi_get_default_calibration_path', self._device) + + # extensions to make a more idomatic Python interface + + def insn(self): + return _comedi.comedi_insn_struct() + + def subdevices(self, **kwargs): + "Iterate through all available subdevices." + for i in range(self.n_subdevices): + yield self.subdevice(i, **kwargs) + + def subdevice(self, index, factory=Subdevice, **kwargs): + return factory(device=self, index=index, **kwargs) + + +class Range (object): + def __init__(self, index, range): + self.index = index + self.range = range + + def __getattr__(self, name): + return getattr(self.range, name) + + +class Command (object): + """Wrap `comedi.comedi_cmd` with a nicer interface. + + Examples + -------- + + >>> from .utility import set_cmd_chanlist, set_cmd_data + >>> CMDF = _constants.CMDF + >>> TRIG_SRC = _constants.TRIG_SRC + >>> c = Command() + >>> c.subdev = 1 + >>> c.flags = CMDF.priority | CMDF.write + >>> c.start_src = TRIG_SRC.int | TRIG_SRC.now + >>> c.scan_begin_src = TRIG_SRC.timer + >>> c.scan_begin_arg = 10 + >>> c.scan_convert_src = TRIG_SRC.now + >>> c.scan_end_src = TRIG_SRC.count + >>> c.scan_end_arg = 4 + >>> c.stop_src = TRIG_SRC.none + >>> set_cmd_chanlist(c, []) + >>> set_cmd_data(c, [1,2,3]) + >>> print c # doctest: +ELLIPSIS, +REPORT_UDIFF + Comedi command: + subdev : 1 + flags : priority|write + start_src : now|int + start_arg : 0 + scan_begin_src : timer + scan_begin_arg : 10 + convert_src : - + convert_arg : 0 + scan_end_src : count + scan_end_arg : 4 + stop_src : none + stop_arg : 0 + chanlist : + chanlist_len : 0 + data : + data_len : 3 + """ + _str_fields = [ + 'subdev', 'flags', 'start_src', 'start_arg', 'scan_begin_src', + 'scan_begin_arg', 'convert_src', 'convert_arg', 'scan_end_src', + 'scan_end_arg', 'stop_src', 'stop_arg', 'chanlist', 'chanlist_len', + 'data', 'data_len'] + + def __init__(self): + self.cmd = _comedi.comedi_cmd_struct() + + def _get_flag_field(self, name, flag): + f = _constants.FlagValue(flag, getattr(self.cmd, name)) + return f + + def get_flags(self): + return self._get_flag_field('flags', _constants.CMDF) + + def get_trigger_field(self, name): + return self._get_flag_field(name, _constants.TRIG_SRC) + + def __str__(self): + values = {} + for f in self._str_fields: + if f.endswith('_src'): + values[f] = str(self.get_trigger_field(f)) + elif f == 'flags': + values[f] = str(self.get_flags()) + else: + values[f] = getattr(self, f) + max_len = max([len(f) for f in self._str_fields]) + lines = ['%*s : %s' % (max_len, f, values[f]) + for f in self._str_fields] + return 'Comedi command:\n %s' % '\n '.join(lines) + + def __setattr__(self, name, value): + if name == 'cmd': + return super(Command, self).__setattr__(name, value) + return setattr(self.cmd, name, _comedi_arg(value)) + + def __getattr__(self, name): + return getattr(self.cmd, name) # TODO: lookup _NamedInt? + + +class DataChannel (Channel): + """Channel configured for reading data. + + `range` should be a `Range` instance, `aref` should be an + `constants.AREF` instance, + """ + def __init__(self, range=None, aref=None, **kwargs): + super(DataChannel, self).__init__(**kwargs) + self.range = range + self.aref = aref + + # syncronous stuff + + def data_read(self): + "Read one sample" + read,data = _comedi_tup( + 'comedi_data_read', self._subdevice._device._device, + self._subdevice._index, self._index, self.range.index, self.aref) + return data + + def data_read_n(self, n): + "Read `n` samples (timing between samples is undefined)." + read,data = _comedi_tup( + 'comedi_data_read', self._subdevice._device._device, + self._subdevice._index, self._index, self.range.index, self.aref, + n) + return data + + def data_read_hint(self): + """Tell driver which channel/range/aref you will read next + + Used to prepare an analog input for a subsequent call to + comedi_data_read. It is not necessary to use this function, + but it can be useful for eliminating inaccuaracies caused by + insufficient settling times when switching the channel or gain + on an analog input. This function sets an analog input to the + channel, range, and aref specified but does not perform an + actual analog to digital conversion. + + Alternatively, one can simply use `.data_read_delayed()`, which + sets up the input, pauses to allow settling, then performs a + conversion. + """ + _comedi_int( + 'comedi_data_read_hint', self._subdevice._device._device, + self._subdevice._index, self._index, self.range.index, + self.aref.value) + + def data_read_delayed(self, nano_sec=0): + """Read single sample after delaying specified settling time. + + Although the settling time is specified in integer + nanoseconds, the actual settling time will be rounded up to + the nearest microsecond. + """ + read,data = _comedi_tup( + 'comedi_data_read_delayed', self._subdevice._device._device, + self._subdevice._index, self._index, self.range.index, + self.aref.value, int(nano_sec)) + return data + + def data_write(self, data): + "Write one sample" + written = _comedi_int( + 'comedi_data_write', self._subdevice._device._device, + self._subdevice._index, self._index, self.range.index, self.aref, + int(data)) + return written + + def dio_config(self, dir): + """Change input/output properties + + `dir` should be an item from `constants.IO_DIRECTION`. + """ + _comedi_int( + 'comedi_dio_config', self._subdevice._device._device, + self._subdevice._index, self._index, dir) + + def _dio_get_config(self): + "Query input/output properties" + return _comedi_int( + 'comedi_dio_get_config', self._subdevice._device._device, + self._subdevice._index, self._index) + + def dio_get_config(self): + """Query input/output properties + + Return an item from `constants.IO_DIRECTION`. + """ + return _constants.IO_DIRECTION.index_by_value(self._dio_get_config()) + + def dio_read(self): + "Read a single bit" + rc,bit = _comedi_tup( + 'comedi_dio_read', self._subdevice._device._device, + self._subdevice._index, self._index) + return bit + + def dio_write(self, bit): + "Write a single bit" + return _comedi_int( + 'comedi_dio_write', self._subdevice._device._device, + self._subdevice._index, self._index, bit) + + def cr_pack(self): + return _comedi.cr_pack(self._index, self.range.index, self.aref.value) + + +class SlowlyVaryingChannel (Channel): + "Slowly varying channel" + def __init__(self, **kwargs): + super(SlowlyVaryingChannel, self).__init__(**kwargs) + self._sv = _comedi.comedi_sv_t() + self.init() + + def init(self): + "Initialise `._sv`" + _comedi_int( + 'comedi_sv_init', self._sv, self._subdevice._device._device, + self._subdevice._index, self._index) + + def update(self): + "Update internal `._sv` parameters." + _comedi_int('comedi_sv_update', self._sv) + + def measure(self): + """Measure a slowy varying signal. + + Returns `(num_samples, physical_value)`. + """ + return _comedi_tup( + 'comedi_sv_measure', self._sv) + + +class StreamingSubdevice (Subdevice): + "Streaming I/O channel" + def __init__(self, **kwargs): + super(StreamingSubdevice, self).__init__(**kwargs) + self.cmd = Command() + + def get_cmd_src_mask(self): + """Detect streaming input/output capabilities + + The command capabilities of the subdevice indicated by the + parameters device and subdevice are probed, and the results + placed in the command structure *command. The trigger source + elements of the command structure are set to be the bitwise-or + of the subdevice's supported trigger sources. Other elements + in the structure are undefined. + """ + rc = _comedi_int( + 'comedi_get_cmd_src_mask', self._device._device, self._index, + self.cmd) + + def get_cmd_generic_timed(self, chanlist_len, scan_period_ns=0): + """Detect streaming input/output capabilities + + The command capabilities of the subdevice indicated by the + parameters device and subdevice are probed, and the results + placed in the command structure pointed to by the parameter + command. The command structure *command is modified to be a + valid command that can be used as a parameter to + comedi_command (after the command has additionally been + assigned a valid chanlist array). The command measures scans + consisting of chanlist_len channels at a scan rate that + corresponds to a period of scan_period_ns nanoseconds. The + rate is adjusted to a rate that the device can handle. + """ + rc = _comedi_int( + 'comedi_get_cmd_generic_timed', self._device._device, self._index, + self.cmd, chanlist_len, scan_period_ns) + + def cancel(self): + "Stop streaming input/output in progress." + _comedi_int('comedi_cancel', self._device._device, self._index) + + def command(self): + "Start streaming input/output" + _comedi_int('comedi_command', self._device._device, self.cmd) + + _command_test_errors = [ + None, # valid + 'unsupported *_src trigger', # unsupported trigger bits zeroed + 'unsupported *_src combo, or multiple triggers', + '*_arg out of range', # offending members adjusted to valid values + '*_arg required adjustment', # e.g. trigger timing period rounded + 'invalid chanlist', # e.g. some boards require same range across chans + ] + + def command_test(self): + "Test streaming input/output configuration" + rc = _comedi.comedi_command_test( + self._device._device, _comedi_arg(self.cmd)) + return self._command_test_errors[rc] + + def poll(self): + """Force updating of streaming buffer + + If supported by the driver, all available samples are copied + to the streaming buffer. These samples may be pending in DMA + buffers or device FIFOs. If successful, the number of + additional bytes available is returned. + """ + return _comedi_int('comedi_poll', self._device._device, self._index) + + def get_buffer_size(self): + "Streaming buffer size of subdevice" + return _comedi_int( + 'comedi_get_buffer_size', self._device._device, self._index) + + def set_buffer_size(self, size): + """Change the size of the streaming buffer + + Returns the new buffer size in bytes. + + The buffer size will be set to size bytes, rounded up to a + multiple of the virtual memory page size. The virtual memory + page size can be determined using `sysconf(_SC_PAGE_SIZE)`. + + This function does not require special privileges. However, it + is limited to a (adjustable) maximum buffer size, which can be + changed by a priveliged user calling + `.comedi_set_max_buffer_size`, or running the program + `comedi_config`. + """ + return _comedi_int( + 'comedi_set_buffer_size', + self._device._device, self._index, size) + + def get_max_buffer_size(self): + "Maximum streaming buffer size of subdevice" + return _comedi_int( + 'comedi_get_max_buffer_size', self._device._device, self._index) + + def set_max_buffer_size(self, max_size): + """Set the maximum streaming buffer size of subdevice + + Returns the old (max?) buffer size on success. + """ + return _comedi_int( + 'comedi_set_max_buffer_size', self._device._device, self._index, + max_size) + + def get_buffer_contents(self): + "Number of bytes available on an in-progress command" + return _comedi_int( + 'comedi_get_buffer_contents', self._device._device, self._index) + + def mark_buffer_read(self, num_bytes): + """Next `num_bytes` bytes in the buffer are no longer needed + + Returns the number of bytes successfully marked as read. + + This method should only be used if you are using a `mmap()` to + read data from Comedi's buffer (as opposed to calling `read()` + on the device file), since Comedi will automatically keep + track of how many bytes have been transferred via `read()` + calls. + """ + return _comedi_int( + 'comedi_mark_buffer_read', self._device._device, self._index, + num_bytes) + + def mark_buffer_written(self, num_bytes): + """Next `num_bytes` bytes in the buffer are no longer needed + + Returns the number of bytes successfully marked as written. + + This method should only be used if you are using a `mmap()` to + read data from Comedi's buffer (as opposed to calling + `write()` on the device file), since Comedi will automatically + keep track of how many bytes have been transferred via + `write()` calls. + """ + return _comedi_int( + 'comedi_mark_buffer_written', self._device._device, self._index, + num_bytes) + + def get_buffer_offset(self): + """Offset in bytes of the read(/write?) pointer in the streaming buffer + + This offset is only useful for memory mapped buffers. + """ + return _comedi_int( + 'comedi_get_buffer_offset', self._device._device, self._index) + + +class Chanlist (list): + def chanlist(self): + ret = _comedi.chanlist(len(self)) + for i,channel in enumerate(self): + ret[i] = channel.cr_pack() + return ret + + +class CalibratedConverter (object): + """Apply a converion polynomial + + Usually you would get the conversion polynomial from + `Channel.get_hardcal_converter()` or similar. bit for testing, + we'll just create one out of thin air. + + TODO: we'll need to use Cython, because the current SWIG bindings + don't provide a way to create or edit `double *` arrays. + + >>> p = _comedi.comedi_polynomial_t() + >>> p.order = 2 + >>> p.coefficients[0] = 1 # this fails. Silly SWIG. + >>> p.coefficients[1] = 2 + >>> p.coefficients[2] = 3 + >>> dir(p.coefficients) + >>> p.coefficients = _numpy.array([1, 2, 3, 4], dtype=_numpy.double) + >>> p.expansion_origin = -1; + + >>> c = CalibratedConverter(polynomial=p) + >>> c(-1) + >>> c(_numpy.array([-1, 0, 0.5, 2], dtype=_numpy.double)) + """ + def __init__(self, polynomial): + self.polynomial = polynomial + + def __call__(self, data): + # Iterating through the coefficients fails. Silly SWIG. + coefficients = list(reversed(self.polynomial.coefficients))[0:p.order] + print coefficients + print self.polynomial.expansion_origin + return _numpy.polyval( + coefficients, data-self.polynomial.expansion_origin) + +# see comedi_caldac_t and related at end of comedilib.h diff --git a/pycomedi/common.py b/pycomedi/common.py deleted file mode 100644 index 0423158..0000000 --- a/pycomedi/common.py +++ /dev/null @@ -1,424 +0,0 @@ -# Some Comedi operations common to analog and digital IO -# Copyright (C) 2007-2010 W. Trevor King -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -"""Some Comedi operations common to analog and digital IO""" - -import comedi as c -import time - - -class PycomediError (Exception) : - "Error in pycomedi.common" - pass - -def _expand_tuple(tup, length) : - "Expand an iterable TUP to a tuple of LENGTH by repeating the last element" - if len(tup) > length : - raise ValueError('tuple %s has more than %d elements.' % (tup, length)) - elif len(tup) < length : - temp_tup = tup + tuple((tup[-1],)*(length-len(tup))) - tup = temp_tup - return tup - -class SubdeviceFlags (object) : - """ - Descriptions from http://www.comedi.org/doc/r5153.html - (the comedi_get_subdevice_flags documentation) - - SDF_BUSY: - The subdevice is busy performing an asynchronous command. A - subdevice being "busy" is slightly different from the "running" - state flagged by SDF_RUNNING. A "running" subdevice is always - "busy", but a "busy" subdevice is not necessarily "running". For - example, suppose an analog input command has been completed by the - hardware, but there are still samples in Comedi's buffer waiting - to be read out. In this case, the subdevice is not "running", but - is still "busy" until all the samples are read out or - comedi_cancel() is called. - - SDF_BUSY_OWNER: - The subdevice is "Busy", and the command it is running was started - by the current process. - - SDF_LOCKED: - The subdevice has been locked by comedi_lock(). - - SDF_LOCK_OWNER: - The subdevice is locked, and was locked by the current process. - - SDF_MAXDATA: - The maximum data value for the subdevice depends on the channel. - - SDF_FLAGS: - The subdevice flags depend on the channel (unfinished/broken - support in library). - - SDF_RANGETYPE: - The range type depends on the channel. - - SDF_CMD: - The subdevice supports asynchronous commands. - - SDF_SOFT_CALIBRATED: - The subdevice relies on the host to do calibration in software. - Software calibration coefficients are determined by the - comedi_soft_calibrate utility. See the description of the - comedi_get_softcal_converter() function for more information. - - SDF_READABLE: - The subdevice can be read (e.g. analog input). - - SDF_WRITABLE: - The subdevice can be written to (e.g. analog output). - - SDF_INTERNAL: - The subdevice does not have externally visible lines. - - SDF_GROUND: - The subdevice supports AREF_GROUND. - - SDF_COMMON: - The subdevice supports AREF_COMMON. - - SDF_DIFF: - The subdevice supports AREF_DIFF. - - SDF_OTHER: - The subdevice supports AREF_OTHER - - SDF_DITHER: - The subdevice supports dithering (via the CR_ALT_FILTER chanspec - flag). - - SDF_DEGLITCH: - The subdevice supports deglitching (via the CR_ALT_FILTER chanspec - flag). - - SDF_RUNNING: - An asynchronous command is running. You can use this flag to poll - for the completion of an output command. - - SDF_LSAMPL: - The subdevice uses the 32 bit lsampl_t type instead of the 16 bit - sampl_t for asynchronous command data. - - SDF_PACKED: - The subdevice uses bitfield samples for asynchronous command data, - one bit per channel (otherwise it uses one sampl_t or lsampl_t per - channel). Commonly used for digital subdevices. - """ - def __init__(self, comedi) : - self.lastUpdate = None - self._flags = None - self._comedi = comedi - def update(self, dev, subdev) : - "Must be called on an open device" - self._flags = self._comedi.comedi_get_subdevice_flags(dev, subdev) - self.lastupdate = time.time() - def _check(self, sdf_flag) : - assert self._flags != None, "Cannot return status of uninitialized flag" - if self._flags & sdf_flag : - return True - return False - def busy(self) : - return(self._check(self._comedi.SDF_BUSY)) - def busyOwner(self) : - return(self._check(self._comedi.SDF_BUSY_OWNER)) - def locked(self) : - return(self._check(self._comedi.SDF_LOCKED)) - def lockOwner(self) : - return(self._check(self._comedi.SDF_LOCK_OWNER)) - def maxdata(self) : - return(self._check(self._comedi.SDF_MAXDATA)) - def flags(self) : - return(self._check(self._comedi.SDF_FLAGS)) - def rangetype(self) : - return(self._check(self._comedi.SDF_RANGETYPE)) - def cmd(self) : - return(self._check(self._comedi.SDF_CMD)) - def softCalibrated(self) : - return(self._check(self._comedi.SDF_SOFT_CALIBRATED)) - def readable(self) : - return(self._check(self._comedi.SDF_READABLE)) - def writable(self) : - return(self._check(self._comedi.SDF_WRITABLE)) - def internal(self) : - return(self._check(self._comedi.SDF_INTERNAL)) - def ground(self) : - return(self._check(self._comedi.SDF_GROUND)) - def common(self) : - return(self._check(self._comedi.SDF_COMMON)) - def diff(self) : - return(self._check(self._comedi.SDF_DIFF)) - def other(self) : - return(self._check(self._comedi.SDF_OTHER)) - def dither(self) : - return(self._check(self._comedi.SDF_DITHER)) - def deglitch(self) : - return(self._check(self._comedi.SDF_DEGLITCH)) - def running(self) : - return(self._check(self._comedi.SDF_RUNNING)) - def lsampl(self) : - return(self._check(self._comedi.SDF_LSAMPL)) - def packed(self) : - return(self._check(self._comedi.SDF_PACKED)) - def __str__(self) : - s = "" - s += "busy : %s\n" % self.busy() - s += "busyOwner : %s\n" % self.busyOwner() - s += "locked : %s\n" % self.locked() - s += "lockedOwner : %s\n" % self.lockOwner() - s += "maxdata : %s\n" % self.maxdata() - s += "flags : %s\n" % self.flags() - s += "rangetype : %s\n" % self.rangetype() - s += "cmd : %s\n" % self.cmd() - s += "softCalibrated : %s\n" % self.softCalibrated() - s += "readable : %s\n" % self.readable() - s += "writable : %s\n" % self.writable() - s += "internal : %s\n" % self.internal() - s += "ground : %s\n" % self.ground() - s += "common : %s\n" % self.common() - s += "diff : %s\n" % self.diff() - s += "other : %s\n" % self.other() - s += "dither : %s\n" % self.dither() - s += "deglitch : %s\n" % self.deglitch() - s += "running : %s\n" % self.running() - s += "lsampl : %s\n" % self.lsampl() - s += "packed : %s" % self.packed() - return s - -class PyComediIO (object) : - "Base class for Comedi IO operations" - def __init__(self, filename="/dev/comedi0", subdevice=-1, devtype=c.COMEDI_SUBD_AI, chan=(0,1,2,3), aref=(c.AREF_GROUND,), range=(0,), output=False, dev=None) : - """inputs: - filename: comedi device file for your device ["/dev/comedi0"]. - subdevice: the IO subdevice [-1 for autodetect] - devtype: the device type [c.COMEDI_SUBD_AI] - values include - comedi.COMEDI_SUBD_DI - comedi.COMEDI_SUBD_DO - comedi.COMEDI_SUBD_DIO - comedi.COMEDI_SUBD_AI - comedi.COMEDI_SUBD_AO - chan: an iterable of the channels you wish to control [(0,1,2,3)] - aref: the analog references for these channels [(comedi.AREF_GROUND,)] - values include - comedi.AREF_GROUND - comedi.AREF_COMMON - comedi.AREF_DIFF - comedi.AREF_OTHER - range: the ranges for these channels [(0,)] - output: whether to use the lines as output (vs input) (False) - dev: if you've already opened the device file, pass in the - open device (None for internal open) - Note that neither aref nor range need to be the same length as - the channel tuple. If they are shorter, their last entry will - be repeated as needed. If they are longer, PycomediError will - be raised (was: their extra entries will not be used). - """ - self.verbose = False - self._comedi = c # keep a local copy around - # sometimes I got errors on exitting python, which looked like - # the imported comedi package was unset before my IO had a - # chance to call comedi_close(). We avoid that by keeping a - # local reference here. - self.filename = filename - self.state = "Closed" - if dev == None : - self.open() - else : - self.fakeOpen(dev) - self._setup_device_type(subdevice, devtype) - self._setup_channels(chan, aref, range) - self.output = output - def __del__(self) : - self.close() - def open(self) : - if self.state == "Closed" : - dev = self._comedi.comedi_open(self.filename) - if dev < 0 : - self._comedi.comedi_perror("comedi_open") - raise PycomediError, "Cannot open %s" % self.filename - self.fakeOpen(dev) - def fakeOpen(self, dev): - """fake open: if you open the comedi device file yourself, use this - method to pass in the opened file descriptor and declare the - port "Open". - """ - if dev < 0 : - raise PycomediError, "Invalid file descriptor %d" % dev - self.dev = dev - self.state = "Open" - def close(self) : - if self.state != "Closed" : - rc = self._comedi.comedi_close(self.dev) - if rc < 0 : - self._comedi.comedi_perror("comedi_close") - raise PycomediError, "Cannot close %s" % self.filename - self.fakeClose() - def fakeClose(self): - """fake close: if you want to close the comedi device file yourself, - use this method to let the port know it has been "Closed". - """ - self.dev = None - self.state = "Closed" - def _setup_device_type(self, subdevice, devtype) : - """check that the specified subdevice exists, - searching for an appropriate subdevice if subdevice == -1 - inputs: - subdevice: the analog output subdevice (-1 for autodetect) - devtype: the devoce type - values include - comedi.COMEDI_SUBD_DI - comedi.COMEDI_SUBD_DO - comedi.COMEDI_SUBD_DIO - comedi.COMEDI_SUBD_AI - comedi.COMEDI_SUBD_AO - """ - self._devtype = devtype - if (subdevice < 0) : # autodetect an output device - self.subdev = self._comedi.comedi_find_subdevice_by_type(self.dev, self._devtype, 0) # 0 is starting subdevice - if self.subdev < 0 : - self._comedi.comedi_perror("comedi_find_subdevice_by_type") - raise PycomediError, "Could not find a %d device" % (self._devtype) - else : - self.subdev = subdevice - type = self._comedi.comedi_get_subdevice_type(self.dev, self.subdev) - if type != self._devtype : - if type < 0 : - self._comedi.comedi_perror("comedi_get_subdevice_type") - raise PycomediError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type) - self.flags = SubdeviceFlags(self._comedi) - self.updateFlags() - def updateFlags(self) : - self.flags.update(self.dev, self.subdev) - def _setup_channels(self, chan, aref, rng) : - """check that the specified channels exists, and that the arefs and - ranges are legal for those channels. Also allocate a range - item for each channel, to allow converting between physical - units and comedi units even when the device is not open. - inputs: - chan: an iterable of the channels you wish to control [(0,1,2,3)] - aref: the analog references for these channels [(comedi.AREF_GROUND,)] - values include - comedi.AREF_GROUND - comedi.AREF_COMMON - comedi.AREF_DIFF - comedi.AREF_OTHER - rng: the ranges for these channels [(0,)] - Note that neither aref nor rng need to be the same length as - the channel tuple. If they are shorter, their last entry will - be repeated as needed. If they are longer, PycomediError will - be raised (was: their extra entries will not be used). - """ - self.chan = chan - self.nchan = len(self.chan) - self._aref = _expand_tuple(aref, self.nchan) - self._range = _expand_tuple(rng, self.nchan) - self.maxdata = [] - self._comedi_range = [] - subdev_n_chan = self._comedi.comedi_get_n_channels(self.dev, self.subdev) - self.cr_chan = self._comedi.chanlist(self.nchan) - for i,chan,aref,rng in zip(range(self.nchan), self.chan, self._aref, self._range) : - if int(chan) != chan : - raise PycomediError, "Channels must be integers, not %s" % str(chan) - if chan >= subdev_n_chan : - raise PycomediError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1) - n_range = self._comedi.comedi_get_n_ranges(self.dev, self.subdev, chan) - if rng > n_range : - raise PycomediError, "Range %d > subdevice %d, chan %d's largest range %d" % (rng, subdev, chan, n_range-1) - maxdata = self._comedi.comedi_get_maxdata(self.dev, self.subdev, chan) - self.maxdata.append(maxdata) - comrange = self._comedi.comedi_get_range(self.dev, self.subdev, chan, rng) - # comrange becomes invalid if device is closed, so make a copy... - comrange_copy = self._comedi.comedi_range() - comrange_copy.min = comrange.min - comrange_copy.max = comrange.max - comrange_copy.unit = comrange.unit - self._comedi_range.append(comrange_copy) - self.cr_chan[i] = self._comedi.cr_pack(chan, rng, aref) - def comedi_to_phys(self, chan_index, comedi, useNAN=True) : - if useNAN == True : - oor = self._comedi.COMEDI_OOR_NAN - else : - oor = self._comedi.COMEDI_OOR_NUMBER - self._comedi.comedi_set_global_oor_behavior(oor) - phys = self._comedi.comedi_to_phys(int(comedi), self._comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : - print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self.maxdata[chan_index]) - return phys - def phys_to_comedi(self, chan_index, phys) : - comedi = self._comedi.comedi_from_phys(phys, self._comedi_range[chan_index], self.maxdata[chan_index]) - if self.verbose : - print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self.maxdata[chan_index]) - return comedi - -class PyComediSingleIO (PyComediIO) : - "Software timed single-point input/ouput" - def __init__(self, **kwargs) : - """inputs: - filename: comedi device file for your device ["/dev/comedi0"]. - subdevice: the analog output subdevice [-1 for autodetect] - devtype: the device type [c.COMEDI_SUBD_AI] - values include - comedi.COMEDI_SUBD_DI - comedi.COMEDI_SUBD_DO - comedi.COMEDI_SUBD_DIO - comedi.COMEDI_SUBD_AI - comedi.COMEDI_SUBD_AO - chan: an iterable of the channels you wish to control [(0,1,2,3)] - aref: the analog references for these channels [(comedi.AREF_GROUND,)] - values include - comedi.AREF_GROUND - comedi.AREF_COMMON - comedi.AREF_DIFF - comedi.AREF_OTHER - range: the ranges for these channels [(0,)] - output: whether to use the lines as output (vs input) (False) - dev: if you've already opened the device file, pass in the - open device (None for internal open) - Note that neither aref nor range need to be the same length as - the channel tuple. If they are shorter, their last entry will - be repeated as needed. If they are longer, PycomediError will - be raised (was: their extra entries will not be used). - """ - PyComediIO.__init__(self, **kwargs) - def write_chan_index(self, chan_index, data) : - """inputs: - chan_index: the channel you wish to write to - data: the value you wish to write to that channel - """ - if self.output != True : - raise PycomediError, "Must be an output to write" - rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index], - self._range[chan_index], - self._aref[chan_index], int(data)); - if rc != 1 : # the number of samples written - self._comedi.comedi_perror("comedi_data_write") - raise PycomediError, "comedi_data_write returned %d" % rc - def read_chan_index(self, chan_index) : - """inputs: - chan_index: the channel you wish to read from - outputs: - data: the value read from that channel - """ - if self.output != False : - raise PycomediError, "Must be an input to read" - rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self._range[chan_index], self._aref[chan_index]); - if rc != 1 : # the number of samples read - self._comedi.comedi_perror("comedi_data_read") - raise PycomediError, "comedi_data_read returned %d" % rc - return data - diff --git a/pycomedi/constants.py b/pycomedi/constants.py new file mode 100644 index 0000000..550cf39 --- /dev/null +++ b/pycomedi/constants.py @@ -0,0 +1,344 @@ +# Copyright (C) 2010 W. Trevor King +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Enums and flags are bundled into class instances for easier browsing. + +>>> SUBDEVICE_TYPE # doctest: +NORMALIZE_WHITESPACE +[<_NamedInt unused>, <_NamedInt ai>, <_NamedInt ao>, <_NamedInt di>, + <_NamedInt do>, <_NamedInt dio>, <_NamedInt counter>, <_NamedInt timer>, + <_NamedInt memory>, <_NamedInt calib>, <_NamedInt proc>, <_NamedInt serial>, + <_NamedInt pwm>] +>>> SUBDEVICE_TYPE.dio +<_NamedInt dio> +>>> SUBDEVICE_TYPE.dio.value == _comedi.COMEDI_SUBD_DIO +True +>>> SUBDEVICE_TYPE.dio.doc +'COMEDI_SUBD_DIO (digital input/output)' + +You can also search by name or value. + +>>> TRIG_SRC.index_by_name('timer') +<_NamedInt timer> +>>> TRIG_SRC.index_by_value(_comedi.TRIG_NOW) +<_NamedInt now> + +Some flags have constants for setting or clearing all the flags at once. + +>>> TRIG_SRC # doctest: +NORMALIZE_WHITESPACE +[<_NamedInt none>, <_NamedInt now>, <_NamedInt follow>, <_NamedInt time>, + <_NamedInt timer>, <_NamedInt count>, <_NamedInt ext>, <_NamedInt int>, + <_NamedInt other>] +>>> TRIG_SRC._empty +<_NamedInt invalid> +>>> TRIG_SRC._all +<_NamedInt any> + +Flag instances have a special wrapper that stores their value. + +>>> f = FlagValue(SDF, 17) +>>> f.flag # doctest: +ELLIPSIS +[<_NamedInt busy>, <_NamedInt busy_owner>, ...] +>>> f.busy +True +>>> f.busy = False +>>> f._value +16 + +You can treat named integers as Python integers with bitwise operations, + +>>> a = TRIG_SRC.now | TRIG_SRC.follow | TRIG_SRC.time | 64 +>>> a +<_BitwiseOperator 78> +>>> a.value +78 +>>> TRIG_SRC.none & TRIG_SRC.now +<_BitwiseOperator 0> + +But because of the way Python operator overloading works, plain +integers must go at the end of bitwise chains. + +>>> 64 | TRIG_SRC.now +Traceback (most recent call last): + ... +TypeError: unsupported operand type(s) for |: 'int' and '_NamedInt' +""" + +from math import log as _log + +import comedi as _comedi + + +class _BitwiseOperator (object): + def __init__(self, value): + self.value = value + + def __str__(self): + return self.__repr__() + + def __repr__(self): + return '<%s %s>' % (self.__class__.__name__, self.value) + + def __and__(self, other): + "Bitwise and acts on `_BitwiseOperator.value`." + if isinstance(other, _BitwiseOperator): + other = other.value + return _BitwiseOperator(int.__and__(self.value, other)) + + def __or__(self, other): + "Bitwise or acts on `_BitwiseOperator.value`." + if isinstance(other, _BitwiseOperator): + other = other.value + return _BitwiseOperator(int.__or__(self.value, other)) + + +class _NamedInt (_BitwiseOperator): + "A flag or enum item." + def __init__(self, name, value, doc=None): + super(_NamedInt, self).__init__(value) + self.name = name + self.doc = doc + + def __str__(self): + return self.name + + def __repr__(self): + return '<%s %s>' % (self.__class__.__name__, self.name) + + +class _Enum (list): + "An enumerated list" + def __init__(self, name, prefix, blacklist=None, whitelist=None, + translation=None): + super(_Enum, self).__init__() + self.name = name + if blacklist == None: + blacklist = [] + if translation == None: + translation = {} + self._name_keys = {} + self._value_keys = {} + for attr in dir(_comedi): + if attr.startswith(prefix): + item_name = self._item_name(attr, prefix, translation) + if self._is_ignored(item_name, blacklist, whitelist): + continue + self._add_item(attr, item_name) + self.sort(key=lambda item: item.value) + + def _item_name(self, attr, prefix, translation): + item_name = attr[len(prefix):] + if item_name in translation: + return translation[item_name] + else: + return item_name.lower() + + def _is_ignored(self, item_name, blacklist, whitelist): + return (item_name in blacklist + or whitelist != None and item_name not in whitelist) + + def _add_item(self, attr, item_name): + item_value = getattr(_comedi, attr) + item = _NamedInt(item_name, item_value, doc=attr) + self.append(item) + self._name_keys[item_name] = item + if item_value in self._value_keys and item_name: + raise ValueError('value collision in %s: %s = %s = %#x' + % (self.name, item_name, + self._value_keys[item_value], item_value)) + self._value_keys[item_value] = item + setattr(self, item_name, item) + + def index_by_name(self, name): + return self._name_keys[name] + + def index_by_value(self, value): + return self._value_keys[value] + + +class _Flag (_Enum): + "A flag" + def __init__(self, *args, **kwargs): + super(_Flag, self).__init__(*args, **kwargs) + self._empty = None + self._all = None + for flag in self: + if flag.value == 0: + self._empty = flag + elif flag.value < 0 or _log(flag.value, 2) % 1 != 0: + if self._all: + raise ValueError( + 'mutliple multi-bit flags in %s: %s = %#x and %s = %#x' + % (self.name, self._all.name, self._all.value, + flag.name, flag.value)) + self._all = flag + if self._empty: + self.remove(self._empty) + if self._all: + self.remove(self._all) + + def get(self, value, name): + flag = getattr(self, name) + assert flag.value != 0, '%s: %s' % (self.name, flag) + return value & flag.value == flag.value + + def set(self, value, name, status): + flag = getattr(self, name) + if status == True: + return value | flag.value + return (value | flag.value) - flag.value + + +class FlagValue (object): + """A flag instance (flag + value) + + Examples + -------- + + >>> f = FlagValue(flag=TRIG_SRC, value=0, default='empty') + >>> f.any + False + >>> print f + empty + >>> f.now = True + >>> f.timer = True + >>> f.int = True + >>> print f + now|timer|int + """ + def __init__(self, flag, value, default='-'): + self.flag = flag + self._value = value + self._default = default + + def __str__(self): + flags = [f for f in self.flag if getattr(self, f.name)] + if len(flags) == 0: + return self._default + return '|'.join([f.name for f in flags]) + + def __getattr__(self, name): + return self.flag.get(self._value, name) + + def __setattr__(self, name, value): + if name != 'flag' and not name.startswith('_'): + value = self.flag.set(self._value, name, value) + name = '_value' + super(FlagValue, self).__setattr__(name, value) + + +# blacklist deprecated values (and those belonging to other _Enums or _Flags) + +AREF = _Enum('analog_reference', 'AREF_') +AREF.diff.doc += ' (differential)' +AREF.other.doc += ' (other / undefined)' + +#GPCT = _Flag('general_purpose_counter_timer', 'GPCT_') +# Two competing flag sets? Need some documentation. + +INSN_MASK = _Flag('instruction_mask', 'INSN_MASK_') + +CONFIGURATION_IDS = _Enum('configuration_ids', 'INSN_CONFIG_', blacklist=[ + '8254_set_mode']) + +INSN = _Enum('instruction', 'INSN_', + blacklist=['mask_%s' % i.name for i in INSN_MASK] + [ + 'config_%s' % i.name for i in CONFIGURATION_IDS]) + +TRIG = _Flag('trigger_flags', 'TRIG_', whitelist=[ + 'bogus', 'dither', 'deglitch', 'config', 'wake_eos']) +TRIG.bogus.doc += ' (do the motions)' +TRIG.config.doc += ' (perform configuration, not triggering)' +TRIG.wake_eos.doc += ' (wake up on end-of-scan events)' + +CMDF = _Flag('command_flags', 'CMDF_') +CMDF.priority.doc += ( + ' (try to use a real-time interrupt while performing command)') + +EV = _Flag('??', 'COMEDI_EV_') + +TRIG_ROUND = _Enum('trigger_round', 'TRIG_ROUND_', blacklist=['mask']) +TRIG_ROUND.mask = _comedi.TRIG_ROUND_MASK + +TRIG_SRC = _Flag('trigger_source_flags', 'TRIG_', + blacklist=[i.name for i in TRIG] + [ + 'round_%s' % i.name for i in TRIG_ROUND] + [ + 'round_mask', 'rt', 'write']) +TRIG_SRC.none.doc += ' (never trigger)' +TRIG_SRC.now.doc += ' (trigger now + N ns)' +TRIG_SRC.follow.doc += ' (trigger on next lower level trig)' +TRIG_SRC.time.doc += ' (trigger at time N ns)' +TRIG_SRC.timer.doc += ' (trigger at rate N ns)' +TRIG_SRC.count.doc += ' (trigger when count reaches N)' +TRIG_SRC.ext.doc += ' (trigger on external signal N)' +TRIG_SRC.int.doc += ' (trigger on comedi-internal signal N)' +TRIG_SRC.other.doc += ' (driver defined)' + +SDF_PWM = _Flag('pulse_width_modulation_subdevice_flags', 'SDF_PWM_') +SDF_PWM.counter.doc += ' (PWM can automatically switch off)' +SDF_PWM.hbridge.doc += ' (PWM is signed (H-bridge))' + +SDF = _Flag('subdevice_flags', 'SDF_', blacklist=[ + 'pwm_%s' % i.name for i in SDF_PWM] + [ + 'cmd', 'writeable', 'rt']) +SDF.busy.doc += ' (device is busy)' +SDF.busy_owner.doc += ' (device is busy with your job)' +SDF.locked.doc += ' (subdevice is locked)' +SDF.lock_owner.doc += ' (you own lock)' +SDF.maxdata.doc += ' (maxdata depends on channel)' +SDF.flags.doc += ' (flags depend on channel)' +SDF.rangetype.doc += ' (range type depends on channel)' +SDF.soft_calibrated.doc += ' (subdevice uses software calibration)' +SDF.cmd_write.doc += ' (can do output commands)' +SDF.cmd_read.doc += ' (can to input commands)' +SDF.readable.doc += ' (subdevice can be read, e.g. analog input)' +SDF.writable.doc += ' (subdevice can be written, e.g. analog output)' +SDF.internal.doc += ' (subdevice does not have externally visible lines)' +SDF.ground.doc += ' (can do aref=ground)' +SDF.common.doc += ' (can do aref=common)' +SDF.diff.doc += ' (can do aref=diff)' +SDF.other.doc += ' (can do aref=other)' +SDF.dither.doc += ' (can do dithering)' +SDF.deglitch.doc += ' (can do deglitching)' +SDF.mmap.doc += ' (can do mmap())' +SDF.running.doc += ' (subdevice is acquiring data)' +SDF.lsampl.doc += ' (subdevice uses 32-bit samples)' +SDF.packed.doc += ' (subdevice can do packed DIO)' + +SUBDEVICE_TYPE = _Enum('subdevice_type', 'COMEDI_SUBD_') +SUBDEVICE_TYPE.unused.doc += ' (unused by driver)' +SUBDEVICE_TYPE.ai.doc += ' (analog input)' +SUBDEVICE_TYPE.ao.doc += ' (analog output)' +SUBDEVICE_TYPE.di.doc += ' (digital input)' +SUBDEVICE_TYPE.do.doc += ' (digital output)' +SUBDEVICE_TYPE.dio.doc += ' (digital input/output)' +SUBDEVICE_TYPE.memory.doc += ' (memory, EEPROM, DPRAM)' +SUBDEVICE_TYPE.calib.doc += ' (calibration DACs)' +SUBDEVICE_TYPE.proc.doc += ' (processor, DSP)' +SUBDEVICE_TYPE.serial.doc += ' (serial IO)' +SUBDEVICE_TYPE.pwm.doc += ' (pulse-with modulation)' + +IO_DIRECTION = _Enum('io_direction', 'COMEDI_', whitelist=[ + 'input', 'output', 'opendrain']) + +SUPPORT_LEVEL = _Enum('support_level', 'COMEDI_', whitelist=[ + 'unknown_support', 'supported', 'unsupported']) + +UNIT = _Enum('unit', 'UNIT_', translation={'mA':'mA'}) + +CB = _Enum('callback_flags', 'COMEDI_CB_', blacklist=['block', 'eobuf']) +CB.eos.doc += ' (end of scan)' +CB.eoa.doc += ' (end of acquisition)' +CB.error.doc += ' (card error during acquisition)' +CB.overflow.doc += ' (buffer overflow/underflow)' diff --git a/pycomedi/simult_aio.py b/pycomedi/simult_aio.py deleted file mode 100644 index 67cc5e8..0000000 --- a/pycomedi/simult_aio.py +++ /dev/null @@ -1,592 +0,0 @@ -# Simultaneous, finite, buffered analog inpout/output using comedi drivers -# Copyright (C) 2007-2010 W. Trevor King -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import comedi as c -import common -from numpy import array, fromstring, float32, pi, sin -import int16_rw -import time - -# imports for testing -from time import sleep -from scipy.stats import linregress -from os import system - -#VERBOSE = True -VERBOSE = False -AO_TRIGGERS_OFF_AI_START = True -#AO_TRIGGERS_OFF_AI_START = False - - -# HACK! outputting last point can cause jumps to random positions -# This is probably due to some clocking issue when we trigger AO off of -# the AI Start signal. -DONT_OUTPUT_LAST_SAMPLE_HACK = True - -class SimAioError (common.PycomediError) : - "Simultaneous Analog IO error" - pass - -_example_array = array([0], dtype=int16_rw.DATA_T) # for typing, since I don't know what type(array) should be - -_cmdtest_message = ["success", - "invalid source", - "source conflict", - "invalid argument", - "argument conflict", - "invalid chanlist"] - -class cmd (object) : - """Wrap a comedi command in more Pythonic trappings. - - Due to my limited needs, this class currently only supports - software triggered runs (possibly with output triggering off the - input trigger) for a finite number of output samples where all of - the scan and sample timing is internal and as fast as possible. - - See http://www.comedi.org/doc/x621.html#COMEDICMDSTRUCTURE - for more details/possibilities. - """ - def __init__(self, IO) : - """input: - IO : an initialized common.PyComediIO object - """ - self.IO = IO - if self.IO.output == True : - self.cmdTypeString = "output" - else : - self.cmdTypeString = "input" - self.generate_rough_command() - def generate_rough_command(self) : - if VERBOSE : - print "generate rough %s command" % self.cmdTypeString - cmd = self.IO._comedi.comedi_cmd_struct() - cmd.subdev = self.IO.subdev - if self.IO.output : - cmd.flags = self.IO._comedi.CMDF_WRITE - else : - cmd.flags = 0 - # decide how to trigger a multi-scan run - if self.IO.output and AO_TRIGGERS_OFF_AI_START : - cmd.start_src = self.IO._comedi.TRIG_EXT - cmd.start_arg = 18 # AI_START1 internal AI start signal - else : - cmd.start_src = self.IO._comedi.TRIG_INT - cmd.start_arg = 0 - # decide how to trigger a multi-channel scan - cmd.scan_begin_src = self.IO._comedi.TRIG_TIMER - cmd.scan_begin_arg = 1 # temporary value for now - # decide how to trigger a single channel's aquisition - if self.IO.output : - cmd.convert_src = self.IO._comedi.TRIG_NOW # convert simultaneously (each output has it's own DAC) - cmd.convert_arg = 0 - else : - cmd.convert_src = self.IO._comedi.TRIG_TIMER # convert sequentially (all inputs share single ADC) - cmd.convert_arg = 1 # time between channels in ns, 1 to convert ASAP - # decide when a scan is complete - cmd.scan_end_src = self.IO._comedi.TRIG_COUNT - cmd.scan_end_arg = self.IO.nchan - cmd.stop_src = self.IO._comedi.TRIG_COUNT - cmd.stop_arg = 1 # temporary value for now - cmd.chanlist = self.IO.cr_chan - cmd.chanlist_len = self.IO.nchan - self.cmd = cmd - self.test_cmd(max_passes=3) - def test_cmd(self, max_passes=1) : - very_verbose = False - i = 0 - rc = 0 - if very_verbose : - print "Testing command:" - print self - while i < max_passes : - rc = self.IO._comedi.comedi_command_test(self.IO.dev, self.cmd) - if (rc == 0) : - break - if VERBOSE or very_verbose : - print "test pass %d, %s" % (i, _cmdtest_message[rc]) - i += 1 - if (VERBOSE or very_verbose) and i < max_passes : - print "Passing command:\n", self - if i >= max_passes : - print "Failing command (%d):\n" % rc, self - raise SimAioError, "Invalid command: %s" % _cmdtest_message[rc] - def execute(self) : - if VERBOSE : - print "Loading %s command" % self.cmdTypeString - rc = self.IO._comedi.comedi_command(self.IO.dev, self.cmd) - if rc < 0 : - self.IO._comedi.comedi_perror("comedi_command") - raise SimAioError, "Error executing %s command %d" % (self.cmdTypeString, rc) - def _cmdsrc(self, source) : - str = "" - if source & c.TRIG_NONE : str += "none|" - if source & c.TRIG_NOW : str += "now|" - if source & c.TRIG_FOLLOW : str += "follow|" - if source & c.TRIG_TIME : str += "time|" - if source & c.TRIG_TIMER : str += "timer|" - if source & c.TRIG_COUNT : str += "count|" - if source & c.TRIG_EXT : str += "ext|" - if source & c.TRIG_INT : str += "int|" - if source & c.TRIG_OTHER : str += "other|" - return str - def __str__(self) : - str = "Command on %s (%s):\n" % (self.IO, self.cmdTypeString) - str += "subdevice: \t%d\n" % self.cmd.subdev - str += "flags: \t0x%x\n" % self.cmd.flags - str += "start: \t" - str += self._cmdsrc(self.cmd.start_src) - str += "\t%d\n" % self.cmd.start_arg - str += "scan_begin:\t" - str += self._cmdsrc(self.cmd.scan_begin_src) - str += "\t%d\n" % self.cmd.scan_begin_arg - str += "convert: \t" - str += self._cmdsrc(self.cmd.convert_src) - str += "\t%d\n" % self.cmd.convert_arg - str += "scan_end: \t" - str += self._cmdsrc(self.cmd.scan_end_src) - str += "\t%d\n" % self.cmd.scan_end_arg - str += "stop: \t" - str += self._cmdsrc(self.cmd.stop_src) - str += "\t%d" % self.cmd.stop_arg - return str - -class AIO (object) : - """Control a simultaneous analog input/output (AIO) device using - Comedi drivers. - - The AIO device is modeled as being in one of the following states: - - Open Device file has been opened, various one-off setup - tasks completed. - Initialized Any previous activity is complete, ready for a new - task - Setup New task assigned. - Armed The output task is "triggered" (see below) - Read The input task is triggered, and input read in - Closed - Transitions between these states may be achieved with class methods - open, __init__ - through Open to Initialized - close Any to Closed - setup Initialized to Setup - arm Setup to Armed - start_read Armed to Read - reset Setup, Armed, or Read to Initialized - - There are two triggering methods set by the module global - AO_TRIGGERS_OFF_AI_START - When this global is true, the output and input will start on the - exactly the same clock tick (in this case the output "trigger" - when "Arming" just primes the output to start when the input start - is signaled). However, this functionality at the moment depends - on your having a National Instruments card with a DAQ-STC module - controling the timing (e.g. E series) and a patched version of - ni_mio_common.c in your Comedi kernel. If you do not have an - appropriate card, you will either have to implement an appropriate - method for your card, or set the global to false, in which case - the IO synchronicity depends on the synchronicity of the AO and AI - software triggers. - """ - def __init__(self, filename="/dev/comedi0", - in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,), - out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,), - buffsize=32768) : - """inputs: - filename: comedi device file for your device ("/dev/comedi0"). - And then for both input and output (in_* and out_*): - subdevice: the analog output subdevice (-1 for autodetect) - values include - comedi.COMEDI_SUBD_DI - comedi.COMEDI_SUBD_DO - comedi.COMEDI_SUBD_DIO - comedi.COMEDI_SUBD_AI - comedi.COMEDI_SUBD_AO - chan: an iterable of the channels you wish to control ((0,1,2,3)) - aref: the analog reference for these channels (comedi.AREF_GROUND) - values include - comedi.AREF_GROUND - comedi.AREF_COMMON - comedi.AREF_DIFF - comedi.AREF_OTHER - range: the range for these channels (0) - """ - self._comedi = c - self._filename = filename - assert buffsize > 0 - assert buffsize % 2 == 0, "buffsize = %d is not even" % buffsize - self.buffsize = buffsize - # the next section is much like the open() method below, - # but in this one we set up all the extra details associated - # with the AO and AI structures - self.dev = self._comedi.comedi_open(self._filename) - self._fd = self._comedi.comedi_fileno(self.dev) - if VERBOSE : - print "Opened %s on fd %d" % (self._filename, self._fd) - self.AI = common.PyComediIO(filename=self._filename, subdevice=in_subdevice, devtype=c.COMEDI_SUBD_AI, chan=in_chan, aref=in_aref, range=in_range, output=False, dev=self.dev) - self.AO = common.PyComediIO(filename=self._filename, subdevice=out_subdevice, devtype=c.COMEDI_SUBD_AO, chan=out_chan, aref=out_aref, range=out_range, output=True, dev=self.dev) - self.state = "Open" - self._icmd = cmd(self.AI) - self._ocmd = cmd(self.AO) - self.state = "Initialized" - def __del__(self) : - self.close() - def close(self) : - if self.state != "Closed" : - self.reset(force=True) - rc = self._comedi.comedi_close(self.dev) - if rc < 0 : - self._comedi.comedi_perror("comedi_close") - raise SimAioError, "Cannot close %s" % self._filename - if VERBOSE : - print "Closed %s on fd %d" % (self._filename, self._fd) - self.AI.fakeClose() - self.AO.fakeClose() - self.dev = None - self._fd = None - self.state = "Closed" - def open(self) : - if self.state != "Closed" : - raise SimAioError, "Invalid state %s" % self.state - self.dev = self._comedi.comedi_open(self._filename) - self._fd = self._comedi.comedi_fileno(self.dev) - if VERBOSE : - print "Opened %s on fd %d" % (self._filename, self._fd) - self.AI.fakeOpen(self.dev) - self.AO.fakeOpen(self.dev) - self.state = "Open" - self._icmd = cmd(self.AI) - self._ocmd = cmd(self.AO) - self.state = "Initialized" - def genBuffer(self, nsamp, nchan=1, value=0) : - return array([value]*nsamp*nchan, dtype=int16_rw.DATA_T) - def setup(self, nsamps=None, freq=1.0, out_buffer=None) : - if self.state != "Initialized" : - raise SimAioError, "Invalid state %s" % self.state - if out_buffer == None : # read-only command - assert self.AO == None - assert nsamps > 1 - if nsamps == None : - assert len(out_buffer) % self.AO.nchan == 0 - nsamps = int(len(out_buffer) / self.AO.nchan) - if type(out_buffer) != type(_example_array) : - raise SimAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer)) - if DONT_OUTPUT_LAST_SAMPLE_HACK : - for i in range(1, self.AO.nchan+1) : # i in [1, ... ,nchan] - if out_buffer[-i] != out_buffer[-self.AO.nchan-i] : - raise SimAioError, """To ensure that you are not suprised by the effects of the -DONT_OUTPUT_LAST_SAMPLE_HACK flag, please ensure that the last two -values samples output on each channel are the same.""" - onsamps = nsamps - 1 - else : - onsamps = nsamps - self._nsamps = nsamps - self._ocmd.cmd.scan_begin_arg = int(1e9/freq) - self._ocmd.cmd.stop_arg = onsamps - if VERBOSE : - print "Configure the board (%d ns per scan, %d samps, %g Hz)" % (self._icmd.cmd.scan_begin_arg, self._icmd.cmd.stop_arg, 1e9/self._ocmd.cmd.scan_begin_arg) - self._obuffer = out_buffer - self._onremain = nsamps - self._ocmd.test_cmd(max_passes=2) - self._ocmd.execute() - self._icmd.cmd.scan_begin_arg = self._ocmd.cmd.scan_begin_arg - self._icmd.cmd.stop_arg = nsamps - self._icmd.test_cmd() - self._inremain = nsamps - self._icmd.execute() - nsamps = min(self._onremain, self.buffsize) - offset = 0 - if VERBOSE : - print "Write %d output samples to the card" % (nsamps*self.AO.nchan) - rc = int16_rw.write_samples(self._fd, out_buffer, offset, nsamps*self.AO.nchan, 1) - if rc != nsamps*self.AO.nchan : - raise SimAioError, "Error %d writing output buffer\n" % rc - self._onremain -= nsamps - self.state = "Setup" - def arm(self) : - if self.state != "Setup" : - raise SimAioError, "Invalid state %s" % self.state - if VERBOSE : - print "Arm the analog ouptut" - self._comedi_internal_trigger(self.AO.subdev) - self.state = "Armed" - def start_read(self, in_buffer) : - if self.state != "Armed" : - raise SimAioError, "Invalid state %s" % self.state - if len(in_buffer) < self._nsamps * self.AI.nchan : - raise SimAioError, "in_buffer not long enough (size %d < required %d)" \ - % (len(in_buffer), self._nsamps * self.AI.nchan) - - if VERBOSE : - print "Start the run" - self._comedi_internal_trigger(self.AI.subdev) - self.state = "Read" - while self._inremain > 0 : - # read half a buffer - nsamps = min(self._inremain, self.buffsize/2) - offset = (self._nsamps - self._inremain) * self.AI.nchan - if VERBOSE : - print "Read %d input samples from the card" % (nsamps*self.i_nchan) - rc = int16_rw.read_samples(self._fd, in_buffer, offset, nsamps*self.AI.nchan, 20) - if rc != nsamps*self.AI.nchan : - raise SimAioError, "Error %d reading input buffer\n" % rc - self._inremain -= nsamps - # write half a buffer - nsamps = min(self._onremain, self.buffsize/2) - if nsamps > 0 : - offset = (self._nsamps - self._onremain) * self.AO.nchan - if VERBOSE : - print "Write %d output samples to the card" % (nsamps*self.AO.nchan) - rc = int16_rw.write_samples(self._fd, self._obuffer, offset, nsamps*self.AO.nchan, 20) - if rc != nsamps*self.AO.nchan : - raise SimAioError, "Error %d writing output buffer\n" % rc - self._onremain -= nsamps - def _comedi_internal_trigger(self, subdevice) : - data = self._comedi.chanlist(1) # by luck, data is an array of lsampl_t (unsigned ints), as is chanlist - insn = self._comedi.comedi_insn_struct() - insn.insn = self._comedi.INSN_INTTRIG - insn.subdev = subdevice - insn.data = data - insn.n = 1 - data[0] = 0 - rc = self._comedi.comedi_do_insn(self.dev, insn) - def reset(self, force=False) : - if VERBOSE : - print "Reset the analog subdevices" - # clean up after the read - self.AO.updateFlags() - self.AI.updateFlags() - # I would expect self.AO.flags.busy() to be False by this point, - # but after a write that does not seem to be the case. - # It doesn't seem to cause any harm to cancel things anyway... - rc = self._comedi.comedi_cancel(self.dev, self.AO.subdev) - if rc < 0 : - self._comedi.comedi_perror("comedi_cancel") - raise SimAioError, "Error cleaning up output command" - rc = self._comedi.comedi_cancel(self.dev, self.AI.subdev) - if rc < 0 : - self._comedi.comedi_perror("comedi_cancel") - raise SimAioError, "Error cleaning up input command" - self.state = "Initialized" - - -# define the test suite -# verbose -# 0 - no output -# 1 - print test names -# 2 - print test results -# 3 - print some details -# 4 - print lots of details - -def _test_AIO(aio=None, start_wait=0, verbose=0) : - if verbose >= 1 : - print "_test_AIO(start_wait = %g)" % start_wait - nsamps = 20 - out_data = aio.genBuffer(nsamps) - in_data = aio.genBuffer(nsamps) - midpoint = int(aio.AO.maxdata[0]/2) - bitrange = float(midpoint/2) - for i in range(nsamps) : - out_data[i] = int(midpoint+bitrange*sin(2*pi*i/float(nsamps))) - out_data[-2] = out_data[-1] = midpoint - aio.setup(freq=1000, out_buffer=out_data) - aio.arm() - sleep(start_wait) - aio.start_read(in_data) - aio.reset() - if verbose >= 4 : - print "out_data:\n", out_data - print "in_data:\n", in_data - print "residual:\n[", - for i, o in zip(in_data, out_data) : - print int(i)-int(o), - print "]" - return (out_data, in_data) - -def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=0) : - if verbose >= 1 : - print "_repeat_aio_test()" - grads = array([0]*num_tests, dtype=float32) # test input with `wrong' type - good = 0 - bad = 0 - good_run = 0 - good_run_arr = [] - for i in range(num_tests) : - out_data, in_data = _test_AIO(aio, start_wait) - gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data) - grads[i] = gradient - if verbose >= 4 : - print "wait %2d, run %2d, gradient %g" % (start_wait, i, gradient) - if gradient < .7 : - bad += 1 - good_run_arr.append(good_run) - good_run = 0 - else : - good += 1 - good_run += 1 - good_run_arr.append(good_run) - fail_rate = (float(bad)/float(good+bad))*100.0 - if verbose >= 2 : - print "failure rate %g%% in %d runs" % (fail_rate, num_tests) - call = 'echo "' - for num in good_run_arr : - call += "%d " % num - call += '" | stem_leaf 2' - print "good run stem and leaf:" - system(call) - return fail_rate - -def _test_AIO_multi_chan(aio=None, start_wait=0, verbose=0) : - from sys import stdout - if verbose >= 1 : - print "_test_AIO_multi_chan(start_wait = %g)" % start_wait - nsamps = 100 - out_data = aio.genBuffer(nsamps, aio.AO.nchan) - in_data = aio.genBuffer(nsamps, aio.AI.nchan) - # set up interleaved data - midpoint = int(aio.AO.maxdata[0]/2) - bitrange = float(midpoint/2) - for i in range(nsamps) : - out_data[i*aio.AO.nchan] = int(midpoint+bitrange*sin(2*pi*i/float(nsamps))) - for j in range(1, aio.AO.nchan) : - out_data[i*aio.AO.nchan + j] = 0 - if DONT_OUTPUT_LAST_SAMPLE_HACK : - for ind in [-1,-1-aio.AO.nchan] : - for chan in range(aio.AO.nchan) : - out_data[ind-chan] = midpoint - aio.setup(freq=1000, out_buffer=out_data) - aio.arm() - sleep(start_wait) - aio.start_read(in_data) - aio.reset() - #fid = file('/tmp/comedi_test.o', 'w') - fid = stdout - if verbose >= 4 : - print >> fid, "#", - for j in range(aio.AO.nchan) : - print >> fid, "%s\t" % aio.AO.chan[j], - for j in range(aio.AI.nchan) : - print >> fid, "%s\t" % aio.AI.chan[j], - print "" - for i in range(nsamps) : - for j in range(aio.AO.nchan) : - print >> fid, "%s\t" % out_data[i*aio.AO.nchan+j], - for j in range(aio.AI.nchan) : - print >> fid, "%s\t" % in_data[i*aio.AI.nchan+j], - print >> fid, "" - return (out_data, in_data) - -def _test_big_bufs(aio=None, freq=100e3, verbose=False) : - if verbose >= 1 : - print "_test_big_bufs()" - if aio == None : - our_aio = True - aio = AIO(in_chan=(1,), out_chan=(0,)) - else : - our_aio = False - nsamps = int(100e3) - out_data = aio.genBuffer(nsamps, aio.AO.nchan) - midpoint = int(aio.AO.maxdata[0]/2) - bitrange = float(midpoint/2) - for i in range(nsamps) : - out_data[i] = int(sin(2*pi*i/float(nsamps))*bitrange) + midpoint - if DONT_OUTPUT_LAST_SAMPLE_HACK : - out_data[-2] = out_data[-1] = midpoint - in_data = aio.genBuffer(nsamps, aio.AO.nchan) - aio.setup(freq=freq, out_buffer=out_data) - aio.arm() - aio.start_read(in_data) - aio.reset() - if our_aio : - aio.close() - del(aio) - -def _test_output_persistence(freq=100, verbose=False) : - import single_aio - if verbose >= 1 : - print "_test_output_persistence()" - aio = AIO(in_chan=(0,), out_chan=(0,)) - aio.close() - ai = single_aio.AI(chan=(0,)) - ai.close() - - def simult_set_voltage(aio, range_fraction=None, physical_value=None, freq=freq, verbose=False) : - "use either range_fraction or physical_value, not both." - aio.open() - if range_fraction != None : - assert physical_value == None - assert range_fraction >= 0 and range_fraction <= 1 - out_val = int(range_fraction*aio.AO.maxdata[0]) - physical_value = aio.AO.comedi_to_phys(chan_index=0, comedi=out_val, useNAN=False) - else : - assert physical_value != None - out_val = aio.AO.phys_to_comedi(chan_index=0, phys=physical_value) - if verbose >= 3 : - print "Output : %d = %g V" % (out_val, physical_value) - nsamps = int(int(freq)) - out_data = aio.genBuffer(nsamps, aio.AO.nchan, value=out_val) - in_data = aio.genBuffer(nsamps, aio.AO.nchan) - aio.setup(freq=freq, out_buffer=out_data) - aio.arm() - aio.start_read(in_data) - aio.reset() - # by this point the output returns to 0 V when - # DONT_OUTPUT_LATH_SAMPLE_HACK == False - time.sleep(5) - aio.close() - if verbose >= 4 : - print "Output complete" - print in_data - return physical_value - def single_get_voltage(ai, verbose=False) : - ai.open() - in_val = ai.read()[0] - ai.close() - in_phys = ai.comedi_to_phys(chan_index=0, comedi=in_val, useNAN=False) - if verbose >= 3 : - print "Input : %d = %g V" % (in_val, in_phys) - return in_phys - def tp(aio, ai, range_fraction, verbose=False) : - out_phys = simult_set_voltage(aio, range_fraction=range_fraction, verbose=verbose) - - # It helps me to play a sound so I know where the test is - # while confirming the output on an oscilliscope. - #system("aplay /home/wking/Music/system/sonar.wav") - - time.sleep(1) - in_phys = single_get_voltage(ai, verbose) - assert abs((in_phys-out_phys)/out_phys) < 0.1, "Output %g V, but input %g V" % (out_phys, in_phys) - - tp(aio,ai,0,verbose) - tp(aio,ai,1,verbose) - simult_set_voltage(aio,physical_value=0.0,verbose=verbose) - -def test(verbose=2) : - aio = AIO(in_chan=(0,), out_chan=(0,)) - _test_AIO(aio, start_wait = 0, verbose=verbose) - _test_AIO(aio, start_wait = 0.5, verbose=verbose) - aio.close() - aio.open() - _repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=verbose) - _test_big_bufs(aio, verbose=verbose) - aio.close() - - aiom = AIO(in_chan=(0,1,2,3), out_chan=(0,1)) - _test_AIO_multi_chan(aiom, start_wait = 0, verbose=verbose) - del(aiom) - _test_output_persistence(verbose=verbose) - -if __name__ == "__main__" : - test() diff --git a/pycomedi/single_aio.py b/pycomedi/single_aio.py deleted file mode 100644 index 434eff2..0000000 --- a/pycomedi/single_aio.py +++ /dev/null @@ -1,173 +0,0 @@ -# Use Comedi drivers for single-shot analog input/output -# Copyright (C) 2007-2010 W. Trevor King -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -"""Use Comedi drivers for single-shot analog input/output - -Being single-shot implementations, read/writes will be software timed, -so this module would not be a good choice if you need millisecond -resolution. However, it does provide a simple and robust way to -generate/aquire signals at 1 second and greater timescales. -""" - -import comedi as c -import common - -VERBOSE_DEBUG = False - -class SngAioError (common.PycomediError) : - "Single point Analog IO error" - pass - -class AI (common.PyComediSingleIO) : - def __init__(self, **kwargs) : - """inputs: - filename: comedi device file for your device ["/dev/comedi0"]. - subdevice: the analog input subdevice (-1 for autodetect) - chan: an iterable of the channels you wish to control [(0,1,2,3)] - aref: the analog references for these channels [(comedi.AREF_GROUND,)] - values include: - comedi.AREF_GROUND - comedi.AREF_COMMON - comedi.AREF_DIFF - comedi.AREF_OTHER - range: the ranges for these channels [(0,)] - """ - common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AI, output=False, **kwargs) - def read(self) : - """outputs: - data: a list of read data values in Comedi units - """ - data = range(self.nchan) - for i in range(self.nchan) : - data[i] = self.read_chan_index(i) - #print "Read %s, got %s" % (str(self.chan), str(data)) - return data - -def _test_AI() : - ai = AI() - print "read ", ai.read() - print "read ", ai.read() - print "read ", ai.read() - print "read ", ai.read() - ai.close() - print "ai success" - -class AO (common.PyComediSingleIO) : - def __init__(self, **kwargs) : - """inputs: - filename: comedi device file for your device ["/dev/comedi0"]. - subdevice: the analog output subdevice [-1 for autodetect] - chan: an iterable of the channels you wish to control [(0,1,2,3)] - aref: the analog references for these channels [(comedi.AREF_GROUND,)] - values include: - comedi.AREF_GROUND - comedi.AREF_COMMON - comedi.AREF_DIFF - comedi.AREF_OTHER - range: the ranges for these channels [(0,)] - """ - common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AO, output=True, **kwargs) - def write(self, data) : - if len(data) != self.nchan : - raise SngAioError, "data length %d != the number of channels (%d)" % (len(data), self.nchan) - for i in range(self.nchan) : - self.write_chan_index(i, data[i]) - -def _test_AO() : - ao = AO(chan=(0,1)) - ao.write([0,0]) - ao.write([3000,3000]) - ao.write([0,0]) - ao.close() - print "ao success" - -def _fit_with_residual(out_data, in_data, channel) : - "Fit in_data(out_data) to a straight line & return residual" - from scipy.stats import linregress - from numpy import zeros - gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data) - print "y = %g + %g x" % (intercept, gradient) - print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y) - print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data - print "err = ", std_err # root mean sqared error of best fit - if gradient < .7 or p_value > 0.05 : - raise SngAioError, "Out channel %d != in channel %d" % (channel, channel) - residual = zeros((len(out_data),)) - for i in range(len(out_data)) : - pred_y = intercept + gradient * out_data[i] - residual[i] = in_data[i] - pred_y - return residual - -def plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) : - try : - from pylab import plot, show, subplot, xlabel, ylabel - subplot(311) - plot(out_data0, in_data0, 'r.-', out_data1, in_data1, 'b.') - ylabel("Read") - xlabel("Wrote") - if residual0 != None and residual1 != None: - subplot(312) - plot(out_data0, residual0, 'r.', out_data1, residual1, 'b.') - ylabel("Residual") - xlabel("Wrote") - subplot(313) - plot(in_data0, 'r.', in_data1, 'b.') - xlabel("Read") - show() # if interactive mode is off... - #raw_input("Press enter to continue") # otherwise, pause - except ImportError : - pass # ignore plot erros - -def _test_AIO() : - "Test AO and AI by cabling AO0 into AI0 and sweeping voltage" - from scipy.stats import linregress - from numpy import linspace, zeros - ao = AO(chan=(0,1)) - ai = AI(chan=(0,1)) - start = 0.1 * ao.maxdata[0] - stop = 0.9 * ao.maxdata[0] - points = 10 - out_data0 = linspace(start, stop, points) - out_data1 = linspace(stop, start, points) - in_data0 = zeros((points,)) - in_data1 = zeros((points,)) - for i in range(points) : - ao.write([int(out_data0[i]), int(out_data1[i])]) - id = ai.read() - in_data0[i] = id[0] - in_data1[i] = id[1] - ai.close() - ao.close() - if VERBOSE_DEBUG : - plot_data(out_data0, in_data0, None, out_data1, in_data1, None) - residual0 = _fit_with_residual(out_data0, in_data0, 0) - residual1 = _fit_with_residual(out_data1, in_data1, 1) - if VERBOSE_DEBUG : - plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) - for i in range(points) : - if abs(residual0[i]) > 10 : # HACK, hardcoded maximum nonlinearity - raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i]) - if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity - raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i]) - print "aio success" - -def test() : - _test_AI() - _test_AO() - _test_AIO() - -if __name__ == "__main__" : - test() diff --git a/pycomedi/single_dio.py b/pycomedi/single_dio.py deleted file mode 100644 index 57cafaa..0000000 --- a/pycomedi/single_dio.py +++ /dev/null @@ -1,119 +0,0 @@ -# Use Comedi drivers for single-shot digital input/output -# Copyright (C) 2007-2010 W. Trevor King -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -"""Use Comedi drivers for single-shot digital input/output - -Being single-shot implementations, read/writes will be software timed, -so this module would not be a good choice if you need millisecond -resolution. However, it does provide a simple and robust way to -generate/aquire signals at 1 second and greater timescales. -""" - -import comedi as c -import common - - -class SngDioError (common.PycomediError): - "Digital IO error" - pass - -class DIO_port (common.PyComediSingleIO) : - def __init__(self, output=True, **kwargs) : - """inputs: - filename: comedi device file for your device ["/dev/comedi0"]. - subdevice: the digital IO subdevice [-1 for autodetect] - chan: an iterable of the channels you wish to control [(0,1,2,3)] - aref: the analog references for these channels [(comedi.AREF_GROUND,)] - range: the ranges for these channels [(0,)] - output: whether to use the lines as output (vs input) (True) - """ - common.PyComediSingleIO.__init__(self, devtype=c.COMEDI_SUBD_DIO, output=output, **kwargs) - if self.output : - self.set_to_output() - else : - self.set_to_input() - def set_to_output(self) : - "switch all the channels associated with this object to be outputs" - for chan in self.chan : - rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_OUTPUT) - if rc != 1 : # yes, comedi_dio_config returns 1 on success, -1 on failure, as of comedilib-0.8.1 - self._comedi.comedi_perror("comedi_dio_config") - raise SngDioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_OUTPUT, rc) - self.output = True - def set_to_input(self) : - "switch all the channels associated with this object to be inputs" - for chan in self.chan : - rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_INPUT) - if rc != 1 : - self._comedi.comedi_perror("comedi_dio_config") - raise SngDioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_INPUT, rc) - self.output = False - def write_port(self, data) : - """inputs: - data: decimal number representing data to write - For example, setting data=6 will write - 0 to chan[0] - 1 to chan[1] - 1 to chan[2] - 0 to higher channels... - """ - for i in range(self.nchan) : - self.write_chan_index(i, (data >> i) % 2) - def read_port(self) : - """outputs: - data: decimal number representing data read - For example, data=6 represents - 0 on chan[0] - 1 on chan[1] - 1 on chan[2] - 0 on higher channels... - """ - data = 0 - for i in range(self.nchan) : - data += self.read_chan_index(i) << i - return data - -class DO_port (DIO_port) : - "A simple wrapper on dio_obj to make writing easier" - def __call__(self, data) : - self.write_port(data) - -def _test_DIO_port() : - d = DIO_port() - d.set_to_output() - d.write_chan_index(0, 1) - d.write_chan_index(0, 0) - d.write_port(7) - d.set_to_input() - data = d.read_chan_index(0) - print "channel %d is %d" % (d.chan[0], data) - data = d.read_port() - print "port value is %d" % data - print "dio_obj success" - d.close() - -def _test_DO_port() : - p = DO_port() - for data in [0, 1, 2, 3, 4, 5, 6, 7] : - p(data) - p.close() - print "write_dig_port success" - -def test() : - _test_DIO_port() - _test_DO_port() - -if __name__ == "__main__" : - test() diff --git a/pycomedi/utility.py b/pycomedi/utility.py new file mode 100644 index 0000000..41174e2 --- /dev/null +++ b/pycomedi/utility.py @@ -0,0 +1,422 @@ +# Copyright (C) 2010 W. Trevor King +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"Useful utility functions and classes." + +import array as _array +import mmap as _mmap +import os as _os +import threading as _threading +import time as _time + +import comedi as _comedi +import numpy as _numpy + +from . import LOG as _LOG +from . import classes as _classes +from . import constants as _constants + + +# types from comedi.h +sampl = _numpy.uint16 +lsampl = _numpy.uint32 +sampl_typecode = 'H' +lsampl_typecode = 'L' + +def subdevice_dtype(subdevice): + "Return the appropriate `numpy.dtype` based on subdevice flags" + if subdevice.get_flags().lsampl: + return lsampl + return sampl + +def subdevice_typecode(subdevice): + "Return the appropriate `array` type based on subdevice flags" + if subdevice.get_flags().lsampl: + return lsampl_typecode + return sampl_typecode + +def sampl_array(list): + "Convert a Python list/tuple into a `sampl` array" + ret = _comedi.sampl_array(len(list)) + for i,x in enumerate(list): + ret[i] = x + return ret + #return _array.array(sampl_typecode, list) + +def lsampl_array(list): + "Convert a Python list/tuple into an `lsampl` array" + ret = _comedi.lsampl_array(len(list)) + for i,x in enumerate(list): + ret[i] = x + return ret + #return _array.array(lsampl_typecode, list) + +def set_insn_chanspec(insn, channels): + "Configure `insn.chanspec` from the list `channels`" + chanlist = _classes.Chanlist(channels) + insn.chanspec = chanlist.chanlist() + +def set_insn_data(insn, data): + "Configure `insn.data` and `insn.n` from the iterable `data`" + insn.n = len(data) + insn.data = lsampl_array(data) + +def set_cmd_chanlist(cmd, channels): + "Configure `cmd.chanlist` and `cmd.chanlist_len` from the list `channels`" + chanlist = _classes.Chanlist(channels) + cmd.chanlist = chanlist.chanlist() + cmd.chanlist_len = len(chanlist) + +def set_cmd_data(cmd, data): + "Configure `cmd.data` and `cmd.data_len` from the iterable `data`" + cmd.data = sampl_array(data) + cmd.data_len = len(data) + +def inttrig_insn(subdevice): + """Setup an internal trigger for a given `subdevice` + + From the Comedi docs `section 4.4`_ (Instruction for internal + triggering): + + This special instruction has `INSN_INTTRIG` as the insn flag in + its instruction data structure. Its execution causes an internal + triggering event. This event can, for example, cause the device + driver to start a conversion, or to stop an ongoing + acquisition. The exact meaning of the triggering depends on the + card and its particular driver. + + The `data[0]` field of the `INSN_INTTRIG` instruction is + reserved for future use, and should be set to `0`. + + From the comedi source (`comedi.comedi_fops.c:parse_insn()`), we + see that the `chanspec` attribute is ignored for `INSN_INTTRIG`, + so we don't bother setting it here. + + .. _section 4.4: http://www.comedi.org/doc/x621.html + """ + insn = subdevice.insn() + insn.insn = _constants.INSN.inttrig.value + set_insn_data(insn, [0]) + return insn + + +def _builtin_array(array): + "`array` is an array from the builtin :mod:`array` module" + return isinstance(array, _array.array) + + +class _ReadWriteThread (_threading.Thread): + "Base class for all reader/writer threads" + def __init__(self, subdevice, buffer, name=None): + if name == None: + name = '<%s subdevice %d>' % ( + self.__class__.__name__, subdevice._index) + self.subdevice = subdevice + self.buffer = buffer + super(_ReadWriteThread, self).__init__(name=name) + + def _file(self): + """File for reading/writing data to `.subdevice` + + This file may use the internal comedi fileno, so do not close + it when you are finished. The file will eventually be closed + when the backing `Device` instance is closed. + """ + return self.subdevice._device.file + + +class Reader (_ReadWriteThread): + """`read()`-based reader + + Examples + -------- + + Setup a temporary data file for testing. + + >>> from os import close, remove + >>> from tempfile import mkstemp + >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-') + >>> f = _os.fdopen(fd, 'r+') + >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16) + >>> buf.tofile(t) + + Override the default `Reader` methods for our dummy subdevice. + + >>> class TestReader (Reader): + ... def _file(self): + ... return f + + Run the test reader. + + >>> rbuf = 0*buf + >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest') + >>> r.start() + >>> r.join() + + The input buffer is updated in place, and is also available as the + reader's `buffer` attribute. + + >>> rbuf + array([[ 0, 10], + [ 1, 11], + [ 2, 12]], dtype=uint16) + >>> r.buffer + array([[ 0, 10], + [ 1, 11], + [ 2, 12]], dtype=uint16) + + While `numpy` arrays make multi-channel indexing easy, they do + require an external library. For single-channel input, the + `array` module is sufficient. + + >>> f.seek(0) + >>> rbuf = _array.array('H', [0]*buf.size) + >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest') + >>> r.start() + >>> r.join() + >>> rbuf + array('H', [0, 10, 1, 11, 2, 12]) + >>> r.buffer + array('H', [0, 10, 1, 11, 2, 12]) + + Cleanup the temporary data file. + + >>> f.close() # no need for `close(fd)` + >>> remove(t) + """ + def run(self): + builtin_array = _builtin_array(self.buffer) + f = self._file() + if builtin_array: + # TODO: read into already allocated memory (somehow) + size = len(self.buffer) + a = _array.array(self.buffer.typecode) + a.fromfile(f, size) + self.buffer[:] = a + else: # numpy.ndarray + # TODO: read into already allocated memory (somehow) + buf = _numpy.fromfile( + f, dtype=self.buffer.dtype, count=self.buffer.size) + a = _numpy.ndarray( + shape=self.buffer.shape, dtype=self.buffer.dtype, + buffer=buf) + self.buffer[:] = a + + +class Writer (_ReadWriteThread): + """`write()`-based writer + + Examples + -------- + + Setup a temporary data file for testing. + + >>> from os import close, remove + >>> from tempfile import mkstemp + >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-') + >>> f = _os.fdopen(fd, 'r+') + >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16) + + Override the default `Writer` methods for our dummy subdevice. + + >>> class TestWriter (Writer): + ... def _file(self): + ... return f + + Run the test writer. + + >>> r = TestWriter(subdevice=None, buffer=buf, name='Writer-doctest') + >>> r.start() + >>> r.join() + >>> a = _array.array('H') + >>> a.fromfile(open(t, 'rb'), buf.size) + >>> a + array('H', [0, 10, 1, 11, 2, 12]) + + While `numpy` arrays make multi-channel indexing easy, they do + require an external library. For single-channel input, the + `array` module is sufficient. + + >>> f.seek(0) + >>> buf = _array.array('H', [2*x for x in buf.flat]) + >>> r = TestWriter(subdevice=None, buffer=buf, name='Writer-doctest') + >>> r.start() + >>> r.join() + >>> a = _array.array('H') + >>> a.fromfile(open(t, 'rb'), len(buf)) + >>> a + array('H', [0, 20, 2, 22, 4, 24]) + + Cleanup the temporary data file. + + >>> f.close() # no need for `close(fd)` + >>> remove(t) + """ + def run(self): + f = self._file() + self.buffer.tofile(f) + f.flush() + + +class _MMapReadWriteThread (_ReadWriteThread): + "`mmap()`-based reader/wrtier" + def __init__(self, *args, **kwargs): + super(_MMapReadWriteThread, self).__init__(*args, **kwargs) + self._prot = None + + def _sleep_time(self, mmap_size): + "Expected seconds needed to write a tenth of the mmap buffer" + return 0 + + def run(self): + # all sizes measured in bytes + builtin_array = _builtin_array(self.buffer) + mmap_size = self._mmap_size() + sleep_time = self._sleep_time(mmap_size) + mmap = _mmap.mmap( + self._fileno(), mmap_size, self._prot, _mmap.MAP_SHARED) + mmap_offset = 0 + buffer_offset = 0 + if builtin_array: + remaining = len(self.buffer) + else: # numpy.ndarray + remaining = self.buffer.size + remaining *= self.buffer.itemsize + action = self._initial_action( + mmap, buffer_offset, remaining, + mmap_size, action_bytes=mmap_size) + buffer_offset += action + remaining -= action + + while remaining > 0: + action_bytes = self._action_bytes() + if action_bytes > 0: + action,mmap_offset = self._act( + mmap, mmap_offset, buffer_offset, remaining, + mmap_size, action_bytes=action_bytes, + builtin_array=builtin_array) + buffer_offset += action + remaining -= action + else: + _time.sleep(sleep_time) + + def _act(self, mmap, mmap_offset, buffer_offset, remaining, mmap_size, + action_bytes=None, builtin_array=None): + if action_bytes == None: + action_bytes = self.subdevice.get_buffer_contents() + if mmap_offset + action_bytes >= mmap_size - 1: + action_bytes = mmap_size - mmap_offset + wrap = True + else: + wrap = False + action_size = min(action_bytes, remaining) + self._mmap_action(mmap, buffer_offset, action_size, builtin_array) + mmap.flush() # (offset, size), necessary? calls msync? + self._mark_action(action_size) + if wrap: + mmap.seek(0) + mmap_offset = 0 + return action_size, mmap_offset + + # pull out subdevice calls for easier testing + + def _mmap_size(self): + return self.subdevice.get_buffer_size() + + def _fileno(self): + return self.subdevice._device.fileno() + + def _action_bytes(self): + return self.subdevice.get_buffer_contents() + + # hooks for subclasses + + def _initial_action(self, mmap, buffer_offset, remaining, mmap_size, + action_bytes=None): + return 0 + + def _mmap_action(self, mmap, offset, size): + raise NotImplementedError() + + def _mark_action(self, size): + raise NotImplementedError() + + +# MMap classes have more subdevice-based methods to override +_mmap_docstring_overrides = '\n ... '.join([ + 'def _mmap_size(self):', + ' from os.path import getsize', + ' return getsize(t)', + 'def _fileno(self):', + ' return fd', + 'def _action_bytes(self):', + ' return 4', + 'def _mark_action(self, size):', + ' pass', + 'def _file', + ]) + + +class MMapReader (_MMapReadWriteThread): + __doc__ = Reader.__doc__ + for _from,_to in [ + # convert class and function names + ('`read()`', '`mmap()`'), + ('Writer', 'MMapWriter'), + ('def _file', _mmap_docstring_overrides)]: + __doc__ = __doc__.replace(_from, _to) + + def __init__(self, *args, **kwargs): + super(MMapReader, self).__init__(*args, **kwargs) + self._prot = _mmap.PROT_READ + + def _mmap_action(self, mmap, offset, size, builtin_array): + offset /= self.buffer.itemsize + s = size / self.buffer.itemsize + if builtin_array: + # TODO: read into already allocated memory (somehow) + a = _array.array(self.buffer.typecode) + a.fromstring(mmap.read(size)) + self.buffer[offset:offset+s] = a + else: # numpy.ndarray + # TODO: read into already allocated memory (somehow) + a = _numpy.fromstring(mmap.read(size), dtype=self.buffer.dtype) + self.buffer.flat[offset:offset+s] = a + + def _mark_action(self, size): + self.subdevice.mark_buffer_read(size) + + +class MMapWriter (_MMapReadWriteThread): + __doc__ = Writer.__doc__ + for _from,_to in [ + ('`write()`', '`mmap()`'), + ('Reader', 'MMapReader'), + ('def _file', _mmap_docstring_overrides)]: + __doc__ = __doc__.replace(_from, _to) + + def __init__(self, *args, **kwargs): + super(MMapWriter, self).__init__(*args, **kwargs) + self._prot = _mmap.PROT_WRITE + self.buffer = buffer(self.buffer) + + def _mmap_action(self, mmap, offset, size, builtin_array): + mmap.write(self.buffer[offset:offset+size]) + + def _mark_action(self, size): + self.subdevice.mark_buffer_written(size) + + +del _mmap_docstring_overrides diff --git a/setup.py b/setup.py index b60ce77..2f0394c 100644 --- a/setup.py +++ b/setup.py @@ -1,14 +1,13 @@ -"""PyComedi: an object-oriented interface for the Comedi drivers. +# Copyright -Requires - * Numpy (http://numpy.scipy.org/) - * Comedi (http://www.comedi.org/) -""" +"An object-oriented interface for the Comedi drivers." from distutils.core import setup +import os.path from pycomedi import __version__ + classifiers = """\ Development Status :: 2 - Pre-Alpha Intended Audience :: Developers @@ -21,19 +20,18 @@ Topic :: Scientific/Engineering Topic :: Software Development :: Libraries :: Python Modules """ -doclines = __doc__.split('\n') - +_this_dir = os.path.dirname(__file__) setup(name='pycomedi', version=__version__, maintainer='W. Trevor King', maintainer_email='wking@drexel.edu', - url='http://www.physics.drexel.edu/~wking/unfolding-disasters/pycomedi/', + url='http://www.physics.drexel.edu/~wking/unfolding-disasters/posts/pycomedi/', download_url='http://www.physics.drexel.edu/~wking/code/python/pycomedi-%s.tar.gz' % __version__, license='GNU General Public License (GPL)', platforms=['all'], - description=doclines[0], - long_description='\n'.join(doclines[2:]), + description=__doc__, + long_description=open(os.path.join(_this_dir, 'README'), 'r').read(), classifiers=filter(None, classifiers.split('\n')), packages=['pycomedi'], provides=['pycomedi'], -- 2.26.2