--- /dev/null
+TEXT_VERBOSE = False
+PYLAB_INTERACTIVE_VERBOSE = True
+BASE_FIG_NUM = 40
+LOG_DATA = True
+LOG_DIR = '$DEFAULT$/unfold'
+
+import stepper
+import z_piezo
+import z_piezo_utils
+import x_piezo
+import temperature
+import threading
+import time
+import os, os.path
+from numpy import arange, sin, pi, isnan
+import data_logger
+
+if PYLAB_INTERACTIVE_VERBOSE :
+ from pylab import figure, plot, title, legend, hold, subplot, draw
+ import time # for timestamping lines on plots
+
+EFILE = file(os.path.expanduser('~/rsrch/debug.unfold'), 'a')
+
+class ExceptionTooClose (Exception) :
+ """
+ The piezo is too close to the surface,
+ an unfolding would extend past pzMin
+ """
+class ExceptionTooFar (Exception) :
+ """
+ The piezo has reached pzMax without
+ reaching the desired deflection setpoint.
+ """
+
+def plot_dict(d, label) :
+ try :
+ plot(d["Z piezo output"], d["Deflection input"], '.',label=label)
+ except KeyError, string:
+ print d.keys()
+ raise KeyError, string
+
+class unfold_data_log (data_logger.data_log) :
+ def save(self, unfold, timestamp, CTemp, VSetpoint, nmBindPos,
+ sBindTime,
+ nmDist, nmStep, ppStep, nmPsRate, freq,
+ approach_data, unfold_data) :
+ filepath,timestamp = self.get_filename(timestamp)
+ # save the unfolding data
+ dataFile = open(filepath, "w")
+ for i in range(0, len(unfold_data["Z piezo output"])) :
+ dataFile.write("%d\t%d\t%d\n" % (unfold_data["Z piezo output"][i],
+ unfold_data["Deflection input"][i],
+ unfold_data["Z piezo input"][i]))
+ dataFile.close()
+ dataFile = open(filepath+"_approach", "w")
+ for i in range(0, len(approach_data["Z piezo output"])) :
+ dataFile.write("%d\t%d\n" % (approach_data["Z piezo output"][i],
+ approach_data["Deflection input"][i]))
+ dataFile.close()
+
+ # save parameters
+ paramFile = open(filepath+"_param", "w")
+ paramFile.write("Environment\n")
+ paramFile.write("Time:\t"+timestamp+"\n")
+ if CTemp != None :
+ paramFile.write("Temperature (C):\t"+str(CTemp)+"\n")
+
+ paramFile.write("\nPiezo parameters\n")
+ paramFile.write("Z piezo sensitivity (nm/Vp):\t"+str(unfold.zp.sensitivity)+"\n")
+ paramFile.write("Z piezo gain (Vp/Vo):\t"+str(unfold.zp.gain)+"\n")
+ paramFile.write("X piezo sensitivity (nm/Vp):\t"+str(unfold.xp.xpSensitivity)+"\n")
+ paramFile.write("X piezo gain (Vp/Vo):\t"+str(unfold.xp.gain)+"\n")
+ paramFile.write("X piezo position (nm):\t"+str(unfold.xp.out2nm(unfold.xp.curPos()))+"\n")
+
+ paramFile.write("\nApproach parameters\n")
+ paramFile.write("Setpoint (V):\t"+str(VSetpoint)+"\n")
+ paramFile.write("Bind pos (nm):\t"+str(nmBindPos)+"\n")
+
+ paramFile.write("\nBinding parameters\n")
+ paramFile.write("Bind time (s):\t"+str(sBindTime)+"\n")
+
+ paramFile.write("\nUnfolding parameters\n")
+ paramFile.write("Distance (nm):\t"+str(nmDist)+"\n")
+ paramFile.write("Step size (nm):\t"+str(nmStep)+"\n")
+ paramFile.write("Points per step:\t"+str(ppStep)+"\n")
+ paramFile.write("Unfold rate (nm/s):\t"+str(nmPsRate)+"\n")
+ paramFile.write("Sample rate (Hz):\t"+str(freq)+"\n")
+
+ paramFile.write("\nData fields:\tZ_piezo_out Deflection_in Z_piezo_in\n")
+ paramFile.close()
+
+ return timestamp
+
+class unfold :
+ def __init__(self, controlTemp=True, dataDirectory=LOG_DIR) :
+ self.step = stepper.stepper_obj()
+ self.zp = z_piezo.z_piezo()
+ self.xp = x_piezo.x_piezo()
+ #self.zp = z_piezo.z_piezo(zp_chan = '/Dev1/ao0',
+ # zp_mon_chan = '/Dev1/ai1',
+ # def_chan = '/Dev1/ai0')
+ #self.xp = x_piezo.x_piezo(xp_chan = '/Dev1/ao1')
+ self.zp.jumpToPos(self.zp.pos_nm2out(0))
+ self.xp.jumpToPos(self.xp.nm2out(0))
+ if controlTemp == True :
+ self.T = temperature.tempController(maxCurrent=1.0)
+ else: self.T = None
+ self.log = unfold_data_log(dataDirectory, log_name="unfold")
+ def unfold(self, setpoint=None, rel_setpoint=1.0, sBindTime = 10.0,
+ nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000,
+ dataDirectory=LOG_DIR, fileID=None) :
+ while True :
+ try :
+ data = self.unfold_cycle(setpoint, rel_setpoint, sBindTime,
+ nmDist, nmStep, ppStep, nmPsRate,
+ dataDirectory, fileID)
+ break
+ except ExceptionTooFar :
+ EFILE.write('caught ExceptionTooFar\n'); EFILE.flush()
+ try : # try for a useful surface distance...
+ if setpoint == None : # HACK! redundant!
+ assert rel_setpoint != None, "must have some sort of setpoint"
+ setpoint = self.curDef() + rel_setpoint
+ print "setpoint = %g" % setpoint
+ EFILE.write('attempt getSurfPos\n'); EFILE.flush()
+ surfPos = self.getSurfPos(setpoint)
+ EFILE.write('getSurfPos succeeded\n'); EFILE.flush()
+ print "Too far (surface at %g nm), stepping closer" % surfPos
+ except z_piezo_utils.poorFit, string : # ... oh well, print what we know
+ EFILE.write('getSurfPos failed\n'); EFILE.flush()
+ print "Too far, stepping closer"
+ print "(Fit failed with: %s)" % string
+ EFILE.write('zero Z piezo\n'); EFILE.flush()
+ self.zp.jumpToPos(self.zp.pos_nm2out(0))
+ EFILE.write('step closer\n'); EFILE.flush()
+ self.stepCloser()
+ EFILE.write('step closer successful\n'); EFILE.flush()
+ except ExceptionTooClose :
+ try : # try for a useful surface distance...
+ if setpoint == None : # !HACK !redundant!
+ assert rel_setpoint != None, "must have some sort of setpoint"
+ setpoint = self.curDef() + rel_setpoint
+ print "setpoint = %g" % setpoint
+ surfPos = self.getSurfPos(setpoint)
+ print "Too close (surface at %g nm), stepping away" % surfPos
+ except z_piezo_utils.poorFit, string : # ... oh well, print what we know
+ print "Too close, stepping away"
+ print "(Fit failed with: %s)" % string
+ self.zp.jumpToPos(self.zp.pos_nm2out(0))
+ self.stepAway()
+ print "Too close, stepping away"
+ return data
+ def stepApproach(self, setpoint) :
+ cd = self.curDef()
+ while cd < setpoint or isnan(cd) : # sometimes during approach, we get negative nan values
+ print "deflection %g < setpoint %g. step closer" % (cd, setpoint)
+ self.stepCloser()
+ cd = self.curDef()
+ def stepApproachRetractCurve(self, setpoint) :
+ def_array = []
+ step_array = []
+ orig_step = self.step.get_cur_pos()
+ def store_point() :
+ cd = self.curDef()
+ def_array.append(cd)
+ step_array.append(self.step.get_cur_pos())
+ return cd
+ cd = store_point()
+ while cd < setpoint or isnan(cd) : # sometimes during approach, we get negative nan values
+ print "deflection %g < setpoint %g. step closer" % (cd, setpoint)
+ self.step.step_rel(2)
+ cd = store_point()
+ while self.step.get_cur_pos() > orig_step :
+ self.step.step_rel(-2)
+ store_point()
+ return {"Deflection":def_array, "Stepper position":step_array}
+ def piezoApproach(self, setpoint, return_data=False) :
+ startPos = self.zp.curPos()
+ curVals,data = z_piezo_utils.moveToPosOrDef(self.zp,
+ self.zp.zpMax,
+ self.zp.def_V2in(setpoint),
+ step=10,
+ return_data=return_data)
+ if curVals["Deflection input"] < self.zp.def_V2in(setpoint) :
+ EFILE.write('Unfolding too far\n'); EFILE.flush()
+ self.zp.jumpToPos(startPos)
+ self.zp.updateInputs()
+ if PYLAB_INTERACTIVE_VERBOSE == True :
+ figure(BASE_FIG_NUM+1)
+ hold(False)
+ plot_dict(data, 'Approach')
+ hold(True)
+ title('Unfolding too far')
+ EFILE.write('Raising ExceptionTooFar\n'); EFILE.flush()
+ raise ExceptionTooFar
+ if return_data == True :
+ return (curVals, data)
+ else :
+ return curVals
+ def bind(self, sTime=10.0) :
+ time.sleep(sTime)
+ def unfold_pull(self, nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000) :
+ if nmStep < 0 :
+ nmStep = -nmStep
+ numSteps = int(nmDist/nmStep+1)
+ out = [0.0]*numSteps*ppStep
+ pos = self.zp.curPos()
+ startPos = pos
+ step = int(self.zp.pos_nm2out(nmStep)-self.zp.pos_nm2out(0))
+ for i in range(0, numSteps) :
+ for j in range(0, ppStep) :
+ out[i*ppStep + j] = pos
+ pos -= step
+ # points/step * step/nm * nm/s = points/s
+ freq = ppStep / nmStep * nmPsRate
+ print "unfolding %d points per %d steps from %d to %d at freq of %g" % (ppStep, numSteps, startPos, pos, freq)
+ return {'freq':freq, 'data':self.zp.ramp(out, freq)}
+ def generate_refold_bind_pull_output(self, nmSurfPos=0,
+ sBindTime=2, nmUnfoldDist=160, nmRefoldDist=145, sPauseTime=2,
+ cycles=50, nmStep=0.5, ppStepUnfold=10, ppStepRefold=1,
+ nmPsRateUnfold=1000) :
+
+ if nmStep < 0 :##
+ nmStep = -nmStep
+ numSteps = int(nmDist/nmStep+1)
+ out = [0.0]*numSteps*ppStep
+ pos = self.zp.curPos()
+ startPos = pos
+ step = int(self.zp.pos_nm2out(nmStep)-self.zp.pos_nm2out(0))
+ for i in range(0, numSteps) :
+ for j in range(0, ppStep) :
+ out[i*ppStep + j] = pos
+ pos -= step
+ # points/step * step/nm * nm/s = points/s
+ freq = ppStep / nmStep * nmPsRate
+ print "unfolding %d points per %d steps from %d to %d at freq of %g" % (ppStep, numSteps, startPos, pos, freq)
+ return {'freq':freq, 'data':self.zp.ramp(out, freq)}
+ def unfold_cycle(self, setpoint=None, rel_setpoint=1.0, sBindTime = 10.0,
+ nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000,
+ dataDirectory=LOG_DIR, fileID=None) :
+ if setpoint == None :
+ assert rel_setpoint != None, "must have some sort of setpoint"
+ setpoint = self.curDef() + rel_setpoint
+ print "setpoint = %g" % setpoint
+ timestamp = time.strftime("%Y%m%d%H%M%S")
+ temp = None
+ if self.T != None : temp = self.T.getTemp()
+ print "approaching"
+ startPos = self.zp.curPos()
+ curVals,approach_data = self.piezoApproach(setpoint, return_data=True)
+ bindPos = curVals['Z piezo output'] # in output units
+ finalPos = bindPos - (self.zp.pos_nm2out(nmDist)-self.zp.pos_nm2out(0))
+ try :
+ self.zp._check_range(finalPos)
+ except z_piezo.outOfRange :
+ self.zp.jumpToPos(startPos)
+ self.zp.updateInputs()
+ if PYLAB_INTERACTIVE_VERBOSE == True :
+ figure(BASE_FIG_NUM+1)
+ hold(False)
+ plot_dict(approach_data, 'Approach')
+ hold(True)
+ title('Unfolding too close')
+ raise ExceptionTooClose
+ print "binding for %.3f seconds" % sBindTime
+ self.bind(sBindTime)
+ out = self.unfold_pull(nmDist, nmStep, ppStep, nmPsRate)
+
+ if PYLAB_INTERACTIVE_VERBOSE == True :
+ figure(BASE_FIG_NUM)
+ hold(False)
+ plot_dict(approach_data, 'Approach')
+ hold(True)
+ plot_dict(out['data'], 'Unfold')
+ legend(loc='best')
+ title('Unfolding')
+ draw()
+
+ if LOG_DATA == True:
+ print "saving"
+ timestamp = self.log.save(self, timestamp, temp, setpoint,
+ int(self.zp.pos_out2nm(bindPos)), # don't need lots of precision...
+ sBindTime,
+ nmDist, nmStep, ppStep, nmPsRate, out['freq'],
+ approach_data, out['data'])
+ return out
+ def getSurfPos(self, setpoint=2, textVerbose=False, plotVerbose=False) :
+ return self.zp.pos_out2nm(z_piezo_utils.getSurfPos(self.zp, self.zp.def_V2in(setpoint), textVerbose, plotVerbose))
+ def curDef(self) :
+ self.zp.updateInputs()
+ return self.zp.def_in2V(self.zp.curDef())
+ def stepCloser(self) :
+ "Backlash-robust stepping"
+ self.step.step_rel(2) # two half-steps in full step mode
+ def stepAway(self) :
+ "Backlash-robust stepping"
+ self.step.step_rel(-120)
+ self.step.step_rel(110) # HACK, should come back 118
+ def xpWander(self) :
+ self.xp.wander()
+
+def _test_unfold(controlTemp=True) :
+ print "Test unfold"
+ u = unfold(controlTemp=controlTemp)
+ u.unfold(setpoint=0.5, sBindTime=1.0)
+ del u
+ print "unfold successful\n"
+
+
+class unfold_expt :
+ def __init__(self, controlTemp=True) :
+ self.u = unfold(controlTemp=controlTemp)
+ self.runUnfolds = False
+ self.bgRun = None
+ self.getExternalTempSetpoint = None
+ if self.u.T != None :
+ self.tempSetpoint = self.u.T.getTemp()
+ self.getExternalDeflectionSetpoint = None
+ self.deflectionSetpoint = 0.5
+ self.getExternalBindTime = None
+ self.bindTime = 1.0
+ self.getExternalnmDist = None
+ self.nmDist = 600.0
+ self.getExternalnmStep = None
+ self.nmStep = 0.5
+ self.getExternalppStep = None
+ self.ppStep = 10
+ self.getExternalnmPsRate = None
+ self.nmPsRate = 1000.0
+ self.getExternalDataDirectory = None
+ self.dataDirectory = LOG_DIR
+ self.plotData = None
+ self.i=0
+ self.showUnfoldingIndex = None
+ def run(self) :
+ print "running"
+ if self.bgRun != None :
+ raise Exception, "Can't run two backgrounds simultaneously"
+ self.runUnfolds = True
+ print "unfolding in the background"
+ self.bgRun = bg_run( self._run)
+ def stepApproach(self) :
+ print "approaching"
+ if self.getExternalDeflectionSetpoint != None :
+ self.deflectionSetpoint = self.getExternalDeflectionSetpoint()
+ self.u.stepApproach(self.deflectionSetpoint)
+ def _run(self) :
+ print "starting unfold loop"
+ while self.runUnfolds == True :
+ print "get unfold parameters"
+ if self.u.T != None and self.getExternalTempSetpoint != None :
+ setpoint = self.getExternalTempSetpoint()
+ if setpoint != self.tempSetpoint :
+ self.u.T.setTemp(setpoint)
+ self.tempSetpoint = setpoint
+ if self.getExternalDeflectionSetpoint != None :
+ self.deflectionSetpoint = self.getExternalDeflectionSetpoint()
+ if self.getExternalBindTime != None :
+ self.bindTime = self.getExternalBindTime()
+ if self.getExternalnmDist != None :
+ self.nmDist = self.getExternalnmDist()
+ if self.getExternalnmStep != None :
+ self.nmStep = self.getExternalnmStep()
+ if self.getExternalppStep != None :
+ self.ppStep = self.getExternalppStep()
+ if self.getExternalnmPsRate != None :
+ self.nmPsRate = self.getExternalnmPsRate()
+ if self.getExternalDataDirectory != None :
+ self.dataDirectory = self.getExternalDataDirectory()
+ print "run unfold"
+ #print "setpoint ", self.deflectionSetpoint
+ #print "bind time ", self.bindTime
+ #print "nmDist ", self.nmDist
+ #print "nmStep ", self.nmStep
+ #print "ppStep ", self.ppStep
+ #print "nmPsRate ", self.nmPsRate
+ #print "fileID ", self.i
+ #print "dataDir ", self.dataDirectory
+ data = self.u.unfold(setpoint=self.deflectionSetpoint,
+ sBindTime=self.bindTime,
+ nmDist=self.nmDist,
+ nmStep=self.nmStep,
+ ppStep=self.ppStep,
+ nmPsRate=self.nmPsRate,
+ fileID=str(self.i),
+ dataDirectory=self.dataDirectory)
+ print "plot data"
+ if self.plotData != None :
+ self.plotData(data["data"]["Z piezo output"],
+ data["data"]["Deflection input"])
+ if self.showUnfoldingIndex != None :
+ self.showUnfoldingIndex(self.i)
+ self.i += 1
+ return False
+ def stop(self) :
+ if self.bgRun != None :
+ self.runUnfolds = False
+ self.bgRun.wait()
+ self.bgRun = None
+
+def _test_unfold_expt(controlTemp=True) :
+ print "Test unfold_expt"
+ u_expt = unfold.unfold_expt(controlTemp=controlTemp)
+ u_expt.run()
+ time.sleep(100)
+ u_expt.stop()
+ print "unfold_expt working\n"
+
+class bg_run :
+ def __init__(self, function, args=None, finishCallback=None, interruptCallback=None) :
+ print "init bg"
+ def tempFn() :
+ print "temp fn started"
+ complete = False
+ if args == None :
+ complete = function()
+ else :
+ complete = function(args)
+ print "temp Fn finished"
+ if finishCallback != None and complete == True :
+ finishCallback()
+ elif interruptCallback != None and complete == False :
+ interruptCallback()
+ print "tempFn defined"
+ self.thd = threading.Thread(target=tempFn, name="Background Fn")
+ print "starting thread"
+ self.thd.start()
+ print "thread started"
+ def wait(self) :
+ print "joining thread"
+ self.thd.join()
+ print "thread joined"
+
+def _test_bg_run() :
+ print "Test bg_run"
+ print "test without args or callbacks"
+ def say_hello() :
+ for i in range(10) :
+ print "Hello"
+ time.sleep(0.1)
+ return True
+ bg = unfold.bg_run(say_hello)
+ time.sleep(.2)
+ print "The main thread's still going"
+ bg.wait()
+ print "The background process is done"
+
+ print "test without args, but with callbacks"
+ def inter() :
+ print "I was interrupted"
+ def fin() :
+ print "I'm finished"
+ bg = unfold.bg_run(say_hello, finishCallback=fin, interruptCallback=inter)
+ time.sleep(.2)
+ print "The main thread's still going"
+ bg.wait()
+ print "The background process is done"
+
+ print "test with args and callbacks"
+ del say_hello
+ def say_hello(stop=[False]) :
+ for i in range(10) :
+ if stop[0] == True : return False
+ print "Hello"
+ time.sleep(0.1)
+ return True
+ stop = [False]
+ bg = unfold.bg_run(say_hello, args=stop, finishCallback=fin, interruptCallback=inter)
+ time.sleep(.2)
+ print "The main thread's still going, stop it"
+ stop[0] = True
+ bg.wait()
+ del bg
+ del stop
+ del say_hello
+ del inter
+ del fin
+ print "The background process is done"
+
+ print "bg_run working\n"
+
+def loop_rates(u, rates, num_loops=10, die_file=None, song=None, **kwargs):
+ """
+ loop_rates(u, rates, num_loops=10, die_file=None, song=None, **kwargs)
+
+ Constant speed unfolding using the unfold instance u for num_loops
+ through the series of rates listed in rates. You may set die_file
+ to a path, loop_rates() will check that location before each
+ unfolding attempt, and cleanly stop unfolding if the file exists.
+ **kwargs are passed on to u.unfold(), so a full call might look like
+
+ loop_rates(u, rates=[20,200,2e3], num_loops=5, die_file='~/die', song='~wking/Music/system/towerclo.wav', rel_setpoint=1, nmDist=800, sBindTime=2)
+ """
+ if die_file != None:
+ die_file = os.path.expanduser(die_file)
+ if song != None:
+ song = os.path.expanduser(song)
+ for i in range(num_loops):
+ for nmPsRate in rates:
+ if die_file != None and os.path.exists(die_file):
+ return None
+ u.unfold(nmPsRate=nmPsRate, **kwargs)
+ u.xpWander()
+ if song != None:
+ os.system("aplay '%s'" % song)
+
+def test() :
+ _test_unfold(controlTemp=False)
+ _test_bg_run()
+ _test_unfold_expt(controlTemp=False)
+
+if __name__ == "__main__" :
+ test()