Add wtk driver for W. Trevor King's velocity clamp data format.
[hooke.git] / hooke / driver / wtk.py
1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of Hooke.
4 #
5 # Hooke is free software: you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation, either
8 # version 3 of the License, or (at your option) any later version.
9 #
10 # Hooke is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Hooke driver for W. Trevor King's velocity clamp data format.
20
21 See my related projects:
22
23 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=unfold_protein.git
24 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=calibrate_cantilever.git
25 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=data_logger.git
26 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=piezo.git
27 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=pycomedi.git
28
29 And the deprecated projects (which Hooke replaces):
30
31 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=scale_unfold.git
32 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=sawmodel.git
33 """
34
35 import calendar
36 import copy
37 import os
38 import os.path
39 import re
40 import time
41
42 import numpy
43
44 from .. import curve as curve
45 from .. import experiment as experiment
46 from ..config import Setting
47 from . import Driver as Driver
48
49 try:
50     import calibcant.config
51     calibcant_dir = calibcant.config.LOG_DIR
52 except ImportError:
53     calibcant_dir = None
54
55 try:
56     import data_logger
57 except ImportError:
58     data_logger = None
59
60
61 class WTKDriver (Driver):
62     """Handle W. Trevor King's data_logger data format.
63     """
64     def __init__(self):
65         super(WTKDriver, self).__init__(name='wtk')
66
67     def default_settings(self):
68         return [
69             Setting(section=self.setting_section, help=self.__doc__),
70             Setting(section=self.setting_section,
71                     option='cantilever calibration directory',
72                     value=calibcant_dir,
73                     help='Set the directory where cantilever calibration data is stored'),
74             ]
75
76     def is_me(self, path):
77         if not path.endswith('_unfold'):
78             return False
79         for p in self._paths(path):
80             if not os.path.isfile(p):
81                 return False
82         return True
83
84     def read(self, path):
85         approach_path,retract_path,param_path = self._paths(path)
86
87         unlabeled_approach_data = numpy.loadtxt(
88             approach_path, dtype=numpy.uint16)
89         unlabeled_retract_data = numpy.loadtxt(
90             retract_path, dtype=numpy.uint16)
91         params = self._read_params(param_path)
92         params = self._translate_params(params)
93
94         # move data into Data blocks.
95         approach = self._scale_block(
96             unlabeled_approach_data, params, 'approach')
97         retract = self._scale_block(
98             unlabeled_retract_data, params, 'retract')
99         info = {'filetype':'wtk', 'experiment':experiment.VelocityClamp}
100         return ([approach, retract], info)
101
102     def _paths(self, path):
103         return (path, path+'_approach', path+'_param')
104
105     def _read_params(self, param_path):
106         params = {}
107         for line in file(param_path):
108             ldata = [x.strip() for x in line.split(':', 1)]
109             if ldata[0] == 'Data fields':
110                 params['columns'] = ldata[1].split()
111             elif len(ldata) == 2:
112                 argwords = ldata[1].split()
113                 if ldata[0] == 'Time' or len(argwords) != 1:
114                     params[ldata[0]] = ldata[1]
115                 else:  # single word probably a number
116                     params[ldata[0]] = float(ldata[1])
117             else:
118                 pass  # ignore comment lines
119         return params
120
121     def _translate_params(self, params):
122         ret = {'raw info':params,}
123
124         t = params['Time'] # 20100504135849
125         ret['time'] = self._time_from_localtime_string(t)
126
127         assert params['columns'] == ['Z_piezo_out', 'Deflection_in', 'Z_piezo_in'], \
128             'Unexpected columns: %s' % ret['columns']
129         ret['columns'] = ['z piezo (m)', 'deflection (m)', ]
130
131         calibcant_file = self._find_previous_cantilever_calibration_file(
132             ret['time'])
133         calibcant_info = self._read_cantilever_calibration_file(calibcant_file)
134         ret['raw spring constant'] = calibcant_info
135         ret['spring constant (N/m)'] = calibcant_info['Cantilever k (N/m)']
136         ret['deflection sensitivity (m/V)'] = \
137             1.0/numpy.sqrt(calibcant_info['photoSensitivity**2 (V/nm)**2'])
138
139         # (32768 bits = 2**15 bits = 10 Volts)
140         ret['deflection sensitivity (V/bit)'] = 1.0/3276.8
141         ret['deflection range (V)'] = 20.0
142         ret['deflection offset (V)'] = 10.0  # assumes raw data is unsigned
143
144         ret['z piezo sensitivity (V/bit)'] = 1.0/3276.8  # output volts / bit
145         ret['z piezo range (V)'] = 20.0                  # output volts
146         ret['z piezo offset (V)'] = 10.0  # output volts, assumes unsigned
147         ret['z piezo gain'] = \
148             params['Z piezo gain (Vp/Vo)']  # piezo volts / output volt
149         ret['z piezo sensitivity (m/V)'] = \
150             params['Z piezo sensitivity (nm/Vp)'] * 1e-9  # m / piezo volts
151         
152         return ret
153
154     def _scale_block(self, data, info, name):
155         """Convert the block from its native format to a `numpy.float`
156         array in SI units.
157         """
158         ret = curve.Data(
159             shape=data.shape,
160             dtype=numpy.float,
161             info=copy.deepcopy(info)
162             )
163         ret.info['name'] = name
164         ret.info['raw data'] = data # store the raw data
165         ret.info['columns'] = ['z piezo (m)', 'deflection (m)']
166
167         # raw column indices
168         # approach data does not have a Z_piezo_in column as of unfold v0.0.
169         z_rcol = info['raw info']['columns'].index('Z_piezo_out')
170         d_rcol = info['raw info']['columns'].index('Deflection_in')
171
172         # scaled column indices
173         z_scol = ret.info['columns'].index('z piezo (m)')
174         d_scol = ret.info['columns'].index('deflection (m)')
175
176         # Leading '-' because increasing voltage extends the piezo,
177         # moving the tip towards the surface (positive indentation),
178         # but it makes more sense to me to have it increase away from
179         # the surface (positive separation).
180         ret[:,z_scol] = -(
181             (data[:,z_rcol].astype(ret.dtype)
182              * info['z piezo sensitivity (V/bit)']
183              - info['z piezo offset (V)'])
184             * info['z piezo gain']
185             * info['z piezo sensitivity (m/V)']
186             )
187
188         ret[:,d_scol] = (
189             (data[:,d_rcol]
190              * info['deflection sensitivity (V/bit)']
191              - info['deflection offset (V)'])
192             * info['deflection sensitivity (m/V)']
193             )
194
195         return ret
196
197     def _list_re_search(self, list, regexp):
198         "Return list entries matching re"
199         reg = re.compile(regexp)
200         ret = []
201         for item in list:
202             if reg.match(item):
203                 ret.append(item)
204         return ret
205
206     def _list_cantilever_calibration_files(self, timestamp=None, basedir=None):
207         if timestamp == None:
208             timestamp = time.time()
209         if basedir == None:
210             basedir = self.config['cantilever calibration directory']
211         YYYYMMDD = time.strftime("%Y%m%d", time.localtime(timestamp))
212         dir = os.path.join(data_logger.normalize_logdir(basedir), YYYYMMDD)
213         if not os.path.exists(dir):
214             return []
215         all_calibfiles = os.listdir(dir) # everything in the directory
216         #regexp = "%s.*_analysis_text " % (YYYYMMDD)
217         regexp = ".*_analysis_text"
218         calibfiles = self._list_re_search(all_calibfiles, regexp)
219         paths = [os.path.join(dir, cf) for cf in calibfiles]
220         return paths
221
222     def _calibfile_timestamp(self, path):
223         filename = os.path.basename(path)
224         YYYYMMDDHHMMSS = filename[0:14]
225         return self._time_from_localtime_string(YYYYMMDDHHMMSS)
226
227     def _find_previous_cantilever_calibration_file(
228         self, timestamp=None, basedir=None, previous_days=1):
229         """
230         If timestamp == None, uses current time.
231         Warning : brittle! depends on the default data_logger.py save filesystem.
232         Renaming files or moving directories will break me.
233         """
234         #if FORCED_CANTILEVER_CALIBRATION_FILE != None:
235         #    return FORCED_CANTILEVER_CALIBRATION_FILE
236         calibfiles = []
237         for i in range(previous_days+1):
238             calibfiles.extend(self._list_cantilever_calibration_files(
239                     timestamp-24*3600*i,basedir=basedir))
240         assert len(calibfiles) > 0 , \
241             "No calibration files in that day range in directory '%s'" % basedir
242         calibfiles.sort()
243         for i in range(len(calibfiles)):
244             if self._calibfile_timestamp(calibfiles[i]) > timestamp:
245                 i -= 1
246                 break
247         return os.path.join(dir, calibfiles[i])
248
249     def _read_cantilever_calibration_file(self, filename):
250         ret = {'file': filename}
251         line_re = re.compile('(.*) *: (.*) [+]/- (.*) \((.*)\)\n')
252         for line in file(filename):
253             match = line_re.match(line)
254             if match == None:
255                 raise ValueError(
256                     'Invalid cantilever calibration line in %s: %s'
257                     % (filename, line))
258             key,value,std_dev,rel_err = [
259                 x.strip() for x in match.group(*range(1,5))]
260             if key == 'Variable (units)':
261                 continue # ignore the header line
262             value = float(value)
263             std_dev = float(std_dev)
264             # Handle older calibcant output.
265             if key == 'k (N/m)':
266                 key = 'Cantilever k (N/m)'
267             elif key == 'a**2 (V/nm)**2':
268                 key = 'photoSensitivity**2 (V/nm)**2'
269             ret[key] = value
270             ret[key + ' (std. dev.)'] = std_dev
271         return ret
272
273     def _time_from_localtime_string(self, timestring, format="%Y%m%d%H%M%S"):
274         """
275         >>> print time.tzname
276         ('EST', 'EDT')
277         >>> d = WTKDriver()
278         >>> d._time_from_localtime_string("19700101", format="%Y%m%d")/3600.0
279         5.0
280         >>> d._time_from_localtime_string("20081231", format="%Y%m%d")
281         1230699600
282         >>> d._time_from_localtime_string("20090101", format="%Y%m%d")
283         1230786000
284         """
285         timestruct = time.strptime(timestring, format)
286         timestamp = calendar.timegm(timestruct)
287         # Date strings are in localtime, so what we really want is
288         # .timelocale, but that doesn't exist.  Workaround to convert
289         # timestamp to UTC.
290         timestamp += time.timezone # assume no Daylight Savings Time (DST)
291         if bool(time.localtime(timestamp).tm_isdst) == True:
292             timestamp -= time.timezone - time.altzone # correct if DST
293         assert time.strftime(format, time.localtime(timestamp)) == timestring, "error in time_from_localtime_string, %s != %s" % (time.strftime(format, time.localtime(timestamp)), timestring)
294         return timestamp