Use 'Ciao scan list'/'@Sens. ZSensorSens' to scale picoforce z piezo data.
[hooke.git] / hooke / driver / picoforce.py
1 # Copyright (C) 2006-2010 Alberto Gomez-Casado
2 #                         Massimo Sandal <devicerandom@gmail.com>
3 #                         W. Trevor King <wking@drexel.edu>
4 #
5 # This file is part of Hooke.
6 #
7 # Hooke is free software: you can redistribute it and/or modify it
8 # under the terms of the GNU Lesser General Public License as
9 # published by the Free Software Foundation, either version 3 of the
10 # License, or (at your option) any later version.
11 #
12 # Hooke is distributed in the hope that it will be useful, but WITHOUT
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
15 # Public License for more details.
16 #
17 # You should have received a copy of the GNU Lesser General Public
18 # License along with Hooke.  If not, see
19 # <http://www.gnu.org/licenses/>.
20
21 """Driver for Veeco PicoForce force spectroscopy files.
22 """
23
24 import pprint
25 import re
26 import time
27
28 import numpy
29
30 from .. import curve as curve # this module defines data containers.
31 from .. import experiment as experiment # this module defines expt. types
32 from . import Driver as Driver # this is the Driver base class
33
34
35 __version__='0.0.0.20100516'
36
37 class PicoForceDriver (Driver):
38     """Handle Veeco Picoforce force spectroscopy files.
39     """
40     def __init__(self):
41         super(PicoForceDriver, self).__init__(name='picoforce')
42
43     def is_me(self, path):
44         f = file(path, 'r')
45         header = f.read(30)
46         f.close()
47
48         return header[2:17] == 'Force file list'
49
50     def read(self, path, info=None):
51         info = self._read_header_path(path)
52         self._check_version(info)
53         data = self._read_data_path(path, info)
54         info['filetype'] = self.name
55         info['experiment'] = experiment.VelocityClamp
56         return (data, info)
57
58     def _read_header_path(self, path):
59         """Read curve information from the PicoForce file at `path`.
60
61         See :meth:`._read_header_file`.
62         """
63         return self._read_header_file(file(path, 'rb'))
64
65     def _read_header_file(self, file):
66         r"""Read curve information from a PicoForce file.
67
68         Return a dict of dicts representing the information.  If a
69         field is repeated multiple times, it's value is replaced by a
70         list of the values for each occurence.
71
72         Examples
73         --------
74
75         >>> import pprint
76         >>> import StringIO
77         >>> f = StringIO.StringIO('\r\n'.join([
78         ...             '\*Force file list',
79         ...             '\Version: 0x06120002',
80         ...             '\Date: 04:42:34 PM Tue Sep 11 2007',
81         ...             '\Start context: FOL2',
82         ...             '\Data length: 40960',
83         ...             '\Text: ',
84         ...             '\*Equipment list',
85         ...             '\Description: Extended PicoForce',
86         ...             '\Controller: IIIA',
87         ...             '\*Ciao force image list',
88         ...             '\Data offset: 40960',
89         ...             '\Data length: 8192',
90         ...             '\*Ciao force image list',
91         ...             '\Data offset: 49152',
92         ...             '\Data length: 8192',
93         ...             '\*Ciao force image list',
94         ...             '\Data offset: 57344',
95         ...             '\Data length: 8192',
96         ...             ]))
97         >>> p = PicoForceDriver()
98         >>> d = p._read_header_file(f)
99         >>> pprint.pprint(d, width=60)
100         {'Ciao force image list': [{'Data length': '8192',
101                                     'Data offset': '40960'},
102                                    {'Data length': '8192',
103                                     'Data offset': '49152'},
104                                    {'Data length': '8192',
105                                     'Data offset': '57344'}],
106          'Equipment list': {'Controller': 'IIIA',
107                             'Description': 'Extended PicoForce'},
108          'Force file list': {'Data length': '40960',
109                              'Date': '04:42:34 PM Tue Sep 11 2007',
110                              'Start context': 'FOL2',
111                              'Text:': None,
112                              'Version': '0x06120002'}}
113         """
114         info = {}
115         header_field = None
116         for line in file:
117             line = line.strip()
118             if line.startswith('\*File list end'):
119                 break
120             if line.startswith(r'\*'):
121                 header_field = line[len(r'\*'):]
122                 if header_field in info:
123                     if isinstance(info[header_field], list):
124                         info[header_field].append({}) # >=3rd appearance
125                     else: # Second appearance
126                         info[header_field] = [info[header_field], {}]
127                 else: # First appearance
128                     info[header_field] = {}
129             else:
130                 assert line.startswith('\\'), line
131                 fields = line[len('\\'):].split(': ', 1)
132                 key = fields[0]
133                 if len(fields) == 1: # fields = [key]
134                     value = None
135                 else: # fields = [key, value]
136                     value = fields[1]
137                 if isinstance(info[header_field], list): # >=2nd header_field
138                     target_dict = info[header_field][-1]
139                 else: # first appearance of header_field
140                     target_dict = info[header_field]
141                 if key in target_dict and target_dict[key] != value:
142                     raise NotImplementedError(
143                         'Overwriting %s: %s -> %s'
144                         % (key, target_dict[key], value))
145                 target_dict[key] = value
146         return (info)
147
148     def _check_version(self, info):
149         """Ensure the input file is a version we understand.
150
151         Otherwise, raise `ValueError`.
152         """
153         version = info['Force file list'].get('Version', None)
154         if version not in ['0x06120002', '0x06130001', '0x07200000']:
155             raise NotImplementedError(
156                 '%s file version %s not supported (yet!)\n%s'
157                 % (self.name, version,
158                    pprint.pformat(info['Force file list'])))
159
160     def _read_data_path(self, path, info):
161         """Read curve data from the PicoForce file at `path`.
162
163         See :meth:`._read_data_file`.
164         """
165         f = file(path, 'rb')
166         data = self._read_data_file(f, info)
167         f.close()
168         return data
169
170     def _read_data_file(self, file, info):
171         file.seek(0)
172         traces = self._extract_traces(buffer(file.read()), info)
173         self._validate_traces(
174             traces['Z sensor'], traces['Deflection'])
175         L = len(traces['Deflection'])
176         approach = self._extract_block(
177             info, traces['Z sensor'], traces['Deflection'], 0, L/2, 'approach')
178         retract = self._extract_block(
179             info, traces['Z sensor'], traces['Deflection'], L/2, L, 'retract')
180         data = [approach, retract]
181         return data
182
183     def _extract_traces(self, buffer, info):
184         """Extract each of the three vector blocks in a PicoForce file.
185
186         The blocks are (in variable order):
187
188         * Z piezo sensor input
189         * Deflection input
190         * Deflection again?
191
192         And their headers are marked with 'Ciao force image list'.
193         """
194         traces = {}
195         version = info['Force file list']['Version']
196         type_re = re.compile('S \[(\w*)\] "([\w\s.]*)"')
197         for image in info['Ciao force image list']:
198             offset = int(image['Data offset'])
199             length = int(image['Data length'])
200             sample_size = int(image['Bytes/pixel'])
201             if sample_size != 2:
202                 raise NotImplementedError('Size: %s' % sample_size)
203             rows = length / sample_size
204             d = curve.Data(
205                 shape=(rows),
206                 dtype=numpy.int16,
207                 buffer=buffer,
208                 offset=offset,
209                 info=image,
210                 )
211             if version in ['0x06120002', '0x06130001']:
212                 match = type_re.match(image['@4:Image Data'])
213                 assert match != None, 'Bad regexp for %s, %s' \
214                     % ('@4:Image Data', image['@4:Image Data'])
215                 assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
216                     'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
217                 tname = match.group(2)
218             else:
219                 assert version == '0x07200000', version
220                 match = type_re.match(image['@4:Image Data'])
221                 assert match != None, 'Bad regexp for %s, %s' \
222                     % ('@4:Image Data', image['@4:Image Data'])
223                 if match.group(1) == 'PulseFreq1':
224                     assert match.group(2) == 'Freq. 1', match.group(2)
225                 else:
226                     assert match.group(1).lower() == match.group(2).replace(' ','').lower(), \
227                         'Name missmatch: "%s", "%s"' % (match.group(1), match.group(2))
228                 tname = match.group(2)
229                 if tname == 'Freq. 1':  # Normalize trace names between versions
230                     tname = 'Z sensor'
231                 elif tname == 'Deflection Error':
232                     tname = 'Deflection'
233             if tname in traces:
234                 #d.tofile('%s-2.dat' % tname, sep='\n')
235                 tname = self._replace_name(tname, d, traces, info)
236                 if tname == None:
237                     continue  # Don't replace anything
238             else:
239                 #d.tofile('%s.dat' % tname, sep='\n')
240                 pass
241             traces[tname] = d
242         return traces
243
244     def _validate_traces(self, z_piezo, deflection):
245         if len(z_piezo) != len(deflection):
246             raise ValueError('Trace length missmatch: %d != %d'
247                              % (len(z_piezo), len(deflection)))
248
249     def _extract_block(self, info, z_piezo, deflection, start, stop, name):
250         block = curve.Data(
251             shape=(stop-start, 2),
252             dtype=numpy.float)
253         block[:,0] = z_piezo[start:stop]
254         block[:,1] = deflection[start:stop]
255         block.info = self._translate_block_info(
256             info, z_piezo.info, deflection.info, name)
257         block.info['columns'] = ['z piezo (m)', 'deflection (m)']
258         block = self._scale_block(block)
259         return block
260
261     def _replace_name(self, trace_name, trace, traces, info):
262         """Determine if a duplicate trace name should replace an earlier trace.
263
264         Return the target trace name if it should be replaced by the
265         new trace, or `None` if the new trace should be dropped.
266         """
267         #msg = []
268         #target = traces[trace_name]
269         #
270         ## Compare the info dictionaries for each trace
271         #ik = set(trace.info.keys())
272         #ok = set(traces[trace_name].info.keys())
273         #if ik != ok:  # Ensure we have the same set of keys for both traces
274         #    msg.append('extra keys: %s, missing keys %s' % (ik-ok, ok-ik))
275         #else:
276         #    # List keys we *require* to change between traces
277         #    variable_keys = ['Data offset', 'X data type']  # TODO: What is X data type?
278         #    for key in trace.info.keys():
279         #        if key in variable_keys:
280         #            if target.info[key] == trace.info[key]:
281         #                msg.append('constant %s (%s == %s)'
282         #                           % (key, target.info[key], trace.info[key]))
283         #        else:
284         #            if target.info[key] != trace.info[key]:
285         #                msg.append('variable %s (%s != %s)'
286         #                           % (key, target.info[key], trace.info[key]))
287         # Compare the data
288         #if not (traces[trace_name] == trace).all():
289         #    msg.append('data difference')
290         #if len(msg) > 0:
291         #    raise NotImplementedError(
292         #        'Missmatched duplicate traces for %s: %s'
293         #        % (trace_name, ', '.join(msg)))
294         import logging
295         log = logging.getLogger('hooke')
296         for name,t in traces.items():
297             if (t == trace).all():
298                 log.debug('replace %s with %s-2' % (name, trace_name))
299                 return name  # Replace this identical dataset.
300         log.debug('store %s-2 as Other' % (trace_name))
301         return 'Other'
302         # return None
303
304     def _translate_block_info(self, info, z_piezo_info, deflection_info, name):
305         version = info['Force file list']['Version']
306         ret = {
307             'name': name,
308             'raw info': info,
309             'raw z piezo info': z_piezo_info,
310             'raw deflection info': deflection_info,
311             'spring constant (N/m)': float(z_piezo_info['Spring Constant']),
312             }
313
314         t = info['Force file list']['Date'] # 04:42:34 PM Tue Sep 11 2007
315         ret['time'] = time.strptime(t, '%I:%M:%S %p %a %b %d %Y')
316
317         volt_re = re.compile(
318             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) V/LSB\) (-?[.0-9]*) V')
319         hz_re = re.compile(
320             'V \[Sens. ([\w\s.]*)\] \(([.0-9]*) kHz/LSB\) (-?[.0-9]*) kHz')
321         if version in ['0x06120002', '0x06130001']:
322             match = volt_re.match(z_piezo_info['@4:Z scale'])
323             assert match != None, 'Bad regexp for %s, %s' \
324                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
325             assert match.group(1) == 'ZSensorSens', z_piezo_info['@4:Z scale']
326         else:
327             assert version == '0x07200000', version
328             match = hz_re.match(z_piezo_info['@4:Z scale'])
329             assert match != None, 'Bad regexp for %s, %s' \
330                 % ('@4:Z scale', z_piezo_info['@4:Z scale'])
331             assert match.group(1) == 'Freq. 1', z_piezo_info['@4:Z scale']
332         ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
333         ret['z piezo range (V)'] = float(match.group(3))
334         ret['z piezo offset (V)'] = 0.0
335         # offset assumed if raw data is signed...
336
337         match = volt_re.match(deflection_info['@4:Z scale'])
338         assert match != None, 'Bad regexp for %s, %s' \
339             % ('@4:Z scale', deflection_info['@4:Z scale'])
340         assert match.group(1) == 'DeflSens', z_piezo_info['@4:Z scale']
341         ret['deflection sensitivity (V/bit)'] = float(match.group(2))
342         ret['deflection range (V)'] = float(match.group(3))
343         ret['deflection offset (V)'] = 0.0
344         # offset assumed if raw data is signed...
345
346         nm_sens_re = re.compile('V ([.0-9]*) nm/V')
347         match = nm_sens_re.match(info['Ciao scan list']['@Sens. ZSensorSens'])
348         assert match != None, 'Bad regexp for %s/%s, %s' \
349             % ('Ciao scan list', '@Sens. ZSensorSens',
350                info['Ciao scan list']['@Sens. ZSensorSens'])
351         ret['z piezo sensitivity (m/V)'] = float(match.group(1))*1e-9
352
353         match = nm_sens_re.match(info['Ciao scan list']['@Sens. DeflSens'])
354         assert match != None, 'Bad regexp for %s/%s, %s' \
355             % ('Ciao scan list', '@Sens. DeflSens', info['Ciao scan list']['@Sens. DeflSens'])
356         ret['deflection sensitivity (m/V)'] = float(match.group(1))*1e-9
357
358         match = volt_re.match(info['Ciao force list']['@Z scan start'])
359         assert match != None, 'Bad regexp for %s/%s, %s' \
360             % ('Ciao force list', '@Z scan start', info['Ciao force list']['@Z scan start'])
361         ret['z piezo scan (V/bit)'] = float(match.group(2))
362         ret['z piezo scan start (V)'] = float(match.group(3))
363
364         match = volt_re.match(info['Ciao force list']['@Z scan size'])
365         assert match != None, 'Bad regexp for %s/%s, %s' \
366             % ('Ciao force list', '@Z scan size', info['Ciao force list']['@Z scan size'])
367         ret['z piezo scan size (V)'] = float(match.group(3))
368
369         const_re = re.compile('C \[([:\w\s]*)\] ([.0-9]*)')
370         match = const_re.match(z_piezo_info['@Z magnify'])
371         assert match != None, 'Bad regexp for %s, %s' \
372             % ('@Z magnify', info['@Z magnify'])
373         assert match.group(1) == '4:Z scale', match.group(1)
374         ret['z piezo gain'] = float(match.group(2))
375
376         if version in ['0x06120002', '0x06130001']:        
377             match = volt_re.match(z_piezo_info['@4:Z scale'])
378             assert match != None, 'Bad regexp for %s, %s' \
379                 % ('@4:Z scale', info['@4:Z scale'])
380             assert match.group(1) == 'ZSensorSens', match.group(1)
381             ret['z piezo sensitivity (V/bit)'] = float(match.group(2))
382             ret['z piezo range (V)'] = float(match.group(3))
383         else:
384             assert version == '0x07200000', version
385             pass
386
387         match = volt_re.match(z_piezo_info['@4:Ramp size'])
388         assert match != None, 'Bad regexp for %s, %s' \
389             % ('@4:Ramp size', info['@4:Ramp size'])
390         assert match.group(1) == 'Zsens', match.group(1)
391         ret['z piezo ramp size (V/bit)'] = float(match.group(2))
392         ret['z piezo ramp size (V)'] = float(match.group(3))
393
394         match = volt_re.match(z_piezo_info['@4:Ramp offset'])
395         assert match != None, 'Bad regexp for %s, %s' \
396             % ('@4:Ramp offset', info['@4:Ramp offset'])
397         assert match.group(1) == 'Zsens', match.group(1)
398         ret['z piezo ramp offset (V/bit)'] = float(match.group(2))
399         ret['z piezo ramp offset (V)'] = float(match.group(3))
400
401         # Unaccounted for:
402         #   Samps*
403
404         return ret
405
406     def _scale_block(self, data):
407         """Convert the block from its native format to a `numpy.float`
408         array in SI units.
409         """
410         ret = curve.Data(
411             shape=data.shape,
412             dtype=numpy.float,
413             )
414         info = data.info
415         ret.info = info
416         ret.info['raw data'] = data # store the raw data
417         data.info = {} # break circular reference info <-> data
418
419         z_col = info['columns'].index('z piezo (m)')
420         d_col = info['columns'].index('deflection (m)')
421
422         # Leading '-' because Veeco's z increases towards the surface
423         # (positive indentation), but it makes more sense to me to
424         # have it increase away from the surface (positive
425         # separation).
426         ret[:,z_col] = -(
427             (data[:,z_col].astype(ret.dtype)
428              * info['z piezo sensitivity (V/bit)']
429              - info['z piezo offset (V)'])
430             * info['z piezo gain']
431             * info['z piezo sensitivity (m/V)']
432             )
433
434         # Leading '-' because deflection voltage increases as the tip
435         # moves away from the surface, but it makes more sense to me
436         # to have it increase as it moves toward the surface (positive
437         # tension on the protein chain).
438         ret[:,d_col] = -(
439             (data[:,d_col]
440              * info['deflection sensitivity (V/bit)']
441              - info['deflection offset (V)'])
442             * info['deflection sensitivity (m/V)']
443             )
444
445         return ret