def open_channels(device, subdevice, channels, range, aref):
"""Subdevice index and list of channel indexes
- -> ``Subdevice`` instance and list of ``AnalogChannel`` instances
+ to ``Subdevice`` instance and list of ``AnalogChannel`` instances
"""
- if args.subdevice >= 0:
+ if subdevice >= 0:
subdevice = device.subdevice(subdevice, factory=_StreamingSubdevice)
else:
subdevice = device.find_subdevice_by_type(
_LOG.info('test {} returned {}\n{}'.format(i, rc, subdevice.cmd))
_LOG.error('error preparing command: {}'.format(rc))
_sys.exit(1)
+test_command.__test__ = False # test_command is not a Nose test
-def print_data(channels, data, physical=False):
+def write_data(stream, channels, data, physical=False):
if physical:
converters = [c.get_converter() for c in channels]
physical_data = _numpy.zeros(data.shape, dtype=float32)
physical_data[:,i] = c.to_physical(data[:,i])
data = physical_data
for row in range(data.shape[0]):
- print '\t'.join(str(x) for x in data[row,:])
+ stream.write('\t'.join(str(x) for x in data[row,:]))
+ stream.write('\n')
def read(device, subdevice=None, channels=[0], range=0, aref=0, period=0,
- num_scans=2, reader=_utility.Reader, physical=False):
+ num_scans=2, reader=_utility.Reader, physical=False,
+ stream=_sys.stdout):
"""Read ``num_scans`` samples from each specified channel.
"""
subdevice,channels = open_channels(
stop = _time.time()
_LOG.info('stop time: {}'.format(stop))
_LOG.info('time: {}'.format(stop - start))
- print_data(channels=channels, data=read_buffer, physical=physical)
+ write_data(
+ stream=stream, channels=channels, data=read_buffer, physical=physical)
+
+
+def run(filename, **kwargs):
+ """
+ >>> import StringIO
+ >>> stdout = StringIO.StringIO()
+ >>> run(filename='/dev/comedi0', stream=stdout)
+ >>> print(stdout.getvalue()) # doctest: +SKIP
+ 29694
+ 29693
+ <BLANKLINE>
+ >>> stdout.truncate(0)
+ >>> run(filename='/dev/comedi0', reader=_utility.MMapReader, stream=stdout)
+ >>> print(stdout.getvalue()) # doctest: +SKIP
+ 29691
+ 29691
+ <BLANKLINE>
+ """
+ device = _Device(filename=filename)
+ device.open()
+ try:
+ read(device=device, **kwargs)
+ finally:
+ device.close()
if __name__ == '__main__':
else:
reader = _utility.Reader
- device = _Device(filename=args.filename)
- device.open()
- try:
- read(
- device=device, subdevice=args.subdevice, channels=args.channels,
- range=args.range, aref=args.aref, period=args.period,
- num_scans=args.num_scans, reader=reader, physical=args.physical)
- finally:
- device.close()
+ run(filename=args.filename, subdevice=args.subdevice,
+ channels=args.channels, aref=args.aref, range=args.range,
+ num_scans=args.num_scans, reader=reader, period=args.period,
+ physical=args.physical)
for subdevice in device.subdevices():
display_subdevice(subdevice=subdevice)
+def run(filename):
+ """
+ >>> run(filename='/dev/comedi0') # doctest: +ELLIPSIS, +REPORT_UDIFF
+ overall info
+ comedi version: 0.7.76
+ driver name: ni_pcimio
+ board name: pci-6052e
+ number of subdevices: 14
+ subdevice 0:
+ type: ai
+ flags: cmd_read|readable|ground|common|diff|other|dither
+ ...
+ subdevice 13:
+ type: counter
+ flags: readable|writable
+ number of channels: 1
+ max data value: 15
+ ranges: <Range unit:none min:0.0 max:1.0>
+ command: (not supported)
+ """
+ device = _Device(filename=filename)
+ device.open()
+ try:
+ display(device)
+ finally:
+ device.close()
+
if __name__ == '__main__':
import pycomedi_demo_args
args = pycomedi_demo_args.parse_args(description=__doc__, argnames=['filename'])
- device = _Device(filename=args.filename)
- device.open()
- try:
- display(device)
- finally:
- device.close()
+ run(filename=args.filename)
from pycomedi.chanspec import ChanSpec as _ChanSpec
-if __name__ == '__main__':
- import argparse
- import pycomedi_demo_args
-
- args = pycomedi_demo_args.parse_args(
- description=__doc__,
- argnames=['filename', 'subdevice', 'channel', 'aref', 'range',
- 'num-scans', 'verbose'])
+def run(filename, subdevice, channel, range, aref, num_scans):
+ """
+ >>> t1,data,t2 = run(filename='/dev/comedi0', subdevice=None,
+ ... channel=0, range=0, aref=_constant.AREF.ground, num_scans=10)
+ >>> t1 # doctest: +SKIP
+ 1332242184.029691
+ >>> t2 # doctest: +SKIP
+ 1332242184.3311629
+ >>> data # doctest: +ELLIPSIS
+ array([...], dtype=uint32)
+ >>> data.shape
+ (10,)
+ """
+ _LOG.info(('measuring device={} subdevice={} channel={} range={} '
+ 'analog-reference={} num-scans={}'
+ ).format(filename, subdevice, channel, range, aref, num_scans))
+ device = _Device(filename=filename)
+ device.open()
+ try:
+ if subdevice is None:
+ subdevice = device.find_subdevice_by_type(
+ _constant.SUBDEVICE_TYPE.ai)
+ else:
+ subdevice = device.subdevice(subdevice)
- print(args)
- _LOG.info(('measuring device={0.filename} subdevice={0.subdevice} '
- 'channel={0.channel} range={0.range} analog reference={0.aref}'
- ).format(args))
+ insns = [subdevice.insn(), subdevice.insn(), subdevice.insn()]
+ insns[0].insn = insns[2].insn = _constant.INSN.gtod
+ insns[0].data = insns[2].data = [0, 0]
+ insns[1].insn = _constant.INSN.read
+ insns[1].data = [0] * num_scans
+ insns[1].chanspec = _ChanSpec(chan=channel, range=range, aref=aref)
- device = _Device(filename=args.filename)
- device.open()
- if args.subdevice is None:
- subdevice = device.find_subdevice_by_type(_constant.SUBDEVICE_TYPE.ai)
- else:
- subdevice = device.subdevice(args.subdevice)
-
- insns = [subdevice.insn(), subdevice.insn(), subdevice.insn()]
- insns[0].insn = insns[2].insn = _constant.INSN.gtod
- insns[0].data = insns[2].data = [0, 0]
- insns[1].insn = _constant.INSN.read
- insns[1].data = [0] * args.num_scans
- insns[1].chanspec = _ChanSpec(
- chan=args.channel, range=args.range, aref=args.aref)
-
- device.do_insnlist(insns)
- device.close()
+ device.do_insnlist(insns)
+ finally:
+ device.close()
t1 = insns[0].data[0] + insns[1].data[1]/1e6
t2 = insns[2].data[0] + insns[2].data[1]/1e6
+ return (t1, insns[1].data, t2)
+
+def display(t1, data, t2):
_LOG.info('initial time: {}'.format(t1))
_LOG.info('final time: {}'.format(t2))
_LOG.info('difference: {}'.format(t2-t1))
for x in insns[1].data:
print(x)
+
+if __name__ == '__main__':
+ import pycomedi_demo_args
+
+ args = pycomedi_demo_args.parse_args(
+ description=__doc__,
+ argnames=['filename', 'subdevice', 'channel', 'aref', 'range',
+ 'num-scans', 'verbose'],
+ args=args)
+
+ t1,data,t2 = run(
+ filename=args.filename, subdevice=args.subdevice, channel=args.channel,
+ range=args.range, aref=args.aref, num_scans=args.num_scans)
+ display(t1=t1, data=data, t2=t2)
{'action':_IncrementVerbosityAction}),
}
-def parse_args(description, argnames):
+def parse_args(description, argnames, args=None):
+ """
+ >>> args = parse_args(
+ ... description='Parse a bunch of arguments',
+ ... argnames=['frequency'], args=['--frequency', '2'])
+ >>> args.period
+ 0.5
+ """
parser = _argparse.ArgumentParser(description=description)
for argument in argnames:
- args,kwargs = ARGUMENTS[argument]
- parser.add_argument(*args, **kwargs)
- args = parser.parse_args()
+ args_,kwargs = ARGUMENTS[argument]
+ parser.add_argument(*args_, **kwargs)
+ args = parser.parse_args(args=args)
if 'frequency' in argnames and not hasattr(args, 'period'):
args.period = 0
return args
python -c "import doctest, sys; import pycomedi.$mod as m; r = doctest.testmod(m); print r; sys.exit(r.failed)"
done
nosetests --with-doctest --doctest-extension=.txt doc
+nosetests --with-doctest doc/demo/*.py