1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This file is part of Hooke.
5 # Hooke is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # Hooke is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
13 # Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke. If not, see
17 # <http://www.gnu.org/licenses/>.
19 """Provide :class:`ModelFitter` to make arbitrary model fitting easy.
22 from numpy import arange, ndarray
23 from scipy.optimize import leastsq
class PoorFit (ValueError):
    """Raised when a least-squares fit is judged too poor to accept.

    Subclasses :class:`ValueError` so existing callers catching
    `ValueError` keep working.
    """
    pass
class ModelFitter (object):
    """A convenient wrapper around :func:`scipy.optimize.leastsq`.

    TODO: look into :mod:`scipy.odr` as an alternative fitting
    algorithm (minimizing the sum of squares of orthogonal distances,
    vs. minimizing y distances).

    Parameters
    ----------
    data : array-like
        Data to be fit by the model.
    info :
        Store any extra information useful inside your overridden
        methods.
    rescale : boolean
        Rescale parameters so the scale of each is 1.0.  Also rescale
        the data so data.std() == 1.0.

    Examples
    --------
    >>> import numpy
    >>> from pprint import pprint
    >>> from Queue import Queue

    You'll want to subclass `ModelFitter`, overriding at least
    `.model` and potentially the parameter and scale guessing
    methods.

    >>> class LinearModel (ModelFitter):
    ...     '''Simple linear model:  y = p[0]*x + p[1].
    ...
    ...     Levenberg-Marquardt is not how you want to solve this
    ...     problem for real systems, but it's a simple test case.
    ...     '''
    ...     def model(self, params):
    ...         p = params  # convenient alias
    ...         self._model_data[:] = p[0]*arange(len(self._data)) + p[1]
    ...         return self._model_data
    ...     def guess_initial_params(self, outqueue=None):
    ...         return [float(self._data[-1] - self._data[0])/len(self._data),
    ...                 self._data[0]]
    ...     def guess_scale(self, params, outqueue=None):
    ...         slope_scale = params[0]/10.
    ...         if slope_scale == 0:  # data is expected to be flat
    ...             slope_scale = float(self._data.max()-self._data.min())/len(self._data)
    ...         if slope_scale == 0:  # data is completely flat
    ...             slope_scale = 1.
    ...         offset_scale = self._data.std()/10.0
    ...         if offset_scale == 0:  # data is completely flat
    ...             offset_scale = 1.
    ...         return [slope_scale, offset_scale]
    >>> data = 20*numpy.sin(arange(1000)) + 7.*arange(1000) - 33.0
    >>> m = LinearModel(data)
    >>> slope,offset = m.fit()

    We expect the fitted values to be close to the input settings
    (slope 7, offset -33).  The offset is a bit off because the range
    is not a multiple of 2*pi.

    >>> print '%.1f %.1f' % (slope, offset)
    7.0 -32.9

    We could also use rescaled parameters:

    >>> m = LinearModel(data, rescale=True)
    >>> slope,offset = m.fit()
    >>> print '%.1f %.1f' % (slope, offset)
    7.0 -32.9
    """
    def __init__(self, *args, **kwargs):
        self.set_data(*args, **kwargs)

    def set_data(self, data, info=None, rescale=False):
        """Bind `data` (and optional `info`) to this fitter.

        Called by the constructor with the constructor's arguments.
        """
        self._data = data
        # Scratch buffer that `.model` implementations fill in place.
        self._model_data = ndarray(shape=data.shape, dtype=data.dtype)
        self.info = info
        self._rescale = rescale
        if rescale:
            # Pick the first nonzero measure of the data's magnitude;
            # the trailing 1.0 guarantees a nonzero fallback for
            # completely flat (or empty-range) data.
            for x in [data.std(), data.max()-data.min(),
                      abs(data.max()), 1.0]:
                if x != 0:
                    self._data_scale_factor = x
                    break
        else:
            self._data_scale_factor = 1.0

    def model(self, params):
        """Generate model data for the given `params`.

        Subclasses must override this stub: fill `self._model_data`
        in place from `params` and return it.
        """
        raise NotImplementedError

    def guess_initial_params(self, outqueue=None):
        """Guess initial fitting parameters.

        Returns
        -------
        p_initial : list of floats
            Initial parameter values for residual minimization.
        """
        return []

    def guess_scale(self, params, outqueue=None):
        """Guess the problem length scale for each parameter.

        Returns
        -------
        scale : list of floats
            Parameter length scales for residual minimization (fed to
            `leastsq`'s `diag` argument); all entries must be > 0.
        """
        return [1.0 for p in params]

    def residual(self, params):
        """Difference between the data and the model at `params`.

        When rescaling is active, `params` arrive in rescaled
        (order-one) form and the residual is returned rescaled by
        `self._data_scale_factor` as well.
        """
        if self._rescale:
            params = [p*s for p, s in zip(params, self._param_scale_factors)]
        residual = self._data - self.model(params)
        if self._rescale:
            residual /= self._data_scale_factor
        return residual

    def fit(self, initial_params=None, scale=None, outqueue=None, **kwargs):
        """Fit the model to the data.

        Parameters
        ----------
        initial_params : iterable or None
            Initial parameter values for residual minimization.  If
            `None`, they are estimated from the data using
            :meth:`guess_initial_params`.
        scale : iterable or None
            Parameter length scales for residual minimization.  If
            `None`, they are estimated from the data using
            :meth:`guess_scale`.
        outqueue : Queue or None
            If given, will be used to output the data and fitted model
            for user verification.

        Any additional arguments are passed through to `leastsq`.
        """
        if initial_params is None:
            initial_params = self.guess_initial_params(outqueue)
        if scale is None:
            scale = self.guess_scale(initial_params, outqueue)
        assert min(scale) > 0, scale
        if self._rescale:
            # Optimize order-one "active" parameters; remember the
            # scale factors so `.residual` can undo the rescaling.
            self._param_scale_factors = scale
            active_scale = [1.0 for s in scale]
            active_params = [p/s for p, s in zip(initial_params,
                                                 self._param_scale_factors)]
        else:
            active_scale = scale
            active_params = initial_params
        params, cov, info, mesg, ier = leastsq(
            func=self.residual, x0=active_params, full_output=True,
            diag=active_scale, **kwargs)
        if self._rescale:
            active_params = params
            if len(initial_params) == 1:  # params is a float
                params = params * self._param_scale_factors[0]
            else:
                params = [p*s for p, s in zip(params,
                                              self._param_scale_factors)]
        else:
            active_params = params
        if outqueue is not None:
            outqueue.put({
                    'rescaled': self._rescale,
                    'initial parameters': initial_params,
                    'active parameters': active_params,
                    'scale': scale,
                    'active scale': active_scale,
                    'data scale factor': self._data_scale_factor,
                    'active fitted parameters': active_params,
                    'fitted parameters': params,
                    'covariance matrix': cov,
                    'info': info,
                    'message': mesg,
                    'convergence flag': ier,
                    })
        return params
229 # Example ODR (orthogonal distance regression) code from the old fit.py
230 # def dist(px,py,linex,liney):
231 # distancesx=scipy.array([(px-x)**2 for x in linex])
232 # minindex=numpy.argmin(distancesx)
233 # print px, linex[0], linex[-1]
234 # return (py-liney[minindex])**2
237 # def f_wlc(params,x,T=T):
239 # wlc function for ODR fitting
244 # y=(therm*pii/4.0) * (((1-(x*lambd))**-2) - 1 + (4*x*lambd))
247 # def f_wlc_plfix(params,x,pl_value=pl_value,T=T):
249 # wlc function for ODR fitting
255 # y=(therm*pii/4.0) * (((1-(x*lambd))**-2) - 1 + (4*x*lambd))
259 # realdata=scipy.odr.RealData(xchunk_corr_up,ychunk_corr_up)
261 # model=scipy.odr.Model(f_wlc_plfix)
262 # o = scipy.odr.ODR(realdata, model, p0_plfix)
264 # model=scipy.odr.Model(f_wlc)
265 # o = scipy.odr.ODR(realdata, model, p0)
267 # o.set_job(fit_type=2)
269 # fit_out=[(1/i) for i in out.beta]
271 # #Calculate fit errors from output standard deviations.
272 # #We must propagate the error because we fit the *inverse* parameters!
273 # #The error = (error of the inverse)*(value**2)
275 # for sd,value in zip(out.sd_beta, fit_out):
276 # err_real=sd*(value**2)
277 # fit_errors.append(err_real)