Rewrite with a more modular structure.
[pypid.git] / tempcontrol / backend / test.py
1 # Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of tempcontrol.
4 #
5 # tempcontrol is free software: you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation, either
8 # version 3 of the License, or (at your option) any later version.
9 #
10 # tempcontrol is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with tempcontrol.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 import threading as _threading
20 import time as _time
21
22 from .. import LOG as _LOG
23 from . import Backend as _Backend
24 from . import ManualMixin as _ManualMixin
25 from . import PIDMixin as _PIDMixin
26
27
28 class TestBackend (_Backend, _ManualMixin, _PIDMixin):
29     """Test backend for demonstrating `Controller` function
30
31     The underlying temperature decay model is exponential, which is
32     often refered to as "Newton's law of cooling"
33
34         dT/dt = h (Tbath - T)
35
36     where `h` is the transfer coefficient (with some scaling terms
37     brushed under the rug).  To make the system more realistic, I've
38     also added a dead time, so temperatures returned by `.get_temp()`
39     actually correspond to the system temperature `dead_time` seconds
40     before the measurement was taken.  Finally, there's a
41     `drive_coefficient` `d`, which gives the rate of temperature
42     change due to a applied driving current `I`, so
43
44         dT(y)/dt = h (Tbath(t) - T(t)) + d I(t-L)
45
46     This model is often refered to as a FOPDT (first-order plus dead
47     time) or KLT (K: static gain, L: time delay, T: time constant/lag)
48     model.  Translating our model above into process-control jargon:
49
50     * Process variable y(t) corresponds to our T(t).
51     * Manipulated variable u(t) corresponds to our I(t).
52     * Process gain dy/du (often denoted K_p).  For our parameters
53       above, K_p = dy/du = dT/dI = d/h.
54     * Process time constant (aka lag, often denoted tau or T; the
55       exponential decay timescale).  For our parameters above,
56       tau = 1/h.
57     * Dead time (often denoted L or theta; the time delay between a
58       change system and that change being reflected in the process
59       variable).  For our parameters above, L = dead_time.
60
61     The response function for a FOPDT process is
62
63       G(s) = K_p e^{-Ls} / (1 + T s)
64
65     For interesting experimental evidence of exponential cooling, see
66     Kaliszan et al., "Verification of the exponential model of body
67     temperature decrease after death in pigs".
68     doi: 10.1113/expphysiol.2005.030551
69     http://ep.physoc.org/content/90/5/727.long
70     September 1, 2005 Experimental Physiology, 90, 727-738.
71     """
72     def __init__(self, bath=20, transfer_coefficient=0.1,
73                  drive_coefficient=1., max_current=1., dead_time=1.,
74                  process_period=0.01, log_stream=None):
75         """
76         bath : float
77             bath (ambient) temperature in degrees Celsius
78         transfer_coefficient : float
79             between the system and the bath, in inverse seconds
80         drive_coefficient : float
81             for the applied current, in degrees Celsius per amp
82         max_current : float
83             maximum current in amps
84         dead_time : float
85             time lag in seconds between an internal system temperature
86             and the corresponding `.get_temp()` reading
87         process_period : float
88             time in seconds between process-thread temperature updates
89         """
90         self._bath = bath
91         self._transfer_coefficient = transfer_coefficient
92         self._drive_coefficient = drive_coefficient
93         self._max_current = max_current
94         self._dead_periods = int(dead_time/process_period)
95         self._process_period = process_period
96         self._log_stream = log_stream
97         self._setpoint = 0
98         self._i_term = self._d_term = 0
99         self._p_cool = self._d_cool = 0
100         self._p_heat = self._d_heat = 0
101         self._i_cool = self._i_heat = float('inf')
102         self._manual_current = 0
103         self._mode = 'PID'
104         self._temperatures = [bath]*(self._dead_periods+1)
105         self._start_process_thread()
106
107     def cleanup(self):
108         self._stop_process_thread()
109
110     def _start_process_thread(self):
111         self._stop_process = False
112         self._process_thread = _threading.Thread(
113             target=self._run_process, name='process')
114         self._process_thread.start()
115
116     def _stop_process_thread(self):
117         self._stop_process = True
118         self._process_thread.join()
119
120     def _run_process(self):
121         if self._log_stream:
122             line = '\t'.join((
123                     'time', 'setpoint', 'process temperature',
124                     'measured temperature', 'dT_bath', 'dT_drive', 'current',
125                     'intergal', 'derivative'))
126             self._log_stream.write('#{}\n'.format(line))
127         dt = self._process_period
128         next_time = _time.time() + dt
129         while not self._stop_process:
130             T = self._temperatures[-1]
131             dT_bath = self._transfer_coefficient * (self._bath - T)
132             current = self.get_current(_increment_i_term=True)
133             dT_drive = self._drive_coefficient * current
134             if self._log_stream:
135                 line = '\t'.join(str(x) for x in (
136                     _time.time(), self._setpoint, T, self.get_temp(), dT_bath*dt,
137                     dT_drive*dt, current, self._i_term, self._d_term))
138                 self._log_stream.write(line + '\n')
139             T += (dT_bath + dT_drive) * dt
140             self._temperatures.pop(0)
141             self._temperatures.append(T)
142             s = next_time - _time.time()
143             if s > 0:
144                 _time.sleep(s)
145             next_time += dt
146
147     def _limited_current(self, current):
148         if current > self._max_current:
149             #_LOG.debug('limiting current to maximum: {:n} -> {:n} amps'.format(
150             #        current, self._max_current))
151             return self._max_current
152         elif current < -self._max_current:
153             #_LOG.debug('limiting current to maximum: {:n} -> {:n} amps'.format(
154             #        current, -self._max_current))
155             return -self._max_current
156         return current
157
158     def get_temp(self):
159         return self._temperatures[1]
160
161     def get_ambient_temp(self):
162         return self._bath
163
164     def set_max_current(self, max):
165         self._max_current = max
166
167     def get_max_current(self):
168         return self._max_current
169
170     def get_current(self, _increment_i_term=True):
171         if self._mode == 'manual':
172             return self._manual_current
173         elif self._mode == 'PID':
174             T_pref,T = self._temperatures[:2]
175             dT_s = (self._setpoint - T)
176             if T > self._setpoint:
177                 p,i,d = self._p_cool, self._i_cool, self._d_cool
178             else:
179                 p,i,d = self._p_heat, self._i_heat, self._d_heat
180             dT_t = T - T_pref
181             dt = self._process_period
182             if _increment_i_term is True:
183                 self._i_term += dT_s * dt
184             self._d_term = -dT_t / dt  # = de(t)/dt with constant setpoint
185             return self._limited_current(
186                 p*(dT_s + self._i_term/i + d*self._d_term))
187         raise ValueError(self._mode)
188
189     def get_modes(self):
190         return ['manual', 'PID']
191
192     def get_mode(self):
193         return self._mode
194
195     def set_mode(self, mode):
196         self._mode = mode
197
198     # ManualMixin methods
199
200     def set_current(self, current):
201         self._manual_current = self._limited_current(current)
202
203     # PIDMixin methods
204
205     def set_setpoint(self, setpoint):
206         self._setpoint = setpoint
207
208     def get_setpoint(self):
209         return self._setpoint
210
211     def set_cooling_gains(self, proportional=None, integral=None,
212                           derivative=None):
213         if proportional is not None:
214             self._p_cool = proportional
215         if integral is not None:
216             self._i_cool = integral
217         if derivative is not None:
218             self._d_cool = derivative
219
220     def get_cooling_gains(self):
221         return (self._p_cool, self._i_cool, self._d_cool)
222
223     def set_heating_gains(self, proportional=None, integral=None,
224                           derivative=None):
225         if proportional is not None:
226             self._p_heat = proportional
227         if integral is not None:
228             self._i_heat = integral
229         if derivative is not None:
230             self._d_heat = derivative
231
232     def get_heating_gains(self):
233         return (self._p_heat, self._i_heat, self._d_heat)
234
235     def get_feedback_terms(self):
236         return (self.get_current(), self._setpoint - self.get_temp(),
237                 self._i_term, self._d_term)
238
239     def clear_integral_term(self):
240         self._i_term = 0