Initial attempt at PicoForce 0x05120005 support.
[hooke.git] / hooke / driver / picoforce.py
1 # Copyright (C) 2006-2010 Alberto Gomez-Casado
2 #                         Massimo Sandal <devicerandom@gmail.com>
3 #                         W. Trevor King <wking@drexel.edu>
4 #
5 # This file is part of Hooke.
6 #
7 # Hooke is free software: you can redistribute it and/or modify it
8 # under the terms of the GNU Lesser General Public License as
9 # published by the Free Software Foundation, either version 3 of the
10 # License, or (at your option) any later version.
11 #
12 # Hooke is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
15 # Public License for more details.
16 #
17 # You should have received a copy of the GNU Lesser General Public
18 # License along with Hooke.  If not, see
19 # <http://www.gnu.org/licenses/>.
20
21 """Driver for Bruker PicoForce force spectroscopy files.
22 """
23
24 import os.path
25 import pprint
26 import re
27 import time
28
29 import numpy
30
31 from .. import curve as curve # this module defines data containers.
32 from . import Driver as Driver # this is the Driver base class
33
34
35 __version__='0.0.0.20110421'
36
37
38 class PicoForceDriver (Driver):
39     """Handle Bruker Picoforce force spectroscopy files.
40     """
41     def __init__(self):
42         super(PicoForceDriver, self).__init__(name='picoforce')
43
44     def is_me(self, path):
45         if os.path.isdir(path):
46             return False
47         f = file(path, 'r')
48         header = f.read(30)
49         f.close()
50
51         return header[2:17] == 'Force file list'
52
53     def read(self, path, info=None):
54         info = self._read_header_path(path)
55         self._check_version(info)
56         data = self._read_data_path(path, info)
57         return (data, info)
58
59     def _read_header_path(self, path):
60         """Read curve information from the PicoForce file at `path`.
61
62         See :meth:`._read_header_file`.
63         """
64         return self._read_header_file(file(path, 'rb'))
65
66     def _read_header_file(self, file):
67         r"""Read curve information from a PicoForce file.
68
69         Return a dict of dicts representing the information.  If a
70         field is repeated multiple times, it's value is replaced by a
71         list of the values for each occurence.
72
73         Examples
74         --------
75
76         >>> import pprint
77         >>> import StringIO
78         >>> f = StringIO.StringIO('\r\n'.join([
79         ...             '\*Force file list',
80         ...             '\Version: 0x06120002',
81         ...             '\Date: 04:42:34 PM Tue Sep 11 2007',
82         ...             '\Start context: FOL2',
83         ...             '\Data length: 40960',
84         ...             '\Text: ',
85         ...             '\*Equipment list',
86         ...             '\Description: Extended PicoForce',
87         ...             '\Controller: IIIA',
88         ...             '\*Ciao force image list',
89         ...             '\Data offset: 40960',
90         ...             '\Data length: 8192',
91         ...             '\*Ciao force image list',
92         ...             '\Data offset: 49152',
93         ...             '\Data length: 8192',
94         ...             '\*Ciao force image list',
95         ...             '\Data offset: 57344',
96         ...             '\Data length: 8192',
97         ...             ]))
98         >>> p = PicoForceDriver()
99         >>> d = p._read_header_file(f)
100         >>> pprint.pprint(d, width=60)
101         {'Ciao force image list': [{'Data length': '8192',
102                                     'Data offset': '40960'},
103                                    {'Data length': '8192',
104                                     'Data offset': '49152'},
105                                    {'Data length': '8192',
106                                     'Data offset': '57344'}],
107          'Equipment list': {'Controller': 'IIIA',
108                             'Description': 'Extended PicoForce'},
109          'Force file list': {'Data length': '40960',
110                              'Date': '04:42:34 PM Tue Sep 11 2007',
111                              'Start context': 'FOL2',
112                              'Text:': None,
113                              'Version': '0x06120002'}}
114         """
115         info = {}
116         header_field = None
117         for line in file:
118             line = line.strip()
119             if line.startswith('\*File list end'):
120                 break
121             if line.startswith(r'\*'):
122                 header_field = line[len(r'\*'):]
123                 if header_field in info:
124                     if isinstance(info[header_field], list):
125                         info[header_field].append({}) # >=3rd appearance
126                     else: # Second appearance
127                         info[header_field] = [info[header_field], {}]
128                 else: # First appearance
129                     info[header_field] = {}
130             else:
131                 assert line.startswith('\\'), line
132                 fields = line[len('\\'):].split(': ', 1)
133                 key = fields[0]
134                 if len(fields) == 1: # fields = [key]
135                     value = None
136                 else: # fields = [key, value]
137                     value = fields[1]
138                 if isinstance(info[header_field], list): # >=2nd header_field
139                     target_dict = info[header_field][-1]
140                 else: # first appearance of header_field
141                     target_dict = info[header_field]
142                 if key in target_dict and target_dict[key] != value:
143                     raise NotImplementedError(
144                         'Overwriting %s: %s -> %s'
145                         % (key, target_dict[key], value))
146                 target_dict[key] = value
147         return (info)
148
149     def _check_version(self, info):
150         """Ensure the input file is a version we understand.
151
152         Otherwise, raise `ValueError`.
153         """
154         version = info['Force file list'].get('Version', None)
155         if version not in ['0x05120005', '0x06120002', '0x06130001',
156                            '0x07200000']:
157             raise NotImplementedError(
158                 '%s file version %s not supported (yet!)\n%s'
159                 % (self.name, version,
160                    pprint.pformat(info['Force file list'])))
161
162     def _read_data_path(self, path, info):
163         """Read curve data from the PicoForce file at `path`.
164
165         See :meth:`._read_data_file`.
166         """
167         f = file(path, 'rb')
168         data = self._read_data_file(f, info)
169         f.close()
170         return data
171
172     def _read_data_file(self, file, info):
173         file.seek(0)
174         traces = self._extract_traces(buffer(file.read()), info)
175         self._validate_traces(
176             traces['Z sensor'], traces['Deflection'])
177         L = len(traces['Deflection'])
178         approach = self._extract_block(
179             info, traces['Z sensor'], traces['Deflection'], 0, L/2, 'approach')
180         retract = self._extract_block(
181             info, traces['Z sensor'], traces['Deflection'], L/2, L, 'retract')
182         data = [approach, retract]
183         return data
184
185     def _extract_traces(self, buffer, info):
186         """Extract each of the three vector blocks in a PicoForce file.
187
188         The blocks are (in variable order):
189
190         * Z piezo sensor input
191         * Deflection input
192         * Deflection again?
193
194         And their headers are marked with 'Ciao force image list'.
195         """
196         traces = {}
197         version = info['Force file list']['Version']
198         type_re = re.compile('S \[(\w*)\] "([\w\s.]*)"')
199         if isinstance(info['Ciao force image list'], dict):
200             # there was only one image, but convert to list for processing
201             info['Ciao force image list'] = [info['Ciao force image list']]
202         for image in info['Ciao force image list']:
203             offset = int(image['Data offset'])
204             length = int(image['Data length'])
205             sample_size = int(image['Bytes/pixel'])
206             if sample_size != 2:
207                 raise NotImplementedError('Size: %s' % sample_size)
208             rows = length / sample_size
209             d = curve.Data(
210                 shape=(rows),
211                 dtype=numpy.int16,
212                 buffer=buffer,
213                 offset=offset,
214                 info=image,
215                 )
216             if version in ['0x05120005', '0x06120002', '0x06130001']:
217                 match = type_re.match(image['@4:Image Data'])
218                 assert match != None, 'Bad regexp for %s, %s' \
219                     % ('@4:Image Data', image['@4:Image Data'])
220                 if version == '0x06130001' and match.group(1) == 'ZLowVoltage':
221                     assert match.group(2) == 'Low Voltage Z', \
222                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
223                 else:
224                     assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
225                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
226                 tname = match.group(2)
227             else:
228                 assert version == '0x07200000', version
229                 match = type_re.match(image['@4:Image Data'])
230                 assert match != None, 'Bad regexp for %s, %s' \
231                     % ('@4:Image Data', image['@4:Image Data'])
232                 if match.group(1) == 'PulseFreq1':
233                     assert match.group(2) == 'Freq. 1', match.group(2)
234                 else:
235                     assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
236                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
237                 tname = match.group(2)
238                 if tname == 'Freq. 1':  # Normalize trace names between versions
239                     tname = 'Z sensor'
240                 elif tname == 'Deflection Error':
241                     tname = 'Deflection'
242             if tname in traces:
243                 #d.tofile('%s-2.dat' % tname, sep='\n')
244                 tname = self._replace_name(tname, d, traces, info)
245                 if tname == None:
246                     continue  # Don't replace anything
247             else:
248                 #d.tofile('%s.dat' % tname, sep='\n')
249                 pass
250             traces[tname] = d
251         if 'Z sensor' not in traces:
252             # some picoforce files only save deflection
253             assert version == '0x05120005', version
254             force_info = info['Ciao force list']
255             deflection = traces['Deflection']
256             volt_re = re.compile(
257                 'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) V/LSB\) *(-?[.0-9]*) V')
258             match = volt_re.match(force_info['@4:Ramp size Zsweep'])
259             size = float(match.group(3))
260             match = volt_re.match(force_info['@4:Ramp offset Zsweep'])
261             offset = float(match.group(3))
262             match = volt_re.match(force_info['@4:Ramp Begin Zsweep'])
263             begin = float(match.group(3))
264             match = volt_re.match(force_info['@4:Ramp End Zsweep'])
265             end = float(match.group(3))
266             #\@4:Feedback value Zsweep: V [Sens. Zscan] (0.002056286 V/LSB)       0 V
267             #\@4:Z display Zsweep: V [Sens. Zscan] (0.002056286 V/LSB) 18.53100 V
268             assert len(deflection) % 2 == 0, len(deflection)
269             points = len(deflection)/2
270             traces['Z sensor'] = curve.Data(
271                 shape=deflection.shape,
272                 dtype=numpy.float,
273                 info=info['Ciao force image list'][0],
274                 )
275             # deflection data seems to be saved as
276             #   [final approach, ..., initial approach,
277             #    initial retract, ..., final retract]
278             traces['Z sensor'][:points] = numpy.linspace(
279                 offset+begin+size, offset+begin, points)
280             traces['Z sensor'][-points:] = numpy.linspace(
281                 offset+begin+size, offset+end, points)
282         return traces
283
284     def _validate_traces(self, z_piezo, deflection):
285         if len(z_piezo) != len(deflection):
286             raise ValueError('Trace length missmatch: %d != %d'
287                              % (len(z_piezo), len(deflection)))
288
289     def _extract_block(self, info, z_piezo, deflection, start, stop, name):
290         block = curve.Data(
291             shape=(stop-start, 2),
292             dtype=numpy.float)
293         block[:,0] = z_piezo[start:stop]
294         block[:,1] = deflection[start:stop]
295         block.info = self._translate_block_info(
296             info, z_piezo.info, deflection.info, name)
297         block.info['columns'] = ['z piezo (m)', 'deflection (m)']
298         block = self._scale_block(block)
299         return block
300
301     def _replace_name(self, trace_name, trace, traces, info):
302         """Determine if a duplicate trace name should replace an earlier trace.
303
304         Return the target trace name if it should be replaced by the
305         new trace, or `None` if the new trace should be dropped.
306         """
307         #msg = []
308         #target = traces[trace_name]
309         #
310         ## Compare the info dictionaries for each trace
311         #ik = set(trace.info.keys())
312         #ok = set(traces[trace_name].info.keys())
313         #if ik != ok:  # Ensure we have the same set of keys for both traces
314         #    msg.append('extra keys: %s, missing keys %s' % (ik-ok, ok-ik))
315         #else:
316         #    # List keys we *require* to change between traces
317         #    variable_keys = ['Data offset', 'X data type']  # TODO: What is X data type?
318         #    for key in trace.info.keys():
319         #        if key in variable_keys:
320         #            if target.info[key] == trace.info[key]:
321         #                msg.append('constant %s (%s == %s)'
322         #                           % (key, target.info[key], trace.info[key]))
323         #        else:
324         #            if target.info[key] != trace.info[key]:
325         #                msg.append('variable %s (%s != %s)'
326         #                           % (key, target.info[key], trace.info[key]))
327         # Compare the data
328         #if not (traces[trace_name] == trace).all():
329         #    msg.append('data difference')
330         #if len(msg) > 0:
331         #    raise NotImplementedError(
332         #        'Missmatched duplicate traces for %s: %s'
333         #        % (trace_name, ', '.join(msg)))
334         import logging
335         log = logging.getLogger('hooke')
336         for name,t in traces.items():
337             if (t == trace).all():
338                 log.debug('replace %s with %s-2' % (name, trace_name))
339                 return name  # Replace this identical dataset.
340         log.debug('store %s-2 as Other' % (trace_name))
341         return 'Other'
342         # return None
343
344     def _translate_block_info(self, info, z_piezo_info, deflection_info, name):
345         version = info['Force file list']['Version']
346         if version == '0x05120005':
347             k_key = 'Spring constant'
348         else:
349             assert version in [
350                 '0x06120002', '0x06130001', '0x07200000'], version
351             k_key = 'Spring Constant'
352         ret = {
353             'name': name,
354             'raw info': info,
355             'raw z piezo info': z_piezo_info,
356             'raw deflection info': deflection_info,
357             'spring constant (N/m)': float(z_piezo_info[k_key]),
358             }
359
360         t = info['Force file list']['Date'] # 04:42:34 PM Tue Sep 11 2007
361         ret['time'] = time.strptime(t, '%I:%M:%S %p %a %b %d %Y')
362
363         volt_re = re.compile(
364             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) V/LSB\) (-?[.0-9]*) V')
365         hz_re = re.compile(
366             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) kHz/LSB\) (-?[.0-9]*) kHz')
367         if version in ['0x05120005', '0x06120002', '0x06130001']:
368             match = volt_re.match(z_piezo_info['@4:Z scale'])
369             assert match != None, 'Bad regexp for %s, %s' \
370                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
371             if version == '0x05120005':
372                 assert match.group(1) == 'Deflection', (
373                     z_piezo_info['@4:Z scale'])
374             else:
375                 assert match.group(1) == 'ZSensorSens', (
376                     z_piezo_info['@4:Z scale'])
377         else:
378             assert version == '0x07200000', version
379             match = hz_re.match(z_piezo_info['@4:Z scale'])
380             assert match != None, 'Bad regexp for %s, %s' \
381                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
382             assert match.group(1) == 'Freq. 1', z_piezo_info['@4:Z scale']
383         ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
384         ret['z piezo range (V)'] = float(match.group(3))
385         ret['z piezo offset (V)'] = 0.0
386         # offset assumed if raw data is signed...
387
388         match = volt_re.match(deflection_info['@4:Z scale'])
389         assert match != None, 'Bad regexp for %s, %s' \
390             % ('@4:Z scale', deflection_info['@4:Z scale'])
391         if version == '0x05120005':
392             assert match.group(1) == 'Deflection', z_piezo_info['@4:Z scale']
393         else:
394             assert version in [
395                 '0x06120002', '0x06130001', '0x07200000'], version
396             assert match.group(1) == 'DeflSens', z_piezo_info['@4:Z scale']
397         ret['deflection sensitivity (V/bit)'] = float(match.group(2))
398         ret['deflection range (V)'] = float(match.group(3))
399         ret['deflection offset (V)'] = 0.0
400         # offset assumed if raw data is signed...
401
402         nm_sens_re = re.compile('V ([.0-9]*) nm/V')
403         if version == '0x05120005':
404             match = nm_sens_re.match(info['Scanner list']['@Sens. Zscan'])
405             assert match != None, 'Bad regexp for %s/%s, %s' \
406                 % ('Scanner list', '@Sens. Zscan',
407                    info['Scanner list']['@Sens. Zscan'])
408         elif version in ['0x06120002', '0x06130001']:        
409             match = nm_sens_re.match(info['Ciao scan list']['@Sens. ZSensorSens'])
410             assert match != None, 'Bad regexp for %s/%s, %s' \
411                 % ('Ciao scan list', '@Sens. ZSensorSens',
412                    info['Ciao scan list']['@Sens. ZSensorSens'])
413         else:
414             assert version == '0x07200000', version
415             match = nm_sens_re.match(info['Ciao scan list']['@Sens. ZsensSens'])
416             assert match != None, 'Bad regexp for %s/%s, %s' \
417                 % ('Ciao scan list', '@Sens. ZsensSens',
418                    info['Ciao scan list']['@Sens. ZsensSens'])
419         ret['z piezo sensitivity (m/V)'] = float(match.group(1))*1e-9
420
421         if version == '0x05120005':
422             match = nm_sens_re.match(info['Ciao scan list']['@Sens. Deflection'])
423             assert match != None, 'Bad regexp for %s/%s, %s' \
424                 % ('Scanner list', '@Sens. Zscan',
425                    info['Ciao scan list']['@Sens. Deflection'])
426         else:
427             assert version in [
428                 '0x06120002', '0x06130001', '0x07200000'], version
429             match = nm_sens_re.match(info['Ciao scan list']['@Sens. DeflSens'])
430             assert match != None, 'Bad regexp for %s/%s, %s' \
431                 % ('Ciao scan list', '@Sens. DeflSens', info['Ciao scan list']['@Sens. DeflSens'])
432         ret['deflection sensitivity (m/V)'] = float(match.group(1))*1e-9
433
434         match = volt_re.match(info['Ciao force list']['@Z scan start'])
435         assert match != None, 'Bad regexp for %s/%s, %s' \
436             % ('Ciao force list', '@Z scan start', info['Ciao force list']['@Z scan start'])
437         ret['z piezo scan (V/bit)'] = float(match.group(2))
438         ret['z piezo scan start (V)'] = float(match.group(3))
439
440         match = volt_re.match(info['Ciao force list']['@Z scan size'])
441         assert match != None, 'Bad regexp for %s/%s, %s' \
442             % ('Ciao force list', '@Z scan size', info['Ciao force list']['@Z scan size'])
443         ret['z piezo scan size (V)'] = float(match.group(3))
444
445         const_re = re.compile('C \[([:\w\s]*)\] ([.0-9]*)')
446         match = const_re.match(z_piezo_info['@Z magnify'])
447         assert match != None, 'Bad regexp for %s, %s' \
448             % ('@Z magnify', info['@Z magnify'])
449         assert match.group(1) == '4:Z scale', match.group(1)
450         ret['z piezo gain'] = float(match.group(2))
451
452         if version in ['0x06120002', '0x06130001']:        
453             match = volt_re.match(z_piezo_info['@4:Z scale'])
454             assert match != None, 'Bad regexp for %s, %s' \
455                 % ('@4:Z scale', info['@4:Z scale'])
456             assert match.group(1) == 'ZSensorSens', match.group(1)
457             ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
458             ret['z piezo range (V)'] = float(match.group(3))
459         else:
460             assert version in ['0x05120005', '0x07200000'], version
461             pass
462
463         if version == '0x05120005':
464             # already accounded for when generating 'Z sensor' trace
465             pass
466         else:
467             assert version in [
468                 '0x06120002', '0x06130001', '0x07200000'], version
469             match = volt_re.match(z_piezo_info['@4:Ramp size'])
470             assert match != None, 'Bad regexp for %s, %s' \
471                 % ('@4:Ramp size', info['@4:Ramp size'])
472             assert match.group(1) == 'Zsens', match.group(1)
473             ret['z piezo ramp size (V/bit)'] = float(match.group(2))
474             ret['z piezo ramp size (V)'] = float(match.group(3))
475
476             match = volt_re.match(z_piezo_info['@4:Ramp offset'])
477             assert match != None, 'Bad regexp for %s, %s' \
478                 % ('@4:Ramp offset', info['@4:Ramp offset'])
479             assert match.group(1) == 'Zsens', match.group(1)
480             ret['z piezo ramp offset (V/bit)'] = float(match.group(2))
481             ret['z piezo ramp offset (V)'] = float(match.group(3))
482
483         # Unaccounted for:
484         #   Samps*
485
486         return ret
487
488     def _scale_block(self, data):
489         """Convert the block from its native format to a `numpy.float`
490         array in SI units.
491         """
492         ret = curve.Data(
493             shape=data.shape,
494             dtype=numpy.float,
495             )
496         info = data.info
497         ret.info = info
498         ret.info['raw data'] = data # store the raw data
499         data.info = {} # break circular reference info <-> data
500
501         z_col = info['columns'].index('z piezo (m)')
502         d_col = info['columns'].index('deflection (m)')
503
504         # Leading '-' because Bruker's z increases towards the surface
505         # (positive indentation), but it makes more sense to me to
506         # have it increase away from the surface (positive
507         # separation).
508         ret[:,z_col] = -(
509             (data[:,z_col].astype(ret.dtype)
510              * info['z piezo sensitivity (V/bit)']
511              - info['z piezo offset (V)'])
512             * info['z piezo gain']
513             * info['z piezo sensitivity (m/V)']
514             )
515
516         # Leading '-' because deflection voltage increases as the tip
517         # moves away from the surface, but it makes more sense to me
518         # to have it increase as it moves toward the surface (positive
519         # tension on the protein chain).
520         ret[:,d_col] = -(
521             (data[:,d_col]
522              * info['deflection sensitivity (V/bit)']
523              - info['deflection offset (V)'])
524             * info['deflection sensitivity (m/V)']
525             )
526
527         return ret