8cef93947c75f1a217887d0c87a746793d6805b0
[hooke.git] / hooke / driver / wtk.py
1 # Copyright (C) 2010 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of Hooke.
4 #
5 # Hooke is free software: you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation, either
8 # version 3 of the License, or (at your option) any later version.
9 #
10 # Hooke is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with Hooke.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Driver for W. Trevor King's velocity clamp data format.
20
21 See my related projects:
22
23 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=unfold_protein.git
24 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=calibrate_cantilever.git
25 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=data_logger.git
26 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=piezo.git
27 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=pycomedi.git
28
29 And the deprecated projects (which Hooke replaces):
30
31 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=scale_unfold.git
32 * http://www.physics.drexel.edu/~wking/code/git/git.php?p=sawmodel.git
33 """
34
35 import calendar
36 import copy
37 import os
38 import os.path
39 import re
40 import time
41
42 import numpy
43
44 from .. import curve as curve
45 from .. import experiment as experiment
46 from ..config import Setting
47 from . import Driver as Driver
48
49 try:
50     import calibcant.config
51     calibcant_dir = calibcant.config.LOG_DIR
52 except ImportError:
53     calibcant_dir = None
54
55 try:
56     import data_logger
57 except ImportError:
58     data_logger = None
59
60
61 class WTKDriver (Driver):
62     """Handle W. Trevor King's data_logger data format.
63     """
64     def __init__(self):
65         super(WTKDriver, self).__init__(name='wtk')
66
67     def default_settings(self):
68         return [
69             Setting(section=self.setting_section, help=self.__doc__),
70             Setting(section=self.setting_section,
71                     option='cantilever calibration directory',
72                     value=calibcant_dir,
73                     help='Set the directory where cantilever calibration data is stored'),
74             ]
75
76     def is_me(self, path):
77         if not path.endswith('_unfold'):
78             return False
79         for p in self._paths(path):
80             if not os.path.isfile(p):
81                 return False
82         return True
83
84     def read(self, path, info=None):
85         approach_path,retract_path,param_path = self._paths(path)
86
87         unlabeled_approach_data = numpy.loadtxt(
88             approach_path, dtype=numpy.uint16)
89         unlabeled_retract_data = numpy.loadtxt(
90             retract_path, dtype=numpy.uint16)
91         params = self._read_params(param_path)
92         params = self._translate_params(params)
93
94         # move data into Data blocks.
95         approach = self._scale_block(
96             unlabeled_approach_data, params, 'approach')
97         retract = self._scale_block(
98             unlabeled_retract_data, params, 'retract')
99         info = {'filetype':self.name, 'experiment':experiment.VelocityClamp}
100         return ([approach, retract], info)
101
102     def _paths(self, path):
103         return (path+'_approach', path, path+'_param')
104
105     def _read_params(self, param_path):
106         params = {}
107         for line in file(param_path):
108             ldata = [x.strip() for x in line.split(':', 1)]
109             if ldata[0] == 'Data fields':
110                 params['columns'] = ldata[1].split()
111             elif len(ldata) == 2:
112                 argwords = ldata[1].split()
113                 if ldata[0] == 'Time' or len(argwords) != 1:
114                     params[ldata[0]] = ldata[1]
115                 else:  # single word probably a number
116                     params[ldata[0]] = float(ldata[1])
117             else:
118                 pass  # ignore comment lines
119         return params
120
121     def _translate_params(self, params):
122         ret = {'raw info':params,}
123
124         t = params['Time'] # 20100504135849
125         ret['time'] = self._time_from_localtime_string(t)
126
127         assert params['columns'] == ['Z_piezo_out', 'Deflection_in', 'Z_piezo_in'], \
128             'Unexpected columns: %s' % ret['columns']
129         ret['columns'] = ['z piezo (m)', 'deflection (m)', ]
130
131         calibcant_file = self._find_previous_cantilever_calibration_file(
132             ret['time'])
133         calibcant_info = self._read_cantilever_calibration_file(calibcant_file)
134         ret['raw spring constant'] = calibcant_info
135         ret['spring constant (N/m)'] = calibcant_info['Cantilever k (N/m)']
136         ret['deflection sensitivity (m/V)'] = \
137             1.0/numpy.sqrt(calibcant_info['photoSensitivity**2 (V/nm)**2'])
138
139         # (32768 bits = 2**15 bits = 10 Volts)
140         ret['deflection sensitivity (V/bit)'] = 1.0/3276.8
141         ret['deflection range (V)'] = 20.0
142         ret['deflection offset (V)'] = 10.0  # assumes raw data is unsigned
143
144         ret['z piezo sensitivity (V/bit)'] = 1.0/3276.8  # output volts / bit
145         ret['z piezo range (V)'] = 20.0                  # output volts
146         ret['z piezo offset (V)'] = 10.0  # output volts, assumes unsigned
147         ret['z piezo gain'] = \
148             params['Z piezo gain (Vp/Vo)']  # piezo volts / output volt
149         ret['z piezo sensitivity (m/V)'] = \
150             params['Z piezo sensitivity (nm/Vp)'] * 1e-9  # m / piezo volts
151         
152         return ret
153
154     def _scale_block(self, data, info, name):
155         """Convert the block from its native format to a `numpy.float`
156         array in SI units.
157         """
158         ret = curve.Data(
159             shape=(data.shape[0], 2),
160             dtype=numpy.float,
161             info=copy.deepcopy(info)
162             )
163         ret.info['name'] = name
164         ret.info['raw data'] = data # store the raw data
165         ret.info['columns'] = ['z piezo (m)', 'deflection (m)']
166
167         # raw column indices
168         # approach data does not have a Z_piezo_in column as of unfold v0.0.
169         z_rcol = info['raw info']['columns'].index('Z_piezo_out')
170         d_rcol = info['raw info']['columns'].index('Deflection_in')
171
172         # scaled column indices
173         z_scol = ret.info['columns'].index('z piezo (m)')
174         d_scol = ret.info['columns'].index('deflection (m)')
175
176         # Leading '-' because increasing voltage extends the piezo,
177         # moving the tip towards the surface (positive indentation),
178         # but it makes more sense to me to have it increase away from
179         # the surface (positive separation).
180         ret[:,z_scol] = -(
181             (data[:,z_rcol].astype(ret.dtype)
182              * info['z piezo sensitivity (V/bit)']
183              - info['z piezo offset (V)'])
184             * info['z piezo gain']
185             * info['z piezo sensitivity (m/V)']
186             )
187
188         # Leading '-' because deflection voltage increases as the tip
189         # moves away from the surface, but it makes more sense to me
190         # to have it increase as it moves toward the surface (positive
191         # tension on the protein chain).
192         ret[:,d_scol] = -(
193             (data[:,d_rcol]
194              * info['deflection sensitivity (V/bit)']
195              - info['deflection offset (V)'])
196             * info['deflection sensitivity (m/V)']
197             )
198
199         return ret
200
201     def _list_re_search(self, list, regexp):
202         "Return list entries matching re"
203         reg = re.compile(regexp)
204         ret = []
205         for item in list:
206             if reg.match(item):
207                 ret.append(item)
208         return ret
209
210     def _list_cantilever_calibration_files(self, timestamp=None, basedir=None):
211         if timestamp == None:
212             timestamp = time.time()
213         if basedir == None:
214             basedir = self.config['cantilever calibration directory']
215         YYYYMMDD = time.strftime("%Y%m%d", time.localtime(timestamp))
216         dir = os.path.join(data_logger.normalize_logdir(basedir), YYYYMMDD)
217         if not os.path.exists(dir):
218             return []
219         all_calibfiles = os.listdir(dir) # everything in the directory
220         #regexp = "%s.*_analysis_text " % (YYYYMMDD)
221         regexp = ".*_analysis_text"
222         calibfiles = self._list_re_search(all_calibfiles, regexp)
223         paths = [os.path.join(dir, cf) for cf in calibfiles]
224         return paths
225
226     def _calibfile_timestamp(self, path):
227         filename = os.path.basename(path)
228         YYYYMMDDHHMMSS = filename[0:14]
229         return self._time_from_localtime_string(YYYYMMDDHHMMSS)
230
231     def _find_previous_cantilever_calibration_file(
232         self, timestamp=None, basedir=None, previous_days=1):
233         """
234         If timestamp == None, uses current time.
235         Warning : brittle! depends on the default data_logger.py save filesystem.
236         Renaming files or moving directories will break me.
237         """
238         #if FORCED_CANTILEVER_CALIBRATION_FILE != None:
239         #    return FORCED_CANTILEVER_CALIBRATION_FILE
240         calibfiles = []
241         for i in range(previous_days+1):
242             calibfiles.extend(self._list_cantilever_calibration_files(
243                     timestamp-24*3600*i,basedir=basedir))
244         assert len(calibfiles) > 0 , \
245             "No calibration files in that day range in directory '%s'" % basedir
246         calibfiles.sort()
247         for i in range(len(calibfiles)):
248             if self._calibfile_timestamp(calibfiles[i]) > timestamp:
249                 i -= 1
250                 break
251         return calibfiles[i]
252
253     def _read_cantilever_calibration_file(self, filename):
254         ret = {'file': filename}
255         line_re = re.compile('(.*) *: (.*) [+]/- (.*) \((.*)\)\n')
256         for line in file(filename):
257             match = line_re.match(line)
258             if match == None:
259                 raise ValueError(
260                     'Invalid cantilever calibration line in %s: %s'
261                     % (filename, line))
262             key,value,std_dev,rel_err = [
263                 x.strip() for x in match.group(*range(1,5))]
264             if key == 'Variable (units)':
265                 continue # ignore the header line
266             value = float(value)
267             std_dev = float(std_dev)
268             # Handle older calibcant output.
269             if key == 'k (N/m)':
270                 key = 'Cantilever k (N/m)'
271             elif key == 'a**2 (V/nm)**2':
272                 key = 'photoSensitivity**2 (V/nm)**2'
273             ret[key] = value
274             ret[key + ' (std. dev.)'] = std_dev
275         return ret
276
277     def _time_from_localtime_string(self, timestring, format="%Y%m%d%H%M%S"):
278         """
279         >>> print time.tzname
280         ('EST', 'EDT')
281         >>> d = WTKDriver()
282         >>> d._time_from_localtime_string("19700101", format="%Y%m%d")/3600.0
283         5.0
284         >>> d._time_from_localtime_string("20081231", format="%Y%m%d")
285         1230699600
286         >>> d._time_from_localtime_string("20090101", format="%Y%m%d")
287         1230786000
288         """
289         timestruct = time.strptime(timestring, format)
290         timestamp = calendar.timegm(timestruct)
291         # Date strings are in localtime, so what we really want is
292         # .timelocale, but that doesn't exist.  Workaround to convert
293         # timestamp to UTC.
294         timestamp += time.timezone # assume no Daylight Savings Time (DST)
295         if bool(time.localtime(timestamp).tm_isdst) == True:
296             timestamp -= time.timezone - time.altzone # correct if DST
297         assert time.strftime(format, time.localtime(timestamp)) == timestring, "error in time_from_localtime_string, %s != %s" % (time.strftime(format, time.localtime(timestamp)), timestring)
298         return timestamp