Fix existing Driver.is_me's crashes if path is a directory.
[hooke.git] / hooke / driver / wtk.py
1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of Hooke.
4 #
5 # Hooke is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # Hooke is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
13 # Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Driver for W. Trevor King's velocity clamp data format.
20
21 See my related projects:
22
23 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=unfold_protein.git
24 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=calibrate_cantilever.git
25 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=data_logger.git
26 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=piezo.git
27 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=pycomedi.git
28
29 And the deprecated projects (which Hooke replaces):
30
31 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=scale_unfold.git
32 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=sawmodel.git
33 """
34
35 import calendar
36 import copy
37 import os
38 import os.path
39 import re
40 import time
41
42 import numpy
43
44 from .. import curve as curve
45 from .. import experiment as experiment
46 from ..config import Setting
47 from . import Driver as Driver
48
49 try:
50     import calibcant.config
51     calibcant_dir = calibcant.config.LOG_DIR
52 except ImportError:
53     calibcant_dir = None
54
55 try:
56     import data_logger
57 except ImportError:
58     data_logger = None
59
60
61 class WTKDriver (Driver):
62     """Handle W. Trevor King's data_logger data format.
63     """
64     def __init__(self):
65         super(WTKDriver, self).__init__(name='wtk')
66
67     def default_settings(self):
68         return [
69             Setting(section=self.setting_section, help=self.__doc__),
70             Setting(section=self.setting_section,
71                     option='cantilever calibration directory',
72                     value=calibcant_dir, type='path',
73                     help='Set the directory where cantilever calibration data is stored'),
74             ]
75
76     def is_me(self, path):
77         if os.path.isdir(path):
78             return False
79         if not path.endswith('_unfold'):
80             return False
81         for p in self._paths(path):
82             if not os.path.isfile(p):
83                 return False
84         return True
85
86     def read(self, path, info=None):
87         approach_path,retract_path,param_path = self._paths(path)
88
89         unlabeled_approach_data = numpy.loadtxt(
90             approach_path, dtype=numpy.uint16)
91         unlabeled_retract_data = numpy.loadtxt(
92             retract_path, dtype=numpy.uint16)
93         params = self._read_params(param_path)
94         params = self._translate_params(params)
95
96         # move data into Data blocks.
97         approach = self._scale_block(
98             unlabeled_approach_data, params, 'approach')
99         retract = self._scale_block(
100             unlabeled_retract_data, params, 'retract')
101         info = {'filetype':self.name, 'experiment':experiment.VelocityClamp}
102         return ([approach, retract], info)
103
104     def _paths(self, path):
105         return (path+'_approach', path, path+'_param')
106
107     def _read_params(self, param_path):
108         params = {}
109         for line in file(param_path):
110             ldata = [x.strip() for x in line.split(':', 1)]
111             if ldata[0] == 'Data fields':
112                 params['columns'] = ldata[1].split()
113             elif len(ldata) == 2:
114                 argwords = ldata[1].split()
115                 if ldata[0] == 'Time' or len(argwords) != 1:
116                     params[ldata[0]] = ldata[1]
117                 else:  # single word probably a number
118                     params[ldata[0]] = float(ldata[1])
119             else:
120                 pass  # ignore comment lines
121         return params
122
123     def _translate_params(self, params):
124         ret = {'raw info':params,}
125
126         t = params['Time'] # 20100504135849
127         ret['time'] = self._time_from_localtime_string(t)
128
129         assert params['columns'] == ['Z_piezo_out', 'Deflection_in', 'Z_piezo_in'], \
130             'Unexpected columns: %s' % ret['columns']
131         ret['columns'] = ['z piezo (m)', 'deflection (m)', ]
132
133         calibcant_file = self._find_previous_cantilever_calibration_file(
134             ret['time'])
135         calibcant_info = self._read_cantilever_calibration_file(calibcant_file)
136         ret['raw spring constant'] = calibcant_info
137         ret['spring constant (N/m)'] = calibcant_info['Cantilever k (N/m)']
138         ret['deflection sensitivity (m/V)'] = \
139             1.0/numpy.sqrt(calibcant_info['photoSensitivity**2 (V/nm)**2']) * 1e-9
140
141         # (32768 bits = 2**15 bits = 10 Volts)
142         ret['deflection sensitivity (V/bit)'] = 1.0/3276.8
143         ret['deflection range (V)'] = 20.0
144         ret['deflection offset (V)'] = 10.0  # assumes raw data is unsigned
145
146         ret['z piezo sensitivity (V/bit)'] = 1.0/3276.8  # output volts / bit
147         ret['z piezo range (V)'] = 20.0                  # output volts
148         ret['z piezo offset (V)'] = 10.0  # output volts, assumes unsigned
149         ret['z piezo gain'] = \
150             params['Z piezo gain (Vp/Vo)']  # piezo volts / output volt
151         ret['z piezo sensitivity (m/V)'] = \
152             params['Z piezo sensitivity (nm/Vp)'] * 1e-9  # m / piezo volts
153         
154         return ret
155
156     def _scale_block(self, data, info, name):
157         """Convert the block from its native format to a `numpy.float`
158         array in SI units.
159         """
160         ret = curve.Data(
161             shape=(data.shape[0], 2),
162             dtype=numpy.float,
163             info=copy.deepcopy(info)
164             )
165         ret.info['name'] = name
166         ret.info['raw data'] = data # store the raw data
167         ret.info['columns'] = ['z piezo (m)', 'deflection (m)']
168
169         # raw column indices
170         # approach data does not have a Z_piezo_in column as of unfold v0.0.
171         z_rcol = info['raw info']['columns'].index('Z_piezo_out')
172         d_rcol = info['raw info']['columns'].index('Deflection_in')
173
174         # scaled column indices
175         z_scol = ret.info['columns'].index('z piezo (m)')
176         d_scol = ret.info['columns'].index('deflection (m)')
177
178         # Leading '-' because increasing voltage extends the piezo,
179         # moving the tip towards the surface (positive indentation),
180         # but it makes more sense to me to have it increase away from
181         # the surface (positive separation).
182         ret[:,z_scol] = -(
183             (data[:,z_rcol].astype(ret.dtype)
184              * info['z piezo sensitivity (V/bit)']
185              - info['z piezo offset (V)'])
186             * info['z piezo gain']
187             * info['z piezo sensitivity (m/V)']
188             )
189
190         # Leading '-' because deflection voltage increases as the tip
191         # moves away from the surface, but it makes more sense to me
192         # to have it increase as it moves toward the surface (positive
193         # tension on the protein chain).
194         ret[:,d_scol] = -(
195             (data[:,d_rcol]
196              * info['deflection sensitivity (V/bit)']
197              - info['deflection offset (V)'])
198             * info['deflection sensitivity (m/V)']
199             )
200
201         return ret
202
203     def _list_re_search(self, list, regexp):
204         "Return list entries matching re"
205         reg = re.compile(regexp)
206         ret = []
207         for item in list:
208             if reg.match(item):
209                 ret.append(item)
210         return ret
211
212     def _list_cantilever_calibration_files(self, timestamp=None, basedir=None):
213         if timestamp == None:
214             timestamp = time.time()
215         if basedir == None:
216             basedir = self.config['cantilever calibration directory']
217         YYYYMMDD = time.strftime("%Y%m%d", time.localtime(timestamp))
218         dir = os.path.join(data_logger.normalize_logdir(basedir), YYYYMMDD)
219         if not os.path.exists(dir):
220             return []
221         all_calibfiles = os.listdir(dir) # everything in the directory
222         #regexp = "%s.*_analysis_text " % (YYYYMMDD)
223         regexp = ".*_analysis_text"
224         calibfiles = self._list_re_search(all_calibfiles, regexp)
225         paths = [os.path.join(dir, cf) for cf in calibfiles]
226         return paths
227
228     def _calibfile_timestamp(self, path):
229         filename = os.path.basename(path)
230         YYYYMMDDHHMMSS = filename[0:14]
231         return self._time_from_localtime_string(YYYYMMDDHHMMSS)
232
233     def _find_previous_cantilever_calibration_file(
234         self, timestamp=None, basedir=None, previous_days=1):
235         """
236         If timestamp == None, uses current time.
237         Warning : brittle! depends on the default data_logger.py save filesystem.
238         Renaming files or moving directories will break me.
239         """
240         #if FORCED_CANTILEVER_CALIBRATION_FILE != None:
241         #    return FORCED_CANTILEVER_CALIBRATION_FILE
242         calibfiles = []
243         for i in range(previous_days+1):
244             calibfiles.extend(self._list_cantilever_calibration_files(
245                     timestamp-24*3600*i,basedir=basedir))
246         assert len(calibfiles) > 0 , \
247             "No calibration files in that day range in directory '%s'" % basedir
248         calibfiles.sort()
249         for i in range(len(calibfiles)):
250             if self._calibfile_timestamp(calibfiles[i]) > timestamp:
251                 i -= 1
252                 break
253         return calibfiles[i]
254
255     def _read_cantilever_calibration_file(self, filename):
256         ret = {'file': filename}
257         line_re = re.compile('(.*) *: (.*) [+]/- (.*) \((.*)\)\n')
258         for line in file(filename):
259             match = line_re.match(line)
260             if match == None:
261                 raise ValueError(
262                     'Invalid cantilever calibration line in %s: %s'
263                     % (filename, line))
264             key,value,std_dev,rel_err = [
265                 x.strip() for x in match.group(*range(1,5))]
266             if key == 'Variable (units)':
267                 continue # ignore the header line
268             value = float(value)
269             std_dev = float(std_dev)
270             # Handle older calibcant output.
271             if key == 'k (N/m)':
272                 key = 'Cantilever k (N/m)'
273             elif key == 'a**2 (V/nm)**2':
274                 key = 'photoSensitivity**2 (V/nm)**2'
275             ret[key] = value
276             ret[key + ' (std. dev.)'] = std_dev
277         return ret
278
279     def _time_from_localtime_string(self, timestring, format="%Y%m%d%H%M%S"):
280         """
281         >>> print time.tzname
282         ('EST', 'EDT')
283         >>> d = WTKDriver()
284         >>> d._time_from_localtime_string("19700101", format="%Y%m%d")/3600.0
285         5.0
286         >>> d._time_from_localtime_string("20081231", format="%Y%m%d")
287         1230699600
288         >>> d._time_from_localtime_string("20090101", format="%Y%m%d")
289         1230786000
290         """
291         timestruct = time.strptime(timestring, format)
292         timestamp = calendar.timegm(timestruct)
293         # Date strings are in localtime, so what we really want is
294         # .timelocale, but that doesn't exist.  Workaround to convert
295         # timestamp to UTC.
296         timestamp += time.timezone # assume no Daylight Savings Time (DST)
297         if bool(time.localtime(timestamp).tm_isdst) == True:
298             timestamp -= time.timezone - time.altzone # correct if DST
299         assert time.strftime(format, time.localtime(timestamp)) == timestring, "error in time_from_localtime_string, %s != %s" % (time.strftime(format, time.localtime(timestamp)), timestring)
300         return timestamp