-include LICENSE
+include COPYING
+++ /dev/null
-.PHONY : all check dist clean
-
-all : dummy_py
-
-dummy_py : setup.py pycomedi/*.py
- python setup.py install --home=~
- echo "dummy for Makefile dependencies" > $@
-
-check : all
- python pycomedi.py
-
-dist :
- python setup.py sdist
- scp dist/pycomedi*tar.gz einstein:public_html/code/python/
-
-clean :
- python setup.py clean
- rm -rf build dist pycomedi.egg-info
- rm -f dummy_py *.pyc
-This package provides an object-oriented interface to the Comedi
+This package provides an object-oriented interface to the Comedi_
drivers. The standard Python interface bundled with Comedilib is a
-simple SWIG clone of the C interface. In PyComedi, we wrap that basic
-interface into one which is more transparent. There are simple
-wrappers for common tasks, such as single point digital and analog
-input/output (the single_dio and single_aio modules), as well as
-synchronized, multipoint analog input/output (ther simult_aio module).
+simple SWIG clone of the C interface. In pycomedi, we convert the
+functions into class methods (see `pycomedi.classes`), so you don't
+have to worry about dragging around opaque types like `comedi_t *`
+device pointers. We also bundle related constants together in
+`_Enums` and `_Flags` (see `pycomedi.constants`), to make handling
+common operations like flag manipulations simpler. Finally, there are
+a number of utility classes (see `pycomedi.utility`) to make common
+tasks like creating instructions or reading hardware-timed analog
+input easier.
-== Installation ==
-Non-Python dependencies (Debian packagename):
- easy_install (python-setuptools)
- Numpy source (python-numpy-dev)
- Comedi
+Installation
+============
-PyComedi uses `setuptools' for installation. Setuptools is basically
-an extension of the standard Python distutils package which supports
-automatic package dependency tracking. The installation procedure
-should be (on Debian-esque systems)
- # apt-get intall python-setuptools python-numpy-dev
- # easy_install -f http://www.physics.drexel.edu/~wking/code/python/ pycomedi
+Packages
+--------
-There is a Debian package of Comedi, but in order for the
-simultaneous, synchronized input/output to work you need to have both
-a DAQ card that supports it (obviously), and a Comedi driver that
-supports it. As of 2008-10-09, CVS versions of Comedi contain a patch
-supporting this behavior for National Instruments (ni.com) cards based
-on the DAQ-STC timing chip. I don't know the status of other cards or
-drivers. The Comedi modules must be compiled against your kernel
-headers, and the process is described in the Comedi section below.
+Gentoo
+~~~~~~
-There is one speedbump you might run into:
- * an outdated version of easy_install (see ez_setup.py section)
+I've packaged pycomedi for Gentoo. You need layman_ and my `wtk
+overlay`_. Install with::
-** ez_setup.py
+ # emerge -av app-portage/layman
+ # layman --add wtk
+ # emerge -av dev-python/pycomedi
-This package bundles
- http://peak.telecommunity.com/dist/ez_setup.py
-to bootstrap setuputils installation on your machine (if neccessary).
-If the bootstrapping doesn't work, you may need to install a current version
-of setuptools. On Debian-based systems `apt-get install python-setuptools'.
-Once you have *some* version of setuptools, upgrade with
- easy_install -U setuptools
+Dependencies
+------------
-For more information see
- http://peak.telecommunity.com/DevCenter/EasyInstall
- http://peak.telecommunity.com/DevCenter/setuptools#what-your-users-should-know
+If you're installing by hand or packaging pycomedi for another
+distribution, you'll need the following dependencies:
-** Comedi
+======= =================== ================ ==========================
+Package Purpose Debian_ Gentoo_
+======= =================== ================ ==========================
+NumPy_ ? python-numpy dev-python/numpy
+SciPy_ testing python-scipy sci-libs/scipy
+Comedi_ Comedilib interface python-comedilib sci-libs/comedilib [#clb]_
+nose_ testing python-nose dev-python/nose
+Cython_ Comedilib interface cython dev-python/cython
+======= =================== ================ ==========================
-TODO (write up source installation)
+.. [#clb] In the `wtk overlay`_.
+TODO: Migrate from SWIG bindings to direct calls on comedilib via Cython.
-== Usage ==
-See the tests in the various modules for simple examples.
+Installing by hand
+------------------
+Pycomedi is available as a Git_ repository::
-== Licence ==
+ $ git clone http://www.physics.drexel.edu/~wking/code/git/pycomedi.git
-This project is distributed under the GNU Public License Version 3 or greater.
-http://www.gnu.org/licenses/gpl.txt
+See the homepage_ for details. To install the checkout, run the
+standard::
-== Author ==
+ $ python setup.py install
+
+
+Usage
+=====
+
+See the examples in the `doc` directory.
+
+
+Testing
+=======
+
+Integration tests with::
+
+ $ nosetests --with-doctest --doctest-extension=txt doc
+
+Internal unit tests with::
+
+ $ nosetests --with-doctest pycomedi
+
+
+Licence
+=======
+
+This project is distributed under the `GNU General Public License
+Version 3`_ or greater.
+
+
+Author
+======
W. Trevor King
wking@drexel.edu
-Copyright 2007, 2008
+Copyright 2007-2011
+
+
+.. _Comedi: http://www.comedi.org/
+.. _layman: http://layman.sourceforge.net/
+.. _wtk overlay:
+ http://www.physics.drexel.edu/~wking/unfolding-disasters/posts/Gentoo_overlay
+.. _science overlay: http://overlays.gentoo.org/proj/science/wiki/en
+.. _Debian: http://www.debian.org/
+.. _Gentoo: http://www.gentoo.org/
+.. _NumPy: http://numpy.scipy.org/
+.. _SciPy: http://www.scipy.org/
+.. _nose: http://somethingaboutorange.com/mrl/projects/nose/
+.. _Cython: http://www.cython.org/
+.. _Git: http://git-scm.com/
+.. _homepage:
+ http://www.physics.drexel.edu/~wking/unfolding-disasters/posts/pycomedi/
+.. _GNU General Public License Version 3: http://www.gnu.org/licenses/gpl.txt
--- /dev/null
+Import required modules.
+
+>>> from pycomedi.classes import Device, DataChannel
+>>> from pycomedi.constants import SUBDEVICE_TYPE, AREF, UNIT
+
+Open a device.
+
+>>> device = Device('/dev/comedi0')
+>>> device.open()
+
+Get your I/O subdevice (alternatively, use `device.subdevice()` to
+select the subdevice directly by index).
+
+>>> subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.ai)
+
+Generate a list of channels you wish to control.
+
+>>> channels = [subdevice.channel(i, factory=DataChannel, aref=AREF.diff)
+... for i in (0, 1, 2, 3)]
+
+Configure the channels.
+
+>>> for chan in channels:
+... chan.range = chan.find_range(unit=UNIT.volt, min=0, max=5)
+
+Read/write sequentially.
+
+>>> value = [c.data_read_delayed(nano_sec=1e3) for c in channels]
+>>> value # doctest: +SKIP
+[0, 9634, 0, 15083]
+
+TODO: convert to physical values.
+
+Close the device when you're done.
+
+>>> device.close()
+
+
+
+As a more elaborate test, we can cable AO0 into AI0 and sweep the
+voltage.
+
+>>> from numpy import linspace
+>>> from scipy.stats import linregress
+>>> device.open()
+>>> ai_subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.ai)
+>>> ao_subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.ao)
+>>> ai_channel = ai_subdevice.channel(0, factory=DataChannel, aref=AREF.diff)
+>>> ao_channel = ao_subdevice.channel(0, factory=DataChannel, aref=AREF.diff)
+>>> ai_channel.range = ai_channel.find_range(unit=UNIT.volt, min=0, max=5)
+>>> ao_channel.range = ao_channel.find_range(unit=UNIT.volt, min=0, max=5)
+>>> ao_maxdata = ao_channel.get_maxdata()
+>>> ai_maxdata = ai_channel.get_maxdata()
+>>> ao_start = 0.3 * ao_maxdata
+>>> ao_stop = 0.7 * ao_maxdata
+>>> points = 10
+>>> ao_data = linspace(ao_start, ao_stop, points)
+>>> ai_data = []
+>>> for i in range(points):
+... written = ao_channel.data_write(ao_data[i])
+... assert written == 1, written
+... ai_data.append(ai_channel.data_read_delayed(nano_sec=1e3))
+>>> ao_data
+>>> ai_data
+>>> scaled_ao_data = [d/float(ao_maxdata) for d in ao_data]
+>>> scaled_ai_data = [d/float(ai_maxdata) for d in ai_data]
+>>> gradient,intercept,r_value,p_value,std_err = linregress(
+... scaled_ao_data, scaled_ai_data)
+>>> gradient
+>>> intercept
+>>> r_value
+>>> p_value
+>>> std_err
+>>> device.close()
--- /dev/null
+Import required modules.
+
+>>> from pycomedi.classes import Device, DataChannel
+>>> from pycomedi.constants import SUBDEVICE_TYPE, IO_DIRECTION
+
+Open a device.
+
+>>> device = Device('/dev/comedi0')
+>>> device.open()
+
+Get your DIO subdevice (alternatively, use `device.subdevice()` to
+select the subdevice directly by index).
+
+>>> subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.dio)
+
+Generate a list of channels you wish to control.
+
+>>> channels = [subdevice.channel(i, factory=DataChannel)
+... for i in (0, 1, 2, 3)]
+
+Configure the channels.
+
+>>> for chan in channels:
+... chan.dio_config(IO_DIRECTION.input)
+
+Read/write either as a block...
+
+>>> value = subdevice.dio_bitfield(base_channel=0)
+>>> value # doctest: +SKIP
+7
+
+... or sequentially.
+
+>>> value = [c.dio_read() for c in channels]
+>>> value # doctest: +SKIP
+[1, 1, 1, 0]
+
+Close the device when you're done.
+
+>>> device.close()
--- /dev/null
+Import required modules.
+
+>>> from numpy import arange, linspace, zeros, sin, pi
+>>> from pycomedi.classes import (Device, StreamingSubdevice, DataChannel,
+... Chanlist)
+>>> from pycomedi.constants import (AREF, CMDF, INSN, SUBDEVICE_TYPE, TRIG_SRC,
+... UNIT)
+>>> from pycomedi.utility import inttrig_insn, subdevice_dtype, Reader, Writer
+
+Open a device.
+
+>>> device = Device('/dev/comedi0')
+>>> device.open()
+
+Get your I/O subdevices (alternatively, use `device.subdevice()` to
+select the subdevice directly by index).
+
+>>> ai_subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.ai,
+... factory=StreamingSubdevice)
+>>> ao_subdevice = device.find_subdevice_by_type(SUBDEVICE_TYPE.ao,
+... factory=StreamingSubdevice)
+
+Generate a list of channels you wish to control.
+
+>>> ai_channels = Chanlist(
+... [ai_subdevice.channel(i, factory=DataChannel, aref=AREF.diff)
+... for i in (0, 1, 2, 3)])
+>>> ao_channels = Chanlist(
+... [ao_subdevice.channel(i, factory=DataChannel, aref=AREF.diff)
+... for i in (0, 1)])
+
+Configure the channels.
+
+>>> for chan in ai_channels + ao_channels:
+... chan.range = chan.find_range(unit=UNIT.volt, min=-10, max=10)
+
+Use the subdevice flags to determine data types.
+
+>>> ai_dtype = subdevice_dtype(ai_subdevice)
+>>> ao_dtype = subdevice_dtype(ao_subdevice)
+
+Allocate buffers (with channel data interleaved).
+
+>>> n_samps = 100
+>>> ai_buffer = zeros((n_samps, len(ai_channels)), dtype=ai_dtype)
+>>> ao_buffer = zeros((n_samps, len(ao_channels)), dtype=ao_dtype)
+
+Initialize output buffer.
+
+>>> midpoint = int(ao_channels[0].get_maxdata()/2)
+>>> bitrange = float(midpoint/2)
+>>> ao_buffer[:,0] = midpoint+bitrange*sin(2*pi*arange(n_samps)/(n_samps-1.0))
+>>> ao_buffer[:,1] = linspace(0, midpoint, n_samps)
+>>> ao_buffer[-1,:] = midpoint
+
+Get rough commands.
+
+>>> frequency = 1000.0 # Hz
+>>> scan_period_ns = int(1e9 / frequency) # nanosecond period
+>>> ai_subdevice.get_cmd_generic_timed(len(ai_channels), scan_period_ns)
+>>> ai_cmd = ai_subdevice.cmd
+>>> ao_subdevice.get_cmd_generic_timed(len(ao_channels), scan_period_ns)
+>>> ao_cmd = ao_subdevice.cmd
+
+Setup multi-scan run triggering.
+
+>>> ai_cmd.start_src = TRIG_SRC.int
+>>> ai_cmd.start_arg = 0
+>>> ai_cmd.stop_src = TRIG_SRC.count
+>>> ai_cmd.stop_arg = n_samps
+>>> ao_cmd.start_src = TRIG_SRC.ext
+>>> ao_cmd.start_arg = 18 # NI card AI_START1 internal AI start signal
+>>> ao_cmd.stop_src = TRIG_SRC.count
+>>> ao_cmd.stop_arg = n_samps
+
+Setup multi-channel scan triggering (handled by get_cmd_generic_timed?).
+
+ai_cmd.scan_begin_src = TRIG_SRC.timer
+ai_cmd.scan_begin_arg
+ai_cmd.convert_src
+ai_cmd.convert_arg
+ai_cmd.scan_end_src
+ai_cmd.scan_end_arg
+
+Add channel lists.
+
+>>> ai_cmd.chanlist = ai_channels
+>>> ao_cmd.chanlist = ao_channels
+
+Test the commands.
+
+>>> ai_subdevice.cmd = ai_cmd
+>>> ao_subdevice.cmd = ao_cmd
+>>> for i in range(3):
+... rc = ai_subdevice.command_test()
+... if rc == None: break
+... print i, rc
+>>> for i in range(3):
+... rc = ao_subdevice.command_test()
+... if rc == None: break
+... print i, rc
+
+Start the commands.
+
+>>> ao_subdevice.command()
+>>> ai_subdevice.command()
+
+Start the reader and writer threads.
+
+>>> writer = Writer(ao_subdevice, ao_buffer)
+>>> writer.start()
+>>> reader = Reader(ai_subdevice, ai_buffer)
+>>> reader.start()
+
+Arm the AO command.
+
+>>> device.do_insn(inttrig_insn(ao_subdevice))
+1
+
+Trigger AI aquisition, which in turn triggers the AO.
+
+>>> device.do_insn(inttrig_insn(ai_subdevice))
+1
+
+Join the reader and writer threads when they finish.
+
+>>> writer.join()
+>>> reader.join()
+
+>>> ai_buffer # doctest: +SKIP
+array([[32342, 31572, 30745, 31926],
+ [33376, 31797, 30904, 31761],
+...
+ [31308, 24246, 22215, 21824],
+ [32343, 25044, 22659, 21959]], dtype=uint16)
+
+Close the device when you're done.
+
+>>> device.close()
-# PyComedi provides an object-oriented interface to the Comedi drivers.
-# Copyright (C) 2008-2010 W. Trevor King
+# Copyright (C) 2008-2011 W. Trevor King
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-__version__ = '0.2'
+import logging as _logging
+
+
+__version__ = '0.3'
+
+
+LOG = _logging.getLogger('pycomedi')
+"Pycomedi logger"
+
+LOG.setLevel(_logging.DEBUG)
+h = _logging.StreamHandler()
+h.setLevel(_logging.WARN)
+f = _logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+h.setFormatter(f)
+LOG.addHandler(h)
+del h, f
+
+
+class PyComediError (Exception):
+ "Error in pycomedi"
+ pass
--- /dev/null
+# Copyright (C) 2010-2011 W. Trevor King
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"Object oriented wrappers around the comedi module."
+
+import os as _os
+
+import comedi as _comedi
+import numpy as _numpy
+
+from . import LOG as _LOG
+from . import PyComediError as _PyComediError
+from . import constants as _constants
+
+
+def _comedi_arg(arg):
+ "Replace arguments with their comedilib value."
+ if isinstance(arg, _constants._BitwiseOperator):
+ return arg.value
+ elif isinstance(arg, Command):
+ _LOG.debug(str(arg))
+ return arg.cmd
+ elif isinstance(arg, Chanlist):
+ return arg.chanlist()
+ return arg
+
+def _comedi_getter(name, is_invalid):
+ def comedi_get(function_name, *args, **kwargs):
+ if 'error_msg' in kwargs:
+ error_msg = kwargs.pop('error_msg')
+ else:
+ error_msg = 'error while running %s with %s and %s' % (
+ function_name, args, kwargs)
+ fn = getattr(_comedi, function_name)
+
+ _LOG.debug('calling %s with %s %s' % (function_name, args, kwargs))
+
+ args = list(args)
+ for i,arg in enumerate(args):
+ args[i] = _comedi_arg(arg)
+ for key,value in kwargs.iteritems():
+ kwargs[key] = _comedi_arg(value)
+
+ ret = fn(*args, **kwargs)
+ _LOG.debug(' call to %s returned %s' % (function_name, ret))
+ if is_invalid(ret):
+ errno = _comedi.comedi_errno()
+ comedi_msg = _comedi.comedi_strerror(errno)
+ _comedi.comedi_perror(function_name)
+ raise _PyComediError('%s: %s (%s)' % (error_msg, comedi_msg, ret))
+ return ret
+ comedi_get.__name__ = name
+ comedi_get.__doc__ = (
+ 'Execute `comedi.<function_name>(*args, **kwargs)` safely.')
+ return comedi_get
+
+_comedi_int = _comedi_getter('comedi_int', lambda ret: ret < 0)
+_comedi_ptr = _comedi_getter('comedi_ptr', lambda ret: ret == None)
+_comedi_tup = _comedi_getter('comedi_tup', lambda ret: ret[0] < 0)
+
+
+def _cache(method):
+ def wrapper(self, *args, **kwargs):
+ key = (method.__name__, args, str(kwargs))
+ if key not in self._cache:
+ self._cache[key] = method(self, *args, **kwargs)
+ return self._cache[key]
+ wrapper.__name__ = method.__name__
+ wrapper.__doc__ = method.__doc__
+ return wrapper
+
+
+class CacheObject (object):
+ """An object that caches return values for decoreated merthods.
+
+ >>> class A (CacheObject):
+ ... @_cache
+ ... def double(self, x):
+ ... print 'calculating 2*%d' % x
+ ... return x*2
+ >>> a = A()
+ >>> a.double(2)
+ calculating 2*2
+ 4
+ >>> a.double(2)
+ 4
+ >>> a.double(3)
+ calculating 2*3
+ 6
+ >>> a.double(3)
+ 6
+ >>> print sorted(a._cache.keys())
+ [('double', (2,), '{}'), ('double', (3,), '{}')]
+ """
+ def __init__(self):
+ self.clear_cache()
+
+ def clear_cache(self):
+ self._cache = {}
+
+
+class Channel (CacheObject):
+ def __init__(self, subdevice, index):
+ super(Channel, self).__init__()
+ self._subdevice = subdevice
+ self._index = index
+
+ @_cache
+ def get_maxdata(self):
+ return _comedi_int(
+ 'comedi_get_maxdata', self._subdevice._device._device,
+ self._subdevice._index, self._index)
+
+ @_cache
+ def get_n_ranges(self):
+ return _comedi_int(
+ 'comedi_get_n_ranges', self._subdevice._device._device,
+ self._subdevice._index, self._index)
+
+ def get_range(self, index):
+ r = _comedi_ptr(
+ 'comedi_get_range', self._subdevice._device._device,
+ self._subdevice._index, self._index, index)
+ return Range(index=index, range=r)
+
+ @_cache
+ def _find_range(self, unit, min, max):
+ "Search for range"
+ return _comedi_int(
+ 'comedi_find_range', self._subdevice._device._device,
+ self._subdevice._index, self._index, unit.value, min, max)
+
+ def find_range(self, unit, min, max):
+ """Search for range
+
+ `unit` should be an item from `constants.UNIT`.
+ """
+ return self.get_range(self._find_range(unit, min, max))
+
+
+class Subdevice (CacheObject):
+ def __init__(self, device, index):
+ super(Subdevice, self).__init__()
+ self._device = device
+ self._index = index
+
+ @_cache
+ def get_type(self):
+ "Type of subdevice (from `SUBDEVICE_TYPE`)"
+ _type = _comedi_int(
+ 'comedi_get_subdevice_type', self._device._device, self._index)
+ return _constants.SUBDEVICE_TYPE.index_by_value(_type)
+
+ @_cache
+ def _get_flags(self):
+ "Subdevice flags"
+ return _comedi_int(
+ 'comedi_get_subdevice_flags', self._device._device, self._index)
+
+ def get_flags(self):
+ "Subdevice flags (an `SDF` `FlagValue`)"
+ return _constants.FlagValue(
+ _constants.SDF, self._get_flags())
+
+ @_cache
+ def n_channels(self):
+ "Number of subdevice channels"
+ return _comedi_int(
+ 'comedi_get_n_channels', self._device._device, self._index)
+
+ @_cache
+ def range_is_chan_specific(self):
+ return _comedi_int(
+ 'comedi_range_is_chan_specific', self._device._device, self._index)
+
+ @_cache
+ def maxdata_is_chan_specific(self):
+ return _comedi_int(
+ 'comedi_maxdata_is_chan_specific',
+ self._device._device, self._index)
+
+ def lock(self):
+ "Reserve the subdevice"
+ _comedi_int('comedi_lock', self._device._device, self._index)
+
+ def unlock(self):
+ "Release the subdevice"
+ _comedi_int('comedi_unlock', self._device._device, self._index)
+
+ def dio_bitfield(self, bits=0, write_mask=0, base_channel=0):
+ """Read/write multiple digital channels.
+
+ `bits` and `write_mask` are bit fields with the least
+ significant bit representing channel `base_channel`.
+
+ Returns a bit field containing the read value of all input
+ channels and the last written value of all output channels.
+ """
+ rc,bits = _comedi_tup(
+ 'comedi_dio_bitfield2', self._device._device,
+ self._index, write_mask, bits, base_channel)
+ return bits
+
+ # extensions to make a more idomatic Python interface
+
+ def insn(self):
+ insn = self._device.insn()
+ insn.subdev = self._index
+ return insn
+
+ def channel(self, index, factory=Channel, **kwargs):
+ "`Channel` instance for the `index`\ed channel."
+ return factory(subdevice=self, index=index, **kwargs)
+
+
+class Device (CacheObject):
+ "Class bundling device-related functions."
+ def __init__(self, filename):
+ super(Device, self).__init__()
+ self.filename = filename
+ self._device = None
+ self.file = None
+
+ def open(self):
+ "Open device"
+ self._device = _comedi_ptr('comedi_open', self.filename)
+ self.file = _os.fdopen(self.fileno(), 'r+')
+ self.clear_cache()
+
+ def close(self):
+ "Close device"
+ self.file.flush()
+ self.file.close()
+ _comedi_int('comedi_close', self._device)
+ self._device = None
+ self.file = None
+ self.clear_cache()
+
+ @_cache
+ def fileno(self):
+ "File descriptor for this device"
+ return _comedi_int('comedi_fileno', self._device)
+
+ @_cache
+ def get_n_subdevices(self):
+ "Number of subdevices"
+ self._cache
+ return _comedi_int('comedi_get_n_subdevices', self._device)
+
+ @_cache
+ def get_version_code(self):
+ """Comedi version code.
+
+ This is a kernel-module level property, but a valid device is
+ necessary to communicate with the kernel module.
+
+ Returns a tuple of version numbers, e.g. `(0, 7, 61)`.
+ """
+ version = _comedi_int('comedi_get_version_code', self._device)
+ ret = []
+ for i in range(3):
+ ret.insert(0, version & (2**8-1))
+ version >>= 2**8 # shift over 8 bits
+ return tuple(ret)
+
+ @_cache
+ def get_driver_name(self):
+ "Comedi driver name"
+ return _comedi_ptr('get_driver_name', self._device)
+
+ @_cache
+ def get_board_name(self):
+ "Comedi board name"
+ return _comedi_ptr('get_board_name', self._device)
+
+ @_cache
+ def _get_read_subdevice(self):
+ "Find streaming input subdevice index"
+ return _comedi_int('comedi_get_read_subdevice', self._device)
+
+ def get_read_subdevice(self, **kwargs):
+ "Find streaming input subdevice"
+ return self.subdevice(self._get_read_subdevice(), **kwargs)
+
+ @_cache
+ def _get_write_subdevice(self):
+ "Find streaming output subdevice index"
+ return _comedi_int('comedi_get_write_subdevice', self._device)
+
+ def _get_write_subdevice(self, **kwargs):
+ "Find streaming output subdevice"
+ return self.subdevice(self._get_write_subdevice(), **kwargs)
+
+ @_cache
+ def _find_subdevice_by_type(self, subdevice_type):
+ "Search for a subdevice index for type `subdevice_type`)."
+ return _comedi_int(
+ 'comedi_find_subdevice_by_type',
+ self._device, subdevice_type.value, 0) # 0 is starting subdevice
+
+ def find_subdevice_by_type(self, subdevice_type, **kwargs):
+ """Search for a subdevice by type `subdevice_type`)."
+
+ `subdevice_type` should be an item from `constants.SUBDEVICE_TYPE`.
+ """
+ return self.subdevice(
+ self._find_subdevice_by_type(subdevice_type), **kwargs)
+
+ def do_insnlist(self, insnlist):
+ """Perform multiple instructions
+
+ Returns the number of successfully completed instructions.
+ """
+ return _comedi_int('comedi_do_insn', self._device, insn)
+
+ def do_insn(self, insn):
+ """Preform a single instruction.
+
+ Returns an instruction-specific integer.
+ """
+ return _comedi_int('comedi_do_insn', self._device, insn)
+
+ def get_default_calibration_path(self):
+ "The default calibration path for this device"
+ assert self._device != None, (
+ 'Must call get_default_calibration_path on an open device.')
+ return _comedi_ptr('comedi_get_default_calibration_path', self._device)
+
+ # extensions to make a more idomatic Python interface
+
+ def insn(self):
+ return _comedi.comedi_insn_struct()
+
+ def subdevices(self, **kwargs):
+ "Iterate through all available subdevices."
+ for i in range(self.n_subdevices):
+ yield self.subdevice(i, **kwargs)
+
+ def subdevice(self, index, factory=Subdevice, **kwargs):
+ return factory(device=self, index=index, **kwargs)
+
+
+class Range (object):
+ def __init__(self, index, range):
+ self.index = index
+ self.range = range
+
+ def __getattr__(self, name):
+ return getattr(self.range, name)
+
+
+class Command (object):
+ """Wrap `comedi.comedi_cmd` with a nicer interface.
+
+ Examples
+ --------
+
+ >>> from .utility import set_cmd_chanlist, set_cmd_data
+ >>> CMDF = _constants.CMDF
+ >>> TRIG_SRC = _constants.TRIG_SRC
+ >>> c = Command()
+ >>> c.subdev = 1
+ >>> c.flags = CMDF.priority | CMDF.write
+ >>> c.start_src = TRIG_SRC.int | TRIG_SRC.now
+ >>> c.scan_begin_src = TRIG_SRC.timer
+ >>> c.scan_begin_arg = 10
+ >>> c.scan_convert_src = TRIG_SRC.now
+ >>> c.scan_end_src = TRIG_SRC.count
+ >>> c.scan_end_arg = 4
+ >>> c.stop_src = TRIG_SRC.none
+ >>> set_cmd_chanlist(c, [])
+ >>> set_cmd_data(c, [1,2,3])
+ >>> print c # doctest: +ELLIPSIS, +REPORT_UDIFF
+ Comedi command:
+ subdev : 1
+ flags : priority|write
+ start_src : now|int
+ start_arg : 0
+ scan_begin_src : timer
+ scan_begin_arg : 10
+ convert_src : -
+ convert_arg : 0
+ scan_end_src : count
+ scan_end_arg : 4
+ stop_src : none
+ stop_arg : 0
+ chanlist : <comedi.lsampl_array... at 0x...>
+ chanlist_len : 0
+ data : <comedi.sampl_array... at 0x...>
+ data_len : 3
+ """
+ _str_fields = [
+ 'subdev', 'flags', 'start_src', 'start_arg', 'scan_begin_src',
+ 'scan_begin_arg', 'convert_src', 'convert_arg', 'scan_end_src',
+ 'scan_end_arg', 'stop_src', 'stop_arg', 'chanlist', 'chanlist_len',
+ 'data', 'data_len']
+
+ def __init__(self):
+ self.cmd = _comedi.comedi_cmd_struct()
+
+ def _get_flag_field(self, name, flag):
+ f = _constants.FlagValue(flag, getattr(self.cmd, name))
+ return f
+
+ def get_flags(self):
+ return self._get_flag_field('flags', _constants.CMDF)
+
+ def get_trigger_field(self, name):
+ return self._get_flag_field(name, _constants.TRIG_SRC)
+
+ def __str__(self):
+ values = {}
+ for f in self._str_fields:
+ if f.endswith('_src'):
+ values[f] = str(self.get_trigger_field(f))
+ elif f == 'flags':
+ values[f] = str(self.get_flags())
+ else:
+ values[f] = getattr(self, f)
+ max_len = max([len(f) for f in self._str_fields])
+ lines = ['%*s : %s' % (max_len, f, values[f])
+ for f in self._str_fields]
+ return 'Comedi command:\n %s' % '\n '.join(lines)
+
+ def __setattr__(self, name, value):
+ if name == 'cmd':
+ return super(Command, self).__setattr__(name, value)
+ return setattr(self.cmd, name, _comedi_arg(value))
+
+ def __getattr__(self, name):
+ return getattr(self.cmd, name) # TODO: lookup _NamedInt?
+
+
+class DataChannel (Channel):
+ """Channel configured for reading data.
+
+ `range` should be a `Range` instance, `aref` should be an
+ `constants.AREF` instance,
+ """
+ def __init__(self, range=None, aref=None, **kwargs):
+ super(DataChannel, self).__init__(**kwargs)
+ self.range = range
+ self.aref = aref
+
+ # syncronous stuff
+
+ def data_read(self):
+ "Read one sample"
+ read,data = _comedi_tup(
+ 'comedi_data_read', self._subdevice._device._device,
+ self._subdevice._index, self._index, self.range.index, self.aref)
+ return data
+
+ def data_read_n(self, n):
+ "Read `n` samples (timing between samples is undefined)."
+ read,data = _comedi_tup(
+ 'comedi_data_read', self._subdevice._device._device,
+ self._subdevice._index, self._index, self.range.index, self.aref,
+ n)
+ return data
+
+ def data_read_hint(self):
+ """Tell driver which channel/range/aref you will read next
+
+ Used to prepare an analog input for a subsequent call to
+ comedi_data_read. It is not necessary to use this function,
+ but it can be useful for eliminating inaccuaracies caused by
+ insufficient settling times when switching the channel or gain
+ on an analog input. This function sets an analog input to the
+ channel, range, and aref specified but does not perform an
+ actual analog to digital conversion.
+
+ Alternatively, one can simply use `.data_read_delayed()`, which
+ sets up the input, pauses to allow settling, then performs a
+ conversion.
+ """
+ _comedi_int(
+ 'comedi_data_read_hint', self._subdevice._device._device,
+ self._subdevice._index, self._index, self.range.index,
+ self.aref.value)
+
+ def data_read_delayed(self, nano_sec=0):
+ """Read single sample after delaying specified settling time.
+
+ Although the settling time is specified in integer
+ nanoseconds, the actual settling time will be rounded up to
+ the nearest microsecond.
+ """
+ read,data = _comedi_tup(
+ 'comedi_data_read_delayed', self._subdevice._device._device,
+ self._subdevice._index, self._index, self.range.index,
+ self.aref.value, int(nano_sec))
+ return data
+
+ def data_write(self, data):
+ "Write one sample"
+ written = _comedi_int(
+ 'comedi_data_write', self._subdevice._device._device,
+ self._subdevice._index, self._index, self.range.index, self.aref,
+ int(data))
+ return written
+
+ def dio_config(self, dir):
+ """Change input/output properties
+
+ `dir` should be an item from `constants.IO_DIRECTION`.
+ """
+ _comedi_int(
+ 'comedi_dio_config', self._subdevice._device._device,
+ self._subdevice._index, self._index, dir)
+
+ def _dio_get_config(self):
+ "Query input/output properties"
+ return _comedi_int(
+ 'comedi_dio_get_config', self._subdevice._device._device,
+ self._subdevice._index, self._index)
+
+ def dio_get_config(self):
+ """Query input/output properties
+
+ Return an item from `constants.IO_DIRECTION`.
+ """
+ return _constants.IO_DIRECTION.index_by_value(self._dio_get_config())
+
+ def dio_read(self):
+ "Read a single bit"
+ rc,bit = _comedi_tup(
+ 'comedi_dio_read', self._subdevice._device._device,
+ self._subdevice._index, self._index)
+ return bit
+
+ def dio_write(self, bit):
+ "Write a single bit"
+ return _comedi_int(
+ 'comedi_dio_write', self._subdevice._device._device,
+ self._subdevice._index, self._index, bit)
+
+ def cr_pack(self):
+ return _comedi.cr_pack(self._index, self.range.index, self.aref.value)
+
+
+class SlowlyVaryingChannel (Channel):
+ "Slowly varying channel"
+ def __init__(self, **kwargs):
+ super(SlowlyVaryingChannel, self).__init__(**kwargs)
+ self._sv = _comedi.comedi_sv_t()
+ self.init()
+
+ def init(self):
+ "Initialise `._sv`"
+ _comedi_int(
+ 'comedi_sv_init', self._sv, self._subdevice._device._device,
+ self._subdevice._index, self._index)
+
+ def update(self):
+ "Update internal `._sv` parameters."
+ _comedi_int('comedi_sv_update', self._sv)
+
+ def measure(self):
+ """Measure a slowy varying signal.
+
+ Returns `(num_samples, physical_value)`.
+ """
+ return _comedi_tup(
+ 'comedi_sv_measure', self._sv)
+
+
+class StreamingSubdevice (Subdevice):
+ "Streaming I/O channel"
+ def __init__(self, **kwargs):
+ super(StreamingSubdevice, self).__init__(**kwargs)
+ self.cmd = Command()
+
+ def get_cmd_src_mask(self):
+ """Detect streaming input/output capabilities
+
+ The command capabilities of the subdevice indicated by the
+ parameters device and subdevice are probed, and the results
+ placed in the command structure *command. The trigger source
+ elements of the command structure are set to be the bitwise-or
+ of the subdevice's supported trigger sources. Other elements
+ in the structure are undefined.
+ """
+ rc = _comedi_int(
+ 'comedi_get_cmd_src_mask', self._device._device, self._index,
+ self.cmd)
+
+ def get_cmd_generic_timed(self, chanlist_len, scan_period_ns=0):
+ """Detect streaming input/output capabilities
+
+ The command capabilities of the subdevice indicated by the
+ parameters device and subdevice are probed, and the results
+ placed in the command structure pointed to by the parameter
+ command. The command structure *command is modified to be a
+ valid command that can be used as a parameter to
+ comedi_command (after the command has additionally been
+ assigned a valid chanlist array). The command measures scans
+ consisting of chanlist_len channels at a scan rate that
+ corresponds to a period of scan_period_ns nanoseconds. The
+ rate is adjusted to a rate that the device can handle.
+ """
+ rc = _comedi_int(
+ 'comedi_get_cmd_generic_timed', self._device._device, self._index,
+ self.cmd, chanlist_len, scan_period_ns)
+
+ def cancel(self):
+ "Stop streaming input/output in progress."
+ _comedi_int('comedi_cancel', self._device._device, self._index)
+
+ def command(self):
+ "Start streaming input/output"
+ _comedi_int('comedi_command', self._device._device, self.cmd)
+
+ _command_test_errors = [
+ None, # valid
+ 'unsupported *_src trigger', # unsupported trigger bits zeroed
+ 'unsupported *_src combo, or multiple triggers',
+ '*_arg out of range', # offending members adjusted to valid values
+ '*_arg required adjustment', # e.g. trigger timing period rounded
+ 'invalid chanlist', # e.g. some boards require same range across chans
+ ]
+
+ def command_test(self):
+ "Test streaming input/output configuration"
+ rc = _comedi.comedi_command_test(
+ self._device._device, _comedi_arg(self.cmd))
+ return self._command_test_errors[rc]
+
+ def poll(self):
+ """Force updating of streaming buffer
+
+ If supported by the driver, all available samples are copied
+ to the streaming buffer. These samples may be pending in DMA
+ buffers or device FIFOs. If successful, the number of
+ additional bytes available is returned.
+ """
+ return _comedi_int('comedi_poll', self._device._device, self._index)
+
+ def get_buffer_size(self):
+ "Streaming buffer size of subdevice"
+ return _comedi_int(
+ 'comedi_get_buffer_size', self._device._device, self._index)
+
+ def set_buffer_size(self, size):
+ """Change the size of the streaming buffer
+
+ Returns the new buffer size in bytes.
+
+ The buffer size will be set to size bytes, rounded up to a
+ multiple of the virtual memory page size. The virtual memory
+ page size can be determined using `sysconf(_SC_PAGE_SIZE)`.
+
+ This function does not require special privileges. However, it
+ is limited to a (adjustable) maximum buffer size, which can be
+ changed by a priveliged user calling
+ `.comedi_set_max_buffer_size`, or running the program
+ `comedi_config`.
+ """
+ return _comedi_int(
+ 'comedi_set_buffer_size',
+ self._device._device, self._index, size)
+
+ def get_max_buffer_size(self):
+ "Maximum streaming buffer size of subdevice"
+ return _comedi_int(
+ 'comedi_get_max_buffer_size', self._device._device, self._index)
+
+ def set_max_buffer_size(self, max_size):
+ """Set the maximum streaming buffer size of subdevice
+
+ Returns the old (max?) buffer size on success.
+ """
+ return _comedi_int(
+ 'comedi_set_max_buffer_size', self._device._device, self._index,
+ max_size)
+
+ def get_buffer_contents(self):
+ "Number of bytes available on an in-progress command"
+ return _comedi_int(
+ 'comedi_get_buffer_contents', self._device._device, self._index)
+
+ def mark_buffer_read(self, num_bytes):
+ """Next `num_bytes` bytes in the buffer are no longer needed
+
+ Returns the number of bytes successfully marked as read.
+
+ This method should only be used if you are using a `mmap()` to
+ read data from Comedi's buffer (as opposed to calling `read()`
+ on the device file), since Comedi will automatically keep
+ track of how many bytes have been transferred via `read()`
+ calls.
+ """
+ return _comedi_int(
+ 'comedi_mark_buffer_read', self._device._device, self._index,
+ num_bytes)
+
+ def mark_buffer_written(self, num_bytes):
+ """Next `num_bytes` bytes in the buffer are no longer needed
+
+ Returns the number of bytes successfully marked as written.
+
+ This method should only be used if you are using a `mmap()` to
+ read data from Comedi's buffer (as opposed to calling
+ `write()` on the device file), since Comedi will automatically
+ keep track of how many bytes have been transferred via
+ `write()` calls.
+ """
+ return _comedi_int(
+ 'comedi_mark_buffer_written', self._device._device, self._index,
+ num_bytes)
+
+ def get_buffer_offset(self):
+ """Offset in bytes of the read(/write?) pointer in the streaming buffer
+
+ This offset is only useful for memory mapped buffers.
+ """
+ return _comedi_int(
+ 'comedi_get_buffer_offset', self._device._device, self._index)
+
+
+class Chanlist (list):
+ def chanlist(self):
+ ret = _comedi.chanlist(len(self))
+ for i,channel in enumerate(self):
+ ret[i] = channel.cr_pack()
+ return ret
+
+
+class CalibratedConverter (object):
+ """Apply a converion polynomial
+
+ Usually you would get the conversion polynomial from
+ `Channel.get_hardcal_converter()` or similar. bit for testing,
+ we'll just create one out of thin air.
+
+ TODO: we'll need to use Cython, because the current SWIG bindings
+ don't provide a way to create or edit `double *` arrays.
+
+ >>> p = _comedi.comedi_polynomial_t()
+ >>> p.order = 2
+ >>> p.coefficients[0] = 1 # this fails. Silly SWIG.
+ >>> p.coefficients[1] = 2
+ >>> p.coefficients[2] = 3
+ >>> dir(p.coefficients)
+ >>> p.coefficients = _numpy.array([1, 2, 3, 4], dtype=_numpy.double)
+ >>> p.expansion_origin = -1;
+
+ >>> c = CalibratedConverter(polynomial=p)
+ >>> c(-1)
+ >>> c(_numpy.array([-1, 0, 0.5, 2], dtype=_numpy.double))
+ """
+ def __init__(self, polynomial):
+ self.polynomial = polynomial
+
+ def __call__(self, data):
+ # Iterating through the coefficients fails. Silly SWIG.
+ coefficients = list(reversed(self.polynomial.coefficients))[0:p.order]
+ print coefficients
+ print self.polynomial.expansion_origin
+ return _numpy.polyval(
+ coefficients, data-self.polynomial.expansion_origin)
+
+# see comedi_caldac_t and related at end of comedilib.h
+++ /dev/null
-# Some Comedi operations common to analog and digital IO
-# Copyright (C) 2007-2010 W. Trevor King
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""Some Comedi operations common to analog and digital IO"""
-
-import comedi as c
-import time
-
-
-class PycomediError (Exception) :
- "Error in pycomedi.common"
- pass
-
-def _expand_tuple(tup, length) :
- "Expand an iterable TUP to a tuple of LENGTH by repeating the last element"
- if len(tup) > length :
- raise ValueError('tuple %s has more than %d elements.' % (tup, length))
- elif len(tup) < length :
- temp_tup = tup + tuple((tup[-1],)*(length-len(tup)))
- tup = temp_tup
- return tup
-
-class SubdeviceFlags (object) :
- """
- Descriptions from http://www.comedi.org/doc/r5153.html
- (the comedi_get_subdevice_flags documentation)
-
- SDF_BUSY:
- The subdevice is busy performing an asynchronous command. A
- subdevice being "busy" is slightly different from the "running"
- state flagged by SDF_RUNNING. A "running" subdevice is always
- "busy", but a "busy" subdevice is not necessarily "running". For
- example, suppose an analog input command has been completed by the
- hardware, but there are still samples in Comedi's buffer waiting
- to be read out. In this case, the subdevice is not "running", but
- is still "busy" until all the samples are read out or
- comedi_cancel() is called.
-
- SDF_BUSY_OWNER:
- The subdevice is "Busy", and the command it is running was started
- by the current process.
-
- SDF_LOCKED:
- The subdevice has been locked by comedi_lock().
-
- SDF_LOCK_OWNER:
- The subdevice is locked, and was locked by the current process.
-
- SDF_MAXDATA:
- The maximum data value for the subdevice depends on the channel.
-
- SDF_FLAGS:
- The subdevice flags depend on the channel (unfinished/broken
- support in library).
-
- SDF_RANGETYPE:
- The range type depends on the channel.
-
- SDF_CMD:
- The subdevice supports asynchronous commands.
-
- SDF_SOFT_CALIBRATED:
- The subdevice relies on the host to do calibration in software.
- Software calibration coefficients are determined by the
- comedi_soft_calibrate utility. See the description of the
- comedi_get_softcal_converter() function for more information.
-
- SDF_READABLE:
- The subdevice can be read (e.g. analog input).
-
- SDF_WRITABLE:
- The subdevice can be written to (e.g. analog output).
-
- SDF_INTERNAL:
- The subdevice does not have externally visible lines.
-
- SDF_GROUND:
- The subdevice supports AREF_GROUND.
-
- SDF_COMMON:
- The subdevice supports AREF_COMMON.
-
- SDF_DIFF:
- The subdevice supports AREF_DIFF.
-
- SDF_OTHER:
- The subdevice supports AREF_OTHER
-
- SDF_DITHER:
- The subdevice supports dithering (via the CR_ALT_FILTER chanspec
- flag).
-
- SDF_DEGLITCH:
- The subdevice supports deglitching (via the CR_ALT_FILTER chanspec
- flag).
-
- SDF_RUNNING:
- An asynchronous command is running. You can use this flag to poll
- for the completion of an output command.
-
- SDF_LSAMPL:
- The subdevice uses the 32 bit lsampl_t type instead of the 16 bit
- sampl_t for asynchronous command data.
-
- SDF_PACKED:
- The subdevice uses bitfield samples for asynchronous command data,
- one bit per channel (otherwise it uses one sampl_t or lsampl_t per
- channel). Commonly used for digital subdevices.
- """
- def __init__(self, comedi) :
- self.lastUpdate = None
- self._flags = None
- self._comedi = comedi
- def update(self, dev, subdev) :
- "Must be called on an open device"
- self._flags = self._comedi.comedi_get_subdevice_flags(dev, subdev)
- self.lastupdate = time.time()
- def _check(self, sdf_flag) :
- assert self._flags != None, "Cannot return status of uninitialized flag"
- if self._flags & sdf_flag :
- return True
- return False
- def busy(self) :
- return(self._check(self._comedi.SDF_BUSY))
- def busyOwner(self) :
- return(self._check(self._comedi.SDF_BUSY_OWNER))
- def locked(self) :
- return(self._check(self._comedi.SDF_LOCKED))
- def lockOwner(self) :
- return(self._check(self._comedi.SDF_LOCK_OWNER))
- def maxdata(self) :
- return(self._check(self._comedi.SDF_MAXDATA))
- def flags(self) :
- return(self._check(self._comedi.SDF_FLAGS))
- def rangetype(self) :
- return(self._check(self._comedi.SDF_RANGETYPE))
- def cmd(self) :
- return(self._check(self._comedi.SDF_CMD))
- def softCalibrated(self) :
- return(self._check(self._comedi.SDF_SOFT_CALIBRATED))
- def readable(self) :
- return(self._check(self._comedi.SDF_READABLE))
- def writable(self) :
- return(self._check(self._comedi.SDF_WRITABLE))
- def internal(self) :
- return(self._check(self._comedi.SDF_INTERNAL))
- def ground(self) :
- return(self._check(self._comedi.SDF_GROUND))
- def common(self) :
- return(self._check(self._comedi.SDF_COMMON))
- def diff(self) :
- return(self._check(self._comedi.SDF_DIFF))
- def other(self) :
- return(self._check(self._comedi.SDF_OTHER))
- def dither(self) :
- return(self._check(self._comedi.SDF_DITHER))
- def deglitch(self) :
- return(self._check(self._comedi.SDF_DEGLITCH))
- def running(self) :
- return(self._check(self._comedi.SDF_RUNNING))
- def lsampl(self) :
- return(self._check(self._comedi.SDF_LSAMPL))
- def packed(self) :
- return(self._check(self._comedi.SDF_PACKED))
- def __str__(self) :
- s = ""
- s += "busy : %s\n" % self.busy()
- s += "busyOwner : %s\n" % self.busyOwner()
- s += "locked : %s\n" % self.locked()
- s += "lockedOwner : %s\n" % self.lockOwner()
- s += "maxdata : %s\n" % self.maxdata()
- s += "flags : %s\n" % self.flags()
- s += "rangetype : %s\n" % self.rangetype()
- s += "cmd : %s\n" % self.cmd()
- s += "softCalibrated : %s\n" % self.softCalibrated()
- s += "readable : %s\n" % self.readable()
- s += "writable : %s\n" % self.writable()
- s += "internal : %s\n" % self.internal()
- s += "ground : %s\n" % self.ground()
- s += "common : %s\n" % self.common()
- s += "diff : %s\n" % self.diff()
- s += "other : %s\n" % self.other()
- s += "dither : %s\n" % self.dither()
- s += "deglitch : %s\n" % self.deglitch()
- s += "running : %s\n" % self.running()
- s += "lsampl : %s\n" % self.lsampl()
- s += "packed : %s" % self.packed()
- return s
-
-class PyComediIO (object) :
- "Base class for Comedi IO operations"
- def __init__(self, filename="/dev/comedi0", subdevice=-1, devtype=c.COMEDI_SUBD_AI, chan=(0,1,2,3), aref=(c.AREF_GROUND,), range=(0,), output=False, dev=None) :
- """inputs:
- filename: comedi device file for your device ["/dev/comedi0"].
- subdevice: the IO subdevice [-1 for autodetect]
- devtype: the device type [c.COMEDI_SUBD_AI]
- values include
- comedi.COMEDI_SUBD_DI
- comedi.COMEDI_SUBD_DO
- comedi.COMEDI_SUBD_DIO
- comedi.COMEDI_SUBD_AI
- comedi.COMEDI_SUBD_AO
- chan: an iterable of the channels you wish to control [(0,1,2,3)]
- aref: the analog references for these channels [(comedi.AREF_GROUND,)]
- values include
- comedi.AREF_GROUND
- comedi.AREF_COMMON
- comedi.AREF_DIFF
- comedi.AREF_OTHER
- range: the ranges for these channels [(0,)]
- output: whether to use the lines as output (vs input) (False)
- dev: if you've already opened the device file, pass in the
- open device (None for internal open)
- Note that neither aref nor range need to be the same length as
- the channel tuple. If they are shorter, their last entry will
- be repeated as needed. If they are longer, PycomediError will
- be raised (was: their extra entries will not be used).
- """
- self.verbose = False
- self._comedi = c # keep a local copy around
- # sometimes I got errors on exitting python, which looked like
- # the imported comedi package was unset before my IO had a
- # chance to call comedi_close(). We avoid that by keeping a
- # local reference here.
- self.filename = filename
- self.state = "Closed"
- if dev == None :
- self.open()
- else :
- self.fakeOpen(dev)
- self._setup_device_type(subdevice, devtype)
- self._setup_channels(chan, aref, range)
- self.output = output
- def __del__(self) :
- self.close()
- def open(self) :
- if self.state == "Closed" :
- dev = self._comedi.comedi_open(self.filename)
- if dev < 0 :
- self._comedi.comedi_perror("comedi_open")
- raise PycomediError, "Cannot open %s" % self.filename
- self.fakeOpen(dev)
- def fakeOpen(self, dev):
- """fake open: if you open the comedi device file yourself, use this
- method to pass in the opened file descriptor and declare the
- port "Open".
- """
- if dev < 0 :
- raise PycomediError, "Invalid file descriptor %d" % dev
- self.dev = dev
- self.state = "Open"
- def close(self) :
- if self.state != "Closed" :
- rc = self._comedi.comedi_close(self.dev)
- if rc < 0 :
- self._comedi.comedi_perror("comedi_close")
- raise PycomediError, "Cannot close %s" % self.filename
- self.fakeClose()
- def fakeClose(self):
- """fake close: if you want to close the comedi device file yourself,
- use this method to let the port know it has been "Closed".
- """
- self.dev = None
- self.state = "Closed"
- def _setup_device_type(self, subdevice, devtype) :
- """check that the specified subdevice exists,
- searching for an appropriate subdevice if subdevice == -1
- inputs:
- subdevice: the analog output subdevice (-1 for autodetect)
- devtype: the devoce type
- values include
- comedi.COMEDI_SUBD_DI
- comedi.COMEDI_SUBD_DO
- comedi.COMEDI_SUBD_DIO
- comedi.COMEDI_SUBD_AI
- comedi.COMEDI_SUBD_AO
- """
- self._devtype = devtype
- if (subdevice < 0) : # autodetect an output device
- self.subdev = self._comedi.comedi_find_subdevice_by_type(self.dev, self._devtype, 0) # 0 is starting subdevice
- if self.subdev < 0 :
- self._comedi.comedi_perror("comedi_find_subdevice_by_type")
- raise PycomediError, "Could not find a %d device" % (self._devtype)
- else :
- self.subdev = subdevice
- type = self._comedi.comedi_get_subdevice_type(self.dev, self.subdev)
- if type != self._devtype :
- if type < 0 :
- self._comedi.comedi_perror("comedi_get_subdevice_type")
- raise PycomediError, "Comedi subdevice %d has wrong type %d" % (self.subdev, type)
- self.flags = SubdeviceFlags(self._comedi)
- self.updateFlags()
- def updateFlags(self) :
- self.flags.update(self.dev, self.subdev)
- def _setup_channels(self, chan, aref, rng) :
- """check that the specified channels exists, and that the arefs and
- ranges are legal for those channels. Also allocate a range
- item for each channel, to allow converting between physical
- units and comedi units even when the device is not open.
- inputs:
- chan: an iterable of the channels you wish to control [(0,1,2,3)]
- aref: the analog references for these channels [(comedi.AREF_GROUND,)]
- values include
- comedi.AREF_GROUND
- comedi.AREF_COMMON
- comedi.AREF_DIFF
- comedi.AREF_OTHER
- rng: the ranges for these channels [(0,)]
- Note that neither aref nor rng need to be the same length as
- the channel tuple. If they are shorter, their last entry will
- be repeated as needed. If they are longer, PycomediError will
- be raised (was: their extra entries will not be used).
- """
- self.chan = chan
- self.nchan = len(self.chan)
- self._aref = _expand_tuple(aref, self.nchan)
- self._range = _expand_tuple(rng, self.nchan)
- self.maxdata = []
- self._comedi_range = []
- subdev_n_chan = self._comedi.comedi_get_n_channels(self.dev, self.subdev)
- self.cr_chan = self._comedi.chanlist(self.nchan)
- for i,chan,aref,rng in zip(range(self.nchan), self.chan, self._aref, self._range) :
- if int(chan) != chan :
- raise PycomediError, "Channels must be integers, not %s" % str(chan)
- if chan >= subdev_n_chan :
- raise PycomediError, "Channel %d > subdevice %d's largest chan %d" % (chan, self.subdev, subdev_n_chan-1)
- n_range = self._comedi.comedi_get_n_ranges(self.dev, self.subdev, chan)
- if rng > n_range :
- raise PycomediError, "Range %d > subdevice %d, chan %d's largest range %d" % (rng, subdev, chan, n_range-1)
- maxdata = self._comedi.comedi_get_maxdata(self.dev, self.subdev, chan)
- self.maxdata.append(maxdata)
- comrange = self._comedi.comedi_get_range(self.dev, self.subdev, chan, rng)
- # comrange becomes invalid if device is closed, so make a copy...
- comrange_copy = self._comedi.comedi_range()
- comrange_copy.min = comrange.min
- comrange_copy.max = comrange.max
- comrange_copy.unit = comrange.unit
- self._comedi_range.append(comrange_copy)
- self.cr_chan[i] = self._comedi.cr_pack(chan, rng, aref)
- def comedi_to_phys(self, chan_index, comedi, useNAN=True) :
- if useNAN == True :
- oor = self._comedi.COMEDI_OOR_NAN
- else :
- oor = self._comedi.COMEDI_OOR_NUMBER
- self._comedi.comedi_set_global_oor_behavior(oor)
- phys = self._comedi.comedi_to_phys(int(comedi), self._comedi_range[chan_index], self.maxdata[chan_index])
- if self.verbose :
- print "comedi %d = %g Volts on subdev %d, chan %d, range [%g, %g], max %d" % (comedi, phys, self.subdev, self.chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self.maxdata[chan_index])
- return phys
- def phys_to_comedi(self, chan_index, phys) :
- comedi = self._comedi.comedi_from_phys(phys, self._comedi_range[chan_index], self.maxdata[chan_index])
- if self.verbose :
- print "%g Volts = comedi %d on subdev %d, chan %d, range [%g, %g], max %d" % (phys, comedi, self.subdev, self.chan[chan_index], self._comedi_range[chan_index].max, self._comedi_range[chan_index].min, self.maxdata[chan_index])
- return comedi
-
-class PyComediSingleIO (PyComediIO) :
- "Software timed single-point input/ouput"
- def __init__(self, **kwargs) :
- """inputs:
- filename: comedi device file for your device ["/dev/comedi0"].
- subdevice: the analog output subdevice [-1 for autodetect]
- devtype: the device type [c.COMEDI_SUBD_AI]
- values include
- comedi.COMEDI_SUBD_DI
- comedi.COMEDI_SUBD_DO
- comedi.COMEDI_SUBD_DIO
- comedi.COMEDI_SUBD_AI
- comedi.COMEDI_SUBD_AO
- chan: an iterable of the channels you wish to control [(0,1,2,3)]
- aref: the analog references for these channels [(comedi.AREF_GROUND,)]
- values include
- comedi.AREF_GROUND
- comedi.AREF_COMMON
- comedi.AREF_DIFF
- comedi.AREF_OTHER
- range: the ranges for these channels [(0,)]
- output: whether to use the lines as output (vs input) (False)
- dev: if you've already opened the device file, pass in the
- open device (None for internal open)
- Note that neither aref nor range need to be the same length as
- the channel tuple. If they are shorter, their last entry will
- be repeated as needed. If they are longer, PycomediError will
- be raised (was: their extra entries will not be used).
- """
- PyComediIO.__init__(self, **kwargs)
- def write_chan_index(self, chan_index, data) :
- """inputs:
- chan_index: the channel you wish to write to
- data: the value you wish to write to that channel
- """
- if self.output != True :
- raise PycomediError, "Must be an output to write"
- rc = c.comedi_data_write(self.dev, self.subdev, self.chan[chan_index],
- self._range[chan_index],
- self._aref[chan_index], int(data));
- if rc != 1 : # the number of samples written
- self._comedi.comedi_perror("comedi_data_write")
- raise PycomediError, "comedi_data_write returned %d" % rc
- def read_chan_index(self, chan_index) :
- """inputs:
- chan_index: the channel you wish to read from
- outputs:
- data: the value read from that channel
- """
- if self.output != False :
- raise PycomediError, "Must be an input to read"
- rc, data = c.comedi_data_read(self.dev, self.subdev, self.chan[chan_index], self._range[chan_index], self._aref[chan_index]);
- if rc != 1 : # the number of samples read
- self._comedi.comedi_perror("comedi_data_read")
- raise PycomediError, "comedi_data_read returned %d" % rc
- return data
-
--- /dev/null
+# Copyright (C) 2010 W. Trevor King
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Enums and flags are bundled into class instances for easier browsing.
+
+>>> SUBDEVICE_TYPE # doctest: +NORMALIZE_WHITESPACE
+[<_NamedInt unused>, <_NamedInt ai>, <_NamedInt ao>, <_NamedInt di>,
+ <_NamedInt do>, <_NamedInt dio>, <_NamedInt counter>, <_NamedInt timer>,
+ <_NamedInt memory>, <_NamedInt calib>, <_NamedInt proc>, <_NamedInt serial>,
+ <_NamedInt pwm>]
+>>> SUBDEVICE_TYPE.dio
+<_NamedInt dio>
+>>> SUBDEVICE_TYPE.dio.value == _comedi.COMEDI_SUBD_DIO
+True
+>>> SUBDEVICE_TYPE.dio.doc
+'COMEDI_SUBD_DIO (digital input/output)'
+
+You can also search by name or value.
+
+>>> TRIG_SRC.index_by_name('timer')
+<_NamedInt timer>
+>>> TRIG_SRC.index_by_value(_comedi.TRIG_NOW)
+<_NamedInt now>
+
+Some flags have constants for setting or clearing all the flags at once.
+
+>>> TRIG_SRC # doctest: +NORMALIZE_WHITESPACE
+[<_NamedInt none>, <_NamedInt now>, <_NamedInt follow>, <_NamedInt time>,
+ <_NamedInt timer>, <_NamedInt count>, <_NamedInt ext>, <_NamedInt int>,
+ <_NamedInt other>]
+>>> TRIG_SRC._empty
+<_NamedInt invalid>
+>>> TRIG_SRC._all
+<_NamedInt any>
+
+Flag instances have a special wrapper that stores their value.
+
+>>> f = FlagValue(SDF, 17)
+>>> f.flag # doctest: +ELLIPSIS
+[<_NamedInt busy>, <_NamedInt busy_owner>, ...]
+>>> f.busy
+True
+>>> f.busy = False
+>>> f._value
+16
+
+You can treat named integers as Python integers with bitwise operations,
+
+>>> a = TRIG_SRC.now | TRIG_SRC.follow | TRIG_SRC.time | 64
+>>> a
+<_BitwiseOperator 78>
+>>> a.value
+78
+>>> TRIG_SRC.none & TRIG_SRC.now
+<_BitwiseOperator 0>
+
+But because of the way Python operator overloading works, plain
+integers must go at the end of bitwise chains.
+
+>>> 64 | TRIG_SRC.now
+Traceback (most recent call last):
+ ...
+TypeError: unsupported operand type(s) for |: 'int' and '_NamedInt'
+"""
+
+from math import log as _log
+
+import comedi as _comedi
+
+
+class _BitwiseOperator (object):
+ def __init__(self, value):
+ self.value = value
+
+ def __str__(self):
+ return self.__repr__()
+
+ def __repr__(self):
+ return '<%s %s>' % (self.__class__.__name__, self.value)
+
+ def __and__(self, other):
+ "Bitwise and acts on `_BitwiseOperator.value`."
+ if isinstance(other, _BitwiseOperator):
+ other = other.value
+ return _BitwiseOperator(int.__and__(self.value, other))
+
+ def __or__(self, other):
+ "Bitwise or acts on `_BitwiseOperator.value`."
+ if isinstance(other, _BitwiseOperator):
+ other = other.value
+ return _BitwiseOperator(int.__or__(self.value, other))
+
+
+class _NamedInt (_BitwiseOperator):
+ "A flag or enum item."
+ def __init__(self, name, value, doc=None):
+ super(_NamedInt, self).__init__(value)
+ self.name = name
+ self.doc = doc
+
+ def __str__(self):
+ return self.name
+
+ def __repr__(self):
+ return '<%s %s>' % (self.__class__.__name__, self.name)
+
+
+class _Enum (list):
+ "An enumerated list"
+ def __init__(self, name, prefix, blacklist=None, whitelist=None,
+ translation=None):
+ super(_Enum, self).__init__()
+ self.name = name
+ if blacklist == None:
+ blacklist = []
+ if translation == None:
+ translation = {}
+ self._name_keys = {}
+ self._value_keys = {}
+ for attr in dir(_comedi):
+ if attr.startswith(prefix):
+ item_name = self._item_name(attr, prefix, translation)
+ if self._is_ignored(item_name, blacklist, whitelist):
+ continue
+ self._add_item(attr, item_name)
+ self.sort(key=lambda item: item.value)
+
+ def _item_name(self, attr, prefix, translation):
+ item_name = attr[len(prefix):]
+ if item_name in translation:
+ return translation[item_name]
+ else:
+ return item_name.lower()
+
+ def _is_ignored(self, item_name, blacklist, whitelist):
+ return (item_name in blacklist
+ or whitelist != None and item_name not in whitelist)
+
+ def _add_item(self, attr, item_name):
+ item_value = getattr(_comedi, attr)
+ item = _NamedInt(item_name, item_value, doc=attr)
+ self.append(item)
+ self._name_keys[item_name] = item
+ if item_value in self._value_keys and item_name:
+ raise ValueError('value collision in %s: %s = %s = %#x'
+ % (self.name, item_name,
+ self._value_keys[item_value], item_value))
+ self._value_keys[item_value] = item
+ setattr(self, item_name, item)
+
+ def index_by_name(self, name):
+ return self._name_keys[name]
+
+ def index_by_value(self, value):
+ return self._value_keys[value]
+
+
+class _Flag (_Enum):
+ "A flag"
+ def __init__(self, *args, **kwargs):
+ super(_Flag, self).__init__(*args, **kwargs)
+ self._empty = None
+ self._all = None
+ for flag in self:
+ if flag.value == 0:
+ self._empty = flag
+ elif flag.value < 0 or _log(flag.value, 2) % 1 != 0:
+ if self._all:
+ raise ValueError(
+ 'mutliple multi-bit flags in %s: %s = %#x and %s = %#x'
+ % (self.name, self._all.name, self._all.value,
+ flag.name, flag.value))
+ self._all = flag
+ if self._empty:
+ self.remove(self._empty)
+ if self._all:
+ self.remove(self._all)
+
+ def get(self, value, name):
+ flag = getattr(self, name)
+ assert flag.value != 0, '%s: %s' % (self.name, flag)
+ return value & flag.value == flag.value
+
+ def set(self, value, name, status):
+ flag = getattr(self, name)
+ if status == True:
+ return value | flag.value
+ return (value | flag.value) - flag.value
+
+
+class FlagValue (object):
+ """A flag instance (flag + value)
+
+ Examples
+ --------
+
+ >>> f = FlagValue(flag=TRIG_SRC, value=0, default='empty')
+ >>> f.any
+ False
+ >>> print f
+ empty
+ >>> f.now = True
+ >>> f.timer = True
+ >>> f.int = True
+ >>> print f
+ now|timer|int
+ """
+ def __init__(self, flag, value, default='-'):
+ self.flag = flag
+ self._value = value
+ self._default = default
+
+ def __str__(self):
+ flags = [f for f in self.flag if getattr(self, f.name)]
+ if len(flags) == 0:
+ return self._default
+ return '|'.join([f.name for f in flags])
+
+ def __getattr__(self, name):
+ return self.flag.get(self._value, name)
+
+ def __setattr__(self, name, value):
+ if name != 'flag' and not name.startswith('_'):
+ value = self.flag.set(self._value, name, value)
+ name = '_value'
+ super(FlagValue, self).__setattr__(name, value)
+
+
+# blacklist deprecated values (and those belonging to other _Enums or _Flags)
+
+AREF = _Enum('analog_reference', 'AREF_')
+AREF.diff.doc += ' (differential)'
+AREF.other.doc += ' (other / undefined)'
+
+#GPCT = _Flag('general_purpose_counter_timer', 'GPCT_')
+# Two competing flag sets? Need some documentation.
+
+INSN_MASK = _Flag('instruction_mask', 'INSN_MASK_')
+
+CONFIGURATION_IDS = _Enum('configuration_ids', 'INSN_CONFIG_', blacklist=[
+ '8254_set_mode'])
+
+INSN = _Enum('instruction', 'INSN_',
+ blacklist=['mask_%s' % i.name for i in INSN_MASK] + [
+ 'config_%s' % i.name for i in CONFIGURATION_IDS])
+
+TRIG = _Flag('trigger_flags', 'TRIG_', whitelist=[
+ 'bogus', 'dither', 'deglitch', 'config', 'wake_eos'])
+TRIG.bogus.doc += ' (do the motions)'
+TRIG.config.doc += ' (perform configuration, not triggering)'
+TRIG.wake_eos.doc += ' (wake up on end-of-scan events)'
+
+CMDF = _Flag('command_flags', 'CMDF_')
+CMDF.priority.doc += (
+ ' (try to use a real-time interrupt while performing command)')
+
+EV = _Flag('??', 'COMEDI_EV_')
+
+TRIG_ROUND = _Enum('trigger_round', 'TRIG_ROUND_', blacklist=['mask'])
+TRIG_ROUND.mask = _comedi.TRIG_ROUND_MASK
+
+TRIG_SRC = _Flag('trigger_source_flags', 'TRIG_',
+ blacklist=[i.name for i in TRIG] + [
+ 'round_%s' % i.name for i in TRIG_ROUND] + [
+ 'round_mask', 'rt', 'write'])
+TRIG_SRC.none.doc += ' (never trigger)'
+TRIG_SRC.now.doc += ' (trigger now + N ns)'
+TRIG_SRC.follow.doc += ' (trigger on next lower level trig)'
+TRIG_SRC.time.doc += ' (trigger at time N ns)'
+TRIG_SRC.timer.doc += ' (trigger at rate N ns)'
+TRIG_SRC.count.doc += ' (trigger when count reaches N)'
+TRIG_SRC.ext.doc += ' (trigger on external signal N)'
+TRIG_SRC.int.doc += ' (trigger on comedi-internal signal N)'
+TRIG_SRC.other.doc += ' (driver defined)'
+
+SDF_PWM = _Flag('pulse_width_modulation_subdevice_flags', 'SDF_PWM_')
+SDF_PWM.counter.doc += ' (PWM can automatically switch off)'
+SDF_PWM.hbridge.doc += ' (PWM is signed (H-bridge))'
+
+SDF = _Flag('subdevice_flags', 'SDF_', blacklist=[
+ 'pwm_%s' % i.name for i in SDF_PWM] + [
+ 'cmd', 'writeable', 'rt'])
+SDF.busy.doc += ' (device is busy)'
+SDF.busy_owner.doc += ' (device is busy with your job)'
+SDF.locked.doc += ' (subdevice is locked)'
+SDF.lock_owner.doc += ' (you own lock)'
+SDF.maxdata.doc += ' (maxdata depends on channel)'
+SDF.flags.doc += ' (flags depend on channel)'
+SDF.rangetype.doc += ' (range type depends on channel)'
+SDF.soft_calibrated.doc += ' (subdevice uses software calibration)'
+SDF.cmd_write.doc += ' (can do output commands)'
+SDF.cmd_read.doc += ' (can to input commands)'
+SDF.readable.doc += ' (subdevice can be read, e.g. analog input)'
+SDF.writable.doc += ' (subdevice can be written, e.g. analog output)'
+SDF.internal.doc += ' (subdevice does not have externally visible lines)'
+SDF.ground.doc += ' (can do aref=ground)'
+SDF.common.doc += ' (can do aref=common)'
+SDF.diff.doc += ' (can do aref=diff)'
+SDF.other.doc += ' (can do aref=other)'
+SDF.dither.doc += ' (can do dithering)'
+SDF.deglitch.doc += ' (can do deglitching)'
+SDF.mmap.doc += ' (can do mmap())'
+SDF.running.doc += ' (subdevice is acquiring data)'
+SDF.lsampl.doc += ' (subdevice uses 32-bit samples)'
+SDF.packed.doc += ' (subdevice can do packed DIO)'
+
+SUBDEVICE_TYPE = _Enum('subdevice_type', 'COMEDI_SUBD_')
+SUBDEVICE_TYPE.unused.doc += ' (unused by driver)'
+SUBDEVICE_TYPE.ai.doc += ' (analog input)'
+SUBDEVICE_TYPE.ao.doc += ' (analog output)'
+SUBDEVICE_TYPE.di.doc += ' (digital input)'
+SUBDEVICE_TYPE.do.doc += ' (digital output)'
+SUBDEVICE_TYPE.dio.doc += ' (digital input/output)'
+SUBDEVICE_TYPE.memory.doc += ' (memory, EEPROM, DPRAM)'
+SUBDEVICE_TYPE.calib.doc += ' (calibration DACs)'
+SUBDEVICE_TYPE.proc.doc += ' (processor, DSP)'
+SUBDEVICE_TYPE.serial.doc += ' (serial IO)'
+SUBDEVICE_TYPE.pwm.doc += ' (pulse-with modulation)'
+
+IO_DIRECTION = _Enum('io_direction', 'COMEDI_', whitelist=[
+ 'input', 'output', 'opendrain'])
+
+SUPPORT_LEVEL = _Enum('support_level', 'COMEDI_', whitelist=[
+ 'unknown_support', 'supported', 'unsupported'])
+
+UNIT = _Enum('unit', 'UNIT_', translation={'mA':'mA'})
+
+CB = _Enum('callback_flags', 'COMEDI_CB_', blacklist=['block', 'eobuf'])
+CB.eos.doc += ' (end of scan)'
+CB.eoa.doc += ' (end of acquisition)'
+CB.error.doc += ' (card error during acquisition)'
+CB.overflow.doc += ' (buffer overflow/underflow)'
+++ /dev/null
-# Simultaneous, finite, buffered analog inpout/output using comedi drivers
-# Copyright (C) 2007-2010 W. Trevor King
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import comedi as c
-import common
-from numpy import array, fromstring, float32, pi, sin
-import int16_rw
-import time
-
-# imports for testing
-from time import sleep
-from scipy.stats import linregress
-from os import system
-
-#VERBOSE = True
-VERBOSE = False
-AO_TRIGGERS_OFF_AI_START = True
-#AO_TRIGGERS_OFF_AI_START = False
-
-
-# HACK! outputting last point can cause jumps to random positions
-# This is probably due to some clocking issue when we trigger AO off of
-# the AI Start signal.
-DONT_OUTPUT_LAST_SAMPLE_HACK = True
-
-class SimAioError (common.PycomediError) :
- "Simultaneous Analog IO error"
- pass
-
-_example_array = array([0], dtype=int16_rw.DATA_T) # for typing, since I don't know what type(array) should be
-
-_cmdtest_message = ["success",
- "invalid source",
- "source conflict",
- "invalid argument",
- "argument conflict",
- "invalid chanlist"]
-
-class cmd (object) :
- """Wrap a comedi command in more Pythonic trappings.
-
- Due to my limited needs, this class currently only supports
- software triggered runs (possibly with output triggering off the
- input trigger) for a finite number of output samples where all of
- the scan and sample timing is internal and as fast as possible.
-
- See http://www.comedi.org/doc/x621.html#COMEDICMDSTRUCTURE
- for more details/possibilities.
- """
- def __init__(self, IO) :
- """input:
- IO : an initialized common.PyComediIO object
- """
- self.IO = IO
- if self.IO.output == True :
- self.cmdTypeString = "output"
- else :
- self.cmdTypeString = "input"
- self.generate_rough_command()
- def generate_rough_command(self) :
- if VERBOSE :
- print "generate rough %s command" % self.cmdTypeString
- cmd = self.IO._comedi.comedi_cmd_struct()
- cmd.subdev = self.IO.subdev
- if self.IO.output :
- cmd.flags = self.IO._comedi.CMDF_WRITE
- else :
- cmd.flags = 0
- # decide how to trigger a multi-scan run
- if self.IO.output and AO_TRIGGERS_OFF_AI_START :
- cmd.start_src = self.IO._comedi.TRIG_EXT
- cmd.start_arg = 18 # AI_START1 internal AI start signal
- else :
- cmd.start_src = self.IO._comedi.TRIG_INT
- cmd.start_arg = 0
- # decide how to trigger a multi-channel scan
- cmd.scan_begin_src = self.IO._comedi.TRIG_TIMER
- cmd.scan_begin_arg = 1 # temporary value for now
- # decide how to trigger a single channel's aquisition
- if self.IO.output :
- cmd.convert_src = self.IO._comedi.TRIG_NOW # convert simultaneously (each output has it's own DAC)
- cmd.convert_arg = 0
- else :
- cmd.convert_src = self.IO._comedi.TRIG_TIMER # convert sequentially (all inputs share single ADC)
- cmd.convert_arg = 1 # time between channels in ns, 1 to convert ASAP
- # decide when a scan is complete
- cmd.scan_end_src = self.IO._comedi.TRIG_COUNT
- cmd.scan_end_arg = self.IO.nchan
- cmd.stop_src = self.IO._comedi.TRIG_COUNT
- cmd.stop_arg = 1 # temporary value for now
- cmd.chanlist = self.IO.cr_chan
- cmd.chanlist_len = self.IO.nchan
- self.cmd = cmd
- self.test_cmd(max_passes=3)
- def test_cmd(self, max_passes=1) :
- very_verbose = False
- i = 0
- rc = 0
- if very_verbose :
- print "Testing command:"
- print self
- while i < max_passes :
- rc = self.IO._comedi.comedi_command_test(self.IO.dev, self.cmd)
- if (rc == 0) :
- break
- if VERBOSE or very_verbose :
- print "test pass %d, %s" % (i, _cmdtest_message[rc])
- i += 1
- if (VERBOSE or very_verbose) and i < max_passes :
- print "Passing command:\n", self
- if i >= max_passes :
- print "Failing command (%d):\n" % rc, self
- raise SimAioError, "Invalid command: %s" % _cmdtest_message[rc]
- def execute(self) :
- if VERBOSE :
- print "Loading %s command" % self.cmdTypeString
- rc = self.IO._comedi.comedi_command(self.IO.dev, self.cmd)
- if rc < 0 :
- self.IO._comedi.comedi_perror("comedi_command")
- raise SimAioError, "Error executing %s command %d" % (self.cmdTypeString, rc)
- def _cmdsrc(self, source) :
- str = ""
- if source & c.TRIG_NONE : str += "none|"
- if source & c.TRIG_NOW : str += "now|"
- if source & c.TRIG_FOLLOW : str += "follow|"
- if source & c.TRIG_TIME : str += "time|"
- if source & c.TRIG_TIMER : str += "timer|"
- if source & c.TRIG_COUNT : str += "count|"
- if source & c.TRIG_EXT : str += "ext|"
- if source & c.TRIG_INT : str += "int|"
- if source & c.TRIG_OTHER : str += "other|"
- return str
- def __str__(self) :
- str = "Command on %s (%s):\n" % (self.IO, self.cmdTypeString)
- str += "subdevice: \t%d\n" % self.cmd.subdev
- str += "flags: \t0x%x\n" % self.cmd.flags
- str += "start: \t"
- str += self._cmdsrc(self.cmd.start_src)
- str += "\t%d\n" % self.cmd.start_arg
- str += "scan_begin:\t"
- str += self._cmdsrc(self.cmd.scan_begin_src)
- str += "\t%d\n" % self.cmd.scan_begin_arg
- str += "convert: \t"
- str += self._cmdsrc(self.cmd.convert_src)
- str += "\t%d\n" % self.cmd.convert_arg
- str += "scan_end: \t"
- str += self._cmdsrc(self.cmd.scan_end_src)
- str += "\t%d\n" % self.cmd.scan_end_arg
- str += "stop: \t"
- str += self._cmdsrc(self.cmd.stop_src)
- str += "\t%d" % self.cmd.stop_arg
- return str
-
-class AIO (object) :
- """Control a simultaneous analog input/output (AIO) device using
- Comedi drivers.
-
- The AIO device is modeled as being in one of the following states:
-
- Open Device file has been opened, various one-off setup
- tasks completed.
- Initialized Any previous activity is complete, ready for a new
- task
- Setup New task assigned.
- Armed The output task is "triggered" (see below)
- Read The input task is triggered, and input read in
- Closed
- Transitions between these states may be achieved with class methods
- open, __init__ - through Open to Initialized
- close Any to Closed
- setup Initialized to Setup
- arm Setup to Armed
- start_read Armed to Read
- reset Setup, Armed, or Read to Initialized
-
- There are two triggering methods set by the module global
- AO_TRIGGERS_OFF_AI_START
- When this global is true, the output and input will start on the
- exactly the same clock tick (in this case the output "trigger"
- when "Arming" just primes the output to start when the input start
- is signaled). However, this functionality at the moment depends
- on your having a National Instruments card with a DAQ-STC module
- controling the timing (e.g. E series) and a patched version of
- ni_mio_common.c in your Comedi kernel. If you do not have an
- appropriate card, you will either have to implement an appropriate
- method for your card, or set the global to false, in which case
- the IO synchronicity depends on the synchronicity of the AO and AI
- software triggers.
- """
- def __init__(self, filename="/dev/comedi0",
- in_subdevice=-1, in_chan=(0,), in_aref=(0,), in_range=(0,),
- out_subdevice=-1, out_chan=(0,), out_aref=(0,), out_range=(0,),
- buffsize=32768) :
- """inputs:
- filename: comedi device file for your device ("/dev/comedi0").
- And then for both input and output (in_* and out_*):
- subdevice: the analog output subdevice (-1 for autodetect)
- values include
- comedi.COMEDI_SUBD_DI
- comedi.COMEDI_SUBD_DO
- comedi.COMEDI_SUBD_DIO
- comedi.COMEDI_SUBD_AI
- comedi.COMEDI_SUBD_AO
- chan: an iterable of the channels you wish to control ((0,1,2,3))
- aref: the analog reference for these channels (comedi.AREF_GROUND)
- values include
- comedi.AREF_GROUND
- comedi.AREF_COMMON
- comedi.AREF_DIFF
- comedi.AREF_OTHER
- range: the range for these channels (0)
- """
- self._comedi = c
- self._filename = filename
- assert buffsize > 0
- assert buffsize % 2 == 0, "buffsize = %d is not even" % buffsize
- self.buffsize = buffsize
- # the next section is much like the open() method below,
- # but in this one we set up all the extra details associated
- # with the AO and AI structures
- self.dev = self._comedi.comedi_open(self._filename)
- self._fd = self._comedi.comedi_fileno(self.dev)
- if VERBOSE :
- print "Opened %s on fd %d" % (self._filename, self._fd)
- self.AI = common.PyComediIO(filename=self._filename, subdevice=in_subdevice, devtype=c.COMEDI_SUBD_AI, chan=in_chan, aref=in_aref, range=in_range, output=False, dev=self.dev)
- self.AO = common.PyComediIO(filename=self._filename, subdevice=out_subdevice, devtype=c.COMEDI_SUBD_AO, chan=out_chan, aref=out_aref, range=out_range, output=True, dev=self.dev)
- self.state = "Open"
- self._icmd = cmd(self.AI)
- self._ocmd = cmd(self.AO)
- self.state = "Initialized"
- def __del__(self) :
- self.close()
- def close(self) :
- if self.state != "Closed" :
- self.reset(force=True)
- rc = self._comedi.comedi_close(self.dev)
- if rc < 0 :
- self._comedi.comedi_perror("comedi_close")
- raise SimAioError, "Cannot close %s" % self._filename
- if VERBOSE :
- print "Closed %s on fd %d" % (self._filename, self._fd)
- self.AI.fakeClose()
- self.AO.fakeClose()
- self.dev = None
- self._fd = None
- self.state = "Closed"
- def open(self) :
- if self.state != "Closed" :
- raise SimAioError, "Invalid state %s" % self.state
- self.dev = self._comedi.comedi_open(self._filename)
- self._fd = self._comedi.comedi_fileno(self.dev)
- if VERBOSE :
- print "Opened %s on fd %d" % (self._filename, self._fd)
- self.AI.fakeOpen(self.dev)
- self.AO.fakeOpen(self.dev)
- self.state = "Open"
- self._icmd = cmd(self.AI)
- self._ocmd = cmd(self.AO)
- self.state = "Initialized"
- def genBuffer(self, nsamp, nchan=1, value=0) :
- return array([value]*nsamp*nchan, dtype=int16_rw.DATA_T)
- def setup(self, nsamps=None, freq=1.0, out_buffer=None) :
- if self.state != "Initialized" :
- raise SimAioError, "Invalid state %s" % self.state
- if out_buffer == None : # read-only command
- assert self.AO == None
- assert nsamps > 1
- if nsamps == None :
- assert len(out_buffer) % self.AO.nchan == 0
- nsamps = int(len(out_buffer) / self.AO.nchan)
- if type(out_buffer) != type(_example_array) :
- raise SimAioError, "out_buffer must be a numpy array, not a %s" % str(type(out_buffer))
- if DONT_OUTPUT_LAST_SAMPLE_HACK :
- for i in range(1, self.AO.nchan+1) : # i in [1, ... ,nchan]
- if out_buffer[-i] != out_buffer[-self.AO.nchan-i] :
- raise SimAioError, """To ensure that you are not suprised by the effects of the
-DONT_OUTPUT_LAST_SAMPLE_HACK flag, please ensure that the last two
-values samples output on each channel are the same."""
- onsamps = nsamps - 1
- else :
- onsamps = nsamps
- self._nsamps = nsamps
- self._ocmd.cmd.scan_begin_arg = int(1e9/freq)
- self._ocmd.cmd.stop_arg = onsamps
- if VERBOSE :
- print "Configure the board (%d ns per scan, %d samps, %g Hz)" % (self._icmd.cmd.scan_begin_arg, self._icmd.cmd.stop_arg, 1e9/self._ocmd.cmd.scan_begin_arg)
- self._obuffer = out_buffer
- self._onremain = nsamps
- self._ocmd.test_cmd(max_passes=2)
- self._ocmd.execute()
- self._icmd.cmd.scan_begin_arg = self._ocmd.cmd.scan_begin_arg
- self._icmd.cmd.stop_arg = nsamps
- self._icmd.test_cmd()
- self._inremain = nsamps
- self._icmd.execute()
- nsamps = min(self._onremain, self.buffsize)
- offset = 0
- if VERBOSE :
- print "Write %d output samples to the card" % (nsamps*self.AO.nchan)
- rc = int16_rw.write_samples(self._fd, out_buffer, offset, nsamps*self.AO.nchan, 1)
- if rc != nsamps*self.AO.nchan :
- raise SimAioError, "Error %d writing output buffer\n" % rc
- self._onremain -= nsamps
- self.state = "Setup"
- def arm(self) :
- if self.state != "Setup" :
- raise SimAioError, "Invalid state %s" % self.state
- if VERBOSE :
- print "Arm the analog ouptut"
- self._comedi_internal_trigger(self.AO.subdev)
- self.state = "Armed"
- def start_read(self, in_buffer) :
- if self.state != "Armed" :
- raise SimAioError, "Invalid state %s" % self.state
- if len(in_buffer) < self._nsamps * self.AI.nchan :
- raise SimAioError, "in_buffer not long enough (size %d < required %d)" \
- % (len(in_buffer), self._nsamps * self.AI.nchan)
-
- if VERBOSE :
- print "Start the run"
- self._comedi_internal_trigger(self.AI.subdev)
- self.state = "Read"
- while self._inremain > 0 :
- # read half a buffer
- nsamps = min(self._inremain, self.buffsize/2)
- offset = (self._nsamps - self._inremain) * self.AI.nchan
- if VERBOSE :
- print "Read %d input samples from the card" % (nsamps*self.i_nchan)
- rc = int16_rw.read_samples(self._fd, in_buffer, offset, nsamps*self.AI.nchan, 20)
- if rc != nsamps*self.AI.nchan :
- raise SimAioError, "Error %d reading input buffer\n" % rc
- self._inremain -= nsamps
- # write half a buffer
- nsamps = min(self._onremain, self.buffsize/2)
- if nsamps > 0 :
- offset = (self._nsamps - self._onremain) * self.AO.nchan
- if VERBOSE :
- print "Write %d output samples to the card" % (nsamps*self.AO.nchan)
- rc = int16_rw.write_samples(self._fd, self._obuffer, offset, nsamps*self.AO.nchan, 20)
- if rc != nsamps*self.AO.nchan :
- raise SimAioError, "Error %d writing output buffer\n" % rc
- self._onremain -= nsamps
- def _comedi_internal_trigger(self, subdevice) :
- data = self._comedi.chanlist(1) # by luck, data is an array of lsampl_t (unsigned ints), as is chanlist
- insn = self._comedi.comedi_insn_struct()
- insn.insn = self._comedi.INSN_INTTRIG
- insn.subdev = subdevice
- insn.data = data
- insn.n = 1
- data[0] = 0
- rc = self._comedi.comedi_do_insn(self.dev, insn)
- def reset(self, force=False) :
- if VERBOSE :
- print "Reset the analog subdevices"
- # clean up after the read
- self.AO.updateFlags()
- self.AI.updateFlags()
- # I would expect self.AO.flags.busy() to be False by this point,
- # but after a write that does not seem to be the case.
- # It doesn't seem to cause any harm to cancel things anyway...
- rc = self._comedi.comedi_cancel(self.dev, self.AO.subdev)
- if rc < 0 :
- self._comedi.comedi_perror("comedi_cancel")
- raise SimAioError, "Error cleaning up output command"
- rc = self._comedi.comedi_cancel(self.dev, self.AI.subdev)
- if rc < 0 :
- self._comedi.comedi_perror("comedi_cancel")
- raise SimAioError, "Error cleaning up input command"
- self.state = "Initialized"
-
-
-# define the test suite
-# verbose
-# 0 - no output
-# 1 - print test names
-# 2 - print test results
-# 3 - print some details
-# 4 - print lots of details
-
-def _test_AIO(aio=None, start_wait=0, verbose=0) :
- if verbose >= 1 :
- print "_test_AIO(start_wait = %g)" % start_wait
- nsamps = 20
- out_data = aio.genBuffer(nsamps)
- in_data = aio.genBuffer(nsamps)
- midpoint = int(aio.AO.maxdata[0]/2)
- bitrange = float(midpoint/2)
- for i in range(nsamps) :
- out_data[i] = int(midpoint+bitrange*sin(2*pi*i/float(nsamps)))
- out_data[-2] = out_data[-1] = midpoint
- aio.setup(freq=1000, out_buffer=out_data)
- aio.arm()
- sleep(start_wait)
- aio.start_read(in_data)
- aio.reset()
- if verbose >= 4 :
- print "out_data:\n", out_data
- print "in_data:\n", in_data
- print "residual:\n[",
- for i, o in zip(in_data, out_data) :
- print int(i)-int(o),
- print "]"
- return (out_data, in_data)
-
-def _repeat_aio_test(aio=None, num_tests=100, start_wait=0, verbose=0) :
- if verbose >= 1 :
- print "_repeat_aio_test()"
- grads = array([0]*num_tests, dtype=float32) # test input with `wrong' type
- good = 0
- bad = 0
- good_run = 0
- good_run_arr = []
- for i in range(num_tests) :
- out_data, in_data = _test_AIO(aio, start_wait)
- gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
- grads[i] = gradient
- if verbose >= 4 :
- print "wait %2d, run %2d, gradient %g" % (start_wait, i, gradient)
- if gradient < .7 :
- bad += 1
- good_run_arr.append(good_run)
- good_run = 0
- else :
- good += 1
- good_run += 1
- good_run_arr.append(good_run)
- fail_rate = (float(bad)/float(good+bad))*100.0
- if verbose >= 2 :
- print "failure rate %g%% in %d runs" % (fail_rate, num_tests)
- call = 'echo "'
- for num in good_run_arr :
- call += "%d " % num
- call += '" | stem_leaf 2'
- print "good run stem and leaf:"
- system(call)
- return fail_rate
-
-def _test_AIO_multi_chan(aio=None, start_wait=0, verbose=0) :
- from sys import stdout
- if verbose >= 1 :
- print "_test_AIO_multi_chan(start_wait = %g)" % start_wait
- nsamps = 100
- out_data = aio.genBuffer(nsamps, aio.AO.nchan)
- in_data = aio.genBuffer(nsamps, aio.AI.nchan)
- # set up interleaved data
- midpoint = int(aio.AO.maxdata[0]/2)
- bitrange = float(midpoint/2)
- for i in range(nsamps) :
- out_data[i*aio.AO.nchan] = int(midpoint+bitrange*sin(2*pi*i/float(nsamps)))
- for j in range(1, aio.AO.nchan) :
- out_data[i*aio.AO.nchan + j] = 0
- if DONT_OUTPUT_LAST_SAMPLE_HACK :
- for ind in [-1,-1-aio.AO.nchan] :
- for chan in range(aio.AO.nchan) :
- out_data[ind-chan] = midpoint
- aio.setup(freq=1000, out_buffer=out_data)
- aio.arm()
- sleep(start_wait)
- aio.start_read(in_data)
- aio.reset()
- #fid = file('/tmp/comedi_test.o', 'w')
- fid = stdout
- if verbose >= 4 :
- print >> fid, "#",
- for j in range(aio.AO.nchan) :
- print >> fid, "%s\t" % aio.AO.chan[j],
- for j in range(aio.AI.nchan) :
- print >> fid, "%s\t" % aio.AI.chan[j],
- print ""
- for i in range(nsamps) :
- for j in range(aio.AO.nchan) :
- print >> fid, "%s\t" % out_data[i*aio.AO.nchan+j],
- for j in range(aio.AI.nchan) :
- print >> fid, "%s\t" % in_data[i*aio.AI.nchan+j],
- print >> fid, ""
- return (out_data, in_data)
-
-def _test_big_bufs(aio=None, freq=100e3, verbose=False) :
- if verbose >= 1 :
- print "_test_big_bufs()"
- if aio == None :
- our_aio = True
- aio = AIO(in_chan=(1,), out_chan=(0,))
- else :
- our_aio = False
- nsamps = int(100e3)
- out_data = aio.genBuffer(nsamps, aio.AO.nchan)
- midpoint = int(aio.AO.maxdata[0]/2)
- bitrange = float(midpoint/2)
- for i in range(nsamps) :
- out_data[i] = int(sin(2*pi*i/float(nsamps))*bitrange) + midpoint
- if DONT_OUTPUT_LAST_SAMPLE_HACK :
- out_data[-2] = out_data[-1] = midpoint
- in_data = aio.genBuffer(nsamps, aio.AO.nchan)
- aio.setup(freq=freq, out_buffer=out_data)
- aio.arm()
- aio.start_read(in_data)
- aio.reset()
- if our_aio :
- aio.close()
- del(aio)
-
-def _test_output_persistence(freq=100, verbose=False) :
- import single_aio
- if verbose >= 1 :
- print "_test_output_persistence()"
- aio = AIO(in_chan=(0,), out_chan=(0,))
- aio.close()
- ai = single_aio.AI(chan=(0,))
- ai.close()
-
- def simult_set_voltage(aio, range_fraction=None, physical_value=None, freq=freq, verbose=False) :
- "use either range_fraction or physical_value, not both."
- aio.open()
- if range_fraction != None :
- assert physical_value == None
- assert range_fraction >= 0 and range_fraction <= 1
- out_val = int(range_fraction*aio.AO.maxdata[0])
- physical_value = aio.AO.comedi_to_phys(chan_index=0, comedi=out_val, useNAN=False)
- else :
- assert physical_value != None
- out_val = aio.AO.phys_to_comedi(chan_index=0, phys=physical_value)
- if verbose >= 3 :
- print "Output : %d = %g V" % (out_val, physical_value)
- nsamps = int(int(freq))
- out_data = aio.genBuffer(nsamps, aio.AO.nchan, value=out_val)
- in_data = aio.genBuffer(nsamps, aio.AO.nchan)
- aio.setup(freq=freq, out_buffer=out_data)
- aio.arm()
- aio.start_read(in_data)
- aio.reset()
- # by this point the output returns to 0 V when
- # DONT_OUTPUT_LATH_SAMPLE_HACK == False
- time.sleep(5)
- aio.close()
- if verbose >= 4 :
- print "Output complete"
- print in_data
- return physical_value
- def single_get_voltage(ai, verbose=False) :
- ai.open()
- in_val = ai.read()[0]
- ai.close()
- in_phys = ai.comedi_to_phys(chan_index=0, comedi=in_val, useNAN=False)
- if verbose >= 3 :
- print "Input : %d = %g V" % (in_val, in_phys)
- return in_phys
- def tp(aio, ai, range_fraction, verbose=False) :
- out_phys = simult_set_voltage(aio, range_fraction=range_fraction, verbose=verbose)
-
- # It helps me to play a sound so I know where the test is
- # while confirming the output on an oscilliscope.
- #system("aplay /home/wking/Music/system/sonar.wav")
-
- time.sleep(1)
- in_phys = single_get_voltage(ai, verbose)
- assert abs((in_phys-out_phys)/out_phys) < 0.1, "Output %g V, but input %g V" % (out_phys, in_phys)
-
- tp(aio,ai,0,verbose)
- tp(aio,ai,1,verbose)
- simult_set_voltage(aio,physical_value=0.0,verbose=verbose)
-
-def test(verbose=2) :
- aio = AIO(in_chan=(0,), out_chan=(0,))
- _test_AIO(aio, start_wait = 0, verbose=verbose)
- _test_AIO(aio, start_wait = 0.5, verbose=verbose)
- aio.close()
- aio.open()
- _repeat_aio_test(aio, num_tests=100, start_wait=0, verbose=verbose)
- _test_big_bufs(aio, verbose=verbose)
- aio.close()
-
- aiom = AIO(in_chan=(0,1,2,3), out_chan=(0,1))
- _test_AIO_multi_chan(aiom, start_wait = 0, verbose=verbose)
- del(aiom)
- _test_output_persistence(verbose=verbose)
-
-if __name__ == "__main__" :
- test()
+++ /dev/null
-# Use Comedi drivers for single-shot analog input/output
-# Copyright (C) 2007-2010 W. Trevor King
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""Use Comedi drivers for single-shot analog input/output
-
-Being single-shot implementations, read/writes will be software timed,
-so this module would not be a good choice if you need millisecond
-resolution. However, it does provide a simple and robust way to
-generate/aquire signals at 1 second and greater timescales.
-"""
-
-import comedi as c
-import common
-
-VERBOSE_DEBUG = False
-
-class SngAioError (common.PycomediError) :
- "Single point Analog IO error"
- pass
-
-class AI (common.PyComediSingleIO) :
- def __init__(self, **kwargs) :
- """inputs:
- filename: comedi device file for your device ["/dev/comedi0"].
- subdevice: the analog input subdevice (-1 for autodetect)
- chan: an iterable of the channels you wish to control [(0,1,2,3)]
- aref: the analog references for these channels [(comedi.AREF_GROUND,)]
- values include:
- comedi.AREF_GROUND
- comedi.AREF_COMMON
- comedi.AREF_DIFF
- comedi.AREF_OTHER
- range: the ranges for these channels [(0,)]
- """
- common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AI, output=False, **kwargs)
- def read(self) :
- """outputs:
- data: a list of read data values in Comedi units
- """
- data = range(self.nchan)
- for i in range(self.nchan) :
- data[i] = self.read_chan_index(i)
- #print "Read %s, got %s" % (str(self.chan), str(data))
- return data
-
-def _test_AI() :
- ai = AI()
- print "read ", ai.read()
- print "read ", ai.read()
- print "read ", ai.read()
- print "read ", ai.read()
- ai.close()
- print "ai success"
-
-class AO (common.PyComediSingleIO) :
- def __init__(self, **kwargs) :
- """inputs:
- filename: comedi device file for your device ["/dev/comedi0"].
- subdevice: the analog output subdevice [-1 for autodetect]
- chan: an iterable of the channels you wish to control [(0,1,2,3)]
- aref: the analog references for these channels [(comedi.AREF_GROUND,)]
- values include:
- comedi.AREF_GROUND
- comedi.AREF_COMMON
- comedi.AREF_DIFF
- comedi.AREF_OTHER
- range: the ranges for these channels [(0,)]
- """
- common.PyComediIO.__init__(self, devtype=c.COMEDI_SUBD_AO, output=True, **kwargs)
- def write(self, data) :
- if len(data) != self.nchan :
- raise SngAioError, "data length %d != the number of channels (%d)" % (len(data), self.nchan)
- for i in range(self.nchan) :
- self.write_chan_index(i, data[i])
-
-def _test_AO() :
- ao = AO(chan=(0,1))
- ao.write([0,0])
- ao.write([3000,3000])
- ao.write([0,0])
- ao.close()
- print "ao success"
-
-def _fit_with_residual(out_data, in_data, channel) :
- "Fit in_data(out_data) to a straight line & return residual"
- from scipy.stats import linregress
- from numpy import zeros
- gradient, intercept, r_value, p_value, std_err = linregress(out_data, in_data)
- print "y = %g + %g x" % (intercept, gradient)
- print "r = ", r_value # correlation coefficient = covariance / (std_dev_x*std_dev_y)
- print "p = ", p_value # probablility of measuring this ?slope? for non-correlated, normally-distruibuted data
- print "err = ", std_err # root mean sqared error of best fit
- if gradient < .7 or p_value > 0.05 :
- raise SngAioError, "Out channel %d != in channel %d" % (channel, channel)
- residual = zeros((len(out_data),))
- for i in range(len(out_data)) :
- pred_y = intercept + gradient * out_data[i]
- residual[i] = in_data[i] - pred_y
- return residual
-
-def plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1) :
- try :
- from pylab import plot, show, subplot, xlabel, ylabel
- subplot(311)
- plot(out_data0, in_data0, 'r.-', out_data1, in_data1, 'b.')
- ylabel("Read")
- xlabel("Wrote")
- if residual0 != None and residual1 != None:
- subplot(312)
- plot(out_data0, residual0, 'r.', out_data1, residual1, 'b.')
- ylabel("Residual")
- xlabel("Wrote")
- subplot(313)
- plot(in_data0, 'r.', in_data1, 'b.')
- xlabel("Read")
- show() # if interactive mode is off...
- #raw_input("Press enter to continue") # otherwise, pause
- except ImportError :
- pass # ignore plot erros
-
-def _test_AIO() :
- "Test AO and AI by cabling AO0 into AI0 and sweeping voltage"
- from scipy.stats import linregress
- from numpy import linspace, zeros
- ao = AO(chan=(0,1))
- ai = AI(chan=(0,1))
- start = 0.1 * ao.maxdata[0]
- stop = 0.9 * ao.maxdata[0]
- points = 10
- out_data0 = linspace(start, stop, points)
- out_data1 = linspace(stop, start, points)
- in_data0 = zeros((points,))
- in_data1 = zeros((points,))
- for i in range(points) :
- ao.write([int(out_data0[i]), int(out_data1[i])])
- id = ai.read()
- in_data0[i] = id[0]
- in_data1[i] = id[1]
- ai.close()
- ao.close()
- if VERBOSE_DEBUG :
- plot_data(out_data0, in_data0, None, out_data1, in_data1, None)
- residual0 = _fit_with_residual(out_data0, in_data0, 0)
- residual1 = _fit_with_residual(out_data1, in_data1, 1)
- if VERBOSE_DEBUG :
- plot_data(out_data0, in_data0, residual0, out_data1, in_data1, residual1)
- for i in range(points) :
- if abs(residual0[i]) > 10 : # HACK, hardcoded maximum nonlinearity
- raise Exception, "Input 0, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data0[i], in_data0[i], residual0[i])
- if abs(residual1[i]) > 10 : # HACK, hardcoded maximum nonlinearity
- raise Exception, "Input 1, point %d (x %d), y-value %d has excessive residual %d" % (i, out_data1[i], in_data1[i], residual1[i])
- print "aio success"
-
-def test() :
- _test_AI()
- _test_AO()
- _test_AIO()
-
-if __name__ == "__main__" :
- test()
+++ /dev/null
-# Use Comedi drivers for single-shot digital input/output
-# Copyright (C) 2007-2010 W. Trevor King
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""Use Comedi drivers for single-shot digital input/output
-
-Being single-shot implementations, read/writes will be software timed,
-so this module would not be a good choice if you need millisecond
-resolution. However, it does provide a simple and robust way to
-generate/aquire signals at 1 second and greater timescales.
-"""
-
-import comedi as c
-import common
-
-
-class SngDioError (common.PycomediError):
- "Digital IO error"
- pass
-
-class DIO_port (common.PyComediSingleIO) :
- def __init__(self, output=True, **kwargs) :
- """inputs:
- filename: comedi device file for your device ["/dev/comedi0"].
- subdevice: the digital IO subdevice [-1 for autodetect]
- chan: an iterable of the channels you wish to control [(0,1,2,3)]
- aref: the analog references for these channels [(comedi.AREF_GROUND,)]
- range: the ranges for these channels [(0,)]
- output: whether to use the lines as output (vs input) (True)
- """
- common.PyComediSingleIO.__init__(self, devtype=c.COMEDI_SUBD_DIO, output=output, **kwargs)
- if self.output :
- self.set_to_output()
- else :
- self.set_to_input()
- def set_to_output(self) :
- "switch all the channels associated with this object to be outputs"
- for chan in self.chan :
- rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_OUTPUT)
- if rc != 1 : # yes, comedi_dio_config returns 1 on success, -1 on failure, as of comedilib-0.8.1
- self._comedi.comedi_perror("comedi_dio_config")
- raise SngDioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_OUTPUT, rc)
- self.output = True
- def set_to_input(self) :
- "switch all the channels associated with this object to be inputs"
- for chan in self.chan :
- rc = c.comedi_dio_config(self.dev, self.subdev, chan, c.COMEDI_INPUT)
- if rc != 1 :
- self._comedi.comedi_perror("comedi_dio_config")
- raise SngDioError, 'comedi_dio_config("%s", %d, %d, %d) returned %d' % (self.filename, self.subdev, chan, c.COMEDI_INPUT, rc)
- self.output = False
- def write_port(self, data) :
- """inputs:
- data: decimal number representing data to write
- For example, setting data=6 will write
- 0 to chan[0]
- 1 to chan[1]
- 1 to chan[2]
- 0 to higher channels...
- """
- for i in range(self.nchan) :
- self.write_chan_index(i, (data >> i) % 2)
- def read_port(self) :
- """outputs:
- data: decimal number representing data read
- For example, data=6 represents
- 0 on chan[0]
- 1 on chan[1]
- 1 on chan[2]
- 0 on higher channels...
- """
- data = 0
- for i in range(self.nchan) :
- data += self.read_chan_index(i) << i
- return data
-
-class DO_port (DIO_port) :
- "A simple wrapper on dio_obj to make writing easier"
- def __call__(self, data) :
- self.write_port(data)
-
-def _test_DIO_port() :
- d = DIO_port()
- d.set_to_output()
- d.write_chan_index(0, 1)
- d.write_chan_index(0, 0)
- d.write_port(7)
- d.set_to_input()
- data = d.read_chan_index(0)
- print "channel %d is %d" % (d.chan[0], data)
- data = d.read_port()
- print "port value is %d" % data
- print "dio_obj success"
- d.close()
-
-def _test_DO_port() :
- p = DO_port()
- for data in [0, 1, 2, 3, 4, 5, 6, 7] :
- p(data)
- p.close()
- print "write_dig_port success"
-
-def test() :
- _test_DIO_port()
- _test_DO_port()
-
-if __name__ == "__main__" :
- test()
--- /dev/null
+# Copyright (C) 2010 W. Trevor King
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"Useful utility functions and classes."
+
+import array as _array
+import mmap as _mmap
+import os as _os
+import threading as _threading
+import time as _time
+
+import comedi as _comedi
+import numpy as _numpy
+
+from . import LOG as _LOG
+from . import classes as _classes
+from . import constants as _constants
+
+
+# types from comedi.h
+sampl = _numpy.uint16
+lsampl = _numpy.uint32
+sampl_typecode = 'H'
+lsampl_typecode = 'L'
+
+def subdevice_dtype(subdevice):
+ "Return the appropriate `numpy.dtype` based on subdevice flags"
+ if subdevice.get_flags().lsampl:
+ return lsampl
+ return sampl
+
+def subdevice_typecode(subdevice):
+ "Return the appropriate `array` type based on subdevice flags"
+ if subdevice.get_flags().lsampl:
+ return lsampl_typecode
+ return sampl_typecode
+
+def sampl_array(list):
+ "Convert a Python list/tuple into a `sampl` array"
+ ret = _comedi.sampl_array(len(list))
+ for i,x in enumerate(list):
+ ret[i] = x
+ return ret
+ #return _array.array(sampl_typecode, list)
+
+def lsampl_array(list):
+ "Convert a Python list/tuple into an `lsampl` array"
+ ret = _comedi.lsampl_array(len(list))
+ for i,x in enumerate(list):
+ ret[i] = x
+ return ret
+ #return _array.array(lsampl_typecode, list)
+
+def set_insn_chanspec(insn, channels):
+ "Configure `insn.chanspec` from the list `channels`"
+ chanlist = _classes.Chanlist(channels)
+ insn.chanspec = chanlist.chanlist()
+
+def set_insn_data(insn, data):
+ "Configure `insn.data` and `insn.n` from the iterable `data`"
+ insn.n = len(data)
+ insn.data = lsampl_array(data)
+
+def set_cmd_chanlist(cmd, channels):
+ "Configure `cmd.chanlist` and `cmd.chanlist_len` from the list `channels`"
+ chanlist = _classes.Chanlist(channels)
+ cmd.chanlist = chanlist.chanlist()
+ cmd.chanlist_len = len(chanlist)
+
+def set_cmd_data(cmd, data):
+ "Configure `cmd.data` and `cmd.data_len` from the iterable `data`"
+ cmd.data = sampl_array(data)
+ cmd.data_len = len(data)
+
+def inttrig_insn(subdevice):
+ """Setup an internal trigger for a given `subdevice`
+
+ From the Comedi docs `section 4.4`_ (Instruction for internal
+ triggering):
+
+ This special instruction has `INSN_INTTRIG` as the insn flag in
+ its instruction data structure. Its execution causes an internal
+ triggering event. This event can, for example, cause the device
+ driver to start a conversion, or to stop an ongoing
+ acquisition. The exact meaning of the triggering depends on the
+ card and its particular driver.
+
+ The `data[0]` field of the `INSN_INTTRIG` instruction is
+ reserved for future use, and should be set to `0`.
+
+ From the comedi source (`comedi.comedi_fops.c:parse_insn()`), we
+ see that the `chanspec` attribute is ignored for `INSN_INTTRIG`,
+ so we don't bother setting it here.
+
+ .. _section 4.4: http://www.comedi.org/doc/x621.html
+ """
+ insn = subdevice.insn()
+ insn.insn = _constants.INSN.inttrig.value
+ set_insn_data(insn, [0])
+ return insn
+
+
+def _builtin_array(array):
+ "`array` is an array from the builtin :mod:`array` module"
+ return isinstance(array, _array.array)
+
+
+class _ReadWriteThread (_threading.Thread):
+ "Base class for all reader/writer threads"
+ def __init__(self, subdevice, buffer, name=None):
+ if name == None:
+ name = '<%s subdevice %d>' % (
+ self.__class__.__name__, subdevice._index)
+ self.subdevice = subdevice
+ self.buffer = buffer
+ super(_ReadWriteThread, self).__init__(name=name)
+
+ def _file(self):
+ """File for reading/writing data to `.subdevice`
+
+ This file may use the internal comedi fileno, so do not close
+ it when you are finished. The file will eventually be closed
+ when the backing `Device` instance is closed.
+ """
+ return self.subdevice._device.file
+
+
+class Reader (_ReadWriteThread):
+ """`read()`-based reader
+
+ Examples
+ --------
+
+ Setup a temporary data file for testing.
+
+ >>> from os import close, remove
+ >>> from tempfile import mkstemp
+ >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
+ >>> f = _os.fdopen(fd, 'r+')
+ >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
+ >>> buf.tofile(t)
+
+ Override the default `Reader` methods for our dummy subdevice.
+
+ >>> class TestReader (Reader):
+ ... def _file(self):
+ ... return f
+
+ Run the test reader.
+
+ >>> rbuf = 0*buf
+ >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest')
+ >>> r.start()
+ >>> r.join()
+
+ The input buffer is updated in place, and is also available as the
+ reader's `buffer` attribute.
+
+ >>> rbuf
+ array([[ 0, 10],
+ [ 1, 11],
+ [ 2, 12]], dtype=uint16)
+ >>> r.buffer
+ array([[ 0, 10],
+ [ 1, 11],
+ [ 2, 12]], dtype=uint16)
+
+ While `numpy` arrays make multi-channel indexing easy, they do
+ require an external library. For single-channel input, the
+ `array` module is sufficient.
+
+ >>> f.seek(0)
+ >>> rbuf = _array.array('H', [0]*buf.size)
+ >>> r = TestReader(subdevice=None, buffer=rbuf, name='Reader-doctest')
+ >>> r.start()
+ >>> r.join()
+ >>> rbuf
+ array('H', [0, 10, 1, 11, 2, 12])
+ >>> r.buffer
+ array('H', [0, 10, 1, 11, 2, 12])
+
+ Cleanup the temporary data file.
+
+ >>> f.close() # no need for `close(fd)`
+ >>> remove(t)
+ """
+ def run(self):
+ builtin_array = _builtin_array(self.buffer)
+ f = self._file()
+ if builtin_array:
+ # TODO: read into already allocated memory (somehow)
+ size = len(self.buffer)
+ a = _array.array(self.buffer.typecode)
+ a.fromfile(f, size)
+ self.buffer[:] = a
+ else: # numpy.ndarray
+ # TODO: read into already allocated memory (somehow)
+ buf = _numpy.fromfile(
+ f, dtype=self.buffer.dtype, count=self.buffer.size)
+ a = _numpy.ndarray(
+ shape=self.buffer.shape, dtype=self.buffer.dtype,
+ buffer=buf)
+ self.buffer[:] = a
+
+
+class Writer (_ReadWriteThread):
+ """`write()`-based writer
+
+ Examples
+ --------
+
+ Setup a temporary data file for testing.
+
+ >>> from os import close, remove
+ >>> from tempfile import mkstemp
+ >>> fd,t = mkstemp(suffix='.dat', prefix='pycomedi-')
+ >>> f = _os.fdopen(fd, 'r+')
+ >>> buf = _numpy.array([[0,10],[1,11],[2,12]], dtype=_numpy.uint16)
+
+ Override the default `Writer` methods for our dummy subdevice.
+
+ >>> class TestWriter (Writer):
+ ... def _file(self):
+ ... return f
+
+ Run the test writer.
+
+ >>> r = TestWriter(subdevice=None, buffer=buf, name='Writer-doctest')
+ >>> r.start()
+ >>> r.join()
+ >>> a = _array.array('H')
+ >>> a.fromfile(open(t, 'rb'), buf.size)
+ >>> a
+ array('H', [0, 10, 1, 11, 2, 12])
+
+ While `numpy` arrays make multi-channel indexing easy, they do
+ require an external library. For single-channel input, the
+ `array` module is sufficient.
+
+ >>> f.seek(0)
+ >>> buf = _array.array('H', [2*x for x in buf.flat])
+ >>> r = TestWriter(subdevice=None, buffer=buf, name='Writer-doctest')
+ >>> r.start()
+ >>> r.join()
+ >>> a = _array.array('H')
+ >>> a.fromfile(open(t, 'rb'), len(buf))
+ >>> a
+ array('H', [0, 20, 2, 22, 4, 24])
+
+ Cleanup the temporary data file.
+
+ >>> f.close() # no need for `close(fd)`
+ >>> remove(t)
+ """
+ def run(self):
+ f = self._file()
+ self.buffer.tofile(f)
+ f.flush()
+
+
+class _MMapReadWriteThread (_ReadWriteThread):
+ "`mmap()`-based reader/wrtier"
+ def __init__(self, *args, **kwargs):
+ super(_MMapReadWriteThread, self).__init__(*args, **kwargs)
+ self._prot = None
+
+ def _sleep_time(self, mmap_size):
+ "Expected seconds needed to write a tenth of the mmap buffer"
+ return 0
+
+ def run(self):
+ # all sizes measured in bytes
+ builtin_array = _builtin_array(self.buffer)
+ mmap_size = self._mmap_size()
+ sleep_time = self._sleep_time(mmap_size)
+ mmap = _mmap.mmap(
+ self._fileno(), mmap_size, self._prot, _mmap.MAP_SHARED)
+ mmap_offset = 0
+ buffer_offset = 0
+ if builtin_array:
+ remaining = len(self.buffer)
+ else: # numpy.ndarray
+ remaining = self.buffer.size
+ remaining *= self.buffer.itemsize
+ action = self._initial_action(
+ mmap, buffer_offset, remaining,
+ mmap_size, action_bytes=mmap_size)
+ buffer_offset += action
+ remaining -= action
+
+ while remaining > 0:
+ action_bytes = self._action_bytes()
+ if action_bytes > 0:
+ action,mmap_offset = self._act(
+ mmap, mmap_offset, buffer_offset, remaining,
+ mmap_size, action_bytes=action_bytes,
+ builtin_array=builtin_array)
+ buffer_offset += action
+ remaining -= action
+ else:
+ _time.sleep(sleep_time)
+
+ def _act(self, mmap, mmap_offset, buffer_offset, remaining, mmap_size,
+ action_bytes=None, builtin_array=None):
+ if action_bytes == None:
+ action_bytes = self.subdevice.get_buffer_contents()
+ if mmap_offset + action_bytes >= mmap_size - 1:
+ action_bytes = mmap_size - mmap_offset
+ wrap = True
+ else:
+ wrap = False
+ action_size = min(action_bytes, remaining)
+ self._mmap_action(mmap, buffer_offset, action_size, builtin_array)
+ mmap.flush() # (offset, size), necessary? calls msync?
+ self._mark_action(action_size)
+ if wrap:
+ mmap.seek(0)
+ mmap_offset = 0
+ return action_size, mmap_offset
+
+ # pull out subdevice calls for easier testing
+
+ def _mmap_size(self):
+ return self.subdevice.get_buffer_size()
+
+ def _fileno(self):
+ return self.subdevice._device.fileno()
+
+ def _action_bytes(self):
+ return self.subdevice.get_buffer_contents()
+
+ # hooks for subclasses
+
+ def _initial_action(self, mmap, buffer_offset, remaining, mmap_size,
+ action_bytes=None):
+ return 0
+
+ def _mmap_action(self, mmap, offset, size):
+ raise NotImplementedError()
+
+ def _mark_action(self, size):
+ raise NotImplementedError()
+
+
+# MMap classes have more subdevice-based methods to override
+_mmap_docstring_overrides = '\n ... '.join([
+ 'def _mmap_size(self):',
+ ' from os.path import getsize',
+ ' return getsize(t)',
+ 'def _fileno(self):',
+ ' return fd',
+ 'def _action_bytes(self):',
+ ' return 4',
+ 'def _mark_action(self, size):',
+ ' pass',
+ 'def _file',
+ ])
+
+
+class MMapReader (_MMapReadWriteThread):
+ __doc__ = Reader.__doc__
+ for _from,_to in [
+ # convert class and function names
+ ('`read()`', '`mmap()`'),
+ ('Writer', 'MMapWriter'),
+ ('def _file', _mmap_docstring_overrides)]:
+ __doc__ = __doc__.replace(_from, _to)
+
+ def __init__(self, *args, **kwargs):
+ super(MMapReader, self).__init__(*args, **kwargs)
+ self._prot = _mmap.PROT_READ
+
+ def _mmap_action(self, mmap, offset, size, builtin_array):
+ offset /= self.buffer.itemsize
+ s = size / self.buffer.itemsize
+ if builtin_array:
+ # TODO: read into already allocated memory (somehow)
+ a = _array.array(self.buffer.typecode)
+ a.fromstring(mmap.read(size))
+ self.buffer[offset:offset+s] = a
+ else: # numpy.ndarray
+ # TODO: read into already allocated memory (somehow)
+ a = _numpy.fromstring(mmap.read(size), dtype=self.buffer.dtype)
+ self.buffer.flat[offset:offset+s] = a
+
+ def _mark_action(self, size):
+ self.subdevice.mark_buffer_read(size)
+
+
+class MMapWriter (_MMapReadWriteThread):
+ __doc__ = Writer.__doc__
+ for _from,_to in [
+ ('`write()`', '`mmap()`'),
+ ('Reader', 'MMapReader'),
+ ('def _file', _mmap_docstring_overrides)]:
+ __doc__ = __doc__.replace(_from, _to)
+
+ def __init__(self, *args, **kwargs):
+ super(MMapWriter, self).__init__(*args, **kwargs)
+ self._prot = _mmap.PROT_WRITE
+ self.buffer = buffer(self.buffer)
+
+ def _mmap_action(self, mmap, offset, size, builtin_array):
+ mmap.write(self.buffer[offset:offset+size])
+
+ def _mark_action(self, size):
+ self.subdevice.mark_buffer_written(size)
+
+
+del _mmap_docstring_overrides
-"""PyComedi: an object-oriented interface for the Comedi drivers.
+# Copyright
-Requires
- * Numpy (http://numpy.scipy.org/)
- * Comedi (http://www.comedi.org/)
-"""
+"An object-oriented interface for the Comedi drivers."
from distutils.core import setup
+import os.path
from pycomedi import __version__
+
classifiers = """\
Development Status :: 2 - Pre-Alpha
Intended Audience :: Developers
Topic :: Software Development :: Libraries :: Python Modules
"""
-doclines = __doc__.split('\n')
-
+_this_dir = os.path.dirname(__file__)
setup(name='pycomedi',
version=__version__,
maintainer='W. Trevor King',
maintainer_email='wking@drexel.edu',
- url='http://www.physics.drexel.edu/~wking/unfolding-disasters/pycomedi/',
+ url='http://www.physics.drexel.edu/~wking/unfolding-disasters/posts/pycomedi/',
download_url='http://www.physics.drexel.edu/~wking/code/python/pycomedi-%s.tar.gz' % __version__,
license='GNU General Public License (GPL)',
platforms=['all'],
- description=doclines[0],
- long_description='\n'.join(doclines[2:]),
+ description=__doc__,
+ long_description=open(os.path.join(_this_dir, 'README'), 'r').read(),
classifiers=filter(None, classifiers.split('\n')),
packages=['pycomedi'],
provides=['pycomedi'],