From: W. Trevor King Date: Tue, 17 Jan 2012 22:49:26 +0000 (-0500) Subject: Update to use new h5config, pycomedi, etc. X-Git-Tag: v0.2~26 X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=4615ce495d5fe8df8f767105fff93e67e55ee06d;hp=-c;p=unfold-protein.git Update to use new h5config, pycomedi, etc. --- 4615ce495d5fe8df8f767105fff93e67e55ee06d diff --git a/unfold.py b/unfold.py old mode 100644 new mode 100755 index 570c9fd..b1ca6f8 --- a/unfold.py +++ b/unfold.py @@ -1,520 +1,63 @@ -TEXT_VERBOSE = False -PYLAB_INTERACTIVE_VERBOSE = True -BASE_FIG_NUM = 40 -LOG_DATA = True -LOG_DIR = '$DEFAULT$/unfold' - -import stepper -import piezo.z_piezo as z_piezo -import piezo.z_piezo_utils as z_piezo_utils -import piezo.x_piezo as x_piezo -import temperature -import threading -import time -import os, os.path -from numpy import arange, sin, pi, isnan -import data_logger - -if PYLAB_INTERACTIVE_VERBOSE : - from pylab import figure, plot, title, legend, hold, subplot, draw - import time # for timestamping lines on plots - -EFILE = file(os.path.expanduser('~/rsrch/debug.unfold'), 'a') - -class ExceptionTooClose (Exception) : - """ - The piezo is too close to the surface, - an unfolding would extend past pzMin - """ -class ExceptionTooFar (Exception) : - """ - The piezo has reached pzMax without - reaching the desired deflection setpoint. - """ - -def plot_dict(d, label) : - try : - plot(d["Z piezo output"], d["Deflection input"], '.',label=label) - except KeyError, string: - print d.keys() - raise KeyError, string - -class unfold_data_log (data_logger.data_log) : - def save(self, unfold, timestamp, CTemp, VSetpoint, nmBindPos, - sBindTime, - nmDist, nmStep, ppStep, nmPsRate, freq, - approach_data, unfold_data) : - filepath,timestamp = self.get_filename(timestamp) - # save the unfolding data - dataFile = open(filepath, "w") - for i in range(0, len(unfold_data["Z piezo output"])) : - dataFile.write("%d\t%d\t%d\n" % (unfold_data["Z piezo output"][i], - unfold_data["Deflection input"][i], - unfold_data["Z piezo input"][i])) - dataFile.close() - dataFile = open(filepath+"_approach", "w") - for i in range(0, len(approach_data["Z piezo output"])) : - dataFile.write("%d\t%d\n" % (approach_data["Z piezo output"][i], - approach_data["Deflection input"][i])) - dataFile.close() - - # save parameters - paramFile = open(filepath+"_param", "w") - paramFile.write("Environment\n") - paramFile.write("Time:\t"+timestamp+"\n") - if CTemp != None : - paramFile.write("Temperature (C):\t"+str(CTemp)+"\n") - - paramFile.write("\nPiezo parameters\n") - paramFile.write("Z piezo sensitivity (nm/Vp):\t"+str(unfold.zp.sensitivity)+"\n") - paramFile.write("Z piezo gain (Vp/Vo):\t"+str(unfold.zp.gain)+"\n") - paramFile.write("X piezo sensitivity (nm/Vp):\t"+str(unfold.xp.xpSensitivity)+"\n") - paramFile.write("X piezo gain (Vp/Vo):\t"+str(unfold.xp.gain)+"\n") - paramFile.write("X piezo position (nm):\t"+str(unfold.xp.out2nm(unfold.xp.curPos()))+"\n") - - paramFile.write("\nApproach parameters\n") - paramFile.write("Setpoint (V):\t"+str(VSetpoint)+"\n") - paramFile.write("Bind pos (nm):\t"+str(nmBindPos)+"\n") - - paramFile.write("\nBinding parameters\n") - paramFile.write("Bind time (s):\t"+str(sBindTime)+"\n") - - paramFile.write("\nUnfolding parameters\n") - paramFile.write("Distance (nm):\t"+str(nmDist)+"\n") - paramFile.write("Step size (nm):\t"+str(nmStep)+"\n") - paramFile.write("Points per step:\t"+str(ppStep)+"\n") - paramFile.write("Unfold rate (nm/s):\t"+str(nmPsRate)+"\n") - paramFile.write("Sample rate (Hz):\t"+str(freq)+"\n") - - paramFile.write("\nData fields:\tZ_piezo_out Deflection_in Z_piezo_in\n") - paramFile.close() - - return timestamp - -class unfold : - def __init__(self, controlTemp=True, dataDirectory=LOG_DIR) : - self.step = stepper.stepper_obj() - self.zp = z_piezo.z_piezo() - self.xp = x_piezo.x_piezo() - #self.zp = z_piezo.z_piezo(zp_chan = '/Dev1/ao0', - # zp_mon_chan = '/Dev1/ai1', - # def_chan = '/Dev1/ai0') - #self.xp = x_piezo.x_piezo(xp_chan = '/Dev1/ao1') - self.zp.jumpToPos(self.zp.pos_nm2out(0)) - self.xp.jumpToPos(self.xp.nm2out(0)) - if controlTemp == True : - self.T = temperature.tempController(maxCurrent=1.0) - else: self.T = None - self.log = unfold_data_log(dataDirectory, log_name="unfold") - def unfold(self, setpoint=None, rel_setpoint=1.0, sBindTime = 10.0, - nmDist=600, nmStep=0.05, ppStep=1, nmPsRate=1000, - dataDirectory=LOG_DIR, fileID=None) : - while True : - try : - data = self.unfold_cycle(setpoint, rel_setpoint, sBindTime, - nmDist, nmStep, ppStep, nmPsRate, - dataDirectory, fileID) - break - except ExceptionTooFar : - EFILE.write('caught ExceptionTooFar\n'); EFILE.flush() - try : # try for a useful surface distance... - if setpoint == None : # HACK! redundant! - assert rel_setpoint != None, "must have some sort of setpoint" - setpoint = self.curDef() + rel_setpoint - print "setpoint = %g" % setpoint - EFILE.write('attempt getSurfPos\n'); EFILE.flush() - surfPos = self.getSurfPos(setpoint) - EFILE.write('getSurfPos succeeded\n'); EFILE.flush() - print "Too far (surface at %g nm), stepping closer" % surfPos - except z_piezo_utils.poorFit, string : # ... oh well, print what we know - EFILE.write('getSurfPos failed\n'); EFILE.flush() - print "Too far, stepping closer" - print "(Fit failed with: %s)" % string - EFILE.write('zero Z piezo\n'); EFILE.flush() - self.zp.jumpToPos(self.zp.pos_nm2out(0)) - EFILE.write('step closer\n'); EFILE.flush() - self.stepCloser() - EFILE.write('step closer successful\n'); EFILE.flush() - except ExceptionTooClose : - EFILE.write('caught ExceptionTooFar\n'); EFILE.flush() - try : # try for a useful surface distance... - if setpoint == None : # !HACK !redundant! - assert rel_setpoint != None, "must have some sort of setpoint" - setpoint = self.curDef() + rel_setpoint - print "setpoint = %g" % setpoint - EFILE.write('attempt getSurfPos\n'); EFILE.flush() - surfPos = self.getSurfPos(setpoint) - EFILE.write('getSurfPos succeeded\n'); EFILE.flush() - print "Too close (surface at %g nm), stepping away" % surfPos - except z_piezo_utils.poorFit, string : # ... oh well, print what we know - EFILE.write('getSurfPos failed\n'); EFILE.flush() - print "Too close, stepping away" - print "(Fit failed with: %s)" % string - EFILE.write('zero Z piezo\n'); EFILE.flush() - self.zp.jumpToPos(self.zp.pos_nm2out(0)) - EFILE.write('step away\n'); EFILE.flush() - self.stepAway() - EFILE.write('step away successful\n'); EFILE.flush() - print "Too close, stepping away" - return data - def stepApproach(self, setpoint) : - cd = self.curDef() - while cd < setpoint or isnan(cd) : # sometimes during approach, we get negative nan values - print "deflection %g < setpoint %g. step closer" % (cd, setpoint) - self.stepCloser() - cd = self.curDef() - def stepApproachRetractCurve(self, setpoint) : - def_array = [] - step_array = [] - orig_step = self.step.get_cur_pos() - def store_point() : - cd = self.curDef() - def_array.append(cd) - step_array.append(self.step.get_cur_pos()) - return cd - cd = store_point() - while cd < setpoint or isnan(cd) : # sometimes during approach, we get negative nan values - print "deflection %g < setpoint %g. step closer" % (cd, setpoint) - self.step.step_rel(2) - cd = store_point() - while self.step.get_cur_pos() > orig_step : - self.step.step_rel(-2) - store_point() - return {"Deflection":def_array, "Stepper position":step_array} - def piezoApproach(self, setpoint, return_data=False) : - startPos = self.zp.curPos() - curVals,data = z_piezo_utils.moveToPosOrDef(self.zp, - self.zp.zpMax, - self.zp.def_V2in(setpoint), - step=10, - return_data=return_data) - if curVals["Deflection input"] < self.zp.def_V2in(setpoint) : - EFILE.write('Unfolding too far\n'); EFILE.flush() - self.zp.jumpToPos(startPos) - self.zp.updateInputs() - if PYLAB_INTERACTIVE_VERBOSE == True : - figure(BASE_FIG_NUM+1) - hold(False) - plot_dict(data, 'Approach') - hold(True) - title('Unfolding too far') - EFILE.write('Raising ExceptionTooFar\n'); EFILE.flush() - raise ExceptionTooFar - if return_data == True : - return (curVals, data) - else : - return curVals - def bind(self, sTime=10.0) : - time.sleep(sTime) - def unfold_pull(self, nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000) : - if nmStep < 0 : - nmStep = -nmStep - numSteps = int(nmDist/nmStep+1) - out = [0.0]*numSteps*ppStep - pos = self.zp.curPos() - startPos = pos - step = int(self.zp.pos_nm2out(nmStep)-self.zp.pos_nm2out(0)) - for i in range(0, numSteps) : - for j in range(0, ppStep) : - out[i*ppStep + j] = pos - pos -= step - # points/step * step/nm * nm/s = points/s - freq = ppStep / nmStep * nmPsRate - print "unfolding %d points per %d steps from %d to %d at freq of %g" % (ppStep, numSteps, startPos, pos, freq) - return {'freq':freq, 'data':self.zp.ramp(out, freq)} - def generate_refold_bind_pull_output(self, nmSurfPos=0, - sBindTime=2, nmUnfoldDist=160, nmRefoldDist=145, sPauseTime=2, - cycles=50, nmStep=0.5, ppStepUnfold=10, ppStepRefold=1, - nmPsRateUnfold=1000) : - - if nmStep < 0 :## - nmStep = -nmStep - numSteps = int(nmDist/nmStep+1) - out = [0.0]*numSteps*ppStep - pos = self.zp.curPos() - startPos = pos - step = int(self.zp.pos_nm2out(nmStep)-self.zp.pos_nm2out(0)) - for i in range(0, numSteps) : - for j in range(0, ppStep) : - out[i*ppStep + j] = pos - pos -= step - # points/step * step/nm * nm/s = points/s - freq = ppStep / nmStep * nmPsRate - print "unfolding %d points per %d steps from %d to %d at freq of %g" % (ppStep, numSteps, startPos, pos, freq) - return {'freq':freq, 'data':self.zp.ramp(out, freq)} - def unfold_cycle(self, setpoint=None, rel_setpoint=1.0, sBindTime = 10.0, - nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000, - dataDirectory=LOG_DIR, fileID=None) : - if setpoint == None : - assert rel_setpoint != None, "must have some sort of setpoint" - setpoint = self.curDef() + rel_setpoint - print "setpoint = %g" % setpoint - timestamp = time.strftime("%Y%m%d%H%M%S") - temp = None - if self.T != None : temp = self.T.getTemp() - print "approaching" - startPos = self.zp.curPos() - curVals,approach_data = self.piezoApproach(setpoint, return_data=True) - bindPos = curVals['Z piezo output'] # in output units - finalPos = bindPos - (self.zp.pos_nm2out(nmDist)-self.zp.pos_nm2out(0)) - try : - self.zp._check_range(finalPos) - except z_piezo.outOfRange : - self.zp.jumpToPos(startPos) - self.zp.updateInputs() - if PYLAB_INTERACTIVE_VERBOSE == True : - figure(BASE_FIG_NUM+1) - hold(False) - plot_dict(approach_data, 'Approach') - hold(True) - title('Unfolding too close') - raise ExceptionTooClose - print "binding for %.3f seconds" % sBindTime - self.bind(sBindTime) - out = self.unfold_pull(nmDist, nmStep, ppStep, nmPsRate) - - if PYLAB_INTERACTIVE_VERBOSE == True : - figure(BASE_FIG_NUM) - hold(False) - plot_dict(approach_data, 'Approach') - hold(True) - plot_dict(out['data'], 'Unfold') - legend(loc='best') - title('Unfolding') - draw() - - if LOG_DATA == True: - print "saving" - timestamp = self.log.save(self, timestamp, temp, setpoint, - int(self.zp.pos_out2nm(bindPos)), # don't need lots of precision... - sBindTime, - nmDist, nmStep, ppStep, nmPsRate, out['freq'], - approach_data, out['data']) - return out - def getSurfPos(self, setpoint=2, textVerbose=False, plotVerbose=False) : - return self.zp.pos_out2nm(z_piezo_utils.getSurfPos(self.zp, self.zp.def_V2in(setpoint), textVerbose, plotVerbose)) - def curDef(self) : - self.zp.updateInputs() - return self.zp.def_in2V(self.zp.curDef()) - def stepCloser(self) : - "Backlash-robust stepping" - self.step.step_rel(2) # two half-steps in full step mode - def stepAway(self) : - "Backlash-robust stepping" - self.step.step_rel(-120) - self.step.step_rel(110) # HACK, should come back 118 - def xpWander(self) : - self.xp.wander() - -def _test_unfold(controlTemp=True) : - print "Test unfold" - u = unfold(controlTemp=controlTemp) - u.unfold(setpoint=0.5, sBindTime=1.0) - del u - print "unfold successful\n" - - -class unfold_expt : - def __init__(self, controlTemp=True) : - self.u = unfold(controlTemp=controlTemp) - self.runUnfolds = False - self.bgRun = None - self.getExternalTempSetpoint = None - if self.u.T != None : - self.tempSetpoint = self.u.T.getTemp() - self.getExternalDeflectionSetpoint = None - self.deflectionSetpoint = 0.5 - self.getExternalBindTime = None - self.bindTime = 1.0 - self.getExternalnmDist = None - self.nmDist = 600.0 - self.getExternalnmStep = None - self.nmStep = 0.5 - self.getExternalppStep = None - self.ppStep = 10 - self.getExternalnmPsRate = None - self.nmPsRate = 1000.0 - self.getExternalDataDirectory = None - self.dataDirectory = LOG_DIR - self.plotData = None - self.i=0 - self.showUnfoldingIndex = None - def run(self) : - print "running" - if self.bgRun != None : - raise Exception, "Can't run two backgrounds simultaneously" - self.runUnfolds = True - print "unfolding in the background" - self.bgRun = bg_run( self._run) - def stepApproach(self) : - print "approaching" - if self.getExternalDeflectionSetpoint != None : - self.deflectionSetpoint = self.getExternalDeflectionSetpoint() - self.u.stepApproach(self.deflectionSetpoint) - def _run(self) : - print "starting unfold loop" - while self.runUnfolds == True : - print "get unfold parameters" - if self.u.T != None and self.getExternalTempSetpoint != None : - setpoint = self.getExternalTempSetpoint() - if setpoint != self.tempSetpoint : - self.u.T.setTemp(setpoint) - self.tempSetpoint = setpoint - if self.getExternalDeflectionSetpoint != None : - self.deflectionSetpoint = self.getExternalDeflectionSetpoint() - if self.getExternalBindTime != None : - self.bindTime = self.getExternalBindTime() - if self.getExternalnmDist != None : - self.nmDist = self.getExternalnmDist() - if self.getExternalnmStep != None : - self.nmStep = self.getExternalnmStep() - if self.getExternalppStep != None : - self.ppStep = self.getExternalppStep() - if self.getExternalnmPsRate != None : - self.nmPsRate = self.getExternalnmPsRate() - if self.getExternalDataDirectory != None : - self.dataDirectory = self.getExternalDataDirectory() - print "run unfold" - #print "setpoint ", self.deflectionSetpoint - #print "bind time ", self.bindTime - #print "nmDist ", self.nmDist - #print "nmStep ", self.nmStep - #print "ppStep ", self.ppStep - #print "nmPsRate ", self.nmPsRate - #print "fileID ", self.i - #print "dataDir ", self.dataDirectory - data = self.u.unfold(setpoint=self.deflectionSetpoint, - sBindTime=self.bindTime, - nmDist=self.nmDist, - nmStep=self.nmStep, - ppStep=self.ppStep, - nmPsRate=self.nmPsRate, - fileID=str(self.i), - dataDirectory=self.dataDirectory) - print "plot data" - if self.plotData != None : - self.plotData(data["data"]["Z piezo output"], - data["data"]["Deflection input"]) - if self.showUnfoldingIndex != None : - self.showUnfoldingIndex(self.i) - self.i += 1 - return False - def stop(self) : - if self.bgRun != None : - self.runUnfolds = False - self.bgRun.wait() - self.bgRun = None - -def _test_unfold_expt(controlTemp=True) : - print "Test unfold_expt" - u_expt = unfold.unfold_expt(controlTemp=controlTemp) - u_expt.run() - time.sleep(100) - u_expt.stop() - print "unfold_expt working\n" - -class bg_run : - def __init__(self, function, args=None, finishCallback=None, interruptCallback=None) : - print "init bg" - def tempFn() : - print "temp fn started" - complete = False - if args == None : - complete = function() - else : - complete = function(args) - print "temp Fn finished" - if finishCallback != None and complete == True : - finishCallback() - elif interruptCallback != None and complete == False : - interruptCallback() - print "tempFn defined" - self.thd = threading.Thread(target=tempFn, name="Background Fn") - print "starting thread" - self.thd.start() - print "thread started" - def wait(self) : - print "joining thread" - self.thd.join() - print "thread joined" - -def _test_bg_run() : - print "Test bg_run" - print "test without args or callbacks" - def say_hello() : - for i in range(10) : - print "Hello" - time.sleep(0.1) - return True - bg = unfold.bg_run(say_hello) - time.sleep(.2) - print "The main thread's still going" - bg.wait() - print "The background process is done" - - print "test without args, but with callbacks" - def inter() : - print "I was interrupted" - def fin() : - print "I'm finished" - bg = unfold.bg_run(say_hello, finishCallback=fin, interruptCallback=inter) - time.sleep(.2) - print "The main thread's still going" - bg.wait() - print "The background process is done" - - print "test with args and callbacks" - del say_hello - def say_hello(stop=[False]) : - for i in range(10) : - if stop[0] == True : return False - print "Hello" - time.sleep(0.1) - return True - stop = [False] - bg = unfold.bg_run(say_hello, args=stop, finishCallback=fin, interruptCallback=inter) - time.sleep(.2) - print "The main thread's still going, stop it" - stop[0] = True - bg.wait() - del bg - del stop - del say_hello - del inter - del fin - print "The background process is done" - - print "bg_run working\n" - -def loop_rates(u, rates, num_loops=10, die_file=None, song=None, **kwargs): - """ - loop_rates(u, rates, num_loops=10, die_file=None, song=None, **kwargs) - - Constant speed unfolding using the unfold instance u for num_loops - through the series of rates listed in rates. You may set die_file - to a path, loop_rates() will check that location before each - unfolding attempt, and cleanly stop unfolding if the file exists. - **kwargs are passed on to u.unfold(), so a full call might look like - - loop_rates(u, rates=[20,200,2e3], num_loops=5, die_file='~/die', song='~wking/Music/system/towerclo.wav', rel_setpoint=1, nmDist=800, sBindTime=2) - """ - if die_file != None: - die_file = os.path.expanduser(die_file) - if song != None: - song = os.path.expanduser(song) - for i in range(num_loops): - for nmPsRate in rates: - if die_file != None and os.path.exists(die_file): - return None - u.unfold(nmPsRate=nmPsRate, **kwargs) - u.xpWander() - if song != None: - os.system("aplay '%s'" % song) - -def test() : - _test_unfold(controlTemp=False) - _test_bg_run() - _test_unfold_expt(controlTemp=False) - -if __name__ == "__main__" : - test() +#!/usr/bin/env python +# +# Copyright (C) 2011 W. Trevor King +# +# This file is part of unfold_protein. +# +# Unfold_protein is free software: you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. +# +# Unfold_protein is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with unfold_protein. If not, see +# . + +"""Run a mechanical protein unfolding experiment.""" + +import os +import os.path + +from unfold_protein import __version__ as version +from unfold_protein.afm import get_afm +from unfold_protein.unfolder import Unfolder +from unfold_protein.scan import UnfoldScanner +import unfold_protein.config as _config + + +if __name__ == '__main__': + from argparse import ArgumentParser + + parser = ArgumentParser( + description='Play a pure tone', version=version) + parser.add_argument( + '-s', '--song', + help='Path to a song to play when the experiment is complete') + + args = parser.parse_args() + + unfold_config = _config.UnfoldCycleConfig() + unfold_config['temperature'] = _config.TemperatureConfig() + unfold_config['approach'] = _config.ApproachConfig() + unfold_config['unfold'] = _config.UnfoldConfig() + unfold_config['save'] = _config.SaveConfig() + scan_config = _config.ScanConfig() + scan_config['velocity'] = _config.VelocityScanConfig() + scan_config['position'] = _config.PositionScanConfig() + + afm,comedi_device = get_afm(with_temperature=False) + unfolder = Unfolder(config=unfold_config, afm=afm) + scanner = UnfoldScanner(config=scan_config, unfolder=unfolder) + try: + scanner.run() + finally: + scanner.move_far_from_surface() + comedi_device.close() + if args.song: + song = os.path.abspath(os.path.expanduser(args.song)) + os.system("aplay '%s'" % song) diff --git a/unfold_protein/__init__.py b/unfold_protein/__init__.py new file mode 100644 index 0000000..0c235c0 --- /dev/null +++ b/unfold_protein/__init__.py @@ -0,0 +1,26 @@ +# Copyright (C) 2011 W. Trevor King +# +# This file is part of unfold_protein. +# +# Unfold_protein is free software: you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. +# +# Unfold_protein is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with unfold_protein. If not, see +# . + +from .config import PackageConfig as _PackageConfig + + +__version__ = '0.2' + + +package_config = _PackageConfig(package_name=__name__) +package_config.load_system() diff --git a/unfold_protein/afm.py b/unfold_protein/afm.py new file mode 100644 index 0000000..b2256ab --- /dev/null +++ b/unfold_protein/afm.py @@ -0,0 +1,97 @@ +# Copyright (C) 2011 W. Trevor King +# +# This file is part of unfold_protein. +# +# Unfold_protein is free software: you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. +# +# Unfold_protein is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with unfold_protein. If not, see +# . + +from pyafm.afm import AFM as _AFM +from pycomedi.channel import AnalogChannel as _AnalogChannel +from pycomedi.channel import DigitalChannel as _DigitalChannel +from pycomedi.device import Device as _Device +from pycomedi.subdevice import StreamingSubdevice as _StreamingSubdevice +from pypiezo.afm import AFMPiezo as _AFMPiezo +from stepper import Stepper as _Stepper +import pycomedi.constant as _pycomedi_constant +import pypiezo.base as _pypiezo_base +import pypiezo.config as _pypiezo_config + +try: + from .temperature import Temperature as _Temperature +except ImportError, _temperature_import_error: + _Temperature = None + + +def get_afm(with_temperature=True, logger=None): + d = _Device('/dev/comedi0') + d.open() + s_in = d.find_subdevice_by_type( + _pycomedi_constant.SUBDEVICE_TYPE.ai, factory=_StreamingSubdevice) + s_out = d.find_subdevice_by_type( + _pycomedi_constant.SUBDEVICE_TYPE.ao, factory=_StreamingSubdevice) + z_axis_channel = s_out.channel( + 0, factory=_AnalogChannel, + aref=_pycomedi_constant.AREF.ground) + x_axis_channel = s_out.channel( + 1, factory=_AnalogChannel, + aref=_pycomedi_constant.AREF.ground) + input_channel = s_in.channel( + 0, factory=_AnalogChannel, + aref=_pycomedi_constant.AREF.diff) + for chan in [z_axis_channel, x_axis_channel, input_channel]: + chan.range = chan.find_range( + unit=_pycomedi_constant.UNIT.volt, min=-10, max=10) + z_axis_config = _pypiezo_config.AxisConfig() + z_axis_config.update({'gain':20, 'sensitivity':6.5e-9}) + z_axis_channel_config = _pypiezo_config.ChannelConfig() + z_axis_config['channel'] = z_axis_channel_config + z_axis_channel_config['name'] = 'z' + x_axis_config = _pypiezo_config.AxisConfig() + x_axis_config.update({'gain':20, 'sensitivity':6.5e-9}) # TODO: GET FROM CALIBRATION + x_axis_channel_config = _pypiezo_config.ChannelConfig() + x_axis_config['channel'] = x_axis_channel_config + x_axis_channel_config['name'] = 'x' + input_channel_config = _pypiezo_config.ChannelConfig() + input_channel_config['name'] = 'deflection' + z = _pypiezo_base.PiezoAxis( + config=z_axis_config, axis_channel=z_axis_channel) + z.setup_config() + x = _pypiezo_base.PiezoAxis( + config=x_axis_config, axis_channel=x_axis_channel) + x.setup_config() + inp = _pypiezo_base.InputChannel( + config=input_channel_config, channel=input_channel) + inp.setup_config() + piezo = _AFMPiezo(axes=[x, z], inputs=[inp]) + + s_d = d.find_subdevice_by_type( + _pycomedi_constant.SUBDEVICE_TYPE.dio) + d_channels = [s_d.channel(i, factory=_DigitalChannel) + for i in (0, 1, 2, 3)] + for chan in d_channels: + chan.dio_config( + _pycomedi_constant.IO_DIRECTION.output) + def write(value): + s_d.dio_bitfield(bits=value, write_mask=2**4-1) + stepper = _Stepper(write=write, delay=5e-2) + + if with_temperature: + if _Temperature is None: + raise _temperature_import_error + temperature = _Temperature() + else: + temperature = None + + afm = _AFM(piezo, stepper, temperature=temperature) + return (afm, d) diff --git a/unfold_protein/config.py b/unfold_protein/config.py new file mode 100644 index 0000000..9190203 --- /dev/null +++ b/unfold_protein/config.py @@ -0,0 +1,167 @@ +# Copyright (C) 2011 W. Trevor King +# +# This file is part of unfold_protein. +# +# Unfold_protein is free software: you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. +# +# Unfold_protein is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with unfold_protein. If not, see +# . + +"""Define some variables to configure the package for a particular lab +and workflow.""" + +import os.path as _os_path +import sys as _sys + +from FFT_tools import window_hann as _window_hann +import h5config.config as _config +import h5config.tools as _h5config_tools +import numpy as _numpy +from calibcant.config import TemperatureConfig + + +class PackageConfig (_h5config_tools.PackageConfig): + "Configure `unfold_protein` module operation" + settings = _h5config_tools.PackageConfig.settings + [ + _config.BooleanSetting( + name='matplotlib', + help='Plot piezo motion using `matplotlib`.', + default=False), + _config.FloatSetting( + name='temperature', + help=('Default temperature for unfolding in degrees ' + 'Celsius.'), + default=22), + ] + +class ApproachConfig (_config.Config): + "Configure `unfold_protein` approach operation" + settings = [ + _config.FloatSetting( + name='relative setpoint', + help=('Maximum relative deflection in volts to achieve the bind ' + 'position.'), + default=2.0), + _config.FloatSetting( + name='velocity', + help='Approach velocity in meters/second.', + default=1e-6), + _config.FloatSetting( + name='step', + help='Step size in meters.', + default=5e-9), + _config.IntegerSetting( + name='far', + help=('Approximate distance in meters to move away to get "far" ' + 'from the surface. For possible stepper adjustments while ' + 'initially locating the surface.'), + default=1e-5), + ] + +class UnfoldConfig (_config.Config): + "Configure `unfold_protein` unfold operation" + settings = [ + _config.FloatSetting( + name='distance', + help='Unfolding distance in meters.', + default=800e-9), + _config.FloatSetting( + name='velocity', + help='Unfolding velocity in meters/second.', + default=1e-6), + _config.FloatSetting( + name='frequency', + help='Sampling frequency in Hz.', + default=50e3), + ] + +class SaveConfig (_config.Config): + "Configure `unfold_protein` unfold operation" + settings = [ + _config.Setting( + name='base directory', + help='Root directory for saving data.', + default=_os_path.expanduser('~/rsrch/data/unfold/')), + ] + +class UnfoldCycleConfig (_config.Config): + "Configure a full `unfold_protein` approach-bind-unfold cycle" + settings = [ + _config.ConfigSetting( + name='temperature', + help='Configure the temperature measurements', + config_class=TemperatureConfig), + _config.ConfigSetting( + name='approach', + help=('Configure the approach for an approach-bind-unfold ' + 'sequence.'), + config_class=ApproachConfig), + _config.FloatSetting( + name='bind time', + help=('Binding time in seconds for an approach-bind-unfold ' + 'sequence.'), + default=3), + _config.ConfigSetting( + name='unfold', + help=('Configure the unfolding for an approach-bind-unfold ' + 'sequence.'), + config_class=UnfoldConfig), + _config.ConfigSetting( + name='save', + help='Configure saving.', + config_class=SaveConfig), + ] + +class VelocityScanConfig (_config.Config): + "Configure `unfold_config` unfolding velocity scan" + settings = [ + _config.FloatListSetting( + name='unfolding velocities', + help='Unfolding velocities in meters per second.', + default=_numpy.exp(_numpy.linspace( + _numpy.log(20e-9), _numpy.log(2e-6), 10))), + _config.IntegerSetting( + name='num loops', + help='Number of loops through the scanned velocities.', + default=10), + ] + +class PositionScanConfig (_config.Config): + "Configure `unfold_config` contact position scan" + settings = [ + _config.FloatSetting( + name='x step', + help=('Distance in meters along the x axis between successive ' + 'onfoldings.'), + default=5e-9), + _config.FloatSetting( + name='x max', + help='Maximum sampled x position in meters.', + default=1e-6), + _config.FloatSetting( + name='x min', + help='Minimum sampled x position in meters.', + default=-1e-6), + ] + +class ScanConfig (_config.Config): + "Configure a full `unfold_protein` approach-bind-unfold cycle" + settings = [ + _config.ConfigSetting( + name='velocity', + help='Configure unfolding velocity scan pattern.', + config_class=VelocityScanConfig), + _config.ConfigSetting( + name='position', + help='Configure unfolding position scan pattern.', + config_class=PositionScanConfig), + ] diff --git a/unfold_protein/scan.py b/unfold_protein/scan.py new file mode 100644 index 0000000..efbfb0d --- /dev/null +++ b/unfold_protein/scan.py @@ -0,0 +1,93 @@ +# Copyright (C) 2011 W. Trevor King +# +# This file is part of unfold_protein. +# +# Unfold_protein is free software: you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. +# +# Unfold_protein is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with unfold_protein. If not, see +# . + +"""Define `UnfoldScanner` for sequential unfolding experiments.""" + +import signal as _signal + +from calibcant.calibrate import move_far_from_surface +import pypiezo.base as _pypiezo_base + +from . import LOG as _LOG +from .unfolder import ExceptionTooFar as _ExceptionTooFar +from .unfolder import ExceptionTooClose as _ExceptionTooClose + +class UnfoldScanner (object): + def __init__(self, config, unfolder): + self.config = config + self.unfolder = unfolder + self._state = {'x direction': 1} + + def run(self): + self._stop = False + _signal.signal(_signal.SIGTERM, self._handle_stop_signal) + for i in range(self.config['velocity']['num loops']): + for velocity in self.config['velocity']['unfolding velocities']: + if self._stop: + return + self.unfolder.config['unfold']['velocity'] = velocity + try: + self.unfolder.run() + except _ExceptionTooFar: + self.stepper_approach() + except _ExceptionTooClose: + self.move_far_from_surface() + self.stepper_approach() + else: + self.position_scan_step() + + def _handle_stop_signal(self, signal, frame): + self._stop = True + + def move_far_from_surface(self): + _LOG.info('retract with the stepper motor') + move_far_from_surface( + stepper=self.unfolder.afm.stepper, + distance=self.unfolder.config['approach']['far']) + + def stepper_approach(self): + config = self.unfolder.config['approach'] + deflection = self.unfolder.read_deflection() + setpoint = deflection + config['relative setpoint'] + def_config = self.unfolder.afm.piezo.config.select_config( + 'inputs', 'deflection') + setpoint_bits = _pypiezo_base.convert_volts_to_bits( + def_config, setpoint) + self.unfolder.afm.stepper_approach(target_deflection=setpoint_bits) + + def position_scan_step(self): + axis_name = 'x' + config = self.config['position'] + axis_config = self.unfolder.afm.piezo.config.select_config( + 'axes', self.unfolder.afm.axis_name, + get_attribute=_pypiezo_base.get_axis_name + ) + pos = self.unfolder.afm.piezo.last_output[axis_name] + pos_m = _pypiezo_base.convert_bits_to_meters(axis_config, pos) + next_pos_m = pos_m + self._state['x direction']*config['x step'] + if next_pos_m > config['x max']: + self._state['x direction'] = -1 + next_pos_m = pos_m + self._state['x direction']*config['x step'] + elif next_pos_m < config['x min']: + self._state['x direction'] = 1 + next_pos_m = pos_m + self._state['x direction']*config['x step'] + next_pos = _pypiezo_base.convert_meters_to_bits( + axis_config, next_pos_m) + _LOG.info('move {} from {:g} to {:g} bits'.format( + axis_name, pos, next_pos)) + self.unfolder.afm.piezo.jump(axis_name, next_pos) diff --git a/unfold_protein/temperature.py b/unfold_protein/temperature.py new file mode 100644 index 0000000..91c1c6f --- /dev/null +++ b/unfold_protein/temperature.py @@ -0,0 +1,33 @@ +# Copyright (C) 2011 W. Trevor King +# +# This file is part of unfold_protein. +# +# Unfold_protein is free software: you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. +# +# Unfold_protein is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with unfold_protein. If not, see +# . + +from pypid.backend.melcor import MelcorBackend as _TemperatureBackend + +from . import LOG as _LOG + + +class Temperature (_TemperatureBackend): + def __init__(self, **kwargs): + _LOG.debug('setup temperature monitor') + super(Temperature, self).__init__(self, **kwargs) + self.set_max_mv(max=1) # amp + + def get_temperature(self): + temp = self.get_pv() + 273.15 # return temperature in kelvin + _LOG.info('measured temperature of {:g} K'.format(temp)) + return temp diff --git a/unfold_protein/unfolder.py b/unfold_protein/unfolder.py new file mode 100755 index 0000000..ae835f6 --- /dev/null +++ b/unfold_protein/unfolder.py @@ -0,0 +1,205 @@ +# Copyright (C) 2011 W. Trevor King +# +# This file is part of unfold_protein. +# +# Unfold_protein is free software: you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. +# +# Unfold_protein is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with unfold_protein. If not, see +# . + +"""Define classes for carrying out an unfolding cycle with an AFM.""" + +import email.utils as _email_utils +import os.path as _os_path +import time as _time + +import h5py as _h5py +import pypiezo.base as _pypiezo_base +from h5config.storage.hdf5 import HDF5_Storage as _HDF5_Storage +from h5config.storage.hdf5 import h5_create_group as _h5_create_group + +from . import LOG as _LOG +from . import package_config as _package_config + +try: + import numpy as _numpy + from matplotlib import pyplot as _pyplot + FIGURE = _pyplot.figure() +except (ImportError, RuntimeError), _matplotlib_import_error: + _pyplot = None +# from pylab import figure, plot, title, legend, hold, subplot, draw + + +class ExceptionTooClose (Exception): + """ + The piezo is too close to the surface. + """ + pass + +class ExceptionTooFar (Exception): + """ + The piezo is too far from the surface. + """ + pass + + +class Unfolder (object): + def __init__(self, config, afm): + self.config = config + self.afm = afm + self.zero_piezo() + + def run(self): + """Approach-bind-unfold-save[-plot] cycle. + """ + ret = {} + ret['timestamp'] = _email_utils.formatdate(localtime=True) + ret['temperature'] = self.afm.get_temperature() + ret['approach'] = self._approach() + self._bind() + ret['unfold'] = self._unfold() + self._save(**ret) + if _package_config['matplotlib']: + self._plot(**ret) + return ret + + def _approach(self): + """Approach the surface using the piezo + + Steps in until a given setpoint is reached. + """ + config = self.config['approach'] + deflection = self.read_deflection() + setpoint = deflection + config['relative setpoint'] + _LOG.info('approach with setpoint = {}'.format(setpoint)) + axis_config = self.afm.piezo.config.select_config( + 'axes', self.afm.axis_name, + get_attribute=_pypiezo_base.get_axis_name + ) + def_config = self.afm.piezo.config.select_config( + 'inputs', 'deflection') + start_pos = self.afm.piezo.last_output[self.afm.axis_name] + + # calculate parameters for move_to_pos_or_def from config + setpoint_bits = _pypiezo_base.convert_volts_to_bits( + def_config, setpoint) + mid_pos_bits = _pypiezo_base.convert_meters_to_bits( + axis_config, 0) + step_pos_bits = _pypiezo_base.convert_meters_to_bits( + axis_config, config['step']) + step_bits = step_pos_bits - mid_pos_bits + frequency = config['velocity'] / config['step'] + + # run the approach + data = self.afm.piezo.move_to_pos_or_def( + axis_name=self.afm.axis_name, deflection=setpoint_bits, + step=step_bits, frequency=frequency, return_data=True) + data['setpoint'] = setpoint + # check the output + if data['deflection'][-1] < setpoint_bits: + _LOG.info('unfolding too far from the surface') + self.afm.piezo.jump(self.afm.axis_name, start_pos) + #if PYLAB_INTERACTIVE_VERBOSE == True: + # figure(BASE_FIG_NUM+1) + # hold(False) + # plot_dict(data, 'Approach') + # hold(True) + # title('Unfolding too far') + _LOG.debug('raising ExceptionTooFar') + raise ExceptionTooFar + return data + + def _bind(self): + """Wait on the surface while the protein binds.""" + time = self.config['bind time'] + _LOG.info('binding for {:.3f} seconds'.format(time)) + _time.sleep(time) + + def _unfold(self): + """Pull the bound protein, forcing unfolding events.""" + config = self.config['unfold'] + velocity = config['velocity'] + _LOG.info('unfold at {:g} m/s'.format(velocity)) + axis = self.afm.piezo.axis_by_name(self.afm.axis_name) + d = self.afm.piezo.channel_by_name('deflection') + start_pos = self.afm.piezo.last_output[self.afm.axis_name] + start_pos_m = _pypiezo_base.convert_bits_to_meters(axis, start_pos) + final_pos_m = bind_pos_m - config['distance'] + final_pos = _pypiezo_base.convert_meters_to_bits(axis, final_pos_m) + dtype = afm.piezo.channel_dtype(self.afm.axis_name, direction='output') + num_steps = int( + config['distance'] / config['velocity'] * config['frequency']) + 1 + # (m) * (s/m) * (samples/s) + out = _numpy.linspace( + start_pos, final_pos, num_steps).astype(dtype=dtype) + _LOG.debug( + 'unfolding from {:d} to {:d} in {:d} steps at {:g} Hz'.format( + start_pos, final_pos, num_steps, config['frequency'])) + data = afm.piezo.ramp( + data=out, frequency=config['frequency'], + output_names=[self.afm.axis_name], input_names=['deflection']) + return {self.afm.axis_name:out, 'deflection':data} + + def _save(self, temperature, approach, unfold, timestamp): + config = self.config['save'] + time_tuple = _email_utils.parsedate(timestamp) + filename = _os_path.join( + config['base directory'], + '{0}-{1:02d}-{2:02d}_{3:02d}-{4:02d}-{5:02d}.h5'.format( + time_tuple)) + _LOG.info('saving {}'.format(filename)) + with _h5py.File(filename, 'a') as f: + storage = _HDF5_Storage() + config_cwg = _h5_create_group(f, 'config') + storage.save(config=self.config, group=config_cwg) + f['timestamp'] = timestamp + f['temperature'] = temperature + for k,v in approach.items(): + f['approach/{}'.format(k)] = v + for k,v in unfold.items(): + f['unfold/{}'.format(k)] = v + + def _plot(self, temperature, approach, unfold, timestamp): + "Plot the unfolding cycle" + if not _matplotlib: + raise _matplotlib_import_error + FIGURE.clear() + # TODO... + #if PYLAB_INTERACTIVE_VERBOSE == True: + # figure(BASE_FIG_NUM) + # hold(False) + # plot_dict(approach_data, 'Approach') + # hold(True) + # plot_dict(out['data'], 'Unfold') + # legend(loc='best') + # title('Unfolding') + # draw() + + def zero_piezo(self): + _LOG.info('zero piezo') + x_mid_pos = _pypiezo_base.convert_volts_to_bits( + self.afm.piezo.config.select_config( + 'axes', 'x', get_attribute=_pypiezo_base.get_axis_name + )['channel'], + 0) + z_mid_pos = _pypiezo_base.convert_volts_to_bits( + self.afm.piezo.config.select_config( + 'axes', 'z', get_attribute=_pypiezo_base.get_axis_name + )['channel'], + 0) + self.afm.piezo.jump('z', z_mid_pos) + self.afm.piezo.jump('x', x_mid_pos) + + def read_deflection(self): + bits = self.afm.piezo.read_deflection() + return _pypiezo_base.convert_bits_to_volts( + self.afm.piezo.config.select_config('inputs', 'deflection'), bits)