From: W. Trevor King Date: Wed, 27 Jul 2011 14:56:17 +0000 (-0400) Subject: Copy fit.py from Hooke to remove dependency, and generalize temp -> PV, etc. X-Git-Tag: v0.3~2 X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=5d9293b;p=pypid.git Copy fit.py from Hooke to remove dependency, and generalize temp -> PV, etc. --- diff --git a/README b/README index 002968a..27dcc9d 100644 --- a/README +++ b/README @@ -44,42 +44,9 @@ __ `Laird thermal`_ TestBackend ~~~~~~~~~~~ -To get a feel for driving a PID system, check out the `TestBackend`. -For example, you can experiment with different feedback terms and dead -times to understand why you're getting instability or other control -effects. Here's an example that shows a reasonable approach with a -bit of integrator overshoot:: - - >>> from pypid.backend.test import TestBackend - >>> from time import sleep - >>> from matplotlib import pyplot - >>> from numpy import loadtxt - >>> log_file = 'pid.log' - >>> log_stream = open('pid.log', 'w') - >>> b = TestBackend(log_stream=log_stream) - >>> b.set_max_current(0.6) - >>> b.set_heating_gains(propband=2, integral=.1) - >>> b.set_cooling_gains(propband=2, integral=.1) - >>> b.set_setpoint(25) - >>> sleep(120) - >>> t.cleanup() - >>> log_stream.close() - >>> header = open(log_file, 'r').readline() - >>> label = header.strip('#\n').split('\t') - >>> data = loadtxt('pid.log') - >>> pyplot.hold(True) - >>> for i in range(1, len(label)): - ... if i in [1, 3, 5]: - ... if i: - ... pyplot.legend(loc='best') # add legend to previous subplot - ... pyplot.subplot(3, 1, (i-1)/2 + 1) - ... pyplot.plot(data[:,0], data[:,i], '.', label=label[i]) - >>> pyplot.legend(loc='best') - >>> pyplot.show() - -Of course, you can use whatever plotting program you like to graph the -values stored to `pid.log`. Matplotlib_ and NumPy_ are just -convenient Python-based packages. +To get a feel for driving a PID system, look atcheck out the +`TestBackend`, which simulates a standard first-order process with +dead time (FOPDT). Installation ============ @@ -106,9 +73,12 @@ distribution, you'll need the following dependencies: ========= ===================== ================ ========================== Package Purpose Debian_ Gentoo_ ========= ===================== ================ ========================== -pymodbus_ Modbus stack python-modbus dev-python/twisted -pySerial_ serial comminication python-serial dev-python/pyserial +aubio_ Pitch detection python-aubio media-libs/aubio nose_ testing python-nose dev-python/nose +NumPy_ Controller analysis python-numpy dev-python/numpy +pySerial_ serial comminication python-serial dev-python/pyserial +pymodbus_ Modbus stack python-modbus dev-python/twisted +SciPy_ Controller analysis python-scipy dev-python/scipy ========= ===================== ================ ========================== Actually, `pymodbus` may (depending on your packaging system) depend @@ -167,7 +137,6 @@ Copyright 2008-2011 .. _Modbus: http://en.wikipedia.org/wiki/Modbus .. _serial port: http://en.wikipedia.org/wiki/Serial_port .. _Matplotlib: http://matplotlib.sourceforge.net/ -.. _NumPy: http://numpy.scipy.org/ .. _Laird announcement: http://www.lairdtech.com/NewsItem.aspx?id=953 .. _wayback: http://web.archive.org/web/20090204201524/http://melcor.com/ .. _Laird thermal: http://lairdtech.thomasnet.com/category/thermal-management-solutions/ @@ -177,9 +146,12 @@ Copyright 2008-2011 http://www.physics.drexel.edu/~wking/unfolding-disasters/posts/Gentoo_overlay .. _Debian: http://www.debian.org/ .. _Gentoo: http://www.gentoo.org/ +.. _aubio: http://aubio.org/ +.. _NumPy: http://numpy.scipy.org/ .. _pymodbus: http://code.google.com/p/pymodbus/ .. _pySerial: http://pyserial.sourceforge.net/ .. _Twisted: http://twistedmatrix.com/trac/ +.. _SciPy: http://www.scipy.org/ .. _db578120: http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=578120 .. _nose: http://somethingaboutorange.com/mrl/projects/nose/ .. _Git: http://git-scm.com/ diff --git a/examples/pid_repsonse.py b/examples/pid_repsonse.py new file mode 100755 index 0000000..33311cb --- /dev/null +++ b/examples/pid_repsonse.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +# Copyright (C) 2011 W. Trevor King +# +# This file is part of pypid. +# +# pypid is free software: you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. +# +# pypid is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with pypid. If not, see +# . + + +from argparse import ArgumentParser +from sys import stdout +from time import sleep + +try: + from matplotlib import pyplot + from numpy import loadtxt +except ImportError, e: + pyplot = None + loadtxt = None + plot_import_error = e + +from pypid.backend.test import TestBackend +from pypid.rules import ziegler_nichols_step_response + + +parser = ArgumentParser(description='Simulate a step response.') +parser.add_argument( + '-K', '--process-gain', metavar='GAIN', type=float, default=1, + help='process gain (PV-units over MV-units)',) +parser.add_argument( + '-L', '--dead-time', metavar='TIME', type=float, default=1, + help='system dead time (lag)') +parser.add_argument( + '-T', '--decay-time', metavar='TIME', type=float, default=1, + help='exponential decay timescale') +parser.add_argument( + '-P', '--proportional', metavar='GAIN', type=float, default=None, + help='process gain (output units over input units)',) +parser.add_argument( + '-I', '--integral', metavar='TIME', type=float, default=None, + help='intergral gain timescale') +parser.add_argument( + '-D', '--derivative', metavar='TIME', type=float, default=None, + help='derivative gain timescale') +parser.add_argument( + '-M', '--max-mv', metavar='MV', type=float, default=100., + help='maximum manipulated variable') +parser.add_argument( + '-A', '--tuning-algorithm', metavar='TUNER', default=None, + choices=['ZN'], help='step tuning algorithm') +parser.add_argument( + '-m', '--mode', metavar='MODE', default='PID', + choices=['P', 'PI', 'PID'], help='controller mode') +parser.add_argument( + '-t', '--time', metavar='TIME', type=float, default=10., + help='simulation time') +parser.add_argument( + '-o', '--output', default='-', help='output log file') +parser.add_argument( + '-p', '--plot', action='store_true', default=False, + help='plot the repsonse') + +args = parser.parse_args() + +if args.plot and not (pyplot and loadtxt) : + raise plot_import_error + +if args.output == '-': + log_stream = stdout + if args.plot: + raise ValueError('can only plot when outputing to a file') +else: + log_stream = open(args.output, 'w') + +K = args.process_gain +L = args.dead_time +T = args.decay_time + +p,i,d = (0, float('inf'), 0) +if args.tuning_algorithm == 'ZN': + p,i,d = ziegler_nichols_step_response( + process_gain=K, dead_time=L, decay_time=T, mode=args.mode) +else: + if args.proportional: + p = args.proportional + if args.integral: + i = args.integral + if args.derivative: + d = args.derivative + +b = TestBackend( + process_gain=K, dead_time=L, decay_time=T, max_mv=args.max_mv, + log_stream=log_stream) +try: + b.set_up_gains(proportional=p, integral=i, derivative=d) + b.set_down_gains(proportional=p, integral=i, derivative=d) + b.set_setpoint(1.0) + sleep(args.time) +finally: + b.cleanup(); + if args.output != '-': + log_stream.close() + +if args.plot: + header = open(args.output, 'r').readline() + label = header.strip('#\n').split('\t') + data = loadtxt(args.output) + times = data[:,0] - data[0,0] + pyplot.hold(True) + subplot = 1 + for i in range(1, len(label)): + if i in [1, 4, 6]: + if i: + pyplot.legend(loc='best') # add legend to previous subplot + pyplot.subplot(3, 1, subplot) + subplot += 1 + pyplot.plot(times, data[:,i], '.', label=label[i]) + pyplot.legend(loc='best') + pyplot.show() diff --git a/examples/temp_monitor.py b/examples/temp_monitor.py old mode 100644 new mode 100755 index f6b075f..19f603a --- a/examples/temp_monitor.py +++ b/examples/temp_monitor.py @@ -1,20 +1,20 @@ #!/usr/bin/env python # Copyright (C) 2011 W. Trevor King # -# This file is part of tempcontrol. +# This file is part of pypid. # -# tempcontrol is free software: you can redistribute it and/or +# pypid is free software: you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation, either # version 3 of the License, or (at your option) any later version. # -# tempcontrol is distributed in the hope that it will be useful, +# pypid is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public -# License along with tempcontrol. If not, see +# License along with pypid. If not, see # . """Log control and ambient temperature every 10 seconds. @@ -24,7 +24,7 @@ usage: python temp_monitor.py import time -from tempcontrol.backend import get_backend +from pypid.backend import get_backend b = get_backend('melcor')() diff --git a/pypid/backend/__init__.py b/pypid/backend/__init__.py index 51cd544..9efad6a 100644 --- a/pypid/backend/__init__.py +++ b/pypid/backend/__init__.py @@ -50,7 +50,7 @@ def get_backend(name): class Backend (object): - """Temperature control backend + """Process-control backend There are several common forms for a PID control formula. For the purpose of setting heating and cooling gains (`.get_*_gains()` and @@ -64,48 +64,45 @@ class Backend (object): In this formulation, the parameter units will be: - * K_p: MV units / PV units (e.g. amp/K) - * T_i, T_d: time (e.g. seconds) + * K_p: MV-units/PV-units (e.g. amp/K for a thermoelectric + controller). Don't confuse this `proportional gain` with the + `process gain` used in `TestBackend`. + * T_i, T_d: time (e.g. seconds) """ - def __init__(self): - self._max_current = None - - @staticmethod - def _convert_F_to_C(F): - return (F - 32)/1.8 + pv_units = 'PV-units' + mv_units = 'MV-units' - @staticmethod - def _convert_C_to_F(C): - return C*1.8 + 32 + def __init__(self): + self._max_mv = None def cleanup(self): "Release resources and disconnect from any hardware." pass - def get_temp(self): - "Return the current process temperature in degrees Celsius" + def get_pv(self): + "Return the current process variable in PV-units" raise NotImplementedError() - def get_ambient_temp(self): - "Return room temperature in degrees Celsius" + def get_ambient_mv(self): + "Return the abmient (bath) status in PV-units" raise NotImplementedError() - def set_max_current(self, max): - "Set the max current in Amps" + def set_max_mv(self, max): + "Set the max manipulated variable in MV-units" raise NotImplementedError() - def get_max_current(self): - "Get the max current in Amps" + def get_max_mvt(self): + "Get the max manipulated variable MV-units" raise NotImplementedError() - def get_current(self): - """Return the calculated control current in Amps" + def get_mv(self): + """Return the calculated manipulated varaible in MV-units - The returned current is not the actual current, but the - current that the temperature controller calculates it should - generate. If the voltage required to generate that current - exceeds the controller's max voltage (15 V on mine), then the - physical current will be less than the value returned here. + The returned current is not the actual MV, but the MV that the + controller calculates it should generate. For example, if the + voltage required to generate an MV current exceeds the + controller's max voltage, then the physical current will be + less than the value returned here. """ raise NotImplementedError() @@ -133,38 +130,36 @@ class Backend (object): class ManualMixin (object): - def set_current(self, current): - """Set the desired control current in Amps - """ + def set_mv(self, current): + "Set the desired manipulated variable in MV-units" raise NotImplementedError() class PIDMixin (object): def set_setpoint(self, setpoint): - "Set the temperature setpoint in degrees Celsius" + "Set the process variable setpoint in PV-units" raise NotImplementedError() def get_setpoint(self, setpoint): - "Get the temperature setpoint in degrees Celsius" + "Get the process variable setpoint in PV-units" raise NotImplementedError() - def get_cooling_gains(self): + def get_duwn_gains(self): """...""" raise NotImplementedError() - def set_cooling_gains(self, proportional=None, integral=None, - derivative=None): + def set_down_gains(self, proportional=None, integral=None, + derivative=None): """ ... """ raise NotImplementedError() - def get_heating_gains(self): + def get_up_gains(self): """...""" raise NotImplementedError() - def set_heating_gains(self, proportional=None, integral=None, - derivative=None): + def set_up_gains(self, proportional=None, integral=None, derivative=None): """ ... """ @@ -195,3 +190,13 @@ class PIDMixin (object): backend. """ raise NotImplementedError() + + +class TemperatureMixin (object): + @staticmethod + def _convert_F_to_C(F): + return (F - 32)/1.8 + + @staticmethod + def _convert_C_to_F(C): + return C*1.8 + 32 diff --git a/pypid/backend/melcor.py b/pypid/backend/melcor.py index f0a5c92..1d9d0f7 100644 --- a/pypid/backend/melcor.py +++ b/pypid/backend/melcor.py @@ -26,6 +26,7 @@ from .. import LOG as _LOG from . import Backend as _Backend from . import ManualMixin as _ManualMixin from . import PIDMixin as _PIDMixin +from . import TemperatureMixin as _TemperatureMixin class Register (object): @@ -124,9 +125,17 @@ class BoundedFloatRegister (FloatRegister): return super(BoundedFloatRegister, self).decode(value, **kwargs) -class MelcorBackend (_Backend, _ManualMixin, _PIDMixin): +class MelcorBackend (_Backend, _ManualMixin, _PIDMixin, _TemperatureMixin): """Temperature control backend for a Melcor MTCA Temperature Controller + + * PV: process temperature + * PV-units: degrees Celsius + * MV: controller current + * MV-units: amps """ + pv_units = 'C' + mv_units = 'A' + # Relative register addresses from back page of Melcor Manual. # Then I went through Chapter 6 tables looking for missing # registers. References are from Series MTCA Thermoelectric @@ -474,13 +483,13 @@ class MelcorBackend (_Backend, _ManualMixin, _PIDMixin): # Support for Backend methods - def get_temp(self): + def get_pv(self): return self._read('HIGH_RESOLUTION') - def get_ambient_temp(self): + def get_ambient_pv(self): return self._convert_F_to_C(self._read('AMBIENT_TEMPERATURE')) - def set_max_current(self, max): + def set_max_mv(self, max): """Set the max current in Amps 0.2 A is the default max current since it seems ok to use @@ -496,7 +505,7 @@ class MelcorBackend (_Backend, _ManualMixin, _PIDMixin): self._write('HIGH_POWER_LIMIT_BELOW', max_percent) self._max_current = max - def get_max_current(self): + def get_max_mv(self): percent = self._read('HIGH_POWER_LIMIT_ABOVE') above = percent/100. * self._spec_max_current percent = self._read('HIGH_POWER_LIMIT_BELOW') @@ -506,7 +515,7 @@ class MelcorBackend (_Backend, _ManualMixin, _PIDMixin): self._max_current = above return above - def get_current(self): + def get_mv(self): pout = self._read('PERCENT_OUTPUT') cur = self._spec_max_current * pout / 100.0 return cur @@ -529,7 +538,7 @@ class MelcorBackend (_Backend, _ManualMixin, _PIDMixin): # ManualMixin methods - def set_current(self, current): + def set_mv(self, current): if current > self._spec_max_current: raise ValueError('current {} exceeds spec maximum {}'.format( current, self._spec_max_current)) @@ -588,22 +597,21 @@ class MelcorBackend (_Backend, _ManualMixin, _PIDMixin): proportional = max_current/propband return (proportional, integral, derivative) - def set_cooling_gains(self, proportional=None, integral=None, - derivative=None): + def set_down_gains(self, proportional=None, integral=None, + derivative=None): self._set_gains( output=1, proportional=proportional, integral=integral, derivative=derivative) - def get_cooling_gains(self): + def get_down_gains(self): return self._get_gains(output=1) - def set_heating_gains(self, proportional=None, integral=None, - derivative=None): + def set_up_gains(self, proportional=None, integral=None, derivative=None): self._set_gains( output=2, proportional=proportional, integral=integral, derivative=derivative) - def get_heating_gains(self): + def get_up_gains(self): return self._get_gains(output=2) def get_feedback_terms(self): diff --git a/pypid/backend/test.py b/pypid/backend/test.py index 826e881..ba93814 100644 --- a/pypid/backend/test.py +++ b/pypid/backend/test.py @@ -28,39 +28,41 @@ from . import PIDMixin as _PIDMixin class TestBackend (_Backend, _ManualMixin, _PIDMixin): """Test backend for demonstrating `Controller` function - The underlying temperature decay model is exponential, which is - often refered to as "Newton's law of cooling" + The response function for a first-order plus dead time process + (aka FOPDT, or occasionaly KLT), one of control theory's classics, + is - dT/dt = h (Tbath - T) + G(s) = Y(s)/X(s) = K_p e^{-Ls} / (1 + T s) - where `h` is the transfer coefficient (with some scaling terms - brushed under the rug). To make the system more realistic, I've - also added a dead time, so temperatures returned by `.get_temp()` - actually correspond to the system temperature `dead_time` seconds - before the measurement was taken. Finally, there's a - `drive_coefficient` `d`, which gives the rate of temperature - change due to a applied driving current `I`, so + Going back to the time domain with the inverse Laplace transform: - dT(y)/dt = h (Tbath(t) - T(t)) + d I(t-L) + (1 + Ts)Y(s) = K_p e^{-Ls} X(s) Laplace transform rule: + y(t) + T dy(t)/dt = K_p e^{-Ls} X(s) f'(t) -> sF(s) + f(0) + y(t) + T dy(t)/dt = K_p x(t-L)u(t-L) f(t-a)u(t-a) -> e^{-as}F(s) + dy(t)/dt = -y(t)/T + K_p/T x(t-L)u(t-L) - This model is often refered to as a FOPDT (first-order plus dead - time) or KLT (K: static gain, L: time delay, T: time constant/lag) - model. Translating our model above into process-control jargon: + This may look more familiar to those of us more used to + differential equations than we are to transfer functions. There + is an exponential decay to y=0 with a time constant T, with an + external driver that has a gain of K_p and a lag (dead time) of L. - * Process variable y(t) corresponds to our T(t). - * Manipulated variable u(t) corresponds to our I(t). - * Process gain dy/du (often denoted K_p). For our parameters - above, K_p = dy/du = dT/dI = d/h. - * Process time constant (aka lag, often denoted tau or T; the - exponential decay timescale). For our parameters above, - tau = 1/h. - * Dead time (often denoted L or theta; the time delay between a - change system and that change being reflected in the process - variable). For our parameters above, L = dead_time. + The units should be: - The response function for a FOPDT process is + * K_p: y-units over x-units (e.g. K/amp for a thermoelectric controller) + * L, T: time (e.g. seconds) - G(s) = K_p e^{-Ls} / (1 + T s) + Just to flesh out the usual jargon, x is refered to as the + manipulated variable (MV) and y is the process variable (PV). + + Because transfer functions treats the initial conditions + (e.g. x(t=0)), we can add them back in if we want to create a more + general y(t) having the same dynamics. + + dy(t)/dt = (y_b(t) - y(t))/T + K_p/T x(t-L) + + where I've assumed that x(t<0)=0 in order to drop the Heaviside + step function. Now instead of decaying to zero in the absence of + a driving signal, y(t) will decay to a bath value y_b(t). For interesting experimental evidence of exponential cooling, see Kaliszan et al., "Verification of the exponential model of body @@ -69,39 +71,41 @@ class TestBackend (_Backend, _ManualMixin, _PIDMixin): http://ep.physoc.org/content/90/5/727.long September 1, 2005 Experimental Physiology, 90, 727-738. """ - def __init__(self, bath=20, transfer_coefficient=0.1, - drive_coefficient=1., max_current=1., dead_time=1., - process_period=0.01, log_stream=None): + def __init__(self, bath=0., process_gain=1., dead_time=1., + decay_time=1., max_mv=1., process_period=0.01, + log_stream=None): """ bath : float - bath (ambient) temperature in degrees Celsius - transfer_coefficient : float - between the system and the bath, in inverse seconds - drive_coefficient : float - for the applied current, in degrees Celsius per amp - max_current : float - maximum current in amps + bath (ambient) process variable in PV units + process_gain : float + gain between manipulated variable MV and PV, in PV-units/MV-units dead_time : float - time lag in seconds between an internal system temperature - and the corresponding `.get_temp()` reading + time lag between a PV and the corresponding MV response. + decay_time : float + exponential decay time constant for the undriven PV + max_mv : float + maximum MV in MV-units process_period : float - time in seconds between process-thread temperature updates + time between process-thread PV updates + + All times (`dead_time`, `decay_time`, and `process_period`) + should be in the same units (e.g. seconds). """ - self._bath = bath - self._transfer_coefficient = transfer_coefficient - self._drive_coefficient = drive_coefficient - self._max_current = max_current + self._bath = self._setpoint = bath + self._K = process_gain + self._L = dead_time + self._T = decay_time + self._max_mv = max_mv self._dead_periods = int(dead_time/process_period) self._process_period = process_period self._log_stream = log_stream - self._setpoint = 0 self._i_term = self._d_term = 0 - self._p_cool = self._d_cool = 0 - self._p_heat = self._d_heat = 0 - self._i_cool = self._i_heat = float('inf') - self._manual_current = 0 + self._p_down = self._d_down = 0 + self._p_up = self._d_up = 0 + self._i_down = self._i_up = float('inf') + self._manual_mv = 0 self._mode = 'PID' - self._temperatures = [bath]*(self._dead_periods+1) + self._PVs = [bath]*(self._dead_periods+1) self._start_process_thread() def cleanup(self): @@ -120,70 +124,69 @@ class TestBackend (_Backend, _ManualMixin, _PIDMixin): def _run_process(self): if self._log_stream: line = '\t'.join(( - 'time', 'setpoint', 'process temperature', - 'measured temperature', 'dT_bath', 'dT_drive', 'current', - 'intergal', 'derivative')) + 'time', 'SP', 'PV_int', 'PV', 'dPV_bath', 'dPV_drive', + 'MV', 'intergal', 'derivative')) self._log_stream.write('#{}\n'.format(line)) dt = self._process_period next_time = _time.time() + dt while not self._stop_process: - T = self._temperatures[-1] - dT_bath = self._transfer_coefficient * (self._bath - T) - current = self.get_current(_increment_i_term=True) - dT_drive = self._drive_coefficient * current + PV = self._PVs[-1] + dPV_bath = (self._bath - PV)/self._T * dt + MV = self.get_mv(_increment_i_term=True) + dPV_drive = self._K * MV / self._T * dt if self._log_stream: line = '\t'.join(str(x) for x in ( - _time.time(), self._setpoint, T, self.get_temp(), dT_bath*dt, - dT_drive*dt, current, self._i_term, self._d_term)) + _time.time(), self._setpoint, PV, self.get_pv(), + dPV_bath, dPV_drive, MV, self._i_term, self._d_term)) self._log_stream.write(line + '\n') - T += (dT_bath + dT_drive) * dt - self._temperatures.pop(0) - self._temperatures.append(T) + PV += dPV_bath + dPV_drive + self._PVs.pop(0) + self._PVs.append(PV) s = next_time - _time.time() if s > 0: _time.sleep(s) next_time += dt - def _limited_current(self, current): - if current > self._max_current: - #_LOG.debug('limiting current to maximum: {:n} -> {:n} amps'.format( - # current, self._max_current)) - return self._max_current - elif current < -self._max_current: - #_LOG.debug('limiting current to maximum: {:n} -> {:n} amps'.format( - # current, -self._max_current)) - return -self._max_current - return current - - def get_temp(self): - return self._temperatures[1] - - def get_ambient_temp(self): + def _limited_mv(self, mv): + if mv > self._max_mv: + #_LOG.debug('limiting mv to maximum: {:n} -> {:n} amps'.format( + # mv, self._max_mv)) + return self._max_mv + elif mv < -self._max_mv: + #_LOG.debug('limiting mv to maximum: {:n} -> {:n} amps'.format( + # mv, -self._max_mv)) + return -self._max_mv + return mv + + def get_pv(self): + return self._PVs[1] + + def get_ambient_pv(self): return self._bath - def set_max_current(self, max): - self._max_current = max + def set_max_mv(self, max): + self._max_mv = max - def get_max_current(self): - return self._max_current + def get_max_mv(self): + return self._max_mv - def get_current(self, _increment_i_term=True): + def get_mv(self, _increment_i_term=True): if self._mode == 'manual': - return self._manual_current + return self._manual_mv elif self._mode == 'PID': - T_pref,T = self._temperatures[:2] - dT_s = (self._setpoint - T) - if T > self._setpoint: - p,i,d = self._p_cool, self._i_cool, self._d_cool + PV_prev,PV = self._PVs[:2] + dPV_s = (self._setpoint - PV) + if PV > self._setpoint: + p,i,d = self._p_down, self._i_down, self._d_down else: - p,i,d = self._p_heat, self._i_heat, self._d_heat - dT_t = T - T_pref + p,i,d = self._p_up, self._i_up, self._d_up + dPV_t = PV - PV_prev dt = self._process_period if _increment_i_term is True: - self._i_term += dT_s * dt - self._d_term = -dT_t / dt # = de(t)/dt with constant setpoint - return self._limited_current( - p*(dT_s + self._i_term/i + d*self._d_term)) + self._i_term += dPV_s * dt + self._d_term = -dPV_t / dt # = de(t)/dt with constant setpoint + return self._limited_mv( + p*(dPV_s + self._i_term/i + d*self._d_term)) raise ValueError(self._mode) def get_modes(self): @@ -197,8 +200,8 @@ class TestBackend (_Backend, _ManualMixin, _PIDMixin): # ManualMixin methods - def set_current(self, current): - self._manual_current = self._limited_current(current) + def set_mv(self, mv): + self._manual_mv = self._limited_mv(mv) # PIDMixin methods @@ -208,32 +211,32 @@ class TestBackend (_Backend, _ManualMixin, _PIDMixin): def get_setpoint(self): return self._setpoint - def set_cooling_gains(self, proportional=None, integral=None, + def set_down_gains(self, proportional=None, integral=None, derivative=None): if proportional is not None: - self._p_cool = proportional + self._p_down = proportional if integral is not None: - self._i_cool = integral + self._i_down = integral if derivative is not None: - self._d_cool = derivative + self._d_down = derivative - def get_cooling_gains(self): - return (self._p_cool, self._i_cool, self._d_cool) + def get_down_gains(self): + return (self._p_down, self._i_down, self._d_down) - def set_heating_gains(self, proportional=None, integral=None, + def set_up_gains(self, proportional=None, integral=None, derivative=None): if proportional is not None: - self._p_heat = proportional + self._p_up = proportional if integral is not None: - self._i_heat = integral + self._i_up = integral if derivative is not None: - self._d_heat = derivative + self._d_up = derivative - def get_heating_gains(self): - return (self._p_heat, self._i_heat, self._d_heat) + def get_up_gains(self): + return (self._p_up, self._i_up, self._d_up) def get_feedback_terms(self): - return (self.get_current(), self._setpoint - self.get_temp(), + return (self.get_mv(), self._setpoint - self.get_pv(), self._i_term, self._d_term) def clear_integral_term(self): diff --git a/pypid/controller.py b/pypid/controller.py index 43052ee..0a9677e 100644 --- a/pypid/controller.py +++ b/pypid/controller.py @@ -26,40 +26,40 @@ from numpy import linspace as _linspace from numpy import log as _log from scipy.interpolate import interp1d as _interp1d -from hooke.util.fit import ModelFitter as _ModelFitter - from . import LOG as _LOG +from .fit import ModelFitter as _ModelFitter class Controller (object): - """PID temperature control frontend. + """PID control frontend. backend: pypid.backend.Backend instance backend driving your particular harware setpoint: float - initial setpoint in degrees Celsius + initial setpoint in PV-units min: float - minimum temperature in degrees Celsius (for sanity checks) + minimum PV in PV-units (for sanity checks) max: float - maximum temperature in degrees Celsius (for sanity checks) + maximum PV in PV-units (for sanity checks) """ - def __init__(self, backend, setpoint=20.0, min=5.0, max=50.0): + def __init__(self, backend, setpoint=0.0, min=-float('inf'), + max=float('inf')): self._backend = backend self._setpoint = setpoint - self._min = min - self._max = max + self._min_pv = min + self._max_pv = max # basic user interface methods - def get_temp(self): - """Return the current process temperature in degrees Celsius + def get_pv(self): + """Return the current process variable in PV-units We should expose this to users, so they don't need to go mucking about in `._backend`. """ - return self._backend.get_temp() + return self._backend.get_pv() - def set_temp(self, setpoint, **kwargs): + def set_pv(self, setpoint, **kwargs): """Change setpoint to `setpoint` and wait for stability """ self._backend.set_setpoint(setpoint) @@ -67,15 +67,15 @@ class Controller (object): def wait_for_stability(self, setpoint, tolerance=0.3, time=10., timeout=-1, sleep_time=0.1, return_data=False): - """Wait until the temperature is sufficiently stable + """Wait until the PV is sufficiently stable setpoint : float - target temperature in degrees C + target PV in PV-units tolerance : float - maximum allowed deviation from `setpoint` in dregrees C + maximum allowed deviation from `setpoint` in PV-units time : float - time the temperature must remain in the allowed region - before the signal is delared "stable" + time the PV must remain in the allowed region before the + signal is delared "stable" timeout : float maximum time to wait for stability. Set to -1 to never timeout. @@ -83,16 +83,16 @@ class Controller (object): time in seconds to sleep between reads to avoid an overly-busy loop return_data : boolean - if true, also return a list of `(timestamp, temp)` tuples + if true, also return a list of `(timestamp, PV)` tuples read while waiting - Read the temperature every `sleep_time` seconds until the - temperature has remained within `tolerance` of `setpoint` for - `time`. If the stability criteria are met, return `True` - (stable). If `timeout` seconds pass before the criteria are - met, return `False` (not stable). + Read the PV every `sleep_time` seconds until the PV has + remained within `tolerance` of `setpoint` for `time`. If the + stability criteria are met, return `True` (stable). If + `timeout` seconds pass before the criteria are met, return + `False` (not stable). """ - _LOG.debug(('wait until the temperature is stable at {:n} +/- {:n} C ' + _LOG.debug(('wait until the PV is stable at {:n} +/- {:n} C ' 'for {:n} seconds').format(setpoint, tolerance, time)) stable = False if return_data: @@ -104,19 +104,20 @@ class Controller (object): else: timeout_time = start_time + timeout while True: - T = self.get_temp() - in_range = abs(T - setpoint) < tolerance + PV = self.get_pv() + in_range = abs(PV - setpoint) < tolerance t = _time.time() if return_data: - data.append((t, T)) + data.append((t, PV)) if in_range: if t >= stable_time: - _LOG.debug('temperature is stable') + _LOG.debug('process variable is stable') stable = True break # in range for long enough else: stable_time = t + time # reset target time if timeout_time and t > timeout_time: + _LOG.debug('process variable is not stable') break # timeout _time.sleep(sleep_time) if return_data: @@ -127,26 +128,26 @@ class Controller (object): return self.wait_for_stability( setpoint=setpoint, time=time, timeout=time, **kwargs) - def estimate_temperature_sensitivity(self, num_temps=10, sleep_time=0.1, - max_repeats=10): - temps = [] - last_temp = None + def estimate_pv_sensitivity(self, values=10, sleep_time=0.1, + max_repeats=10): + PVs = [] + last_PV = None repeats = 0 while True: - temp = self.get_temp() + PV = self.get_pv() if repeats == max_repeats: - last_temp = None - if temp == last_temp: + last_PV = None + if PV == last_PV: repeats += 1 else: - temps.append(temp) - if len(temps) > num_temps: + PVs.append(PV) + if len(PVs) > values: break repeats = 0 - last_temp = temp + last_PV = PV _time.sleep(sleep_time) - temps = _array(temps) - return temps.std() + PVs = _array(PVs) + return PVs.std() # debugging methods @@ -161,21 +162,21 @@ class Controller (object): """ c = self._backend.get_current() pid,prop,ntgrl,deriv = self._backend.get_feedback_terms() - T = self.get_temp() - Tset = self._backend.get_setpoint() - if T > Tset: # cooling - p,i,d = self._backend.get_cooling_gains() - else: # heating - p,i,d = self._backend.get_heating_gains() + PV = self.get_pv() + SP = self._backend.get_setpoint() + if PV > SP: + p,i,d = self._backend.get_down_gains() + else: + p,i,d = self._backend.get_up_gains() _LOG.info(('pid(read) {:n} =? sum(calc from terms) {:n} ' '=? cur(read) {:n} A').format(pid, prop+ntgrl+deriv, c)) _LOG.info('read: p {:n}, i {:n}, d {:n}'.format(p,i,d)) - _LOG.info('calc: p {:n}'.format(p*(Tset-T))) + _LOG.info('calc: p {:n}'.format(p*(SP-PV))) # tuning experiments and data processing - def get_step_response(self, current_a, current_b, - sleep_time=0.1, stable_time=10., **kwargs): + def get_step_response(self, mv_a, mv_b, sleep_time=0.1, stable_time=10., + **kwargs): "Measure a step response for later analysis" _LOG.debug('measure step response') if 'time' in kwargs: @@ -184,38 +185,38 @@ class Controller (object): kwargs['sleep_time'] = sleep_time mode = self._backend.get_mode() if mode == 'manual': - manual_current = self._backend.get_current() + manual_mv = self._backend.get_mv() else: self._backend.set_mode('manual') _LOG.debug('set first current and wait for stability') - self._backend.set_current(current_a) - temp_a = self.get_temp() - while not self.is_stable(temp_a, **kwargs): - temp_a = self.get_temp() - _LOG.debug('stabilized at {:n} C with {:n} amps'.format( - temp_a, current_a)) - _LOG.debug('set second current and wait for stability') + self._backend.set_mv(mv_a) + pv_a = self.get_pv() + while not self.is_stable(pv_a, **kwargs): + pv_a = self.get_pv() + _LOG.debug('stabilized at {:n} {} with {:n} {}'.format( + pv_a, self._backend.pv_units, mv_a, self._backend.mv_units)) + _LOG.debug('set second MV and wait for stability') data = [] start_time = _time.time() - self._backend.set_current(current_b) - temp_b = temp_a + self._backend.set_mv(mv_b) + pv_b = pv_a while True: - stable,d = self.is_stable(temp_b, return_data=True, **kwargs) + stable,d = self.is_stable(pv_b, return_data=True, **kwargs) data.extend(d) - temp_b = self.get_temp() + pv_b = self.get_pv() if stable: break - _LOG.debug('stabilized at {:n} C with {:n} amps'.format( - temp_b, current_b)) + _LOG.debug('stabilized at {:n} {} with {:n} {}'.format( + pv_b, self._backend.pv_units, mv_b, self._backend.mv_units)) if mode == 'manual': - self._backend.set_current(manual_current) + self._backend.set_mv(manual_mv) else: self._backend.set_mode(mode) return data @staticmethod - def analyze_step_response(step_response, current_shift): - rates = [(Tb-Ta)/(tb-ta) for ((ta,Ta),(tb,Tb)) + def analyze_step_response(step_response, mv_shift): + rates = [(PVb-PVa)/(tb-ta) for ((ta,PVa),(tb,PVb)) in zip(step_response, step_response[1:])] # TODO: averaging filter? max_rate_i = max_rate = 0 @@ -224,35 +225,35 @@ class Controller (object): max_rate_i = i max_rate = abs(rate) max_rate_time,max_rate_temp = step_response[max_rate_i] # TODO: avg i and i+1? - time_a,temp_a = step_response[0] + time_a,PV_a = step_response[0] max_rate_time -= time_a - dead_time = max_rate_time - (max_rate_temp - temp_a) / max_rate - t_data = _array([t for t,T in step_response[max_rate_i:]]) - T_data = _array([T for t,T in step_response[max_rate_i:]]) - model = ExponentialModel(T_data, info={'x data (s)': t_data}) - tau,T0,T8 = model.fit() - gain = (T8 - temp_a) / current_shift - return (gain, dead_time, tau, max_rate) + dead_time = max_rate_time - (max_rate_temp - PV_a) / max_rate + t_data = _array([t for t,PV in step_response[max_rate_i:]]) + PV_data = _array([PV for t,PV in step_response[max_rate_i:]]) + model = ExponentialModel(PV_data, info={'x data (s)': t_data}) + decay_time,PV0,PV8 = model.fit() + process_gain = (PV8 - PV_a) / mv_shift + return (process_gain, dead_time, decay_time, max_rate) def get_bang_bang_response(self, dead_band=0.8, num_oscillations=10, max_dead_band_time=30, sleep_time=0.1): - orig_cool_gains = self._backend.get_cooling_gains() - orig_heat_gains = self._backend.get_heating_gains() + orig_down_gains = self._backend.get_down_gains() + orig_up_gains = self._backend.get_up_gains() _LOG.debug('measure bang-bang response') mode = self._backend.get_mode() if mode != 'PID': self._backend.set_mode('PID') i=0 setpoint = self._backend.get_setpoint() - self._backend.set_cooling_gains(float('inf'), float('inf'), 0) - self._backend.set_heating_gains(float('inf'), float('inf'), 0) + self._backend.set_down_gains(float('inf'), float('inf'), 0) + self._backend.set_up_gains(float('inf'), float('inf'), 0) start_time = _time.time() - temp = self.get_temp() - heat_first = self._is_heating( - temp=temp, setpoint=setpoint, dead_band=dead_band) + pv = self.get_pv() + under_first = self._is_under( + pv=pv, setpoint=setpoint, dead_band=dead_band) _LOG.debug('wait to exit dead band') t = start_time - while heat_first is None: + while under_first is None: if t - start_time > max_dead_band_time: msg = 'still in dead band after after {:n} seconds'.format( max_dead_band_time) @@ -260,54 +261,54 @@ class Controller (object): raise ValueError(msg) _time.sleep(sleep_time) t = _time.time() - temp = t.get_temp() - heat_first = self._is_heating( - temp=temp, setpoint=setpoint, dead_band=dead_band) + pv = t.get_pv() + under_first = self._is_under( + pv=pv, setpoint=setpoint, dead_band=dead_band) _LOG.debug('read {:d} oscillations'.format(num_oscillations)) data = [] - heating = heat_first + under = under_first while i < num_oscillations*2 + 1: t = _time.time() - temp = self.get_temp() + pv = self.get_pv() # drop first half cycle (possibly includes ramp to setpoint) if i > 0: - data.append((t, temp)) - is_heating = self._is_heating( + data.append((t, pv)) + _under = self._is_under( temp=temp, setpoint=setpoint, dead_band=dead_band) - if heating is True and is_heating is False: - _LOG.debug('transition to cooling (i={:d})'.format(i)) - heating = False + if _under is True and under is False: + _LOG.debug('transition to PV < SP (i={:d})'.format(i)) + under = _under i += 1 - elif heating is False and is_heating is True: - _LOG.debug('transition to heating (i={:d})'.format(i)) - heating = True + elif under is False and is_under is True: + _LOG.debug('transition to PV > SP (i={:d})'.format(i)) + under = _under i += 1 _time.sleep(sleep_time) - self._backend.set_cooling_gains(*orig_cool_gains) - self._backend.set_heating_gains(*orig_heat_gains) + self._backend.set_down_gains(*orig_down_gains) + self._backend.set_up_gains(*orig_up_gains) if mode != 'PID': self._backend.set_mode(mode) return data @staticmethod def analyze_bang_bang_response(bang_bang_response): - t_data = _array([t for t,T in bang_bang_response]) - T_data = _array([T for t,T in bang_bang_response]) - amp = (T_data.max() - T_data.min()) / 2 - freq = Controller._get_frequency(x_data=t_data, y_data=T_data) + t_data = _array([t for t,PV in bang_bang_response]) + PV_data = _array([PV for t,PV in bang_bang_response]) + amp = (PV_data.max() - PV_data.min()) / 2 + freq = Controller._get_frequency(x_data=t_data, y_data=PV_data) period = 1./freq return (amp, period) def get_ultimate_cycle_response(self, proportional, period): - orig_cool_gains = self._backend.get_cooling_gains() - orig_heat_gains = self._backend.get_heating_gains() + orig_down_gains = self._backend.get_down_gains() + orig_up_gains = self._backend.get_up_gains() _LOG.debug('measure ultimate cycle response') mode = self._backend.get_mode() if mode != 'PID': self._backend.set_mode('PID') # TODO... - self._backend.set_cooling_gains(*orig_cool_gains) - self._backend.set_heating_gains(*orig_heat_gains) + self._backend.set_down_gains(*orig_down_gains) + self._backend.set_up_gains(*orig_up_gains) if mode != 'PID': self._backend.set_mode(mode) return data @@ -318,104 +319,10 @@ class Controller (object): ultimate_cycle_response) return period - # tuning rules - - @staticmethod - def ziegler_nichols_step_response(gain, dead_time, tau, mode='PID'): - r = dead_time / tau - if r < 0.1 or r > 1: - _LOG.warn(('Ziegler-Nichols not a good idea when ' - 'dead-time/tau = {:n}').format(r)) - pkern = tau/(gain*dead_time) - if mode == 'P': - return (pkern, float('inf'), 0) - elif mode == 'PI': - return (0.9*pkern, 3.3*dead_time, 0) - elif mode == 'PID': - return (1.2*pkern, 2*dead_time, dead_time/2.) - raise ValueError(mode) - - def ziegler_nichols_bang_bang_response(self, amplitude, period, - max_current=None, mode='PID'): - if max_current is None: - max_current = self._backend.get_max_current() - return self._ziegler_nichols_bang_bang_response( - amplitude, period, max_current=max_current, mode=mode) - - @staticmethod - def _ziegler_nichols_bang_bang_response(amplitude, period, - max_current, mode='PID'): - """ - amplitude : float - center-to-peak amplitude (in K) of bang-bang oscillation - period : float - period (in seconds) of the critical oscillation - max_current : float - "bang" current (in amps) - """ - proportional = float(max_current)/amplitude - period = float(period) - if mode == 'P': - return (proportional/2, float('inf'), 0) - elif mode == 'PI': - return (proportional/3, 2*period, 0) - elif mode == 'PID': - return (proportional/2, period, period/4) - raise ValueError(mode) - - def ziegler_nichols_ultimate_cycle_response(self, proportional, period): - """ - proportional : float - critical P-only gain (ultimate gain, for sustained oscillation) - period : float - period (in seconds) of the critical oscillation - - Microstar Laboratories has a `nice analysis`_ on ZN - limitations, which points out that ZN-tuning assumes your - system has the FOPDT transfer function (see `TestBackend` for - details). - - .. _nice analysis: http://www.mstarlabs.com/control/znrule.html - """ - if mode == 'P': - return (0.50*proportional, float('inf'), 0) - elif mode == 'PI': - return (0.45*proportional, period/1.2, 0) - elif mode == 'PID': - return (0.60*proportional, period/2, period/8) - raise ValueError(mode) - - @staticmethod - def cohen_coon_step_response(gain, dead_time, tau, mode='PID'): - r = dead_time / tau - pkern = tau/(gain*dead_time) - if mode == 'P': - return (pkern*(1+r/3.), float('inf'), 0) - elif mode == 'PI': - return (pkern*(0.9+r/12.), (30.+3*r)/(9+20*r)*dead_time, 0) - elif mode == 'PD': # double check - return (1.24*pkern*(1+0.13*tf), float('inf'), - (0.27-0.36*t)/(1-0.87*t)*dead_time) - elif mode == 'PID': - return (pkern*(4./3+r/4.), (32.-6*r)/(13.-8*r)*dead_time, - 4/(11.+2*r)*dead_time) - raise ValueError(mode) - - @staticmethod - def wang_juang_chan_step_response(gain, dead_time, tau, mode='PID'): - """Wang-Juang-Chan tuning - """ - K,L,T = (gain, dead_time, tau) - if mode == 'PID': - return ((0.7303+0.5307*T/L)*(T+0.5*L)/(K*(T+L)), - T + 0.5*L, - 0.5*L*T / (T + 0.5*L)) - raise ValueError(mode) - # utility methods def _wait_until_close(self, setpoint, tolerance=0.3, sleep_time=0.1): - while abs(self.get_temp() - setpoint) > tolerance: + while abs(self.get_pv() - setpoint) > tolerance: _time.sleep(sleep_time) def _time_function(self, function, args=(), kwargs=None, count=10): @@ -428,28 +335,28 @@ class Controller (object): stop = _time.time() return float(stop-start)/count - def _is_heating(self, temp=None, setpoint=None, dead_band=None): - if temp is None: - temp = self.get_temp() + def _is_under(self, pv=None, setpoint=None, dead_band=None): + if pv is None: + pv = self.get_pv() if setpoint is None: - temp = self._backend.get_setpoint() - low_temp = high_temp = setpoint + setpoint = self._backend.get_setpoint() + low_pv = high_pv = setpoint if dead_band: - low_temp -= dead_band - high_temp += dead_band - if temp < low_temp: - return False - elif temp > high_temp: + low_pv -= dead_band + high_pv += dead_band + if pv < low_pv: return True + elif pv > high_pv: + return False return None - def _select_parameter(self, heating_result=None, cooling_result=None, + def _select_parameter(self, under_result=None, over_result=None, dead_band_result=None, **kwargs): - heating = self._is_heating(**kwargs) - if heating: - return heating_result - elif heating is False: - return cooling_result + under = self._is_under(**kwargs) + if under: + return under_result + elif under is False: + return over_result return dead_band_result @staticmethod @@ -483,8 +390,8 @@ class Controller (object): if value > max: raise ValueError('%g > %g' % (value, max)) - def _check_temp(temp): - self._check_range(temp, self._min, self._max) + def _check_pv(pv): + self._check_rangee(pv, self._min_pv, self._max_pv) class ExponentialModel (_ModelFitter): diff --git a/pypid/fit.py b/pypid/fit.py new file mode 100644 index 0000000..6267341 --- /dev/null +++ b/pypid/fit.py @@ -0,0 +1,334 @@ +# Copyright (C) 2010-2011 W. Trevor King +# +# This file is part of Hooke. +# +# Hooke is free software: you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# Hooke is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General +# Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with Hooke. If not, see +# . + +"""Provide :class:`ModelFitter` to make arbitrary model fitting easy. +""" + +from numpy import arange, ndarray +from scipy import __version__ as _scipy_version +from scipy.optimize import leastsq + +_strings = _scipy_version.split('.') +# Don't convert third string to an integer in case of (for example) '0.7.2rc3' +_SCIPY_VERSION = (int(_strings[0]), int(_strings[1]), _strings[2]) +del _strings + + +class PoorFit (ValueError): + pass + +class ModelFitter (object): + """A convenient wrapper around :func:`scipy.optimize.leastsq`. + + TODO: look into :mod:`scipy.odr` as an alternative fitting + algorithm (minimizing the sum of squares of orthogonal distances, + vs. minimizing y distances). + + Parameters + ---------- + d_data : array_like + Deflection data to be analyzed for the contact position. + info : + Store any extra information useful inside your overridden + methods. + rescale : boolean + Rescale parameters so the guess for each is 1.0. Also rescale + the data so data.std() == 1.0. + + Examples + -------- + + >>> from pprint import pprint + >>> from Queue import Queue + >>> import numpy + + You'll want to subclass `ModelFitter`, overriding at least + `.model` and potentially the parameter and scale guessing + methods. + + >>> class LinearModel (ModelFitter): + ... '''Simple linear model. + ... + ... Levenberg-Marquardt is not how you want to solve this problem + ... for real systems, but it's a simple test case. + ... ''' + ... def model(self, params): + ... '''A linear model. + ... + ... Notes + ... ----- + ... .. math:: y = p_0 x + p_1 + ... ''' + ... p = params # convenient alias + ... self._model_data[:] = p[0]*arange(len(self._data)) + p[1] + ... return self._model_data + ... def guess_initial_params(self, outqueue=None): + ... return [float(self._data[-1] - self._data[0])/len(self._data), + ... self._data[0]] + ... def guess_scale(self, params, outqueue=None): + ... slope_scale = 0.1 + ... if params[1] == 0: + ... offset_scale = 1 + ... else: + ... offset_scale = 0.1*self._data.std()/abs(params[1]) + ... if offset_scale == 0: # data is completely flat + ... offset_scale = 1. + ... return [slope_scale, offset_scale] + >>> data = 20*numpy.sin(arange(1000)) + 7.*arange(1000) - 33.0 + >>> m = LinearModel(data) + >>> outqueue = Queue() + >>> slope,offset = m.fit(outqueue=outqueue) + >>> info = outqueue.get(block=False) + >>> pprint(info) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE, +REPORT_UDIFF + {'active fitted parameters': array([ 6.999..., -32.889...]), + 'active parameters': array([ 6.999..., -32.889...]), + 'convergence flag': ..., + 'covariance matrix': array([[ 1.199...e-08, -5.993...e-06], + [ -5.993...e-06, 3.994...e-03]]), + 'data scale factor': 1.0, + 'fitted parameters': array([ 6.999..., -32.889...]), + 'info': {'fjac': array([[...]]), + 'fvec': array([...]), + 'ipvt': array([1, 2]), + 'nfev': 7, + 'qtf': array([...])}, + 'initial parameters': [6.992..., -33.0], + 'message': '...relative error between two consecutive iterates is at most 0.000...', + 'rescaled': False, + 'scale': [0.100..., 6.123...]} + + We round the outputs to protect the doctest against differences in + machine rounding during computation. We expect the values to be close + to the input settings (slope 7, offset -33). + + >>> print '%.3f' % slope + 7.000 + >>> print '%.3f' % offset + -32.890 + + The offset is a bit off because, the range is not a multiple of + :math:`2\pi`. + + We could also use rescaled parameters: + + >>> m = LinearModel(data, rescale=True) + >>> outqueue = Queue() + >>> slope,offset = m.fit(outqueue=outqueue) + >>> print '%.3f' % slope + 7.000 + >>> print '%.3f' % offset + -32.890 + + Test single-parameter models: + + >>> class SingleParameterModel (LinearModel): + ... '''Simple linear model. + ... ''' + ... def model(self, params): + ... return super(SingleParameterModel, self).model([params[0], 0.]) + ... def guess_initial_params(self, outqueue=None): + ... return super(SingleParameterModel, self + ... ).guess_initial_params(outqueue)[:1] + ... def guess_scale(self, params, outqueue=None): + ... return super(SingleParameterModel, self + ... ).guess_scale([params[0], 0.], outqueue)[:1] + >>> data = 20*numpy.sin(arange(1000)) + 7.*arange(1000) + >>> m = SingleParameterModel(data) + >>> slope, = m.fit(outqueue=outqueue) + >>> print '%.3f' % slope + 7.000 + """ + def __init__(self, *args, **kwargs): + self.set_data(*args, **kwargs) + + def set_data(self, data, info=None, rescale=False): + self._data = data + self._model_data = ndarray(shape=data.shape, dtype=data.dtype) + self.info = info + self._rescale = rescale + if rescale == True: + for x in [data.std(), data.max()-data.min(), abs(data.max()), 1.0]: + if x != 0: + self._data_scale_factor = x + break + else: + self._data_scale_factor = 1.0 + + def model(self, params): + p = params # convenient alias + self._model_data[:] = arange(len(self._data)) + raise NotImplementedError + + def guess_initial_params(self, outqueue=None): + return [] + + def guess_scale(self, params, outqueue=None): + """Guess the problem length scale in each parameter dimension. + + Notes + ----- + From the :func:`scipy.optimize.leastsq` documentation, `diag` + (which we refer to as `scale`, sets `factor * || diag * x||` + as the initial step. If `x == 0`, then `factor` is used + instead (from `lmdif.f`_):: + + do 70 j = 1, n + wa3(j) = diag(j)*x(j) + 70 continue + xnorm = enorm(n,wa3) + delta = factor*xnorm + if (delta .eq. zero) delta = factor + + For most situations then, you don't need to do anything fancy. + The default scaling (if you don't set a scale) is:: + + c on the first iteration and if mode is 1, scale according + c to the norms of the columns of the initial jacobian. + + (i.e. `diag(j) = acnorm(j)`, where `acnorm(j) is the norm of the `j`th column + of the initial Jacobian). + + .. _lmdif.f: http://www.netlib.org/minpack/lmdif.f + """ + return None + + def residual(self, params): + if self._rescale == True: + params = [p*s for p,s in zip(params, self._param_scale_factors)] + residual = self._data - self.model(params) + if False: # fit debugging + if not hasattr(self, '_i_'): + self._i_ = 0 + self._data.tofile('data.%d' % self._i_, sep='\n') + self.model(params).tofile('model.%d' % self._i_, sep='\n') + self._i_ += 1 + if self._rescale == True: + residual /= self._data_scale_factor + return residual + + def fit(self, initial_params=None, scale=None, outqueue=None, **kwargs): + """ + Parameters + ---------- + initial_params : iterable or None + Initial parameter values for residual minimization. If + `None`, they are estimated from the data using + :meth:`guess_initial_params`. + scale : iterable or None + Parameter length scales for residual minimization. If + `None`, they are estimated from the data using + :meth:`guess_scale`. + outqueue : Queue or None + If given, will be used to output the data and fitted model + for user verification. + kwargs : + Any additional arguments are passed through to `leastsq`. + """ + if initial_params == None: + initial_params = self.guess_initial_params(outqueue) + if scale == None: + scale = self.guess_scale(initial_params, outqueue) + if scale != None: + assert min(scale) > 0, scale + if self._rescale == True: + self._param_scale_factors = initial_params + for i,s in enumerate(self._param_scale_factors): + if s == 0: + self._param_scale_factors[i] = 1.0 + active_params = [p/s for p,s in zip(initial_params, + self._param_scale_factors)] + else: + active_params = initial_params + params,cov,info,mesg,ier = leastsq( + func=self.residual, x0=active_params, full_output=True, + diag=scale, **kwargs) + if len(initial_params) == 1 and _SCIPY_VERSION < (0, 8, '0'): + # params is a float for scipy < 0.8.0. Convert to list. + params = [params] + if self._rescale == True: + active_params = params + params = [p*s for p,s in zip(params, + self._param_scale_factors)] + else: + active_params = params + if outqueue != None: + outqueue.put({ + 'rescaled': self._rescale, + 'initial parameters': initial_params, + 'active parameters': active_params, + 'scale': scale, + 'data scale factor': self._data_scale_factor, + 'active fitted parameters': active_params, + 'fitted parameters': params, + 'covariance matrix': cov, + 'info': info, + 'message': mesg, + 'convergence flag': ier, + }) + return params + +# Example ORD code from the old fit.py +# def dist(px,py,linex,liney): +# distancesx=scipy.array([(px-x)**2 for x in linex]) +# minindex=numpy.argmin(distancesx) +# print px, linex[0], linex[-1] +# return (py-liney[minindex])**2 +# +# +# def f_wlc(params,x,T=T): +# ''' +# wlc function for ODR fitting +# ''' +# lambd,pii=params +# Kb=(1.38065e-23) +# therm=Kb*T +# y=(therm*pii/4.0) * (((1-(x*lambd))**-2) - 1 + (4*x*lambd)) +# return y +# +# def f_wlc_plfix(params,x,pl_value=pl_value,T=T): +# ''' +# wlc function for ODR fitting +# ''' +# lambd=params +# pii=1/pl_value +# Kb=(1.38065e-23) +# therm=Kb*T +# y=(therm*pii/4.0) * (((1-(x*lambd))**-2) - 1 + (4*x*lambd)) +# return y +# +# #make the ODR fit +# realdata=scipy.odr.RealData(xchunk_corr_up,ychunk_corr_up) +# if pl_value: +# model=scipy.odr.Model(f_wlc_plfix) +# o = scipy.odr.ODR(realdata, model, p0_plfix) +# else: +# model=scipy.odr.Model(f_wlc) +# o = scipy.odr.ODR(realdata, model, p0) +# +# o.set_job(fit_type=2) +# out=o.run() +# fit_out=[(1/i) for i in out.beta] +# +# #Calculate fit errors from output standard deviations. +# #We must propagate the error because we fit the *inverse* parameters! +# #The error = (error of the inverse)*(value**2) +# fit_errors=[] +# for sd,value in zip(out.sd_beta, fit_out): +# err_real=sd*(value**2) +# fit_errors.append(err_real) + diff --git a/pypid/rules.py b/pypid/rules.py new file mode 100644 index 0000000..35a1de4 --- /dev/null +++ b/pypid/rules.py @@ -0,0 +1,104 @@ +# Copyright (C) 2011 W. Trevor King +# +# This file is part of pypid. +# +# pypid is free software: you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. +# +# pypid is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with pypid. If not, see +# . + +"Assorted PID tuning rules" + + +def ziegler_nichols_step_response(process_gain, dead_time, decay_time, mode='PID'): + K,L,T = (process_gain, dead_time, decay_time) + r = L / T + if r > 1: + _LOG.warn(('Ziegler-Nichols not a good idea when ' + 'dead-time/decay_time = {:n}').format(r)) + pkern = 1/(K*r) + if mode == 'P': + return (pkern, float('inf'), 0) + elif mode == 'PI': + return (0.9*pkern, 3.3*L, 0) + elif mode == 'PID': + return (1.2*pkern, 2*L, L/2.) + raise ValueError(mode) + +def ziegler_nichols_bang_bang_response(amplitude, period, max_current, + mode='PID'): + """ + amplitude : float + center-to-peak amplitude (in K) of bang-bang oscillation + period : float + period (in seconds) of the critical oscillation + max_current : float + "bang" current (in amps) + """ + proportional = float(max_current)/amplitude + period = float(period) + if mode == 'P': + return (proportional/2, float('inf'), 0) + elif mode == 'PI': + return (proportional/3, 2*period, 0) + elif mode == 'PID': + return (proportional/2, period, period/4) + raise ValueError(mode) + +def ziegler_nichols_ultimate_cycle_response(proportional, period): + """ + proportional : float + critical P-only process_gain (ultimate process_gain, for sustained oscillation) + period : float + period (in seconds) of the critical oscillation + + Microstar Laboratories has a `nice analysis`_ on ZN + limitations, which points out that ZN-tuning assumes your + system has the FOPDT transfer function (see `TestBackend` for + details). + + .. _nice analysis: http://www.mstarlabs.com/control/znrule.html + """ + if mode == 'P': + return (0.50*proportional, float('inf'), 0) + elif mode == 'PI': + return (0.45*proportional, period/1.2, 0) + elif mode == 'PID': + return (0.60*proportional, period/2, period/8) + raise ValueError(mode) + +def cohen_coon_step_response(process_gain, dead_time, decay_time, mode='PID'): + K,L,T = (process_gain, dead_time, decay_time) + r = L/T + pkern = 1/(K*r) + if mode == 'P': + return (pkern*(1+r/3.), float('inf'), 0) + elif mode == 'PI': + return (pkern*(0.9+r/12.), (30.+3*r)/(9+20*r)*dead_time, 0) + elif mode == 'PD': # double check + return (1.24*pkern*(1+0.13*tf), float('inf'), + (0.27-0.36*t)/(1-0.87*t)*dead_time) + elif mode == 'PID': + return (pkern*(4./3+r/4.), (32.-6*r)/(13.-8*r)*dead_time, + 4/(11.+2*r)*dead_time) + raise ValueError(mode) + +def wang_juang_chan_step_response(process_gain, dead_time, decay_time, mode='PID'): + """Wang-Juang-Chan tuning + """ + K,L,T = (process_gain, dead_time, decay_time) + if mode == 'PID': + return ((0.7303+0.5307*T/L)*(T+0.5*L)/(K*(T+L)), + T + 0.5*L, + 0.5*L*T / (T + 0.5*L)) + raise ValueError(mode) + diff --git a/pypid/test.py b/pypid/test.py index c3b69f3..8f114f6 100644 --- a/pypid/test.py +++ b/pypid/test.py @@ -21,6 +21,7 @@ import time as _time from . import LOG as _LOG +from . import rules as _rules from .backend import get_backend as _get_backend from controller import Controller as _Controller @@ -32,24 +33,24 @@ def test_backend(backend=None): backend = _get_backend('test')() try: sp = backend.get_setpoint() - _LOG.info('temperature = {:n} C'.format(backend.get_temp())) - _LOG.info('setpoint = {:n} C'.format(sp)) - _LOG.info('current = {:n} A'.format(backend.get_current())) + _LOG.info('PV = {:n} {}'.format(backend.get_pv(), backend.pv_units)) + _LOG.info('SP = {:n} {}'.format(sp, backend.pv_units)) + _LOG.info('MV = {:n} {}'.format(backend.get_mv(), backend.mv_units)) _set_and_check_setpoint(backend=backend, setpoint=5.0) - _check_max_current(backend=backend) + _check_max_mv(backend=backend) _set_and_check_setpoint(backend=backend, setpoint=50.0) - _check_max_current(backend=backend) + _check_max_mv(backend=backend) _set_and_check_setpoint(backend=backend, setpoint=sp) finally: if internal_backend: backend.cleanup() def _set_and_check_setpoint(backend, setpoint): - _LOG.info('setting setpoint to {:n} C'.format(setpoint)) + _LOG.info('setting setpoint to {:n} {}'.format(setpoint, backend.pv_units)) c.set_setpoint(setpoint) sp = c.get_setpoint() - _LOG.info('setpoint = {:n} C'.format(sp)) + _LOG.info('SP = {:n} {}'.format(sp, backend.pv_units)) if sp != setpoint: msg = 'read setpoint {:n} != written setpoint {:n}'.format( sp, setpoint) @@ -59,19 +60,21 @@ def _set_and_check_setpoint(backend, setpoint): def _check_max_current(backend): # give the backend some time to overcome any integral gain _time.sleep(10) - cur = c.get_current() - _LOG.info('current = {:n} A'.format(cur)) - mcur = c.get_max_current() - if cur != mcur: - temp = backend.get_temp() - sp = backend.get_setpoint() - msg = ('current of {:n} A is not the max {:n} A, but the system is ' - 'at {:n} C while the setpoint is at {:n}').format( - cur, mcur, temp, sp) + MV = c.get_mv() + _LOG.info('MV = {:n} {}'.format(MV, backend.mv_units)) + mMV = c.get_max_mv() + if mv != mMV: + PV = backend.get_pv() + SP = backend.get_setpoint() + PVu = backend.pv_units + MVu = backend.mv_units + msg = ('current of {:n} {} is not the max {:n} {}, but the system is ' + 'at {:n} {} while the setpoint is at {:n} {}').format( + MV, MVu, mMV, MVu, PV, PVu, SP, PVu) _LOG.error(msg) raise Exception(msg) -def test_controller_step_response(backend=None, setpoint=25): +def test_controller_step_response(backend=None, setpoint=1): internal_backend = False if not backend: internal_backend = True @@ -79,31 +82,34 @@ def test_controller_step_response(backend=None, setpoint=25): try: backend.set_mode('PID') c = _Controller(backend=backend) - max_current = backend.get_max_current() - current_a = 0.4 * max_current - current_b = 0.5 * max_current + max_MV = backend.get_max_mv() + MVa = 0.4 * max_MV + MVb = 0.5 * max_MV step_response = c.get_step_response( - current_a=current_a, current_b=current_b, tolerance=0.5, stable_time=4.) - if True: + mv_a=MVa, mv_b=MVb, tolerance=0.5, stable_time=4.) + if False: with open('step_response.dat', 'w') as d: s = step_response[0][0] - for t,T in step_response: - d.write('{:n}\t{:n}\n'.format(t-s, T)) - gain,dead_time,tau,max_rate = c.analyze_step_response( - step_response, current_shift=current_b-current_a) - _LOG.debug(('step response: dead time {:n}, gain {:n}, tau {:n}, ' - 'max-rate {:n}').format(dead_time, gain, tau, max_rate)) + for t,PV in step_response: + d.write('{:n}\t{:n}\n'.format(t-s, PV)) + process_gain,dead_time,decay_time,max_rate = c.analyze_step_response( + step_response, mv_shift=MVb-MVa) + _LOG.debug(('step response: process gain {:n}, dead time {:n}, decay ' + 'time {:n}, max-rate {:n}').format( + process_gain, dead_time, decay_time, max_rate)) + r = rules for name,response_fn,modes in [ - ('Zeigler-Nichols', c.ziegler_nichols_step_response, + ('Zeigler-Nichols', r.ziegler_nichols_step_response, ['P', 'PI', 'PID']), - ('Cohen-Coon', c.cohen_coon_step_response, + ('Cohen-Coon', r.cohen_coon_step_response, ['P', 'PI', 'PID']), # 'PD' - ('Wang-Juan-Chan', c.wang_juang_chan_step_response, + ('Wang-Juan-Chan', r.wang_juang_chan_step_response, ['PID']), ]: for mode in modes: p,i,d = response_fn( - gain=gain, dead_time=dead_time, tau=tau, mode=mode) + process_gain=process_gain, dead_time=dead_time, + decay_time=decay_time, mode=mode) _LOG.debug( '{} step response {}: p {:n}, i {:n}, d {:n}'.format( name, mode, p, i, d)) @@ -111,18 +117,17 @@ def test_controller_step_response(backend=None, setpoint=25): if internal_backend: backend.cleanup() -def test_controller_bang_bang_response(backend=None, setpoint=25): +def test_controller_bang_bang_response(backend=None, setpoint=1): internal_backend = False if not backend: internal_backend = True - backend = _get_backend('test')(log_stream=open('pid.log', 'w')) - # shift our noise-less system off its setpoint - backend.set_setpoint(backend.get_temp()+0.1) + backend = _get_backend('test')() try: + backend.set_setpoint(setpoint) c = _Controller(backend=backend) - dead_band = 3*c.estimate_temperature_sensitivity() - bang_bang_response = c.get_bang_bang_response(dead_band=dead_band, num_oscillations=4) - if True: + dead_band = 3*c.estimate_pv_sensitivity() + bang_bang_response = c.get_bang_bang_response(dead_band=dead_band) + if False: with open('bang_bang_response.dat', 'w') as d: s = bang_bang_response[0][0] for t,T in bang_bang_response: @@ -130,10 +135,16 @@ def test_controller_bang_bang_response(backend=None, setpoint=25): amplitude,period = c.analyze_bang_bang_response(bang_bang_response) _LOG.debug('bang-bang response: amplitude {:n}, period {:n}'.format( amplitude,period)) - p,i,d = c.ziegler_nichols_bang_bang_response( - amplitude=amplitude, period=period, mode='PID') - _LOG.debug(('Zeigler-Nichols bang-bang response: ' - 'p {:n}, i {:n}, d {:n}').format(p, i, d)) + for name,response_fn,modes in [ + ('Zeigler-Nichols', r.ziegler_nichols_bang_bang_response, + ['P', 'PI', 'PID']), + ]: + for mode in modes: + p,i,d = response_fn( + amplitude=amplitude, period=period, mode=mode) + _LOG.debug( + '{} bang-bang response {}: p {:n}, i {:n}, d {:n}'.format( + name, mode, p, i, d)) finally: if internal_backend: backend.cleanup()