TestBackend
~~~~~~~~~~~
-To get a feel for driving a PID system, check out the `TestBackend`.
-For example, you can experiment with different feedback terms and dead
-times to understand why you're getting instability or other control
-effects. Here's an example that shows a reasonable approach with a
-bit of integrator overshoot::
-
- >>> from pypid.backend.test import TestBackend
- >>> from time import sleep
- >>> from matplotlib import pyplot
- >>> from numpy import loadtxt
- >>> log_file = 'pid.log'
- >>> log_stream = open('pid.log', 'w')
- >>> b = TestBackend(log_stream=log_stream)
- >>> b.set_max_current(0.6)
- >>> b.set_heating_gains(propband=2, integral=.1)
- >>> b.set_cooling_gains(propband=2, integral=.1)
- >>> b.set_setpoint(25)
- >>> sleep(120)
- >>> t.cleanup()
- >>> log_stream.close()
- >>> header = open(log_file, 'r').readline()
- >>> label = header.strip('#\n').split('\t')
- >>> data = loadtxt('pid.log')
- >>> pyplot.hold(True)
- >>> for i in range(1, len(label)):
- ... if i in [1, 3, 5]:
- ... if i:
- ... pyplot.legend(loc='best') # add legend to previous subplot
- ... pyplot.subplot(3, 1, (i-1)/2 + 1)
- ... pyplot.plot(data[:,0], data[:,i], '.', label=label[i])
- >>> pyplot.legend(loc='best')
- >>> pyplot.show()
-
-Of course, you can use whatever plotting program you like to graph the
-values stored to `pid.log`. Matplotlib_ and NumPy_ are just
-convenient Python-based packages.
+To get a feel for driving a PID system, look atcheck out the
+`TestBackend`, which simulates a standard first-order process with
+dead time (FOPDT).
Installation
============
========= ===================== ================ ==========================
Package Purpose Debian_ Gentoo_
========= ===================== ================ ==========================
-pymodbus_ Modbus stack python-modbus dev-python/twisted
-pySerial_ serial comminication python-serial dev-python/pyserial
+aubio_ Pitch detection python-aubio media-libs/aubio
nose_ testing python-nose dev-python/nose
+NumPy_ Controller analysis python-numpy dev-python/numpy
+pySerial_ serial comminication python-serial dev-python/pyserial
+pymodbus_ Modbus stack python-modbus dev-python/twisted
+SciPy_ Controller analysis python-scipy dev-python/scipy
========= ===================== ================ ==========================
Actually, `pymodbus` may (depending on your packaging system) depend
.. _Modbus: http://en.wikipedia.org/wiki/Modbus
.. _serial port: http://en.wikipedia.org/wiki/Serial_port
.. _Matplotlib: http://matplotlib.sourceforge.net/
-.. _NumPy: http://numpy.scipy.org/
.. _Laird announcement: http://www.lairdtech.com/NewsItem.aspx?id=953
.. _wayback: http://web.archive.org/web/20090204201524/http://melcor.com/
.. _Laird thermal: http://lairdtech.thomasnet.com/category/thermal-management-solutions/
http://www.physics.drexel.edu/~wking/unfolding-disasters/posts/Gentoo_overlay
.. _Debian: http://www.debian.org/
.. _Gentoo: http://www.gentoo.org/
+.. _aubio: http://aubio.org/
+.. _NumPy: http://numpy.scipy.org/
.. _pymodbus: http://code.google.com/p/pymodbus/
.. _pySerial: http://pyserial.sourceforge.net/
.. _Twisted: http://twistedmatrix.com/trac/
+.. _SciPy: http://www.scipy.org/
.. _db578120: http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=578120
.. _nose: http://somethingaboutorange.com/mrl/projects/nose/
.. _Git: http://git-scm.com/
--- /dev/null
+#!/usr/bin/env python
+# Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
+#
+# This file is part of pypid.
+#
+# pypid is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation, either
+# version 3 of the License, or (at your option) any later version.
+#
+# pypid is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with pypid. If not, see
+# <http://www.gnu.org/licenses/>.
+
+
+from argparse import ArgumentParser
+from sys import stdout
+from time import sleep
+
+try:
+ from matplotlib import pyplot
+ from numpy import loadtxt
+except ImportError, e:
+ pyplot = None
+ loadtxt = None
+ plot_import_error = e
+
+from pypid.backend.test import TestBackend
+from pypid.rules import ziegler_nichols_step_response
+
+
+parser = ArgumentParser(description='Simulate a step response.')
+parser.add_argument(
+ '-K', '--process-gain', metavar='GAIN', type=float, default=1,
+ help='process gain (PV-units over MV-units)',)
+parser.add_argument(
+ '-L', '--dead-time', metavar='TIME', type=float, default=1,
+ help='system dead time (lag)')
+parser.add_argument(
+ '-T', '--decay-time', metavar='TIME', type=float, default=1,
+ help='exponential decay timescale')
+parser.add_argument(
+ '-P', '--proportional', metavar='GAIN', type=float, default=None,
+ help='process gain (output units over input units)',)
+parser.add_argument(
+ '-I', '--integral', metavar='TIME', type=float, default=None,
+ help='intergral gain timescale')
+parser.add_argument(
+ '-D', '--derivative', metavar='TIME', type=float, default=None,
+ help='derivative gain timescale')
+parser.add_argument(
+ '-M', '--max-mv', metavar='MV', type=float, default=100.,
+ help='maximum manipulated variable')
+parser.add_argument(
+ '-A', '--tuning-algorithm', metavar='TUNER', default=None,
+ choices=['ZN'], help='step tuning algorithm')
+parser.add_argument(
+ '-m', '--mode', metavar='MODE', default='PID',
+ choices=['P', 'PI', 'PID'], help='controller mode')
+parser.add_argument(
+ '-t', '--time', metavar='TIME', type=float, default=10.,
+ help='simulation time')
+parser.add_argument(
+ '-o', '--output', default='-', help='output log file')
+parser.add_argument(
+ '-p', '--plot', action='store_true', default=False,
+ help='plot the repsonse')
+
+args = parser.parse_args()
+
+if args.plot and not (pyplot and loadtxt) :
+ raise plot_import_error
+
+if args.output == '-':
+ log_stream = stdout
+ if args.plot:
+ raise ValueError('can only plot when outputing to a file')
+else:
+ log_stream = open(args.output, 'w')
+
+K = args.process_gain
+L = args.dead_time
+T = args.decay_time
+
+p,i,d = (0, float('inf'), 0)
+if args.tuning_algorithm == 'ZN':
+ p,i,d = ziegler_nichols_step_response(
+ process_gain=K, dead_time=L, decay_time=T, mode=args.mode)
+else:
+ if args.proportional:
+ p = args.proportional
+ if args.integral:
+ i = args.integral
+ if args.derivative:
+ d = args.derivative
+
+b = TestBackend(
+ process_gain=K, dead_time=L, decay_time=T, max_mv=args.max_mv,
+ log_stream=log_stream)
+try:
+ b.set_up_gains(proportional=p, integral=i, derivative=d)
+ b.set_down_gains(proportional=p, integral=i, derivative=d)
+ b.set_setpoint(1.0)
+ sleep(args.time)
+finally:
+ b.cleanup();
+ if args.output != '-':
+ log_stream.close()
+
+if args.plot:
+ header = open(args.output, 'r').readline()
+ label = header.strip('#\n').split('\t')
+ data = loadtxt(args.output)
+ times = data[:,0] - data[0,0]
+ pyplot.hold(True)
+ subplot = 1
+ for i in range(1, len(label)):
+ if i in [1, 4, 6]:
+ if i:
+ pyplot.legend(loc='best') # add legend to previous subplot
+ pyplot.subplot(3, 1, subplot)
+ subplot += 1
+ pyplot.plot(times, data[:,i], '.', label=label[i])
+ pyplot.legend(loc='best')
+ pyplot.show()
#!/usr/bin/env python
# Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
#
-# This file is part of tempcontrol.
+# This file is part of pypid.
#
-# tempcontrol is free software: you can redistribute it and/or
+# pypid is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation, either
# version 3 of the License, or (at your option) any later version.
#
-# tempcontrol is distributed in the hope that it will be useful,
+# pypid is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
-# License along with tempcontrol. If not, see
+# License along with pypid. If not, see
# <http://www.gnu.org/licenses/>.
"""Log control and ambient temperature every 10 seconds.
import time
-from tempcontrol.backend import get_backend
+from pypid.backend import get_backend
b = get_backend('melcor')()
class Backend (object):
- """Temperature control backend
+ """Process-control backend
There are several common forms for a PID control formula. For the
purpose of setting heating and cooling gains (`.get_*_gains()` and
In this formulation, the parameter units will be:
- * K_p: MV units / PV units (e.g. amp/K)
- * T_i, T_d: time (e.g. seconds)
+ * K_p: MV-units/PV-units (e.g. amp/K for a thermoelectric
+ controller). Don't confuse this `proportional gain` with the
+ `process gain` used in `TestBackend`.
+ * T_i, T_d: time (e.g. seconds)
"""
- def __init__(self):
- self._max_current = None
-
- @staticmethod
- def _convert_F_to_C(F):
- return (F - 32)/1.8
+ pv_units = 'PV-units'
+ mv_units = 'MV-units'
- @staticmethod
- def _convert_C_to_F(C):
- return C*1.8 + 32
+ def __init__(self):
+ self._max_mv = None
def cleanup(self):
"Release resources and disconnect from any hardware."
pass
- def get_temp(self):
- "Return the current process temperature in degrees Celsius"
+ def get_pv(self):
+ "Return the current process variable in PV-units"
raise NotImplementedError()
- def get_ambient_temp(self):
- "Return room temperature in degrees Celsius"
+ def get_ambient_mv(self):
+ "Return the abmient (bath) status in PV-units"
raise NotImplementedError()
- def set_max_current(self, max):
- "Set the max current in Amps"
+ def set_max_mv(self, max):
+ "Set the max manipulated variable in MV-units"
raise NotImplementedError()
- def get_max_current(self):
- "Get the max current in Amps"
+ def get_max_mvt(self):
+ "Get the max manipulated variable MV-units"
raise NotImplementedError()
- def get_current(self):
- """Return the calculated control current in Amps"
+ def get_mv(self):
+ """Return the calculated manipulated varaible in MV-units
- The returned current is not the actual current, but the
- current that the temperature controller calculates it should
- generate. If the voltage required to generate that current
- exceeds the controller's max voltage (15 V on mine), then the
- physical current will be less than the value returned here.
+ The returned current is not the actual MV, but the MV that the
+ controller calculates it should generate. For example, if the
+ voltage required to generate an MV current exceeds the
+ controller's max voltage, then the physical current will be
+ less than the value returned here.
"""
raise NotImplementedError()
class ManualMixin (object):
- def set_current(self, current):
- """Set the desired control current in Amps
- """
+ def set_mv(self, current):
+ "Set the desired manipulated variable in MV-units"
raise NotImplementedError()
class PIDMixin (object):
def set_setpoint(self, setpoint):
- "Set the temperature setpoint in degrees Celsius"
+ "Set the process variable setpoint in PV-units"
raise NotImplementedError()
def get_setpoint(self, setpoint):
- "Get the temperature setpoint in degrees Celsius"
+ "Get the process variable setpoint in PV-units"
raise NotImplementedError()
- def get_cooling_gains(self):
+ def get_duwn_gains(self):
"""..."""
raise NotImplementedError()
- def set_cooling_gains(self, proportional=None, integral=None,
- derivative=None):
+ def set_down_gains(self, proportional=None, integral=None,
+ derivative=None):
"""
...
"""
raise NotImplementedError()
- def get_heating_gains(self):
+ def get_up_gains(self):
"""..."""
raise NotImplementedError()
- def set_heating_gains(self, proportional=None, integral=None,
- derivative=None):
+ def set_up_gains(self, proportional=None, integral=None, derivative=None):
"""
...
"""
backend.
"""
raise NotImplementedError()
+
+
+class TemperatureMixin (object):
+ @staticmethod
+ def _convert_F_to_C(F):
+ return (F - 32)/1.8
+
+ @staticmethod
+ def _convert_C_to_F(C):
+ return C*1.8 + 32
from . import Backend as _Backend
from . import ManualMixin as _ManualMixin
from . import PIDMixin as _PIDMixin
+from . import TemperatureMixin as _TemperatureMixin
class Register (object):
return super(BoundedFloatRegister, self).decode(value, **kwargs)
-class MelcorBackend (_Backend, _ManualMixin, _PIDMixin):
+class MelcorBackend (_Backend, _ManualMixin, _PIDMixin, _TemperatureMixin):
"""Temperature control backend for a Melcor MTCA Temperature Controller
+
+ * PV: process temperature
+ * PV-units: degrees Celsius
+ * MV: controller current
+ * MV-units: amps
"""
+ pv_units = 'C'
+ mv_units = 'A'
+
# Relative register addresses from back page of Melcor Manual.
# Then I went through Chapter 6 tables looking for missing
# registers. References are from Series MTCA Thermoelectric
# Support for Backend methods
- def get_temp(self):
+ def get_pv(self):
return self._read('HIGH_RESOLUTION')
- def get_ambient_temp(self):
+ def get_ambient_pv(self):
return self._convert_F_to_C(self._read('AMBIENT_TEMPERATURE'))
- def set_max_current(self, max):
+ def set_max_mv(self, max):
"""Set the max current in Amps
0.2 A is the default max current since it seems ok to use
self._write('HIGH_POWER_LIMIT_BELOW', max_percent)
self._max_current = max
- def get_max_current(self):
+ def get_max_mv(self):
percent = self._read('HIGH_POWER_LIMIT_ABOVE')
above = percent/100. * self._spec_max_current
percent = self._read('HIGH_POWER_LIMIT_BELOW')
self._max_current = above
return above
- def get_current(self):
+ def get_mv(self):
pout = self._read('PERCENT_OUTPUT')
cur = self._spec_max_current * pout / 100.0
return cur
# ManualMixin methods
- def set_current(self, current):
+ def set_mv(self, current):
if current > self._spec_max_current:
raise ValueError('current {} exceeds spec maximum {}'.format(
current, self._spec_max_current))
proportional = max_current/propband
return (proportional, integral, derivative)
- def set_cooling_gains(self, proportional=None, integral=None,
- derivative=None):
+ def set_down_gains(self, proportional=None, integral=None,
+ derivative=None):
self._set_gains(
output=1, proportional=proportional, integral=integral,
derivative=derivative)
- def get_cooling_gains(self):
+ def get_down_gains(self):
return self._get_gains(output=1)
- def set_heating_gains(self, proportional=None, integral=None,
- derivative=None):
+ def set_up_gains(self, proportional=None, integral=None, derivative=None):
self._set_gains(
output=2, proportional=proportional, integral=integral,
derivative=derivative)
- def get_heating_gains(self):
+ def get_up_gains(self):
return self._get_gains(output=2)
def get_feedback_terms(self):
class TestBackend (_Backend, _ManualMixin, _PIDMixin):
"""Test backend for demonstrating `Controller` function
- The underlying temperature decay model is exponential, which is
- often refered to as "Newton's law of cooling"
+ The response function for a first-order plus dead time process
+ (aka FOPDT, or occasionaly KLT), one of control theory's classics,
+ is
- dT/dt = h (Tbath - T)
+ G(s) = Y(s)/X(s) = K_p e^{-Ls} / (1 + T s)
- where `h` is the transfer coefficient (with some scaling terms
- brushed under the rug). To make the system more realistic, I've
- also added a dead time, so temperatures returned by `.get_temp()`
- actually correspond to the system temperature `dead_time` seconds
- before the measurement was taken. Finally, there's a
- `drive_coefficient` `d`, which gives the rate of temperature
- change due to a applied driving current `I`, so
+ Going back to the time domain with the inverse Laplace transform:
- dT(y)/dt = h (Tbath(t) - T(t)) + d I(t-L)
+ (1 + Ts)Y(s) = K_p e^{-Ls} X(s) Laplace transform rule:
+ y(t) + T dy(t)/dt = K_p e^{-Ls} X(s) f'(t) -> sF(s) + f(0)
+ y(t) + T dy(t)/dt = K_p x(t-L)u(t-L) f(t-a)u(t-a) -> e^{-as}F(s)
+ dy(t)/dt = -y(t)/T + K_p/T x(t-L)u(t-L)
- This model is often refered to as a FOPDT (first-order plus dead
- time) or KLT (K: static gain, L: time delay, T: time constant/lag)
- model. Translating our model above into process-control jargon:
+ This may look more familiar to those of us more used to
+ differential equations than we are to transfer functions. There
+ is an exponential decay to y=0 with a time constant T, with an
+ external driver that has a gain of K_p and a lag (dead time) of L.
- * Process variable y(t) corresponds to our T(t).
- * Manipulated variable u(t) corresponds to our I(t).
- * Process gain dy/du (often denoted K_p). For our parameters
- above, K_p = dy/du = dT/dI = d/h.
- * Process time constant (aka lag, often denoted tau or T; the
- exponential decay timescale). For our parameters above,
- tau = 1/h.
- * Dead time (often denoted L or theta; the time delay between a
- change system and that change being reflected in the process
- variable). For our parameters above, L = dead_time.
+ The units should be:
- The response function for a FOPDT process is
+ * K_p: y-units over x-units (e.g. K/amp for a thermoelectric controller)
+ * L, T: time (e.g. seconds)
- G(s) = K_p e^{-Ls} / (1 + T s)
+ Just to flesh out the usual jargon, x is refered to as the
+ manipulated variable (MV) and y is the process variable (PV).
+
+ Because transfer functions treats the initial conditions
+ (e.g. x(t=0)), we can add them back in if we want to create a more
+ general y(t) having the same dynamics.
+
+ dy(t)/dt = (y_b(t) - y(t))/T + K_p/T x(t-L)
+
+ where I've assumed that x(t<0)=0 in order to drop the Heaviside
+ step function. Now instead of decaying to zero in the absence of
+ a driving signal, y(t) will decay to a bath value y_b(t).
For interesting experimental evidence of exponential cooling, see
Kaliszan et al., "Verification of the exponential model of body
http://ep.physoc.org/content/90/5/727.long
September 1, 2005 Experimental Physiology, 90, 727-738.
"""
- def __init__(self, bath=20, transfer_coefficient=0.1,
- drive_coefficient=1., max_current=1., dead_time=1.,
- process_period=0.01, log_stream=None):
+ def __init__(self, bath=0., process_gain=1., dead_time=1.,
+ decay_time=1., max_mv=1., process_period=0.01,
+ log_stream=None):
"""
bath : float
- bath (ambient) temperature in degrees Celsius
- transfer_coefficient : float
- between the system and the bath, in inverse seconds
- drive_coefficient : float
- for the applied current, in degrees Celsius per amp
- max_current : float
- maximum current in amps
+ bath (ambient) process variable in PV units
+ process_gain : float
+ gain between manipulated variable MV and PV, in PV-units/MV-units
dead_time : float
- time lag in seconds between an internal system temperature
- and the corresponding `.get_temp()` reading
+ time lag between a PV and the corresponding MV response.
+ decay_time : float
+ exponential decay time constant for the undriven PV
+ max_mv : float
+ maximum MV in MV-units
process_period : float
- time in seconds between process-thread temperature updates
+ time between process-thread PV updates
+
+ All times (`dead_time`, `decay_time`, and `process_period`)
+ should be in the same units (e.g. seconds).
"""
- self._bath = bath
- self._transfer_coefficient = transfer_coefficient
- self._drive_coefficient = drive_coefficient
- self._max_current = max_current
+ self._bath = self._setpoint = bath
+ self._K = process_gain
+ self._L = dead_time
+ self._T = decay_time
+ self._max_mv = max_mv
self._dead_periods = int(dead_time/process_period)
self._process_period = process_period
self._log_stream = log_stream
- self._setpoint = 0
self._i_term = self._d_term = 0
- self._p_cool = self._d_cool = 0
- self._p_heat = self._d_heat = 0
- self._i_cool = self._i_heat = float('inf')
- self._manual_current = 0
+ self._p_down = self._d_down = 0
+ self._p_up = self._d_up = 0
+ self._i_down = self._i_up = float('inf')
+ self._manual_mv = 0
self._mode = 'PID'
- self._temperatures = [bath]*(self._dead_periods+1)
+ self._PVs = [bath]*(self._dead_periods+1)
self._start_process_thread()
def cleanup(self):
def _run_process(self):
if self._log_stream:
line = '\t'.join((
- 'time', 'setpoint', 'process temperature',
- 'measured temperature', 'dT_bath', 'dT_drive', 'current',
- 'intergal', 'derivative'))
+ 'time', 'SP', 'PV_int', 'PV', 'dPV_bath', 'dPV_drive',
+ 'MV', 'intergal', 'derivative'))
self._log_stream.write('#{}\n'.format(line))
dt = self._process_period
next_time = _time.time() + dt
while not self._stop_process:
- T = self._temperatures[-1]
- dT_bath = self._transfer_coefficient * (self._bath - T)
- current = self.get_current(_increment_i_term=True)
- dT_drive = self._drive_coefficient * current
+ PV = self._PVs[-1]
+ dPV_bath = (self._bath - PV)/self._T * dt
+ MV = self.get_mv(_increment_i_term=True)
+ dPV_drive = self._K * MV / self._T * dt
if self._log_stream:
line = '\t'.join(str(x) for x in (
- _time.time(), self._setpoint, T, self.get_temp(), dT_bath*dt,
- dT_drive*dt, current, self._i_term, self._d_term))
+ _time.time(), self._setpoint, PV, self.get_pv(),
+ dPV_bath, dPV_drive, MV, self._i_term, self._d_term))
self._log_stream.write(line + '\n')
- T += (dT_bath + dT_drive) * dt
- self._temperatures.pop(0)
- self._temperatures.append(T)
+ PV += dPV_bath + dPV_drive
+ self._PVs.pop(0)
+ self._PVs.append(PV)
s = next_time - _time.time()
if s > 0:
_time.sleep(s)
next_time += dt
- def _limited_current(self, current):
- if current > self._max_current:
- #_LOG.debug('limiting current to maximum: {:n} -> {:n} amps'.format(
- # current, self._max_current))
- return self._max_current
- elif current < -self._max_current:
- #_LOG.debug('limiting current to maximum: {:n} -> {:n} amps'.format(
- # current, -self._max_current))
- return -self._max_current
- return current
-
- def get_temp(self):
- return self._temperatures[1]
-
- def get_ambient_temp(self):
+ def _limited_mv(self, mv):
+ if mv > self._max_mv:
+ #_LOG.debug('limiting mv to maximum: {:n} -> {:n} amps'.format(
+ # mv, self._max_mv))
+ return self._max_mv
+ elif mv < -self._max_mv:
+ #_LOG.debug('limiting mv to maximum: {:n} -> {:n} amps'.format(
+ # mv, -self._max_mv))
+ return -self._max_mv
+ return mv
+
+ def get_pv(self):
+ return self._PVs[1]
+
+ def get_ambient_pv(self):
return self._bath
- def set_max_current(self, max):
- self._max_current = max
+ def set_max_mv(self, max):
+ self._max_mv = max
- def get_max_current(self):
- return self._max_current
+ def get_max_mv(self):
+ return self._max_mv
- def get_current(self, _increment_i_term=True):
+ def get_mv(self, _increment_i_term=True):
if self._mode == 'manual':
- return self._manual_current
+ return self._manual_mv
elif self._mode == 'PID':
- T_pref,T = self._temperatures[:2]
- dT_s = (self._setpoint - T)
- if T > self._setpoint:
- p,i,d = self._p_cool, self._i_cool, self._d_cool
+ PV_prev,PV = self._PVs[:2]
+ dPV_s = (self._setpoint - PV)
+ if PV > self._setpoint:
+ p,i,d = self._p_down, self._i_down, self._d_down
else:
- p,i,d = self._p_heat, self._i_heat, self._d_heat
- dT_t = T - T_pref
+ p,i,d = self._p_up, self._i_up, self._d_up
+ dPV_t = PV - PV_prev
dt = self._process_period
if _increment_i_term is True:
- self._i_term += dT_s * dt
- self._d_term = -dT_t / dt # = de(t)/dt with constant setpoint
- return self._limited_current(
- p*(dT_s + self._i_term/i + d*self._d_term))
+ self._i_term += dPV_s * dt
+ self._d_term = -dPV_t / dt # = de(t)/dt with constant setpoint
+ return self._limited_mv(
+ p*(dPV_s + self._i_term/i + d*self._d_term))
raise ValueError(self._mode)
def get_modes(self):
# ManualMixin methods
- def set_current(self, current):
- self._manual_current = self._limited_current(current)
+ def set_mv(self, mv):
+ self._manual_mv = self._limited_mv(mv)
# PIDMixin methods
def get_setpoint(self):
return self._setpoint
- def set_cooling_gains(self, proportional=None, integral=None,
+ def set_down_gains(self, proportional=None, integral=None,
derivative=None):
if proportional is not None:
- self._p_cool = proportional
+ self._p_down = proportional
if integral is not None:
- self._i_cool = integral
+ self._i_down = integral
if derivative is not None:
- self._d_cool = derivative
+ self._d_down = derivative
- def get_cooling_gains(self):
- return (self._p_cool, self._i_cool, self._d_cool)
+ def get_down_gains(self):
+ return (self._p_down, self._i_down, self._d_down)
- def set_heating_gains(self, proportional=None, integral=None,
+ def set_up_gains(self, proportional=None, integral=None,
derivative=None):
if proportional is not None:
- self._p_heat = proportional
+ self._p_up = proportional
if integral is not None:
- self._i_heat = integral
+ self._i_up = integral
if derivative is not None:
- self._d_heat = derivative
+ self._d_up = derivative
- def get_heating_gains(self):
- return (self._p_heat, self._i_heat, self._d_heat)
+ def get_up_gains(self):
+ return (self._p_up, self._i_up, self._d_up)
def get_feedback_terms(self):
- return (self.get_current(), self._setpoint - self.get_temp(),
+ return (self.get_mv(), self._setpoint - self.get_pv(),
self._i_term, self._d_term)
def clear_integral_term(self):
from numpy import log as _log
from scipy.interpolate import interp1d as _interp1d
-from hooke.util.fit import ModelFitter as _ModelFitter
-
from . import LOG as _LOG
+from .fit import ModelFitter as _ModelFitter
class Controller (object):
- """PID temperature control frontend.
+ """PID control frontend.
backend: pypid.backend.Backend instance
backend driving your particular harware
setpoint: float
- initial setpoint in degrees Celsius
+ initial setpoint in PV-units
min: float
- minimum temperature in degrees Celsius (for sanity checks)
+ minimum PV in PV-units (for sanity checks)
max: float
- maximum temperature in degrees Celsius (for sanity checks)
+ maximum PV in PV-units (for sanity checks)
"""
- def __init__(self, backend, setpoint=20.0, min=5.0, max=50.0):
+ def __init__(self, backend, setpoint=0.0, min=-float('inf'),
+ max=float('inf')):
self._backend = backend
self._setpoint = setpoint
- self._min = min
- self._max = max
+ self._min_pv = min
+ self._max_pv = max
# basic user interface methods
- def get_temp(self):
- """Return the current process temperature in degrees Celsius
+ def get_pv(self):
+ """Return the current process variable in PV-units
We should expose this to users, so they don't need to go
mucking about in `._backend`.
"""
- return self._backend.get_temp()
+ return self._backend.get_pv()
- def set_temp(self, setpoint, **kwargs):
+ def set_pv(self, setpoint, **kwargs):
"""Change setpoint to `setpoint` and wait for stability
"""
self._backend.set_setpoint(setpoint)
def wait_for_stability(self, setpoint, tolerance=0.3, time=10.,
timeout=-1, sleep_time=0.1, return_data=False):
- """Wait until the temperature is sufficiently stable
+ """Wait until the PV is sufficiently stable
setpoint : float
- target temperature in degrees C
+ target PV in PV-units
tolerance : float
- maximum allowed deviation from `setpoint` in dregrees C
+ maximum allowed deviation from `setpoint` in PV-units
time : float
- time the temperature must remain in the allowed region
- before the signal is delared "stable"
+ time the PV must remain in the allowed region before the
+ signal is delared "stable"
timeout : float
maximum time to wait for stability. Set to -1 to never
timeout.
time in seconds to sleep between reads to avoid an
overly-busy loop
return_data : boolean
- if true, also return a list of `(timestamp, temp)` tuples
+ if true, also return a list of `(timestamp, PV)` tuples
read while waiting
- Read the temperature every `sleep_time` seconds until the
- temperature has remained within `tolerance` of `setpoint` for
- `time`. If the stability criteria are met, return `True`
- (stable). If `timeout` seconds pass before the criteria are
- met, return `False` (not stable).
+ Read the PV every `sleep_time` seconds until the PV has
+ remained within `tolerance` of `setpoint` for `time`. If the
+ stability criteria are met, return `True` (stable). If
+ `timeout` seconds pass before the criteria are met, return
+ `False` (not stable).
"""
- _LOG.debug(('wait until the temperature is stable at {:n} +/- {:n} C '
+ _LOG.debug(('wait until the PV is stable at {:n} +/- {:n} C '
'for {:n} seconds').format(setpoint, tolerance, time))
stable = False
if return_data:
else:
timeout_time = start_time + timeout
while True:
- T = self.get_temp()
- in_range = abs(T - setpoint) < tolerance
+ PV = self.get_pv()
+ in_range = abs(PV - setpoint) < tolerance
t = _time.time()
if return_data:
- data.append((t, T))
+ data.append((t, PV))
if in_range:
if t >= stable_time:
- _LOG.debug('temperature is stable')
+ _LOG.debug('process variable is stable')
stable = True
break # in range for long enough
else:
stable_time = t + time # reset target time
if timeout_time and t > timeout_time:
+ _LOG.debug('process variable is not stable')
break # timeout
_time.sleep(sleep_time)
if return_data:
return self.wait_for_stability(
setpoint=setpoint, time=time, timeout=time, **kwargs)
- def estimate_temperature_sensitivity(self, num_temps=10, sleep_time=0.1,
- max_repeats=10):
- temps = []
- last_temp = None
+ def estimate_pv_sensitivity(self, values=10, sleep_time=0.1,
+ max_repeats=10):
+ PVs = []
+ last_PV = None
repeats = 0
while True:
- temp = self.get_temp()
+ PV = self.get_pv()
if repeats == max_repeats:
- last_temp = None
- if temp == last_temp:
+ last_PV = None
+ if PV == last_PV:
repeats += 1
else:
- temps.append(temp)
- if len(temps) > num_temps:
+ PVs.append(PV)
+ if len(PVs) > values:
break
repeats = 0
- last_temp = temp
+ last_PV = PV
_time.sleep(sleep_time)
- temps = _array(temps)
- return temps.std()
+ PVs = _array(PVs)
+ return PVs.std()
# debugging methods
"""
c = self._backend.get_current()
pid,prop,ntgrl,deriv = self._backend.get_feedback_terms()
- T = self.get_temp()
- Tset = self._backend.get_setpoint()
- if T > Tset: # cooling
- p,i,d = self._backend.get_cooling_gains()
- else: # heating
- p,i,d = self._backend.get_heating_gains()
+ PV = self.get_pv()
+ SP = self._backend.get_setpoint()
+ if PV > SP:
+ p,i,d = self._backend.get_down_gains()
+ else:
+ p,i,d = self._backend.get_up_gains()
_LOG.info(('pid(read) {:n} =? sum(calc from terms) {:n} '
'=? cur(read) {:n} A').format(pid, prop+ntgrl+deriv, c))
_LOG.info('read: p {:n}, i {:n}, d {:n}'.format(p,i,d))
- _LOG.info('calc: p {:n}'.format(p*(Tset-T)))
+ _LOG.info('calc: p {:n}'.format(p*(SP-PV)))
# tuning experiments and data processing
- def get_step_response(self, current_a, current_b,
- sleep_time=0.1, stable_time=10., **kwargs):
+ def get_step_response(self, mv_a, mv_b, sleep_time=0.1, stable_time=10.,
+ **kwargs):
"Measure a step response for later analysis"
_LOG.debug('measure step response')
if 'time' in kwargs:
kwargs['sleep_time'] = sleep_time
mode = self._backend.get_mode()
if mode == 'manual':
- manual_current = self._backend.get_current()
+ manual_mv = self._backend.get_mv()
else:
self._backend.set_mode('manual')
_LOG.debug('set first current and wait for stability')
- self._backend.set_current(current_a)
- temp_a = self.get_temp()
- while not self.is_stable(temp_a, **kwargs):
- temp_a = self.get_temp()
- _LOG.debug('stabilized at {:n} C with {:n} amps'.format(
- temp_a, current_a))
- _LOG.debug('set second current and wait for stability')
+ self._backend.set_mv(mv_a)
+ pv_a = self.get_pv()
+ while not self.is_stable(pv_a, **kwargs):
+ pv_a = self.get_pv()
+ _LOG.debug('stabilized at {:n} {} with {:n} {}'.format(
+ pv_a, self._backend.pv_units, mv_a, self._backend.mv_units))
+ _LOG.debug('set second MV and wait for stability')
data = []
start_time = _time.time()
- self._backend.set_current(current_b)
- temp_b = temp_a
+ self._backend.set_mv(mv_b)
+ pv_b = pv_a
while True:
- stable,d = self.is_stable(temp_b, return_data=True, **kwargs)
+ stable,d = self.is_stable(pv_b, return_data=True, **kwargs)
data.extend(d)
- temp_b = self.get_temp()
+ pv_b = self.get_pv()
if stable:
break
- _LOG.debug('stabilized at {:n} C with {:n} amps'.format(
- temp_b, current_b))
+ _LOG.debug('stabilized at {:n} {} with {:n} {}'.format(
+ pv_b, self._backend.pv_units, mv_b, self._backend.mv_units))
if mode == 'manual':
- self._backend.set_current(manual_current)
+ self._backend.set_mv(manual_mv)
else:
self._backend.set_mode(mode)
return data
@staticmethod
- def analyze_step_response(step_response, current_shift):
- rates = [(Tb-Ta)/(tb-ta) for ((ta,Ta),(tb,Tb))
+ def analyze_step_response(step_response, mv_shift):
+ rates = [(PVb-PVa)/(tb-ta) for ((ta,PVa),(tb,PVb))
in zip(step_response, step_response[1:])]
# TODO: averaging filter?
max_rate_i = max_rate = 0
max_rate_i = i
max_rate = abs(rate)
max_rate_time,max_rate_temp = step_response[max_rate_i] # TODO: avg i and i+1?
- time_a,temp_a = step_response[0]
+ time_a,PV_a = step_response[0]
max_rate_time -= time_a
- dead_time = max_rate_time - (max_rate_temp - temp_a) / max_rate
- t_data = _array([t for t,T in step_response[max_rate_i:]])
- T_data = _array([T for t,T in step_response[max_rate_i:]])
- model = ExponentialModel(T_data, info={'x data (s)': t_data})
- tau,T0,T8 = model.fit()
- gain = (T8 - temp_a) / current_shift
- return (gain, dead_time, tau, max_rate)
+ dead_time = max_rate_time - (max_rate_temp - PV_a) / max_rate
+ t_data = _array([t for t,PV in step_response[max_rate_i:]])
+ PV_data = _array([PV for t,PV in step_response[max_rate_i:]])
+ model = ExponentialModel(PV_data, info={'x data (s)': t_data})
+ decay_time,PV0,PV8 = model.fit()
+ process_gain = (PV8 - PV_a) / mv_shift
+ return (process_gain, dead_time, decay_time, max_rate)
def get_bang_bang_response(self, dead_band=0.8, num_oscillations=10,
max_dead_band_time=30, sleep_time=0.1):
- orig_cool_gains = self._backend.get_cooling_gains()
- orig_heat_gains = self._backend.get_heating_gains()
+ orig_down_gains = self._backend.get_down_gains()
+ orig_up_gains = self._backend.get_up_gains()
_LOG.debug('measure bang-bang response')
mode = self._backend.get_mode()
if mode != 'PID':
self._backend.set_mode('PID')
i=0
setpoint = self._backend.get_setpoint()
- self._backend.set_cooling_gains(float('inf'), float('inf'), 0)
- self._backend.set_heating_gains(float('inf'), float('inf'), 0)
+ self._backend.set_down_gains(float('inf'), float('inf'), 0)
+ self._backend.set_up_gains(float('inf'), float('inf'), 0)
start_time = _time.time()
- temp = self.get_temp()
- heat_first = self._is_heating(
- temp=temp, setpoint=setpoint, dead_band=dead_band)
+ pv = self.get_pv()
+ under_first = self._is_under(
+ pv=pv, setpoint=setpoint, dead_band=dead_band)
_LOG.debug('wait to exit dead band')
t = start_time
- while heat_first is None:
+ while under_first is None:
if t - start_time > max_dead_band_time:
msg = 'still in dead band after after {:n} seconds'.format(
max_dead_band_time)
raise ValueError(msg)
_time.sleep(sleep_time)
t = _time.time()
- temp = t.get_temp()
- heat_first = self._is_heating(
- temp=temp, setpoint=setpoint, dead_band=dead_band)
+ pv = t.get_pv()
+ under_first = self._is_under(
+ pv=pv, setpoint=setpoint, dead_band=dead_band)
_LOG.debug('read {:d} oscillations'.format(num_oscillations))
data = []
- heating = heat_first
+ under = under_first
while i < num_oscillations*2 + 1:
t = _time.time()
- temp = self.get_temp()
+ pv = self.get_pv()
# drop first half cycle (possibly includes ramp to setpoint)
if i > 0:
- data.append((t, temp))
- is_heating = self._is_heating(
+ data.append((t, pv))
+ _under = self._is_under(
temp=temp, setpoint=setpoint, dead_band=dead_band)
- if heating is True and is_heating is False:
- _LOG.debug('transition to cooling (i={:d})'.format(i))
- heating = False
+ if _under is True and under is False:
+ _LOG.debug('transition to PV < SP (i={:d})'.format(i))
+ under = _under
i += 1
- elif heating is False and is_heating is True:
- _LOG.debug('transition to heating (i={:d})'.format(i))
- heating = True
+ elif under is False and is_under is True:
+ _LOG.debug('transition to PV > SP (i={:d})'.format(i))
+ under = _under
i += 1
_time.sleep(sleep_time)
- self._backend.set_cooling_gains(*orig_cool_gains)
- self._backend.set_heating_gains(*orig_heat_gains)
+ self._backend.set_down_gains(*orig_down_gains)
+ self._backend.set_up_gains(*orig_up_gains)
if mode != 'PID':
self._backend.set_mode(mode)
return data
@staticmethod
def analyze_bang_bang_response(bang_bang_response):
- t_data = _array([t for t,T in bang_bang_response])
- T_data = _array([T for t,T in bang_bang_response])
- amp = (T_data.max() - T_data.min()) / 2
- freq = Controller._get_frequency(x_data=t_data, y_data=T_data)
+ t_data = _array([t for t,PV in bang_bang_response])
+ PV_data = _array([PV for t,PV in bang_bang_response])
+ amp = (PV_data.max() - PV_data.min()) / 2
+ freq = Controller._get_frequency(x_data=t_data, y_data=PV_data)
period = 1./freq
return (amp, period)
def get_ultimate_cycle_response(self, proportional, period):
- orig_cool_gains = self._backend.get_cooling_gains()
- orig_heat_gains = self._backend.get_heating_gains()
+ orig_down_gains = self._backend.get_down_gains()
+ orig_up_gains = self._backend.get_up_gains()
_LOG.debug('measure ultimate cycle response')
mode = self._backend.get_mode()
if mode != 'PID':
self._backend.set_mode('PID')
# TODO...
- self._backend.set_cooling_gains(*orig_cool_gains)
- self._backend.set_heating_gains(*orig_heat_gains)
+ self._backend.set_down_gains(*orig_down_gains)
+ self._backend.set_up_gains(*orig_up_gains)
if mode != 'PID':
self._backend.set_mode(mode)
return data
ultimate_cycle_response)
return period
- # tuning rules
-
- @staticmethod
- def ziegler_nichols_step_response(gain, dead_time, tau, mode='PID'):
- r = dead_time / tau
- if r < 0.1 or r > 1:
- _LOG.warn(('Ziegler-Nichols not a good idea when '
- 'dead-time/tau = {:n}').format(r))
- pkern = tau/(gain*dead_time)
- if mode == 'P':
- return (pkern, float('inf'), 0)
- elif mode == 'PI':
- return (0.9*pkern, 3.3*dead_time, 0)
- elif mode == 'PID':
- return (1.2*pkern, 2*dead_time, dead_time/2.)
- raise ValueError(mode)
-
- def ziegler_nichols_bang_bang_response(self, amplitude, period,
- max_current=None, mode='PID'):
- if max_current is None:
- max_current = self._backend.get_max_current()
- return self._ziegler_nichols_bang_bang_response(
- amplitude, period, max_current=max_current, mode=mode)
-
- @staticmethod
- def _ziegler_nichols_bang_bang_response(amplitude, period,
- max_current, mode='PID'):
- """
- amplitude : float
- center-to-peak amplitude (in K) of bang-bang oscillation
- period : float
- period (in seconds) of the critical oscillation
- max_current : float
- "bang" current (in amps)
- """
- proportional = float(max_current)/amplitude
- period = float(period)
- if mode == 'P':
- return (proportional/2, float('inf'), 0)
- elif mode == 'PI':
- return (proportional/3, 2*period, 0)
- elif mode == 'PID':
- return (proportional/2, period, period/4)
- raise ValueError(mode)
-
- def ziegler_nichols_ultimate_cycle_response(self, proportional, period):
- """
- proportional : float
- critical P-only gain (ultimate gain, for sustained oscillation)
- period : float
- period (in seconds) of the critical oscillation
-
- Microstar Laboratories has a `nice analysis`_ on ZN
- limitations, which points out that ZN-tuning assumes your
- system has the FOPDT transfer function (see `TestBackend` for
- details).
-
- .. _nice analysis: http://www.mstarlabs.com/control/znrule.html
- """
- if mode == 'P':
- return (0.50*proportional, float('inf'), 0)
- elif mode == 'PI':
- return (0.45*proportional, period/1.2, 0)
- elif mode == 'PID':
- return (0.60*proportional, period/2, period/8)
- raise ValueError(mode)
-
- @staticmethod
- def cohen_coon_step_response(gain, dead_time, tau, mode='PID'):
- r = dead_time / tau
- pkern = tau/(gain*dead_time)
- if mode == 'P':
- return (pkern*(1+r/3.), float('inf'), 0)
- elif mode == 'PI':
- return (pkern*(0.9+r/12.), (30.+3*r)/(9+20*r)*dead_time, 0)
- elif mode == 'PD': # double check
- return (1.24*pkern*(1+0.13*tf), float('inf'),
- (0.27-0.36*t)/(1-0.87*t)*dead_time)
- elif mode == 'PID':
- return (pkern*(4./3+r/4.), (32.-6*r)/(13.-8*r)*dead_time,
- 4/(11.+2*r)*dead_time)
- raise ValueError(mode)
-
- @staticmethod
- def wang_juang_chan_step_response(gain, dead_time, tau, mode='PID'):
- """Wang-Juang-Chan tuning
- """
- K,L,T = (gain, dead_time, tau)
- if mode == 'PID':
- return ((0.7303+0.5307*T/L)*(T+0.5*L)/(K*(T+L)),
- T + 0.5*L,
- 0.5*L*T / (T + 0.5*L))
- raise ValueError(mode)
-
# utility methods
def _wait_until_close(self, setpoint, tolerance=0.3, sleep_time=0.1):
- while abs(self.get_temp() - setpoint) > tolerance:
+ while abs(self.get_pv() - setpoint) > tolerance:
_time.sleep(sleep_time)
def _time_function(self, function, args=(), kwargs=None, count=10):
stop = _time.time()
return float(stop-start)/count
- def _is_heating(self, temp=None, setpoint=None, dead_band=None):
- if temp is None:
- temp = self.get_temp()
+ def _is_under(self, pv=None, setpoint=None, dead_band=None):
+ if pv is None:
+ pv = self.get_pv()
if setpoint is None:
- temp = self._backend.get_setpoint()
- low_temp = high_temp = setpoint
+ setpoint = self._backend.get_setpoint()
+ low_pv = high_pv = setpoint
if dead_band:
- low_temp -= dead_band
- high_temp += dead_band
- if temp < low_temp:
- return False
- elif temp > high_temp:
+ low_pv -= dead_band
+ high_pv += dead_band
+ if pv < low_pv:
return True
+ elif pv > high_pv:
+ return False
return None
- def _select_parameter(self, heating_result=None, cooling_result=None,
+ def _select_parameter(self, under_result=None, over_result=None,
dead_band_result=None, **kwargs):
- heating = self._is_heating(**kwargs)
- if heating:
- return heating_result
- elif heating is False:
- return cooling_result
+ under = self._is_under(**kwargs)
+ if under:
+ return under_result
+ elif under is False:
+ return over_result
return dead_band_result
@staticmethod
if value > max:
raise ValueError('%g > %g' % (value, max))
- def _check_temp(temp):
- self._check_range(temp, self._min, self._max)
+ def _check_pv(pv):
+ self._check_rangee(pv, self._min_pv, self._max_pv)
class ExponentialModel (_ModelFitter):
--- /dev/null
+# Copyright (C) 2010-2011 W. Trevor King <wking@drexel.edu>
+#
+# This file is part of Hooke.
+#
+# Hooke is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# Hooke is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+# Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with Hooke. If not, see
+# <http://www.gnu.org/licenses/>.
+
+"""Provide :class:`ModelFitter` to make arbitrary model fitting easy.
+"""
+
+from numpy import arange, ndarray
+from scipy import __version__ as _scipy_version
+from scipy.optimize import leastsq
+
+_strings = _scipy_version.split('.')
+# Don't convert third string to an integer in case of (for example) '0.7.2rc3'
+_SCIPY_VERSION = (int(_strings[0]), int(_strings[1]), _strings[2])
+del _strings
+
+
+class PoorFit (ValueError):
+ pass
+
+class ModelFitter (object):
+ """A convenient wrapper around :func:`scipy.optimize.leastsq`.
+
+ TODO: look into :mod:`scipy.odr` as an alternative fitting
+ algorithm (minimizing the sum of squares of orthogonal distances,
+ vs. minimizing y distances).
+
+ Parameters
+ ----------
+ d_data : array_like
+ Deflection data to be analyzed for the contact position.
+ info :
+ Store any extra information useful inside your overridden
+ methods.
+ rescale : boolean
+ Rescale parameters so the guess for each is 1.0. Also rescale
+ the data so data.std() == 1.0.
+
+ Examples
+ --------
+
+ >>> from pprint import pprint
+ >>> from Queue import Queue
+ >>> import numpy
+
+ You'll want to subclass `ModelFitter`, overriding at least
+ `.model` and potentially the parameter and scale guessing
+ methods.
+
+ >>> class LinearModel (ModelFitter):
+ ... '''Simple linear model.
+ ...
+ ... Levenberg-Marquardt is not how you want to solve this problem
+ ... for real systems, but it's a simple test case.
+ ... '''
+ ... def model(self, params):
+ ... '''A linear model.
+ ...
+ ... Notes
+ ... -----
+ ... .. math:: y = p_0 x + p_1
+ ... '''
+ ... p = params # convenient alias
+ ... self._model_data[:] = p[0]*arange(len(self._data)) + p[1]
+ ... return self._model_data
+ ... def guess_initial_params(self, outqueue=None):
+ ... return [float(self._data[-1] - self._data[0])/len(self._data),
+ ... self._data[0]]
+ ... def guess_scale(self, params, outqueue=None):
+ ... slope_scale = 0.1
+ ... if params[1] == 0:
+ ... offset_scale = 1
+ ... else:
+ ... offset_scale = 0.1*self._data.std()/abs(params[1])
+ ... if offset_scale == 0: # data is completely flat
+ ... offset_scale = 1.
+ ... return [slope_scale, offset_scale]
+ >>> data = 20*numpy.sin(arange(1000)) + 7.*arange(1000) - 33.0
+ >>> m = LinearModel(data)
+ >>> outqueue = Queue()
+ >>> slope,offset = m.fit(outqueue=outqueue)
+ >>> info = outqueue.get(block=False)
+ >>> pprint(info) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE, +REPORT_UDIFF
+ {'active fitted parameters': array([ 6.999..., -32.889...]),
+ 'active parameters': array([ 6.999..., -32.889...]),
+ 'convergence flag': ...,
+ 'covariance matrix': array([[ 1.199...e-08, -5.993...e-06],
+ [ -5.993...e-06, 3.994...e-03]]),
+ 'data scale factor': 1.0,
+ 'fitted parameters': array([ 6.999..., -32.889...]),
+ 'info': {'fjac': array([[...]]),
+ 'fvec': array([...]),
+ 'ipvt': array([1, 2]),
+ 'nfev': 7,
+ 'qtf': array([...])},
+ 'initial parameters': [6.992..., -33.0],
+ 'message': '...relative error between two consecutive iterates is at most 0.000...',
+ 'rescaled': False,
+ 'scale': [0.100..., 6.123...]}
+
+ We round the outputs to protect the doctest against differences in
+ machine rounding during computation. We expect the values to be close
+ to the input settings (slope 7, offset -33).
+
+ >>> print '%.3f' % slope
+ 7.000
+ >>> print '%.3f' % offset
+ -32.890
+
+ The offset is a bit off because, the range is not a multiple of
+ :math:`2\pi`.
+
+ We could also use rescaled parameters:
+
+ >>> m = LinearModel(data, rescale=True)
+ >>> outqueue = Queue()
+ >>> slope,offset = m.fit(outqueue=outqueue)
+ >>> print '%.3f' % slope
+ 7.000
+ >>> print '%.3f' % offset
+ -32.890
+
+ Test single-parameter models:
+
+ >>> class SingleParameterModel (LinearModel):
+ ... '''Simple linear model.
+ ... '''
+ ... def model(self, params):
+ ... return super(SingleParameterModel, self).model([params[0], 0.])
+ ... def guess_initial_params(self, outqueue=None):
+ ... return super(SingleParameterModel, self
+ ... ).guess_initial_params(outqueue)[:1]
+ ... def guess_scale(self, params, outqueue=None):
+ ... return super(SingleParameterModel, self
+ ... ).guess_scale([params[0], 0.], outqueue)[:1]
+ >>> data = 20*numpy.sin(arange(1000)) + 7.*arange(1000)
+ >>> m = SingleParameterModel(data)
+ >>> slope, = m.fit(outqueue=outqueue)
+ >>> print '%.3f' % slope
+ 7.000
+ """
+ def __init__(self, *args, **kwargs):
+ self.set_data(*args, **kwargs)
+
+ def set_data(self, data, info=None, rescale=False):
+ self._data = data
+ self._model_data = ndarray(shape=data.shape, dtype=data.dtype)
+ self.info = info
+ self._rescale = rescale
+ if rescale == True:
+ for x in [data.std(), data.max()-data.min(), abs(data.max()), 1.0]:
+ if x != 0:
+ self._data_scale_factor = x
+ break
+ else:
+ self._data_scale_factor = 1.0
+
+ def model(self, params):
+ p = params # convenient alias
+ self._model_data[:] = arange(len(self._data))
+ raise NotImplementedError
+
+ def guess_initial_params(self, outqueue=None):
+ return []
+
+ def guess_scale(self, params, outqueue=None):
+ """Guess the problem length scale in each parameter dimension.
+
+ Notes
+ -----
+ From the :func:`scipy.optimize.leastsq` documentation, `diag`
+ (which we refer to as `scale`, sets `factor * || diag * x||`
+ as the initial step. If `x == 0`, then `factor` is used
+ instead (from `lmdif.f`_)::
+
+ do 70 j = 1, n
+ wa3(j) = diag(j)*x(j)
+ 70 continue
+ xnorm = enorm(n,wa3)
+ delta = factor*xnorm
+ if (delta .eq. zero) delta = factor
+
+ For most situations then, you don't need to do anything fancy.
+ The default scaling (if you don't set a scale) is::
+
+ c on the first iteration and if mode is 1, scale according
+ c to the norms of the columns of the initial jacobian.
+
+ (i.e. `diag(j) = acnorm(j)`, where `acnorm(j) is the norm of the `j`th column
+ of the initial Jacobian).
+
+ .. _lmdif.f: http://www.netlib.org/minpack/lmdif.f
+ """
+ return None
+
+ def residual(self, params):
+ if self._rescale == True:
+ params = [p*s for p,s in zip(params, self._param_scale_factors)]
+ residual = self._data - self.model(params)
+ if False: # fit debugging
+ if not hasattr(self, '_i_'):
+ self._i_ = 0
+ self._data.tofile('data.%d' % self._i_, sep='\n')
+ self.model(params).tofile('model.%d' % self._i_, sep='\n')
+ self._i_ += 1
+ if self._rescale == True:
+ residual /= self._data_scale_factor
+ return residual
+
+ def fit(self, initial_params=None, scale=None, outqueue=None, **kwargs):
+ """
+ Parameters
+ ----------
+ initial_params : iterable or None
+ Initial parameter values for residual minimization. If
+ `None`, they are estimated from the data using
+ :meth:`guess_initial_params`.
+ scale : iterable or None
+ Parameter length scales for residual minimization. If
+ `None`, they are estimated from the data using
+ :meth:`guess_scale`.
+ outqueue : Queue or None
+ If given, will be used to output the data and fitted model
+ for user verification.
+ kwargs :
+ Any additional arguments are passed through to `leastsq`.
+ """
+ if initial_params == None:
+ initial_params = self.guess_initial_params(outqueue)
+ if scale == None:
+ scale = self.guess_scale(initial_params, outqueue)
+ if scale != None:
+ assert min(scale) > 0, scale
+ if self._rescale == True:
+ self._param_scale_factors = initial_params
+ for i,s in enumerate(self._param_scale_factors):
+ if s == 0:
+ self._param_scale_factors[i] = 1.0
+ active_params = [p/s for p,s in zip(initial_params,
+ self._param_scale_factors)]
+ else:
+ active_params = initial_params
+ params,cov,info,mesg,ier = leastsq(
+ func=self.residual, x0=active_params, full_output=True,
+ diag=scale, **kwargs)
+ if len(initial_params) == 1 and _SCIPY_VERSION < (0, 8, '0'):
+ # params is a float for scipy < 0.8.0. Convert to list.
+ params = [params]
+ if self._rescale == True:
+ active_params = params
+ params = [p*s for p,s in zip(params,
+ self._param_scale_factors)]
+ else:
+ active_params = params
+ if outqueue != None:
+ outqueue.put({
+ 'rescaled': self._rescale,
+ 'initial parameters': initial_params,
+ 'active parameters': active_params,
+ 'scale': scale,
+ 'data scale factor': self._data_scale_factor,
+ 'active fitted parameters': active_params,
+ 'fitted parameters': params,
+ 'covariance matrix': cov,
+ 'info': info,
+ 'message': mesg,
+ 'convergence flag': ier,
+ })
+ return params
+
+# Example ORD code from the old fit.py
+# def dist(px,py,linex,liney):
+# distancesx=scipy.array([(px-x)**2 for x in linex])
+# minindex=numpy.argmin(distancesx)
+# print px, linex[0], linex[-1]
+# return (py-liney[minindex])**2
+#
+#
+# def f_wlc(params,x,T=T):
+# '''
+# wlc function for ODR fitting
+# '''
+# lambd,pii=params
+# Kb=(1.38065e-23)
+# therm=Kb*T
+# y=(therm*pii/4.0) * (((1-(x*lambd))**-2) - 1 + (4*x*lambd))
+# return y
+#
+# def f_wlc_plfix(params,x,pl_value=pl_value,T=T):
+# '''
+# wlc function for ODR fitting
+# '''
+# lambd=params
+# pii=1/pl_value
+# Kb=(1.38065e-23)
+# therm=Kb*T
+# y=(therm*pii/4.0) * (((1-(x*lambd))**-2) - 1 + (4*x*lambd))
+# return y
+#
+# #make the ODR fit
+# realdata=scipy.odr.RealData(xchunk_corr_up,ychunk_corr_up)
+# if pl_value:
+# model=scipy.odr.Model(f_wlc_plfix)
+# o = scipy.odr.ODR(realdata, model, p0_plfix)
+# else:
+# model=scipy.odr.Model(f_wlc)
+# o = scipy.odr.ODR(realdata, model, p0)
+#
+# o.set_job(fit_type=2)
+# out=o.run()
+# fit_out=[(1/i) for i in out.beta]
+#
+# #Calculate fit errors from output standard deviations.
+# #We must propagate the error because we fit the *inverse* parameters!
+# #The error = (error of the inverse)*(value**2)
+# fit_errors=[]
+# for sd,value in zip(out.sd_beta, fit_out):
+# err_real=sd*(value**2)
+# fit_errors.append(err_real)
+
--- /dev/null
+# Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
+#
+# This file is part of pypid.
+#
+# pypid is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation, either
+# version 3 of the License, or (at your option) any later version.
+#
+# pypid is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with pypid. If not, see
+# <http://www.gnu.org/licenses/>.
+
+"Assorted PID tuning rules"
+
+
+def ziegler_nichols_step_response(process_gain, dead_time, decay_time, mode='PID'):
+ K,L,T = (process_gain, dead_time, decay_time)
+ r = L / T
+ if r > 1:
+ _LOG.warn(('Ziegler-Nichols not a good idea when '
+ 'dead-time/decay_time = {:n}').format(r))
+ pkern = 1/(K*r)
+ if mode == 'P':
+ return (pkern, float('inf'), 0)
+ elif mode == 'PI':
+ return (0.9*pkern, 3.3*L, 0)
+ elif mode == 'PID':
+ return (1.2*pkern, 2*L, L/2.)
+ raise ValueError(mode)
+
+def ziegler_nichols_bang_bang_response(amplitude, period, max_current,
+ mode='PID'):
+ """
+ amplitude : float
+ center-to-peak amplitude (in K) of bang-bang oscillation
+ period : float
+ period (in seconds) of the critical oscillation
+ max_current : float
+ "bang" current (in amps)
+ """
+ proportional = float(max_current)/amplitude
+ period = float(period)
+ if mode == 'P':
+ return (proportional/2, float('inf'), 0)
+ elif mode == 'PI':
+ return (proportional/3, 2*period, 0)
+ elif mode == 'PID':
+ return (proportional/2, period, period/4)
+ raise ValueError(mode)
+
+def ziegler_nichols_ultimate_cycle_response(proportional, period):
+ """
+ proportional : float
+ critical P-only process_gain (ultimate process_gain, for sustained oscillation)
+ period : float
+ period (in seconds) of the critical oscillation
+
+ Microstar Laboratories has a `nice analysis`_ on ZN
+ limitations, which points out that ZN-tuning assumes your
+ system has the FOPDT transfer function (see `TestBackend` for
+ details).
+
+ .. _nice analysis: http://www.mstarlabs.com/control/znrule.html
+ """
+ if mode == 'P':
+ return (0.50*proportional, float('inf'), 0)
+ elif mode == 'PI':
+ return (0.45*proportional, period/1.2, 0)
+ elif mode == 'PID':
+ return (0.60*proportional, period/2, period/8)
+ raise ValueError(mode)
+
+def cohen_coon_step_response(process_gain, dead_time, decay_time, mode='PID'):
+ K,L,T = (process_gain, dead_time, decay_time)
+ r = L/T
+ pkern = 1/(K*r)
+ if mode == 'P':
+ return (pkern*(1+r/3.), float('inf'), 0)
+ elif mode == 'PI':
+ return (pkern*(0.9+r/12.), (30.+3*r)/(9+20*r)*dead_time, 0)
+ elif mode == 'PD': # double check
+ return (1.24*pkern*(1+0.13*tf), float('inf'),
+ (0.27-0.36*t)/(1-0.87*t)*dead_time)
+ elif mode == 'PID':
+ return (pkern*(4./3+r/4.), (32.-6*r)/(13.-8*r)*dead_time,
+ 4/(11.+2*r)*dead_time)
+ raise ValueError(mode)
+
+def wang_juang_chan_step_response(process_gain, dead_time, decay_time, mode='PID'):
+ """Wang-Juang-Chan tuning
+ """
+ K,L,T = (process_gain, dead_time, decay_time)
+ if mode == 'PID':
+ return ((0.7303+0.5307*T/L)*(T+0.5*L)/(K*(T+L)),
+ T + 0.5*L,
+ 0.5*L*T / (T + 0.5*L))
+ raise ValueError(mode)
+
import time as _time
from . import LOG as _LOG
+from . import rules as _rules
from .backend import get_backend as _get_backend
from controller import Controller as _Controller
backend = _get_backend('test')()
try:
sp = backend.get_setpoint()
- _LOG.info('temperature = {:n} C'.format(backend.get_temp()))
- _LOG.info('setpoint = {:n} C'.format(sp))
- _LOG.info('current = {:n} A'.format(backend.get_current()))
+ _LOG.info('PV = {:n} {}'.format(backend.get_pv(), backend.pv_units))
+ _LOG.info('SP = {:n} {}'.format(sp, backend.pv_units))
+ _LOG.info('MV = {:n} {}'.format(backend.get_mv(), backend.mv_units))
_set_and_check_setpoint(backend=backend, setpoint=5.0)
- _check_max_current(backend=backend)
+ _check_max_mv(backend=backend)
_set_and_check_setpoint(backend=backend, setpoint=50.0)
- _check_max_current(backend=backend)
+ _check_max_mv(backend=backend)
_set_and_check_setpoint(backend=backend, setpoint=sp)
finally:
if internal_backend:
backend.cleanup()
def _set_and_check_setpoint(backend, setpoint):
- _LOG.info('setting setpoint to {:n} C'.format(setpoint))
+ _LOG.info('setting setpoint to {:n} {}'.format(setpoint, backend.pv_units))
c.set_setpoint(setpoint)
sp = c.get_setpoint()
- _LOG.info('setpoint = {:n} C'.format(sp))
+ _LOG.info('SP = {:n} {}'.format(sp, backend.pv_units))
if sp != setpoint:
msg = 'read setpoint {:n} != written setpoint {:n}'.format(
sp, setpoint)
def _check_max_current(backend):
# give the backend some time to overcome any integral gain
_time.sleep(10)
- cur = c.get_current()
- _LOG.info('current = {:n} A'.format(cur))
- mcur = c.get_max_current()
- if cur != mcur:
- temp = backend.get_temp()
- sp = backend.get_setpoint()
- msg = ('current of {:n} A is not the max {:n} A, but the system is '
- 'at {:n} C while the setpoint is at {:n}').format(
- cur, mcur, temp, sp)
+ MV = c.get_mv()
+ _LOG.info('MV = {:n} {}'.format(MV, backend.mv_units))
+ mMV = c.get_max_mv()
+ if mv != mMV:
+ PV = backend.get_pv()
+ SP = backend.get_setpoint()
+ PVu = backend.pv_units
+ MVu = backend.mv_units
+ msg = ('current of {:n} {} is not the max {:n} {}, but the system is '
+ 'at {:n} {} while the setpoint is at {:n} {}').format(
+ MV, MVu, mMV, MVu, PV, PVu, SP, PVu)
_LOG.error(msg)
raise Exception(msg)
-def test_controller_step_response(backend=None, setpoint=25):
+def test_controller_step_response(backend=None, setpoint=1):
internal_backend = False
if not backend:
internal_backend = True
try:
backend.set_mode('PID')
c = _Controller(backend=backend)
- max_current = backend.get_max_current()
- current_a = 0.4 * max_current
- current_b = 0.5 * max_current
+ max_MV = backend.get_max_mv()
+ MVa = 0.4 * max_MV
+ MVb = 0.5 * max_MV
step_response = c.get_step_response(
- current_a=current_a, current_b=current_b, tolerance=0.5, stable_time=4.)
- if True:
+ mv_a=MVa, mv_b=MVb, tolerance=0.5, stable_time=4.)
+ if False:
with open('step_response.dat', 'w') as d:
s = step_response[0][0]
- for t,T in step_response:
- d.write('{:n}\t{:n}\n'.format(t-s, T))
- gain,dead_time,tau,max_rate = c.analyze_step_response(
- step_response, current_shift=current_b-current_a)
- _LOG.debug(('step response: dead time {:n}, gain {:n}, tau {:n}, '
- 'max-rate {:n}').format(dead_time, gain, tau, max_rate))
+ for t,PV in step_response:
+ d.write('{:n}\t{:n}\n'.format(t-s, PV))
+ process_gain,dead_time,decay_time,max_rate = c.analyze_step_response(
+ step_response, mv_shift=MVb-MVa)
+ _LOG.debug(('step response: process gain {:n}, dead time {:n}, decay '
+ 'time {:n}, max-rate {:n}').format(
+ process_gain, dead_time, decay_time, max_rate))
+ r = rules
for name,response_fn,modes in [
- ('Zeigler-Nichols', c.ziegler_nichols_step_response,
+ ('Zeigler-Nichols', r.ziegler_nichols_step_response,
['P', 'PI', 'PID']),
- ('Cohen-Coon', c.cohen_coon_step_response,
+ ('Cohen-Coon', r.cohen_coon_step_response,
['P', 'PI', 'PID']), # 'PD'
- ('Wang-Juan-Chan', c.wang_juang_chan_step_response,
+ ('Wang-Juan-Chan', r.wang_juang_chan_step_response,
['PID']),
]:
for mode in modes:
p,i,d = response_fn(
- gain=gain, dead_time=dead_time, tau=tau, mode=mode)
+ process_gain=process_gain, dead_time=dead_time,
+ decay_time=decay_time, mode=mode)
_LOG.debug(
'{} step response {}: p {:n}, i {:n}, d {:n}'.format(
name, mode, p, i, d))
if internal_backend:
backend.cleanup()
-def test_controller_bang_bang_response(backend=None, setpoint=25):
+def test_controller_bang_bang_response(backend=None, setpoint=1):
internal_backend = False
if not backend:
internal_backend = True
- backend = _get_backend('test')(log_stream=open('pid.log', 'w'))
- # shift our noise-less system off its setpoint
- backend.set_setpoint(backend.get_temp()+0.1)
+ backend = _get_backend('test')()
try:
+ backend.set_setpoint(setpoint)
c = _Controller(backend=backend)
- dead_band = 3*c.estimate_temperature_sensitivity()
- bang_bang_response = c.get_bang_bang_response(dead_band=dead_band, num_oscillations=4)
- if True:
+ dead_band = 3*c.estimate_pv_sensitivity()
+ bang_bang_response = c.get_bang_bang_response(dead_band=dead_band)
+ if False:
with open('bang_bang_response.dat', 'w') as d:
s = bang_bang_response[0][0]
for t,T in bang_bang_response:
amplitude,period = c.analyze_bang_bang_response(bang_bang_response)
_LOG.debug('bang-bang response: amplitude {:n}, period {:n}'.format(
amplitude,period))
- p,i,d = c.ziegler_nichols_bang_bang_response(
- amplitude=amplitude, period=period, mode='PID')
- _LOG.debug(('Zeigler-Nichols bang-bang response: '
- 'p {:n}, i {:n}, d {:n}').format(p, i, d))
+ for name,response_fn,modes in [
+ ('Zeigler-Nichols', r.ziegler_nichols_bang_bang_response,
+ ['P', 'PI', 'PID']),
+ ]:
+ for mode in modes:
+ p,i,d = response_fn(
+ amplitude=amplitude, period=period, mode=mode)
+ _LOG.debug(
+ '{} bang-bang response {}: p {:n}, i {:n}, d {:n}'.format(
+ name, mode, p, i, d))
finally:
if internal_backend:
backend.cleanup()