#!/usr/bin/python
#
-# W. Trevor King, Jan. 29th, 2008
+# Copyright (C) 2008-2011 W. Trevor King <wking@drexel.edu>
#
-# Version 0.2, WT King, Feb. 20th, 2008
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+
+# Version 0.2, W. Trevor King, Feb. 20th, 2008
# (added OptionParser code and stripchart debug parameters)
-# Version 0.3, WT King, Mar. 25th, 2010
+# Version 0.3, W. Trevor King, Mar. 25th, 2010
# (added multiple channels option)
+# Version 0.4, W. Trevor King, Nov. 12th, 2011
+# Transition from optparse -> argparse and update to pycomedi 0.3.
-"""
-Simple single analog channel polling software.
-Spends most of its time sleeping,
-waking up every dt seconds to record the current voltage.
-Designed for Liming's cellulase activity measurements.
+"""Simple analog channel polling script.
+
+Spends most of its time sleeping, waking up every `dt` seconds to
+record the current voltage.
"""
-import pycomedi.single_aio as single_aio
-import time
-import curses_check_for_keypress
-import stripchart
-import data_logger
+import logging as _logging
+import os.path as _os_path
+import sys as _sys
+import time as _time
-DEFAULT_STRIPVERBOSE = False
-TEXT_VERBOSE = False
-LOG_DIR = '~/rsrch/data/slow_bend' # set LOG_DIR = None to disable logging
+import pycomedi.device as _pycomedi_device
+import pycomedi.subdevice as _pycomedi_subdevice
+import pycomedi.channel as _pycomedi_channel
+import pycomedi.constant as _pycomedi_constant
+try:
+ from pypid.backend.melcor import MelcorBackend as _TemperatureBackend
+except ImportError, pypid_import_error:
+ _TemperatureBackend = None
+
+
+__version__ = '0.4'
+
+
+class Monitor (object):
+ """Take measurements on a single channel every dt seconds.
-class slow_bend :
- """
- Take measurements on a single channel every dt seconds.
Save '<time>\t<bit_val>\t<physical_val>\n' records.
"""
- def __init__(self, chan=[0], dt=4, log_dir=LOG_DIR) :
- if TEXT_VERBOSE :
- print "Initializing slow_bend instance"
- print " chan %s, dt %g, log_dir %s" % (chan, dt, log_dir)
- self.chan = chan
- self.ai = single_aio.AI(chan=chan)
+
+ def __init__(self, channels=[0], temperature=False, dt=4,
+ comedi_device='/dev/comedi0', data_stream=None, logger=None):
+ self.channel_indexes = channels
+ self.with_temperature = temperature
self.dt = dt
- self._data_logger = data_logger.data_log(log_dir, log_name="slow_bend")
- self.savefilename,timestamp = self._data_logger.get_filename()
+ self.comedi_device = comedi_device
+ self.data_stream = data_stream
+ self._logger = logger
+ self.temperature = None
+
+ def log(self, level=_logging.DEBUG, msg=None):
+ assert msg is not None
+ if self._logger:
+ self._logger.log(level, msg)
+
+ def run(self):
+ try:
+ self._setup()
+ self._run()
+ finally:
+ self._teardown()
+
+ def _setup(self):
+ self.log(msg='setup slow bend monitor')
+ self.log(msg='chan {}, dt {}, data_stream {}'.format(
+ self.channel_indexes, self.dt, self.data_stream))
+ self._setup_channels()
+ self._setup_temperature()
self._make_header()
- self.stripcharts = [stripchart.stripchart('slow_bend_pipe_%d' % chan,
- title='slow bend data %d' % chan)
- for chan in self.chan]
- if DEFAULT_STRIPVERBOSE == True :
- for chart in self.stripcharts:
- chart.open(debug=TEXT_VERBOSE)
- def run(self) :
- if TEXT_VERBOSE :
- print "Running slow_bend"
- self.start_time = time.time()
+
+ def _setup_channels(self):
+ self.log(msg='setup input channels')
+ self.device = _pycomedi_device.Device(self.comedi_device)
+ self.device.open()
+ self.subdevice = self.device.find_subdevice_by_type(
+ _pycomedi_constant.SUBDEVICE_TYPE.ai,
+ factory=_pycomedi_subdevice.StreamingSubdevice)
+ self.channels = []
+ for c_index in self.channel_indexes:
+ c = self.subdevice.channel(
+ c_index,
+ factory=_pycomedi_channel.AnalogChannel,
+ aref=_pycomedi_constant.AREF.diff)
+ c.range = c.find_range(
+ unit=_pycomedi_constant.UNIT.volt,
+ min=-10, max=10)
+ self.channels.append(c)
+ self.converters = [c.get_converter() for c in self.channels]
+
+ def _setup_temperature(self):
+ self.log(msg='setup temperature channel')
+ if _TemperatureBackend is None:
+ raise pypid_import_error
+ self.temperature = _TemperatureBackend()
+
+ def _teardown(self):
+ self.log(msg='teardown slow bend monitor')
+ self._teardown_channels()
+ self._teardown_temperature()
+
+ def _teardown_channels(self):
+ self.log(msg='teardown input channels')
+ if hasattr(self, 'device') and self.device is not None:
+ self.device.close()
+ self.device = self.subdevice = self.channels = None
+
+ def _teardown_temperature(self):
+ self.log(msg='teardown temperature channel')
+ if self.temperature is not None:
+ self.temperature.cleanup()
+ self.temperature = None
+
+ def _run(self):
+ self.log(msg='running slow bend monitor')
+ self.start_time = _time.time()
next_read_time = self.start_time
- self.c = curses_check_for_keypress.check_for_keypress( \
- "Press any key to quit")
- if TEXT_VERBOSE :
- self.c.output("Starting loop\n")
- while self.c.input() == None :
- if TEXT_VERBOSE :
- self.c.output("Taking measurement\n")
- tm,bitvals,physicals = self._take_and_save_reading()
- self.c.output("%s\t%s\n" % (tm-self.start_time,
- '\t'.join(['%g'%p for p in physicals])))
- next_read_time += self.dt
- self._wait_until(next_read_time)
- del(self.c)
- def _measure_voltage(self) :
- # opening each time is slower than leaving it open
- # but for a slow measurement that's not worth
- # hogging the card.
- self.ai.open()
- bitvals = self.ai.read()
- physicals = [self.ai.comedi_to_phys(0,bitval)
- for chan,bitval in zip(self.chan, bitvals)]
- self.ai.close()
- if TEXT_VERBOSE :
- self.c.output('%s\n'
- % ('\t'.join(['%d\t%g' % (bitval, physical)
- for bitval,physical
- in zip(bitvals, physicals)])))
- if self.stripcharts[0].status == "open" :
- for bitval,physical,stripchart \
- in zip(bitvals, physicals, self.stripcharts):
- if bitval == 0 :
- stripchart.add_point(-10)
- elif bitval == 2**16-1 :
- stripchart.add_point(10)
- else :
- stripchart.add_point(physical)
- return (bitvals, physicals)
- def _make_header(self) :
- "create the save file header"
- fid = file(self.savefilename, 'w')
- fid.write('#%s\t%s\n' % ('time (s)',
- '\t'.join(['chan %d (bits)\tchan %g (V)'
- % (chan, chan)
- for chan in self.chan])))
- fid.close()
- def _save_reading(self, time, bitvals, physicals) :
- fid = file(self.savefilename, 'a+')
- fid.write('%g\t%s\n' % (time - self.start_time,
- '\t'.join(['%d\t%g' % (bitval, physical)
- for bitval,physical
- in zip(bitvals, physicals)])))
- fid.close()
- def _take_and_save_reading(self) :
- tm = time.time()
+ self.log(_logging.INFO, 'press Control-c to stop acquisition')
+ try:
+ while True:
+ tm,bitvals,physicals = self._take_and_save_reading()
+ next_read_time += self.dt
+ self._wait_until(next_read_time)
+ except KeyboardInterrupt:
+ pass
+
+ def _take_and_save_reading(self):
+ tm = _time.time()
bitvals,physicals = self._measure_voltage()
+ if self.with_temperature:
+ bitval,physical = self._measure_temperature()
+ bitvals.insert(0, bitval)
+ physicals.insert(0, physical)
self._save_reading(tm, bitvals, physicals)
- return tm, bitvals, physicals
- def _wait_until(self, targ_time) :
- dt = targ_time - time.time()
- if dt > 0 :
- time.sleep(dt)
-
-if __name__ == "__main__" :
- from optparse import OptionParser
-
- usage_string = """%prog [options]
-
-2008, W. Trevor King.
-"""
- parser = OptionParser(usage=usage_string, version="%prog 0.3")
-
- parser.add_option('-t', '--timestep', dest="dt",
- help="Measure voltage evert DT seconds (default '%default')",
- type='float', metavar="DATABASE", default=4.0)
- parser.add_option('-c', '--input-channel', dest="chan",
- help="Input voltage CHANNEL (default '%default'). Add multiple channels in a comma-seperated list (e.g. 0,1,2,...).",
- type='string', metavar="CHANNEL", default='0,1')
- parser.add_option('-d', '--log-directory', dest="log_dir",
- help="Write output to subdir of LOG_DIR (default %default)",
- type='string', metavar="LOG_DIR", default=LOG_DIR)
- parser.add_option('-s', '--stripchart-off', dest="stripchart",
- action="store_false",
- help="Turn off the stripchart display (default)",
- default=False)
- parser.add_option('-S', '--stripchart-on', dest="stripchart",
- action="store_true",
- help="Turn on the stripchart display")
- parser.add_option('-v', '--verbose', dest="verbose", action="store_true",
- help="Print lots of debugging information",
- default=False)
-
- (options, args) = parser.parse_args()
- parser.destroy()
- options.chan = [int(x) for x in options.chan.split(',')]
-
- if options.verbose :
- TEXT_VERBOSE = True
-
- sb = slow_bend(chan=options.chan, dt=options.dt, log_dir=options.log_dir)
- if options.stripchart :
- for stripchart in sb.stripcharts:
- stripchart.open(debug=TEXT_VERBOSE)
- sb.run()
+ return (tm, bitvals, physicals)
+
+ def _measure_voltage(self):
+ self.log(msg='measure voltage')
+ bitvals = [c.data_read_delayed(nano_sec=1e3) for c in self.channels]
+ self.log(msg='read bit values: {}'.format(
+ '\t'.join('{:g}'.format(b) for b in bitvals)))
+ physicals = [cv.to_physical(b)
+ for cv,b in zip(self.converters, bitvals)]
+ return (bitvals, physicals)
+
+ def _measure_temperature(self):
+ self.log(msg='measure temperature')
+ physical = self.temperature.get_pv()
+ bitvalue = -1 # self.temperature._read('INPUT_1') # or possibly ACTUALL_2, PROCESS_1, PROCESS_2
+ return (bitvalue, physical)
+
+ def _make_header(self):
+ 'Create the save the data header'
+ fields = ['time (second)']
+ name_units = [('chan {}'.format(c.index), c.range.unit.name)
+ for c in self.channels]
+ if self.with_temperature:
+ names.insert(0, ('temperature', self.temperature.pv_units))
+ for name,unit in name_units:
+ fields.extend(['{} ({})'.format(name, u) for u in ['bit', unit]])
+ headline = '#{}'.format('\t'.join(fields))
+ self.data_stream.write(headline + '\n')
+ self.log(_logging.INFO, headline)
+ self.data_stream.flush()
+
+ def _save_reading(self, time, bitvals, physicals):
+ self.log(msg='save measurement')
+ dataline = '{:g}\t{}'.format(
+ time - self.start_time,
+ '\t'.join('{:d}\t{:g}'.format(b, p)
+ for b,p in zip(bitvals, physicals)))
+ self.data_stream.write(dataline + '\n')
+ self.log(_logging.INFO, dataline)
+ self.data_stream.flush()
+
+ def _wait_until(self, target_time):
+ self.log(msg='sleep until {}'.format(target_time))
+ dt = target_time - _time.time()
+ if dt > 0:
+ _time.sleep(dt)
+
+
+def _get_data_stream(data_dir=None, logger=None):
+ if data_dir is None:
+ return _sys.stdout
+ while True:
+ timestamp = _time.strftime('%Y-%m-%d_%H-%M-%S')
+ data_path = _os_path.join(args.data_dir, timestamp)
+ if not _os_path.exists(data_path):
+ break
+ if logger:
+ logger.warning(
+ '{} already exists, wait a second and try again'.format(
+ data_path))
+ _time.sleep(1) # try the next second
+ return open(data_path, 'w')
+
+def _get_logger(log_level=_logging.DEBUG):
+ logger = _logging.getLogger('slow_bend')
+ logger.setLevel(log_level)
+ ch = _logging.StreamHandler()
+ ch.setLevel(log_level)
+ formatter = _logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ ch.setFormatter(formatter)
+ logger.addHandler(ch)
+ return logger
+
+
+if __name__ == '__main__':
+ from argparse import ArgumentParser
+
+ parser = ArgumentParser(description=__doc__)
+ parser.add_argument(
+ '--version', action='version', version=__version__)
+ parser.add_argument(
+ '-t', '--timestep', dest='dt', type=float, default=4.0,
+ help='Measure voltage evert DT seconds (default: %(default)s)')
+ parser.add_argument(
+ dest='channels', nargs='+', metavar='CHANNEL', type=int,
+ help='Input voltage CHANNEL(S)')
+ parser.add_argument(
+ '-d', '--data-directory', dest='data_dir', metavar='DATA_DIR',
+ help='Write output to subdir of DATA_DIR')
+ parser.add_argument(
+ '-T', '--temperature', dest='temperature',
+ default=False, action='store_const', const=True,
+ help='Also record the temperature')
+ parser.add_argument(
+ '-v', '--verbose', dest='verbose',
+ default=False, action='store_const', const=True,
+ help='Print lots of debugging information')
+
+ args = parser.parse_args()
+
+ if args.verbose:
+ logger = _get_logger()
+ else:
+ logger = None
+
+ data_stream = _get_data_stream(data_dir=args.data_dir, logger=logger)
+ try:
+ m = Monitor(
+ channels=args.channels, temperature=args.temperature, dt=args.dt,
+ data_stream=data_stream, logger=logger)
+ m.run()
+ finally:
+ if data_stream != _sys.stdout:
+ data_stream.close()