ca84792a3b470aef69afc1e9c99fc0f3b7518773
[hooke.git] / hooke / driver / picoforce.py
1 # Copyright (C) 2006-2010 Alberto Gomez-Casado
2 #                         Massimo Sandal <devicerandom@gmail.com>
3 #                         W. Trevor King <wking@drexel.edu>
4 #
5 # This file is part of Hooke.
6 #
7 # Hooke is free software: you can redistribute it and/or modify it
8 # under the terms of the GNU Lesser General Public License as
9 # published by the Free Software Foundation, either version 3 of the
10 # License, or (at your option) any later version.
11 #
12 # Hooke is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
15 # Public License for more details.
16 #
17 # You should have received a copy of the GNU Lesser General Public
18 # License along with Hooke.  If not, see
19 # <http://www.gnu.org/licenses/>.
20
21 """Driver for Veeco PicoForce force spectroscopy files.
22 """
23
24 import os.path
25 import pprint
26 import re
27 import time
28
29 import numpy
30
31 from .. import curve as curve # this module defines data containers.
32 from .. import experiment as experiment # this module defines expt. types
33 from . import Driver as Driver # this is the Driver base class
34
35
36 __version__='0.0.0.20100516'
37
38 class PicoForceDriver (Driver):
39     """Handle Veeco Picoforce force spectroscopy files.
40     """
41     def __init__(self):
42         super(PicoForceDriver, self).__init__(name='picoforce')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         f = file(path, 'r')
48         header = f.read(30)
49         f.close()
50
51         return header[2:17] == 'Force file list'
52
53     def read(self, path, info=None):
54         info = self._read_header_path(path)
55         self._check_version(info)
56         data = self._read_data_path(path, info)
57         info['filetype'] = self.name
58         info['experiment'] = experiment.VelocityClamp()
59         return (data, info)
60
61     def _read_header_path(self, path):
62         """Read curve information from the PicoForce file at `path`.
63
64         See :meth:`._read_header_file`.
65         """
66         return self._read_header_file(file(path, 'rb'))
67
68     def _read_header_file(self, file):
69         r"""Read curve information from a PicoForce file.
70
71         Return a dict of dicts representing the information.  If a
72         field is repeated multiple times, it's value is replaced by a
73         list of the values for each occurence.
74
75         Examples
76         --------
77
78         >>> import pprint
79         >>> import StringIO
80         >>> f = StringIO.StringIO('\r\n'.join([
81         ...             '\*Force file list',
82         ...             '\Version: 0x06120002',
83         ...             '\Date: 04:42:34 PM Tue Sep 11 2007',
84         ...             '\Start context: FOL2',
85         ...             '\Data length: 40960',
86         ...             '\Text: ',
87         ...             '\*Equipment list',
88         ...             '\Description: Extended PicoForce',
89         ...             '\Controller: IIIA',
90         ...             '\*Ciao force image list',
91         ...             '\Data offset: 40960',
92         ...             '\Data length: 8192',
93         ...             '\*Ciao force image list',
94         ...             '\Data offset: 49152',
95         ...             '\Data length: 8192',
96         ...             '\*Ciao force image list',
97         ...             '\Data offset: 57344',
98         ...             '\Data length: 8192',
99         ...             ]))
100         >>> p = PicoForceDriver()
101         >>> d = p._read_header_file(f)
102         >>> pprint.pprint(d, width=60)
103         {'Ciao force image list': [{'Data length': '8192',
104                                     'Data offset': '40960'},
105                                    {'Data length': '8192',
106                                     'Data offset': '49152'},
107                                    {'Data length': '8192',
108                                     'Data offset': '57344'}],
109          'Equipment list': {'Controller': 'IIIA',
110                             'Description': 'Extended PicoForce'},
111          'Force file list': {'Data length': '40960',
112                              'Date': '04:42:34 PM Tue Sep 11 2007',
113                              'Start context': 'FOL2',
114                              'Text:': None,
115                              'Version': '0x06120002'}}
116         """
117         info = {}
118         header_field = None
119         for line in file:
120             line = line.strip()
121             if line.startswith('\*File list end'):
122                 break
123             if line.startswith(r'\*'):
124                 header_field = line[len(r'\*'):]
125                 if header_field in info:
126                     if isinstance(info[header_field], list):
127                         info[header_field].append({}) # >=3rd appearance
128                     else: # Second appearance
129                         info[header_field] = [info[header_field], {}]
130                 else: # First appearance
131                     info[header_field] = {}
132             else:
133                 assert line.startswith('\\'), line
134                 fields = line[len('\\'):].split(': ', 1)
135                 key = fields[0]
136                 if len(fields) == 1: # fields = [key]
137                     value = None
138                 else: # fields = [key, value]
139                     value = fields[1]
140                 if isinstance(info[header_field], list): # >=2nd header_field
141                     target_dict = info[header_field][-1]
142                 else: # first appearance of header_field
143                     target_dict = info[header_field]
144                 if key in target_dict and target_dict[key] != value:
145                     raise NotImplementedError(
146                         'Overwriting %s: %s -> %s'
147                         % (key, target_dict[key], value))
148                 target_dict[key] = value
149         return (info)
150
151     def _check_version(self, info):
152         """Ensure the input file is a version we understand.
153
154         Otherwise, raise `ValueError`.
155         """
156         version = info['Force file list'].get('Version', None)
157         if version not in ['0x06120002', '0x06130001', '0x07200000']:
158             raise NotImplementedError(
159                 '%s file version %s not supported (yet!)\n%s'
160                 % (self.name, version,
161                    pprint.pformat(info['Force file list'])))
162
163     def _read_data_path(self, path, info):
164         """Read curve data from the PicoForce file at `path`.
165
166         See :meth:`._read_data_file`.
167         """
168         f = file(path, 'rb')
169         data = self._read_data_file(f, info)
170         f.close()
171         return data
172
173     def _read_data_file(self, file, info):
174         file.seek(0)
175         traces = self._extract_traces(buffer(file.read()), info)
176         self._validate_traces(
177             traces['Z sensor'], traces['Deflection'])
178         L = len(traces['Deflection'])
179         approach = self._extract_block(
180             info, traces['Z sensor'], traces['Deflection'], 0, L/2, 'approach')
181         retract = self._extract_block(
182             info, traces['Z sensor'], traces['Deflection'], L/2, L, 'retract')
183         data = [approach, retract]
184         return data
185
186     def _extract_traces(self, buffer, info):
187         """Extract each of the three vector blocks in a PicoForce file.
188
189         The blocks are (in variable order):
190
191         * Z piezo sensor input
192         * Deflection input
193         * Deflection again?
194
195         And their headers are marked with 'Ciao force image list'.
196         """
197         traces = {}
198         version = info['Force file list']['Version']
199         type_re = re.compile('S \[(\w*)\] "([\w\s.]*)"')
200         for image in info['Ciao force image list']:
201             offset = int(image['Data offset'])
202             length = int(image['Data length'])
203             sample_size = int(image['Bytes/pixel'])
204             if sample_size != 2:
205                 raise NotImplementedError('Size: %s' % sample_size)
206             rows = length / sample_size
207             d = curve.Data(
208                 shape=(rows),
209                 dtype=numpy.int16,
210                 buffer=buffer,
211                 offset=offset,
212                 info=image,
213                 )
214             if version in ['0x06120002', '0x06130001']:
215                 match = type_re.match(image['@4:Image Data'])
216                 assert match != None, 'Bad regexp for %s, %s' \
217                     % ('@4:Image Data', image['@4:Image Data'])
218                 if version == '0x06130001' and match.group(1) == 'ZLowVoltage':
219                     assert match.group(2) == 'Low Voltage Z', \
220                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
221                 else:
222                     assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
223                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
224                 tname = match.group(2)
225             else:
226                 assert version == '0x07200000', version
227                 match = type_re.match(image['@4:Image Data'])
228                 assert match != None, 'Bad regexp for %s, %s' \
229                     % ('@4:Image Data', image['@4:Image Data'])
230                 if match.group(1) == 'PulseFreq1':
231                     assert match.group(2) == 'Freq. 1', match.group(2)
232                 else:
233                     assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
234                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
235                 tname = match.group(2)
236                 if tname == 'Freq. 1':  # Normalize trace names between versions
237                     tname = 'Z sensor'
238                 elif tname == 'Deflection Error':
239                     tname = 'Deflection'
240             if tname in traces:
241                 #d.tofile('%s-2.dat' % tname, sep='\n')
242                 tname = self._replace_name(tname, d, traces, info)
243                 if tname == None:
244                     continue  # Don't replace anything
245             else:
246                 #d.tofile('%s.dat' % tname, sep='\n')
247                 pass
248             traces[tname] = d
249         return traces
250
251     def _validate_traces(self, z_piezo, deflection):
252         if len(z_piezo) != len(deflection):
253             raise ValueError('Trace length missmatch: %d != %d'
254                              % (len(z_piezo), len(deflection)))
255
256     def _extract_block(self, info, z_piezo, deflection, start, stop, name):
257         block = curve.Data(
258             shape=(stop-start, 2),
259             dtype=numpy.float)
260         block[:,0] = z_piezo[start:stop]
261         block[:,1] = deflection[start:stop]
262         block.info = self._translate_block_info(
263             info, z_piezo.info, deflection.info, name)
264         block.info['columns'] = ['z piezo (m)', 'deflection (m)']
265         block = self._scale_block(block)
266         return block
267
268     def _replace_name(self, trace_name, trace, traces, info):
269         """Determine if a duplicate trace name should replace an earlier trace.
270
271         Return the target trace name if it should be replaced by the
272         new trace, or `None` if the new trace should be dropped.
273         """
274         #msg = []
275         #target = traces[trace_name]
276         #
277         ## Compare the info dictionaries for each trace
278         #ik = set(trace.info.keys())
279         #ok = set(traces[trace_name].info.keys())
280         #if ik != ok:  # Ensure we have the same set of keys for both traces
281         #    msg.append('extra keys: %s, missing keys %s' % (ik-ok, ok-ik))
282         #else:
283         #    # List keys we *require* to change between traces
284         #    variable_keys = ['Data offset', 'X data type']  # TODO: What is X data type?
285         #    for key in trace.info.keys():
286         #        if key in variable_keys:
287         #            if target.info[key] == trace.info[key]:
288         #                msg.append('constant %s (%s == %s)'
289         #                           % (key, target.info[key], trace.info[key]))
290         #        else:
291         #            if target.info[key] != trace.info[key]:
292         #                msg.append('variable %s (%s != %s)'
293         #                           % (key, target.info[key], trace.info[key]))
294         # Compare the data
295         #if not (traces[trace_name] == trace).all():
296         #    msg.append('data difference')
297         #if len(msg) > 0:
298         #    raise NotImplementedError(
299         #        'Missmatched duplicate traces for %s: %s'
300         #        % (trace_name, ', '.join(msg)))
301         import logging
302         log = logging.getLogger('hooke')
303         for name,t in traces.items():
304             if (t == trace).all():
305                 log.debug('replace %s with %s-2' % (name, trace_name))
306                 return name  # Replace this identical dataset.
307         log.debug('store %s-2 as Other' % (trace_name))
308         return 'Other'
309         # return None
310
311     def _translate_block_info(self, info, z_piezo_info, deflection_info, name):
312         version = info['Force file list']['Version']
313         ret = {
314             'name': name,
315             'raw info': info,
316             'raw z piezo info': z_piezo_info,
317             'raw deflection info': deflection_info,
318             'spring constant (N/m)': float(z_piezo_info['Spring Constant']),
319             }
320
321         t = info['Force file list']['Date'] # 04:42:34 PM Tue Sep 11 2007
322         ret['time'] = time.strptime(t, '%I:%M:%S %p %a %b %d %Y')
323
324         volt_re = re.compile(
325             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) V/LSB\) (-?[.0-9]*) V')
326         hz_re = re.compile(
327             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) kHz/LSB\) (-?[.0-9]*) kHz')
328         if version in ['0x06120002', '0x06130001']:
329             match = volt_re.match(z_piezo_info['@4:Z scale'])
330             assert match != None, 'Bad regexp for %s, %s' \
331                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
332             assert match.group(1) == 'ZSensorSens', z_piezo_info['@4:Z scale']
333         else:
334             assert version == '0x07200000', version
335             match = hz_re.match(z_piezo_info['@4:Z scale'])
336             assert match != None, 'Bad regexp for %s, %s' \
337                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
338             assert match.group(1) == 'Freq. 1', z_piezo_info['@4:Z scale']
339         ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
340         ret['z piezo range (V)'] = float(match.group(3))
341         ret['z piezo offset (V)'] = 0.0
342         # offset assumed if raw data is signed...
343
344         match = volt_re.match(deflection_info['@4:Z scale'])
345         assert match != None, 'Bad regexp for %s, %s' \
346             % ('@4:Z scale', deflection_info['@4:Z scale'])
347         assert match.group(1) == 'DeflSens', z_piezo_info['@4:Z scale']
348         ret['deflection sensitivity (V/bit)'] = float(match.group(2))
349         ret['deflection range (V)'] = float(match.group(3))
350         ret['deflection offset (V)'] = 0.0
351         # offset assumed if raw data is signed...
352
353         nm_sens_re = re.compile('V ([.0-9]*) nm/V')
354         if version in ['0x06120002', '0x06130001']:        
355             match = nm_sens_re.match(info['Ciao scan list']['@Sens. ZSensorSens'])
356             assert match != None, 'Bad regexp for %s/%s, %s' \
357                 % ('Ciao scan list', '@Sens. ZSensorSens',
358                    info['Ciao scan list']['@Sens. ZSensorSens'])
359         else:
360             assert version == '0x07200000', version
361             match = nm_sens_re.match(info['Ciao scan list']['@Sens. ZsensSens'])
362             assert match != None, 'Bad regexp for %s/%s, %s' \
363                 % ('Ciao scan list', '@Sens. ZsensSens',
364                    info['Ciao scan list']['@Sens. ZsensSens'])
365         ret['z piezo sensitivity (m/V)'] = float(match.group(1))*1e-9
366
367         match = nm_sens_re.match(info['Ciao scan list']['@Sens. DeflSens'])
368         assert match != None, 'Bad regexp for %s/%s, %s' \
369             % ('Ciao scan list', '@Sens. DeflSens', info['Ciao scan list']['@Sens. DeflSens'])
370         ret['deflection sensitivity (m/V)'] = float(match.group(1))*1e-9
371
372         match = volt_re.match(info['Ciao force list']['@Z scan start'])
373         assert match != None, 'Bad regexp for %s/%s, %s' \
374             % ('Ciao force list', '@Z scan start', info['Ciao force list']['@Z scan start'])
375         ret['z piezo scan (V/bit)'] = float(match.group(2))
376         ret['z piezo scan start (V)'] = float(match.group(3))
377
378         match = volt_re.match(info['Ciao force list']['@Z scan size'])
379         assert match != None, 'Bad regexp for %s/%s, %s' \
380             % ('Ciao force list', '@Z scan size', info['Ciao force list']['@Z scan size'])
381         ret['z piezo scan size (V)'] = float(match.group(3))
382
383         const_re = re.compile('C \[([:\w\s]*)\] ([.0-9]*)')
384         match = const_re.match(z_piezo_info['@Z magnify'])
385         assert match != None, 'Bad regexp for %s, %s' \
386             % ('@Z magnify', info['@Z magnify'])
387         assert match.group(1) == '4:Z scale', match.group(1)
388         ret['z piezo gain'] = float(match.group(2))
389
390         if version in ['0x06120002', '0x06130001']:        
391             match = volt_re.match(z_piezo_info['@4:Z scale'])
392             assert match != None, 'Bad regexp for %s, %s' \
393                 % ('@4:Z scale', info['@4:Z scale'])
394             assert match.group(1) == 'ZSensorSens', match.group(1)
395             ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
396             ret['z piezo range (V)'] = float(match.group(3))
397         else:
398             assert version == '0x07200000', version
399             pass
400
401         match = volt_re.match(z_piezo_info['@4:Ramp size'])
402         assert match != None, 'Bad regexp for %s, %s' \
403             % ('@4:Ramp size', info['@4:Ramp size'])
404         assert match.group(1) == 'Zsens', match.group(1)
405         ret['z piezo ramp size (V/bit)'] = float(match.group(2))
406         ret['z piezo ramp size (V)'] = float(match.group(3))
407
408         match = volt_re.match(z_piezo_info['@4:Ramp offset'])
409         assert match != None, 'Bad regexp for %s, %s' \
410             % ('@4:Ramp offset', info['@4:Ramp offset'])
411         assert match.group(1) == 'Zsens', match.group(1)
412         ret['z piezo ramp offset (V/bit)'] = float(match.group(2))
413         ret['z piezo ramp offset (V)'] = float(match.group(3))
414
415         # Unaccounted for:
416         #   Samps*
417
418         return ret
419
420     def _scale_block(self, data):
421         """Convert the block from its native format to a `numpy.float`
422         array in SI units.
423         """
424         ret = curve.Data(
425             shape=data.shape,
426             dtype=numpy.float,
427             )
428         info = data.info
429         ret.info = info
430         ret.info['raw data'] = data # store the raw data
431         data.info = {} # break circular reference info <-> data
432
433         z_col = info['columns'].index('z piezo (m)')
434         d_col = info['columns'].index('deflection (m)')
435
436         # Leading '-' because Veeco's z increases towards the surface
437         # (positive indentation), but it makes more sense to me to
438         # have it increase away from the surface (positive
439         # separation).
440         ret[:,z_col] = -(
441             (data[:,z_col].astype(ret.dtype)
442              * info['z piezo sensitivity (V/bit)']
443              - info['z piezo offset (V)'])
444             * info['z piezo gain']
445             * info['z piezo sensitivity (m/V)']
446             )
447
448         # Leading '-' because deflection voltage increases as the tip
449         # moves away from the surface, but it makes more sense to me
450         # to have it increase as it moves toward the surface (positive
451         # tension on the protein chain).
452         ret[:,d_col] = -(
453             (data[:,d_col]
454              * info['deflection sensitivity (V/bit)']
455              - info['deflection offset (V)'])
456             * info['deflection sensitivity (m/V)']
457             )
458
459         return ret