b1a09be42233745706da2819d40d9955bc0a460a
[hooke.git] / hooke / driver / wtk.py
1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of Hooke.
4 #
5 # Hooke is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # Hooke is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
13 # Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Driver for W. Trevor King's velocity clamp data format.
20
21 See my blog post:
22
23 * http://www.physics.drexel.edu/~wking/unfolding-disasters/Hooke/
24
25 related projects:
26
27 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=unfold_protein.git
28 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=calibrate_cantilever.git
29 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=data_logger.git
30 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=piezo.git
31 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=pycomedi.git
32
33 and the deprecated projects (which Hooke replaces):
34
35 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=scale_unfold.git
36 * http://www.physics.drexel.edu/~wking/code/git/gitweb.cgi?p=sawmodel.git
37 """
38
39 import calendar
40 import copy
41 import os
42 import os.path
43 import re
44 import time
45
46 import numpy
47
48 from .. import curve as curve
49 from ..config import Setting
50 from . import Driver as Driver
51
52 try:
53     import calibcant.config
54     calibcant_dir = calibcant.config.LOG_DIR
55 except ImportError:
56     calibcant_dir = None
57
58 try:
59     import data_logger
60 except ImportError:
61     data_logger = None
62
63
64 class WTKDriver (Driver):
65     """Handle W. Trevor King's data_logger data format.
66     """
67     def __init__(self):
68         super(WTKDriver, self).__init__(name='wtk')
69
70     def default_settings(self):
71         return [
72             Setting(section=self.setting_section, help=self.__doc__),
73             Setting(section=self.setting_section,
74                     option='cantilever calibration directory',
75                     value=calibcant_dir, type='path',
76                     help='Set the directory where cantilever calibration data is stored'),
77             ]
78
79     def is_me(self, path):
80         if os.path.isdir(path):
81             return False
82         if not path.endswith('_unfold'):
83             return False
84         for p in self._paths(path):
85             if not os.path.isfile(p):
86                 return False
87         return True
88
89     def read(self, path, info=None):
90         approach_path,retract_path,param_path = self._paths(path)
91
92         unlabeled_approach_data = numpy.loadtxt(
93             approach_path, dtype=numpy.uint16)
94         unlabeled_retract_data = numpy.loadtxt(
95             retract_path, dtype=numpy.uint16)
96         params = self._read_params(param_path)
97         params = self._translate_params(params)
98
99         # move data into Data blocks.
100         approach = self._scale_block(
101             unlabeled_approach_data, params, 'approach')
102         retract = self._scale_block(
103             unlabeled_retract_data, params, 'retract')
104         info = {}
105         return ([approach, retract], info)
106
107     def _paths(self, path):
108         return (path+'_approach', path, path+'_param')
109
110     def _read_params(self, param_path):
111         params = {}
112         for line in file(param_path):
113             ldata = [x.strip() for x in line.split(':', 1)]
114             if ldata[0] == 'Data fields':
115                 params['columns'] = ldata[1].split()
116             elif len(ldata) == 2:
117                 argwords = ldata[1].split()
118                 if ldata[0] == 'Time' or len(argwords) != 1:
119                     params[ldata[0]] = ldata[1]
120                 else:  # single word probably a number
121                     params[ldata[0]] = float(ldata[1])
122             else:
123                 pass  # ignore comment lines
124         return params
125
126     def _translate_params(self, params):
127         ret = {'raw info':params,}
128
129         t = params['Time'] # 20100504135849
130         ret['time'] = self._time_from_localtime_string(t)
131
132         assert params['columns'] == ['Z_piezo_out', 'Deflection_in', 'Z_piezo_in'], \
133             'Unexpected columns: %s' % ret['columns']
134         ret['columns'] = ['z piezo (m)', 'deflection (m)', ]
135
136         calibcant_file = self._find_previous_cantilever_calibration_file(
137             ret['time'])
138         calibcant_info = self._read_cantilever_calibration_file(calibcant_file)
139         ret['raw spring constant'] = calibcant_info
140         ret['spring constant (N/m)'] = calibcant_info['Cantilever k (N/m)']
141         ret['deflection sensitivity (m/V)'] = \
142             1.0/numpy.sqrt(calibcant_info['photoSensitivity**2 (V/nm)**2']) * 1e-9
143
144         # (32768 bits = 2**15 bits = 10 Volts)
145         ret['deflection sensitivity (V/bit)'] = 1.0/3276.8
146         ret['deflection range (V)'] = 20.0
147         ret['deflection offset (V)'] = 10.0  # assumes raw data is unsigned
148
149         ret['z piezo sensitivity (V/bit)'] = 1.0/3276.8  # output volts / bit
150         ret['z piezo range (V)'] = 20.0                  # output volts
151         ret['z piezo offset (V)'] = 10.0  # output volts, assumes unsigned
152         ret['z piezo gain'] = \
153             params['Z piezo gain (Vp/Vo)']  # piezo volts / output volt
154         ret['z piezo sensitivity (m/V)'] = \
155             params['Z piezo sensitivity (nm/Vp)'] * 1e-9  # m / piezo volts
156         
157         return ret
158
159     def _scale_block(self, data, info, name):
160         """Convert the block from its native format to a `numpy.float`
161         array in SI units.
162         """
163         ret = curve.Data(
164             shape=(data.shape[0], 2),
165             dtype=numpy.float,
166             info=copy.deepcopy(info)
167             )
168         ret.info['name'] = name
169         ret.info['raw data'] = data # store the raw data
170         ret.info['columns'] = ['z piezo (m)', 'deflection (m)']
171
172         # raw column indices
173         # approach data does not have a Z_piezo_in column as of unfold v0.0.
174         z_rcol = info['raw info']['columns'].index('Z_piezo_out')
175         d_rcol = info['raw info']['columns'].index('Deflection_in')
176
177         # scaled column indices
178         z_scol = ret.info['columns'].index('z piezo (m)')
179         d_scol = ret.info['columns'].index('deflection (m)')
180
181         # Leading '-' because increasing voltage extends the piezo,
182         # moving the tip towards the surface (positive indentation),
183         # but it makes more sense to me to have it increase away from
184         # the surface (positive separation).
185         ret[:,z_scol] = -(
186             (data[:,z_rcol].astype(ret.dtype)
187              * info['z piezo sensitivity (V/bit)']
188              - info['z piezo offset (V)'])
189             * info['z piezo gain']
190             * info['z piezo sensitivity (m/V)']
191             )
192
193         # Leading '-' because deflection voltage increases as the tip
194         # moves away from the surface, but it makes more sense to me
195         # to have it increase as it moves toward the surface (positive
196         # tension on the protein chain).
197         ret[:,d_scol] = -(
198             (data[:,d_rcol]
199              * info['deflection sensitivity (V/bit)']
200              - info['deflection offset (V)'])
201             * info['deflection sensitivity (m/V)']
202             )
203
204         return ret
205
206     def _list_re_search(self, list, regexp):
207         "Return list entries matching re"
208         reg = re.compile(regexp)
209         ret = []
210         for item in list:
211             if reg.match(item):
212                 ret.append(item)
213         return ret
214
215     def _list_cantilever_calibration_files(self, timestamp=None, basedir=None):
216         if timestamp == None:
217             timestamp = time.time()
218         if basedir == None:
219             basedir = self.config['cantilever calibration directory']
220         YYYYMMDD = time.strftime("%Y%m%d", time.localtime(timestamp))
221         dir = os.path.join(data_logger.normalize_log_dir(basedir), YYYYMMDD)
222         if not os.path.exists(dir):
223             return []
224         all_calibfiles = os.listdir(dir) # everything in the directory
225         #regexp = "%s.*_analysis_text " % (YYYYMMDD)
226         regexp = ".*_analysis_text"
227         calibfiles = self._list_re_search(all_calibfiles, regexp)
228         paths = [os.path.join(dir, cf) for cf in calibfiles]
229         return paths
230
231     def _calibfile_timestamp(self, path):
232         filename = os.path.basename(path)
233         YYYYMMDDHHMMSS = filename[0:14]
234         return self._time_from_localtime_string(YYYYMMDDHHMMSS)
235
236     def _find_previous_cantilever_calibration_file(
237         self, timestamp=None, basedir=None, previous_days=1):
238         """
239         If timestamp == None, uses current time.
240         Warning : brittle! depends on the default data_logger.py save filesystem.
241         Renaming files or moving directories will break me.
242         """
243         #if FORCED_CANTILEVER_CALIBRATION_FILE != None:
244         #    return FORCED_CANTILEVER_CALIBRATION_FILE
245         calibfiles = []
246         for i in range(previous_days+1):
247             calibfiles.extend(self._list_cantilever_calibration_files(
248                     timestamp-24*3600*i,basedir=basedir))
249         assert len(calibfiles) > 0 , \
250             "No calibration files in that day range in directory '%s'" % basedir
251         calibfiles.sort()
252         for i in range(len(calibfiles)):
253             if self._calibfile_timestamp(calibfiles[i]) > timestamp:
254                 i -= 1
255                 break
256         return calibfiles[i]
257
258     def _read_cantilever_calibration_file(self, filename):
259         ret = {'file': filename}
260         line_re = re.compile('(.*) *: (.*) [+]/- (.*) \((.*)\)\n')
261         for line in file(filename):
262             match = line_re.match(line)
263             if match == None:
264                 raise ValueError(
265                     'Invalid cantilever calibration line in %s: %s'
266                     % (filename, line))
267             key,value,std_dev,rel_err = [
268                 x.strip() for x in match.group(*range(1,5))]
269             if key == 'Variable (units)':
270                 continue # ignore the header line
271             value = float(value)
272             std_dev = float(std_dev)
273             # Handle older calibcant output.
274             if key == 'k (N/m)':
275                 key = 'Cantilever k (N/m)'
276             elif key == 'a**2 (V/nm)**2':
277                 key = 'photoSensitivity**2 (V/nm)**2'
278             ret[key] = value
279             ret[key + ' (std. dev.)'] = std_dev
280         return ret
281
282     def _time_from_localtime_string(self, timestring, format="%Y%m%d%H%M%S"):
283         """
284         >>> print time.tzname
285         ('EST', 'EDT')
286         >>> d = WTKDriver()
287         >>> d._time_from_localtime_string("19700101", format="%Y%m%d")/3600.0
288         5.0
289         >>> d._time_from_localtime_string("20081231", format="%Y%m%d")
290         1230699600
291         >>> d._time_from_localtime_string("20090101", format="%Y%m%d")
292         1230786000
293         """
294         timestruct = time.strptime(timestring, format)
295         timestamp = calendar.timegm(timestruct)
296         # Date strings are in localtime, so what we really want is
297         # .timelocale, but that doesn't exist.  Workaround to convert
298         # timestamp to UTC.
299         timestamp += time.timezone # assume no Daylight Savings Time (DST)
300         if bool(time.localtime(timestamp).tm_isdst) == True:
301             timestamp -= time.timezone - time.altzone # correct if DST
302         assert time.strftime(format, time.localtime(timestamp)) == timestring, "error in time_from_localtime_string, %s != %s" % (time.strftime(format, time.localtime(timestamp)), timestring)
303         return timestamp