from optparse import OptionParser
from matplotlib.pyplot import close, get_fignums, figure, show
-from calibcant.analyze import calib_load_all, calib_plot_all
+from calibcant.calibrate import load_all
+from calibcant.analyze import analyze_all
def main(args):
options,args = p.parse_args(args)
filename = args[0]
- stuff = calib_load_all(filename=filename,
+ calibrator,data,raw_data = load_all(filename=filename,
if not options.bumps:
- stuff['bump_details'] = []
+ raw_data['bump'] = []
if not options.vibrations:
- stuff['vibration_details'] = []
- calib_plot_all(**stuff)
+ raw_data['vibration'] = []
+ analyze_all(config=calibrator.config, data=data, raw_data=raw_data,
+ plot=True, dry_run=True)
for i in get_fignums():
fig = figure(i)
+++ /dev/null
-# T family.
-# Fairly stubby, since a one shot Temp measurement is a common thing.
-# We just wrap that to provide a consistent interface.
-from . import LOG as _LOG
-from . import package_config as _package_config
-from .T_analyze import T_analyze as _T_analyze
-from .T_analyze import T_save as _T_save
-def T_acquire(get_T=None):
- """Measure the current temperature of the sample,
- If `get_T` is `None`, fake it by returning
- `package_config['temperature']`.
- """
- if get_T:
-'measure temperature')
- T = get_T()
- else:
- T = None
- if T is None:
-'fake temperature %g' % _package_config['temperature'])
- T = _package_config['temperature']
- return T
-def T(get_T, temperature_config, filename, group='/'):
- """Wrapper around T_acquire(), T_analyze(), T_save().
- >>> import os
- >>> import tempfile
- >>> from import HDF5_Storage, pprint_HDF5
- >>> from .config import TemperatureConfig
- >>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='calibcant-')
- >>> os.close(fd)
- >>> temperature_config = TemperatureConfig(storage=HDF5_Storage(
- ... filename=filename, group='/T/config/'))
- >>> def get_T():
- ... return 19.2
- >>> t = T(get_T=get_T, temperature_config=temperature_config,
- ... filename=filename, group='/T/')
- >>> pprint_HDF5(filename) # doctest: +ELLIPSIS, +REPORT_UDIFF
- /
- /T
- /T/config
- <HDF5 dataset "default": shape (), type "|b1">
- False
- <HDF5 dataset "units": shape (), type "|S7">
- Celsius
- <HDF5 dataset "processed": shape (), type "<f8">
- 292.35
- <HDF5 dataset "raw": shape (), type "<f8">
- 19.2
- >>> t = T(get_T=None, temperature_config=temperature_config,
- ... filename=filename, group='/T/')
- >>> pprint_HDF5(filename) # doctest: +ELLIPSIS, +REPORT_UDIFF
- /
- /T
- /T/config
- <HDF5 dataset "default": shape (), type "|b1">
- True
- <HDF5 dataset "units": shape (), type "|S7">
- Celsius
- <HDF5 dataset "processed": shape (), type "<f8">
- 295.15
- <HDF5 dataset "raw": shape (), type "<i4">
- 22
- Cleanup our temporary config file.
- >>> os.remove(filename)
- """
- T_raw = T_acquire(get_T)
- _LOG.debug('got T: %s' % T_raw)
- T_ret = _T_analyze(T_raw, temperature_config)
- temperature_config['default'] = not get_T
- _T_save(filename, group=group, raw_T=T_raw,
- temperature_config=temperature_config, processed_T=T_ret)
- return T_ret
+++ /dev/null
-# calibcant - tools for thermally calibrating AFM cantilevers
-# Copyright (C) 2008-2012 W. Trevor King <>
-# This file is part of calibcant.
-# calibcant is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free Software
-# Foundation, either version 3 of the License, or (at your option) any later
-# version.
-# calibcant is distributed in the hope that it will be useful, but WITHOUT ANY
-# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
-# You should have received a copy of the GNU General Public License along with
-# calibcant. If not, see <>.
-"""Temperature analysis.
-Separate the more general `T_analyze()` from the other `T_*()`
-functions in calibcant.
-The relevant physical quantities are:
-* `T` Temperature at which thermal vibration measurements were acquired
->>> import os
->>> import tempfile
->>> import numpy
->>> from .config import TemperatureConfig
->>> from import pprint_HDF5, HDF5_Storage
->>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='calibcant-')
->>> os.close(fd)
->>> temperature_config = TemperatureConfig(storage=HDF5_Storage(
-... filename=filename, group='/T/config/'))
->>> raw_T = numpy.array([22, 23.5, 24])
->>> processed_T = T_analyze(raw_T, temperature_config)
->>> T_plot(raw_T=raw_T, processed_T=processed_T)
->>> T_save(filename=filename, group='/T/', raw_T=raw_T,
-... temperature_config=temperature_config, processed_T=processed_T)
->>> pprint_HDF5(filename) # doctest: +REPORT_UDIFF
- /T
- /T/config
- <HDF5 dataset "default": shape (), type "|b1">
- True
- <HDF5 dataset "units": shape (), type "|S7">
- Celsius
- <HDF5 dataset "processed": shape (3,), type "<f8">
- [ 295.15 296.65 297.15]
- <HDF5 dataset "raw": shape (3,), type "<f8">
- [ 22. 23.5 24. ]
->>> raw_T,temperature_config,processed_T = T_load(
-... filename=filename, group='/T/')
->>> print temperature_config.dump()
-units: Celsius
-default: yes
->>> raw_T
-array([ 22. , 23.5, 24. ])
->>> type(raw_T)
-<type 'numpy.ndarray'>
->>> processed_T
-array([ 295.15, 296.65, 297.15])
->>> os.remove(filename)
-import h5py as _h5py
-from scipy.constants import C2K as _C2K
- import matplotlib as _matplotlib
- import matplotlib.pyplot as _matplotlib_pyplot
- import time as _time # for timestamping lines on plots
-except (ImportError, RuntimeError), e:
- _matplotlib = None
- _matplotlib_import_error = e
-from import HDF5_Storage as _HDF5_Storage
-from import h5_create_group as _h5_create_group
-from . import LOG as _LOG
-from . import package_config as _package_config
-from .config import Celsius as _Celsius
-from .config import Kelvin as _Kelvin
-from .config import TemperatureConfig as _TemperatureConfig
-def T_analyze(T, temperature_config):
- """Convert measured temperature to Kelvin.
- `T` should be a numpy ndarray or scalar. `temperature_config`
- should be a `config._TemperatureConfig` instance.
- """
- if temperature_config['units'] == _Celsius:
- return _C2K(T)
- else:
- return T
-def T_save(filename, group='/', raw_T=None, temperature_config=None,
- processed_T=None):
- with _h5py.File(filename, 'a') as f:
- cwg = _h5_create_group(f, group)
- if raw_T is not None:
- try:
- del cwg['raw']
- except KeyError:
- pass
- cwg['raw'] = raw_T
- if temperature_config is not None:
- config_cwg = _h5_create_group(cwg, 'config')
- storage = _HDF5_Storage()
-, group=config_cwg)
- if processed_T is not None:
- try:
- del cwg['processed']
- except KeyError:
- pass
- cwg['processed'] = processed_T
-def T_load(filename, group='/'):
- assert group.endswith('/')
- raw_T = processed_T = None
- with _h5py.File(filename, 'a') as f:
- try:
- raw_T = f[group+'raw'][...]
- except KeyError:
- pass
- temperature_config = _TemperatureConfig(storage=_HDF5_Storage(
- filename=filename, group=group+'config/'))
- try:
- processed_T = f[group+'processed'][...]
- except KeyError:
- pass
- temperature_config.load()
- return (raw_T, temperature_config, processed_T)
-def T_plot(raw_T=None, processed_T=None):
- if not _matplotlib:
- raise _matplotlib_import_error
- figure = _matplotlib_pyplot.figure()
- timestamp = _time.strftime('%H%M%S')
- if raw_T is None:
- if processed_T is None:
- return # nothing to plot
- axes1 = None
- axes2 = figure.add_subplot(1, 1, 1)
- elif processed_T is None:
- axes1 = figure.add_subplot(1, 1, 1)
- axes2 = None
- else:
- axes1 = figure.add_subplot(2, 1, 1)
- axes2 = figure.add_subplot(2, 1, 2)
- if axes1:
- axes1.set_title('Raw Temperatures %s' % timestamp)
- axes1.plot(raw_T, label='raw')
- if axes2:
- axes2.set_title('Processed Temperatures %s' % timestamp)
- axes2.plot(processed_T, label='processed')
- if hasattr(figure, 'show'):
"""Calculate `k` from arrays of bumps, temperatures, and vibrations.
-Separate the more general `calib_analyze()` from the other `calib_*()`
+Separate the more general `analyze()` from the other calibration
functions in calibcant.
The relevent physical quantities are :
k_cant Fcant / Zcant
->>> import os
->>> import tempfile
>>> import numpy
->>> from import HDF5_Storage, pprint_HDF5
->>> from .config import CalibrationConfig
+>>> from .config import CalibrateConfig
->>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='calibcant-')
->>> os.close(fd)
->>> calibration_config = CalibrationConfig(storage=HDF5_Storage(
-... filename=filename, group='/calib/config/'))
+>>> config = CalibrateConfig()
>>> bumps = numpy.array((15.9e6, 16.9e6, 16.3e6))
>>> temperatures = numpy.array((295, 295.2, 294.8))
>>> vibrations = numpy.array((2.20e-5, 2.22e-5, 2.21e-5))
->>> k,k_s = calib_analyze(bumps=bumps, temperatures=temperatures,
+>>> k,k_s = analyze(bumps=bumps, temperatures=temperatures,
... vibrations=vibrations)
>>> (k, k_s) # doctest: +ELLIPSIS
(0.0493..., 0.00248...)
>>> vibrations.std()/vibrations.mean() # doctest: +ELLIPSIS
->>> calib_save(filename=filename, group='/calib/',
-... bumps=bumps, temperatures=temperatures, vibrations=vibrations,
-... calibration_config=calibration_config, k=k, k_s=k_s)
->>> pprint_HDF5(filename) # doctest: +ELLIPSIS, +REPORT_UDIFF
- /calib
- /calib/config
- <HDF5 dataset "bump": shape (), type "|S1">
- <HDF5 dataset "num-bumps": shape (), type "<i4">
- 10
- <HDF5 dataset "num-temperatures": shape (), type "<i4">
- 10
- <HDF5 dataset "num-vibrations": shape (), type "<i4">
- 20
- <HDF5 dataset "temperature": shape (), type "|S1">
- <HDF5 dataset "temperature-sleep": shape (), type "<i4">
- 1
- <HDF5 dataset "vibration": shape (), type "|S1">
- <HDF5 dataset "vibration-spacing": shape (), type "<f8">
- 5e-05
- /calib/processed
- /calib/processed/spring-constant
- <HDF5 dataset "data": shape (), type "<f8">
- 0.0493...
- <HDF5 dataset "standard-deviation": shape (), type "<f8">
- 0.00248...
- <HDF5 dataset "units": shape (), type "|S3">
- N/m
- /calib/raw
- /calib/raw/photodiode-sensitivity
- <HDF5 dataset "data": shape (3,), type "<f8">
- [ 15900000. 16900000. 16300000.]
- <HDF5 dataset "units": shape (), type "|S3">
- V/m
- /calib/raw/temperature
- <HDF5 dataset "data": shape (3,), type "<f8">
- [ 295. 295.2 294.8]
- <HDF5 dataset "units": shape (), type "|S1">
- K
- /calib/raw/thermal-vibration-variance
- <HDF5 dataset "data": shape (3,), type "<f8">
- [ 2.20...e-05 2.220...e-05 2.210...e-05]
- <HDF5 dataset "units": shape (), type "|S3">
- V^2
->>> bumps,temperatures,vibrations,calibration_config,k,k_s = calib_load(
-... filename=filename, group='/calib/')
->>> (k, k_s) # doctest: +ELLIPSIS
-(0.0493..., 0.00248...)
->>> os.remove(filename)
import h5py as _h5py
_matplotlib = None
_matplotlib_import_error = e
-from import HDF5_Storage as _HDF5_Storage
from import h5_create_group as _h5_create_group
+from pypiezo.base import get_axis_name as _get_axis_name
from . import LOG as _LOG
from . import package_config as _package_config
-from .bump_analyze import bump_analyze as _bump_analyze
-from .bump_analyze import bump_load as _bump_load
-from .bump_analyze import bump_save as _bump_save
-from .config import CalibrationConfig as _CalibrationConfig
-from .T_analyze import T_analyze as _temperature_analyze
-from .T_analyze import T_load as _temperature_load
-from .T_analyze import T_save as _temperature_save
-from .vib_analyze import vib_analyze as _vibration_analyze
-from .vib_analyze import vib_load as _vibration_load
-from .vib_analyze import vib_save as _vibration_save
-def calib_analyze(bumps, temperatures, vibrations):
+from .bump_analyze import analyze as _bump_analyze
+from .bump_analyze import save as _bump_save
+from .temperature_analyze import analyze as _temperature_analyze
+from .temperature_analyze import save as _temperature_save
+from .vibration_analyze import analyze as _vibration_analyze
+from .vibration_analyze import save as _vibration_save
+def analyze(bumps, temperatures, vibrations):
"""Analyze data from `get_calibration_data()`
Inputs (all are arrays of recorded data):
% (v2_m, v2_s, v2_s/v2_m))
if _package_config['matplotlib']:
- calib_plot(bumps, temperatures, vibrations)
+ plot(bumps, temperatures, vibrations)
return (k, k_s)
-def calib_save(filename, group='/', bumps=None, temperatures=None,
- vibrations=None, calibration_config=None, k=None, k_s=None):
- with _h5py.File(filename, 'a') as f:
- cwg = _h5_create_group(f, group)
- if calibration_config is not None:
- config_cwg = _h5_create_group(cwg, 'config')
- storage = _HDF5_Storage()
-, group=config_cwg)
- if bumps is not None:
- try:
- del cwg['raw/photodiode-sensitivity/data']
- except KeyError:
- pass
- try:
- del cwg['raw/photodiode-sensitivity/units']
- except KeyError:
- pass
- cwg['raw/photodiode-sensitivity/data'] = bumps
- cwg['raw/photodiode-sensitivity/units'] = 'V/m'
- if temperatures is not None:
- try:
- del cwg['raw/temperature/data']
- except KeyError:
- pass
- try:
- del cwg['raw/temperature/units']
- except KeyError:
- pass
- cwg['raw/temperature/data'] = temperatures
- cwg['raw/temperature/units'] = 'K'
- if vibrations is not None:
- try:
- del cwg['raw/thermal-vibration-variance/data']
- except KeyError:
- pass
- try:
- del cwg['raw/thermal-vibration-variance/units']
- except KeyError:
- pass
- cwg['raw/thermal-vibration-variance/data'] = vibrations
- cwg['raw/thermal-vibration-variance/units'] = 'V^2'
- if k is not None:
- try:
- del cwg['processed/spring-constant/data']
- except KeyError:
- pass
- try:
- del cwg['processed/spring-constant/units']
- except KeyError:
- pass
- cwg['processed/spring-constant/data'] = k
- cwg['processed/spring-constant/units'] = 'N/m'
- if k_s is not None:
- try:
- del cwg['processed/spring-constant/standard-deviation']
- except KeyError:
- pass
- cwg['processed/spring-constant/standard-deviation'] = k_s
-def calib_load(filename, group='/'):
- assert group.endswith('/')
- bumps = temperatures = vibrations = k = k_s = None
- configs = []
- with _h5py.File(filename, 'a') as f:
- try:
- bumps = f[group+'raw/photodiode-sensitivity/data'][...]
- except KeyError:
- pass
- try:
- temperatures = f[group+'raw/temperature/data'][...]
- except KeyError:
- pass
- try:
- vibrations = f[group+'raw/thermal-vibration-variance/data'][...]
- except KeyError:
- pass
- try:
- k = float(f[group+'processed/spring-constant/data'][...])
- except KeyError:
- pass
- try:
- k_s = float(
- f[group+'processed/spring-constant/standard-deviation'][...])
- except KeyError:
- pass
- calibration_config = _CalibrationConfig(storage=_HDF5_Storage(
- filename=filename, group=group+'config/'))
- calibration_config.load()
- return (bumps, temperatures, vibrations, calibration_config, k, k_s)
-def calib_plot(bumps, temperatures, vibrations):
+def plot(bumps, temperatures, vibrations):
if not _matplotlib:
raise _matplotlib_import_error
figure = _matplotlib_pyplot.figure()
if hasattr(figure, 'show'):
+_plot = plot # alternative name for use inside analyze_all()
+def analyze_all(config, data, raw_data, maximum_relative_error=1e-5,
+ filename=None, group=None, plot=False, dry_run=False):
+ "(Re)analyze (and possibly plot) all data from a `calib()` run."
+ if not data.get('bump', None):
+ data['bump'] = _numpy.zeros((config['num-bumps'],), dtype=float)
+ if not data.get('temperature', None):
+ data['temperature'] = _numpy.zeros(
+ (config['num-temperatures'],), dtype=float)
+ if not data.get('vibrations', None):
+ data['vibration'] = _numpy.zeros(
+ (config['num-vibrations'],), dtype=float)
+ axis_config = config['afm']['piezo'].select_config(
+ setting_name='axes',
+ attribute_value=config['afm']['main-axis'],
+ get_attribute=_get_axis_name)
+ input_config = config['afm']['piezo'].select_config(
+ setting_name='inputs', attribute_value='deflection')
+ bumps_changed = temperatures_changed = vibrations_changed = False
+ if not isinstance(group, _h5py.Group) and not dry_run:
+ f = _h5py.File(filename, mode)
+ group = _h5_create_group(f, group)
+ else:
+ f = None
+ try:
+ for i,bump in enumerate(raw_data['bump']):
+ data['bump'][i],changed = check_bump(
+ index=i, bump=bump, z_axis_config=axis_config,
+ deflection_channel_config=input_config, plot=plot,
+ maximum_relative_error=maximum_relative_error)
+ if changed and not dry_run:
+ bumps_changed = True
+ bump_group = _h5_create_group(group, 'bump/{}'.format(i))
+ _bump_save(group=bump_group, processed=data['bump'][i])
+ for i,temperature in enumerate(raw_data['temperature']):
+ data['temperature'][i],changed = check_temperature(
+ index=i, temperature=temperature,
+ maximum_relative_error=maximum_relative_error)
+ if changed and not dry_run:
+ temperatures_changed = True
+ temperature_group = _h5_create_group(
+ group, 'temperature/{}'.format(i))
+ _temperature_save(
+ group=temerature_group, processed=data['temperature'][i])
+ for i,vibration in enumerate(raw_data['vibration']):
+ data['vibration'][i],changed = check_vibration(
+ index=i, vibration=vibration,
+ deflection_channel_config=input_config, plot=plot,
+ maximum_relative_error=maximum_relative_error)
+ if changed and not dry_run:
+ vibrations_changed = True
+ vibration_group = _h5_create_group(
+ group, 'vibration/{}'.format(i))
+ _vibration_save(
+ group=vibration_group, processed=data['vibration'])
+ k,k_s,changed = check_calibration(
+ k=data['processed']['spring_constant'],
+ k_s=data['processed']['spring_constant_deviation'],
+ bumps=data['bump'],
+ temperatures=data['temperature'], vibrations=data['vibration'],
+ maximum_relative_error=maximum_relative_error)
+ if (changed or bumps_changed or temperatures_changed or
+ vibrations_changed) and not dry_run:
+ calibration_group = _h5_create_group(group, 'calibration')
+ if bumps_changed:
+ calib_save(group=calibration_group, bump=data['bump'])
+ if temperatures_changed:
+ calib_save(
+ group=calibration_group, temperature=data['temperature'])
+ if vibrations_changed:
+ calib_save(
+ group=calibration_group, vibration=data['vibration'])
+ if changed:
+ calib_save(group=calibration_group, k=k, k_s=k_s)
+ finally:
+ if f:
+ f.close()
+ if plot:
+ _plot(bumps=data['raw']['bump'],
+ temperatures=data['raw']['temperature'],
+ vibrations=data['raw']['vibration'])
+ return (k, k_s)
-def calib_load_all(filename, group='/'):
- "Load all data from a `calib()` run."
- assert group.endswith('/'), group
- bumps,temperatures,vibrations,calibration_config,k,k_s = calib_load(
- filename, group+'calibration/')
- bump_details = []
- for i in range(calibration_config['num-bumps']):
- (raw_bump,bump_config,z_axis_config,deflection_channel_config,
- processed_bump) = _bump_load(
- filename=filename, group='%sbump/%d/' % (group, i))
- bump_details.append({
- 'raw_bump': raw_bump,
- 'bump_config': bump_config,
- 'z_axis_config': z_axis_config,
- 'deflection_channel_config': deflection_channel_config,
- 'processed_bump': processed_bump,
- })
- temperature_details = []
- for i in range(calibration_config['num-temperatures']):
- (raw_temperature,temperature_config,processed_temperature
- ) = _temperature_load(
- filename=filename, group='%stemperature/%d/' % (group, i))
- temperature_details.append({
- 'raw_temperature': raw_temperature,
- 'temperature_config': temperature_config,
- 'processed_temperature': processed_temperature,
- })
- vibration_details = []
- for i in range(calibration_config['num-vibrations']):
- (raw_vibration,vibration_config,deflection_channel_config,
- processed_vibration) = _vibration_load(
- filename=filename, group='%svibration/%d/' % (group, i))
- vibration_details.append({
- 'raw_vibration': raw_vibration,
- 'vibration_config': vibration_config,
- 'deflection_channel_config': deflection_channel_config,
- 'processed_vibration': processed_vibration,
- })
- return {
- 'bumps': bumps,
- 'bump_details': bump_details,
- 'temperatures': temperatures,
- 'temperature_details': temperature_details,
- 'vibrations': vibrations,
- 'vibration_details': vibration_details,
- 'calibration_config': calibration_config,
- 'k': k,
- 'k_s': k_s,
- }
-def calib_analyze_all(filename, group='/', maximum_relative_error=1e-5,
- dry_run=False):
- "(Re)analyze all data from a `calib()` run."
- assert group.endswith('/'), group
- bumps,temperatures,vibrations,calibration_config,k,k_s = calib_load(
- filename, group+'calibration/')
- if bumps is None:
- bumps = _numpy.zeros(
- (calibration_config['num-bumps'],), dtype=float)
- if temperatures is None:
- temperatures = _numpy.zeros(
- (calibration_config['num-temperatures'],), dtype=float)
- if vibrations is None:
- vibrations = _numpy.zeros(
- (calibration_config['num-vibrations'],), dtype=float)
- changed_bump = changed_temperature = changed_vibration = False
- for i in range(calibration_config['num-bumps']):
- _changed_bump = False
- bump_group = '%sbump/%d/' % (group, i)
- (raw_bump,bump_config,z_axis_config,
- deflection_channel_config,processed_bump) = _bump_load(
- filename=filename, group=bump_group)
- sensitivity = _bump_analyze(
- data=raw_bump, bump_config=bump_config,
- z_axis_config=z_axis_config,
- deflection_channel_config=deflection_channel_config)
- bumps[i] = sensitivity
- if processed_bump is None:
- _changed_bump = True
- _LOG.warn('new analysis for bump %d: %g' % (i, sensitivity))
- else:
- rel_error = abs(sensitivity - processed_bump)/processed_bump
- if rel_error > maximum_relative_error:
- _changed_bump = True
- _LOG.warn(("new analysis doesn't match for bump %d: %g -> %g "
- "(difference: %g, relative error: %g)")
- % (i, processed_bump, sensitivity,
- sensitivity-processed_bump, rel_error))
- if _changed_bump and not dry_run:
- changed_bump = True
- _bump_save(filename, bump_group, processed_bump=sensitivity)
- for i in range(calibration_config['num-temperatures']):
- _changed_temperature = False
- temperature_group = '%stemperature/%d/' % (group, i)
- (raw_temperature,temperature_config,processed_temperature
- ) = _temperature_load(
- filename=filename, group=temperature_group)
- temperature = _temperature_analyze(
- raw_temperature, temperature_config)
- temperatures[i] = temperature
- if processed_temperature is None:
- _changed_temperature = True
- _LOG.warn('new analysis for temperature %d: %g' % (i, temperature))
- else:
- rel_error = abs(temperature - processed_temperature
- )/processed_temperature
- if rel_error > maximum_relative_error:
- _changed_temperature = True
- _LOG.warn(("new analysis doesn't match for temperature %d: "
- "%g -> %g (difference: %g, relative error: %g)")
- % (i, processed_temperature, temperature,
- temperature-processed_temperature, rel_error))
- if _changed_temperature and not dry_run:
- changed_temperature = True
- _temperature_save(
- filename, temperature_group,
- processed_T=temperature)
- for i in range(calibration_config['num-vibrations']):
- _changed_vibration = False
- vibration_group = '%svibration/%d/' % (group, i)
- (raw_vibration,vibration_config,deflection_channel_config,
- processed_vibration) = _vibration_load(
- filename=filename, group=vibration_group)
- variance = _vibration_analyze(
- deflection=raw_vibration, vibration_config=vibration_config,
- deflection_channel_config=deflection_channel_config)
- vibrations[i] = variance
- if processed_vibration is None:
- _changed_vibration = True
- _LOG.warn('new analysis for vibration %d: %g' % (i, variance))
- else:
- rel_error = abs(variance - processed_vibration)/processed_vibration
- if rel_error > maximum_relative_error:
- _changed_vibration = True
- _LOG.warn(("new analysis doesn't match for vibration %d: "
- "%g -> %g (difference: %g, relative error: %g)")
- % (i, processed_vibration, variance,
- variance-processed_vibration, rel_error))
- if _changed_vibration and not dry_run:
- changed_vibration = True
- _vibration_save(
- filename, vibration_group, processed_vibration=variance)
- calib_group = '%scalibration/' % group
- if changed_bump and not dry_run:
- calib_save(filename, calib_group, bumps=bumps)
- if changed_temperature and not dry_run:
- calib_save(filename, calib_group, temperatures=temperatures)
- if changed_vibration and not dry_run:
- calib_save(filename, calib_group, vibrations=vibrations)
- new_k,new_k_s = calib_analyze(
- bumps=bumps, temperatures=temperatures, vibrations=vibrations)
- new_calib_k = False
+def check_bump(index, bump, maximum_relative_error, **kwargs):
+ changed = False
+ sensitivity = _bump_analyze(
+ config=bump['config']['bump'], data=bump['raw'], **kwargs)
+ if bump.get('processed', None) is None:
+ changed = True
+ _LOG.warn('new analysis for bump {}: {}'.format(index, sensitivity))
+ else:
+ rel_error = abs(sensitivity - bump['processed'])/bump['processed']
+ if rel_error > maximum_relative_error:
+ changed = True
+ _LOG.warn(("new analysis doesn't match for bump {}: {} -> {} "
+ "(difference: {}, relative error: {})").format(
+ index, bump['processed'], sensitivity,
+ sensitivity-bump['processed'], rel_error))
+ return (sensitivity, changed)
+def check_temperature(index, temperature, maximum_relative_error, **kwargs):
+ changed = False
+ temp = _temperature_analyze(
+ config=temperature['config']['temperature'],
+ temperature=temperature['raw'], **kwargs)
+ if temperature.get('processed', None) is None:
+ changed = True
+ _LOG.warn('new analysis for temperature {}: {}'.format(index, temp))
+ else:
+ rel_error = abs(temp - temperature['processed']
+ )/temperature['processed']
+ if rel_error > maximum_relative_error:
+ changed = True
+ _LOG.warn(("new analysis doesn't match for temperature "
+ "{} -> {} (difference: {}, relative error: {})"
+ ).format(
+ index, temperature['processed'], temp,
+ temp-temperature['processed'], rel_error))
+ return (temp, changed)
+def check_vibration(index, vibration, maximum_relative_error, **kwargs):
+ changed = False
+ variance = _vibration_analyze(
+ config=vibration['config']['vibration'],
+ deflection=vibration['raw'], **kwargs)
+ if vibration.get('processed', None) is None:
+ changed = True
+ _LOG.warn('new analysis for temperature {}: {}'.format(
+ index, variance))
+ else:
+ rel_error = abs(variance-vibration['processed'])/vibration['processed']
+ if rel_error > maximum_relative_error:
+ _LOG.warn(("new analysis doesn't match for vibration {}: {} != {} "
+ "(difference: {}, relative error: {})").format(
+ index, variance, vibration['processed'],
+ variance-vibration['processed'], rel_error))
+ return (variance, changed)
+def check_calibration(k, k_s, maximum_relative_error, **kwargs):
+ changed = False
+ new_k,new_k_s = analyze(**kwargs)
if k is None:
- new_calib_k = True
- _LOG.warn('new analysis for k: %g' % new_k)
+ changed = True
+ _LOG.warn('new analysis for the spring constant: {}'.format(new_k))
rel_error = abs(new_k-k)/k
if rel_error > maximum_relative_error:
- new_calib_k = True
- _LOG.warn(("new analysis doesn't match for k: %g -> %g "
- "(difference: %g, relative error: %g)")
- % (k, new_k, new_k-k, rel_error))
- if new_calib_k and not dry_run:
- calib_save(filename, calib_group, k=new_k)
- new_calib_k_s = False
+ _LOG.warn(("new analysis doesn't match for the spring constant: "
+ "{} != {} (difference: {}, relative error: {})").format(
+ new_k, k, new_k-k, rel_error))
if k_s is None:
- new_calib_k_s = True
- _LOG.warn('new analysis for k_s: %g' % new_k_s)
+ changed = True
+ _LOG.warn('new analysis for the spring constant deviation: {}'.format(
+ new_k_s))
- rel_error = abs(new_k_s-k_s)/k_s
- if rel_error > maximum_relative_error:
- new_calib_k_s = True
- _LOG.warn(("new analysis doesn't match for k_s: %g -> %g "
- "(difference: %g, relative error: %g)")
- % (k_s, new_k_s, new_k_s-k_s, rel_error))
- if new_calib_k_s and not dry_run:
- calib_save(filename, calib_group, k_s=new_k_s)
- return (new_k, new_k_s)
-def calib_plot_all(bumps, bump_details, temperatures, temperature_details,
- vibrations, vibration_details, calibration_config, k, k_s,
- maximum_relative_error=1e-5):
- calib_plot(bumps, temperatures, vibrations)
- for i,bump in enumerate(bump_details):
- sensitivity = _bump_analyze(
- data=bump['raw_bump'], bump_config=bump['bump_config'],
- z_axis_config=bump['z_axis_config'],
- deflection_channel_config=bump['deflection_channel_config'],
- plot=True)
- rel_error = abs(sensitivity - bump['processed_bump']
- )/bump['processed_bump']
- if rel_error > maximum_relative_error:
- _LOG.warn(("new analysis doesn't match for bump %d: %g != %g "
- "(difference: %g, relative error: %g)")
- % (i, sensitivity, bump['processed_bump'],
- sensitivity-bump['processed_bump'], rel_error))
- # nothing interesting to plot for temperatures...
- for i,vibration in enumerate(vibration_details):
- variance = _vibration_analyze(
- deflection=vibration['raw_vibration'],
- vibration_config=vibration['vibration_config'],
- deflection_channel_config=vibration['deflection_channel_config'],
- plot=True)
- rel_error = abs(variance - vibration['processed_vibration']
- )/vibration['processed_vibration']
+ rel_error = abs(new_k-k)/k
if rel_error > maximum_relative_error:
- _LOG.warn(("new analysis doesn't match for vibration %d: %g != %g "
- "(difference: %g, relative error: %g)")
- % (i, variance, vibration['processed_vibration'],
- variance-vibration['processed_vibration'], rel_error))
+ _LOG.warn(
+ ("new analysis doesn't match for the spring constant deviation"
+ ": {} != {} (difference: {}, relative error: {})").format(
+ new_k_s, k_s, new_k_s-k_s, rel_error))
+ return (new_k, new_k_s, changed)
Photo-sensitivity is measured by bumping the cantilever against the
-surface, where `Zp = Zcant` (see the `bump_*()` family of functions).
-The measured slope Vphoto/Vout is converted to photo-sensitivity via
+surface, where `Zp = Zcant`. The measured slope Vphoto/Vout is
+converted to photo-sensitivity via::
Vphoto/Vzp_out * Vzp_out/Vzp * Vzp/Zp * Zp/Zcant = Vphoto/Zcant
(measured) (1/zp_gain) (1/zp_sensitivity) (1) (photo_sensitivity)
We do all these measurements a few times to estimate statistical
-The functions are layed out in the families:
- bump_*()
-For each family, * can be any of:
- acquire get real-world data
- save store real-world data to disk
- load get real-world data from disk
- analyze interperate the real-world data.
- plot show a nice graphic to convince people we're working :p
-A family name without any _* extension (e.g. `bump()`), runs `*_acquire()`,
-`*_save()`, `*_analyze()`.
-If `package_config['matplotlib']` is `True`, `*_analyze()` will call
-`*_plot()` internally.
import numpy as _numpy
from pypiezo.base import convert_bits_to_meters as _convert_bits_to_meters
from . import LOG as _LOG
-from .bump_analyze import bump_analyze as _bump_analyze
-from .bump_analyze import bump_save as _bump_save
+from .bump_analyze import analyze as _analyze
+from .bump_analyze import save as _save
-def bump_acquire(afm, bump_config):
+def acquire(afm, config):
"""Ramps `push_depth` closer and returns to the original position.
- afm a pyafm.AFM instance
- bump_config a .config._BumpConfig instance
+ afm a pyafm.AFM instance
+ config a .config._BumpConfig instance
Returns the acquired ramp data dictionary, with data in DAC/ADC bits.
- depth=bump_config['initial-position'], far=bump_config['far-steps'],
- setpoint=bump_config['setpoint'],
- min_slope_ratio=bump_config['min-slope-ratio'])
+ depth=config['initial-position'], far=config['far-steps'],
+ setpoint=config['setpoint'],
+ min_slope_ratio=config['min-slope-ratio'])
#afm.piezo.jump('z', 32000)
-'bump the surface to a depth of %g m with a setpoint of %g V'
- % (bump_config['push-depth'], bump_config['setpoint']))
+ 'bump the surface to a depth of {} m with a setpoint of {} V'.format(
+ config['push-depth'], config['setpoint']))
- axis = afm.piezo.axis_by_name(afm.axis_name)
+ axis = afm.piezo.axis_by_name(afm.config['main-axis'])
- start_pos = afm.piezo.last_output[afm.axis_name]
+ start_pos = afm.piezo.last_output[afm.config['main-axis']]
start_pos_m = _convert_bits_to_meters(axis.config, start_pos)
- close_pos_m = start_pos_m + bump_config['push-depth']
+ close_pos_m = start_pos_m + config['push-depth']
close_pos = _convert_meters_to_bits(axis.config, close_pos_m)
- dtype = afm.piezo.channel_dtype(afm.axis_name, direction='output')
+ dtype = afm.piezo.channel_dtype(
+ afm.config['main-axis'], direction='output')
appr = _numpy.linspace(
- start_pos, close_pos, bump_config['samples']).astype(dtype)
+ start_pos, close_pos, config['samples']).astype(dtype)
# switch numpy.append to numpy.concatenate with version 2.0+
out = _numpy.append(appr, appr[::-1])
out = out.reshape((len(out), 1))
# (samples) / (meters) * (meters/second) = (samples/second)
- freq = (bump_config['samples'] / bump_config['push-depth']
- * bump_config['push-speed'])
+ freq = (config['samples'] / config['push-depth']
+ * config['push-speed'])
- data = afm.piezo.ramp(out, freq, output_names=[afm.axis_name],
+ data = afm.piezo.ramp(out, freq, output_names=[afm.config['main-axis']],
out = out.reshape((len(out),))
data = data.reshape((data.size,))
- return {afm.axis_name: out, 'deflection': data}
+ return {afm.config['main-axis']: out, 'deflection': data}
-def bump(afm, bump_config, filename, group='/'):
- """Wrapper around bump_acquire(), bump_analyze(), bump_save().
+def run(afm, config, filename, group='/'):
+ """Wrapper around acquire(), analyze(), save().
>>> import os
>>> import tempfile
>>> from import pprint_HDF5
- >>> from pycomedi.device import Device
- >>> from pycomedi.subdevice import StreamingSubdevice
- >>> from import AnalogChannel, DigitalChannel
- >>> from pycomedi.constant import AREF, IO_DIRECTION, SUBDEVICE_TYPE, UNIT
- >>> from pypiezo.afm import AFMPiezo
- >>> from pypiezo.base import PiezoAxis, InputChannel
- >>> from pypiezo.config import ChannelConfig, AxisConfig
- >>> from stepper import Stepper
- >>> from pyafm.afm import AFM
+ >>> from import load_afm
>>> from .config import BumpConfig
>>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='calibcant-')
>>> os.close(fd)
- >>> d = Device('/dev/comedi0')
- >>>
- Setup an `AFMPiezo` instance.
- >>> s_in = d.find_subdevice_by_type(,
- ... factory=StreamingSubdevice)
- >>> s_out = d.find_subdevice_by_type(,
- ... factory=StreamingSubdevice)
- >>> axis_channel =
- ... 0, factory=AnalogChannel, aref=AREF.ground)
- >>> input_channel =, factory=AnalogChannel, aref=AREF.diff)
- >>> for chan in [axis_channel, input_channel]:
- ... chan.range = chan.find_range(unit=UNIT.volt, min=-10, max=10)
- We set the minimum voltage for the `z` axis to -9 (a volt above
- the minimum possible voltage) to help with testing
- `.get_surface_position`. Without this minimum voltage, small
- calibration errors could lead to a railed -10 V input for the
- first few surface approaching steps, which could lead to an
- `EdgeKink` error instead of a `FlatFit` error.
- >>> axis_config = AxisConfig()
- >>> axis_config.update(
- ... {'gain':20, 'sensitivity':8e-9, 'minimum':-9})
- >>> axis_channel_config = ChannelConfig()
- >>> axis_channel_config['name'] = 'z'
- >>> axis_config['channel'] = axis_channel_config
- >>> input_channel_config = ChannelConfig()
- >>> input_channel_config['name'] = 'deflection'
- >>> a = PiezoAxis(config=axis_config, axis_channel=axis_channel)
- >>> a.setup_config()
- >>> c = InputChannel(config=input_channel_config, channel=input_channel)
- >>> c.setup_config()
- >>> piezo = AFMPiezo(axes=[a], inputs=[c])
- Setup a `stepper` instance.
- >>> s_d = d.find_subdevice_by_type(SUBDEVICE_TYPE.dio)
- >>> d_channels = [, factory=DigitalChannel)
- ... for i in (0, 1, 2, 3)]
- >>> for chan in d_channels:
- ... chan.dio_config(IO_DIRECTION.output)
- >>> def write(value):
- ... s_d.dio_bitfield(bits=value, write_mask=2**4-1)
- >>> stepper = Stepper(write=write)
- Setup an `AFM` instance.
- >>> afm = AFM(piezo, stepper)
+ >>> devices = []
+ >>> afm = load_afm()
+ >>> afm.load_from_config(devices=devices)
Test a bump:
- >>> bump_config = BumpConfig()
- >>> bump(afm, bump_config, filename, group='/bump')
- TODO: replace skipped example data with real-world values
+ >>> config = BumpConfig()
+ >>> output = run(afm=afm, config=config, filename=filename, group='/')
+ >>> output # doctest: +SKIP
+ 23265462.3047795
>>> pprint_HDF5(filename) # doctest: +ELLIPSIS, +REPORT_UDIFF
- /bump
- /bump/config
- /bump/config/bump
- <HDF5 dataset "far-steps": shape (), type "<i4">
- 200
- <HDF5 dataset "initial-position": shape (), type "<f8">
- -5e-08
- <HDF5 dataset "model": shape (), type "|S9">
- quadratic
- <HDF5 dataset "push-depth": shape (), type "<f8">
- 2e-07
- <HDF5 dataset "push-speed": shape (), type "<f8">
- 1e-06
- <HDF5 dataset "samples": shape (), type "<i4">
- 1024
- <HDF5 dataset "setpoint": shape (), type "<f8">
- 2.0
- /bump/config/deflection
- /bump/config/deflection/channel
- <HDF5 dataset "channel": shape (), type "<i4">
- 0
- <HDF5 dataset "conversion-coefficients": shape (2,), type "<f8">
- [ -1.00000000e+01 3.05180438e-04]
- <HDF5 dataset "conversion-origin": shape (), type "<f8">
- 0.0
- <HDF5 dataset "device": shape (), type "|S12">
- /dev/comedi0
- <HDF5 dataset "inverse-conversion-coefficients": shape (2,), type "<f8">
- [ 0. 3276.75]
- <HDF5 dataset "inverse-conversion-origin": shape (), type "<f8">
- -10.0
- <HDF5 dataset "maxdata": shape (), type "<i8">
- 65535
- <HDF5 dataset "name": shape (), type "|S10">
- deflection
- <HDF5 dataset "range": shape (), type "<i4">
- 0
- <HDF5 dataset "subdevice": shape (), type "<i4">
- 0
- /bump/config/z
- /bump/config/z/axis
- /bump/config/z/axis/channel
- <HDF5 dataset "channel": shape (), type "<i4">
- 0
- <HDF5 dataset "conversion-coefficients": shape (2,), type "<f8">
- [ -1.00000000e+01 3.05180438e-04]
- <HDF5 dataset "conversion-origin": shape (), type "<f8">
- 0.0
- <HDF5 dataset "device": shape (), type "|S12">
- /dev/comedi0
- <HDF5 dataset "inverse-conversion-coefficients": shape (2,), type "<f8">
- [ 0. 3276.75]
- <HDF5 dataset "inverse-conversion-origin": shape (), type "<f8">
- -10.0
- <HDF5 dataset "maxdata": shape (), type "<i8">
- 65535
- <HDF5 dataset "name": shape (), type "|S1">
- z
- <HDF5 dataset "range": shape (), type "<i4">
- 0
- <HDF5 dataset "subdevice": shape (), type "<i4">
- 1
- <HDF5 dataset "gain": shape (), type "<i4">
- 20
- <HDF5 dataset "maximum": shape (), type "<f8">
- 10.0
- <HDF5 dataset "minimum": shape (), type "<i4">
- -9
- <HDF5 dataset "monitor": shape (), type "|S1">
- <HDF5 dataset "sensitivity": shape (), type "<f8">
- 8e-09
- <HDF5 dataset "processed": shape (), type "<f8">
+ /config
+ /config/bump
+ <HDF5 dataset "far-steps": shape (), type "<i4">
+ 200
+ <HDF5 dataset "initial-position": shape (), type "<f8">
+ -5e-08
+ <HDF5 dataset "min-slope-ratio": shape (), type "<f8">
+ 10.0
+ <HDF5 dataset "model": shape (), type "|S9">
+ quadratic
+ <HDF5 dataset "push-depth": shape (), type "<f8">
+ 2e-07
+ <HDF5 dataset "push-speed": shape (), type "<f8">
+ 1e-06
+ <HDF5 dataset "samples": shape (), type "<i4">
+ 1024
+ <HDF5 dataset "setpoint": shape (), type "<f8">
+ 2.0
+ /processed
+ <HDF5 dataset "data": shape (), type "<f8">
- /bump/raw
- <HDF5 dataset "deflection": shape (2048,), type "<u2">
+ <HDF5 dataset "units": shape (), type "|S3">
+ V/m
+ /raw
+ /raw/deflection
+ <HDF5 dataset "data": shape (2048,), type "<u2">
- <HDF5 dataset "z": shape (2048,), type "<u2">
+ <HDF5 dataset "units": shape (), type "|S4">
+ bits
+ /raw/z
+ <HDF5 dataset "data": shape (2048,), type "<u2">
+ <HDF5 dataset "units": shape (), type "|S4">
+ bits
- Close the Comedi device.
+ Close the Comedi devices.
- >>> d.close()
+ >>> for device in devices:
+ ... device.close()
Cleanup our temporary config file.
>>> os.remove(filename)
deflection_channel = afm.piezo.input_channel_by_name('deflection')
- axis = afm.piezo.axis_by_name(afm.axis_name)
+ axis = afm.piezo.axis_by_name(afm.config['main-axis'])
- data = bump_acquire(afm, bump_config)
- photo_sensitivity = _bump_analyze(
- data, bump_config, z_axis_config=axis.config,
+ raw = acquire(afm, config)
+ photo_sensitivity = _analyze(
+ config=config, data=raw, z_axis_config=axis.config,
- _bump_save(
- filename, group, data, bump_config,
- z_axis_config=axis.config,
- deflection_channel_config=deflection_channel.config,
- processed_bump=photo_sensitivity)
+ _save(filename=filename, group=group, config=config,
+ raw=raw, processed=photo_sensitivity)
return photo_sensitivity
"""Surface bump analysis (measures photodiode sensitivity).
-Separate the more general `bump_analyze()` from the other `bump_*()`
+Separate the more general `analyze()` from the other `bump_*()`
functions in calibcant.
The relevant physical quantities are:
`photo_sensitivity` is measured by bumping the cantilever against the
surface, where `Zp = Zcant` (see `calibrate.bump_acquire()`). The
measured slope `Vphoto/Vout` is converted to `photo_sensitivity` with
>>> import os
>>> from pprint import pprint
>>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='calibcant-')
>>> os.close(fd)
->>> bump_config = BumpConfig()
+>>> config = BumpConfig()
>>> z_channel_config = ChannelConfig()
>>> z_channel_config['name'] = 'z'
>>> z_channel_config['maxdata'] = 200
>>> deflection_channel_config['conversion-coefficients'] = (0,1)
>>> deflection_channel_config['conversion-origin'] = 0
->>> raw_bump = {
+>>> raw = {
... 'z': numpy.arange(100, dtype=numpy.uint16),
... 'deflection': numpy.arange(100, dtype=numpy.uint16),
... }
->>> raw_bump['deflection'][:50] = 50
->>> processed_bump = bump_analyze(
-... raw_bump, bump_config, z_axis_config, deflection_channel_config)
->>> bump_plot(data=raw_bump) # TODO: convert to V and m
->>> bump_save(filename=filename, group='/bump/', raw_bump=raw_bump,
-... bump_config=bump_config, z_axis_config=z_axis_config,
-... deflection_channel_config=deflection_channel_config,
-... processed_bump=processed_bump)
+>>> raw['deflection'][:50] = 50
+>>> processed = analyze(
+... config=config, data=raw, z_axis_config=z_axis_config,
+... deflection_channel_config=deflection_channel_config)
+>>> plot(data=raw) # TODO: convert to V and m
+>>> save(filename=filename, group='/bump/',
+... config=config, raw=raw, processed=processed)
>>> pprint_HDF5(filename) # doctest: +ELLIPSIS, +REPORT_UDIFF
<HDF5 dataset "initial-position": shape (), type "<f8">
+ <HDF5 dataset "min-slope-ratio": shape (), type "<f8">
+ 10.0
<HDF5 dataset "model": shape (), type "|S9">
<HDF5 dataset "push-depth": shape (), type "<f8">
<HDF5 dataset "setpoint": shape (), type "<f8">
- /bump/config/deflection
- /bump/config/deflection/channel
- <HDF5 dataset "channel": shape (), type "<i4">
- 0
- <HDF5 dataset "conversion-coefficients": shape (2,), type "<i4">
- [0 1]
- <HDF5 dataset "conversion-origin": shape (), type "<i4">
- 0
- <HDF5 dataset "device": shape (), type "|S12">
- /dev/comedi0
- <HDF5 dataset "inverse-conversion-coefficients": shape (), type "|S1">
- <HDF5 dataset "inverse-conversion-origin": shape (), type "<f8">
- 1.0
- <HDF5 dataset "maxdata": shape (), type "<i4">
- 200
- <HDF5 dataset "name": shape (), type "|S10">
- deflection
- <HDF5 dataset "range": shape (), type "<i4">
- 1
- <HDF5 dataset "subdevice": shape (), type "<i4">
- -1
- /bump/config/z
- /bump/config/z/axis
- /bump/config/z/axis/channel
- <HDF5 dataset "channel": shape (), type "<i4">
- 0
- <HDF5 dataset "conversion-coefficients": shape (2,), type "<i4">
- [0 1]
- <HDF5 dataset "conversion-origin": shape (), type "<i4">
- 0
- <HDF5 dataset "device": shape (), type "|S12">
- /dev/comedi0
- <HDF5 dataset "inverse-conversion-coefficients": shape (), type "|S1">
- <HDF5 dataset "inverse-conversion-origin": shape (), type "<f8">
- 1.0
- <HDF5 dataset "maxdata": shape (), type "<i4">
- 200
- <HDF5 dataset "name": shape (), type "|S1">
- z
- <HDF5 dataset "range": shape (), type "<i4">
- 1
- <HDF5 dataset "subdevice": shape (), type "<i4">
- -1
- <HDF5 dataset "gain": shape (), type "<f8">
- 1.0
- <HDF5 dataset "maximum": shape (), type "|S4">
- None
- <HDF5 dataset "minimum": shape (), type "|S4">
- None
- <HDF5 dataset "monitor": shape (), type "|S1">
- <HDF5 dataset "sensitivity": shape (), type "<f8">
- 1.0
- <HDF5 dataset "processed": shape (), type "<f8">
- 1.00...
+ /bump/processed
+ <HDF5 dataset "data": shape (), type "<f8">
+ 1.00...
+ <HDF5 dataset "units": shape (), type "|S3">
+ V/m
- <HDF5 dataset "deflection": shape (100,), type "<u2">
- [50 50 ... 50 51 52 ... 97 98 99]
- <HDF5 dataset "z": shape (100,), type "<u2">
- [ 0 1 2 3 ... 97 98 99]
->>> (raw_bump,bump_config,z_axis_config,deflection_channel_config,
-... processed_bump) = bump_load(filename=filename, group='/bump/')
->>> pprint(raw_bump) # doctest: +ELLIPSIS
-{'deflection': array([50, 50, ... 51, 52, 53, ..., 97, 98, 99], dtype=uint16),
- 'z': array([ 0, 1, 2, ..., 97, 98, 99], dtype=uint16)}
->>> processed_bump # doctest: +ELLIPSIS
+ /bump/raw/deflection
+ <HDF5 dataset "data": shape (100,), type "<u2">
+ [50 50 ... 50 51 52 ... 97 98 99]
+ <HDF5 dataset "units": shape (), type "|S4">
+ bits
+ /bump/raw/z
+ <HDF5 dataset "data": shape (100,), type "<u2">
+ [ 0 1 2 3 ... 97 98 99]
+ <HDF5 dataset "units": shape (), type "|S4">
+ bits
+>>> data = load(filename=filename, group='/bump/')
+>>> pprint(data) # doctest: +ELLIPSIS, +REPORT_UDIFF
+{'config': {'bump': <BumpConfig ...>},
+ 'processed': 1.00...,
+ 'raw': {'deflection': array([50, 50, ..., 52, 53, ..., 98, 99], dtype=uint16),
+ 'z': array([ 0, 1, 2, ..., 97, 98, 99], dtype=uint16)}}
>>> os.remove(filename)
-import h5py as _h5py
import numpy as _numpy
from scipy.optimize import leastsq as _leastsq
_matplotlib = None
_matplotlib_import_error = e
-from import HDF5_Storage as _HDF5_Storage
-from import h5_create_group as _h5_create_group
from pypiezo.base import convert_bits_to_volts as _convert_bits_to_volts
from pypiezo.base import convert_bits_to_meters as _convert_bits_to_meters
-from pypiezo.config import ChannelConfig as _ChannelConfig
from pypiezo.config import AxisConfig as _AxisConfig
+from pypiezo.config import InputChannelConfig as _InputChannelConfig
from . import LOG as _LOG
from . import package_config as _package_config
from .config import Linear as _Linear
from .config import Quadratic as _Quadratic
from .config import BumpConfig as _BumpConfig
+from .util import SaveSpec as _SaveSpec
+from .util import save as _save
+from .util import load as _load
-def bump_analyze(data, bump_config, z_axis_config,
- deflection_channel_config, plot=False):
+def analyze(config, data, z_axis_config,
+ deflection_channel_config, plot=False):
"""Return the slope of the bump.
data dictionary of data in DAC/ADC bits
- bump_config `.config._BumpConfig` instance
+ config `.config._BumpConfig` instance
z_axis_config z `pypiezo.config.AxisConfig` instance
- deflection `pypiezo.config.ChannelConfig` instance
+ deflection `pypiezo.config.InputChannelConfig` instance
plot boolean overriding matplotlib config setting.
photo_sensitivity (Vphoto/Zcant) in Volts/m
deflection_channel_config, data['deflection'])
high_voltage_rail = _convert_bits_to_volts(
deflection_channel_config, deflection_channel_config['maxdata'])
- if bump_config['model'] == _Linear:
+ if config['model'] == _Linear:
kwargs = {
'param_guesser': limited_linear_param_guess,
'model': limited_linear,
'model': limited_quadratic,
'sensitivity_from_fit_params': limited_quadratic_sensitivity,
- photo_sensitivity = bump_fit(
+ photo_sensitivity = fit(
z, deflection, high_voltage_rail=high_voltage_rail, plot=plot,
return photo_sensitivity
slope = params[2]
return slope
-def bump_fit(z, deflection, high_voltage_rail,
- param_guesser=limited_quadratic_param_guess,
- model=limited_quadratic,
- sensitivity_from_fit_params=limited_quadratic_sensitivity,
- plot=False):
+def fit(z, deflection, high_voltage_rail,
+ param_guesser=limited_quadratic_param_guess,
+ model=limited_quadratic,
+ sensitivity_from_fit_params=limited_quadratic_sensitivity,
+ plot=False):
"""Fit a aurface bump and return the photodiode sensitivitiy.
def residual(p, deflection, z):
return model(z, p, high_voltage_rail=high_voltage_rail) - deflection
param_guess = param_guesser(z, deflection)
- p,cov,info,mesg,ier = _leastsq(
- residual, param_guess, args=(deflection, z), full_output=True,
- maxfev=int(10e3))
+ try:
+ p,cov,info,mesg,ier = _leastsq(
+ residual, param_guess, args=(deflection, z), full_output=True,
+ maxfev=int(10e3))
+ except ValueError:
+ zd = _numpy.ndarray(list(z.shape) + [2], dtype=z.dtype)
+ zd[:,0] = z
+ zd[:,1] = d
+ _numpy.savetxt('/tmp/z-deflection.dat', zd, delimiter='\t')
+ raise
_LOG.debug('fitted params: %s' % p)
_LOG.debug('covariance matrix: %s' % cov)
#_LOG.debug('info: %s' % info)
if plot or _package_config['matplotlib']:
yguess = model(z, param_guess, high_voltage_rail=high_voltage_rail)
yfit = model(z, p, high_voltage_rail=high_voltage_rail)
- bump_plot({'z': z, 'deflection': deflection}, yguess=yguess, yfit=yfit)
+ _plot({'z': z, 'deflection': deflection}, yguess=yguess, yfit=yfit)
return sensitivity_from_fit_params(p)
-def bump_save(filename, group='/', raw_bump=None, bump_config=None,
- z_axis_config=None, deflection_channel_config=None,
- processed_bump=None):
- with _h5py.File(filename, 'a') as f:
- cwg = _h5_create_group(f, group)
- if raw_bump is not None:
- for k in ['z', 'deflection']:
- try:
- del cwg['raw/{}'.format(k)]
- except KeyError:
- pass
- cwg['raw/z'] = raw_bump['z']
- cwg['raw/deflection'] = raw_bump['deflection']
- storage = _HDF5_Storage()
- for config,key in [(bump_config, 'config/bump'),
- (z_axis_config, 'config/z/axis'),
- (deflection_channel_config,
- 'config/deflection/channel')]:
- if config is None:
- continue
- config_cwg = _h5_create_group(cwg, key)
-, group=config_cwg)
- if processed_bump is not None:
- try:
- del cwg['processed']
- except KeyError:
- pass
- cwg['processed'] = processed_bump
-def bump_load(filename, group='/'):
- assert group.endswith('/')
- raw_bump = processed_bump = None
- configs = []
- with _h5py.File(filename, 'a') as f:
- try:
- raw_bump = {
- 'z': f[group+'raw/z'][...],
- 'deflection': f[group+'raw/deflection'][...],
- }
- except KeyError:
- pass
- for Config,key in [(_BumpConfig, 'config/bump'),
- (_AxisConfig, 'config/z/axis'),
- (_ChannelConfig, 'config/deflection/channel')]:
- config = Config(storage=_HDF5_Storage(
- filename=filename, group=group+key))
- configs.append(config)
- try:
- processed_bump = float(f[group+'processed'][...])
- except KeyError:
- pass
- ret = [raw_bump]
- ret.extend(configs)
- ret.append(processed_bump)
- for config in configs:
- config.load()
- return tuple(ret)
-def bump_plot(data, yguess=None, yfit=None):
+def save(filename=None, group='/', config=None, z_axis_config=None,
+ deflection_channel_config=None, raw=None, processed=None):
+ specs = [
+ _SaveSpec(item=config, relpath='config/bump', config=_BumpConfig),
+ _SaveSpec(item=z_axis_config, relpath='config/z', config=_AxisConfig),
+ _SaveSpec(item=deflection_channel_config, relpath='config/deflection',
+ config=_InputChannelConfig),
+ _SaveSpec(item=processed, relpath='processed', units='V/m'),
+ ]
+ if raw is not None:
+ for key in raw.keys():
+ specs.append(_SaveSpec(
+ item=raw[key], relpath='raw/{}'.format(key), array=True,
+ units='bits'))
+ _save(filename=filename, group=group, specs=specs)
+def load(filename=None, group='/'):
+ specs = [
+ _SaveSpec(key=('config', 'bump'), relpath='config/bump',
+ config=_BumpConfig),
+ _SaveSpec(key=('config', 'z_axis_config'), relpath='config/z',
+ config=_AxisConfig),
+ _SaveSpec(key=('config', 'deflection_channel_config'),
+ relpath='config/deflection', config=_InputChannelConfig),
+ _SaveSpec(key=('raw', 'z'), relpath='raw/z', array=True, units='bits'),
+ _SaveSpec(key=('raw', 'deflection'), relpath='raw/deflection',
+ array=True, units='bits'),
+ _SaveSpec(key=('processed',), relpath='processed', units='V/m'),
+ ]
+ return _load(filename=filename, group=group, specs=specs)
+def plot(data, yguess=None, yfit=None):
"Plot the bump (Vphoto vs Vzp)"
if not _matplotlib:
raise _matplotlib_import_error
residual_axes.set_ylabel('Photodiode (Volts)')
if hasattr(figure, 'show'):
+_plot = plot # alternative name for use inside fit()
* Vphoto The photodiode vertical deflection voltage (what we measure)
* Fcant The force on the cantilever
* T The temperature of the cantilever and surrounding solution
-* (another thing we measure or guess)
+ (another thing we measure or guess)
* k_b Boltzmann's constant
Which are related by the parameters:
We do all these measurements a few times to estimate statistical
-The functions are layed out in the families::
- bump_*(), vib_*(), T_*(), and calib_*()
-For each family, * can be any of:
-* acquire get real-world data
-* save store real-world data to disk
-* load get real-world data from disk
-* analyze interperate the real-world data.
-* plot show a nice graphic to convince people we're working :p
-A family name without any `_*` extension (e.g. `bump()`), runs
-`*_acquire()`, `*_analyze()`, and `*_save()`. `*_analyze()` will run
-`*_plot()` if `matplotlib` is set in `calibcant.package_config`.
-from numpy import zeros as _zeros
-from numpy import float as _float
from time import sleep as _sleep
-from . import LOG as _LOG
-from .bump import bump as _bump
-from .T import T as _T
-from .vib import vib as _vib
-from .analyze import calib_analyze as _calib_analyze
-from .analyze import calib_save as _calib_save
-def move_far_from_surface(stepper, distance):
- """Step back approximately `distance` meters.
- """
- steps = int(distance/stepper.step_size)
-'step back %d steps (~%g m)' % (steps, distance))
- stepper.step_relative(-steps)
-def calib_acquire(afm, calibration_config, filename=None, group='/'):
- """Acquire data for calibrating a cantilever in one function.
- Inputs:
- afm a pyafm.AFM instance
- calibration_config a .config._CalibrationConfig instance
+from numpy import zeros as _zeros
+from numpy import float as _float
- Outputs (all are arrays of recorded data):
- bumps measured (V_photodiode / nm_tip) proportionality constant
- Ts measured temperature (K)
- vibs measured V_photodiode variance (Volts**2) in free solution
+import h5py as _h5py
+from pyafm.afm import AFM as _AFM
+from import HDF5_Storage as _HDF5_Storage
- The temperatures are collected after moving far from the surface
- but before and vibrations are measured to give everything time to
- settle after the big move.
- """
+from . import LOG as _LOG
+from .config import CalibrateConfig as _CalibrateConfig
+from .bump import run as _bump
+from .bump_analyze import load as _bump_load
+from .temperature import run as _temperature
+from .temperature_analyze import load as _temperature_load
+from .vibration import run as _vibration
+from .vibration_analyze import load as _vibration_load
+from .analyze import analyze as _analyze
+from .util import SaveSpec as _SaveSpec
+from .util import save as _save
+from .util import load as _load
+def load(filename=None, group='/'):
+ config = _CalibrateConfig(storage=_HDF5_Storage(
+ filename=filename, group=group))
+ config.load()
+ return Calibrator(config=config)
+def load_all(filename=None, group='/', raw=True):
+ "Load all data from a `Calibration.calibrate()` run."
assert group.endswith('/'), group
+ calibrator = load(
+ filename=filename, group='{}config/'.format(group))
+ data = calibrator.load_results(
+ filename=filename, group='{}calibration/'.format(group))
+ if raw:
+ raw_data = calibrator.load_raw(filename=filename, group=group)
+ else:
+ raw_data = None
+ return (calibrator, data, raw_data)
- bumps = _zeros((calibration_config['num-bumps'],), dtype=_float)
- for i in range(calibration_config['num-bumps']):
-'acquire bump %d of %d' % (i, calibration_config['num-bumps']))
- bumps[i] = _bump(afm=afm, bump_config=calibration_config['bump'],
- filename=filename, group='%sbump/%d/' % (group, i))
- _LOG.debug('bumps: %s' % bumps)
- move_far_from_surface(
- afm.stepper, distance=calibration_config['vibration-spacing'])
- Ts = _zeros((calibration_config['num-temperatures'],), dtype=_float)
- for i in range(calibration_config['num-temperatures']):
-'acquire T %d of %d'
- % (i, calibration_config['num-temperatures']))
- Ts[i] = _T(
- get_T=afm.get_temperature,
- temperature_config=calibration_config['temperature'],
- filename=filename, group='%stemperature/%d/' % (group, i))
- _sleep(calibration_config['temperature-sleep'])
- _LOG.debug('temperatures: %s' % Ts)
- # get vibs
- vibs = _zeros((calibration_config['num-vibrations'],), dtype=_float)
- for i in range(calibration_config['num-vibrations']):
- vibs[i] = _vib(
- piezo=afm.piezo, vibration_config=calibration_config['vibration'],
- filename=filename, group='%svibration/%d/' % (group, i))
- _LOG.debug('vibrations: %s' % vibs)
- return (bumps, Ts, vibs)
-def calib(afm, calibration_config, filename=None, group='/'):
- """Calibrate a cantilever in one function.
- Inputs:
- (see `calib_acquire()`)
- Outputs:
- k cantilever spring constant (in N/m, or equivalently nN/nm)
- k_s standard deviation in our estimate of k
+class Calibrator (object):
+ """Calibrate a cantilever spring constant using the thermal tune method.
>>> import os
>>> from pprint import pprint
>>> import tempfile
>>> from import pprint_HDF5
- >>> from pycomedi.device import Device
- >>> from pycomedi.subdevice import StreamingSubdevice
- >>> from import AnalogChannel, DigitalChannel
- >>> from pycomedi.constant import AREF, IO_DIRECTION, SUBDEVICE_TYPE, UNIT
- >>> from pypiezo.afm import AFMPiezo
- >>> from pypiezo.base import PiezoAxis, InputChannel
- >>> from pypiezo.config import ChannelConfig, AxisConfig
- >>> from stepper import Stepper
- >>> from pyafm.afm import AFM
- >>> from .config import (CalibrationConfig, BumpConfig,
+ >>> from import load_afm
+ >>> from .config import (CalibrateConfig, BumpConfig,
... TemperatureConfig, VibrationConfig)
- >>> from .analyze import calib_load_all
>>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='calibcant-')
>>> os.close(fd)
- >>> d = Device('/dev/comedi0')
- >>>
- Setup an `AFMPiezo` instance.
- >>> s_in = d.find_subdevice_by_type(,
- ... factory=StreamingSubdevice)
- >>> s_out = d.find_subdevice_by_type(,
- ... factory=StreamingSubdevice)
- >>> axis_channel =
- ... 0, factory=AnalogChannel, aref=AREF.ground)
- >>> input_channel =, factory=AnalogChannel, aref=AREF.diff)
- >>> for chan in [axis_channel, input_channel]:
- ... chan.range = chan.find_range(unit=UNIT.volt, min=-10, max=10)
- We set the minimum voltage for the `z` axis to -9 (a volt above
- the minimum possible voltage) to help with testing
- `.get_surface_position`. Without this minimum voltage, small
- calibration errors could lead to a railed -10 V input for the
- first few surface approaching steps, which could lead to an
- `EdgeKink` error instead of a `FlatFit` error.
- >>> axis_config = AxisConfig()
- >>> axis_config.update(
- ... {'gain':20, 'sensitivity':8e-9, 'minimum':-9})
- >>> axis_channel_config = ChannelConfig()
- >>> axis_channel_config['name'] = 'z'
- >>> axis_config['channel'] = axis_channel_config
- >>> input_channel_config = ChannelConfig()
- >>> input_channel_config['name'] = 'deflection'
- >>> a = PiezoAxis(config=axis_config, axis_channel=axis_channel)
- >>> a.setup_config()
- >>> c = InputChannel(config=input_channel_config, channel=input_channel)
+ >>> devices = []
+ >>> afm = load_afm()
+ >>> afm.load_from_config(devices=devices)
+ >>> if afm.piezo is None:
+ ... raise NotImplementedError('save a better default AFM!')
+ >>> config = CalibrateConfig()
+ >>> config['bump'] = BumpConfig()
+ >>> config['temperature'] = TemperatureConfig()
+ >>> config['vibration'] = VibrationConfig()
+ >>> c = Calibrator(config=config, afm=afm)
>>> c.setup_config()
- >>> piezo = AFMPiezo(axes=[a], inputs=[c])
- Setup a `stepper` instance.
- >>> s_d = d.find_subdevice_by_type(SUBDEVICE_TYPE.dio)
- >>> d_channels = [, factory=DigitalChannel)
- ... for i in (0, 1, 2, 3)]
- >>> for chan in d_channels:
- ... chan.dio_config(IO_DIRECTION.output)
- >>> def write(value):
- ... s_d.dio_bitfield(bits=value, write_mask=2**4-1)
- >>> stepper = Stepper(write=write)
- Setup an `AFM` instance.
- >>> afm = AFM(piezo, stepper)
- Test calibration:
- >>> calibration_config = CalibrationConfig()
- >>> calibration_config['bump'] = BumpConfig()
- >>> calibration_config['temperature'] = TemperatureConfig()
- >>> calibration_config['vibration'] = VibrationConfig()
- >>> calib(afm, calibration_config, filename=filename, group='/')
- TODO: replace skipped example data with real-world values
+ >>> k,k_s,data = c.calibrate(filename=filename)
+ >>> k # doctest: +SKIP
+ 0.058402262154840491
+ >>> k_s # doctest: +SKIP
+ 0.0010609833397949553
+ >>> pprint(data) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ {'bump': array([...]),
+ 'temperature': array([...]),
+ 'vibration': array([...])}
>>> pprint_HDF5(filename) # doctest: +ELLIPSIS, +REPORT_UDIFF
<HDF5 dataset "far-steps": shape (), type "<i4">
+ <HDF5 dataset "initial-position": shape (), type "<f8">
+ -5e-08
- /bump/0/config/deflection
- /bump/0/config/deflection/channel
- <HDF5 dataset "channel": shape (), type "<i4">
- 0
- ...
- /bump/0/config/z
- /bump/0/config/z/axis
- /bump/0/config/z/axis/channel
- <HDF5 dataset "channel": shape (), type "<i4">
- 0
- ...
- <HDF5 dataset "gain": shape (), type "<i4">
- 20
- ...
- <HDF5 dataset "processed": shape (), type "<f8">
- ...
+ /bump/0/processed
+ <HDF5 dataset "data": shape (), type "<f8">
+ ...
+ <HDF5 dataset "units": shape (), type "|S3">
+ V/m
- <HDF5 dataset "deflection": shape (2048,), type "<u2">
- [...]
- <HDF5 dataset "z": shape (2048,), type "<u2">
- [...]
+ /bump/0/raw/deflection
+ <HDF5 dataset "data": shape (2048,), type "<u2">
+ [...]
+ <HDF5 dataset "units": shape (), type "|S4">
+ bits
+ /bump/0/raw/z
+ <HDF5 dataset "data": shape (2048,), type "<u2">
+ [...]
+ <HDF5 dataset "units": shape (), type "|S4">
+ bits
- /calibration
- /calibration/config
- /calibration/config/bump
- <HDF5 dataset "far-steps": shape (), type "<i4">
- 200
- ...
- <HDF5 dataset "num-bumps": shape (), type "<i4">
- 10
- ...
- /calibration/processed
- /calibration/processed/spring-constant
+ /config
+ /config/afm
+ <HDF5 dataset "fallback-temperature": shape (), type "<f8">
+ 295.15
+ <HDF5 dataset "far": shape (), type "<f8">
+ 3e-05
+ <HDF5 dataset "main-axis": shape (), type "|S1">
+ z
+ <HDF5 dataset "name": shape (), type "|S5">
+ 1B3D9
+ /config/afm/piezo
+ /config/afm/piezo/axes
+ /config/afm/piezo/axes/0
+ /config/afm/piezo/axes/0/channel
+ <HDF5 dataset "analog-reference": shape (), type "|S6">
+ ground
+ <HDF5 dataset "channel": shape (), type "<i4">
+ 0
+ <HDF5 dataset "conversion-coefficients": shape (2,), type "<f8">
+ [ -1.00000000e+01 3.05180438e-04]
+ <HDF5 dataset "conversion-origin": shape (), type "<f8">
+ 0.0
+ <HDF5 dataset "device": shape (), type "|S12">
+ /dev/comedi0
+ <HDF5 dataset "inverse-conversion-coefficients": shape (2,), type "<f8">
+ [ 0. 3276.75]
+ <HDF5 dataset "inverse-conversion-origin": shape (), type "<f8">
+ -10.0
+ <HDF5 dataset "maxdata": shape (), type "<i8">
+ 65535
+ <HDF5 dataset "name": shape (), type "|S1">
+ z
+ <HDF5 dataset "range": shape (), type "<i4">
+ 0
+ <HDF5 dataset "subdevice": shape (), type "<i4">
+ 1
+ <HDF5 dataset "gain": shape (), type "<f8">
+ 20.0
+ <HDF5 dataset "maximum": shape (), type "<f8">
+ 9.0
+ <HDF5 dataset "minimum": shape (), type "<f8">
+ -9.0
+ <HDF5 dataset "monitor": shape (), type "|S1">
+ <HDF5 dataset "sensitivity": shape (), type "<f8">
+ 8.8e-09
+ /config/afm/piezo/axes/1
+ /config/afm/piezo/axes/1/channel
+ <HDF5 dataset "analog-reference": shape (), type "|S6">
+ ground
+ <HDF5 dataset "channel": shape (), type "<i4">
+ 1
+ <HDF5 dataset "conversion-coefficients": shape (2,), type "<f8">
+ [ -1.00000000e+01 3.05180438e-04]
+ <HDF5 dataset "conversion-origin": shape (), type "<f8">
+ 0.0
+ <HDF5 dataset "device": shape (), type "|S12">
+ /dev/comedi0
+ <HDF5 dataset "inverse-conversion-coefficients": shape (2,), type "<f8">
+ [ 0. 3276.75]
+ <HDF5 dataset "inverse-conversion-origin": shape (), type "<f8">
+ -10.0
+ <HDF5 dataset "maxdata": shape (), type "<i8">
+ 65535
+ <HDF5 dataset "name": shape (), type "|S1">
+ x
+ <HDF5 dataset "range": shape (), type "<i4">
+ 0
+ <HDF5 dataset "subdevice": shape (), type "<i4">
+ 1
+ <HDF5 dataset "gain": shape (), type "<f8">
+ 20.0
+ <HDF5 dataset "maximum": shape (), type "<f8">
+ 8.0
+ <HDF5 dataset "minimum": shape (), type "<f8">
+ -8.0
+ <HDF5 dataset "monitor": shape (), type "|S1">
+ <HDF5 dataset "sensitivity": shape (), type "<f8">
+ 4.16e-09
+ /config/afm/piezo/inputs
+ /config/afm/piezo/inputs/0
+ <HDF5 dataset "analog-reference": shape (), type "|S4">
+ diff
+ <HDF5 dataset "channel": shape (), type "<i4">
+ 0
+ <HDF5 dataset "conversion-coefficients": shape (2,), type "<f8">
+ [ -1.00000000e+01 3.05180438e-04]
+ <HDF5 dataset "conversion-origin": shape (), type "<f8">
+ 0.0
+ <HDF5 dataset "device": shape (), type "|S12">
+ /dev/comedi0
+ <HDF5 dataset "inverse-conversion-coefficients": shape (2,), type "<f8">
+ [ 0. 3276.75]
+ <HDF5 dataset "inverse-conversion-origin": shape (), type "<f8">
+ -10.0
+ <HDF5 dataset "maxdata": shape (), type "<i8">
+ 65535
+ <HDF5 dataset "name": shape (), type "|S10">
+ deflection
+ <HDF5 dataset "range": shape (), type "<i4">
+ 0
+ <HDF5 dataset "subdevice": shape (), type "<i4">
+ 0
+ <HDF5 dataset "name": shape (), type "|S5">
+ 2253E
+ /config/afm/stepper
+ <HDF5 dataset "backlash": shape (), type "<i4">
+ 100
+ <HDF5 dataset "delay": shape (), type "<f8">
+ 0.01
+ <HDF5 dataset "full-step": shape (), type "|b1">
+ True
+ <HDF5 dataset "logic": shape (), type "|b1">
+ True
+ <HDF5 dataset "name": shape (), type "|S9">
+ z-stepper
+ /config/afm/stepper/port
+ <HDF5 dataset "channels": shape (4,), type "<i4">
+ [0 1 2 3]
+ <HDF5 dataset "device": shape (), type "|S12">
+ /dev/comedi0
+ <HDF5 dataset "direction": shape (), type "|S6">
+ output
+ <HDF5 dataset "name": shape (), type "|S12">
+ stepper DB-9
+ <HDF5 dataset "subdevice": shape (), type "<i4">
+ 2
+ <HDF5 dataset "subdevice-type": shape (), type "|S3">
+ dio
+ <HDF5 dataset "step-size": shape (), type "<f8">
+ 1.7e-07
+ /config/afm/temperature
+ <HDF5 dataset "baudrate": shape (), type "<i4">
+ 9600
+ <HDF5 dataset "controller": shape (), type "<i4">
+ 1
+ <HDF5 dataset "device": shape (), type "|S10">
+ /dev/ttyS0
+ <HDF5 dataset "max-current": shape (), type "<f8">
+ 0.0
+ <HDF5 dataset "name": shape (), type "|S14">
+ room (ambient)
+ <HDF5 dataset "units": shape (), type "|S7">
+ Celsius
+ /config/bump
+ <HDF5 dataset "far-steps": shape (), type "<i4">
+ 200
+ <HDF5 dataset "initial-position": shape (), type "<f8">
+ -5e-08
+ <HDF5 dataset "min-slope-ratio": shape (), type "<f8">
+ 10.0
+ <HDF5 dataset "model": shape (), type "|S9">
+ quadratic
+ <HDF5 dataset "push-depth": shape (), type "<f8">
+ 2e-07
+ <HDF5 dataset "push-speed": shape (), type "<f8">
+ 1e-06
+ <HDF5 dataset "samples": shape (), type "<i4">
+ 1024
+ <HDF5 dataset "setpoint": shape (), type "<f8">
+ 2.0
+ <HDF5 dataset "num-bumps": shape (), type "<i4">
+ 10
+ <HDF5 dataset "num-temperatures": shape (), type "<i4">
+ 10
+ <HDF5 dataset "num-vibrations": shape (), type "<i4">
+ 20
+ /config/temperature
+ <HDF5 dataset "sleep": shape (), type "<i4">
+ 1
+ /config/vibration
+ <HDF5 dataset "chunk-size": shape (), type "<i4">
+ 2048
+ <HDF5 dataset "frequency": shape (), type "<f8">
+ 50000.0
+ <HDF5 dataset "maximum-fit-frequency": shape (), type "<f8">
+ 25000.0
+ <HDF5 dataset "minimum-fit-frequency": shape (), type "<f8">
+ 500.0
+ <HDF5 dataset "model": shape (), type "|S12">
+ Breit-Wigner
+ <HDF5 dataset "overlap": shape (), type "|b1">
+ False
+ <HDF5 dataset "sample-time": shape (), type "<i4">
+ 1
+ <HDF5 dataset "window": shape (), type "|S4">
+ Hann
+ <HDF5 dataset "vibration-spacing": shape (), type "<f8">
+ 5e-05
+ /temperature
+ /temperature/0
+ /temperature/0/config
+ /temperature/0/config/temperature
+ <HDF5 dataset "sleep": shape (), type "<i4">
+ 1
+ /temperature/0/processed
<HDF5 dataset "data": shape (), type "<f8">
- <HDF5 dataset "standard-deviation": shape (), type "<f8">
+ <HDF5 dataset "units": shape (), type "|S1">
+ K
+ /temperature/0/raw
+ <HDF5 dataset "data": shape (), type "<f8">
- <HDF5 dataset "units": shape (), type "|S3">
- N/m
- /calibration/raw
- /calibration/raw/photodiode-sensitivity
- <HDF5 dataset "data": shape (10,), type "<f8">
- [...]
- <HDF5 dataset "units": shape (), type "|S3">
- V/m
- /calibration/raw/temperature
- <HDF5 dataset "data": shape (10,), type "<f8">
- [...]
<HDF5 dataset "units": shape (), type "|S1">
- /calibration/raw/thermal-vibration-variance
- <HDF5 dataset "data": shape (20,), type "<f8">
- [...]
- <HDF5 dataset "units": shape (), type "|S3">
- V^2
- /temperature
- /temperature/0
- /temperature/0/config
- <HDF5 dataset "default": shape (), type "|b1">
- False
- <HDF5 dataset "units": shape (), type "|S7">
- Celsius
- <HDF5 dataset "processed": shape (), type "<f8">
- 295.15
- <HDF5 dataset "raw": shape (), type "<i4">
- 22
- <HDF5 dataset "channel": shape (), type "<i4">
- 0
<HDF5 dataset "chunk-size": shape (), type "<i4">
+ <HDF5 dataset "frequency": shape (), type "<f8">
+ 50000.0
- <HDF5 dataset "processed": shape (), type "<f8">
- ...
+ /vibration/0/processed
+ <HDF5 dataset "data": shape (), type "<f8">
+ ...
+ <HDF5 dataset "units": shape (), type "|S6">
+ V^2/Hz
- <HDF5 dataset "deflection": shape (65536,), type "<u2">
+ <HDF5 dataset "data": shape (65536,), type "<u2">
- /vibration/1
+ <HDF5 dataset "units": shape (), type "|S4">
+ bits
- /vibration/19
- ...
- /vibration/19/raw
- <HDF5 dataset "deflection": shape (65536,), type "<u2">
- [...]
- >>> everything = calib_load_all(filename, '/')
- >>> pprint(everything) # doctest: +ELLIPSIS, +REPORT_UDIFF
- {'bump_details': [{'bump_config': <BumpConfig ...>,
- 'deflection_channel_config': <ChannelConfig ...>,
- 'processed_bump': ...,
- 'raw_bump': {'deflection': array([...], dtype=uint16),
- 'z': array([...], dtype=uint16)},
- 'z_axis_config': <AxisConfig ...>},
- ...],
- 'bumps': array([...]),
- 'calibration_config': <CalibrationConfig ...>,
- 'k': ...,
- 'k_s': ...,
- 'temperature_details': [{'processed_temperature': ...,
- 'raw_temperature': array(22),
- 'temperature_config': <TemperatureConfig ...>},
- ...],
- 'temperatures': array([...]),
- 'vibration_details': [{'deflection_channel_config': <ChannelConfig ...>,
- 'processed_vibration': ...,
- 'raw_vibration': array([...], dtype=uint16),
- 'vibration_config': <VibrationConfig ...>},
- ...],
- 'vibrations': array([...])}
- Close the Comedi device.
- >>> d.close()
+ >>> calibrator,data,raw_data = load_all(filename=filename)
+ >>> calibrator.load_from_config(devices=devices)
+ >>> print(calibrator.config.dump()) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ afm:
+ name: 1B3D9
+ main-axis: z
+ piezo:
+ name: 2253E
+ ...
+ >>> pprint(data) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ {'processed': {'spring_constant': ...
+ 'spring_constant_deviation': ...},
+ 'raw': {'bump': array([...]),
+ 'temperature': array([...]),
+ 'vibration': array([...])}}
+ >>> pprint(raw_data) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ {'bump': [{'config': {'bump': <BumpConfig ...>},
+ 'processed': ...,
+ 'raw': {'deflection': array([...], dtype=uint16),
+ 'z': array([...], dtype=uint16)}},
+ {...},
+ ...],
+ 'temperature': [{'config': {'temperature': <TemperatureConfig ...>},
+ 'processed': ...,
+ 'raw': ...},
+ {...},
+ ...],
+ 'vibration': [{'config': {'vibration': <InputChannelConfig ...>},
+ 'processed': ...
+ 'raw': array([...], dtype=uint16)},
+ {...},
+ ...]}
+ Close the Comedi devices.
+ >>> for device in devices:
+ ... device.close()
Cleanup our temporary config file.
- os.remove(filename)
+ >>> os.remove(filename)
- bumps, Ts, vibs = calib_acquire(
- afm, calibration_config, filename=filename, group=group)
- # TODO: convert vib units?
- k,k_s = _calib_analyze(bumps, Ts, vibs)
- _calib_save(filename, group=group+'calibration/', bumps=bumps,
- temperatures=Ts, vibrations=vibs,
- calibration_config=calibration_config, k=k, k_s=k_s)
- return (k, k_s)
+ def __init__(self, config, afm=None):
+ self.config = config
+ self.afm = afm
+ def load_from_config(self, devices):
+ if self.afm is None:
+ self.afm = _AFM(config=self.config['afm'])
+ self.afm.load_from_config(devices=devices)
+ def setup_config(self):
+ if self.afm:
+ self.afm.setup_config()
+ self.config['afm'] = self.afm.config
+ def calibrate(self, filename=None, group='/'):
+ """Main calibration method.
+ Outputs:
+ k cantilever spring constant (in N/m, or equivalently nN/nm)
+ k_s standard deviation in our estimate of k
+ data the data used to determine k
+ """
+ data = self.acquire(filename=filename, group=group)
+ k = k_s = bumps = temperatures = vibrations = None
+ bumps = data.get('bump', None)
+ temperatures = data.get('temperature', None)
+ vibrations = data.get('vibration', None)
+ if None not in [bumps, temperatures, vibrations]:
+ k,k_s = _analyze(
+ bumps=bumps, temperatures=temperatures, vibrations=vibrations)
+ if filename is not None:
+ self.save_results(
+ filename=filename, group='{}calibration/'.format(group),
+ spring_constant=k, spring_constant_deviation=k_s, **data)
+ return (k, k_s, data)
+ def acquire(self, filename=None, group='/'):
+ """Acquire data for calibrating a cantilever in one function.
+ Outputs a dict of `action` -> `data_array` pairs, for each
+ action (bump, temperature, vibration) that is actually
+ configured. For example, if you wanted to skip the surface
+ approach, bumping, and retraction, you could just set
+ `.config['bump']` to `None`.
+ The temperatures are collected after moving far from the
+ surface but before and vibrations are measured to give
+ everything time to settle after the big move.
+ Because theres a fair amount of data coming in during a
+ calibration, we save the data as it comes in. So the
+ procedure is bump-0, save-bump-0, bump-1, save-bump-0, etc.
+ To disable the saving, just set `filename` to `None`.
+ """
+ if filename is not None:
+ assert group.endswith('/'), group
+, group='{}config/'.format(group))
+ data = {}
+ if self.config['bump'] and self.config['num-bumps'] > 0:
+ data['bump'] = _zeros((self.config['num-bumps'],), dtype=_float)
+ for i in range(self.config['num-bumps']):
+'acquire bump {} of {}'.format(
+ i, self.config['num-bumps']))
+ data['bump'][i] = _bump(
+ afm=self.afm, config=self.config['bump'],
+ filename=filename, group='{}bump/{}/'.format(group, i))
+ _LOG.debug('bumps: {}'.format(data['bump']))
+ self.afm.move_away_from_surface(
+ distance=self.config['vibration-spacing'])
+ if self.config['temperature'] and self.config['num-temperatures'] > 0:
+ data['temperature'] = _zeros(
+ (self.config['num-temperatures'],), dtype=_float)
+ for i in range(self.config['num-temperatures']):
+'acquire temperature {} of {}'.format(
+ i, self.config['num-temperatures']))
+ data['temperature'][i] = _temperature(
+ get=self.afm.get_temperature,
+ config=self.config['temperature'],
+ filename=filename,
+ group='{}temperature/{}/'.format(group, i))
+ _sleep(self.config['temperature']['sleep'])
+ _LOG.debug('temperatures: {}'.format(data['temperature']))
+ if self.config['vibration'] and self.config['num-vibrations'] > 0:
+ data['vibration'] = _zeros(
+ (self.config['num-vibrations'],), dtype=_float)
+ for i in range(self.config['num-vibrations']):
+ data['vibration'][i] = _vibration(
+ piezo=self.afm.piezo, config=self.config['vibration'],
+ filename=filename,
+ group='{}vibration/{}/'.format(group, i))
+ _LOG.debug('vibrations: {}'.format(data['vibration']))
+ return data
+ def save(self, filename=None, group='/'):
+ storage = _HDF5_Storage(filename=filename, group=group)
+ @staticmethod
+ def save_results(filename=None, group='/', bump=None,
+ temperature=None, vibration=None, spring_constant=None,
+ spring_constant_deviation=None):
+ specs = [
+ _SaveSpec(item=bump, relpath='raw/photodiode-sensitivity',
+ array=True, units='V/m'),
+ _SaveSpec(item=temperature, relpath='raw/temperature',
+ array=True, units='K'),
+ _SaveSpec(item=vibration, relpath='raw/vibration',
+ array=True, units='V^2/Hz'),
+ _SaveSpec(item=spring_constant, relpath='processed/spring-constant',
+ units='N/m', deviation=spring_constant_deviation),
+ ]
+ _save(filename=filename, group=group, specs=specs)
+ @staticmethod
+ def load_results(filename, group='/'):
+ """Load results saved with `.save_results()`."""
+ specs = [
+ _SaveSpec(key=('raw', 'bump'),
+ relpath='raw/photodiode-sensitivity',
+ array=True, units='V/m'),
+ _SaveSpec(key=('raw', 'temperature'), relpath='raw/temperature',
+ array=True, units='K'),
+ _SaveSpec(key=('raw', 'vibration'),
+ relpath='raw/vibration',
+ array=True, units='V^2/Hz'),
+ _SaveSpec(key=('processed', 'spring_constant'),
+ relpath='processed/spring-constant',
+ units='N/m', deviation='spring_constant_deviation'),
+ ]
+ return _load(filename=filename, group=group, specs=specs)
+ def load_raw(self, filename=None, group='/'):
+ """Load results saved during `.aquire()` by bumps, etc."""
+ data = {}
+ with _h5py.File(filename, 'a') as f:
+ for name,loader in [('bump',_bump_load),
+ ('temperature', _temperature_load),
+ ('vibration', _vibration_load),
+ ]:
+ n = self.config['num-{}s'.format(name)]
+ if n > 0:
+ data[name] = []
+ for i in range(n):
+ try:
+ cwg = f['{}{}/{}/'.format(group, name, i)]
+ except KeyError:
+ pass
+ else:
+ data[name].append(loader(group=cwg))
+ return data
# You should have received a copy of the GNU General Public License along with
# calibcant. If not, see <>.
-"""Define some variables to configure the package for a particular lab
-and workflow."""
+"""h5config support, so we can easily save what we did and load it later.
import sys as _sys
from FFT_tools import window_hann as _window_hann
import h5config.config as _config
import as _h5config_tools
+from pyafm.config import AFMConfig as _AFMConfig
class PackageConfig (_h5config_tools.PackageConfig):
help='Plot piezo motion using `matplotlib`.',
- _config.FloatSetting(
- name='temperature',
- help=('Default temperature for thermal calibration in degrees '
- 'Celsius.'),
- default=22),
-class _TemperatureUnit (object):
- pass
-class Celsius (_TemperatureUnit):
- pass
-class Kelvin (_TemperatureUnit):
- pass
-class TemperatureConfig (_config.Config):
- "Configure `calibcant` temperature operation"
- settings = [
- _config.ChoiceSetting(
- name='units',
- help='Units of raw temperature measurements.',
- default=Celsius,
- choices=[
- ('Celsius', Celsius),
- ('Kelvin', Kelvin),
- ]),
- _config.BooleanSetting(
- name='default',
- help=('The temperature values are defaults (vs. real '
- 'measurements).'),
- default=True),
- ]
class _BumpModel (object):
+class TemperatureConfig (_config.Config):
+ "Configure `calibcant` temperature operation"
+ settings = [
+ _config.FloatSetting(
+ name='sleep',
+ help=('Time between temperature measurements (in seconds) to get '
+ 'independent measurements when reading from slow sensors.'),
+ default=1),
+ ]
class _VibrationModel (object):
class Variance (_VibrationModel):
-class CalibrationConfig (_config.Config):
+class CalibrateConfig (_config.Config):
"Configure a full `calibcant` calibration run"
settings = [
+ _config.ConfigSetting(
+ name='afm',
+ help='Configure the AFM used to carry out the calibration',
+ config_class=_AFMConfig),
help='Configure the surface bumps',
help='Number of thermal vibration measurements.',
- _config.FloatSetting(
- name='temperature-sleep',
- help=('Time between temperature measurements (in seconds) to get '
- 'independent measurements when reading from slow sensors.'),
- default=1),
help=('Approximate distance from the surface in meters for the '
--- /dev/null
+"""Temperature measurement tools
+Fairly stubby, since a one shot temperature measurement is a common
+thing. We just wrap that to provide a consistent interface.
+from . import LOG as _LOG
+from .temperature_analyze import analyze as _analyze
+from .temperature_analyze import save as _save
+def acquire(get=None):
+ """Measure the current temperature of the sample,
+ """
+ if get:
+'measure temperature')
+ temperature = get()
+ else:
+ temperature = None
+ return temperature
+def run(get, config, filename, group='/'):
+ """Wrapper around acquire(), analyze(), save().
+ >>> import os
+ >>> import tempfile
+ >>> from import HDF5_Storage, pprint_HDF5
+ >>> from .config import TemperatureConfig
+ >>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='calibcant-')
+ >>> os.close(fd)
+ >>> config = TemperatureConfig()
+ >>> def get():
+ ... return 19.2
+ >>> t = run(
+ ... get=get, config=config, filename=filename, group='/')
+ >>> pprint_HDF5(filename) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ /
+ /config
+ /config/temperature
+ <HDF5 dataset "sleep": shape (), type "<i4">
+ 1
+ /processed
+ <HDF5 dataset "data": shape (), type "<f8">
+ 19.2
+ <HDF5 dataset "units": shape (), type "|S1">
+ K
+ /raw
+ <HDF5 dataset "data": shape (), type "<f8">
+ 19.2
+ <HDF5 dataset "units": shape (), type "|S1">
+ K
+ Cleanup our temporary config file.
+ >>> os.remove(filename)
+ """
+ raw = acquire(get)
+ _LOG.debug('got temperature: {} K'.format(raw))
+ processed = _analyze(config=config, temperature=raw)
+ _save(filename=filename, group=group, config=config, raw=raw,
+ processed=processed)
+ return processed
--- /dev/null
+# calibcant - tools for thermally calibrating AFM cantilevers
+# Copyright (C) 2008-2012 W. Trevor King <>
+# This file is part of calibcant.
+# calibcant is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+# calibcant is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along with
+# calibcant. If not, see <>.
+"""Temperature analysis.
+Separate the more general `temperature.analyze()` from the other
+`temperature.*()` functions in calibcant.
+The relevant physical quantities are:
+* `T` Temperature at which thermal vibration measurements were acquired
+>>> import os
+>>> from pprint import pprint
+>>> import tempfile
+>>> import numpy
+>>> from .config import TemperatureConfig
+>>> from import pprint_HDF5, HDF5_Storage
+>>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='calibcant-')
+>>> os.close(fd)
+>>> config = TemperatureConfig()
+>>> raw = 296.5
+>>> processed = analyze(config=config, temperature=raw)
+>>> plot(raw=raw, processed=processed)
+>>> save(filename=filename, group='/',
+... config=config, raw=raw, processed=processed)
+>>> pprint_HDF5(filename) # doctest: +REPORT_UDIFF
+ /config
+ /config/temperature
+ <HDF5 dataset "sleep": shape (), type "<i4">
+ 1
+ /processed
+ <HDF5 dataset "data": shape (), type "<f8">
+ 296.5
+ <HDF5 dataset "units": shape (), type "|S1">
+ K
+ /raw
+ <HDF5 dataset "data": shape (), type "<f8">
+ 296.5
+ <HDF5 dataset "units": shape (), type "|S1">
+ K
+>>> data = load(filename=filename, group='/')
+>>> pprint(data) # doctest: +ELLIPSIS
+{'config': {'temperature': <TemperatureConfig ...>},
+ 'processed': 296.5,
+ 'raw': 296.5}
+>>> print(data['config']['temperature'].dump())
+sleep: 1.0
+>>> data['raw']
+>>> type(data['raw'])
+<type 'float'>
+>>> data['processed']
+>>> os.remove(filename)
+import h5py as _h5py
+ import matplotlib as _matplotlib
+ import matplotlib.pyplot as _matplotlib_pyplot
+ import time as _time # for timestamping lines on plots
+except (ImportError, RuntimeError), e:
+ _matplotlib = None
+ _matplotlib_import_error = e
+from import HDF5_Storage as _HDF5_Storage
+from import h5_create_group as _h5_create_group
+from . import LOG as _LOG
+from . import package_config as _package_config
+from .config import TemperatureConfig as _TemperatureConfig
+from .util import SaveSpec as _SaveSpec
+from .util import save as _save
+from .util import load as _load
+def analyze(config, temperature, units='Kelvin'):
+ """Convert measured temperature to Kelvin.
+ `temperature` should be a numpy ndarray or scalar. `config`
+ should be a `config._temperatureemperatureConfig` instance.
+ The `units` option is just for fun. The AFM's `get_temperature`
+ method always returns temperatures in Kelvin.
+ """
+ if units == 'Kelvin':
+ return temperature
+ elif units == 'Celsius':
+ return _C2K(temperature)
+ else:
+ raise NotImplementedError()
+def save(filename, group='/', config=None, raw=None, processed=None):
+ specs = [
+ _SaveSpec(item=config, relpath='config/temperature',
+ config=_TemperatureConfig),
+ _SaveSpec(item=raw, relpath='raw', units='K'),
+ _SaveSpec(item=processed, relpath='processed', units='K'),
+ ]
+ _save(filename=filename, group=group, specs=specs)
+def load(filename=None, group='/'):
+ specs = [
+ _SaveSpec(key=('config', 'temperature',), relpath='config/temperature',
+ config=_TemperatureConfig),
+ _SaveSpec(key=('processed',), relpath='processed', units='K'),
+ _SaveSpec(key=('raw',), relpath='raw', units='K'),
+ ]
+ return _load(filename=filename, group=group, specs=specs)
+def plot(raw=None, processed=None):
+ if not _matplotlib:
+ raise _matplotlib_import_error
+ figure = _matplotlib_pyplot.figure()
+ timestamp = _time.strftime('%H%M%S')
+ if raw is None:
+ if processed is None:
+ return # nothing to plot
+ axes1 = None
+ axes2 = figure.add_subplot(1, 1, 1)
+ elif processed is None:
+ axes1 = figure.add_subplot(1, 1, 1)
+ axes2 = None
+ else:
+ axes1 = figure.add_subplot(2, 1, 1)
+ axes2 = figure.add_subplot(2, 1, 2)
+ if axes1:
+ axes1.set_title('Raw Temperatures %s' % timestamp)
+ axes1.plot(raw, label='raw')
+ if axes2:
+ axes2.set_title('Processed Temperatures %s' % timestamp)
+ axes2.plot(processed, label='processed')
+ if hasattr(figure, 'show'):
--- /dev/null
+# Copyright
+"""Useful utilites not related to calibration.
+Currently just a framework for consistently saving/loading calibration
+import h5py as _h5py
+from import HDF5_Storage as _HDF5_Storage
+from import h5_create_group as _h5_create_group
+from . import LOG as _LOG
+class SaveSpec (object):
+ def __init__(self, item=None, relpath='/', key=None, config=False,
+ array=False, units=None, deviation=None):
+ self.item = item
+ self.relpath = relpath
+ self.key = key
+ self.config = config
+ self.array = array
+ self.units = units
+ self.deviation = deviation
+def save(filename=None, group='/', specs=tuple()):
+ f = None
+ storage = _HDF5_Storage()
+ try:
+ if isinstance(group, str):
+ f = _h5py.File(filename, 'a')
+ group = _h5_create_group(f, group)
+ for spec in specs:
+ if spec.item is None:
+ continue
+ cwg = _h5_create_group(group, spec.relpath)
+ if spec.config:
+, group=cwg)
+ continue
+ assert spec.units, spec.item
+ try:
+ del cwg['data']
+ except KeyError:
+ pass
+ try:
+ del cwg['units']
+ except KeyError:
+ pass
+ cwg['data'] = spec.item
+ cwg['units'] = spec.units
+ if spec.deviation is not None:
+ cwg['standard-deviation'] = spec.deviation
+ finally:
+ if f:
+ f.close()
+def load(filename=None, group='/', specs=tuple()):
+ data = {}
+ f = None
+ storage = _HDF5_Storage()
+ try:
+ if isinstance(group, str):
+ f = _h5py.File(filename, 'a')
+ group = _h5_create_group(f, group)
+ for spec in specs:
+ try:
+ cwg = group[spec.relpath]
+ except KeyError:
+ continue
+ d = data
+ for n in spec.key[:-1]:
+ if n not in d:
+ d[n] = {}
+ d = d[n]
+ if spec.config:
+ d[spec.key[-1]] = spec.config(storage=_HDF5_Storage(group=cwg))
+ d[spec.key[-1]].load()
+ continue
+ assert spec.units, spec.key
+ try:
+ if spec.array:
+ d[spec.key[-1]] = cwg['data'][...]
+ else:
+ d[spec.key[-1]] = float(cwg['data'][...])
+ except KeyError:
+ continue
+ except TypeError, e:
+ _LOG.warn('while loading {} from {}: {}'.format(
+ spec.key, cwg['data'], e))
+ raise
+ if spec.key[-1] in d:
+ units_ = cwg['units'][...]
+ assert units_ == spec.units, (units_, spec.units)
+ if spec.deviation is not None:
+ try:
+ d[spec.deviation] = float(
+ cwg['standard-deviation'][...])
+ except KeyError:
+ pass
+ finally:
+ if f:
+ f.close()
+ return data
from pycomedi.utility import Reader as _Reader
from . import LOG as _LOG
-from .vib_analyze import vib_analyze as _vib_analyze
-from .vib_analyze import vib_save as _vib_save
+from .vibration_analyze import analyze as _analyze
+from .vibration_analyze import save as _save
-def vib_acquire(piezo, vibration_config):
+def acquire(piezo, config):
"""Record thermal vibration data for `piezo` at its current position.
piezo a pypiezo.afm.AFMPiezo instance
- vibration_config a .config._VibrationConfig instance
+ config a .config.Config instance
_LOG.debug('prepare vibration aquisition command')
# round up to the nearest power of two, for efficient FFT-ing
n_samps = _ceil_pow_of_two(
- vibration_config['sample-time']*vibration_config['frequency'])
- time = n_samps / vibration_config['frequency']
- scan_period_ns = int(1e9 / vibration_config['frequency'])
+ config['sample-time']*config['frequency'])
+ time = n_samps / config['frequency']
+ scan_period_ns = int(1e9 / config['frequency'])
input_channel = piezo.input_channel_by_name('deflection')
channel =
_LOG.debug('command test %d: %s' % (i,rc))'get %g seconds of vibration data at %g Hz'
- % (vibration_config['sample-time'],
- vibration_config['frequency']))
+ % (config['sample-time'],
+ config['frequency']))
reader = _Reader(channel.subdevice, data)
data = data.reshape((data.size,))
return data
-def vib(piezo, vibration_config, filename, group='/'):
- """Wrapper around vib_acquire(), vib_analyze(), vib_save().
+def run(piezo, config, filename, group='/'):
+ """Wrapper around acquire(), analyze(), save().
>>> import os
>>> import tempfile
>>> from import pprint_HDF5
- >>> from pycomedi.device import Device
- >>> from pycomedi.subdevice import StreamingSubdevice
- >>> from import AnalogChannel
- >>> from pycomedi.constant import AREF, SUBDEVICE_TYPE, UNIT
- >>> from pypiezo.afm import AFMPiezo
- >>> from pypiezo.base import InputChannel
- >>> from pypiezo.config import ChannelConfig
+ >>> from import load_afm
>>> from .config import VibrationConfig
- Setup an `AFMPiezo` instance.
>>> fd,filename = tempfile.mkstemp(suffix='.h5', prefix='calibcant-')
>>> os.close(fd)
- >>> d = Device('/dev/comedi0')
- >>>
- >>> s_in = d.find_subdevice_by_type(,
- ... factory=StreamingSubdevice)
- >>> channel =, factory=AnalogChannel, aref=AREF.diff)
- >>> channel.range = channel.find_range(
- ... unit=UNIT.volt, min=-10, max=10)
- >>> channel_config = ChannelConfig()
- >>> channel_config['name'] = 'deflection'
- >>> c = InputChannel(config=channel_config, channel=channel)
- >>> c.setup_config()
- >>> piezo = AFMPiezo(axes=[], inputs=[c])
+ >>> devices = []
+ >>> afm = load_afm(devices=devices)
+ >>> afm.load_from_config()
Test a vibration:
- >>> vibration_config = VibrationConfig()
- >>> vib(piezo, vibration_config, filename, group='/vibration')
- TODO: replace skipped example data with real-world values
+ >>> config = VibrationConfig()
+ >>> output = run(piezo=afm.piezo, config=config, filename=filename,
+ ... group='/vibration')
+ >>> output # doctest: +SKIP
+ 4.1589771694838657e-05
>>> pprint_HDF5(filename) # doctest: +ELLIPSIS, +REPORT_UDIFF
- <HDF5 dataset "channel": shape (), type "<i4">
+ <HDF5 dataset "analog-reference": shape (), type "|S4">
+ diff
+ <HDF5 dataset "channel": shape (), type "<i...">
<HDF5 dataset "conversion-coefficients": shape (2,), type "<f8">
[ -1.00000000e+01 3.05180438e-04]
[ 0. 3276.75]
<HDF5 dataset "inverse-conversion-origin": shape (), type "<f8">
- <HDF5 dataset "maxdata": shape (), type "<i8">
+ <HDF5 dataset "maxdata": shape (), type "<i...">
<HDF5 dataset "name": shape (), type "|S10">
- <HDF5 dataset "range": shape (), type "<i4">
+ <HDF5 dataset "range": shape (), type "<i...">
- <HDF5 dataset "subdevice": shape (), type "<i4">
+ <HDF5 dataset "subdevice": shape (), type "<i...">
- <HDF5 dataset "chunk-size": shape (), type "<i4">
+ <HDF5 dataset "chunk-size": shape (), type "<i...">
<HDF5 dataset "frequency": shape (), type "<f8">
<HDF5 dataset "overlap": shape (), type "|b1">
- <HDF5 dataset "sample-time": shape (), type "<i4">
+ <HDF5 dataset "sample-time": shape (), type "<i...">
<HDF5 dataset "window": shape (), type "|S4">
- <HDF5 dataset "processed": shape (), type "<f8">
- ...
+ /vibration/processed
+ <HDF5 dataset "data": shape (), type "<f8">
+ ...
+ <HDF5 dataset "units": shape (), type "|S6">
+ V^2/Hz
- <HDF5 dataset "deflection": shape (65536,), type "<u2">
+ <HDF5 dataset "data": shape (65536,), type "<u2">
+ <HDF5 dataset "units": shape (), type "|S4">
+ bits
- Close the Comedi device.
+ Close the Comedi devices.
- >>> d.close()
+ >>> for device in devices:
+ ... device.close()
Cleanup our temporary config file.
deflection_channel_config = deflection_input_channel.config
- deflection = vib_acquire(piezo, vibration_config)
- variance = _vib_analyze(
- deflection, vibration_config, deflection_channel_config)
- _vib_save(
- filename, group, raw_vibration=deflection,
- vibration_config=vibration_config,
+ deflection = acquire(piezo, config)
+ variance = _analyze(
+ deflection, config, deflection_channel_config)
+ _save(
+ filename=filename, group=group, raw=deflection, config=config,
- processed_vibration=variance)
+ processed=variance)
return variance
"""Thermal vibration analysis.
-Separate the more general `vib_analyze()` from the other `vib_*()`
+Separate the more general `analyze()` from the other `vibration`
functions in calibcant.
The relevent physical quantities are :
>>> os.close(fd)
>>> piezo_config = get_piezo_config()
->>> vibration_config = VibrationConfig()
->>> vibration_config['frequency'] = 50e3
+>>> config = VibrationConfig()
+>>> config['frequency'] = 50e3
We'll be generating a test vibration time series with the following
parameters. Make sure these are all floats to avoid accidental
>>> m = 5e-11 # kg
>>> gamma = 1.6e-6 # N*s/m
>>> k = 0.05 # N/m
->>> T = 1/vibration_config['frequency']
+>>> T = 1/config['frequency']
>>> T # doctest: +ELLIPSIS
>>> N = int(2**15) # count
>>> f0 = w0/(2*numpy.pi)
>>> f0 # doctest: +ELLIPSIS
->>> f_nyquist = vibration_config['frequency']/2
+>>> f_nyquist = config['frequency']/2
>>> f_nyquist # doctest: +ELLIPSIS
Analyze the simulated data.
->>> naive_vibration = vib_analyze_naive(deflection)
->>> naive_vibration # doctest: +SKIP
+>>> naive = analyze_naive(deflection)
+>>> naive # doctest: +SKIP
->>> abs(naive_vibration / 136.9e6 - 1) < 0.1
+>>> abs(naive / 136.9e6 - 1) < 0.1
->>> processed_vibration = vib_analyze(
-... deflection_bits, vibration_config,
+>>> processed = analyze(
+... deflection_bits, config,
... piezo_config.select_config('inputs', 'deflection'))
->>> processed_vibration # doctest: +SKIP
+>>> processed # doctest: +SKIP
->>> vib_plot(deflection=deflection_bits, vibration_config=vibration_config)
->>> vib_save(filename=filename, group='/vibration/',
-... raw_vibration=deflection_bits, vibration_config=vibration_config,
+>>> plot(deflection=deflection_bits, config=config)
+>>> save(filename=filename, group='/vibration/',
+... raw=deflection_bits, config=config,
... deflection_channel_config=piezo_config.select_config(
... 'inputs', 'deflection'),
-... processed_vibration=processed_vibration)
+... processed=processed)
>>> pprint_HDF5(filename) # doctest: +ELLIPSIS, +REPORT_UDIFF
+ <HDF5 dataset "analog-reference": shape (), type "|S6">
+ ground
<HDF5 dataset "channel": shape (), type "<i4">
<HDF5 dataset "conversion-coefficients": shape (2,), type "<i4">
<HDF5 dataset "name": shape (), type "|S10">
<HDF5 dataset "range": shape (), type "<i4">
- 1
+ 0
<HDF5 dataset "subdevice": shape (), type "<i4">
<HDF5 dataset "window": shape (), type "|S4">
- <HDF5 dataset "processed": shape (), type "<f8">
- ...
+ /vibration/processed
+ <HDF5 dataset "data": shape (), type "<f8">
+ ...
+ <HDF5 dataset "units": shape (), type "|S6">
+ V^2/Hz
- <HDF5 dataset "deflection": shape (32768,), type "<f8">
+ <HDF5 dataset "data": shape (32768,), type "<f8">
+ <HDF5 dataset "units": shape (), type "|S4">
+ bits
->>> (raw_vibration,vibration_config,deflection_channel_config,
-... processed_vibration) = vib_load(
-... filename=filename, group='/vibration/')
+>>> data = load(filename=filename, group='/vibration/')
->>> processed_vibration # doctest: +SKIP
+>>> pprint(data) # doctest: +ELLIPSIS
+{'config': {'vibration': <InputChannelConfig ...>},
+ 'processed': ...,
+ 'raw': array([...])}
+>>> data['processed'] # doctest: +SKIP
->>> abs(processed_vibration / 136.5e6 - 1) < 0.1
+>>> abs(data['processed'] / 136.5e6 - 1) < 0.1
>>> os.remove(filename)
from import h5_create_group as _h5_create_group
import FFT_tools as _FFT_tools
from pypiezo.base import convert_bits_to_volts as _convert_bits_to_volts
-from pypiezo.config import ChannelConfig as _ChannelConfig
+from pypiezo.config import InputChannelConfig as _InputChannelConfig
from . import LOG as _LOG
from . import package_config as _package_config
from .config import BreitWigner as _BreitWigner
from .config import OffsetBreitWigner as _OffsetBreitWigner
from .config import VibrationConfig as _VibrationConfig
+from .util import SaveSpec as _SaveSpec
+from .util import save as _save
+from .util import load as _load
-def vib_analyze_naive(deflection):
+def analyze_naive(deflection):
"""Calculate the deflection variance in Volts**2.
This method is simple and easy to understand, but it highly
_LOG.debug('naive deflection variance: %g V**2' % var)
return var
-def vib_analyze(deflection, vibration_config, deflection_channel_config,
- plot=False):
+def analyze(deflection, config, deflection_channel_config,
+ plot=False):
"""Calculate the deflection variance in Volts**2.
- Improves on `vib_analyze_naive()` by first converting `Vphoto(t)`
+ Improves on `analyze_naive()` by first converting `Vphoto(t)`
to frequency space, and fitting a Breit-Wigner in the relevant
frequency range (see cantilever_calib.pdf for derivation).
However, there may be cases where the fit is thrown off by noise
deflection Vphoto deflection input in bits.
- vibration_config `.config._VibrationConfig` instance
+ config `.config.VibrationConfig` instance
deflection `pypiezo.config.ChannelConfig` instance
plot boolean overriding matplotlib config setting.
mean = deflection_v.mean()
_LOG.debug('average thermal deflection (Volts): %g' % mean)
- naive_variance = vib_analyze_naive(deflection_v)
- if vibration_config['model'] == _Variance:
+ naive_variance = analyze_naive(deflection_v)
+ if config['model'] == _Variance:
return naive_variance
# Compute the average power spectral density per unit time (in uV**2/Hz)
_LOG.debug('compute the averaged power spectral density in uV**2/Hz')
freq_axis,power = _FFT_tools.unitary_avg_power_spectrum(
- (deflection_v - mean)*1e6, vibration_config['frequency'],
- vibration_config['chunk-size'], vibration_config['overlap'],
- vibration_config['window'])
+ (deflection_v - mean)*1e6, config['frequency'],
+ config['chunk-size'], config['overlap'],
+ config['window'])
A,B,C,D = fit_psd(
freq_axis, power,
- min_frequency=vibration_config['minimum-fit-frequency'],
- max_frequency=vibration_config['maximum-fit-frequency'],
- offset=vibration_config['model'] == _OffsetBreitWigner)
+ min_frequency=config['minimum-fit-frequency'],
+ max_frequency=config['maximum-fit-frequency'],
+ offset=config['model'] == _OffsetBreitWigner)
_LOG.debug('fit PSD(f) = C / ((A**2-f**2)**2 + (f*B)**2) with '
'A = %g, B = %g, C = %g, D = %g' % (A, B, C, D))
if plot or _package_config['matplotlib']:
- vib_plot(deflection, freq_axis, power, A, B, C, D,
- vibration_config=vibration_config)
+ _plot(deflection, freq_axis, power, A, B, C, D, config=config)
# Our A is in uV**2, so convert back to Volts**2
fitted_variance = breit_wigner_area(A,B,C) * 1e-12
_LOG.debug('fitted deflection variance: %g V**2' % fitted_variance)
if _package_config['matplotlib']:
- vib_plot(deflection, freq_axis, power, A, B, C, D,
- vibration_config=vibration_config)
+ plot(deflection, freq_axis, power, A, B, C, D,
+ config=config)
return min(fitted_variance, naive_variance)
# <V(t)**2> = (pi*C) / (2*B*A**2)
return (_numpy.pi * C) / (2 * B * A**2)
-def vib_save(filename, group='/', raw_vibration=None, vibration_config=None,
- deflection_channel_config=None, processed_vibration=None):
- with _h5py.File(filename, 'a') as f:
- cwg = _h5_create_group(f, group)
- if raw_vibration is not None:
- try:
- del cwg['raw/deflection']
- except KeyError:
- pass
- cwg['raw/deflection'] = raw_vibration
- storage = _HDF5_Storage()
- for config,key in [(vibration_config, 'config/vibration'),
- (deflection_channel_config,
- 'config/deflection')]:
- if config is None:
- continue
- config_cwg = _h5_create_group(cwg, key)
-, group=config_cwg)
- if processed_vibration is not None:
- try:
- del cwg['processed']
- except KeyError:
- pass
- cwg['processed'] = processed_vibration
-def vib_load(filename, group='/'):
- assert group.endswith('/')
- raw_vibration = processed_vibration = None
- configs = []
- with _h5py.File(filename, 'a') as f:
- try:
- raw_vibration = f[group+'raw/deflection'][...]
- except KeyError:
- pass
- for Config,key in [(_VibrationConfig, 'config/vibration'),
- (_ChannelConfig, 'config/deflection')]:
- config = Config(storage=_HDF5_Storage(
- filename=filename, group=group+key))
- configs.append(config)
- try:
- processed_vibration = float(f[group+'processed'][...])
- except KeyError:
- pass
- ret = [raw_vibration]
- ret.extend(configs)
- ret.append(processed_vibration)
- for config in configs:
- config.load()
- return tuple(ret)
-def vib_plot(deflection=None, freq_axis=None, power=None, A=None, B=None,
- C=None, D=0, vibration_config=None, analyze=False):
+def save(filename, group='/', raw=None, config=None,
+ deflection_channel_config=None, processed=None):
+ specs = [
+ _SaveSpec(item=config, relpath='config/vibration',
+ config=_VibrationConfig),
+ _SaveSpec(item=deflection_channel_config,
+ relpath='config/deflection',
+ config=_InputChannelConfig),
+ _SaveSpec(item=raw, relpath='raw', units='bits'),
+ _SaveSpec(item=processed, relpath='processed', units='V^2/Hz'),
+ ]
+ _save(filename=filename, group=group, specs=specs)
+def load(filename=None, group='/'):
+ specs = [
+ _SaveSpec(key=('config', 'vibration'), relpath='config/vibration',
+ config=_VibrationConfig),
+ _SaveSpec(key=('config', 'deflection'), relpath='config/deflection',
+ config=_InputChannelConfig),
+ _SaveSpec(key=('raw',), relpath='raw', array=True, units='bits'),
+ _SaveSpec(key=('processed',), relpath='processed', units='V^2/Hz'),
+ ]
+ return _load(filename=filename, group=group, specs=specs)
+def plot(deflection=None, freq_axis=None, power=None, A=None, B=None,
+ C=None, D=0, config=None, analyze=False):
"""Plot 3 subfigures displaying vibration data and analysis.
Time series (Vphoto vs sample index) (show any major drift events),
# highlight the region we're fitting
- vibration_config['minimum-fit-frequency'],
- vibration_config['maximum-fit-frequency'],
+ config['minimum-fit-frequency'],
+ config['maximum-fit-frequency'],
facecolor='g', alpha=0.1, zorder=-2)
if A is not None:
if hasattr(figure, 'show'):
+_plot = plot # alternative name for use inside analyze()