Ran update-copyright.py.
[pypid.git] / pypid / backend / test.py
1 # Copyright (C) 2011-2012 W. Trevor King <wking@tremily.us>
2 #
3 # This file is part of pypid.
4 #
5 # pypid is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
8 # version.
9 #
10 # pypid is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # pypid.  If not, see <http://www.gnu.org/licenses/>.
16
17 import threading as _threading
18 import time as _time
19
20 from .. import LOG as _LOG
21 from . import Backend as _Backend
22 from . import ManualMixin as _ManualMixin
23 from . import PIDMixin as _PIDMixin
24
25
26 class TestBackend (_Backend, _ManualMixin, _PIDMixin):
27     """Test backend for demonstrating `Controller` function
28
29     The response function for a first-order plus dead time process
30     (aka FOPDT, or occasionaly KLT), one of control theory's classics,
31     is
32
33       G(s) = Y(s)/X(s) = K_p e^{-Ls} / (1 + T s)
34
35     Going back to the time domain with the inverse Laplace transform:
36
37       (1 + Ts)Y(s) = K_p e^{-Ls} X(s)              Laplace transform rule:
38       y(t) + T dy(t)/dt = K_p e^{-Ls} X(s)         f'(t) -> sF(s) + f(0)
39       y(t) + T dy(t)/dt = K_p x(t-L)u(t-L)         f(t-a)u(t-a) -> e^{-as}F(s)
40       dy(t)/dt = -y(t)/T + K_p/T x(t-L)u(t-L)
41
42     This may look more familiar to those of us more used to
43     differential equations than we are to transfer functions.  There
44     is an exponential decay to y=0 with a time constant T, with an
45     external driver that has a gain of K_p and a lag (dead time) of L.
46
47     The units should be:
48
49     * K_p: y-units over x-units (e.g. K/amp for a thermoelectric controller)
50     * L, T: time (e.g. seconds)
51
52     Just to flesh out the usual jargon, x is refered to as the
53     manipulated variable (MV) and y is the process variable (PV).
54
55     Because transfer functions treats the initial conditions
56     (e.g. x(t=0)), we can add them back in if we want to create a more
57     general y(t) having the same dynamics.
58
59       dy(t)/dt = (y_b(t) - y(t))/T + K_p/T x(t-L)
60
61     where I've assumed that x(t<0)=0 in order to drop the Heaviside
62     step function.  Now instead of decaying to zero in the absence of
63     a driving signal, y(t) will decay to a bath value y_b(t).
64
65     For interesting experimental evidence of exponential cooling, see
66     Kaliszan et al., "Verification of the exponential model of body
67     temperature decrease after death in pigs".
68     doi: 10.1113/expphysiol.2005.030551
69     http://ep.physoc.org/content/90/5/727.long
70     September 1, 2005 Experimental Physiology, 90, 727-738.
71     """
72     def __init__(self, bath=0., process_gain=1., dead_time=1.,
73                  decay_time=1., max_mv=1., process_period=0.01,
74                  log_stream=None):
75         """
76         bath : float
77             bath (ambient) process variable in PV units
78         process_gain : float
79             gain between manipulated variable MV and PV, in PV-units/MV-units
80         dead_time : float
81             time lag between a PV and the corresponding MV response.
82         decay_time : float
83             exponential decay time constant for the undriven PV
84         max_mv : float
85             maximum MV in MV-units
86         process_period : float
87             time between process-thread PV updates
88
89         All times (`dead_time`, `decay_time`, and `process_period`)
90         should be in the same units (e.g. seconds).
91         """
92         self._bath = self._setpoint = bath
93         self._K = process_gain
94         self._L = dead_time
95         self._T = decay_time
96         self._max_mv = max_mv
97         self._dead_periods = int(dead_time/process_period)
98         self._process_period = process_period
99         self._log_stream = log_stream
100         self._i_term = self._d_term = 0
101         self._p_down = self._d_down = 0
102         self._p_up = self._d_up = 0
103         self._i_down = self._i_up = float('inf')
104         self._manual_mv = 0
105         self._mode = 'PID'
106         self._PVs = [bath]*(self._dead_periods+1)
107         self._start_process_thread()
108
109     def cleanup(self):
110         self._stop_process_thread()
111
112     def _start_process_thread(self):
113         self._stop_process = False
114         self._process_thread = _threading.Thread(
115             target=self._run_process, name='process')
116         self._process_thread.start()
117
118     def _stop_process_thread(self):
119         self._stop_process = True
120         self._process_thread.join()
121
122     def _run_process(self):
123         if self._log_stream:
124             line = '\t'.join((
125                     'time', 'SP', 'PV_int', 'PV', 'dPV_bath', 'dPV_drive',
126                     'MV', 'intergal', 'derivative'))
127             self._log_stream.write('#{}\n'.format(line))
128         dt = self._process_period
129         next_time = _time.time() + dt
130         while not self._stop_process:
131             PV = self._PVs[-1]
132             dPV_bath = (self._bath - PV)/self._T * dt
133             MV = self.get_mv(_increment_i_term=True)
134             dPV_drive = self._K * MV / self._T * dt
135             if self._log_stream:
136                 line = '\t'.join(str(x) for x in (
137                     _time.time(), self._setpoint, PV, self.get_pv(),
138                     dPV_bath, dPV_drive, MV, self._i_term, self._d_term))
139                 self._log_stream.write(line + '\n')
140             PV += dPV_bath + dPV_drive
141             self._PVs.pop(0)
142             self._PVs.append(PV)
143             s = next_time - _time.time()
144             if s > 0:
145                 _time.sleep(s)
146             next_time += dt
147
148     def _limited_mv(self, mv):
149         if mv > self._max_mv:
150             #_LOG.debug('limiting mv to maximum: {:n} -> {:n} amps'.format(
151             #        mv, self._max_mv))
152             return self._max_mv
153         elif mv < -self._max_mv:
154             #_LOG.debug('limiting mv to maximum: {:n} -> {:n} amps'.format(
155             #        mv, -self._max_mv))
156             return -self._max_mv
157         return mv
158
159     def get_pv(self):
160         return self._PVs[1]
161
162     def get_ambient_pv(self):
163         return self._bath
164
165     def set_max_mv(self, max):
166         self._max_mv = max
167
168     def get_max_mv(self):
169         return self._max_mv
170
171     def get_mv(self, _increment_i_term=True):
172         if self._mode == 'manual':
173             return self._manual_mv
174         elif self._mode == 'PID':
175             PV_prev,PV = self._PVs[:2]
176             dPV_s = (self._setpoint - PV)
177             if PV > self._setpoint:
178                 p,i,d = self._p_down, self._i_down, self._d_down
179             else:
180                 p,i,d = self._p_up, self._i_up, self._d_up
181             dPV_t = PV - PV_prev
182             dt = self._process_period
183             if _increment_i_term is True:
184                 self._i_term += dPV_s * dt
185             self._d_term = -dPV_t / dt  # = de(t)/dt with constant setpoint
186             return self._limited_mv(
187                 p*(dPV_s + self._i_term/i + d*self._d_term))
188         raise ValueError(self._mode)
189
190     def get_modes(self):
191         return ['manual', 'PID']
192
193     def get_mode(self):
194         return self._mode
195
196     def set_mode(self, mode):
197         self._mode = mode
198
199     # ManualMixin methods
200
201     def set_mv(self, mv):
202         self._manual_mv = self._limited_mv(mv)
203
204     # PIDMixin methods
205
206     def set_setpoint(self, setpoint):
207         self._setpoint = setpoint
208
209     def get_setpoint(self):
210         return self._setpoint
211
212     def set_down_gains(self, proportional=None, integral=None,
213                           derivative=None):
214         if proportional is not None:
215             self._p_down = proportional
216         if integral is not None:
217             self._i_down = integral
218         if derivative is not None:
219             self._d_down = derivative
220
221     def get_down_gains(self):
222         return (self._p_down, self._i_down, self._d_down)
223
224     def set_up_gains(self, proportional=None, integral=None,
225                           derivative=None):
226         if proportional is not None:
227             self._p_up = proportional
228         if integral is not None:
229             self._i_up = integral
230         if derivative is not None:
231             self._d_up = derivative
232
233     def get_up_gains(self):
234         return (self._p_up, self._i_up, self._d_up)
235
236     def get_feedback_terms(self):
237         return (self.get_mv(), self._setpoint - self.get_pv(),
238                 self._i_term, self._d_term)
239
240     def clear_integral_term(self):
241         self._i_term = 0