--- /dev/null
+With Python 3, Cython is pickier about circular dependencies. This is
+all well and good, but sometimes lower level objects (e.g. Subdevice
+instances) need pointers from higher level objects (e.g. Device
+instances). How does that work?
+
+All of the C types used in the library are defined in library-specific
+``_*_h.pxd`` files at the bottom of the dependency tree. The
+low-level objects don't really need the higher level objects (which
+would create a circular import), they need the *pointers* held by the
+higher level object. The solution is to store higher-level objects as
+generic Python objects (e.g. Subdevice.device). When you need the
+device ``comedi_t *`` for a subdevice method, use
+``Subdevice._device``, which assumes the ``Subdevice.device`` has been
+setup with a ``Device`` instance, and extracts the pointer with the
+appropriate casting.
+
+This deals with the circular imports, but is the casting safe?
+Kindof. While it would be nice to do something like::
+
+ cdef class Subdevice (object):
+ cdef _comedilib_h.comedi_t * _device(self):
+ if not isinstance(self.device, _device.Device):
+ raise ValueError(self.device)
+ return <_comedilib_h.comedi_t *> self.device.device
+
+we can't perform the check without circularly importing
+``pycomedi.device``. So the cast is blind, which could cause all
+sorts of problems.
+
+In practice, however, all ``Subdevice`` instances are likely to come
+from the ``Device.subdevice()`` method, which sets up the ``.device``
+attribute appropriately. This means that while you *could* put
+something else in the ``.device`` attribute and blow things up, you
+probably won't do so accidentally. This conforms to the Python style
+of assuming that we are all consenting adults.
+
+For clarity in the above, I discussed the ``Device`` / ``Subdevice``
+relationship, but the same logic holds for similar high- / low-level
+relationships (e.g. ``Subdevice`` / ``Channel``, etc.).
"Expose `CalibratedConverter` internals at the C level for other Cython modules"
cimport _comedilib_h
-from device cimport Device as _Device
-from subdevice cimport Subdevice as _Subdevice
cdef class CalibratedConverter (object):
cdef class CalibrationSetting (object):
cdef _comedilib_h.comedi_calibration_setting_t *setting
- cdef public _Subdevice subdevice
+ cdef public object subdevice # pycomedi.subdevice.Subdevice
cdef _caldacs_set_single(self, index, Caldac caldac)
cpdef _soft_calibration_set(self, CalibratedConverter value)
cdef class Calibration (object):
cdef _comedilib_h.comedi_calibration_t *calibration
- cdef public _Device device
+ cdef public object device # pycomedi.device.Device
cpdef from_file(self, path)
"""
def __cinit__(self):
self.setting = NULL
+ self.subdevice = None
def __init__(self, subdevice):
super(CalibrationSetting, self).__init__()
"""
def __cinit__(self):
self.calibration = NULL
+ self.device = None
def __init__(self, device):
super(Calibration, self).__init__()
from calibration cimport CalibratedConverter as _CalibratedConverter
from calibration cimport Calibration as _Calibration
from range cimport Range as _Range
-from subdevice cimport Subdevice as _Subdevice
from pycomedi import LOG as _LOG
from chanspec import ChanSpec as _ChanSpec
>>> d.close()
"""
- cdef public _Subdevice subdevice
+ cdef public object subdevice # pycomedi.subdevice.Subdevice
cdef public int index
def __cinit__(self):
+ self.subdevice = None
self.index = -1
def __init__(self, subdevice, index):
self.subdevice = subdevice
self.index = index
+ cdef _comedilib_h.comedi_t * _device(self):
+ return <_comedilib_h.comedi_t *> self.subdevice.device.device
+
def get_maxdata(self):
ret = _comedilib_h.comedi_get_maxdata(
- self.subdevice.device.device,
- self.subdevice.index, self.index)
+ self._device(), self.subdevice.index, self.index)
if ret < 0:
_error.raise_error(function_name='comedi_get_maxdata', ret=ret)
return ret
def get_n_ranges(self):
ret = _comedilib_h.comedi_get_n_ranges(
- self.subdevice.device.device,
- self.subdevice.index, self.index)
+ self._device(), self.subdevice.index, self.index)
if ret < 0:
_error.raise_error(function_name='comedi_get_n_ranges', ret=ret)
return ret
cdef _Range ret
# Memory pointed to by the return value is freed on Device.close().
rng = _comedilib_h.comedi_get_range(
- self.subdevice.device.device,
- self.subdevice.index, self.index, index)
+ self._device(), self.subdevice.index, self.index, index)
if rng is NULL:
_error.raise_error(function_name='comedi_get_range')
ret = _Range(value=index)
def _find_range(self, unit, min, max):
"Search for range"
ret = _comedilib_h.comedi_find_range(
- self.subdevice.device.device,
- self.subdevice.index, self.index,
+ self._device(), self.subdevice.index, self.index,
_constant.bitwise_value(unit), min, max)
if ret < 0:
_error.raise_error(function_name='comedi_find_range', ret=ret)
`dir` should be an item from `constants.IO_DIRECTION`.
"""
ret = _comedilib_h.comedi_dio_config(
- self.subdevice.device.device,
- self.subdevice.index, self.index,
+ self._device(), self.subdevice.index, self.index,
_constant.bitwise_value(dir))
if ret < 0:
_error.raise_error(function_name='comedi_dio_config', ret=ret)
"""
cpdef unsigned int dir
ret = _comedilib_h.comedi_dio_get_config(
- self.subdevice.device.device,
- self.subdevice.index, self.index, &dir)
+ self._device(), self.subdevice.index, self.index, &dir)
if ret < 0:
_error.raise_error(function_name='comedi_dio_get_config', ret=ret)
return _constant.IO_DIRECTION.index_by_value(dir)
"Read a single bit"
cpdef unsigned int bit
ret = _comedilib_h.comedi_dio_read(
- self.subdevice.device.device,
- self.subdevice.index, self.index, &bit)
+ self._device(), self.subdevice.index, self.index, &bit)
if ret < 0:
_error.raise_error(function_name='comedi_dio_read', ret=ret)
return int(bit)
def dio_write(self, bit):
"Write a single bit"
ret = _comedilib_h.comedi_dio_write(
- self.subdevice.device.device,
- self.subdevice.index, self.index, bit)
+ self._device(), self.subdevice.index, self.index, bit)
if ret < 0:
_error.raise_error(function_name='comedi_dio_write', ret=ret)
"Read one sample"
cdef _comedi_h.lsampl_t data
ret = _comedilib_h.comedi_data_read(
- self.subdevice.device.device,
- self.subdevice.index, self.index,
+ self._device(), self.subdevice.index, self.index,
_constant.bitwise_value(self.range),
_constant.bitwise_value(self.aref),
&data)
"Read `n` samples (timing between samples is undefined)."
data = _numpy.ndarray(shape=(n,), dtype=_numpy.uint32)
ret = _comedilib_h.comedi_data_read_n(
- self.subdevice.device.device,
- self.subdevice.index, self.index,
+ self._device(), self.subdevice.index, self.index,
_constant.bitwise_value(self.range),
_constant.bitwise_value(self.aref),
<_comedilib_h.lsampl_t *>_numpy.PyArray_DATA(data), n)
performs a conversion.
"""
ret = _comedilib_h.comedi_data_read_hint(
- self.subdevice.device.device,
- self.subdevice.index, self.index,
+ self._device(), self.subdevice.index, self.index,
_constant.bitwise_value(self.range),
_constant.bitwise_value(self.aref))
if ret < 0:
"""
cdef _comedi_h.lsampl_t data
ret = _comedilib_h.comedi_data_read_delayed(
- self.subdevice.device.device,
- self.subdevice.index, self.index,
+ self._device(), self.subdevice.index, self.index,
_constant.bitwise_value(self.range),
_constant.bitwise_value(self.aref),
&data, int(nano_sec))
Returns 1 (the number of data samples written).
"""
ret = _comedilib_h.comedi_data_write(
- self.subdevice.device.device,
- self.subdevice.index, self.index,
+ self._device(), self.subdevice.index, self.index,
_constant.bitwise_value(self.range),
_constant.bitwise_value(self.aref),
int(data))
"""
cdef _comedilib_h.comedi_polynomial_t poly
rc = _comedilib_h.comedi_get_hardcal_converter(
- self.subdevice.device.device,
- self.subdevice.index, self.index,
+ self._device(), self.subdevice.index, self.index,
_constant.bitwise_value(self.range),
_constant.bitwise_value(direction), &poly)
if rc < 0:
# automatically get a char * refernce into the Python string p
path = p
ret = _comedilib_h.comedi_apply_calibration(
- self.subdevice.device.device,
- self.subdevice.index, self.index,
+ self._device(), self.subdevice.index, self.index,
_constant.bitwise_value(self.range),
_constant.bitwise_value(self.aref),
path)
cdef _apply_parsed_calibration(self, _Calibration calibration):
ret = _comedilib_h.comedi_apply_parsed_calibration(
- self.subdevice.device.device,
- self.subdevice.index, self.index,
+ self._device(), self.subdevice.index, self.index,
_constant.bitwise_value(self.range),
_constant.bitwise_value(self.aref),
calibration.calibration)
"Expose `Subdevice` internals at the C level for other Cython modules"
-from device cimport Device as _Device
+cimport _comedilib_h
from command cimport Command as _Command
cdef class Subdevice (object):
- cdef public _Device device
+ cdef public object device # pycomedi.device.Device
cdef public int index
+ cdef _comedilib_h.comedi_t * _device(self)
cpdef dio_bitfield(self, unsigned int bits=*, write_mask=*, base_channel=*)
cdef class StreamingSubdevice (Subdevice):
cimport _comedi_h
cimport _comedilib_h
-cimport device as _device
cimport command as _command
from pycomedi import LOG as _LOG
import _error
"""
def __cinit__(self):
self.index = -1
+ self.device = None
def __init__(self, device, index):
super(Subdevice, self).__init__()
self.device = device
self.index = index
+ cdef _comedilib_h.comedi_t * _device(self):
+ return <_comedilib_h.comedi_t *> self.device.device
+
def get_type(self):
"Type of subdevice (from `SUBDEVICE_TYPE`)"
ret = _comedilib_h.comedi_get_subdevice_type(
- self.device.device, self.index)
+ self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_get_subdevice_type',
ret=ret)
def _get_flags(self):
"Subdevice flags"
ret = _comedilib_h.comedi_get_subdevice_flags(
- self.device.device, self.index)
+ self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_get_subdevice_flags',
ret=ret)
def get_n_channels(self):
"Number of subdevice channels"
ret = _comedilib_h.comedi_get_n_channels(
- self.device.device, self.index)
+ self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_get_n_channels',
ret=ret)
def range_is_chan_specific(self):
ret = _comedilib_h.comedi_range_is_chan_specific(
- self.device.device, self.index)
+ self._device(), self.index)
if ret < 0:
_error.raise_error(
function_name='comedi_range_is_chan_specific', ret=ret)
def maxdata_is_chan_specific(self):
ret = _comedilib_h.comedi_maxdata_is_chan_specific(
- self.device.device, self.index)
+ self._device(), self.index)
if ret < 0:
_error.raise_error(
function_name='comedi_maxdata_is_chan_specific', ret=ret)
def lock(self):
"Reserve the subdevice"
- ret = _comedilib_h.comedi_lock(self.device.device, self.index)
+ ret = _comedilib_h.comedi_lock(self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_lock', ret=ret)
def unlock(self):
"Release the subdevice"
- ret = _comedilib_h.comedi_unlock(self.device.device, self.index)
+ ret = _comedilib_h.comedi_unlock(self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_unlock', ret=ret)
channels and the last written value of all output channels.
"""
ret = _comedilib_h.comedi_dio_bitfield2(
- self.device.device, self.index, write_mask, &bits, base_channel)
+ self._device(), self.index, write_mask, &bits, base_channel)
if ret < 0:
_error.raise_error(function_name='comedi_dio_bitfield2', ret=ret)
return bits
cdef _command.Command cmd
cmd = _command.Command()
ret = _comedilib_h.comedi_get_cmd_src_mask(
- self.device.device, self.index, cmd.get_comedi_cmd_pointer())
+ self._device(), self.index, cmd.get_comedi_cmd_pointer())
if ret < 0:
_error.raise_error(function_name='comedi_get_cmd_src_mask', ret=ret)
return cmd
cdef _command.Command cmd
cmd = _command.Command()
ret = _comedilib_h.comedi_get_cmd_generic_timed(
- self.device.device, self.index, cmd.get_comedi_cmd_pointer(),
+ self._device(), self.index, cmd.get_comedi_cmd_pointer(),
chanlist_len, int(scan_period_ns))
cmd.chanlist = [0 for i in range(chanlist_len)]
if ret < 0:
def cancel(self):
"Stop streaming input/output in progress."
- ret = _comedilib_h.comedi_cancel(self.device.device, self.index)
+ ret = _comedilib_h.comedi_cancel(self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_cancel', ret=ret)
def command(self):
"Start streaming input/output"
ret = _comedilib_h.comedi_command(
- self.device.device, self.cmd.get_comedi_cmd_pointer())
+ self._device(), self.cmd.get_comedi_cmd_pointer())
if ret < 0:
_error.raise_error(function_name='comedi_command', ret=ret)
def command_test(self):
"Test streaming input/output configuration"
ret = _comedilib_h.comedi_command_test(
- self.device.device, self.cmd.get_comedi_cmd_pointer())
+ self._device(), self.cmd.get_comedi_cmd_pointer())
return self._command_test_errors[ret]
def poll(self):
buffers or device FIFOs. If successful, the number of
additional bytes available is returned.
"""
- ret = _comedilib_h.comedi_poll(self.device.device, self.index)
+ ret = _comedilib_h.comedi_poll(self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_poll', ret=ret)
return ret
def get_buffer_size(self):
"Streaming buffer size of subdevice"
ret = _comedilib_h.comedi_get_buffer_size(
- self.device.device, self.index)
+ self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_get_buffer_size', ret=ret)
return ret
`comedi_config`.
"""
ret = _comedilib_h.comedi_set_buffer_size(
- self.device.device, self.index, int(size))
+ self._device(), self.index, int(size))
if ret < 0:
_error.raise_error(function_name='comedi_set_buffer_size', ret=ret)
return ret
def get_max_buffer_size(self):
"Maximum streaming buffer size of subdevice"
ret = _comedilib_h.comedi_get_max_buffer_size(
- self.device.device, self.index)
+ self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_get_max_buffer_size',
ret=ret)
Returns the old (max?) buffer size on success.
"""
ret = _comedilib_h.comedi_set_max_buffer_size(
- self.device.device, self.index, int(max_size))
+ self._device(), self.index, int(max_size))
if ret < 0:
_error.raise_error(function_name='comedi_set_max_buffer_size',
ret=ret)
def get_buffer_contents(self):
"Number of bytes available on an in-progress command"
ret = _comedilib_h.comedi_get_buffer_contents(
- self.device.device, self.index)
+ self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_get_buffer_contents',
ret=ret)
calls.
"""
ret = _comedilib_h.comedi_mark_buffer_read(
- self.device.device, self.index, num_bytes)
+ self._device(), self.index, num_bytes)
if ret < 0:
_error.raise_error(function_name='comedi_mark_buffer_read',
ret=ret)
`write()` calls.
"""
ret = _comedilib_h.comedi_mark_buffer_written(
- self.device.device, self.index, num_bytes)
+ self._device(), self.index, num_bytes)
if ret < 0:
_error.raise_error(function_name='comedi_mark_buffer_written',
ret=ret)
This offset is only useful for memory mapped buffers.
"""
ret = _comedilib_h.comedi_get_buffer_offset(
- self.device.device, self.index)
+ self._device(), self.index)
if ret < 0:
_error.raise_error(function_name='comedi_get_buffer_offset', ret=ret)
return ret