From 837218048897e877a1578379818bac6b82bf12c8 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Wed, 3 Jun 2009 15:19:40 -0400 Subject: [PATCH] Began versioning. --- unfold.py | 513 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 513 insertions(+) create mode 100644 unfold.py diff --git a/unfold.py b/unfold.py new file mode 100644 index 0000000..fddcbde --- /dev/null +++ b/unfold.py @@ -0,0 +1,513 @@ +TEXT_VERBOSE = False +PYLAB_INTERACTIVE_VERBOSE = True +BASE_FIG_NUM = 40 +LOG_DATA = True +LOG_DIR = '$DEFAULT$/unfold' + +import stepper +import z_piezo +import z_piezo_utils +import x_piezo +import temperature +import threading +import time +import os, os.path +from numpy import arange, sin, pi, isnan +import data_logger + +if PYLAB_INTERACTIVE_VERBOSE : + from pylab import figure, plot, title, legend, hold, subplot, draw + import time # for timestamping lines on plots + +EFILE = file(os.path.expanduser('~/rsrch/debug.unfold'), 'a') + +class ExceptionTooClose (Exception) : + """ + The piezo is too close to the surface, + an unfolding would extend past pzMin + """ +class ExceptionTooFar (Exception) : + """ + The piezo has reached pzMax without + reaching the desired deflection setpoint. + """ + +def plot_dict(d, label) : + try : + plot(d["Z piezo output"], d["Deflection input"], '.',label=label) + except KeyError, string: + print d.keys() + raise KeyError, string + +class unfold_data_log (data_logger.data_log) : + def save(self, unfold, timestamp, CTemp, VSetpoint, nmBindPos, + sBindTime, + nmDist, nmStep, ppStep, nmPsRate, freq, + approach_data, unfold_data) : + filepath,timestamp = self.get_filename(timestamp) + # save the unfolding data + dataFile = open(filepath, "w") + for i in range(0, len(unfold_data["Z piezo output"])) : + dataFile.write("%d\t%d\t%d\n" % (unfold_data["Z piezo output"][i], + unfold_data["Deflection input"][i], + unfold_data["Z piezo input"][i])) + dataFile.close() + dataFile = open(filepath+"_approach", "w") + for i in range(0, len(approach_data["Z piezo output"])) : + dataFile.write("%d\t%d\n" % (approach_data["Z piezo output"][i], + approach_data["Deflection input"][i])) + dataFile.close() + + # save parameters + paramFile = open(filepath+"_param", "w") + paramFile.write("Environment\n") + paramFile.write("Time:\t"+timestamp+"\n") + if CTemp != None : + paramFile.write("Temperature (C):\t"+str(CTemp)+"\n") + + paramFile.write("\nPiezo parameters\n") + paramFile.write("Z piezo sensitivity (nm/Vp):\t"+str(unfold.zp.sensitivity)+"\n") + paramFile.write("Z piezo gain (Vp/Vo):\t"+str(unfold.zp.gain)+"\n") + paramFile.write("X piezo sensitivity (nm/Vp):\t"+str(unfold.xp.xpSensitivity)+"\n") + paramFile.write("X piezo gain (Vp/Vo):\t"+str(unfold.xp.gain)+"\n") + paramFile.write("X piezo position (nm):\t"+str(unfold.xp.out2nm(unfold.xp.curPos()))+"\n") + + paramFile.write("\nApproach parameters\n") + paramFile.write("Setpoint (V):\t"+str(VSetpoint)+"\n") + paramFile.write("Bind pos (nm):\t"+str(nmBindPos)+"\n") + + paramFile.write("\nBinding parameters\n") + paramFile.write("Bind time (s):\t"+str(sBindTime)+"\n") + + paramFile.write("\nUnfolding parameters\n") + paramFile.write("Distance (nm):\t"+str(nmDist)+"\n") + paramFile.write("Step size (nm):\t"+str(nmStep)+"\n") + paramFile.write("Points per step:\t"+str(ppStep)+"\n") + paramFile.write("Unfold rate (nm/s):\t"+str(nmPsRate)+"\n") + paramFile.write("Sample rate (Hz):\t"+str(freq)+"\n") + + paramFile.write("\nData fields:\tZ_piezo_out Deflection_in Z_piezo_in\n") + paramFile.close() + + return timestamp + +class unfold : + def __init__(self, controlTemp=True, dataDirectory=LOG_DIR) : + self.step = stepper.stepper_obj() + self.zp = z_piezo.z_piezo() + self.xp = x_piezo.x_piezo() + #self.zp = z_piezo.z_piezo(zp_chan = '/Dev1/ao0', + # zp_mon_chan = '/Dev1/ai1', + # def_chan = '/Dev1/ai0') + #self.xp = x_piezo.x_piezo(xp_chan = '/Dev1/ao1') + self.zp.jumpToPos(self.zp.pos_nm2out(0)) + self.xp.jumpToPos(self.xp.nm2out(0)) + if controlTemp == True : + self.T = temperature.tempController(maxCurrent=1.0) + else: self.T = None + self.log = unfold_data_log(dataDirectory, log_name="unfold") + def unfold(self, setpoint=None, rel_setpoint=1.0, sBindTime = 10.0, + nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000, + dataDirectory=LOG_DIR, fileID=None) : + while True : + try : + data = self.unfold_cycle(setpoint, rel_setpoint, sBindTime, + nmDist, nmStep, ppStep, nmPsRate, + dataDirectory, fileID) + break + except ExceptionTooFar : + EFILE.write('caught ExceptionTooFar\n'); EFILE.flush() + try : # try for a useful surface distance... + if setpoint == None : # HACK! redundant! + assert rel_setpoint != None, "must have some sort of setpoint" + setpoint = self.curDef() + rel_setpoint + print "setpoint = %g" % setpoint + EFILE.write('attempt getSurfPos\n'); EFILE.flush() + surfPos = self.getSurfPos(setpoint) + EFILE.write('getSurfPos succeeded\n'); EFILE.flush() + print "Too far (surface at %g nm), stepping closer" % surfPos + except z_piezo_utils.poorFit, string : # ... oh well, print what we know + EFILE.write('getSurfPos failed\n'); EFILE.flush() + print "Too far, stepping closer" + print "(Fit failed with: %s)" % string + EFILE.write('zero Z piezo\n'); EFILE.flush() + self.zp.jumpToPos(self.zp.pos_nm2out(0)) + EFILE.write('step closer\n'); EFILE.flush() + self.stepCloser() + EFILE.write('step closer successful\n'); EFILE.flush() + except ExceptionTooClose : + try : # try for a useful surface distance... + if setpoint == None : # !HACK !redundant! + assert rel_setpoint != None, "must have some sort of setpoint" + setpoint = self.curDef() + rel_setpoint + print "setpoint = %g" % setpoint + surfPos = self.getSurfPos(setpoint) + print "Too close (surface at %g nm), stepping away" % surfPos + except z_piezo_utils.poorFit, string : # ... oh well, print what we know + print "Too close, stepping away" + print "(Fit failed with: %s)" % string + self.zp.jumpToPos(self.zp.pos_nm2out(0)) + self.stepAway() + print "Too close, stepping away" + return data + def stepApproach(self, setpoint) : + cd = self.curDef() + while cd < setpoint or isnan(cd) : # sometimes during approach, we get negative nan values + print "deflection %g < setpoint %g. step closer" % (cd, setpoint) + self.stepCloser() + cd = self.curDef() + def stepApproachRetractCurve(self, setpoint) : + def_array = [] + step_array = [] + orig_step = self.step.get_cur_pos() + def store_point() : + cd = self.curDef() + def_array.append(cd) + step_array.append(self.step.get_cur_pos()) + return cd + cd = store_point() + while cd < setpoint or isnan(cd) : # sometimes during approach, we get negative nan values + print "deflection %g < setpoint %g. step closer" % (cd, setpoint) + self.step.step_rel(2) + cd = store_point() + while self.step.get_cur_pos() > orig_step : + self.step.step_rel(-2) + store_point() + return {"Deflection":def_array, "Stepper position":step_array} + def piezoApproach(self, setpoint, return_data=False) : + startPos = self.zp.curPos() + curVals,data = z_piezo_utils.moveToPosOrDef(self.zp, + self.zp.zpMax, + self.zp.def_V2in(setpoint), + step=10, + return_data=return_data) + if curVals["Deflection input"] < self.zp.def_V2in(setpoint) : + EFILE.write('Unfolding too far\n'); EFILE.flush() + self.zp.jumpToPos(startPos) + self.zp.updateInputs() + if PYLAB_INTERACTIVE_VERBOSE == True : + figure(BASE_FIG_NUM+1) + hold(False) + plot_dict(data, 'Approach') + hold(True) + title('Unfolding too far') + EFILE.write('Raising ExceptionTooFar\n'); EFILE.flush() + raise ExceptionTooFar + if return_data == True : + return (curVals, data) + else : + return curVals + def bind(self, sTime=10.0) : + time.sleep(sTime) + def unfold_pull(self, nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000) : + if nmStep < 0 : + nmStep = -nmStep + numSteps = int(nmDist/nmStep+1) + out = [0.0]*numSteps*ppStep + pos = self.zp.curPos() + startPos = pos + step = int(self.zp.pos_nm2out(nmStep)-self.zp.pos_nm2out(0)) + for i in range(0, numSteps) : + for j in range(0, ppStep) : + out[i*ppStep + j] = pos + pos -= step + # points/step * step/nm * nm/s = points/s + freq = ppStep / nmStep * nmPsRate + print "unfolding %d points per %d steps from %d to %d at freq of %g" % (ppStep, numSteps, startPos, pos, freq) + return {'freq':freq, 'data':self.zp.ramp(out, freq)} + def generate_refold_bind_pull_output(self, nmSurfPos=0, + sBindTime=2, nmUnfoldDist=160, nmRefoldDist=145, sPauseTime=2, + cycles=50, nmStep=0.5, ppStepUnfold=10, ppStepRefold=1, + nmPsRateUnfold=1000) : + + if nmStep < 0 :## + nmStep = -nmStep + numSteps = int(nmDist/nmStep+1) + out = [0.0]*numSteps*ppStep + pos = self.zp.curPos() + startPos = pos + step = int(self.zp.pos_nm2out(nmStep)-self.zp.pos_nm2out(0)) + for i in range(0, numSteps) : + for j in range(0, ppStep) : + out[i*ppStep + j] = pos + pos -= step + # points/step * step/nm * nm/s = points/s + freq = ppStep / nmStep * nmPsRate + print "unfolding %d points per %d steps from %d to %d at freq of %g" % (ppStep, numSteps, startPos, pos, freq) + return {'freq':freq, 'data':self.zp.ramp(out, freq)} + def unfold_cycle(self, setpoint=None, rel_setpoint=1.0, sBindTime = 10.0, + nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000, + dataDirectory=LOG_DIR, fileID=None) : + if setpoint == None : + assert rel_setpoint != None, "must have some sort of setpoint" + setpoint = self.curDef() + rel_setpoint + print "setpoint = %g" % setpoint + timestamp = time.strftime("%Y%m%d%H%M%S") + temp = None + if self.T != None : temp = self.T.getTemp() + print "approaching" + startPos = self.zp.curPos() + curVals,approach_data = self.piezoApproach(setpoint, return_data=True) + bindPos = curVals['Z piezo output'] # in output units + finalPos = bindPos - (self.zp.pos_nm2out(nmDist)-self.zp.pos_nm2out(0)) + try : + self.zp._check_range(finalPos) + except z_piezo.outOfRange : + self.zp.jumpToPos(startPos) + self.zp.updateInputs() + if PYLAB_INTERACTIVE_VERBOSE == True : + figure(BASE_FIG_NUM+1) + hold(False) + plot_dict(approach_data, 'Approach') + hold(True) + title('Unfolding too close') + raise ExceptionTooClose + print "binding for %.3f seconds" % sBindTime + self.bind(sBindTime) + out = self.unfold_pull(nmDist, nmStep, ppStep, nmPsRate) + + if PYLAB_INTERACTIVE_VERBOSE == True : + figure(BASE_FIG_NUM) + hold(False) + plot_dict(approach_data, 'Approach') + hold(True) + plot_dict(out['data'], 'Unfold') + legend(loc='best') + title('Unfolding') + draw() + + if LOG_DATA == True: + print "saving" + timestamp = self.log.save(self, timestamp, temp, setpoint, + int(self.zp.pos_out2nm(bindPos)), # don't need lots of precision... + sBindTime, + nmDist, nmStep, ppStep, nmPsRate, out['freq'], + approach_data, out['data']) + return out + def getSurfPos(self, setpoint=2, textVerbose=False, plotVerbose=False) : + return self.zp.pos_out2nm(z_piezo_utils.getSurfPos(self.zp, self.zp.def_V2in(setpoint), textVerbose, plotVerbose)) + def curDef(self) : + self.zp.updateInputs() + return self.zp.def_in2V(self.zp.curDef()) + def stepCloser(self) : + "Backlash-robust stepping" + self.step.step_rel(2) # two half-steps in full step mode + def stepAway(self) : + "Backlash-robust stepping" + self.step.step_rel(-120) + self.step.step_rel(110) # HACK, should come back 118 + def xpWander(self) : + self.xp.wander() + +def _test_unfold(controlTemp=True) : + print "Test unfold" + u = unfold(controlTemp=controlTemp) + u.unfold(setpoint=0.5, sBindTime=1.0) + del u + print "unfold successful\n" + + +class unfold_expt : + def __init__(self, controlTemp=True) : + self.u = unfold(controlTemp=controlTemp) + self.runUnfolds = False + self.bgRun = None + self.getExternalTempSetpoint = None + if self.u.T != None : + self.tempSetpoint = self.u.T.getTemp() + self.getExternalDeflectionSetpoint = None + self.deflectionSetpoint = 0.5 + self.getExternalBindTime = None + self.bindTime = 1.0 + self.getExternalnmDist = None + self.nmDist = 600.0 + self.getExternalnmStep = None + self.nmStep = 0.5 + self.getExternalppStep = None + self.ppStep = 10 + self.getExternalnmPsRate = None + self.nmPsRate = 1000.0 + self.getExternalDataDirectory = None + self.dataDirectory = LOG_DIR + self.plotData = None + self.i=0 + self.showUnfoldingIndex = None + def run(self) : + print "running" + if self.bgRun != None : + raise Exception, "Can't run two backgrounds simultaneously" + self.runUnfolds = True + print "unfolding in the background" + self.bgRun = bg_run( self._run) + def stepApproach(self) : + print "approaching" + if self.getExternalDeflectionSetpoint != None : + self.deflectionSetpoint = self.getExternalDeflectionSetpoint() + self.u.stepApproach(self.deflectionSetpoint) + def _run(self) : + print "starting unfold loop" + while self.runUnfolds == True : + print "get unfold parameters" + if self.u.T != None and self.getExternalTempSetpoint != None : + setpoint = self.getExternalTempSetpoint() + if setpoint != self.tempSetpoint : + self.u.T.setTemp(setpoint) + self.tempSetpoint = setpoint + if self.getExternalDeflectionSetpoint != None : + self.deflectionSetpoint = self.getExternalDeflectionSetpoint() + if self.getExternalBindTime != None : + self.bindTime = self.getExternalBindTime() + if self.getExternalnmDist != None : + self.nmDist = self.getExternalnmDist() + if self.getExternalnmStep != None : + self.nmStep = self.getExternalnmStep() + if self.getExternalppStep != None : + self.ppStep = self.getExternalppStep() + if self.getExternalnmPsRate != None : + self.nmPsRate = self.getExternalnmPsRate() + if self.getExternalDataDirectory != None : + self.dataDirectory = self.getExternalDataDirectory() + print "run unfold" + #print "setpoint ", self.deflectionSetpoint + #print "bind time ", self.bindTime + #print "nmDist ", self.nmDist + #print "nmStep ", self.nmStep + #print "ppStep ", self.ppStep + #print "nmPsRate ", self.nmPsRate + #print "fileID ", self.i + #print "dataDir ", self.dataDirectory + data = self.u.unfold(setpoint=self.deflectionSetpoint, + sBindTime=self.bindTime, + nmDist=self.nmDist, + nmStep=self.nmStep, + ppStep=self.ppStep, + nmPsRate=self.nmPsRate, + fileID=str(self.i), + dataDirectory=self.dataDirectory) + print "plot data" + if self.plotData != None : + self.plotData(data["data"]["Z piezo output"], + data["data"]["Deflection input"]) + if self.showUnfoldingIndex != None : + self.showUnfoldingIndex(self.i) + self.i += 1 + return False + def stop(self) : + if self.bgRun != None : + self.runUnfolds = False + self.bgRun.wait() + self.bgRun = None + +def _test_unfold_expt(controlTemp=True) : + print "Test unfold_expt" + u_expt = unfold.unfold_expt(controlTemp=controlTemp) + u_expt.run() + time.sleep(100) + u_expt.stop() + print "unfold_expt working\n" + +class bg_run : + def __init__(self, function, args=None, finishCallback=None, interruptCallback=None) : + print "init bg" + def tempFn() : + print "temp fn started" + complete = False + if args == None : + complete = function() + else : + complete = function(args) + print "temp Fn finished" + if finishCallback != None and complete == True : + finishCallback() + elif interruptCallback != None and complete == False : + interruptCallback() + print "tempFn defined" + self.thd = threading.Thread(target=tempFn, name="Background Fn") + print "starting thread" + self.thd.start() + print "thread started" + def wait(self) : + print "joining thread" + self.thd.join() + print "thread joined" + +def _test_bg_run() : + print "Test bg_run" + print "test without args or callbacks" + def say_hello() : + for i in range(10) : + print "Hello" + time.sleep(0.1) + return True + bg = unfold.bg_run(say_hello) + time.sleep(.2) + print "The main thread's still going" + bg.wait() + print "The background process is done" + + print "test without args, but with callbacks" + def inter() : + print "I was interrupted" + def fin() : + print "I'm finished" + bg = unfold.bg_run(say_hello, finishCallback=fin, interruptCallback=inter) + time.sleep(.2) + print "The main thread's still going" + bg.wait() + print "The background process is done" + + print "test with args and callbacks" + del say_hello + def say_hello(stop=[False]) : + for i in range(10) : + if stop[0] == True : return False + print "Hello" + time.sleep(0.1) + return True + stop = [False] + bg = unfold.bg_run(say_hello, args=stop, finishCallback=fin, interruptCallback=inter) + time.sleep(.2) + print "The main thread's still going, stop it" + stop[0] = True + bg.wait() + del bg + del stop + del say_hello + del inter + del fin + print "The background process is done" + + print "bg_run working\n" + +def loop_rates(u, rates, num_loops=10, die_file=None, song=None, **kwargs): + """ + loop_rates(u, rates, num_loops=10, die_file=None, song=None, **kwargs) + + Constant speed unfolding using the unfold instance u for num_loops + through the series of rates listed in rates. You may set die_file + to a path, loop_rates() will check that location before each + unfolding attempt, and cleanly stop unfolding if the file exists. + **kwargs are passed on to u.unfold(), so a full call might look like + + loop_rates(u, rates=[20,200,2e3], num_loops=5, die_file='~/die', song='~wking/Music/system/towerclo.wav', rel_setpoint=1, nmDist=800, sBindTime=2) + """ + if die_file != None: + die_file = os.path.expanduser(die_file) + if song != None: + song = os.path.expanduser(song) + for i in range(num_loops): + for nmPsRate in rates: + if die_file != None and os.path.exists(die_file): + return None + u.unfold(nmPsRate=nmPsRate, **kwargs) + u.xpWander() + if song != None: + os.system("aplay '%s'" % song) + +def test() : + _test_unfold(controlTemp=False) + _test_bg_run() + _test_unfold_expt(controlTemp=False) + +if __name__ == "__main__" : + test() -- 2.26.2