1 # Copyright (C) 2011-2012 W. Trevor King <wking@tremily.us>
3 # This file is part of pypid.
5 # pypid is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
10 # pypid is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License along with
15 # pypid. If not, see <http://www.gnu.org/licenses/>.
19 from aubio.aubioclass import pitch as _pitch
20 from aubio.aubioclass import fvec as _fvec
21 from numpy import array as _array
22 from numpy import exp as _exp
23 from numpy import linspace as _linspace
24 from numpy import log as _log
25 from scipy.interpolate import interp1d as _interp1d
27 from . import LOG as _LOG
28 from .fit import ModelFitter as _ModelFitter
31 class Controller (object):
32 """PID control frontend.
34 backend: pypid.backend.Backend instance
35 backend driving your particular harware
37 initial setpoint in PV-units
39 minimum PV in PV-units (for sanity checks)
41 maximum PV in PV-units (for sanity checks)
43 def __init__(self, backend, setpoint=0.0, min=-float('inf'),
45 self._backend = backend
46 self._setpoint = setpoint
50 # basic user interface methods
53 """Return the current process variable in PV-units
55 We should expose this to users, so they don't need to go
56 mucking about in `._backend`.
58 return self._backend.get_pv()
60 def set_pv(self, setpoint, **kwargs):
61 """Change setpoint to `setpoint` and wait for stability
63 self._backend.set_setpoint(setpoint)
64 self.wait_for_stability(setpoint=setpoint, **kwargs)
66 def wait_for_stability(self, setpoint, tolerance=0.3, time=10.,
67 timeout=-1, sleep_time=0.1, return_data=False):
68 """Wait until the PV is sufficiently stable
73 maximum allowed deviation from `setpoint` in PV-units
75 time the PV must remain in the allowed region before the
76 signal is delared "stable"
78 maximum time to wait for stability. Set to -1 to never
81 time in seconds to sleep between reads to avoid an
84 if true, also return a list of `(timestamp, PV)` tuples
87 Read the PV every `sleep_time` seconds until the PV has
88 remained within `tolerance` of `setpoint` for `time`. If the
89 stability criteria are met, return `True` (stable). If
90 `timeout` seconds pass before the criteria are met, return
93 _LOG.debug(('wait until the PV is stable at {:n} +/- {:n} C '
94 'for {:n} seconds').format(setpoint, tolerance, time))
98 start_time = _time.time()
99 stable_time = start_time + time
103 timeout_time = start_time + timeout
106 in_range = abs(PV - setpoint) < tolerance
112 _LOG.debug('process variable is stable')
114 break # in range for long enough
116 stable_time = t + time # reset target time
117 if timeout_time and t > timeout_time:
118 _LOG.debug('process variable is not stable')
120 _time.sleep(sleep_time)
122 return (stable, data)
125 def is_stable(self, setpoint, time, **kwargs):
126 return self.wait_for_stability(
127 setpoint=setpoint, time=time, timeout=time, **kwargs)
129 def estimate_pv_sensitivity(self, values=10, sleep_time=0.1,
136 if repeats == max_repeats:
142 if len(PVs) > values:
146 _time.sleep(sleep_time)
152 def check_feedback_terms(self):
153 """Check a backend's interpretation of its PID feedback terms.
155 Some backends provide an interface to read out their PID
156 feedback terms, but the interface is not always well
157 documented. This method reads out the terms, and compares
158 them with our own calculations (when possible) to test the
159 backend's interpretation.
161 c = self._backend.get_current()
162 pid,prop,ntgrl,deriv = self._backend.get_feedback_terms()
164 SP = self._backend.get_setpoint()
166 p,i,d = self._backend.get_down_gains()
168 p,i,d = self._backend.get_up_gains()
169 _LOG.info(('pid(read) {:n} =? sum(calc from terms) {:n} '
170 '=? cur(read) {:n} A').format(pid, prop+ntgrl+deriv, c))
171 _LOG.info('read: p {:n}, i {:n}, d {:n}'.format(p,i,d))
172 _LOG.info('calc: p {:n}'.format(p*(SP-PV)))
174 # tuning experiments and data processing
176 def get_step_response(self, mv_a, mv_b, sleep_time=0.1, stable_time=10.,
178 "Measure a step response for later analysis"
179 _LOG.debug('measure step response')
181 raise ValueError(kwargs)
182 kwargs['time'] = stable_time
183 kwargs['sleep_time'] = sleep_time
184 mode = self._backend.get_mode()
186 manual_mv = self._backend.get_mv()
188 self._backend.set_mode('manual')
189 _LOG.debug('set first current and wait for stability')
190 self._backend.set_mv(mv_a)
192 while not self.is_stable(pv_a, **kwargs):
194 _LOG.debug('stabilized at {:n} {} with {:n} {}'.format(
195 pv_a, self._backend.pv_units, mv_a, self._backend.mv_units))
196 _LOG.debug('set second MV and wait for stability')
198 start_time = _time.time()
199 self._backend.set_mv(mv_b)
202 stable,d = self.is_stable(pv_b, return_data=True, **kwargs)
207 _LOG.debug('stabilized at {:n} {} with {:n} {}'.format(
208 pv_b, self._backend.pv_units, mv_b, self._backend.mv_units))
210 self._backend.set_mv(manual_mv)
212 self._backend.set_mode(mode)
216 def analyze_step_response(step_response, mv_shift):
217 rates = [(PVb-PVa)/(tb-ta) for ((ta,PVa),(tb,PVb))
218 in zip(step_response, step_response[1:])]
219 # TODO: averaging filter?
220 max_rate_i = max_rate = 0
221 for i,rate in enumerate(rates):
222 if abs(rate) > max_rate: # handle steps in both directions
225 max_rate_time,max_rate_temp = step_response[max_rate_i] # TODO: avg i and i+1?
226 time_a,PV_a = step_response[0]
227 max_rate_time -= time_a
228 dead_time = max_rate_time - (max_rate_temp - PV_a) / max_rate
229 t_data = _array([t for t,PV in step_response[max_rate_i:]])
230 PV_data = _array([PV for t,PV in step_response[max_rate_i:]])
231 model = ExponentialModel(PV_data, info={'x data (s)': t_data})
232 decay_time,PV0,PV8 = model.fit()
233 process_gain = (PV8 - PV_a) / mv_shift
234 return (process_gain, dead_time, decay_time, max_rate)
236 def get_bang_bang_response(self, dead_band=0.8, num_oscillations=10,
237 max_dead_band_time=30, sleep_time=0.1):
238 orig_down_gains = self._backend.get_down_gains()
239 orig_up_gains = self._backend.get_up_gains()
240 _LOG.debug('measure bang-bang response')
241 mode = self._backend.get_mode()
243 self._backend.set_mode('PID')
245 setpoint = self._backend.get_setpoint()
246 self._backend.set_down_gains(float('inf'), float('inf'), 0)
247 self._backend.set_up_gains(float('inf'), float('inf'), 0)
248 start_time = _time.time()
250 under_first = self._is_under(
251 pv=pv, setpoint=setpoint, dead_band=dead_band)
252 _LOG.debug('wait to exit dead band')
254 while under_first is None:
255 if t - start_time > max_dead_band_time:
256 msg = 'still in dead band after after {:n} seconds'.format(
259 raise ValueError(msg)
260 _time.sleep(sleep_time)
263 under_first = self._is_under(
264 pv=pv, setpoint=setpoint, dead_band=dead_band)
265 _LOG.debug('read {:d} oscillations'.format(num_oscillations))
268 while i < num_oscillations*2 + 1:
271 # drop first half cycle (possibly includes ramp to setpoint)
274 _under = self._is_under(
275 temp=temp, setpoint=setpoint, dead_band=dead_band)
276 if _under is True and under is False:
277 _LOG.debug('transition to PV < SP (i={:d})'.format(i))
280 elif under is False and is_under is True:
281 _LOG.debug('transition to PV > SP (i={:d})'.format(i))
284 _time.sleep(sleep_time)
285 self._backend.set_down_gains(*orig_down_gains)
286 self._backend.set_up_gains(*orig_up_gains)
288 self._backend.set_mode(mode)
292 def analyze_bang_bang_response(bang_bang_response):
293 t_data = _array([t for t,PV in bang_bang_response])
294 PV_data = _array([PV for t,PV in bang_bang_response])
295 amp = (PV_data.max() - PV_data.min()) / 2
296 freq = Controller._get_frequency(x_data=t_data, y_data=PV_data)
300 def get_ultimate_cycle_response(self, proportional, period):
301 orig_down_gains = self._backend.get_down_gains()
302 orig_up_gains = self._backend.get_up_gains()
303 _LOG.debug('measure ultimate cycle response')
304 mode = self._backend.get_mode()
306 self._backend.set_mode('PID')
308 self._backend.set_down_gains(*orig_down_gains)
309 self._backend.set_up_gains(*orig_up_gains)
311 self._backend.set_mode(mode)
315 def analyze_ultimate_cycle_response(ultimate_cycle_response):
316 amp,period = Controller.analyze_bang_bang_response(
317 ultimate_cycle_response)
322 def _wait_until_close(self, setpoint, tolerance=0.3, sleep_time=0.1):
323 while abs(self.get_pv() - setpoint) > tolerance:
324 _time.sleep(sleep_time)
326 def _time_function(self, function, args=(), kwargs=None, count=10):
327 "Rough estimate timing of get_temp(), takes me about 0.1s"
331 for i in range(count):
332 function(*args, **kwargs)
334 return float(stop-start)/count
336 def _is_under(self, pv=None, setpoint=None, dead_band=None):
340 setpoint = self._backend.get_setpoint()
341 low_pv = high_pv = setpoint
351 def _select_parameter(self, under_result=None, over_result=None,
352 dead_band_result=None, **kwargs):
353 under = self._is_under(**kwargs)
358 return dead_band_result
361 def _resample_with_constant_dx(x_data, y_data):
362 f = _interp1d(x_data, y_data)
363 x = _linspace(x_data[0], x_data[-1], len(x_data))
368 def _get_frequency(x_data, y_data):
369 x,y = Controller._resample_with_constant_dx(x_data, y_data)
371 yvec = _fvec(len(y_data))
373 for i,_y in enumerate(y_data):
374 yvec.set(_y - mean, i)
375 fake_sample_rate = 8000 # aubio is built for audio
376 p = _pitch(mode='schmitt', bufsize=len(y_data), hopsize=len(y_data),
377 samplerate=fake_sample_rate, omode='freq', tolerance=0.1)
378 freq = p(yvec) / (fake_sample_rate * dx)
379 _LOG.debug('pitch: {:n}, sample rate {:n}'.format(freq, 1./dx))
385 def _check_range(value, min, max):
387 raise ValueError('%g < %g' % (value, min))
389 raise ValueError('%g > %g' % (value, max))
392 self._check_rangee(pv, self._min_pv, self._max_pv)
395 class ExponentialModel (_ModelFitter):
396 "Exponential decay model"
397 def model(self, params):
399 x_data = self.info['x data (s)']
400 x0 = x_data[0] # raw times in seconds are too far from the epoc
402 self._model_data[:] = y8*(1-a*_exp(-(x_data-x0)/tau))
403 return self._model_data
405 def guess_initial_params(self, outqueue=None, **kwargs):
406 x_data = self.info['x data (s)']
409 x_mid = x_data[int(len(x_data)/2)]
410 y_mid = y_data[int(len(y_data)/2)]
413 tau = (x_mid - x_start)/_log((y_start-y8)/(y_mid-y8))
414 return (tau, y_start, y8)
416 def guess_scale(self, params, outqueue=None, **kwargs):