Split out curve renaming to PicoForceDriver._replace_name
[hooke.git] / hooke / driver / picoforce.py
1 # Copyright (C) 2006-2010 Alberto Gomez-Casado
2 #                         Massimo Sandal <devicerandom@gmail.com>
3 #                         W. Trevor King <wking@drexel.edu>
4 #
5 # This file is part of Hooke.
6 #
7 # Hooke is free software: you can redistribute it and/or
8 # modify it under the terms of the GNU Lesser General Public
9 # License as published by the Free Software Foundation, either
10 # version 3 of the License, or (at your option) any later version.
11 #
12 # Hooke is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU Lesser General Public License for more details.
16 #
17 # You should have received a copy of the GNU Lesser General Public
18 # License along with Hooke.  If not, see
19 # <http://www.gnu.org/licenses/>.
20
21 """Driver for Veeco PicoForce force spectroscopy files.
22 """
23
24 import pprint
25 import re
26 import time
27
28 import numpy
29
30 from .. import curve as curve # this module defines data containers.
31 from .. import experiment as experiment # this module defines expt. types
32 from ..config import Setting # configurable setting class
33 from . import Driver as Driver # this is the Driver base class
34
35
36 __version__='0.0.0.20100516'
37
38 class PicoForceDriver (Driver):
39     """Handle Veeco Picoforce force spectroscopy files.
40     """
41     def __init__(self):
42         super(PicoForceDriver, self).__init__(name='picoforce')
43
44     def is_me(self, path):
45         f = file(path, 'r')
46         header = f.read(30)
47         f.close()
48
49         return header[2:17] == 'Force file list'
50
51     def read(self, path):
52         info = self._read_header_path(path)
53         self._check_version(info)
54         data = self._read_data_path(path, info)
55         info['filetype'] = self.name
56         info['experiment'] = experiment.VelocityClamp
57         return (data, info)
58
59     def _read_header_path(self, path):
60         """Read curve information from the PicoForce file at `path`.
61
62         See :meth:`._read_header_file`.
63         """
64         return self._read_header_file(file(path, 'rb'))
65
66     def _read_header_file(self, file):
67         r"""Read curve information from a PicoForce file.
68
69         Return a dict of dicts representing the information.  If a
70         field is repeated multiple times, it's value is replaced by a
71         list of the values for each occurence.
72
73         Examples
74         --------
75
76         >>> import pprint
77         >>> import StringIO
78         >>> f = StringIO.StringIO('\r\n'.join([
79         ...             '\*Force file list',
80         ...             '\Version: 0x06120002',
81         ...             '\Date: 04:42:34 PM Tue Sep 11 2007',
82         ...             '\Start context: FOL2',
83         ...             '\Data length: 40960',
84         ...             '\Text: ',
85         ...             '\*Equipment list',
86         ...             '\Description: Extended PicoForce',
87         ...             '\Controller: IIIA',
88         ...             '\*Ciao force image list',
89         ...             '\Data offset: 40960',
90         ...             '\Data length: 8192',
91         ...             '\*Ciao force image list',
92         ...             '\Data offset: 49152',
93         ...             '\Data length: 8192',
94         ...             '\*Ciao force image list',
95         ...             '\Data offset: 57344',
96         ...             '\Data length: 8192',
97         ...             ]))
98         >>> p = PicoForceDriver()
99         >>> d = p._read_header_file(f)
100         >>> pprint.pprint(d, width=60)
101         {'Ciao force image list': [{'Data length': '8192',
102                                     'Data offset': '40960'},
103                                    {'Data length': '8192',
104                                     'Data offset': '49152'},
105                                    {'Data length': '8192',
106                                     'Data offset': '57344'}],
107          'Equipment list': {'Controller': 'IIIA',
108                             'Description': 'Extended PicoForce'},
109          'Force file list': {'Data length': '40960',
110                              'Date': '04:42:34 PM Tue Sep 11 2007',
111                              'Start context': 'FOL2',
112                              'Text:': None,
113                              'Version': '0x06120002'}}
114         """
115         info = {}
116         header_field = None
117         for line in file:
118             line = line.strip()
119             if line.startswith('\*File list end'):
120                 break
121             if line.startswith(r'\*'):
122                 header_field = line[len(r'\*'):]
123                 if header_field in info:
124                     if isinstance(info[header_field], list):
125                         info[header_field].append({}) # >=3rd appearance
126                     else: # Second appearance
127                         info[header_field] = [info[header_field], {}]
128                 else: # First appearance
129                     info[header_field] = {}
130             else:
131                 assert line.startswith('\\'), line
132                 fields = line[len('\\'):].split(': ', 1)
133                 key = fields[0]
134                 if len(fields) == 1: # fields = [key]
135                     value = None
136                 else: # fields = [key, value]
137                     value = fields[1]
138                 if isinstance(info[header_field], list): # >=2nd header_field
139                     target_dict = info[header_field][-1]
140                 else: # first appearance of header_field
141                     target_dict = info[header_field]
142                 if key in target_dict and target_dict[key] != value:
143                     raise NotImplementedError(
144                         'Overwriting %s: %s -> %s'
145                         % (key, target_dict[key], value))
146                 target_dict[key] = value
147         return (info)
148
149     def _check_version(self, info):
150         """Ensure the input file is a version we understand.
151
152         Otherwise, raise `ValueError`.
153         """
154         version = info['Force file list'].get('Version', None)
155         if version not in ['0x06120002', '0x06130001', '0x07200000']:
156             raise NotImplementedError(
157                 '%s file version %s not supported (yet!)\n%s'
158                 % (self.name, version,
159                    pprint.pformat(info['Force file list'])))
160
161     def _read_data_path(self, path, info):
162         """Read curve data from the PicoForce file at `path`.
163
164         See :meth:`._read_data_file`.
165         """
166         f = file(path, 'rb')
167         data = self._read_data_file(f, info)
168         f.close()
169         return data
170
171     def _read_data_file(self, file, info):
172         file.seek(0)
173         traces = self._extract_traces(buffer(file.read()), info)
174         self._validate_traces(
175             traces['Z sensor'], traces['Deflection'])
176         L = len(traces['Deflection'])
177         approach = self._extract_block(
178             info, traces['Z sensor'], traces['Deflection'], 0, L/2, 'approach')
179         retract = self._extract_block(
180             info, traces['Z sensor'], traces['Deflection'], L/2, L, 'retract')
181         data = [approach, retract]
182         return data
183
184     def _extract_traces(self, buffer, info):
185         """Extract each of the three vector blocks in a PicoForce file.
186
187         The blocks are (in variable order):
188
189         * Z piezo sensor input
190         * Deflection input
191         * Deflection again?
192
193         And their headers are marked with 'Ciao force image list'.
194         """
195         traces = {}
196         version = info['Force file list']['Version']
197         type_re = re.compile('S \[(\w*)\] "([\w\s.]*)"')
198         for image in info['Ciao force image list']:
199             offset = int(image['Data offset'])
200             length = int(image['Data length'])
201             sample_size = int(image['Bytes/pixel'])
202             if sample_size != 2:
203                 raise NotImplementedError('Size: %s' % sample_size)
204             rows = length / sample_size
205             d = curve.Data(
206                 shape=(rows),
207                 dtype=numpy.int16,
208                 buffer=buffer,
209                 offset=offset,
210                 info=image,
211                 )
212             if version in ['0x06120002', '0x06130001']:
213                 match = type_re.match(image['@4:Image Data'])
214                 assert match != None, 'Bad regexp for %s, %s' \
215                     % ('@4:Image Data', image['@4:Image Data'])
216                 assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
217                     'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
218                 tname = match.group(2)
219             else:
220                 assert version == '0x07200000', version
221                 match = type_re.match(image['@4:Image Data'])
222                 assert match != None, 'Bad regexp for %s, %s' \
223                     % ('@4:Image Data', image['@4:Image Data'])
224                 if match.group(1) == 'PulseFreq1':
225                     assert match.group(2) == 'Freq. 1', match.group(2)
226                 else:
227                     assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
228                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
229                 tname = match.group(2)
230                 if tname == 'Freq. 1':  # Normalize trace names between versions
231                     tname = 'Z sensor'
232                 elif tname == 'Deflection Error':
233                     tname = 'Deflection'
234             if tname in traces:
235                 #d.tofile('%s-2.dat' % tname, sep='\n')
236                 tname = self._replace_name(tname, d, traces, info)
237                 if tname == None:
238                     continue  # Don't replace anything
239             else:
240                 #d.tofile('%s.dat' % tname, sep='\n')
241                 pass
242             traces[tname] = d
243         return traces
244
245     def _validate_traces(self, z_piezo, deflection):
246         if len(z_piezo) != len(deflection):
247             raise ValueError('Trace length missmatch: %d != %d'
248                              % (len(z_piezo), len(deflection)))
249
250     def _extract_block(self, info, z_piezo, deflection, start, stop, name):
251         block = curve.Data(
252             shape=(stop-start, 2),
253             dtype=numpy.float)
254         block[:,0] = z_piezo[start:stop]
255         block[:,1] = deflection[start:stop]
256         block.info = self._translate_block_info(
257             info, z_piezo.info, deflection.info, name)
258         block.info['columns'] = ['z piezo (m)', 'deflection (m)']
259         block = self._scale_block(block)
260         return block
261
262     def _replace_name(self, trace_name, trace, traces, info):
263         """Determine if a duplicate trace name should replace an earlier trace.
264
265         Return the target trace name if it should be replaced by the
266         new trace, or `None` if the new trace should be dropped.
267         """
268         #msg = []
269         #target = traces[trace_name]
270         #
271         ## Compare the info dictionaries for each trace
272         #ik = set(trace.info.keys())
273         #ok = set(traces[trace_name].info.keys())
274         #if ik != ok:  # Ensure we have the same set of keys for both traces
275         #    msg.append('extra keys: %s, missing keys %s' % (ik-ok, ok-ik))
276         #else:
277         #    # List keys we *require* to change between traces
278         #    variable_keys = ['Data offset', 'X data type']  # TODO: What is X data type?
279         #    for key in trace.info.keys():
280         #        if key in variable_keys:
281         #            if target.info[key] == trace.info[key]:
282         #                msg.append('constant %s (%s == %s)'
283         #                           % (key, target.info[key], trace.info[key]))
284         #        else:
285         #            if target.info[key] != trace.info[key]:
286         #                msg.append('variable %s (%s != %s)'
287         #                           % (key, target.info[key], trace.info[key]))
288         # Compare the data
289         #if not (traces[trace_name] == trace).all():
290         #    msg.append('data difference')
291         #if len(msg) > 0:
292         #    raise NotImplementedError(
293         #        'Missmatched duplicate traces for %s: %s'
294         #        % (trace_name, ', '.join(msg)))
295         import sys
296         for name,t in traces.items():
297             if (t == trace).all():
298                 print >> sys.stderr, 'replace %s with %s-2' % (name, trace_name)
299                 return name  # Replace this identical dataset.
300         print >> sys.stderr, 'store %s-2 as Other' % (trace_name)
301         return 'Other'
302         # return None
303
304     def _translate_block_info(self, info, z_piezo_info, deflection_info, name):
305         version = info['Force file list']['Version']
306         ret = {
307             'name': name,
308             'raw info': info,
309             'raw z piezo info': z_piezo_info,
310             'raw deflection info': deflection_info,
311             'spring constant (N/m)': float(z_piezo_info['Spring Constant']),
312             }
313
314         t = info['Force file list']['Date'] # 04:42:34 PM Tue Sep 11 2007
315         ret['time'] = time.strptime(t, '%I:%M:%S %p %a %b %d %Y')
316
317         volt_re = re.compile(
318             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) V/LSB\) (-?[.0-9]*) V')
319         hz_re = re.compile(
320             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) kHz/LSB\) (-?[.0-9]*) kHz')
321         if version in ['0x06120002', '0x06130001']:
322             match = volt_re.match(z_piezo_info['@4:Z scale'])
323             assert match != None, 'Bad regexp for %s, %s' \
324                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
325             assert match.group(1) == 'ZSensorSens', z_piezo_info['@4:Z scale']
326         else:
327             assert version == '0x07200000', version
328             match = hz_re.match(z_piezo_info['@4:Z scale'])
329             assert match != None, 'Bad regexp for %s, %s' \
330                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
331             assert match.group(1) == 'Freq. 1', z_piezo_info['@4:Z scale']
332         ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
333         ret['z piezo range (V)'] = float(match.group(3))
334         ret['z piezo offset (V)'] = 0.0
335         # offset assumed if raw data is signed...
336
337         match = volt_re.match(deflection_info['@4:Z scale'])
338         assert match != None, 'Bad regexp for %s, %s' \
339             % ('@4:Z scale', deflection_info['@4:Z scale'])
340         assert match.group(1) == 'DeflSens', z_piezo_info['@4:Z scale']
341         ret['deflection sensitivity (V/bit)'] = float(match.group(2))
342         ret['deflection range (V)'] = float(match.group(3))
343         ret['deflection offset (V)'] = 0.0
344         # offset assumed if raw data is signed...
345
346         nm_sens_re = re.compile('V ([.0-9]*) nm/V')
347         match = nm_sens_re.match(info['Scanner list']['@Sens. Zsens'])
348         assert match != None, 'Bad regexp for %s/%s, %s' \
349             % ('Scanner list', '@Sens. Zsens', info['Scanner list']['@4:Z scale'])
350         ret['z piezo sensitivity (m/V)'] = float(match.group(1))*1e-9
351
352         match = nm_sens_re.match(info['Ciao scan list']['@Sens. DeflSens'])
353         assert match != None, 'Bad regexp for %s/%s, %s' \
354             % ('Ciao scan list', '@Sens. DeflSens', info['Ciao scan list']['@Sens. DeflSens'])
355         ret['deflection sensitivity (m/V)'] = float(match.group(1))*1e-9
356
357         match = volt_re.match(info['Ciao force list']['@Z scan start'])
358         assert match != None, 'Bad regexp for %s/%s, %s' \
359             % ('Ciao force list', '@Z scan start', info['Ciao force list']['@Z scan start'])
360         ret['z piezo scan (V/bit)'] = float(match.group(2))
361         ret['z piezo scan start (V)'] = float(match.group(3))
362
363         match = volt_re.match(info['Ciao force list']['@Z scan size'])
364         assert match != None, 'Bad regexp for %s/%s, %s' \
365             % ('Ciao force list', '@Z scan size', info['Ciao force list']['@Z scan size'])
366         ret['z piezo scan size (V)'] = float(match.group(3))
367
368         const_re = re.compile('C \[([:\w\s]*)\] ([.0-9]*)')
369         match = const_re.match(z_piezo_info['@Z magnify'])
370         assert match != None, 'Bad regexp for %s, %s' \
371             % ('@Z magnify', info['@Z magnify'])
372         assert match.group(1) == '4:Z scale', match.group(1)
373         ret['z piezo gain'] = float(match.group(2))
374
375         if version in ['0x06120002', '0x06130001']:        
376             match = volt_re.match(z_piezo_info['@4:Z scale'])
377             assert match != None, 'Bad regexp for %s, %s' \
378                 % ('@4:Z scale', info['@4:Z scale'])
379             assert match.group(1) == 'ZSensorSens', match.group(1)
380             ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
381             ret['z piezo range (V)'] = float(match.group(3))
382         else:
383             assert version == '0x07200000', version
384             pass
385
386         match = volt_re.match(z_piezo_info['@4:Ramp size'])
387         assert match != None, 'Bad regexp for %s, %s' \
388             % ('@4:Ramp size', info['@4:Ramp size'])
389         assert match.group(1) == 'Zsens', match.group(1)
390         ret['z piezo ramp size (V/bit)'] = float(match.group(2))
391         ret['z piezo ramp size (V)'] = float(match.group(3))
392
393         match = volt_re.match(z_piezo_info['@4:Ramp offset'])
394         assert match != None, 'Bad regexp for %s, %s' \
395             % ('@4:Ramp offset', info['@4:Ramp offset'])
396         assert match.group(1) == 'Zsens', match.group(1)
397         ret['z piezo ramp offset (V/bit)'] = float(match.group(2))
398         ret['z piezo ramp offset (V)'] = float(match.group(3))
399
400         # Unaccounted for:
401         #   Samps*
402
403         return ret
404
405     def _scale_block(self, data):
406         """Convert the block from its native format to a `numpy.float`
407         array in SI units.
408         """
409         ret = curve.Data(
410             shape=data.shape,
411             dtype=numpy.float,
412             )
413         info = data.info
414         ret.info = info
415         ret.info['raw data'] = data # store the raw data
416         data.info = {} # break circular reference info <-> data
417
418         z_col = info['columns'].index('z piezo (m)')
419         d_col = info['columns'].index('deflection (m)')
420
421         # Leading '-' because Veeco's z increases towards the surface
422         # (positive indentation), but it makes more sense to me to
423         # have it increase away from the surface (positive
424         # separation).
425         ret[:,z_col] = -(
426             (data[:,z_col].astype(ret.dtype)
427              * info['z piezo sensitivity (V/bit)']
428              - info['z piezo offset (V)'])
429             * info['z piezo gain']
430             * info['z piezo sensitivity (m/V)']
431             )
432
433         # Leading '-' because deflection voltage increases as the tip
434         # moves away from the surface, but it makes more sense to me
435         # to have it increase as it moves toward the surface (positive
436         # tension on the protein chain).
437         ret[:,d_col] = -(
438             (data[:,d_col]
439              * info['deflection sensitivity (V/bit)']
440              - info['deflection offset (V)'])
441             * info['deflection sensitivity (m/V)']
442             )
443
444         return ret