Began versioning.
authorW. Trevor King <wking@drexel.edu>
Wed, 3 Jun 2009 19:19:40 +0000 (15:19 -0400)
committerW. Trevor King <wking@drexel.edu>
Wed, 3 Jun 2009 19:19:40 +0000 (15:19 -0400)
unfold.py [new file with mode: 0644]

diff --git a/unfold.py b/unfold.py
new file mode 100644 (file)
index 0000000..fddcbde
--- /dev/null
+++ b/unfold.py
@@ -0,0 +1,513 @@
+TEXT_VERBOSE = False
+PYLAB_INTERACTIVE_VERBOSE = True
+BASE_FIG_NUM = 40
+LOG_DATA = True
+LOG_DIR = '$DEFAULT$/unfold'
+
+import stepper
+import z_piezo
+import z_piezo_utils
+import x_piezo
+import temperature
+import threading
+import time
+import os, os.path
+from numpy import arange, sin, pi, isnan
+import data_logger
+
+if PYLAB_INTERACTIVE_VERBOSE :
+    from pylab import figure, plot, title, legend, hold, subplot, draw
+    import time # for timestamping lines on plots
+
+EFILE = file(os.path.expanduser('~/rsrch/debug.unfold'), 'a')
+
+class ExceptionTooClose (Exception) :
+    """
+    The piezo is too close to the surface,
+    an unfolding would extend past pzMin
+    """
+class ExceptionTooFar (Exception) :
+    """
+    The piezo has reached pzMax without
+    reaching the desired deflection setpoint.
+    """
+
+def plot_dict(d, label) :
+    try :
+        plot(d["Z piezo output"], d["Deflection input"], '.',label=label)
+    except KeyError, string:
+        print d.keys()
+        raise KeyError, string
+
+class unfold_data_log (data_logger.data_log) :
+    def save(self, unfold, timestamp, CTemp, VSetpoint, nmBindPos,
+             sBindTime,
+             nmDist, nmStep, ppStep, nmPsRate, freq,
+             approach_data, unfold_data) :
+        filepath,timestamp = self.get_filename(timestamp)
+        # save the unfolding data
+        dataFile = open(filepath, "w")
+        for i in range(0, len(unfold_data["Z piezo output"])) :
+                dataFile.write("%d\t%d\t%d\n" % (unfold_data["Z piezo output"][i],
+                                                 unfold_data["Deflection input"][i],
+                                                 unfold_data["Z piezo input"][i]))
+        dataFile.close()
+        dataFile = open(filepath+"_approach", "w")
+        for i in range(0, len(approach_data["Z piezo output"])) :
+                dataFile.write("%d\t%d\n" % (approach_data["Z piezo output"][i],
+                                                 approach_data["Deflection input"][i]))
+        dataFile.close()
+
+        # save parameters
+        paramFile = open(filepath+"_param", "w")
+        paramFile.write("Environment\n")
+        paramFile.write("Time:\t"+timestamp+"\n")
+        if CTemp != None :
+            paramFile.write("Temperature (C):\t"+str(CTemp)+"\n")
+
+        paramFile.write("\nPiezo parameters\n")
+        paramFile.write("Z piezo sensitivity (nm/Vp):\t"+str(unfold.zp.sensitivity)+"\n")
+        paramFile.write("Z piezo gain (Vp/Vo):\t"+str(unfold.zp.gain)+"\n")
+        paramFile.write("X piezo sensitivity (nm/Vp):\t"+str(unfold.xp.xpSensitivity)+"\n")
+        paramFile.write("X piezo gain (Vp/Vo):\t"+str(unfold.xp.gain)+"\n")
+        paramFile.write("X piezo position (nm):\t"+str(unfold.xp.out2nm(unfold.xp.curPos()))+"\n")
+
+        paramFile.write("\nApproach parameters\n")
+        paramFile.write("Setpoint (V):\t"+str(VSetpoint)+"\n")
+        paramFile.write("Bind pos (nm):\t"+str(nmBindPos)+"\n")
+        
+        paramFile.write("\nBinding parameters\n")
+        paramFile.write("Bind time (s):\t"+str(sBindTime)+"\n")
+
+        paramFile.write("\nUnfolding parameters\n")
+        paramFile.write("Distance (nm):\t"+str(nmDist)+"\n")
+        paramFile.write("Step size (nm):\t"+str(nmStep)+"\n")
+        paramFile.write("Points per step:\t"+str(ppStep)+"\n")
+        paramFile.write("Unfold rate (nm/s):\t"+str(nmPsRate)+"\n")
+        paramFile.write("Sample rate (Hz):\t"+str(freq)+"\n")
+
+        paramFile.write("\nData fields:\tZ_piezo_out Deflection_in Z_piezo_in\n")
+        paramFile.close()
+
+        return timestamp
+
+class unfold :
+    def __init__(self, controlTemp=True, dataDirectory=LOG_DIR) :
+        self.step = stepper.stepper_obj()
+        self.zp = z_piezo.z_piezo()
+        self.xp = x_piezo.x_piezo()
+        #self.zp = z_piezo.z_piezo(zp_chan = '/Dev1/ao0',
+        #                          zp_mon_chan = '/Dev1/ai1',
+        #                          def_chan = '/Dev1/ai0')
+        #self.xp = x_piezo.x_piezo(xp_chan = '/Dev1/ao1')
+        self.zp.jumpToPos(self.zp.pos_nm2out(0))
+        self.xp.jumpToPos(self.xp.nm2out(0))
+        if controlTemp == True :
+            self.T = temperature.tempController(maxCurrent=1.0)
+        else: self.T = None
+        self.log = unfold_data_log(dataDirectory, log_name="unfold")
+    def unfold(self, setpoint=None, rel_setpoint=1.0, sBindTime = 10.0,
+               nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000,
+               dataDirectory=LOG_DIR, fileID=None) :
+        while True :
+            try :
+                data = self.unfold_cycle(setpoint, rel_setpoint, sBindTime,
+                                         nmDist, nmStep, ppStep, nmPsRate,
+                                         dataDirectory, fileID)
+                break
+            except ExceptionTooFar :
+                EFILE.write('caught ExceptionTooFar\n'); EFILE.flush()
+                try : # try for a useful surface distance...
+                    if setpoint == None : # HACK! redundant!
+                        assert rel_setpoint != None, "must have some sort of setpoint"
+                        setpoint = self.curDef() + rel_setpoint
+                        print "setpoint = %g" % setpoint
+                    EFILE.write('attempt getSurfPos\n'); EFILE.flush()
+                    surfPos = self.getSurfPos(setpoint)
+                    EFILE.write('getSurfPos succeeded\n'); EFILE.flush()
+                    print "Too far (surface at %g nm), stepping closer" % surfPos
+                except z_piezo_utils.poorFit, string : # ... oh well, print what we know
+                    EFILE.write('getSurfPos failed\n'); EFILE.flush()
+                    print "Too far, stepping closer"
+                    print "(Fit failed with: %s)" % string
+                EFILE.write('zero Z piezo\n'); EFILE.flush()
+                self.zp.jumpToPos(self.zp.pos_nm2out(0))
+                EFILE.write('step closer\n'); EFILE.flush()
+                self.stepCloser()
+                EFILE.write('step closer successful\n'); EFILE.flush()
+            except ExceptionTooClose :
+                try : # try for a useful surface distance...
+                    if setpoint == None : # !HACK !redundant!
+                        assert rel_setpoint != None, "must have some sort of setpoint"
+                        setpoint = self.curDef() + rel_setpoint
+                        print "setpoint = %g" % setpoint
+                    surfPos = self.getSurfPos(setpoint)
+                    print "Too close (surface at %g nm), stepping away" % surfPos
+                except z_piezo_utils.poorFit, string : # ... oh well, print what we know
+                    print "Too close, stepping away"
+                    print "(Fit failed with: %s)" % string
+                self.zp.jumpToPos(self.zp.pos_nm2out(0))
+                self.stepAway()
+            print "Too close, stepping away"
+        return data
+    def stepApproach(self, setpoint) :
+        cd = self.curDef()
+        while cd < setpoint or isnan(cd) : # sometimes during approach, we get negative nan values
+            print "deflection %g < setpoint %g.  step closer" % (cd, setpoint)
+            self.stepCloser()
+            cd = self.curDef()
+    def stepApproachRetractCurve(self, setpoint) :
+        def_array = []
+        step_array = []
+        orig_step = self.step.get_cur_pos()
+        def store_point() :
+            cd = self.curDef()
+            def_array.append(cd)
+            step_array.append(self.step.get_cur_pos())
+            return cd        
+        cd = store_point()
+        while cd < setpoint or isnan(cd) : # sometimes during approach, we get negative nan values
+            print "deflection %g < setpoint %g.  step closer" % (cd, setpoint)
+            self.step.step_rel(2)
+            cd = store_point()
+        while self.step.get_cur_pos() > orig_step :
+            self.step.step_rel(-2)
+            store_point()
+        return {"Deflection":def_array, "Stepper position":step_array}
+    def piezoApproach(self, setpoint, return_data=False) :
+        startPos = self.zp.curPos()
+        curVals,data = z_piezo_utils.moveToPosOrDef(self.zp,
+                           self.zp.zpMax,
+                           self.zp.def_V2in(setpoint),
+                           step=10,
+                           return_data=return_data)
+        if curVals["Deflection input"] < self.zp.def_V2in(setpoint) :
+            EFILE.write('Unfolding too far\n'); EFILE.flush()
+            self.zp.jumpToPos(startPos)
+            self.zp.updateInputs()
+            if PYLAB_INTERACTIVE_VERBOSE == True :
+                figure(BASE_FIG_NUM+1)
+                hold(False)
+                plot_dict(data, 'Approach')
+                hold(True)
+                title('Unfolding too far')
+            EFILE.write('Raising ExceptionTooFar\n'); EFILE.flush()
+            raise ExceptionTooFar
+        if return_data == True :
+            return (curVals, data)
+        else :
+            return curVals
+    def bind(self, sTime=10.0) :
+        time.sleep(sTime)
+    def unfold_pull(self, nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000) :
+        if nmStep < 0 :
+                nmStep = -nmStep
+        numSteps = int(nmDist/nmStep+1)
+        out = [0.0]*numSteps*ppStep
+        pos = self.zp.curPos()
+        startPos = pos
+        step = int(self.zp.pos_nm2out(nmStep)-self.zp.pos_nm2out(0))
+        for i in range(0, numSteps) :
+                for j in range(0, ppStep) :
+                        out[i*ppStep + j] = pos
+                pos -= step
+        # points/step * step/nm * nm/s  =  points/s
+        freq = ppStep / nmStep * nmPsRate
+        print "unfolding %d points per %d steps from %d to %d at freq of %g" % (ppStep, numSteps, startPos, pos, freq) 
+        return {'freq':freq, 'data':self.zp.ramp(out, freq)}
+    def generate_refold_bind_pull_output(self, nmSurfPos=0,
+            sBindTime=2, nmUnfoldDist=160, nmRefoldDist=145, sPauseTime=2,
+            cycles=50, nmStep=0.5, ppStepUnfold=10, ppStepRefold=1,
+            nmPsRateUnfold=1000) :
+        
+        if nmStep < 0 :##
+            nmStep = -nmStep
+        numSteps = int(nmDist/nmStep+1)
+        out = [0.0]*numSteps*ppStep
+        pos = self.zp.curPos()
+        startPos = pos
+        step = int(self.zp.pos_nm2out(nmStep)-self.zp.pos_nm2out(0))
+        for i in range(0, numSteps) :
+                for j in range(0, ppStep) :
+                        out[i*ppStep + j] = pos
+                pos -= step
+        # points/step * step/nm * nm/s  =  points/s
+        freq = ppStep / nmStep * nmPsRate
+        print "unfolding %d points per %d steps from %d to %d at freq of %g" % (ppStep, numSteps, startPos, pos, freq) 
+        return {'freq':freq, 'data':self.zp.ramp(out, freq)}
+    def unfold_cycle(self, setpoint=None, rel_setpoint=1.0, sBindTime = 10.0,
+                     nmDist=600, nmStep=0.5, ppStep=10, nmPsRate=1000,
+                     dataDirectory=LOG_DIR, fileID=None) :
+        if setpoint == None :
+            assert rel_setpoint != None, "must have some sort of setpoint"
+            setpoint = self.curDef() + rel_setpoint
+            print "setpoint = %g" % setpoint
+        timestamp = time.strftime("%Y%m%d%H%M%S")
+       temp = None
+        if self.T != None : temp = self.T.getTemp()
+        print "approaching"
+        startPos = self.zp.curPos()
+        curVals,approach_data = self.piezoApproach(setpoint, return_data=True)
+        bindPos = curVals['Z piezo output'] # in output units
+        finalPos = bindPos - (self.zp.pos_nm2out(nmDist)-self.zp.pos_nm2out(0))
+        try :
+            self.zp._check_range(finalPos)
+        except z_piezo.outOfRange :
+            self.zp.jumpToPos(startPos)
+            self.zp.updateInputs()
+            if PYLAB_INTERACTIVE_VERBOSE == True :
+                figure(BASE_FIG_NUM+1)
+                hold(False)
+                plot_dict(approach_data, 'Approach')
+                hold(True)
+                title('Unfolding too close')
+            raise ExceptionTooClose
+        print "binding for %.3f seconds" % sBindTime
+        self.bind(sBindTime)
+        out = self.unfold_pull(nmDist, nmStep, ppStep, nmPsRate)
+
+        if PYLAB_INTERACTIVE_VERBOSE == True :
+            figure(BASE_FIG_NUM)
+            hold(False)
+            plot_dict(approach_data, 'Approach')
+            hold(True)
+            plot_dict(out['data'], 'Unfold')
+            legend(loc='best')
+            title('Unfolding')
+            draw()
+
+        if LOG_DATA == True:
+            print "saving"
+            timestamp = self.log.save(self, timestamp, temp, setpoint,
+                                      int(self.zp.pos_out2nm(bindPos)), # don't need lots of precision...
+                                      sBindTime,
+                                      nmDist, nmStep, ppStep, nmPsRate, out['freq'],
+                                      approach_data, out['data'])
+        return out
+    def getSurfPos(self, setpoint=2, textVerbose=False, plotVerbose=False) :
+        return self.zp.pos_out2nm(z_piezo_utils.getSurfPos(self.zp, self.zp.def_V2in(setpoint), textVerbose, plotVerbose))
+    def curDef(self) :
+        self.zp.updateInputs()
+        return self.zp.def_in2V(self.zp.curDef())
+    def stepCloser(self) :
+        "Backlash-robust stepping"
+        self.step.step_rel(2) # two half-steps in full step mode
+    def stepAway(self) :
+        "Backlash-robust stepping"
+        self.step.step_rel(-120)
+        self.step.step_rel(110) # HACK, should come back 118
+    def xpWander(self) :
+        self.xp.wander()
+
+def _test_unfold(controlTemp=True) :
+    print "Test unfold"
+    u = unfold(controlTemp=controlTemp)
+    u.unfold(setpoint=0.5, sBindTime=1.0)
+    del u
+    print "unfold successful\n"
+
+
+class unfold_expt :
+    def __init__(self, controlTemp=True) :
+        self.u = unfold(controlTemp=controlTemp)
+        self.runUnfolds = False
+        self.bgRun = None
+        self.getExternalTempSetpoint = None
+        if self.u.T != None :
+            self.tempSetpoint = self.u.T.getTemp()
+        self.getExternalDeflectionSetpoint = None
+        self.deflectionSetpoint = 0.5
+        self.getExternalBindTime = None
+        self.bindTime = 1.0
+        self.getExternalnmDist = None
+        self.nmDist = 600.0
+        self.getExternalnmStep = None
+        self.nmStep = 0.5
+        self.getExternalppStep = None
+        self.ppStep = 10
+        self.getExternalnmPsRate = None
+        self.nmPsRate = 1000.0
+        self.getExternalDataDirectory = None
+        self.dataDirectory = LOG_DIR
+        self.plotData = None
+        self.i=0
+        self.showUnfoldingIndex = None
+    def run(self) :
+        print "running"
+        if self.bgRun != None :
+            raise Exception, "Can't run two backgrounds simultaneously"
+        self.runUnfolds = True
+        print "unfolding in the background"
+        self.bgRun = bg_run( self._run)
+    def stepApproach(self) :
+        print "approaching"
+        if self.getExternalDeflectionSetpoint != None :
+            self.deflectionSetpoint = self.getExternalDeflectionSetpoint()
+        self.u.stepApproach(self.deflectionSetpoint)
+    def _run(self) :
+        print "starting unfold loop"
+        while self.runUnfolds == True :
+            print "get unfold parameters"
+            if self.u.T != None and self.getExternalTempSetpoint != None :
+                setpoint = self.getExternalTempSetpoint()
+                if setpoint != self.tempSetpoint :
+                    self.u.T.setTemp(setpoint)
+                    self.tempSetpoint = setpoint
+            if self.getExternalDeflectionSetpoint != None :
+                self.deflectionSetpoint = self.getExternalDeflectionSetpoint()
+            if self.getExternalBindTime != None :
+                self.bindTime = self.getExternalBindTime()
+            if self.getExternalnmDist != None :
+                self.nmDist = self.getExternalnmDist()
+            if self.getExternalnmStep != None :
+                self.nmStep = self.getExternalnmStep()
+            if self.getExternalppStep != None :
+                self.ppStep = self.getExternalppStep()
+            if self.getExternalnmPsRate != None :
+                self.nmPsRate = self.getExternalnmPsRate()
+            if self.getExternalDataDirectory != None :
+                self.dataDirectory = self.getExternalDataDirectory()
+            print "run unfold"
+            #print "setpoint  ", self.deflectionSetpoint
+            #print "bind time ", self.bindTime
+            #print "nmDist    ", self.nmDist
+            #print "nmStep    ", self.nmStep
+            #print "ppStep    ", self.ppStep
+            #print "nmPsRate  ", self.nmPsRate
+            #print "fileID    ", self.i
+            #print "dataDir   ", self.dataDirectory
+            data = self.u.unfold(setpoint=self.deflectionSetpoint,
+                                 sBindTime=self.bindTime,
+                                 nmDist=self.nmDist,
+                                 nmStep=self.nmStep,
+                                 ppStep=self.ppStep,
+                                 nmPsRate=self.nmPsRate,
+                                 fileID=str(self.i),
+                                 dataDirectory=self.dataDirectory)
+            print "plot data"
+            if self.plotData != None :
+                self.plotData(data["data"]["Z piezo output"],
+                              data["data"]["Deflection input"])
+            if self.showUnfoldingIndex != None :
+                self.showUnfoldingIndex(self.i)
+            self.i += 1
+        return False
+    def stop(self) :
+        if self.bgRun != None :
+            self.runUnfolds = False
+            self.bgRun.wait()
+            self.bgRun = None
+
+def _test_unfold_expt(controlTemp=True) :
+    print "Test unfold_expt"
+    u_expt = unfold.unfold_expt(controlTemp=controlTemp)
+    u_expt.run()
+    time.sleep(100)
+    u_expt.stop()
+    print "unfold_expt working\n"
+
+class bg_run :
+    def __init__(self, function, args=None, finishCallback=None, interruptCallback=None) :
+        print "init bg"
+        def tempFn() :
+            print "temp fn started"
+            complete = False
+            if args == None :
+                complete = function()
+            else :
+                complete = function(args)
+            print "temp Fn finished"
+            if finishCallback != None and complete == True :
+                finishCallback()
+            elif interruptCallback != None and complete == False :
+                interruptCallback()
+        print "tempFn defined"
+        self.thd = threading.Thread(target=tempFn, name="Background Fn")
+        print "starting thread"
+        self.thd.start()
+        print "thread started"
+    def wait(self) :
+        print "joining thread"
+        self.thd.join()
+        print "thread joined"
+
+def _test_bg_run() :
+    print "Test bg_run"
+    print "test without args or callbacks"
+    def say_hello() :
+        for i in range(10) :
+            print "Hello"
+            time.sleep(0.1)
+        return True
+    bg = unfold.bg_run(say_hello)
+    time.sleep(.2)
+    print "The main thread's still going"
+    bg.wait()
+    print "The background process is done"
+    
+    print "test without args, but with callbacks"
+    def inter() :
+        print "I was interrupted"
+    def fin() :
+        print "I'm finished"
+    bg = unfold.bg_run(say_hello, finishCallback=fin, interruptCallback=inter)
+    time.sleep(.2)
+    print "The main thread's still going"
+    bg.wait()
+    print "The background process is done"
+
+    print "test with args and callbacks"
+    del say_hello
+    def say_hello(stop=[False]) :
+        for i in range(10) :
+            if stop[0] == True : return False
+            print "Hello"
+            time.sleep(0.1)
+        return True
+    stop = [False]
+    bg = unfold.bg_run(say_hello, args=stop, finishCallback=fin, interruptCallback=inter)
+    time.sleep(.2)
+    print "The main thread's still going, stop it"
+    stop[0] = True
+    bg.wait()
+    del bg
+    del stop
+    del say_hello
+    del inter
+    del fin
+    print "The background process is done"
+    
+    print "bg_run working\n"
+
+def loop_rates(u, rates, num_loops=10, die_file=None, song=None, **kwargs):
+    """
+    loop_rates(u, rates, num_loops=10, die_file=None, song=None, **kwargs)
+    
+    Constant speed unfolding using the unfold instance u for num_loops
+    through the series of rates listed in rates.  You may set die_file
+    to a path, loop_rates() will check that location before each
+    unfolding attempt, and cleanly stop unfolding if the file exists.
+    **kwargs are passed on to u.unfold(), so a full call might look like
+    
+    loop_rates(u, rates=[20,200,2e3], num_loops=5, die_file='~/die', song='~wking/Music/system/towerclo.wav', rel_setpoint=1, nmDist=800, sBindTime=2)
+    """
+    if die_file != None:
+        die_file = os.path.expanduser(die_file)
+    if song != None:
+        song = os.path.expanduser(song)
+    for i in range(num_loops):
+        for nmPsRate in rates:
+            if die_file != None and os.path.exists(die_file):
+                return None
+            u.unfold(nmPsRate=nmPsRate, **kwargs)
+            u.xpWander()
+    if song != None:
+        os.system("aplay '%s'" % song)
+
+def test() :
+    _test_unfold(controlTemp=False)
+    _test_bg_run()
+    _test_unfold_expt(controlTemp=False)
+
+if __name__ == "__main__" :
+    test()