1 # Copyright (C) 2011-2012 W. Trevor King <wking@tremily.us>
3 # This file is part of pypid.
5 # pypid is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
10 # pypid is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License along with
15 # pypid. If not, see <http://www.gnu.org/licenses/>.
17 import threading as _threading
20 from .. import LOG as _LOG
21 from . import Backend as _Backend
22 from . import ManualMixin as _ManualMixin
23 from . import PIDMixin as _PIDMixin
26 class TestBackend (_Backend, _ManualMixin, _PIDMixin):
27 """Test backend for demonstrating `Controller` function
29 The response function for a first-order plus dead time process
30 (aka FOPDT, or occasionaly KLT), one of control theory's classics,
33 G(s) = Y(s)/X(s) = K_p e^{-Ls} / (1 + T s)
35 Going back to the time domain with the inverse Laplace transform:
37 (1 + Ts)Y(s) = K_p e^{-Ls} X(s) Laplace transform rule:
38 y(t) + T dy(t)/dt = K_p e^{-Ls} X(s) f'(t) -> sF(s) + f(0)
39 y(t) + T dy(t)/dt = K_p x(t-L)u(t-L) f(t-a)u(t-a) -> e^{-as}F(s)
40 dy(t)/dt = -y(t)/T + K_p/T x(t-L)u(t-L)
42 This may look more familiar to those of us more used to
43 differential equations than we are to transfer functions. There
44 is an exponential decay to y=0 with a time constant T, with an
45 external driver that has a gain of K_p and a lag (dead time) of L.
49 * K_p: y-units over x-units (e.g. K/amp for a thermoelectric controller)
50 * L, T: time (e.g. seconds)
52 Just to flesh out the usual jargon, x is refered to as the
53 manipulated variable (MV) and y is the process variable (PV).
55 Because transfer functions treats the initial conditions
56 (e.g. x(t=0)), we can add them back in if we want to create a more
57 general y(t) having the same dynamics.
59 dy(t)/dt = (y_b(t) - y(t))/T + K_p/T x(t-L)
61 where I've assumed that x(t<0)=0 in order to drop the Heaviside
62 step function. Now instead of decaying to zero in the absence of
63 a driving signal, y(t) will decay to a bath value y_b(t).
65 For interesting experimental evidence of exponential cooling, see
66 Kaliszan et al., "Verification of the exponential model of body
67 temperature decrease after death in pigs".
68 doi: 10.1113/expphysiol.2005.030551
69 http://ep.physoc.org/content/90/5/727.long
70 September 1, 2005 Experimental Physiology, 90, 727-738.
72 def __init__(self, bath=0., process_gain=1., dead_time=1.,
73 decay_time=1., max_mv=1., process_period=0.01,
77 bath (ambient) process variable in PV units
79 gain between manipulated variable MV and PV, in PV-units/MV-units
81 time lag between a PV and the corresponding MV response.
83 exponential decay time constant for the undriven PV
85 maximum MV in MV-units
86 process_period : float
87 time between process-thread PV updates
89 All times (`dead_time`, `decay_time`, and `process_period`)
90 should be in the same units (e.g. seconds).
92 self._bath = self._setpoint = bath
93 self._K = process_gain
97 self._dead_periods = int(dead_time/process_period)
98 self._process_period = process_period
99 self._log_stream = log_stream
100 self._i_term = self._d_term = 0
101 self._p_down = self._d_down = 0
102 self._p_up = self._d_up = 0
103 self._i_down = self._i_up = float('inf')
106 self._PVs = [bath]*(self._dead_periods+1)
107 self._start_process_thread()
110 self._stop_process_thread()
112 def _start_process_thread(self):
113 self._stop_process = False
114 self._process_thread = _threading.Thread(
115 target=self._run_process, name='process')
116 self._process_thread.start()
118 def _stop_process_thread(self):
119 self._stop_process = True
120 self._process_thread.join()
122 def _run_process(self):
125 'time', 'SP', 'PV_int', 'PV', 'dPV_bath', 'dPV_drive',
126 'MV', 'intergal', 'derivative'))
127 self._log_stream.write('#{}\n'.format(line))
128 dt = self._process_period
129 next_time = _time.time() + dt
130 while not self._stop_process:
132 dPV_bath = (self._bath - PV)/self._T * dt
133 MV = self.get_mv(_increment_i_term=True)
134 dPV_drive = self._K * MV / self._T * dt
136 line = '\t'.join(str(x) for x in (
137 _time.time(), self._setpoint, PV, self.get_pv(),
138 dPV_bath, dPV_drive, MV, self._i_term, self._d_term))
139 self._log_stream.write(line + '\n')
140 PV += dPV_bath + dPV_drive
143 s = next_time - _time.time()
148 def _limited_mv(self, mv):
149 if mv > self._max_mv:
150 #_LOG.debug('limiting mv to maximum: {:n} -> {:n} amps'.format(
153 elif mv < -self._max_mv:
154 #_LOG.debug('limiting mv to maximum: {:n} -> {:n} amps'.format(
155 # mv, -self._max_mv))
162 def get_ambient_pv(self):
165 def set_max_mv(self, max):
168 def get_max_mv(self):
171 def get_mv(self, _increment_i_term=True):
172 if self._mode == 'manual':
173 return self._manual_mv
174 elif self._mode == 'PID':
175 PV_prev,PV = self._PVs[:2]
176 dPV_s = (self._setpoint - PV)
177 if PV > self._setpoint:
178 p,i,d = self._p_down, self._i_down, self._d_down
180 p,i,d = self._p_up, self._i_up, self._d_up
182 dt = self._process_period
183 if _increment_i_term is True:
184 self._i_term += dPV_s * dt
185 self._d_term = -dPV_t / dt # = de(t)/dt with constant setpoint
186 return self._limited_mv(
187 p*(dPV_s + self._i_term/i + d*self._d_term))
188 raise ValueError(self._mode)
191 return ['manual', 'PID']
196 def set_mode(self, mode):
199 # ManualMixin methods
201 def set_mv(self, mv):
202 self._manual_mv = self._limited_mv(mv)
206 def set_setpoint(self, setpoint):
207 self._setpoint = setpoint
209 def get_setpoint(self):
210 return self._setpoint
212 def set_down_gains(self, proportional=None, integral=None,
214 if proportional is not None:
215 self._p_down = proportional
216 if integral is not None:
217 self._i_down = integral
218 if derivative is not None:
219 self._d_down = derivative
221 def get_down_gains(self):
222 return (self._p_down, self._i_down, self._d_down)
224 def set_up_gains(self, proportional=None, integral=None,
226 if proportional is not None:
227 self._p_up = proportional
228 if integral is not None:
229 self._i_up = integral
230 if derivative is not None:
231 self._d_up = derivative
233 def get_up_gains(self):
234 return (self._p_up, self._i_up, self._d_up)
236 def get_feedback_terms(self):
237 return (self.get_mv(), self._setpoint - self.get_pv(),
238 self._i_term, self._d_term)
240 def clear_integral_term(self):