Don't specify curve or data block types. See doc/standards.txt.
[hooke.git] / hooke / driver / picoforce.py
1 # Copyright (C) 2006-2010 Alberto Gomez-Casado
2 #                         Massimo Sandal <devicerandom@gmail.com>
3 #                         W. Trevor King <wking@drexel.edu>
4 #
5 # This file is part of Hooke.
6 #
7 # Hooke is free software: you can redistribute it and/or modify it
8 # under the terms of the GNU Lesser General Public License as
9 # published by the Free Software Foundation, either version 3 of the
10 # License, or (at your option) any later version.
11 #
12 # Hooke is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
15 # Public License for more details.
16 #
17 # You should have received a copy of the GNU Lesser General Public
18 # License along with Hooke.  If not, see
19 # <http://www.gnu.org/licenses/>.
20
21 """Driver for Veeco PicoForce force spectroscopy files.
22 """
23
24 import os.path
25 import pprint
26 import re
27 import time
28
29 import numpy
30
31 from .. import curve as curve # this module defines data containers.
32 from . import Driver as Driver # this is the Driver base class
33
34
35 __version__='0.0.0.20100516'
36
37 class PicoForceDriver (Driver):
38     """Handle Veeco Picoforce force spectroscopy files.
39     """
40     def __init__(self):
41         super(PicoForceDriver, self).__init__(name='picoforce')
42
43     def is_me(self, path):
44         if os.path.isdir(path):
45             return False
46         f = file(path, 'r')
47         header = f.read(30)
48         f.close()
49
50         return header[2:17] == 'Force file list'
51
52     def read(self, path, info=None):
53         info = self._read_header_path(path)
54         self._check_version(info)
55         data = self._read_data_path(path, info)
56         return (data, info)
57
58     def _read_header_path(self, path):
59         """Read curve information from the PicoForce file at `path`.
60
61         See :meth:`._read_header_file`.
62         """
63         return self._read_header_file(file(path, 'rb'))
64
65     def _read_header_file(self, file):
66         r"""Read curve information from a PicoForce file.
67
68         Return a dict of dicts representing the information.  If a
69         field is repeated multiple times, it's value is replaced by a
70         list of the values for each occurence.
71
72         Examples
73         --------
74
75         >>> import pprint
76         >>> import StringIO
77         >>> f = StringIO.StringIO('\r\n'.join([
78         ...             '\*Force file list',
79         ...             '\Version: 0x06120002',
80         ...             '\Date: 04:42:34 PM Tue Sep 11 2007',
81         ...             '\Start context: FOL2',
82         ...             '\Data length: 40960',
83         ...             '\Text: ',
84         ...             '\*Equipment list',
85         ...             '\Description: Extended PicoForce',
86         ...             '\Controller: IIIA',
87         ...             '\*Ciao force image list',
88         ...             '\Data offset: 40960',
89         ...             '\Data length: 8192',
90         ...             '\*Ciao force image list',
91         ...             '\Data offset: 49152',
92         ...             '\Data length: 8192',
93         ...             '\*Ciao force image list',
94         ...             '\Data offset: 57344',
95         ...             '\Data length: 8192',
96         ...             ]))
97         >>> p = PicoForceDriver()
98         >>> d = p._read_header_file(f)
99         >>> pprint.pprint(d, width=60)
100         {'Ciao force image list': [{'Data length': '8192',
101                                     'Data offset': '40960'},
102                                    {'Data length': '8192',
103                                     'Data offset': '49152'},
104                                    {'Data length': '8192',
105                                     'Data offset': '57344'}],
106          'Equipment list': {'Controller': 'IIIA',
107                             'Description': 'Extended PicoForce'},
108          'Force file list': {'Data length': '40960',
109                              'Date': '04:42:34 PM Tue Sep 11 2007',
110                              'Start context': 'FOL2',
111                              'Text:': None,
112                              'Version': '0x06120002'}}
113         """
114         info = {}
115         header_field = None
116         for line in file:
117             line = line.strip()
118             if line.startswith('\*File list end'):
119                 break
120             if line.startswith(r'\*'):
121                 header_field = line[len(r'\*'):]
122                 if header_field in info:
123                     if isinstance(info[header_field], list):
124                         info[header_field].append({}) # >=3rd appearance
125                     else: # Second appearance
126                         info[header_field] = [info[header_field], {}]
127                 else: # First appearance
128                     info[header_field] = {}
129             else:
130                 assert line.startswith('\\'), line
131                 fields = line[len('\\'):].split(': ', 1)
132                 key = fields[0]
133                 if len(fields) == 1: # fields = [key]
134                     value = None
135                 else: # fields = [key, value]
136                     value = fields[1]
137                 if isinstance(info[header_field], list): # >=2nd header_field
138                     target_dict = info[header_field][-1]
139                 else: # first appearance of header_field
140                     target_dict = info[header_field]
141                 if key in target_dict and target_dict[key] != value:
142                     raise NotImplementedError(
143                         'Overwriting %s: %s -> %s'
144                         % (key, target_dict[key], value))
145                 target_dict[key] = value
146         return (info)
147
148     def _check_version(self, info):
149         """Ensure the input file is a version we understand.
150
151         Otherwise, raise `ValueError`.
152         """
153         version = info['Force file list'].get('Version', None)
154         if version not in ['0x06120002', '0x06130001', '0x07200000']:
155             raise NotImplementedError(
156                 '%s file version %s not supported (yet!)\n%s'
157                 % (self.name, version,
158                    pprint.pformat(info['Force file list'])))
159
160     def _read_data_path(self, path, info):
161         """Read curve data from the PicoForce file at `path`.
162
163         See :meth:`._read_data_file`.
164         """
165         f = file(path, 'rb')
166         data = self._read_data_file(f, info)
167         f.close()
168         return data
169
170     def _read_data_file(self, file, info):
171         file.seek(0)
172         traces = self._extract_traces(buffer(file.read()), info)
173         self._validate_traces(
174             traces['Z sensor'], traces['Deflection'])
175         L = len(traces['Deflection'])
176         approach = self._extract_block(
177             info, traces['Z sensor'], traces['Deflection'], 0, L/2, 'approach')
178         retract = self._extract_block(
179             info, traces['Z sensor'], traces['Deflection'], L/2, L, 'retract')
180         data = [approach, retract]
181         return data
182
183     def _extract_traces(self, buffer, info):
184         """Extract each of the three vector blocks in a PicoForce file.
185
186         The blocks are (in variable order):
187
188         * Z piezo sensor input
189         * Deflection input
190         * Deflection again?
191
192         And their headers are marked with 'Ciao force image list'.
193         """
194         traces = {}
195         version = info['Force file list']['Version']
196         type_re = re.compile('S \[(\w*)\] "([\w\s.]*)"')
197         for image in info['Ciao force image list']:
198             offset = int(image['Data offset'])
199             length = int(image['Data length'])
200             sample_size = int(image['Bytes/pixel'])
201             if sample_size != 2:
202                 raise NotImplementedError('Size: %s' % sample_size)
203             rows = length / sample_size
204             d = curve.Data(
205                 shape=(rows),
206                 dtype=numpy.int16,
207                 buffer=buffer,
208                 offset=offset,
209                 info=image,
210                 )
211             if version in ['0x06120002', '0x06130001']:
212                 match = type_re.match(image['@4:Image Data'])
213                 assert match != None, 'Bad regexp for %s, %s' \
214                     % ('@4:Image Data', image['@4:Image Data'])
215                 if version == '0x06130001' and match.group(1) == 'ZLowVoltage':
216                     assert match.group(2) == 'Low Voltage Z', \
217                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
218                 else:
219                     assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
220                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
221                 tname = match.group(2)
222             else:
223                 assert version == '0x07200000', version
224                 match = type_re.match(image['@4:Image Data'])
225                 assert match != None, 'Bad regexp for %s, %s' \
226                     % ('@4:Image Data', image['@4:Image Data'])
227                 if match.group(1) == 'PulseFreq1':
228                     assert match.group(2) == 'Freq. 1', match.group(2)
229                 else:
230                     assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
231                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
232                 tname = match.group(2)
233                 if tname == 'Freq. 1':  # Normalize trace names between versions
234                     tname = 'Z sensor'
235                 elif tname == 'Deflection Error':
236                     tname = 'Deflection'
237             if tname in traces:
238                 #d.tofile('%s-2.dat' % tname, sep='\n')
239                 tname = self._replace_name(tname, d, traces, info)
240                 if tname == None:
241                     continue  # Don't replace anything
242             else:
243                 #d.tofile('%s.dat' % tname, sep='\n')
244                 pass
245             traces[tname] = d
246         return traces
247
248     def _validate_traces(self, z_piezo, deflection):
249         if len(z_piezo) != len(deflection):
250             raise ValueError('Trace length missmatch: %d != %d'
251                              % (len(z_piezo), len(deflection)))
252
253     def _extract_block(self, info, z_piezo, deflection, start, stop, name):
254         block = curve.Data(
255             shape=(stop-start, 2),
256             dtype=numpy.float)
257         block[:,0] = z_piezo[start:stop]
258         block[:,1] = deflection[start:stop]
259         block.info = self._translate_block_info(
260             info, z_piezo.info, deflection.info, name)
261         block.info['columns'] = ['z piezo (m)', 'deflection (m)']
262         block = self._scale_block(block)
263         return block
264
265     def _replace_name(self, trace_name, trace, traces, info):
266         """Determine if a duplicate trace name should replace an earlier trace.
267
268         Return the target trace name if it should be replaced by the
269         new trace, or `None` if the new trace should be dropped.
270         """
271         #msg = []
272         #target = traces[trace_name]
273         #
274         ## Compare the info dictionaries for each trace
275         #ik = set(trace.info.keys())
276         #ok = set(traces[trace_name].info.keys())
277         #if ik != ok:  # Ensure we have the same set of keys for both traces
278         #    msg.append('extra keys: %s, missing keys %s' % (ik-ok, ok-ik))
279         #else:
280         #    # List keys we *require* to change between traces
281         #    variable_keys = ['Data offset', 'X data type']  # TODO: What is X data type?
282         #    for key in trace.info.keys():
283         #        if key in variable_keys:
284         #            if target.info[key] == trace.info[key]:
285         #                msg.append('constant %s (%s == %s)'
286         #                           % (key, target.info[key], trace.info[key]))
287         #        else:
288         #            if target.info[key] != trace.info[key]:
289         #                msg.append('variable %s (%s != %s)'
290         #                           % (key, target.info[key], trace.info[key]))
291         # Compare the data
292         #if not (traces[trace_name] == trace).all():
293         #    msg.append('data difference')
294         #if len(msg) > 0:
295         #    raise NotImplementedError(
296         #        'Missmatched duplicate traces for %s: %s'
297         #        % (trace_name, ', '.join(msg)))
298         import logging
299         log = logging.getLogger('hooke')
300         for name,t in traces.items():
301             if (t == trace).all():
302                 log.debug('replace %s with %s-2' % (name, trace_name))
303                 return name  # Replace this identical dataset.
304         log.debug('store %s-2 as Other' % (trace_name))
305         return 'Other'
306         # return None
307
308     def _translate_block_info(self, info, z_piezo_info, deflection_info, name):
309         version = info['Force file list']['Version']
310         ret = {
311             'name': name,
312             'raw info': info,
313             'raw z piezo info': z_piezo_info,
314             'raw deflection info': deflection_info,
315             'spring constant (N/m)': float(z_piezo_info['Spring Constant']),
316             }
317
318         t = info['Force file list']['Date'] # 04:42:34 PM Tue Sep 11 2007
319         ret['time'] = time.strptime(t, '%I:%M:%S %p %a %b %d %Y')
320
321         volt_re = re.compile(
322             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) V/LSB\) (-?[.0-9]*) V')
323         hz_re = re.compile(
324             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) kHz/LSB\) (-?[.0-9]*) kHz')
325         if version in ['0x06120002', '0x06130001']:
326             match = volt_re.match(z_piezo_info['@4:Z scale'])
327             assert match != None, 'Bad regexp for %s, %s' \
328                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
329             assert match.group(1) == 'ZSensorSens', z_piezo_info['@4:Z scale']
330         else:
331             assert version == '0x07200000', version
332             match = hz_re.match(z_piezo_info['@4:Z scale'])
333             assert match != None, 'Bad regexp for %s, %s' \
334                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
335             assert match.group(1) == 'Freq. 1', z_piezo_info['@4:Z scale']
336         ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
337         ret['z piezo range (V)'] = float(match.group(3))
338         ret['z piezo offset (V)'] = 0.0
339         # offset assumed if raw data is signed...
340
341         match = volt_re.match(deflection_info['@4:Z scale'])
342         assert match != None, 'Bad regexp for %s, %s' \
343             % ('@4:Z scale', deflection_info['@4:Z scale'])
344         assert match.group(1) == 'DeflSens', z_piezo_info['@4:Z scale']
345         ret['deflection sensitivity (V/bit)'] = float(match.group(2))
346         ret['deflection range (V)'] = float(match.group(3))
347         ret['deflection offset (V)'] = 0.0
348         # offset assumed if raw data is signed...
349
350         nm_sens_re = re.compile('V ([.0-9]*) nm/V')
351         if version in ['0x06120002', '0x06130001']:        
352             match = nm_sens_re.match(info['Ciao scan list']['@Sens. ZSensorSens'])
353             assert match != None, 'Bad regexp for %s/%s, %s' \
354                 % ('Ciao scan list', '@Sens. ZSensorSens',
355                    info['Ciao scan list']['@Sens. ZSensorSens'])
356         else:
357             assert version == '0x07200000', version
358             match = nm_sens_re.match(info['Ciao scan list']['@Sens. ZsensSens'])
359             assert match != None, 'Bad regexp for %s/%s, %s' \
360                 % ('Ciao scan list', '@Sens. ZsensSens',
361                    info['Ciao scan list']['@Sens. ZsensSens'])
362         ret['z piezo sensitivity (m/V)'] = float(match.group(1))*1e-9
363
364         match = nm_sens_re.match(info['Ciao scan list']['@Sens. DeflSens'])
365         assert match != None, 'Bad regexp for %s/%s, %s' \
366             % ('Ciao scan list', '@Sens. DeflSens', info['Ciao scan list']['@Sens. DeflSens'])
367         ret['deflection sensitivity (m/V)'] = float(match.group(1))*1e-9
368
369         match = volt_re.match(info['Ciao force list']['@Z scan start'])
370         assert match != None, 'Bad regexp for %s/%s, %s' \
371             % ('Ciao force list', '@Z scan start', info['Ciao force list']['@Z scan start'])
372         ret['z piezo scan (V/bit)'] = float(match.group(2))
373         ret['z piezo scan start (V)'] = float(match.group(3))
374
375         match = volt_re.match(info['Ciao force list']['@Z scan size'])
376         assert match != None, 'Bad regexp for %s/%s, %s' \
377             % ('Ciao force list', '@Z scan size', info['Ciao force list']['@Z scan size'])
378         ret['z piezo scan size (V)'] = float(match.group(3))
379
380         const_re = re.compile('C \[([:\w\s]*)\] ([.0-9]*)')
381         match = const_re.match(z_piezo_info['@Z magnify'])
382         assert match != None, 'Bad regexp for %s, %s' \
383             % ('@Z magnify', info['@Z magnify'])
384         assert match.group(1) == '4:Z scale', match.group(1)
385         ret['z piezo gain'] = float(match.group(2))
386
387         if version in ['0x06120002', '0x06130001']:        
388             match = volt_re.match(z_piezo_info['@4:Z scale'])
389             assert match != None, 'Bad regexp for %s, %s' \
390                 % ('@4:Z scale', info['@4:Z scale'])
391             assert match.group(1) == 'ZSensorSens', match.group(1)
392             ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
393             ret['z piezo range (V)'] = float(match.group(3))
394         else:
395             assert version == '0x07200000', version
396             pass
397
398         match = volt_re.match(z_piezo_info['@4:Ramp size'])
399         assert match != None, 'Bad regexp for %s, %s' \
400             % ('@4:Ramp size', info['@4:Ramp size'])
401         assert match.group(1) == 'Zsens', match.group(1)
402         ret['z piezo ramp size (V/bit)'] = float(match.group(2))
403         ret['z piezo ramp size (V)'] = float(match.group(3))
404
405         match = volt_re.match(z_piezo_info['@4:Ramp offset'])
406         assert match != None, 'Bad regexp for %s, %s' \
407             % ('@4:Ramp offset', info['@4:Ramp offset'])
408         assert match.group(1) == 'Zsens', match.group(1)
409         ret['z piezo ramp offset (V/bit)'] = float(match.group(2))
410         ret['z piezo ramp offset (V)'] = float(match.group(3))
411
412         # Unaccounted for:
413         #   Samps*
414
415         return ret
416
417     def _scale_block(self, data):
418         """Convert the block from its native format to a `numpy.float`
419         array in SI units.
420         """
421         ret = curve.Data(
422             shape=data.shape,
423             dtype=numpy.float,
424             )
425         info = data.info
426         ret.info = info
427         ret.info['raw data'] = data # store the raw data
428         data.info = {} # break circular reference info <-> data
429
430         z_col = info['columns'].index('z piezo (m)')
431         d_col = info['columns'].index('deflection (m)')
432
433         # Leading '-' because Veeco's z increases towards the surface
434         # (positive indentation), but it makes more sense to me to
435         # have it increase away from the surface (positive
436         # separation).
437         ret[:,z_col] = -(
438             (data[:,z_col].astype(ret.dtype)
439              * info['z piezo sensitivity (V/bit)']
440              - info['z piezo offset (V)'])
441             * info['z piezo gain']
442             * info['z piezo sensitivity (m/V)']
443             )
444
445         # Leading '-' because deflection voltage increases as the tip
446         # moves away from the surface, but it makes more sense to me
447         # to have it increase as it moves toward the surface (positive
448         # tension on the protein chain).
449         ret[:,d_col] = -(
450             (data[:,d_col]
451              * info['deflection sensitivity (V/bit)']
452              - info['deflection offset (V)'])
453             * info['deflection sensitivity (m/V)']
454             )
455
456         return ret