f3b7ed67b551fa7d6ae53933642ee83f66a6cca6
[hooke.git] / hooke / driver / wtk.py
1 # Copyright (C) 2010-2012 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of Hooke.
4 #
5 # Hooke is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU Lesser General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option) any
8 # later version.
9 #
10 # Hooke is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
13 # details.
14 #
15 # You should have received a copy of the GNU Lesser General Public License
16 # along with Hooke.  If not, see <http://www.gnu.org/licenses/>.
17
18 """Driver for W. Trevor King's velocity clamp data format.
19
20 See my blog post:
21
22 * http://www.physics.drexel.edu/~wking/unfolding-disasters/Hooke/
23
24 related projects:
25
26 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=unfold_protein.git
27 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=calibrate_cantilever.git
28 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=data_logger.git
29 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=piezo.git
30 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=pycomedi.git
31
32 and the deprecated projects (which Hooke replaces):
33
34 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=scale_unfold.git
35 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=sawmodel.git
36 """
37
38 import calendar
39 import copy
40 import os
41 import os.path
42 import re
43 import time
44
45 import numpy
46
47 from .. import curve as curve
48 from ..config import Setting
49 from . import Driver as Driver
50
51
52 class WTKDriver (Driver):
53     """Handle W. Trevor King's data_logger data format.
54     """
55     def __init__(self):
56         super(WTKDriver, self).__init__(name='wtk')
57
58     def default_settings(self):
59         return [
60             Setting(section=self.setting_section, help=self.__doc__),
61             Setting(section=self.setting_section,
62                     option='cantilever calibration directory',
63                     value='~/rsrch/data/calibrate_cantilever', type='path',
64                     help='Set the directory where cantilever calibration data is stored'),
65             ]
66
67     def is_me(self, path):
68         if os.path.isdir(path):
69             return False
70         if not path.endswith('_unfold'):
71             return False
72         for p in self._paths(path):
73             if not os.path.isfile(p):
74                 return False
75         return True
76
77     def read(self, path, info=None):
78         approach_path,retract_path,param_path = self._paths(path)
79
80         unlabeled_approach_data = numpy.loadtxt(
81             approach_path, dtype=numpy.uint16)
82         unlabeled_retract_data = numpy.loadtxt(
83             retract_path, dtype=numpy.uint16)
84         params = self._read_params(param_path)
85         params = self._translate_params(params)
86
87         # move data into Data blocks.
88         approach = self._scale_block(
89             unlabeled_approach_data, params, 'approach')
90         retract = self._scale_block(
91             unlabeled_retract_data, params, 'retract')
92         info = {}
93         return ([approach, retract], info)
94
95     def _paths(self, path):
96         return (path+'_approach', path, path+'_param')
97
98     def _read_params(self, param_path):
99         params = {}
100         for line in file(param_path):
101             ldata = [x.strip() for x in line.split(':', 1)]
102             if ldata[0] == 'Data fields':
103                 params['columns'] = ldata[1].split()
104             elif len(ldata) == 2:
105                 argwords = ldata[1].split()
106                 if ldata[0] == 'Time' or len(argwords) != 1:
107                     params[ldata[0]] = ldata[1]
108                 else:  # single word probably a number
109                     params[ldata[0]] = float(ldata[1])
110             else:
111                 pass  # ignore comment lines
112         return params
113
114     def _translate_params(self, params):
115         ret = {'raw info':params,}
116
117         t = params['Time'] # 20100504135849
118         ret['time'] = self._time_from_localtime_string(t)
119
120         assert params['columns'] == ['Z_piezo_out', 'Deflection_in', 'Z_piezo_in'], \
121             'Unexpected columns: %s' % ret['columns']
122         ret['columns'] = ['z piezo (m)', 'deflection (m)', ]
123
124         calibcant_file = self._find_previous_cantilever_calibration_file(
125             ret['time'])
126         calibcant_info = self._read_cantilever_calibration_file(calibcant_file)
127         ret['raw spring constant'] = calibcant_info
128         ret['spring constant (N/m)'] = calibcant_info['Cantilever k (N/m)']
129         ret['deflection sensitivity (m/V)'] = \
130             1.0/numpy.sqrt(calibcant_info['photoSensitivity**2 (V/nm)**2']) * 1e-9
131
132         # (32768 bits = 2**15 bits = 10 Volts)
133         ret['deflection sensitivity (V/bit)'] = 1.0/3276.8
134         ret['deflection range (V)'] = 20.0
135         ret['deflection offset (V)'] = 10.0  # assumes raw data is unsigned
136
137         ret['z piezo sensitivity (V/bit)'] = 1.0/3276.8  # output volts / bit
138         ret['z piezo range (V)'] = 20.0                  # output volts
139         ret['z piezo offset (V)'] = 10.0  # output volts, assumes unsigned
140         ret['z piezo gain'] = \
141             params['Z piezo gain (Vp/Vo)']  # piezo volts / output volt
142         ret['z piezo sensitivity (m/V)'] = \
143             params['Z piezo sensitivity (nm/Vp)'] * 1e-9  # m / piezo volts
144         
145         return ret
146
147     def _scale_block(self, data, info, name):
148         """Convert the block from its native format to a `numpy.float`
149         array in SI units.
150         """
151         ret = curve.Data(
152             shape=(data.shape[0], 2),
153             dtype=numpy.float,
154             info=copy.deepcopy(info)
155             )
156         ret.info['name'] = name
157         ret.info['raw data'] = data # store the raw data
158         ret.info['columns'] = ['z piezo (m)', 'deflection (m)']
159
160         # raw column indices
161         # approach data does not have a Z_piezo_in column as of unfold v0.0.
162         z_rcol = info['raw info']['columns'].index('Z_piezo_out')
163         d_rcol = info['raw info']['columns'].index('Deflection_in')
164
165         # scaled column indices
166         z_scol = ret.info['columns'].index('z piezo (m)')
167         d_scol = ret.info['columns'].index('deflection (m)')
168
169         # Leading '-' because increasing voltage extends the piezo,
170         # moving the tip towards the surface (positive indentation),
171         # but it makes more sense to me to have it increase away from
172         # the surface (positive separation).
173         ret[:,z_scol] = -(
174             (data[:,z_rcol].astype(ret.dtype)
175              * info['z piezo sensitivity (V/bit)']
176              - info['z piezo offset (V)'])
177             * info['z piezo gain']
178             * info['z piezo sensitivity (m/V)']
179             )
180
181         # Leading '-' because deflection voltage increases as the tip
182         # moves away from the surface, but it makes more sense to me
183         # to have it increase as it moves toward the surface (positive
184         # tension on the protein chain).
185         ret[:,d_scol] = -(
186             (data[:,d_rcol]
187              * info['deflection sensitivity (V/bit)']
188              - info['deflection offset (V)'])
189             * info['deflection sensitivity (m/V)']
190             )
191
192         return ret
193
194     def _list_re_search(self, list, regexp):
195         "Return list entries matching re"
196         reg = re.compile(regexp)
197         ret = []
198         for item in list:
199             if reg.match(item):
200                 ret.append(item)
201         return ret
202
203     def _list_cantilever_calibration_files(self, timestamp=None, basedir=None):
204         if timestamp == None:
205             timestamp = time.time()
206         if basedir == None:
207             basedir = self.config['cantilever calibration directory']
208         YYYYMMDD = time.strftime("%Y%m%d", time.localtime(timestamp))
209         basedir = os.path.expanduser(basedir)
210         dir = os.path.join(basedir, YYYYMMDD)
211         if not os.path.exists(dir):
212             return []
213         all_calibfiles = os.listdir(dir) # everything in the directory
214         #regexp = "%s.*_analysis_text " % (YYYYMMDD)
215         regexp = ".*_analysis_text"
216         calibfiles = self._list_re_search(all_calibfiles, regexp)
217         paths = [os.path.join(dir, cf) for cf in calibfiles]
218         return paths
219
220     def _calibfile_timestamp(self, path):
221         filename = os.path.basename(path)
222         YYYYMMDDHHMMSS = filename[0:14]
223         return self._time_from_localtime_string(YYYYMMDDHHMMSS)
224
225     def _find_previous_cantilever_calibration_file(
226         self, timestamp=None, basedir=None, previous_days=1):
227         """
228         If timestamp == None, uses current time.
229         Warning : brittle! depends on the default data_logger.py save filesystem.
230         Renaming files or moving directories will break me.
231         """
232         #if FORCED_CANTILEVER_CALIBRATION_FILE != None:
233         #    return FORCED_CANTILEVER_CALIBRATION_FILE
234         calibfiles = []
235         for i in range(previous_days+1):
236             calibfiles.extend(self._list_cantilever_calibration_files(
237                     timestamp-24*3600*i,basedir=basedir))
238         assert len(calibfiles) > 0 , \
239             "No calibration files in that day range in directory '%s'" % basedir
240         calibfiles.sort()
241         for i in range(len(calibfiles)):
242             if self._calibfile_timestamp(calibfiles[i]) > timestamp:
243                 i -= 1
244                 break
245         return calibfiles[i]
246
247     def _read_cantilever_calibration_file(self, filename):
248         ret = {'file': filename}
249         line_re = re.compile('(.*) *: (.*) [+]/- (.*) \((.*)\)\n')
250         for line in file(filename):
251             match = line_re.match(line)
252             if match == None:
253                 raise ValueError(
254                     'Invalid cantilever calibration line in %s: %s'
255                     % (filename, line))
256             key,value,std_dev,rel_err = [
257                 x.strip() for x in match.group(*range(1,5))]
258             if key == 'Variable (units)':
259                 continue # ignore the header line
260             value = float(value)
261             std_dev = float(std_dev)
262             # Handle older calibcant output.
263             if key == 'k (N/m)':
264                 key = 'Cantilever k (N/m)'
265             elif key == 'a**2 (V/nm)**2':
266                 key = 'photoSensitivity**2 (V/nm)**2'
267             ret[key] = value
268             ret[key + ' (std. dev.)'] = std_dev
269         return ret
270
271     def _time_from_localtime_string(self, timestring, format="%Y%m%d%H%M%S"):
272         """
273         >>> print time.tzname
274         ('EST', 'EDT')
275         >>> d = WTKDriver()
276         >>> d._time_from_localtime_string("19700101", format="%Y%m%d")/3600.0
277         5.0
278         >>> d._time_from_localtime_string("20081231", format="%Y%m%d")
279         1230699600
280         >>> d._time_from_localtime_string("20090101", format="%Y%m%d")
281         1230786000
282         """
283         timestruct = time.strptime(timestring, format)
284         timestamp = calendar.timegm(timestruct)
285         # Date strings are in localtime, so what we really want is
286         # .timelocale, but that doesn't exist.  Workaround to convert
287         # timestamp to UTC.
288         timestamp += time.timezone # assume no Daylight Savings Time (DST)
289         if bool(time.localtime(timestamp).tm_isdst) == True:
290             timestamp -= time.timezone - time.altzone # correct if DST
291         assert time.strftime(format, time.localtime(timestamp)) == timestring, "error in time_from_localtime_string, %s != %s" % (time.strftime(format, time.localtime(timestamp)), timestring)
292         return timestamp