1 # Copyright (C) 2008-2011 W. Trevor King <wking@drexel.edu>
3 # This file is part of tempcontrol.
5 # tempcontrol is free software: you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation, either
8 # version 3 of the License, or (at your option) any later version.
10 # tempcontrol is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with tempcontrol. If not, see
17 # <http://www.gnu.org/licenses/>.
21 from aubio.aubioclass import pitch as _pitch
22 from aubio.aubioclass import fvec as _fvec
23 from numpy import array as _array
24 from numpy import exp as _exp
25 from numpy import linspace as _linspace
26 from numpy import log as _log
27 from scipy.interpolate import interp1d as _interp1d
29 from hooke.util.fit import ModelFitter as _ModelFitter
31 from . import LOG as _LOG
34 class Controller (object):
35 """PID temperature control frontend.
37 backend: tempcontrol.backend.Backend instance
38 backend driving your particular harware
40 initial setpoint in degrees Celsius
42 minimum temperature in degrees Celsius (for sanity checks)
44 maximum temperature in degrees Celsius (for sanity checks)
46 def __init__(self, backend, setpoint=20.0, min=5.0, max=50.0):
47 self._backend = backend
48 self._setpoint = setpoint
52 # basic user interface methods
55 """Return the current process temperature in degrees Celsius
57 We should expose this to users, so they don't need to go
58 mucking about in `._backend`.
60 return self._backend.get_temp()
62 def set_temp(self, setpoint, **kwargs):
63 """Change setpoint to `setpoint` and wait for stability
65 self._backend.set_setpoint(setpoint)
66 self.wait_for_stability(setpoint=setpoint, **kwargs)
68 def wait_for_stability(self, setpoint, tolerance=0.3, time=10.,
69 timeout=-1, sleep_time=0.1, return_data=False):
70 """Wait until the temperature is sufficiently stable
73 target temperature in degrees C
75 maximum allowed deviation from `setpoint` in dregrees C
77 time the temperature must remain in the allowed region
78 before the signal is delared "stable"
80 maximum time to wait for stability. Set to -1 to never
83 time in seconds to sleep between reads to avoid an
86 if true, also return a list of `(timestamp, temp)` tuples
89 Read the temperature every `sleep_time` seconds until the
90 temperature has remained within `tolerance` of `setpoint` for
91 `time`. If the stability criteria are met, return `True`
92 (stable). If `timeout` seconds pass before the criteria are
93 met, return `False` (not stable).
95 _LOG.debug(('wait until the temperature is stable at {:n} +/- {:n} C '
96 'for {:n} seconds').format(setpoint, tolerance, time))
100 start_time = _time.time()
101 stable_time = start_time + time
105 timeout_time = start_time + timeout
108 in_range = abs(T - setpoint) < tolerance
114 _LOG.debug('temperature is stable')
116 break # in range for long enough
118 stable_time = t + time # reset target time
119 if timeout_time and t > timeout_time:
121 _time.sleep(sleep_time)
123 return (stable, data)
126 def is_stable(self, setpoint, time, **kwargs):
127 return self.wait_for_stability(
128 setpoint=setpoint, time=time, timeout=time, **kwargs)
130 def estimate_temperature_sensitivity(self, num_temps=10, sleep_time=0.1,
136 temp = self.get_temp()
137 if repeats == max_repeats:
139 if temp == last_temp:
143 if len(temps) > num_temps:
147 _time.sleep(sleep_time)
148 temps = _array(temps)
153 def check_feedback_terms(self):
154 """Check a backend's interpretation of its PID feedback terms.
156 Some backends provide an interface to read out their PID
157 feedback terms, but the interface is not always well
158 documented. This method reads out the terms, and compares
159 them with our own calculations (when possible) to test the
160 backend's interpretation.
162 c = self._backend.get_current()
163 pid,prop,ntgrl,deriv = self._backend.get_feedback_terms()
165 Tset = self._backend.get_setpoint()
166 if T > Tset: # cooling
167 p,i,d = self._backend.get_cooling_gains()
169 p,i,d = self._backend.get_heating_gains()
170 _LOG.info(('pid(read) {:n} =? sum(calc from terms) {:n} '
171 '=? cur(read) {:n} A').format(pid, prop+ntgrl+deriv, c))
172 _LOG.info('read: p {:n}, i {:n}, d {:n}'.format(p,i,d))
173 _LOG.info('calc: p {:n}'.format(p*(Tset-T)))
175 # tuning experiments and data processing
177 def get_step_response(self, current_a, current_b,
178 sleep_time=0.1, stable_time=10., **kwargs):
179 "Measure a step response for later analysis"
180 _LOG.debug('measure step response')
182 raise ValueError(kwargs)
183 kwargs['time'] = stable_time
184 kwargs['sleep_time'] = sleep_time
185 mode = self._backend.get_mode()
187 manual_current = self._backend.get_current()
189 self._backend.set_mode('manual')
190 _LOG.debug('set first current and wait for stability')
191 self._backend.set_current(current_a)
192 temp_a = self.get_temp()
193 while not self.is_stable(temp_a, **kwargs):
194 temp_a = self.get_temp()
195 _LOG.debug('stabilized at {:n} C with {:n} amps'.format(
197 _LOG.debug('set second current and wait for stability')
199 start_time = _time.time()
200 self._backend.set_current(current_b)
203 stable,d = self.is_stable(temp_b, return_data=True, **kwargs)
205 temp_b = self.get_temp()
208 _LOG.debug('stabilized at {:n} C with {:n} amps'.format(
211 self._backend.set_current(manual_current)
213 self._backend.set_mode(mode)
217 def analyze_step_response(step_response, current_shift):
218 rates = [(Tb-Ta)/(tb-ta) for ((ta,Ta),(tb,Tb))
219 in zip(step_response, step_response[1:])]
220 # TODO: averaging filter?
221 max_rate_i = max_rate = 0
222 for i,rate in enumerate(rates):
223 if abs(rate) > max_rate: # handle steps in both directions
226 max_rate_time,max_rate_temp = step_response[max_rate_i] # TODO: avg i and i+1?
227 time_a,temp_a = step_response[0]
228 max_rate_time -= time_a
229 dead_time = max_rate_time - (max_rate_temp - temp_a) / max_rate
230 t_data = _array([t for t,T in step_response[max_rate_i:]])
231 T_data = _array([T for t,T in step_response[max_rate_i:]])
232 model = ExponentialModel(T_data, info={'x data (s)': t_data})
233 tau,T0,T8 = model.fit()
234 gain = (T8 - temp_a) / current_shift
235 return (gain, dead_time, tau, max_rate)
237 def get_bang_bang_response(self, dead_band=0.8, num_oscillations=10,
238 max_dead_band_time=30, sleep_time=0.1):
239 orig_cool_gains = self._backend.get_cooling_gains()
240 orig_heat_gains = self._backend.get_heating_gains()
241 _LOG.debug('measure bang-bang response')
242 mode = self._backend.get_mode()
244 self._backend.set_mode('PID')
246 setpoint = self._backend.get_setpoint()
247 self._backend.set_cooling_gains(float('inf'), float('inf'), 0)
248 self._backend.set_heating_gains(float('inf'), float('inf'), 0)
249 start_time = _time.time()
250 temp = self.get_temp()
251 heat_first = self._is_heating(
252 temp=temp, setpoint=setpoint, dead_band=dead_band)
253 _LOG.debug('wait to exit dead band')
255 while heat_first is None:
256 if t - start_time > max_dead_band_time:
257 msg = 'still in dead band after after {:n} seconds'.format(
260 raise ValueError(msg)
261 _time.sleep(sleep_time)
264 heat_first = self._is_heating(
265 temp=temp, setpoint=setpoint, dead_band=dead_band)
266 _LOG.debug('read {:d} oscillations'.format(num_oscillations))
269 while i < num_oscillations*2 + 1:
271 temp = self.get_temp()
272 # drop first half cycle (possibly includes ramp to setpoint)
274 data.append((t, temp))
275 is_heating = self._is_heating(
276 temp=temp, setpoint=setpoint, dead_band=dead_band)
277 if heating is True and is_heating is False:
278 _LOG.debug('transition to cooling (i={:d})'.format(i))
281 elif heating is False and is_heating is True:
282 _LOG.debug('transition to heating (i={:d})'.format(i))
285 _time.sleep(sleep_time)
286 self._backend.set_cooling_gains(*orig_cool_gains)
287 self._backend.set_heating_gains(*orig_heat_gains)
289 self._backend.set_mode(mode)
293 def analyze_bang_bang_response(bang_bang_response):
294 t_data = _array([t for t,T in bang_bang_response])
295 T_data = _array([T for t,T in bang_bang_response])
296 amp = (T_data.max() - T_data.min()) / 2
297 freq = Controller._get_frequency(x_data=t_data, y_data=T_data)
301 def get_ultimate_cycle_response(self, proportional, period):
302 orig_cool_gains = self._backend.get_cooling_gains()
303 orig_heat_gains = self._backend.get_heating_gains()
304 _LOG.debug('measure ultimate cycle response')
305 mode = self._backend.get_mode()
307 self._backend.set_mode('PID')
309 self._backend.set_cooling_gains(*orig_cool_gains)
310 self._backend.set_heating_gains(*orig_heat_gains)
312 self._backend.set_mode(mode)
316 def analyze_ultimate_cycle_response(ultimate_cycle_response):
317 amp,period = Controller.analyze_bang_bang_response(
318 ultimate_cycle_response)
324 def ziegler_nichols_step_response(gain, dead_time, tau, mode='PID'):
327 _LOG.warn(('Ziegler-Nichols not a good idea when '
328 'dead-time/tau = {:n}').format(r))
329 pkern = tau/(gain*dead_time)
331 return (pkern, float('inf'), 0)
333 return (0.9*pkern, 3.3*dead_time, 0)
335 return (1.2*pkern, 2*dead_time, dead_time/2.)
336 raise ValueError(mode)
338 def ziegler_nichols_bang_bang_response(self, amplitude, period,
339 max_current=None, mode='PID'):
340 if max_current is None:
341 max_current = self._backend.get_max_current()
342 return self._ziegler_nichols_bang_bang_response(
343 amplitude, period, max_current=max_current, mode=mode)
346 def _ziegler_nichols_bang_bang_response(amplitude, period,
347 max_current, mode='PID'):
350 center-to-peak amplitude (in K) of bang-bang oscillation
352 period (in seconds) of the critical oscillation
354 "bang" current (in amps)
356 proportional = float(max_current)/amplitude
357 period = float(period)
359 return (proportional/2, float('inf'), 0)
361 return (proportional/3, 2*period, 0)
363 return (proportional/2, period, period/4)
364 raise ValueError(mode)
366 def ziegler_nichols_ultimate_cycle_response(self, proportional, period):
369 critical P-only gain (ultimate gain, for sustained oscillation)
371 period (in seconds) of the critical oscillation
373 Microstar Laboratories has a `nice analysis`_ on ZN
374 limitations, which points out that ZN-tuning assumes your
375 system has the FOPDT transfer function (see `TestBackend` for
378 .. _nice analysis: http://www.mstarlabs.com/control/znrule.html
381 return (0.50*proportional, float('inf'), 0)
383 return (0.45*proportional, period/1.2, 0)
385 return (0.60*proportional, period/2, period/8)
386 raise ValueError(mode)
389 def cohen_coon_step_response(gain, dead_time, tau, mode='PID'):
391 pkern = tau/(gain*dead_time)
393 return (pkern*(1+r/3.), float('inf'), 0)
395 return (pkern*(0.9+r/12.), (30.+3*r)/(9+20*r)*dead_time, 0)
396 elif mode == 'PD': # double check
397 return (1.24*pkern*(1+0.13*tf), float('inf'),
398 (0.27-0.36*t)/(1-0.87*t)*dead_time)
400 return (pkern*(4./3+r/4.), (32.-6*r)/(13.-8*r)*dead_time,
401 4/(11.+2*r)*dead_time)
402 raise ValueError(mode)
405 def wang_juang_chan_step_response(gain, dead_time, tau, mode='PID'):
406 """Wang-Juang-Chan tuning
408 K,L,T = (gain, dead_time, tau)
410 return ((0.7303+0.5307*T/L)*(T+0.5*L)/(K*(T+L)),
412 0.5*L*T / (T + 0.5*L))
413 raise ValueError(mode)
417 def _wait_until_close(self, setpoint, tolerance=0.3, sleep_time=0.1):
418 while abs(self.get_temp() - setpoint) > tolerance:
419 _time.sleep(sleep_time)
421 def _time_function(self, function, args=(), kwargs=None, count=10):
422 "Rough estimate timing of get_temp(), takes me about 0.1s"
426 for i in range(count):
427 function(*args, **kwargs)
429 return float(stop-start)/count
431 def _is_heating(self, temp=None, setpoint=None, dead_band=None):
433 temp = self.get_temp()
435 temp = self._backend.get_setpoint()
436 low_temp = high_temp = setpoint
438 low_temp -= dead_band
439 high_temp += dead_band
442 elif temp > high_temp:
446 def _select_parameter(self, heating_result=None, cooling_result=None,
447 dead_band_result=None, **kwargs):
448 heating = self._is_heating(**kwargs)
450 return heating_result
451 elif heating is False:
452 return cooling_result
453 return dead_band_result
456 def _resample_with_constant_dx(x_data, y_data):
457 f = _interp1d(x_data, y_data)
458 x = _linspace(x_data[0], x_data[-1], len(x_data))
463 def _get_frequency(x_data, y_data):
464 x,y = Controller._resample_with_constant_dx(x_data, y_data)
466 yvec = _fvec(len(y_data))
468 for i,_y in enumerate(y_data):
469 yvec.set(_y - mean, i)
470 fake_sample_rate = 8000 # aubio is built for audio
471 p = _pitch(mode='schmitt', bufsize=len(y_data), hopsize=len(y_data),
472 samplerate=fake_sample_rate, omode='freq', tolerance=0.1)
473 freq = p(yvec) / (fake_sample_rate * dx)
474 _LOG.debug('pitch: {:n}, sample rate {:n}'.format(freq, 1./dx))
480 def _check_range(value, min, max):
482 raise ValueError('%g < %g' % (value, min))
484 raise ValueError('%g > %g' % (value, max))
486 def _check_temp(temp):
487 self._check_range(temp, self._min, self._max)
490 class ExponentialModel (_ModelFitter):
491 "Exponential decay model"
492 def model(self, params):
494 x_data = self.info['x data (s)']
495 x0 = x_data[0] # raw times in seconds are too far from the epoc
497 self._model_data[:] = y8*(1-a*_exp(-(x_data-x0)/tau))
498 return self._model_data
500 def guess_initial_params(self, outqueue=None, **kwargs):
501 x_data = self.info['x data (s)']
504 x_mid = x_data[int(len(x_data)/2)]
505 y_mid = y_data[int(len(y_data)/2)]
508 tau = (x_mid - x_start)/_log((y_start-y8)/(y_mid-y8))
509 return (tau, y_start, y8)
511 def guess_scale(self, params, outqueue=None, **kwargs):