Run update-copyright.py.
[hooke.git] / hooke / driver / picoforce.py
1 # Copyright (C) 2006-2012 Alberto Gomez-Casado <a.gomezcasado@tnw.utwente.nl>
2 #                         Massimo Sandal <devicerandom@gmail.com>
3 #                         W. Trevor King <wking@drexel.edu>
4 #
5 # This file is part of Hooke.
6 #
7 # Hooke is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU Lesser General Public License as published by the Free
9 # Software Foundation, either version 3 of the License, or (at your option) any
10 # later version.
11 #
12 # Hooke is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
15 # details.
16 #
17 # You should have received a copy of the GNU Lesser General Public License
18 # along with Hooke.  If not, see <http://www.gnu.org/licenses/>.
19
20 """Driver for Bruker PicoForce force spectroscopy files.
21 """
22
23 import os.path
24 import pprint
25 import re
26 import time
27
28 import numpy
29
30 from .. import curve as curve # this module defines data containers.
31 from . import Driver as Driver # this is the Driver base class
32
33
34 __version__='0.0.0.20110421'
35
36
37 class PicoForceDriver (Driver):
38     """Handle Bruker Picoforce force spectroscopy files.
39     """
40     def __init__(self):
41         super(PicoForceDriver, self).__init__(name='picoforce')
42
43     def is_me(self, path):
44         if os.path.isdir(path):
45             return False
46         f = file(path, 'r')
47         header = f.read(30)
48         f.close()
49
50         return header[2:17] == 'Force file list'
51
52     def read(self, path, info=None):
53         info = self._read_header_path(path)
54         self._check_version(info)
55         data = self._read_data_path(path, info)
56         return (data, info)
57
58     def _read_header_path(self, path):
59         """Read curve information from the PicoForce file at `path`.
60
61         See :meth:`._read_header_file`.
62         """
63         return self._read_header_file(file(path, 'rb'))
64
65     def _read_header_file(self, file):
66         r"""Read curve information from a PicoForce file.
67
68         Return a dict of dicts representing the information.  If a
69         field is repeated multiple times, it's value is replaced by a
70         list of the values for each occurence.
71
72         Examples
73         --------
74
75         >>> import pprint
76         >>> import StringIO
77         >>> f = StringIO.StringIO('\r\n'.join([
78         ...             '\*Force file list',
79         ...             '\Version: 0x06120002',
80         ...             '\Date: 04:42:34 PM Tue Sep 11 2007',
81         ...             '\Start context: FOL2',
82         ...             '\Data length: 40960',
83         ...             '\Text: ',
84         ...             '\*Equipment list',
85         ...             '\Description: Extended PicoForce',
86         ...             '\Controller: IIIA',
87         ...             '\*Ciao force image list',
88         ...             '\Data offset: 40960',
89         ...             '\Data length: 8192',
90         ...             '\*Ciao force image list',
91         ...             '\Data offset: 49152',
92         ...             '\Data length: 8192',
93         ...             '\*Ciao force image list',
94         ...             '\Data offset: 57344',
95         ...             '\Data length: 8192',
96         ...             ]))
97         >>> p = PicoForceDriver()
98         >>> d = p._read_header_file(f)
99         >>> pprint.pprint(d, width=60)
100         {'Ciao force image list': [{'Data length': '8192',
101                                     'Data offset': '40960'},
102                                    {'Data length': '8192',
103                                     'Data offset': '49152'},
104                                    {'Data length': '8192',
105                                     'Data offset': '57344'}],
106          'Equipment list': {'Controller': 'IIIA',
107                             'Description': 'Extended PicoForce'},
108          'Force file list': {'Data length': '40960',
109                              'Date': '04:42:34 PM Tue Sep 11 2007',
110                              'Start context': 'FOL2',
111                              'Text:': None,
112                              'Version': '0x06120002'}}
113         """
114         info = {}
115         header_field = None
116         for line in file:
117             line = line.strip()
118             if line.startswith('\*File list end'):
119                 break
120             if line.startswith(r'\*'):
121                 header_field = line[len(r'\*'):]
122                 if header_field in info:
123                     if isinstance(info[header_field], list):
124                         info[header_field].append({}) # >=3rd appearance
125                     else: # Second appearance
126                         info[header_field] = [info[header_field], {}]
127                 else: # First appearance
128                     info[header_field] = {}
129             else:
130                 assert line.startswith('\\'), line
131                 fields = line[len('\\'):].split(': ', 1)
132                 key = fields[0]
133                 if len(fields) == 1: # fields = [key]
134                     value = None
135                 else: # fields = [key, value]
136                     value = fields[1]
137                 if isinstance(info[header_field], list): # >=2nd header_field
138                     target_dict = info[header_field][-1]
139                 else: # first appearance of header_field
140                     target_dict = info[header_field]
141                 if key in target_dict and target_dict[key] != value:
142                     raise NotImplementedError(
143                         'Overwriting %s: %s -> %s'
144                         % (key, target_dict[key], value))
145                 target_dict[key] = value
146         return (info)
147
148     def _check_version(self, info):
149         """Ensure the input file is a version we understand.
150
151         Otherwise, raise `ValueError`.
152         """
153         version = info['Force file list'].get('Version', None)
154         if version not in ['0x05120005', '0x06120002', '0x06130001',
155                            '0x07200000']:
156             raise NotImplementedError(
157                 '%s file version %s not supported (yet!)\n%s'
158                 % (self.name, version,
159                    pprint.pformat(info['Force file list'])))
160
161     def _read_data_path(self, path, info):
162         """Read curve data from the PicoForce file at `path`.
163
164         See :meth:`._read_data_file`.
165         """
166         f = file(path, 'rb')
167         data = self._read_data_file(f, info)
168         f.close()
169         return data
170
171     def _read_data_file(self, file, info):
172         file.seek(0)
173         traces = self._extract_traces(buffer(file.read()), info)
174         self._validate_traces(
175             traces['Z sensor'], traces['Deflection'])
176         L = len(traces['Deflection'])
177         approach = self._extract_block(
178             info, traces['Z sensor'], traces['Deflection'], 0, L/2, 'approach')
179         retract = self._extract_block(
180             info, traces['Z sensor'], traces['Deflection'], L/2, L, 'retract')
181         data = [approach, retract]
182         return data
183
184     def _extract_traces(self, buffer, info):
185         """Extract each of the three vector blocks in a PicoForce file.
186
187         The blocks are (in variable order):
188
189         * Z piezo sensor input
190         * Deflection input
191         * Deflection again?
192
193         And their headers are marked with 'Ciao force image list'.
194         """
195         traces = {}
196         version = info['Force file list']['Version']
197         type_re = re.compile('S \[(\w*)\] "([\w\s.]*)"')
198         if isinstance(info['Ciao force image list'], dict):
199             # there was only one image, but convert to list for processing
200             info['Ciao force image list'] = [info['Ciao force image list']]
201         for image in info['Ciao force image list']:
202             offset = int(image['Data offset'])
203             length = int(image['Data length'])
204             sample_size = int(image['Bytes/pixel'])
205             if sample_size != 2:
206                 raise NotImplementedError('Size: %s' % sample_size)
207             rows = length / sample_size
208             d = curve.Data(
209                 shape=(rows),
210                 dtype=numpy.int16,
211                 buffer=buffer,
212                 offset=offset,
213                 info=image,
214                 )
215             if version in ['0x05120005', '0x06120002', '0x06130001']:
216                 match = type_re.match(image['@4:Image Data'])
217                 assert match != None, 'Bad regexp for %s, %s' \
218                     % ('@4:Image Data', image['@4:Image Data'])
219                 if version == '0x06130001' and match.group(1) == 'ZLowVoltage':
220                     assert match.group(2) == 'Low Voltage Z', \
221                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
222                 else:
223                     assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
224                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
225                 tname = match.group(2)
226             else:
227                 assert version == '0x07200000', version
228                 match = type_re.match(image['@4:Image Data'])
229                 assert match != None, 'Bad regexp for %s, %s' \
230                     % ('@4:Image Data', image['@4:Image Data'])
231                 if match.group(1) == 'PulseFreq1':
232                     assert match.group(2) == 'Freq. 1', match.group(2)
233                 else:
234                     assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
235                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
236                 tname = match.group(2)
237                 if tname == 'Freq. 1':  # Normalize trace names between versions
238                     tname = 'Z sensor'
239                 elif tname == 'Deflection Error':
240                     tname = 'Deflection'
241             if tname in traces:
242                 #d.tofile('%s-2.dat' % tname, sep='\n')
243                 tname = self._replace_name(tname, d, traces, info)
244                 if tname == None:
245                     continue  # Don't replace anything
246             else:
247                 #d.tofile('%s.dat' % tname, sep='\n')
248                 pass
249             traces[tname] = d
250         if 'Z sensor' not in traces:
251             # some picoforce files only save deflection
252             assert version == '0x05120005', version
253             force_info = info['Ciao force list']
254             deflection = traces['Deflection']
255             volt_re = re.compile(
256                 'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) V/LSB\) *(-?[.0-9]*) V')
257             match = volt_re.match(force_info['@4:Ramp size Zsweep'])
258             size = float(match.group(3))
259             match = volt_re.match(force_info['@4:Ramp offset Zsweep'])
260             offset = float(match.group(3))
261             match = volt_re.match(force_info['@4:Ramp Begin Zsweep'])
262             begin = float(match.group(3))
263             match = volt_re.match(force_info['@4:Ramp End Zsweep'])
264             end = float(match.group(3))
265             #\@4:Feedback value Zsweep: V [Sens. Zscan] (0.002056286 V/LSB)       0 V
266             #\@4:Z display Zsweep: V [Sens. Zscan] (0.002056286 V/LSB) 18.53100 V
267             assert len(deflection) % 2 == 0, len(deflection)
268             points = len(deflection)/2
269             traces['Z sensor'] = curve.Data(
270                 shape=deflection.shape,
271                 dtype=numpy.float,
272                 info=info['Ciao force image list'][0],
273                 )
274             # deflection data seems to be saved as
275             #   [final approach, ..., initial approach,
276             #    initial retract, ..., final retract]
277             traces['Z sensor'][:points] = numpy.linspace(
278                 offset+begin+size, offset+begin, points)
279             traces['Z sensor'][-points:] = numpy.linspace(
280                 offset+begin+size, offset+end, points)
281         return traces
282
283     def _validate_traces(self, z_piezo, deflection):
284         if len(z_piezo) != len(deflection):
285             raise ValueError('Trace length missmatch: %d != %d'
286                              % (len(z_piezo), len(deflection)))
287
288     def _extract_block(self, info, z_piezo, deflection, start, stop, name):
289         block = curve.Data(
290             shape=(stop-start, 2),
291             dtype=numpy.float)
292         block[:,0] = z_piezo[start:stop]
293         block[:,1] = deflection[start:stop]
294         block.info = self._translate_block_info(
295             info, z_piezo.info, deflection.info, name)
296         block.info['columns'] = ['z piezo (m)', 'deflection (m)']
297         block = self._scale_block(block)
298         return block
299
300     def _replace_name(self, trace_name, trace, traces, info):
301         """Determine if a duplicate trace name should replace an earlier trace.
302
303         Return the target trace name if it should be replaced by the
304         new trace, or `None` if the new trace should be dropped.
305         """
306         #msg = []
307         #target = traces[trace_name]
308         #
309         ## Compare the info dictionaries for each trace
310         #ik = set(trace.info.keys())
311         #ok = set(traces[trace_name].info.keys())
312         #if ik != ok:  # Ensure we have the same set of keys for both traces
313         #    msg.append('extra keys: %s, missing keys %s' % (ik-ok, ok-ik))
314         #else:
315         #    # List keys we *require* to change between traces
316         #    variable_keys = ['Data offset', 'X data type']  # TODO: What is X data type?
317         #    for key in trace.info.keys():
318         #        if key in variable_keys:
319         #            if target.info[key] == trace.info[key]:
320         #                msg.append('constant %s (%s == %s)'
321         #                           % (key, target.info[key], trace.info[key]))
322         #        else:
323         #            if target.info[key] != trace.info[key]:
324         #                msg.append('variable %s (%s != %s)'
325         #                           % (key, target.info[key], trace.info[key]))
326         # Compare the data
327         #if not (traces[trace_name] == trace).all():
328         #    msg.append('data difference')
329         #if len(msg) > 0:
330         #    raise NotImplementedError(
331         #        'Missmatched duplicate traces for %s: %s'
332         #        % (trace_name, ', '.join(msg)))
333         import logging
334         log = logging.getLogger('hooke')
335         for name,t in traces.items():
336             if (t == trace).all():
337                 log.debug('replace %s with %s-2' % (name, trace_name))
338                 return name  # Replace this identical dataset.
339         log.debug('store %s-2 as Other' % (trace_name))
340         return 'Other'
341         # return None
342
343     def _translate_block_info(self, info, z_piezo_info, deflection_info, name):
344         version = info['Force file list']['Version']
345         if version == '0x05120005':
346             k_key = 'Spring constant'
347         else:
348             assert version in [
349                 '0x06120002', '0x06130001', '0x07200000'], version
350             k_key = 'Spring Constant'
351         ret = {
352             'name': name,
353             'raw info': info,
354             'raw z piezo info': z_piezo_info,
355             'raw deflection info': deflection_info,
356             'spring constant (N/m)': float(z_piezo_info[k_key]),
357             }
358
359         t = info['Force file list']['Date'] # 04:42:34 PM Tue Sep 11 2007
360         ret['time'] = time.strptime(t, '%I:%M:%S %p %a %b %d %Y')
361
362         volt_re = re.compile(
363             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) V/LSB\) (-?[.0-9]*) V')
364         hz_re = re.compile(
365             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) kHz/LSB\) (-?[.0-9]*) kHz')
366         if version in ['0x05120005', '0x06120002', '0x06130001']:
367             match = volt_re.match(z_piezo_info['@4:Z scale'])
368             assert match != None, 'Bad regexp for %s, %s' \
369                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
370             if version == '0x05120005':
371                 assert match.group(1) == 'Deflection', (
372                     z_piezo_info['@4:Z scale'])
373             else:
374                 assert match.group(1) == 'ZSensorSens', (
375                     z_piezo_info['@4:Z scale'])
376         else:
377             assert version == '0x07200000', version
378             match = hz_re.match(z_piezo_info['@4:Z scale'])
379             assert match != None, 'Bad regexp for %s, %s' \
380                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
381             assert match.group(1) == 'Freq. 1', z_piezo_info['@4:Z scale']
382         ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
383         ret['z piezo range (V)'] = float(match.group(3))
384         ret['z piezo offset (V)'] = 0.0
385         # offset assumed if raw data is signed...
386
387         match = volt_re.match(deflection_info['@4:Z scale'])
388         assert match != None, 'Bad regexp for %s, %s' \
389             % ('@4:Z scale', deflection_info['@4:Z scale'])
390         if version == '0x05120005':
391             assert match.group(1) == 'Deflection', z_piezo_info['@4:Z scale']
392         else:
393             assert version in [
394                 '0x06120002', '0x06130001', '0x07200000'], version
395             assert match.group(1) == 'DeflSens', z_piezo_info['@4:Z scale']
396         ret['deflection sensitivity (V/bit)'] = float(match.group(2))
397         ret['deflection range (V)'] = float(match.group(3))
398         ret['deflection offset (V)'] = 0.0
399         # offset assumed if raw data is signed...
400
401         nm_sens_re = re.compile('V ([.0-9]*) nm/V')
402         if version == '0x05120005':
403             match = nm_sens_re.match(info['Scanner list']['@Sens. Zscan'])
404             assert match != None, 'Bad regexp for %s/%s, %s' \
405                 % ('Scanner list', '@Sens. Zscan',
406                    info['Scanner list']['@Sens. Zscan'])
407         elif version in ['0x06120002', '0x06130001']:        
408             match = nm_sens_re.match(info['Ciao scan list']['@Sens. ZSensorSens'])
409             assert match != None, 'Bad regexp for %s/%s, %s' \
410                 % ('Ciao scan list', '@Sens. ZSensorSens',
411                    info['Ciao scan list']['@Sens. ZSensorSens'])
412         else:
413             assert version == '0x07200000', version
414             match = nm_sens_re.match(info['Ciao scan list']['@Sens. ZsensSens'])
415             assert match != None, 'Bad regexp for %s/%s, %s' \
416                 % ('Ciao scan list', '@Sens. ZsensSens',
417                    info['Ciao scan list']['@Sens. ZsensSens'])
418         ret['z piezo sensitivity (m/V)'] = float(match.group(1))*1e-9
419
420         if version == '0x05120005':
421             match = nm_sens_re.match(info['Ciao scan list']['@Sens. Deflection'])
422             assert match != None, 'Bad regexp for %s/%s, %s' \
423                 % ('Scanner list', '@Sens. Zscan',
424                    info['Ciao scan list']['@Sens. Deflection'])
425         else:
426             assert version in [
427                 '0x06120002', '0x06130001', '0x07200000'], version
428             match = nm_sens_re.match(info['Ciao scan list']['@Sens. DeflSens'])
429             assert match != None, 'Bad regexp for %s/%s, %s' \
430                 % ('Ciao scan list', '@Sens. DeflSens', info['Ciao scan list']['@Sens. DeflSens'])
431         ret['deflection sensitivity (m/V)'] = float(match.group(1))*1e-9
432
433         match = volt_re.match(info['Ciao force list']['@Z scan start'])
434         assert match != None, 'Bad regexp for %s/%s, %s' \
435             % ('Ciao force list', '@Z scan start', info['Ciao force list']['@Z scan start'])
436         ret['z piezo scan (V/bit)'] = float(match.group(2))
437         ret['z piezo scan start (V)'] = float(match.group(3))
438
439         match = volt_re.match(info['Ciao force list']['@Z scan size'])
440         assert match != None, 'Bad regexp for %s/%s, %s' \
441             % ('Ciao force list', '@Z scan size', info['Ciao force list']['@Z scan size'])
442         ret['z piezo scan size (V)'] = float(match.group(3))
443
444         const_re = re.compile('C \[([:\w\s]*)\] ([.0-9]*)')
445         match = const_re.match(z_piezo_info['@Z magnify'])
446         assert match != None, 'Bad regexp for %s, %s' \
447             % ('@Z magnify', info['@Z magnify'])
448         assert match.group(1) == '4:Z scale', match.group(1)
449         ret['z piezo gain'] = float(match.group(2))
450
451         if version in ['0x06120002', '0x06130001']:        
452             match = volt_re.match(z_piezo_info['@4:Z scale'])
453             assert match != None, 'Bad regexp for %s, %s' \
454                 % ('@4:Z scale', info['@4:Z scale'])
455             assert match.group(1) == 'ZSensorSens', match.group(1)
456             ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
457             ret['z piezo range (V)'] = float(match.group(3))
458         else:
459             assert version in ['0x05120005', '0x07200000'], version
460             pass
461
462         if version == '0x05120005':
463             # already accounded for when generating 'Z sensor' trace
464             pass
465         else:
466             assert version in [
467                 '0x06120002', '0x06130001', '0x07200000'], version
468             match = volt_re.match(z_piezo_info['@4:Ramp size'])
469             assert match != None, 'Bad regexp for %s, %s' \
470                 % ('@4:Ramp size', info['@4:Ramp size'])
471             assert match.group(1) == 'Zsens', match.group(1)
472             ret['z piezo ramp size (V/bit)'] = float(match.group(2))
473             ret['z piezo ramp size (V)'] = float(match.group(3))
474
475             match = volt_re.match(z_piezo_info['@4:Ramp offset'])
476             assert match != None, 'Bad regexp for %s, %s' \
477                 % ('@4:Ramp offset', info['@4:Ramp offset'])
478             assert match.group(1) == 'Zsens', match.group(1)
479             ret['z piezo ramp offset (V/bit)'] = float(match.group(2))
480             ret['z piezo ramp offset (V)'] = float(match.group(3))
481
482         # Unaccounted for:
483         #   Samps*
484
485         return ret
486
487     def _scale_block(self, data):
488         """Convert the block from its native format to a `numpy.float`
489         array in SI units.
490         """
491         ret = curve.Data(
492             shape=data.shape,
493             dtype=numpy.float,
494             )
495         info = data.info
496         ret.info = info
497         ret.info['raw data'] = data # store the raw data
498         data.info = {} # break circular reference info <-> data
499
500         z_col = info['columns'].index('z piezo (m)')
501         d_col = info['columns'].index('deflection (m)')
502
503         # Leading '-' because Bruker's z increases towards the surface
504         # (positive indentation), but it makes more sense to me to
505         # have it increase away from the surface (positive
506         # separation).
507         ret[:,z_col] = -(
508             (data[:,z_col].astype(ret.dtype)
509              * info['z piezo sensitivity (V/bit)']
510              - info['z piezo offset (V)'])
511             * info['z piezo gain']
512             * info['z piezo sensitivity (m/V)']
513             )
514
515         # Leading '-' because deflection voltage increases as the tip
516         # moves away from the surface, but it makes more sense to me
517         # to have it increase as it moves toward the surface (positive
518         # tension on the protein chain).
519         ret[:,d_col] = -(
520             (data[:,d_col]
521              * info['deflection sensitivity (V/bit)']
522              - info['deflection offset (V)'])
523             * info['deflection sensitivity (m/V)']
524             )
525
526         return ret