"""
import copy
+import logging
import numpy
import scipy
cant_adjust.run(hooke, inqueue, outqueue, **params)
return curve
+
class SurfacePositionModel (ModelFitter):
"""Bilinear surface position model.
In order for this model to produce a satisfactory fit, there
should be enough data in the off-surface region that interactions
due to proteins, etc. will not seriously skew the fit in the
- off-surface region.
+ off-surface region. If you don't have much of a tail, you can set
+ the `info` dict's `ignore non-contact before index` parameter to
+ the index of the last surface- or protein-related feature.
"""
def model(self, params):
"""A continuous, bilinear model.
:math:`p_3` is the slope of the second region.
"""
p = params # convenient alias
- if self.info.get('force zero non-contact slope', None) == True:
+ rNC_ignore = self.info['ignore non-contact before index']
+ if self.info['force zero non-contact slope'] == True:
p = list(p)
p.append(0.) # restore the non-contact slope parameter
r2 = numpy.round(abs(p[2]))
if r2 < len(self._data)-1:
self._model_data[r2:] = \
p[0] + p[1]*p[2] + p[3] * numpy.arange(len(self._data)-r2)
+ if r2 < rNC_ignore:
+ self._model_data[r2:rNC_ignore] = self._data[r2:rNC_ignore]
return self._model_data
- def set_data(self, data, info=None):
- super(SurfacePositionModel, self).set_data(data, info)
- if info == None:
- info = {}
- self.info = info
- self.info['min position'] = 0
- self.info['max position'] = len(data)
- self.info['max deflection'] = data.max()
- self.info['min deflection'] = data.min()
- self.info['position range'] = self.info['max position'] - self.info['min position']
- self.info['deflection range'] = self.info['max deflection'] - self.info['min deflection']
+ def set_data(self, data, info=None, *args, **kwargs):
+ super(SurfacePositionModel, self).set_data(data, info, *args, **kwargs)
+ if self.info == None:
+ self.info = {}
+ for key,value in [
+ ('force zero non-contact slope', False),
+ ('ignore non-contact before index', 6158),
+ ('min position', 0), # Store postions etc. to avoid recalculating.
+ ('max position', len(data)),
+ ('max deflection', data.max()),
+ ('min deflection', data.min()),
+ ]:
+ if key not in self.info:
+ self.info[key] = value
+ for key,value in [
+ ('position range',
+ self.info['max position'] - self.info['min position']),
+ ('deflection range',
+ self.info['max deflection'] - self.info['min deflection']),
+ ]:
+ if key not in self.info:
+ self.info[key] = value
def guess_initial_params(self, outqueue=None):
"""Guess the initial parameters.
right_slope = 0
self.info['guessed contact slope'] = left_slope
params = [left_offset, left_slope, kink_position, right_slope]
- if self.info.get('force zero non-contact slope', None) == True:
+ if self.info['force zero non-contact slope'] == True:
params = params[:-1]
return params
Notes
-----
- We guess offset scale (:math:`p_0`) as one tenth of the total
- deflection range, the kink scale (:math:`p_2`) as one tenth of
- the total index range, the initial (contact) slope scale
- (:math:`p_1`) as one tenth of the contact slope estimation,
- and the final (non-contact) slope scale (:math:`p_3`) is as
- one tenth of the initial slope scale.
+ We the scale as one tenth for each parameter.
"""
- offset_scale = self.info['deflection range']/10.
- left_slope_scale = abs(params[1])/10.
- kink_scale = self.info['position range']/10.
- right_slope_scale = left_slope_scale/10.
+ offset_scale = 0.1
+ left_slope_scale = 0.1
+ kink_scale = 0.1
+ right_slope_scale = 0.1
scale = [offset_scale, left_slope_scale, kink_scale, right_slope_scale]
- if self.info.get('force zero non-contact slope', None) == True:
+ if self.info['force zero non-contact slope'] == True:
scale = scale[:-1]
return scale
def fit(self, *args, **kwargs):
+ """Fit the model to the data.
+
+ Notes
+ -----
+ We change the `epsfcn` default from :func:`scipy.optimize.leastsq`'s
+ `0` to `1e-3`, so the initial Jacobian estimate takes larger steps,
+ which helps avoid being trapped in noise-generated local minima.
+ """
self.info['guessed contact slope'] = None
+ if 'epsfcn' not in kwargs:
+ kwargs['epsfcn'] = 1e-3 # take big steps to estimate the Jacobian
params = super(SurfacePositionModel, self).fit(*args, **kwargs)
params[2] = abs(params[2])
- if self.info.get('force zero non-contact slope', None) == True:
+ if self.info['force zero non-contact slope'] == True:
params = list(params)
params.append(0.) # restore the non-contact slope parameter
% (params[3], self.info['guessed contact slope']))
return params
+
class VelocityClampPlugin (Plugin):
def __init__(self):
super(VelocityClampPlugin, self).__init__(name='vclamp')
self._commands = [
SurfaceContactCommand(self), ForceCommand(self),
- CantileverAdjustedExtensionCommand(self),
+ CantileverAdjustedExtensionCommand(self), FlattenCommand(self),
]
def default_settings(self):
Argument(name='input distance column', type='string',
default='z piezo (m)',
help="""
-Name of the column to use as the surface positioning input.
+Name of the column to use as the surface position input.
""".strip()),
Argument(name='input deflection column', type='string',
default='deflection (m)',
Argument(name='output distance column', type='string',
default='surface distance',
help="""
-Name of the column (without units) to use as the surface positioning output.
+Name of the column (without units) to use as the surface position output.
""".strip()),
Argument(name='output deflection column', type='string',
default='surface deflection',
Argument(name='fit parameters info name', type='string',
default='surface deflection offset',
help="""
-Name (without units) for storing the deflection offset in the `.info` dictionary.
+Name (without units) for storing fit parameters in the `.info` dictionary.
""".strip()),
],
help=self.__doc__, plugin=plugin)
Notes
-----
- Uses :func:`analyze_surf_pos_data_wtk` internally.
+ Uses :class:`SurfacePositionModel` internally.
"""
reverse = z_data[0] > z_data[-1]
if reverse == True: # approaching, contact region on the right
d_data = d_data[::-1]
- s = SurfacePositionModel(d_data)
- s.info['force zero non-contact slope'] = True
+ s = SurfacePositionModel(d_data, info={
+ 'force zero non-contact slope':True},
+ rescale=True)
offset,contact_slope,surface_index,non_contact_slope = s.fit(
outqueue=outqueue)
info = {
params['curve'].data[params['block']] = new
-class generalvclampCommands(object):
-
- def _plug_init(self):
- self.basecurrent=None
- self.basepoints=None
- self.autofile=''
-
- def do_distance(self,args):
- '''
- DISTANCE
- (generalvclamp.py)
- Measure the distance (in nm) between two points.
- For a standard experiment this is the delta X distance.
- For a force clamp experiment this is the delta Y distance (actually becomes
- an alias of zpiezo)
- -----------------
- Syntax: distance
- '''
- if self.current.curve.experiment == 'clamp':
- print 'You wanted to use zpiezo perhaps?'
- return
- else:
- dx,unitx,dy,unity=self._delta(set=1)
- print str(dx*(10**9))+' nm'
- to_dump='distance '+self.current.path+' '+str(dx*(10**9))+' nm'
- self.outlet.push(to_dump)
-
-
- def do_force(self,args):
- '''
- FORCE
- (generalvclamp.py)
- Measure the force difference (in pN) between two points
- ---------------
- Syntax: force
- '''
- if self.current.curve.experiment == 'clamp':
- print 'This command makes no sense for a force clamp experiment.'
- return
- dx,unitx,dy,unity=self._delta(set=1)
- print str(dy*(10**12))+' pN'
- to_dump='force '+self.current.path+' '+str(dy*(10**12))+' pN'
- self.outlet.push(to_dump)
-
-
- def do_forcebase(self,args):
- '''
- FORCEBASE
- (generalvclamp.py)
- Measures the difference in force (in pN) between a point and a baseline
- took as the average between two points.
-
- The baseline is fixed once for a given curve and different force measurements,
- unless the user wants it to be recalculated
- ------------
- Syntax: forcebase [rebase]
- rebase: Forces forcebase to ask again the baseline
- max: Instead of asking for a point to measure, asks for two points and use
- the maximum peak in between
- '''
- rebase=False #if true=we select rebase
- maxpoint=False #if true=we measure the maximum peak
-
- plot=self._get_displayed_plot()
- whatset=1 #fixme: for all sets
- if 'rebase' in args or (self.basecurrent != self.current.path):
- rebase=True
- if 'max' in args:
- maxpoint=True
-
- if rebase:
- print 'Select baseline'
- self.basepoints=self._measure_N_points(N=2, whatset=whatset)
- self.basecurrent=self.current.path
-
- if maxpoint:
- print 'Select two points'
- points=self._measure_N_points(N=2, whatset=whatset)
- boundpoints=[points[0].index, points[1].index]
- boundpoints.sort()
- try:
- y=min(plot.vectors[whatset][1][boundpoints[0]:boundpoints[1]])
- except ValueError:
- print 'Chosen interval not valid. Try picking it again. Did you pick the same point as begin and end of interval?'
- else:
- print 'Select point to measure'
- points=self._measure_N_points(N=1, whatset=whatset)
- #whatplot=points[0].dest
- y=points[0].graph_coords[1]
-
- #fixme: code duplication
- boundaries=[self.basepoints[0].index, self.basepoints[1].index]
- boundaries.sort()
- to_average=plot.vectors[whatset][1][boundaries[0]:boundaries[1]] #y points to average
-
- avg=np.mean(to_average)
- forcebase=abs(y-avg)
- print str(forcebase*(10**12))+' pN'
- to_dump='forcebase '+self.current.path+' '+str(forcebase*(10**12))+' pN'
- self.outlet.push(to_dump)
-
- def plotmanip_multiplier(self, plot, current):
- '''
- Multiplies all the Y values of an SMFS curve by a value stored in the 'force_multiplier'
- configuration variable. Useful for calibrations and other stuff.
- '''
-
- #not a smfs curve...
- if current.curve.experiment != 'smfs':
- return plot
-
- #only one set is present...
- if len(self.plots[0].vectors) != 2:
- return plot
-
- #multiplier is 1...
- if (self.config['force_multiplier']==1):
- return plot
-
- for i in range(len(plot.vectors[0][1])):
- plot.vectors[0][1][i]=plot.vectors[0][1][i]*self.config['force_multiplier']
-
- for i in range(len(plot.vectors[1][1])):
- plot.vectors[1][1][i]=plot.vectors[1][1][i]*self.config['force_multiplier']
-
- return plot
-
-
- def plotmanip_flatten(self, plot, current, customvalue=False):
- '''
- Subtracts a polynomial fit to the non-contact part of the curve, as to flatten it.
- the best polynomial fit is chosen among polynomials of degree 1 to n, where n is
- given by the configuration file or by the customvalue.
-
- customvalue= int (>0) --> starts the function even if config says no (default=False)
- '''
-
- #not a smfs curve...
- if current.curve.experiment != 'smfs':
- return plot
+class FlattenCommand (Command):
+ """Flatten a deflection column.
- #only one set is present...
- if len(self.plots[0].vectors) != 2:
- return plot
+ Subtracts a polynomial fit from the non-contact part of the curve
+ to flatten it. The best polynomial fit is chosen among
+ polynomials of degree 1 to `max degree`.
- #config is not flatten, and customvalue flag is false too
- if (not self.config['flatten']) and (not customvalue):
- return plot
+ .. todo: Why does flattening use a polynomial fit and not a sinusoid?
+ Isn't most of the oscillation due to laser interference?
+ See Jaschke 1995 ( 10.1063/1.1146018 )
+ and the figure 4 caption of Weisenhorn 1992 ( 10.1103/PhysRevB.45.11226 )
+ """
+ def __init__(self, plugin):
+ super(FlattenCommand, self).__init__(
+ name='add flattened extension array',
+ arguments=[
+ CurveArgument,
+ Argument(name='block', aliases=['set'], type='int', default=0,
+ help="""
+Data block for which the adjusted extension should be calculated. For
+an approach/retract force curve, `0` selects the approaching curve and
+`1` selects the retracting curve.
+""".strip()),
+ Argument(name='max degree', type='int',
+ default=1,
+ help="""
+Highest order polynomial to consider for flattening. Using values
+greater than one usually doesn't help and can give artifacts.
+However, it could be useful too. (TODO: Back this up with some
+theory...)
+""".strip()),
+ Argument(name='input distance column', type='string',
+ default='surface distance (m)',
+ help="""
+Name of the column to use as the distance input.
+""".strip()),
+ Argument(name='input deflection column', type='string',
+ default='deflection (N)',
+ help="""
+Name of the column to use as the deflection input.
+""".strip()),
+ Argument(name='output deflection column', type='string',
+ default='flattened deflection',
+ help="""
+Name of the column (without units) to use as the deflection output.
+""".strip()),
+ Argument(name='fit info name', type='string',
+ default='flatten fit',
+ help="""
+Name of the flattening information in the `.info` dictionary.
+""".strip()),
+ ],
+ help=self.__doc__, plugin=plugin)
- max_exponent=12
- delta_contact=0
+ def _run(self, hooke, inqueue, outqueue, params):
+ data = params['curve'].data[params['block']]
+ # HACK? rely on params['curve'] being bound to the local hooke
+ # playlist (i.e. not a copy, as you would get by passing a
+ # curve through the queue). Ugh. Stupid queues. As an
+ # alternative, we could pass lookup information through the
+ # queue.
+ new = Data((data.shape[0], data.shape[1]+1), dtype=data.dtype)
+ new.info = copy.deepcopy(data.info)
+ new[:,:-1] = data
+ z_data = data[:,data.info['columns'].index(
+ params['input distance column'])]
+ d_data = data[:,data.info['columns'].index(
+ params['input deflection column'])]
- if customvalue:
- max_cycles=customvalue
- else:
- max_cycles=self.config['flatten'] #Using > 1 usually doesn't help and can give artefacts. However, it could be useful too.
-
- contact_index=self.find_contact_point()
-
- valn=[[] for item in range(max_exponent)]
- yrn=[0.0 for item in range(max_exponent)]
- errn=[0.0 for item in range(max_exponent)]
-
- #Check if we have a proper numerical value
- try:
- zzz=int(max_cycles)
- except:
- #Loudly and annoyingly complain if it's not a number, then fallback to zero
- print '''Warning: flatten value is not a number!
- Use "set flatten" or edit hooke.conf to set it properly
- Using zero.'''
- max_cycles=0
-
- for i in range(int(max_cycles)):
-
- x_ext=plot.vectors[0][0][contact_index+delta_contact:]
- y_ext=plot.vectors[0][1][contact_index+delta_contact:]
- x_ret=plot.vectors[1][0][contact_index+delta_contact:]
- y_ret=plot.vectors[1][1][contact_index+delta_contact:]
- for exponent in range(max_exponent):
- try:
- valn[exponent]=sp.polyfit(x_ext,y_ext,exponent)
- yrn[exponent]=sp.polyval(valn[exponent],x_ret)
- errn[exponent]=sp.sqrt(sum((yrn[exponent]-y_ext)**2)/float(len(y_ext)))
- except Exception,e:
- print 'Cannot flatten!'
- print e
- return plot
-
- best_exponent=errn.index(min(errn))
-
- #extension
- ycorr_ext=y_ext-yrn[best_exponent]+y_ext[0] #noncontact part
- yjoin_ext=np.array(plot.vectors[0][1][0:contact_index+delta_contact]) #contact part
- #retraction
- ycorr_ret=y_ret-yrn[best_exponent]+y_ext[0] #noncontact part
- yjoin_ret=np.array(plot.vectors[1][1][0:contact_index+delta_contact]) #contact part
-
- ycorr_ext=np.concatenate((yjoin_ext, ycorr_ext))
- ycorr_ret=np.concatenate((yjoin_ret, ycorr_ret))
-
- plot.vectors[0][1]=list(ycorr_ext)
- plot.vectors[1][1]=list(ycorr_ret)
-
- return plot
-
- #---SLOPE---
- def do_slope(self,args):
- '''
- SLOPE
- (generalvclamp.py)
- Measures the slope of a delimited chunk on the return trace.
- The chunk can be delimited either by two manual clicks, or have
- a fixed width, given as an argument.
- ---------------
- Syntax: slope [width]
- The facultative [width] parameter specifies how many
- points will be considered for the fit. If [width] is
- specified, only one click will be required.
- (c) Marco Brucale, Massimo Sandal 2008
- '''
-
- # Reads the facultative width argument
- try:
- fitspan=int(args)
- except:
- fitspan=0
-
- # Decides between the two forms of user input, as per (args)
- if fitspan == 0:
- # Gets the Xs of two clicked points as indexes on the current curve vector
- print 'Click twice to delimit chunk'
- points=self._measure_N_points(N=2,whatset=1)
- else:
- print 'Click once on the leftmost point of the chunk (i.e.usually the peak)'
- points=self._measure_N_points(N=1,whatset=1)
-
- slope=self._slope(points,fitspan)
-
- # Outputs the relevant slope parameter
- print 'Slope:'
- print str(slope)
- to_dump='slope '+self.current.path+' '+str(slope)
- self.outlet.push(to_dump)
-
- def _slope(self,points,fitspan):
- # Calls the function linefit_between
- parameters=[0,0,[],[]]
- try:
- clickedpoints=[points[0].index,points[1].index]
- clickedpoints.sort()
- except:
- clickedpoints=[points[0].index-fitspan,points[0].index]
-
- try:
- parameters=self.linefit_between(clickedpoints[0],clickedpoints[1])
- except:
- print 'Cannot fit. Did you click twice the same point?'
- return
-
- # Outputs the relevant slope parameter
- print 'Slope:'
- print str(parameters[0])
- to_dump='slope '+self.curve.path+' '+str(parameters[0])
- self.outlet.push(to_dump)
-
- # Makes a vector with the fitted parameters and sends it to the GUI
- xtoplot=parameters[2]
- ytoplot=[]
- x=0
- for x in xtoplot:
- ytoplot.append((x*parameters[0])+parameters[1])
-
- clickvector_x, clickvector_y=[], []
- for item in points:
- clickvector_x.append(item.graph_coords[0])
- clickvector_y.append(item.graph_coords[1])
-
- lineplot=self._get_displayed_plot(0) #get topmost displayed plot
-
- lineplot.add_set(xtoplot,ytoplot)
- lineplot.add_set(clickvector_x, clickvector_y)
-
-
- if lineplot.styles==[]:
- lineplot.styles=[None,None,None,'scatter']
- else:
- lineplot.styles+=[None,'scatter']
- if lineplot.colors==[]:
- lineplot.colors=[None,None,'black',None]
- else:
- lineplot.colors+=['black',None]
-
-
- self._send_plot([lineplot])
-
- return parameters[0]
-
-
- def linefit_between(self,index1,index2,whatset=1):
- '''
- Creates two vectors (xtofit,ytofit) slicing out from the
- current return trace a portion delimited by the two indexes
- given as arguments.
- Then does a least squares linear fit on that slice.
- Finally returns [0]=the slope, [1]=the intercept of the
- fitted 1st grade polynomial, and [2,3]=the actual (x,y) vectors
- used for the fit.
- (c) Marco Brucale, Massimo Sandal 2008
- '''
- # Translates the indexes into two vectors containing the x,y data to fit
- xtofit=self.plots[0].vectors[whatset][0][index1:index2]
- ytofit=self.plots[0].vectors[whatset][1][index1:index2]
-
- # Does the actual linear fitting (simple least squares with numpy.polyfit)
- linefit=[]
- linefit=np.polyfit(xtofit,ytofit,1)
-
- return (linefit[0],linefit[1],xtofit,ytofit)
-
-
- def fit_interval_nm(self,start_index,plot,nm,backwards):
- '''
- Calculates the number of points to fit, given a fit interval in nm
- start_index: index of point
- plot: plot to use
- backwards: if true, finds a point backwards.
- '''
- whatset=1 #FIXME: should be decidable
- x_vect=plot.vectors[1][0]
-
- c=0
- i=start_index
- start=x_vect[start_index]
- maxlen=len(x_vect)
- while abs(x_vect[i]-x_vect[start_index])*(10**9) < nm:
- if i==0 or i==maxlen-1: #we reached boundaries of vector!
- return c
-
- if backwards:
- i-=1
- else:
- i+=1
- c+=1
- return c
-
-
-
- def find_current_peaks(self,noflatten, a=True, maxpeak=True):
- #Find peaks.
- if a==True:
- a=self.convfilt_config['mindeviation']
+ d_name,d_unit = split_data_label(params['input deflection column'])
+ new.info['columns'].append(
+ join_data_label(params['output deflection column'], d_unit))
+
+ contact_index = numpy.absolute(z_data).argmin()
+ mask = z_data > 0
+ indices = numpy.argwhere(mask)
+ z_nc = z_data[indices].flatten()
+ d_nc = d_data[indices].flatten()
+
+ min_err = numpy.inf
+ degree = poly_values = None
+ log = logging.getLogger('hooke')
+ for deg in range(params['max degree']):
try:
- abs_devs=float(a)
- except:
- print "Bad input, using default."
- abs_devs=self.convfilt_config['mindeviation']
-
- defplot=self.current.curve.default_plots()[0]
- if not noflatten:
- flatten=self._find_plotmanip('flatten') #Extract flatten plotmanip
- defplot=flatten(defplot, self.current, customvalue=1) #Flatten curve before feeding it to has_peaks
- pk_location,peak_size=self.has_peaks(defplot, abs_devs, maxpeak)
- return pk_location, peak_size
-
-
- def pickup_contact_point(self,N=1,whatset=1):
- '''macro to pick up the contact point by clicking'''
- contact_point=self._measure_N_points(N=1, whatset=1)[0]
- contact_point_index=contact_point.index
- self.wlccontact_point=contact_point
- self.wlccontact_index=contact_point.index
- self.wlccurrent=self.current.path
- return contact_point, contact_point_index
-
-
- def baseline_points(self,peak_location, displayed_plot):
- clicks=self.config['baseline_clicks']
- if clicks==0:
- self.basepoints=[]
- base_index_0=peak_location[-1]+self.fit_interval_nm(peak_location[-1], displayed_plot, self.config['auto_right_baseline'],False)
- self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_0))
- base_index_1=self.basepoints[0].index+self.fit_interval_nm(self.basepoints[0].index, displayed_plot, self.config['auto_left_baseline'],False)
- self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_1))
- elif clicks>0:
- print 'Select baseline'
- if clicks==1:
- self.basepoints=self._measure_N_points(N=1, whatset=1)
- base_index_1=self.basepoints[0].index+self.fit_interval_nm(self.basepoints[0].index, displayed_plot, self.config['auto_left_baseline'], False)
- self.basepoints.append(self._clickize(displayed_plot.vectors[1][0],displayed_plot.vectors[1][1],base_index_1))
- else:
- self.basepoints=self._measure_N_points(N=2, whatset=1)
-
- self.basecurrent=self.current.path
- return self.basepoints
+ pv = scipy.polyfit(z_nc, d_nc, deg)
+ df = scipy.polyval(pv, z_nc)
+ err = numpy.sqrt((df-d_nc)**2).sum()
+ except Exception,e:
+ log.warn('failed to flatten with a degree %d polynomial: %s'
+ % (deg, e))
+ continue
+ if err < min_err: # new best fit
+ min_err = err
+ degree = deg
+ poly_values = pv
+
+ if degree == None:
+ raise Failure('failed to flatten with all degrees')
+ new.info[params['fit info name']] = {
+ 'error':min_err/len(z_nc),
+ 'degree':degree,
+ 'max degree':params['max degree'],
+ 'polynomial values':poly_values,
+ }
+ new[:,-1] = d_data - mask*scipy.polyval(poly_values, z_data)
+ params['curve'].data[params['block']] = new