1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
3 # This file is part of Hooke.
5 # Hooke is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # Hooke is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
13 # Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke. If not, see
17 # <http://www.gnu.org/licenses/>.
"""Provide :class:`ModelFitter` to make arbitrary model fitting easy.
"""
22 from numpy import arange, ndarray
23 from scipy.optimize import leastsq
class PoorFit (ValueError):
    """Error raised to signal an unacceptably poor model fit."""
    pass
class ModelFitter (object):
    """A convenient wrapper around :func:`scipy.optimize.leastsq`.

    TODO: look into :mod:`scipy.odr` as an alternative fitting
    algorithm (minimizing the sum of squares of orthogonal distances,
    vs. minimizing y distances).

    Parameters
    ----------
    data : array-like
        Deflection data to be analyzed for the contact position.
    info :
        Store any extra information useful inside your overridden
        methods.
    rescale : boolean
        Rescale parameters so the scale of each is 1.0.  Also rescale
        the data so data.std() == 1.0.

    Examples
    --------
    >>> from pprint import pprint
    >>> from Queue import Queue

    You'll want to subclass `ModelFitter`, overriding at least
    `.model` and potentially the parameter and scale guessing
    methods.

    >>> class LinearModel (ModelFitter):
    ...     '''Simple linear model.
    ...
    ...     Levenberg-Marquardt is not how you want to solve this problem
    ...     for real systems, but it's a simple test case.
    ...     '''
    ...     def model(self, params):
    ...         '''A linear model.
    ...
    ...         .. math:: y = p_0 x + p_1
    ...         '''
    ...         p = params # convenient alias
    ...         self._model_data[:] = p[0]*arange(len(self._data)) + p[1]
    ...         return self._model_data
    ...     def guess_initial_params(self, outqueue=None):
    ...         return [float(self._data[-1] - self._data[0])/len(self._data),
    ...                 self._data[0]]
    ...     def guess_scale(self, params, outqueue=None):
    ...         slope_scale = params[0]/10.
    ...         if slope_scale == 0: # data is expected to be flat
    ...             slope_scale = float(self._data.max()-self._data.min())/len(self._data)
    ...         if slope_scale == 0: # data is completely flat
    ...             slope_scale = 1.
    ...         offset_scale = self._data.std()/10.0
    ...         if offset_scale == 0: # data is completely flat
    ...             offset_scale = 1.
    ...         return [slope_scale, offset_scale]
    >>> data = 20*numpy.sin(arange(1000)) + 7.*arange(1000) - 33.0
    >>> m = LinearModel(data)
    >>> outqueue = Queue()
    >>> slope,offset = m.fit(outqueue=outqueue)
    >>> info = outqueue.get(block=False)
    >>> pprint(info) # doctest: +ELLIPSIS, +REPORT_UDIFF
    {'active fitted parameters': array([ 6.999..., -32.889...]),
     'active parameters': array([ 6.999..., -32.889...]),
     'active scale': [0.699..., 202.071...],
     'convergence flag': 2,
     'covariance matrix': array([[ 1.199...e-08, -5.993...e-06],
           [ -5.993...e-06, 3.994...e-03]]),
     'data scale factor': 1.0,
     'fitted parameters': array([ 6.999..., -32.889...]),
     'info': {'fjac': array([[...]]),
              'fvec': array([...]),
              'ipvt': array([1, 2]),
              'nfev': ...,
              'qtf': array([ 2.851...e-07, 1.992...e-06])},
     'initial parameters': [6.992..., -33.0],
     'message': 'The relative error between two consecutive iterates is at most 0.000...',
     'rescaled': False,
     'scale': [0.699..., 202.071...]}

    We round the outputs to protect the doctest against differences in
    machine rounding during computation.  We expect the values to be close
    to the input settings (slope 7, offset -33).

    >>> print '%.3f' % slope
    7.000
    >>> print '%.3f' % offset
    -32.890

    The offset is a bit off because the range is not a multiple of
    :math:`2\pi`.

    We could also use rescaled parameters:

    >>> m = LinearModel(data, rescale=True)
    >>> outqueue = Queue()
    >>> slope,offset = m.fit(outqueue=outqueue)
    >>> print '%.3f' % slope
    7.000
    >>> print '%.3f' % offset
    -32.890
    """
    def __init__(self, *args, **kwargs):
        # Construction is delegated to set_data() so an instance can be
        # re-targeted at new data after creation.
        self.set_data(*args, **kwargs)

    def set_data(self, data, info=None, rescale=False):
        """Attach `data` (and optional `info`) to the fitter.

        Preallocates `self._model_data`, a buffer with the same shape
        and dtype as `data` that subclass `model()` implementations
        fill in place.
        """
        self._data = data
        self._model_data = ndarray(shape=data.shape, dtype=data.dtype)
        self.info = info
        self._rescale = rescale
        if rescale:
            # Normalize so data.std() == 1.0 in residual() below.
            # NOTE(review): completely flat data (std() == 0) would make
            # residual() divide by zero -- confirm desired behavior.
            self._data_scale_factor = data.std()
        else:
            self._data_scale_factor = 1.0

    def model(self, params):
        """Hook: calculate the model data to compare against `self._data`.

        Subclasses should fill `self._model_data` in place from
        `params` and return it (see the class docstring for an
        example).  The assignment below only demonstrates the
        fill-in-place pattern.
        """
        p = params # convenient alias
        self._model_data[:] = arange(len(self._data))
        raise NotImplementedError

    def guess_initial_params(self, outqueue=None):
        """Hook: guess initial parameter values from `self._data`."""
        raise NotImplementedError

    def guess_scale(self, params, outqueue=None):
        """Hook: guess the characteristic scale of each parameter."""
        raise NotImplementedError

    def residual(self, params):
        """Difference between the data and the model for `params`.

        When rescaling is active, `params` arrive in unit-scale form
        and are mapped back to physical units before evaluating the
        model, and the residual is normalized by the data's standard
        deviation so `leastsq` works with O(1) quantities.
        """
        if self._rescale:
            params = [p*s for p,s in zip(params, self._param_scale_factors)]
        residual = self._data - self.model(params)
        if self._rescale:
            residual /= self._data_scale_factor
        return residual

    def fit(self, initial_params=None, scale=None, outqueue=None, **kwargs):
        """Fit the model to the data.

        Parameters
        ----------
        initial_params : iterable or None
            Initial parameter values for residual minimization.  If
            `None`, they are estimated from the data using
            :meth:`guess_initial_params`.
        scale : iterable or None
            Parameter length scales for residual minimization.  If
            `None`, they are estimated from the data using
            :meth:`guess_scale`.
        outqueue : Queue or None
            If given, will be used to output the data and fitted model
            for user verification.

        Any additional arguments are passed through to `leastsq`.

        Returns the fitted parameters, mapped back to physical units
        when rescaling is active.
        """
        if initial_params is None:
            initial_params = self.guess_initial_params(outqueue)
        if scale is None:
            scale = self.guess_scale(initial_params, outqueue)
        assert min(scale) > 0, scale  # leastsq's diag entries must be > 0
        if self._rescale:
            # Solve in normalized parameter space: every active
            # parameter has characteristic scale 1.0.
            self._param_scale_factors = scale
            active_scale = [1.0 for s in scale]
            active_params = [p/s for p,s in zip(initial_params,
                                                self._param_scale_factors)]
        else:
            active_scale = scale
            active_params = initial_params
        params,cov,info,mesg,ier = leastsq(
            func=self.residual, x0=active_params, full_output=True,
            diag=active_scale, **kwargs)
        if self._rescale:
            active_params = params
            # Map the normalized solution back to physical units.
            if len(initial_params) == 1: # params is a float
                params = params * self._param_scale_factors[0]
            else:
                params = [p*s for p,s in zip(params,
                                             self._param_scale_factors)]
        else:
            active_params = params
        if outqueue is not None:
            outqueue.put({
                'rescaled': self._rescale,
                'initial parameters': initial_params,
                'active parameters': active_params,
                'scale': scale,
                'active scale': active_scale,
                'data scale factor': self._data_scale_factor,
                'active fitted parameters': active_params,
                'fitted parameters': params,
                'covariance matrix': cov,
                'info': info,
                'message': mesg,
                'convergence flag': ier,
                })
        return params
