7fbc822a8518fdc592c70c8a049693e7e5aac277
[hooke.git] / hooke / driver / wtk.py
1 # Copyright (C) 2010-2011 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of Hooke.
4 #
5 # Hooke is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # Hooke is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
13 # Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Driver for W. Trevor King's velocity clamp data format.
20
21 See my blog post:
22
23 * http://www.physics.drexel.edu/~wking/unfolding-disasters/Hooke/
24
25 related projects:
26
27 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=unfold_protein.git
28 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=calibrate_cantilever.git
29 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=data_logger.git
30 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=piezo.git
31 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=pycomedi.git
32
33 and the deprecated projects (which Hooke replaces):
34
35 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=scale_unfold.git
36 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=sawmodel.git
37 """
38
39 import calendar
40 import copy
41 import os
42 import os.path
43 import re
44 import time
45
46 import numpy
47
48 from .. import curve as curve
49 from ..config import Setting
50 from . import Driver as Driver
51
52
53 class WTKDriver (Driver):
54     """Handle W. Trevor King's data_logger data format.
55     """
56     def __init__(self):
57         super(WTKDriver, self).__init__(name='wtk')
58
59     def default_settings(self):
60         return [
61             Setting(section=self.setting_section, help=self.__doc__),
62             Setting(section=self.setting_section,
63                     option='cantilever calibration directory',
64                     value='~/rsrch/data/calibrate_cantilever', type='path',
65                     help='Set the directory where cantilever calibration data is stored'),
66             ]
67
68     def is_me(self, path):
69         if os.path.isdir(path):
70             return False
71         if not path.endswith('_unfold'):
72             return False
73         for p in self._paths(path):
74             if not os.path.isfile(p):
75                 return False
76         return True
77
78     def read(self, path, info=None):
79         approach_path,retract_path,param_path = self._paths(path)
80
81         unlabeled_approach_data = numpy.loadtxt(
82             approach_path, dtype=numpy.uint16)
83         unlabeled_retract_data = numpy.loadtxt(
84             retract_path, dtype=numpy.uint16)
85         params = self._read_params(param_path)
86         params = self._translate_params(params)
87
88         # move data into Data blocks.
89         approach = self._scale_block(
90             unlabeled_approach_data, params, 'approach')
91         retract = self._scale_block(
92             unlabeled_retract_data, params, 'retract')
93         info = {}
94         return ([approach, retract], info)
95
96     def _paths(self, path):
97         return (path+'_approach', path, path+'_param')
98
99     def _read_params(self, param_path):
100         params = {}
101         for line in file(param_path):
102             ldata = [x.strip() for x in line.split(':', 1)]
103             if ldata[0] == 'Data fields':
104                 params['columns'] = ldata[1].split()
105             elif len(ldata) == 2:
106                 argwords = ldata[1].split()
107                 if ldata[0] == 'Time' or len(argwords) != 1:
108                     params[ldata[0]] = ldata[1]
109                 else:  # single word probably a number
110                     params[ldata[0]] = float(ldata[1])
111             else:
112                 pass  # ignore comment lines
113         return params
114
115     def _translate_params(self, params):
116         ret = {'raw info':params,}
117
118         t = params['Time'] # 20100504135849
119         ret['time'] = self._time_from_localtime_string(t)
120
121         assert params['columns'] == ['Z_piezo_out', 'Deflection_in', 'Z_piezo_in'], \
122             'Unexpected columns: %s' % ret['columns']
123         ret['columns'] = ['z piezo (m)', 'deflection (m)', ]
124
125         calibcant_file = self._find_previous_cantilever_calibration_file(
126             ret['time'])
127         calibcant_info = self._read_cantilever_calibration_file(calibcant_file)
128         ret['raw spring constant'] = calibcant_info
129         ret['spring constant (N/m)'] = calibcant_info['Cantilever k (N/m)']
130         ret['deflection sensitivity (m/V)'] = \
131             1.0/numpy.sqrt(calibcant_info['photoSensitivity**2 (V/nm)**2']) * 1e-9
132
133         # (32768 bits = 2**15 bits = 10 Volts)
134         ret['deflection sensitivity (V/bit)'] = 1.0/3276.8
135         ret['deflection range (V)'] = 20.0
136         ret['deflection offset (V)'] = 10.0  # assumes raw data is unsigned
137
138         ret['z piezo sensitivity (V/bit)'] = 1.0/3276.8  # output volts / bit
139         ret['z piezo range (V)'] = 20.0                  # output volts
140         ret['z piezo offset (V)'] = 10.0  # output volts, assumes unsigned
141         ret['z piezo gain'] = \
142             params['Z piezo gain (Vp/Vo)']  # piezo volts / output volt
143         ret['z piezo sensitivity (m/V)'] = \
144             params['Z piezo sensitivity (nm/Vp)'] * 1e-9  # m / piezo volts
145         
146         return ret
147
148     def _scale_block(self, data, info, name):
149         """Convert the block from its native format to a `numpy.float`
150         array in SI units.
151         """
152         ret = curve.Data(
153             shape=(data.shape[0], 2),
154             dtype=numpy.float,
155             info=copy.deepcopy(info)
156             )
157         ret.info['name'] = name
158         ret.info['raw data'] = data # store the raw data
159         ret.info['columns'] = ['z piezo (m)', 'deflection (m)']
160
161         # raw column indices
162         # approach data does not have a Z_piezo_in column as of unfold v0.0.
163         z_rcol = info['raw info']['columns'].index('Z_piezo_out')
164         d_rcol = info['raw info']['columns'].index('Deflection_in')
165
166         # scaled column indices
167         z_scol = ret.info['columns'].index('z piezo (m)')
168         d_scol = ret.info['columns'].index('deflection (m)')
169
170         # Leading '-' because increasing voltage extends the piezo,
171         # moving the tip towards the surface (positive indentation),
172         # but it makes more sense to me to have it increase away from
173         # the surface (positive separation).
174         ret[:,z_scol] = -(
175             (data[:,z_rcol].astype(ret.dtype)
176              * info['z piezo sensitivity (V/bit)']
177              - info['z piezo offset (V)'])
178             * info['z piezo gain']
179             * info['z piezo sensitivity (m/V)']
180             )
181
182         # Leading '-' because deflection voltage increases as the tip
183         # moves away from the surface, but it makes more sense to me
184         # to have it increase as it moves toward the surface (positive
185         # tension on the protein chain).
186         ret[:,d_scol] = -(
187             (data[:,d_rcol]
188              * info['deflection sensitivity (V/bit)']
189              - info['deflection offset (V)'])
190             * info['deflection sensitivity (m/V)']
191             )
192
193         return ret
194
195     def _list_re_search(self, list, regexp):
196         "Return list entries matching re"
197         reg = re.compile(regexp)
198         ret = []
199         for item in list:
200             if reg.match(item):
201                 ret.append(item)
202         return ret
203
204     def _list_cantilever_calibration_files(self, timestamp=None, basedir=None):
205         if timestamp == None:
206             timestamp = time.time()
207         if basedir == None:
208             basedir = self.config['cantilever calibration directory']
209         YYYYMMDD = time.strftime("%Y%m%d", time.localtime(timestamp))
210         basedir = os.path.expanduser(basedir)
211         dir = os.path.join(basedir, YYYYMMDD)
212         if not os.path.exists(dir):
213             return []
214         all_calibfiles = os.listdir(dir) # everything in the directory
215         #regexp = "%s.*_analysis_text " % (YYYYMMDD)
216         regexp = ".*_analysis_text"
217         calibfiles = self._list_re_search(all_calibfiles, regexp)
218         paths = [os.path.join(dir, cf) for cf in calibfiles]
219         return paths
220
221     def _calibfile_timestamp(self, path):
222         filename = os.path.basename(path)
223         YYYYMMDDHHMMSS = filename[0:14]
224         return self._time_from_localtime_string(YYYYMMDDHHMMSS)
225
226     def _find_previous_cantilever_calibration_file(
227         self, timestamp=None, basedir=None, previous_days=1):
228         """
229         If timestamp == None, uses current time.
230         Warning : brittle! depends on the default data_logger.py save filesystem.
231         Renaming files or moving directories will break me.
232         """
233         #if FORCED_CANTILEVER_CALIBRATION_FILE != None:
234         #    return FORCED_CANTILEVER_CALIBRATION_FILE
235         calibfiles = []
236         for i in range(previous_days+1):
237             calibfiles.extend(self._list_cantilever_calibration_files(
238                     timestamp-24*3600*i,basedir=basedir))
239         assert len(calibfiles) > 0 , \
240             "No calibration files in that day range in directory '%s'" % basedir
241         calibfiles.sort()
242         for i in range(len(calibfiles)):
243             if self._calibfile_timestamp(calibfiles[i]) > timestamp:
244                 i -= 1
245                 break
246         return calibfiles[i]
247
248     def _read_cantilever_calibration_file(self, filename):
249         ret = {'file': filename}
250         line_re = re.compile('(.*) *: (.*) [+]/- (.*) \((.*)\)\n')
251         for line in file(filename):
252             match = line_re.match(line)
253             if match == None:
254                 raise ValueError(
255                     'Invalid cantilever calibration line in %s: %s'
256                     % (filename, line))
257             key,value,std_dev,rel_err = [
258                 x.strip() for x in match.group(*range(1,5))]
259             if key == 'Variable (units)':
260                 continue # ignore the header line
261             value = float(value)
262             std_dev = float(std_dev)
263             # Handle older calibcant output.
264             if key == 'k (N/m)':
265                 key = 'Cantilever k (N/m)'
266             elif key == 'a**2 (V/nm)**2':
267                 key = 'photoSensitivity**2 (V/nm)**2'
268             ret[key] = value
269             ret[key + ' (std. dev.)'] = std_dev
270         return ret
271
272     def _time_from_localtime_string(self, timestring, format="%Y%m%d%H%M%S"):
273         """
274         >>> print time.tzname
275         ('EST', 'EDT')
276         >>> d = WTKDriver()
277         >>> d._time_from_localtime_string("19700101", format="%Y%m%d")/3600.0
278         5.0
279         >>> d._time_from_localtime_string("20081231", format="%Y%m%d")
280         1230699600
281         >>> d._time_from_localtime_string("20090101", format="%Y%m%d")
282         1230786000
283         """
284         timestruct = time.strptime(timestring, format)
285         timestamp = calendar.timegm(timestruct)
286         # Date strings are in localtime, so what we really want is
287         # .timelocale, but that doesn't exist.  Workaround to convert
288         # timestamp to UTC.
289         timestamp += time.timezone # assume no Daylight Savings Time (DST)
290         if bool(time.localtime(timestamp).tm_isdst) == True:
291             timestamp -= time.timezone - time.altzone # correct if DST
292         assert time.strftime(format, time.localtime(timestamp)) == timestring, "error in time_from_localtime_string, %s != %s" % (time.strftime(format, time.localtime(timestamp)), timestring)
293         return timestamp