1 # Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
3 # This file is part of tempcontrol.
5 # tempcontrol is free software: you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation, either
8 # version 3 of the License, or (at your option) any later version.
10 # tempcontrol is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with tempcontrol. If not, see
17 # <http://www.gnu.org/licenses/>.
19 import threading as _threading
22 from .. import LOG as _LOG
23 from . import Backend as _Backend
24 from . import ManualMixin as _ManualMixin
25 from . import PIDMixin as _PIDMixin
28 class TestBackend (_Backend, _ManualMixin, _PIDMixin):
29 """Test backend for demonstrating `Controller` function
31 The underlying temperature decay model is exponential, which is
32 often refered to as "Newton's law of cooling"
36 where `h` is the transfer coefficient (with some scaling terms
37 brushed under the rug). To make the system more realistic, I've
38 also added a dead time, so temperatures returned by `.get_temp()`
39 actually correspond to the system temperature `dead_time` seconds
40 before the measurement was taken. Finally, there's a
41 `drive_coefficient` `d`, which gives the rate of temperature
42 change due to a applied driving current `I`, so
44 dT(y)/dt = h (Tbath(t) - T(t)) + d I(t-L)
46 This model is often refered to as a FOPDT (first-order plus dead
47 time) or KLT (K: static gain, L: time delay, T: time constant/lag)
48 model. Translating our model above into process-control jargon:
50 * Process variable y(t) corresponds to our T(t).
51 * Manipulated variable u(t) corresponds to our I(t).
52 * Process gain dy/du (often denoted K_p). For our parameters
53 above, K_p = dy/du = dT/dI = d/h.
54 * Process time constant (aka lag, often denoted tau or T; the
55 exponential decay timescale). For our parameters above,
57 * Dead time (often denoted L or theta; the time delay between a
58 change system and that change being reflected in the process
59 variable). For our parameters above, L = dead_time.
61 The response function for a FOPDT process is
63 G(s) = K_p e^{-Ls} / (1 + T s)
65 For interesting experimental evidence of exponential cooling, see
66 Kaliszan et al., "Verification of the exponential model of body
67 temperature decrease after death in pigs".
68 doi: 10.1113/expphysiol.2005.030551
69 http://ep.physoc.org/content/90/5/727.long
70 September 1, 2005 Experimental Physiology, 90, 727-738.
72 def __init__(self, bath=20, transfer_coefficient=0.1,
73 drive_coefficient=1., max_current=1., dead_time=1.,
74 process_period=0.01, log_stream=None):
77 bath (ambient) temperature in degrees Celsius
78 transfer_coefficient : float
79 between the system and the bath, in inverse seconds
80 drive_coefficient : float
81 for the applied current, in degrees Celsius per amp
83 maximum current in amps
85 time lag in seconds between an internal system temperature
86 and the corresponding `.get_temp()` reading
87 process_period : float
88 time in seconds between process-thread temperature updates
91 self._transfer_coefficient = transfer_coefficient
92 self._drive_coefficient = drive_coefficient
93 self._max_current = max_current
94 self._dead_periods = int(dead_time/process_period)
95 self._process_period = process_period
96 self._log_stream = log_stream
98 self._i_term = self._d_term = 0
99 self._p_cool = self._d_cool = 0
100 self._p_heat = self._d_heat = 0
101 self._i_cool = self._i_heat = float('inf')
102 self._manual_current = 0
104 self._temperatures = [bath]*(self._dead_periods+1)
105 self._start_process_thread()
108 self._stop_process_thread()
110 def _start_process_thread(self):
111 self._stop_process = False
112 self._process_thread = _threading.Thread(
113 target=self._run_process, name='process')
114 self._process_thread.start()
116 def _stop_process_thread(self):
117 self._stop_process = True
118 self._process_thread.join()
120 def _run_process(self):
123 'time', 'setpoint', 'process temperature',
124 'measured temperature', 'dT_bath', 'dT_drive', 'current',
125 'intergal', 'derivative'))
126 self._log_stream.write('#{}\n'.format(line))
127 dt = self._process_period
128 next_time = _time.time() + dt
129 while not self._stop_process:
130 T = self._temperatures[-1]
131 dT_bath = self._transfer_coefficient * (self._bath - T)
132 current = self.get_current(_increment_i_term=True)
133 dT_drive = self._drive_coefficient * current
135 line = '\t'.join(str(x) for x in (
136 _time.time(), self._setpoint, T, self.get_temp(), dT_bath*dt,
137 dT_drive*dt, current, self._i_term, self._d_term))
138 self._log_stream.write(line + '\n')
139 T += (dT_bath + dT_drive) * dt
140 self._temperatures.pop(0)
141 self._temperatures.append(T)
142 s = next_time - _time.time()
147 def _limited_current(self, current):
148 if current > self._max_current:
149 #_LOG.debug('limiting current to maximum: {:n} -> {:n} amps'.format(
150 # current, self._max_current))
151 return self._max_current
152 elif current < -self._max_current:
153 #_LOG.debug('limiting current to maximum: {:n} -> {:n} amps'.format(
154 # current, -self._max_current))
155 return -self._max_current
159 return self._temperatures[1]
161 def get_ambient_temp(self):
164 def set_max_current(self, max):
165 self._max_current = max
167 def get_max_current(self):
168 return self._max_current
170 def get_current(self, _increment_i_term=True):
171 if self._mode == 'manual':
172 return self._manual_current
173 elif self._mode == 'PID':
174 T_pref,T = self._temperatures[:2]
175 dT_s = (self._setpoint - T)
176 if T > self._setpoint:
177 p,i,d = self._p_cool, self._i_cool, self._d_cool
179 p,i,d = self._p_heat, self._i_heat, self._d_heat
181 dt = self._process_period
182 if _increment_i_term is True:
183 self._i_term += dT_s * dt
184 self._d_term = -dT_t / dt # = de(t)/dt with constant setpoint
185 return self._limited_current(
186 p*(dT_s + self._i_term/i + d*self._d_term))
187 raise ValueError(self._mode)
190 return ['manual', 'PID']
195 def set_mode(self, mode):
198 # ManualMixin methods
200 def set_current(self, current):
201 self._manual_current = self._limited_current(current)
205 def set_setpoint(self, setpoint):
206 self._setpoint = setpoint
208 def get_setpoint(self):
209 return self._setpoint
211 def set_cooling_gains(self, proportional=None, integral=None,
213 if proportional is not None:
214 self._p_cool = proportional
215 if integral is not None:
216 self._i_cool = integral
217 if derivative is not None:
218 self._d_cool = derivative
220 def get_cooling_gains(self):
221 return (self._p_cool, self._i_cool, self._d_cool)
223 def set_heating_gains(self, proportional=None, integral=None,
225 if proportional is not None:
226 self._p_heat = proportional
227 if integral is not None:
228 self._i_heat = integral
229 if derivative is not None:
230 self._d_heat = derivative
232 def get_heating_gains(self):
233 return (self._p_heat, self._i_heat, self._d_heat)
235 def get_feedback_terms(self):
236 return (self.get_current(), self._setpoint - self.get_temp(),
237 self._i_term, self._d_term)
239 def clear_integral_term(self):