# Example ODR code from the old fit.py
227 # def dist(px,py,linex,liney):
228 # distancesx=scipy.array([(px-x)**2 for x in linex])
229 # minindex=numpy.argmin(distancesx)
230 # print px, linex[0], linex[-1]
231 # return (py-liney[minindex])**2
234 # def f_wlc(params,x,T=T):
236 # wlc function for ODR fitting
241 # y=(therm*pii/4.0) * (((1-(x*lambd))**-2) - 1 + (4*x*lambd))
244 # def f_wlc_plfix(params,x,pl_value=pl_value,T=T):
246 # wlc function for ODR fitting
252 # y=(therm*pii/4.0) * (((1-(x*lambd))**-2) - 1 + (4*x*lambd))
256 # realdata=scipy.odr.RealData(xchunk_corr_up,ychunk_corr_up)
258 # model=scipy.odr.Model(f_wlc_plfix)
259 # o = scipy.odr.ODR(realdata, model, p0_plfix)
261 # model=scipy.odr.Model(f_wlc)
262 # o = scipy.odr.ODR(realdata, model, p0)
264 # o.set_job(fit_type=2)
266 # fit_out=[(1/i) for i in out.beta]
268 # #Calculate fit errors from output standard deviations.
269 # #We must propagate the error because we fit the *inverse* parameters!
270 # #The error = (error of the inverse)*(value**2)
272 # for sd,value in zip(out.sd_beta, fit_out):
273 # err_real=sd*(value**2)
274 # fit_errors.append(err_real)