1 # Copyright (C) 2008-2011 W. Trevor King <wking@drexel.edu>
3 # This file is part of pypid.
5 # pypid is free software: you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation, either
8 # version 3 of the License, or (at your option) any later version.
10 # pypid is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with pypid. If not, see
17 # <http://www.gnu.org/licenses/>.
21 from aubio.aubioclass import pitch as _pitch
22 from aubio.aubioclass import fvec as _fvec
23 from numpy import array as _array
24 from numpy import exp as _exp
25 from numpy import linspace as _linspace
26 from numpy import log as _log
27 from scipy.interpolate import interp1d as _interp1d
29 from . import LOG as _LOG
30 from .fit import ModelFitter as _ModelFitter
33 class Controller (object):
34 """PID control frontend.
36 backend: pypid.backend.Backend instance
37 backend driving your particular harware
39 initial setpoint in PV-units
41 minimum PV in PV-units (for sanity checks)
43 maximum PV in PV-units (for sanity checks)
45 def __init__(self, backend, setpoint=0.0, min=-float('inf'),
47 self._backend = backend
48 self._setpoint = setpoint
52 # basic user interface methods
55 """Return the current process variable in PV-units
57 We should expose this to users, so they don't need to go
58 mucking about in `._backend`.
60 return self._backend.get_pv()
62 def set_pv(self, setpoint, **kwargs):
63 """Change setpoint to `setpoint` and wait for stability
65 self._backend.set_setpoint(setpoint)
66 self.wait_for_stability(setpoint=setpoint, **kwargs)
68 def wait_for_stability(self, setpoint, tolerance=0.3, time=10.,
69 timeout=-1, sleep_time=0.1, return_data=False):
70 """Wait until the PV is sufficiently stable
75 maximum allowed deviation from `setpoint` in PV-units
77 time the PV must remain in the allowed region before the
78 signal is delared "stable"
80 maximum time to wait for stability. Set to -1 to never
83 time in seconds to sleep between reads to avoid an
86 if true, also return a list of `(timestamp, PV)` tuples
89 Read the PV every `sleep_time` seconds until the PV has
90 remained within `tolerance` of `setpoint` for `time`. If the
91 stability criteria are met, return `True` (stable). If
92 `timeout` seconds pass before the criteria are met, return
95 _LOG.debug(('wait until the PV is stable at {:n} +/- {:n} C '
96 'for {:n} seconds').format(setpoint, tolerance, time))
100 start_time = _time.time()
101 stable_time = start_time + time
105 timeout_time = start_time + timeout
108 in_range = abs(PV - setpoint) < tolerance
114 _LOG.debug('process variable is stable')
116 break # in range for long enough
118 stable_time = t + time # reset target time
119 if timeout_time and t > timeout_time:
120 _LOG.debug('process variable is not stable')
122 _time.sleep(sleep_time)
124 return (stable, data)
127 def is_stable(self, setpoint, time, **kwargs):
128 return self.wait_for_stability(
129 setpoint=setpoint, time=time, timeout=time, **kwargs)
131 def estimate_pv_sensitivity(self, values=10, sleep_time=0.1,
138 if repeats == max_repeats:
144 if len(PVs) > values:
148 _time.sleep(sleep_time)
154 def check_feedback_terms(self):
155 """Check a backend's interpretation of its PID feedback terms.
157 Some backends provide an interface to read out their PID
158 feedback terms, but the interface is not always well
159 documented. This method reads out the terms, and compares
160 them with our own calculations (when possible) to test the
161 backend's interpretation.
163 c = self._backend.get_current()
164 pid,prop,ntgrl,deriv = self._backend.get_feedback_terms()
166 SP = self._backend.get_setpoint()
168 p,i,d = self._backend.get_down_gains()
170 p,i,d = self._backend.get_up_gains()
171 _LOG.info(('pid(read) {:n} =? sum(calc from terms) {:n} '
172 '=? cur(read) {:n} A').format(pid, prop+ntgrl+deriv, c))
173 _LOG.info('read: p {:n}, i {:n}, d {:n}'.format(p,i,d))
174 _LOG.info('calc: p {:n}'.format(p*(SP-PV)))
176 # tuning experiments and data processing
178 def get_step_response(self, mv_a, mv_b, sleep_time=0.1, stable_time=10.,
180 "Measure a step response for later analysis"
181 _LOG.debug('measure step response')
183 raise ValueError(kwargs)
184 kwargs['time'] = stable_time
185 kwargs['sleep_time'] = sleep_time
186 mode = self._backend.get_mode()
188 manual_mv = self._backend.get_mv()
190 self._backend.set_mode('manual')
191 _LOG.debug('set first current and wait for stability')
192 self._backend.set_mv(mv_a)
194 while not self.is_stable(pv_a, **kwargs):
196 _LOG.debug('stabilized at {:n} {} with {:n} {}'.format(
197 pv_a, self._backend.pv_units, mv_a, self._backend.mv_units))
198 _LOG.debug('set second MV and wait for stability')
200 start_time = _time.time()
201 self._backend.set_mv(mv_b)
204 stable,d = self.is_stable(pv_b, return_data=True, **kwargs)
209 _LOG.debug('stabilized at {:n} {} with {:n} {}'.format(
210 pv_b, self._backend.pv_units, mv_b, self._backend.mv_units))
212 self._backend.set_mv(manual_mv)
214 self._backend.set_mode(mode)
218 def analyze_step_response(step_response, mv_shift):
219 rates = [(PVb-PVa)/(tb-ta) for ((ta,PVa),(tb,PVb))
220 in zip(step_response, step_response[1:])]
221 # TODO: averaging filter?
222 max_rate_i = max_rate = 0
223 for i,rate in enumerate(rates):
224 if abs(rate) > max_rate: # handle steps in both directions
227 max_rate_time,max_rate_temp = step_response[max_rate_i] # TODO: avg i and i+1?
228 time_a,PV_a = step_response[0]
229 max_rate_time -= time_a
230 dead_time = max_rate_time - (max_rate_temp - PV_a) / max_rate
231 t_data = _array([t for t,PV in step_response[max_rate_i:]])
232 PV_data = _array([PV for t,PV in step_response[max_rate_i:]])
233 model = ExponentialModel(PV_data, info={'x data (s)': t_data})
234 decay_time,PV0,PV8 = model.fit()
235 process_gain = (PV8 - PV_a) / mv_shift
236 return (process_gain, dead_time, decay_time, max_rate)
238 def get_bang_bang_response(self, dead_band=0.8, num_oscillations=10,
239 max_dead_band_time=30, sleep_time=0.1):
240 orig_down_gains = self._backend.get_down_gains()
241 orig_up_gains = self._backend.get_up_gains()
242 _LOG.debug('measure bang-bang response')
243 mode = self._backend.get_mode()
245 self._backend.set_mode('PID')
247 setpoint = self._backend.get_setpoint()
248 self._backend.set_down_gains(float('inf'), float('inf'), 0)
249 self._backend.set_up_gains(float('inf'), float('inf'), 0)
250 start_time = _time.time()
252 under_first = self._is_under(
253 pv=pv, setpoint=setpoint, dead_band=dead_band)
254 _LOG.debug('wait to exit dead band')
256 while under_first is None:
257 if t - start_time > max_dead_band_time:
258 msg = 'still in dead band after after {:n} seconds'.format(
261 raise ValueError(msg)
262 _time.sleep(sleep_time)
265 under_first = self._is_under(
266 pv=pv, setpoint=setpoint, dead_band=dead_band)
267 _LOG.debug('read {:d} oscillations'.format(num_oscillations))
270 while i < num_oscillations*2 + 1:
273 # drop first half cycle (possibly includes ramp to setpoint)
276 _under = self._is_under(
277 temp=temp, setpoint=setpoint, dead_band=dead_band)
278 if _under is True and under is False:
279 _LOG.debug('transition to PV < SP (i={:d})'.format(i))
282 elif under is False and is_under is True:
283 _LOG.debug('transition to PV > SP (i={:d})'.format(i))
286 _time.sleep(sleep_time)
287 self._backend.set_down_gains(*orig_down_gains)
288 self._backend.set_up_gains(*orig_up_gains)
290 self._backend.set_mode(mode)
294 def analyze_bang_bang_response(bang_bang_response):
295 t_data = _array([t for t,PV in bang_bang_response])
296 PV_data = _array([PV for t,PV in bang_bang_response])
297 amp = (PV_data.max() - PV_data.min()) / 2
298 freq = Controller._get_frequency(x_data=t_data, y_data=PV_data)
302 def get_ultimate_cycle_response(self, proportional, period):
303 orig_down_gains = self._backend.get_down_gains()
304 orig_up_gains = self._backend.get_up_gains()
305 _LOG.debug('measure ultimate cycle response')
306 mode = self._backend.get_mode()
308 self._backend.set_mode('PID')
310 self._backend.set_down_gains(*orig_down_gains)
311 self._backend.set_up_gains(*orig_up_gains)
313 self._backend.set_mode(mode)
317 def analyze_ultimate_cycle_response(ultimate_cycle_response):
318 amp,period = Controller.analyze_bang_bang_response(
319 ultimate_cycle_response)
324 def _wait_until_close(self, setpoint, tolerance=0.3, sleep_time=0.1):
325 while abs(self.get_pv() - setpoint) > tolerance:
326 _time.sleep(sleep_time)
328 def _time_function(self, function, args=(), kwargs=None, count=10):
329 "Rough estimate timing of get_temp(), takes me about 0.1s"
333 for i in range(count):
334 function(*args, **kwargs)
336 return float(stop-start)/count
338 def _is_under(self, pv=None, setpoint=None, dead_band=None):
342 setpoint = self._backend.get_setpoint()
343 low_pv = high_pv = setpoint
353 def _select_parameter(self, under_result=None, over_result=None,
354 dead_band_result=None, **kwargs):
355 under = self._is_under(**kwargs)
360 return dead_band_result
363 def _resample_with_constant_dx(x_data, y_data):
364 f = _interp1d(x_data, y_data)
365 x = _linspace(x_data[0], x_data[-1], len(x_data))
370 def _get_frequency(x_data, y_data):
371 x,y = Controller._resample_with_constant_dx(x_data, y_data)
373 yvec = _fvec(len(y_data))
375 for i,_y in enumerate(y_data):
376 yvec.set(_y - mean, i)
377 fake_sample_rate = 8000 # aubio is built for audio
378 p = _pitch(mode='schmitt', bufsize=len(y_data), hopsize=len(y_data),
379 samplerate=fake_sample_rate, omode='freq', tolerance=0.1)
380 freq = p(yvec) / (fake_sample_rate * dx)
381 _LOG.debug('pitch: {:n}, sample rate {:n}'.format(freq, 1./dx))
387 def _check_range(value, min, max):
389 raise ValueError('%g < %g' % (value, min))
391 raise ValueError('%g > %g' % (value, max))
394 self._check_rangee(pv, self._min_pv, self._max_pv)
397 class ExponentialModel (_ModelFitter):
398 "Exponential decay model"
399 def model(self, params):
401 x_data = self.info['x data (s)']
402 x0 = x_data[0] # raw times in seconds are too far from the epoc
404 self._model_data[:] = y8*(1-a*_exp(-(x_data-x0)/tau))
405 return self._model_data
407 def guess_initial_params(self, outqueue=None, **kwargs):
408 x_data = self.info['x data (s)']
411 x_mid = x_data[int(len(x_data)/2)]
412 y_mid = y_data[int(len(y_data)/2)]
415 tau = (x_mid - x_start)/_log((y_start-y8)/(y_mid-y8))
416 return (tau, y_start, y8)
418 def guess_scale(self, params, outqueue=None, **kwargs):