Add key server preferences signature subpacket parsing to PGPPacket
[gpg-migrate.git] / gpg-migrate.py
1 #!/usr/bin/python
2
3 import hashlib as _hashlib
4 import re as _re
5 import subprocess as _subprocess
6 import struct as _struct
7
8
9 def _get_stdout(args, stdin=None):
10     stdin_pipe = None
11     if stdin is not None:
12         stdin_pipe = _subprocess.PIPE
13     p = _subprocess.Popen(args, stdin=stdin_pipe, stdout=_subprocess.PIPE)
14     stdout, stderr = p.communicate(stdin)
15     status = p.wait()
16     if status != 0:
17         raise RuntimeError(status)
18     return stdout
19
20
21 class PGPPacket (dict):
22     # http://tools.ietf.org/search/rfc4880
23     _old_format_packet_length_type = {  # type: (bytes, struct type)
24         0: (1, 'B'),  # 1-byte unsigned integer
25         1: (2, 'H'),  # 2-byte unsigned integer
26         2: (4, 'I'),  # 4-byte unsigned integer
27         3: (None, None),
28         }
29
30     _packet_types = {
31         0: 'reserved',
32         1: 'public-key encrypted session key packet',
33         2: 'signature packet',
34         3: 'symmetric-key encrypted session key packet',
35         4: 'one-pass signature packet',
36         5: 'secret-key packet',
37         6: 'public-key packet',
38         7: 'secret-subkey packet',
39         8: 'compressed data packet',
40         9: 'symmetrically encrypted data packet',
41         10: 'marker packet',
42         11: 'literal data packet',
43         12: 'trust packet',
44         13: 'user id packet',
45         14: 'public-subkey packet',
46         17: 'user attribute packet',
47         18: 'sym. encrypted and integrity protected data packet',
48         19: 'modification detection code packet',
49         60: 'private',
50         61: 'private',
51         62: 'private',
52         63: 'private',
53         }
54
55     _public_key_algorithms = {
56         1: 'rsa (encrypt or sign)',
57         2: 'rsa encrypt-only',
58         3: 'rsa sign-only',
59         16: 'elgamal (encrypt-only)',
60         17: 'dsa (digital signature algorithm)',
61         18: 'reserved for elliptic curve',
62         19: 'reserved for ecdsa',
63         20: 'reserved (formerly elgamal encrypt or sign)',
64         21: 'reserved for diffie-hellman',
65         100: 'private',
66         101: 'private',
67         102: 'private',
68         103: 'private',
69         104: 'private',
70         105: 'private',
71         106: 'private',
72         107: 'private',
73         108: 'private',
74         109: 'private',
75         110: 'private',
76         }
77
78     _symmetric_key_algorithms = {
79         0: 'plaintext or unencrypted data',
80         1: 'idea',
81         2: 'tripledes',
82         3: 'cast5',
83         4: 'blowfish',
84         5: 'reserved',
85         6: 'reserved',
86         7: 'aes with 128-bit key',
87         8: 'aes with 192-bit key',
88         9: 'aes with 256-bit key',
89         10: 'twofish',
90         100: 'private',
91         101: 'private',
92         102: 'private',
93         103: 'private',
94         104: 'private',
95         105: 'private',
96         106: 'private',
97         107: 'private',
98         108: 'private',
99         109: 'private',
100         110: 'private',
101         }
102
103     _cipher_block_size = {  # in bits
104         'aes with 128-bit key': 128,
105         'aes with 192-bit key': 128,
106         'aes with 256-bit key': 128,
107         'cast5': 64,
108         }
109
110     _compression_algorithms = {
111         0: 'uncompressed',
112         1: 'zip',
113         2: 'zlib',
114         3: 'bzip2',
115         100: 'private',
116         101: 'private',
117         102: 'private',
118         103: 'private',
119         104: 'private',
120         105: 'private',
121         106: 'private',
122         107: 'private',
123         108: 'private',
124         109: 'private',
125         110: 'private',
126         }
127
128     _hash_algorithms = {
129         1: 'md5',
130         2: 'sha-1',
131         3: 'ripe-md/160',
132         4: 'reserved',
133         5: 'reserved',
134         6: 'reserved',
135         7: 'reserved',
136         8: 'sha256',
137         9: 'sha384',
138         10: 'sha512',
139         11: 'sha224',
140         100: 'private',
141         101: 'private',
142         102: 'private',
143         103: 'private',
144         104: 'private',
145         105: 'private',
146         106: 'private',
147         107: 'private',
148         108: 'private',
149         109: 'private',
150         110: 'private',
151         }
152
153     _string_to_key_types = {
154         0: 'simple',
155         1: 'salted',
156         2: 'reserved',
157         3: 'iterated and salted',
158         100: 'private',
159         101: 'private',
160         102: 'private',
161         103: 'private',
162         104: 'private',
163         105: 'private',
164         106: 'private',
165         107: 'private',
166         108: 'private',
167         109: 'private',
168         110: 'private',
169         }
170
171     _signature_types = {
172         0x00: 'binary document',
173         0x01: 'canonical text document',
174         0x02: 'standalone',
175         0x10: 'generic user id and public-key packet',
176         0x11: 'persona user id and public-key packet',
177         0x12: 'casual user id and public-key packet',
178         0x13: 'postitive user id and public-key packet',
179         0x18: 'subkey binding',
180         0x19: 'primary key binding',
181         0x1F: 'direct key',
182         0x20: 'key revocation',
183         0x28: 'subkey revocation',
184         0x30: 'certification revocation',
185         0x40: 'timestamp',
186         0x50: 'third-party confirmation',
187         }
188
189     _signature_subpacket_types = {
190         0: 'reserved',
191         1: 'reserved',
192         2: 'signature creation time',
193         3: 'signature expiration time',
194         4: 'exportable certification',
195         5: 'trust signature',
196         6: 'regular expression',
197         7: 'revocable',
198         8: 'reserved',
199         9: 'key expiration time',
200         10: 'placeholder for backward compatibility',
201         11: 'preferred symmetric algorithms',
202         12: 'revocation key',
203         13: 'reserved',
204         14: 'reserved',
205         15: 'reserved',
206         16: 'issuer',
207         17: 'reserved',
208         18: 'reserved',
209         19: 'reserved',
210         20: 'notation data',
211         21: 'preferred hash algorithms',
212         22: 'preferred compression algorithms',
213         23: 'key server preferences',
214         24: 'preferred key server',
215         25: 'primary user id',
216         26: 'policy uri',
217         27: 'key flags',
218         28: 'signer user id',
219         29: 'reason for revocation',
220         30: 'features',
221         31: 'signature target',
222         32: 'embedded signature',
223         100: 'private',
224         101: 'private',
225         102: 'private',
226         103: 'private',
227         104: 'private',
228         105: 'private',
229         106: 'private',
230         107: 'private',
231         108: 'private',
232         109: 'private',
233         110: 'private',
234         }
235
236     _clean_type_regex = _re.compile('\W+')
237
238     def _clean_type(self, type=None):
239         if type is None:
240             type = self['type']
241         return self._clean_type_regex.sub('_', type)
242
243     def __str__(self):
244         method_name = '_str_{}'.format(self._clean_type())
245         method = getattr(self, method_name, None)
246         if not method:
247             return self['type']
248         details = method()
249         return '{}: {}'.format(self['type'], details)
250
251     def _str_public_key_packet(self):
252         return self._str_generic_key_packet()
253
254     def _str_public_subkey_packet(self):
255         return self._str_generic_key_packet()
256
257     def _str_secret_key_packet(self):
258         return self._str_generic_key_packet()
259
260     def _str_secret_subkey_packet(self):
261         return self._str_generic_key_packet()
262
263     def _str_generic_key_packet(self):
264         return self['fingerprint'][-8:].upper()
265
266     def _str_signature_packet(self):
267         lines = [self['signature-type']]
268         if self['hashed-subpackets']:
269             lines.append('  hashed subpackets:')
270             lines.extend(self._str_signature_subpackets(
271                 self['hashed-subpackets'], prefix='    '))
272         if self['unhashed-subpackets']:
273             lines.append('  unhashed subpackets:')
274             lines.extend(self._str_signature_subpackets(
275                 self['unhashed-subpackets'], prefix='    '))
276         return '\n'.join(lines)
277
278     def _str_signature_subpackets(self, subpackets, prefix):
279         lines = []
280         for subpacket in subpackets:
281             method_name = '_str_{}_signature_subpacket'.format(
282                 self._clean_type(type=subpacket['type']))
283             method = getattr(self, method_name, None)
284             if method:
285                 lines.append('    {}: {}'.format(
286                     subpacket['type'],
287                     method(subpacket=subpacket)))
288             else:
289                 lines.append('    {}'.format(subpacket['type']))
290         return lines
291
292     def _str_issuer_signature_subpacket(self, subpacket):
293         return subpacket['issuer'][-8:].upper()
294
295     def _str_preferred_symmetric_algorithms_signature_subpacket(
296             self, subpacket):
297         return ', '.join(
298             algo for algo in subpacket['preferred-symmetric-algorithms'])
299
300     def _str_preferred_hash_algorithms_signature_subpacket(
301             self, subpacket):
302         return ', '.join(
303             algo for algo in subpacket['preferred-hash-algorithms'])
304
305     def _str_preferred_compression_algorithms_signature_subpacket(
306             self, subpacket):
307         return ', '.join(
308             algo for algo in subpacket['preferred-compression-algorithms'])
309
310     def _str_key_server_preferences_signature_subpacket(self, subpacket):
311         return ', '.join(
312             x for x in sorted(subpacket['key-server-preferences']))
313
314     def _str_key_flags_signature_subpacket(self, subpacket):
315         return ', '.join(x for x in sorted(subpacket['key-flags']))
316
317     def _str_features_signature_subpacket(self, subpacket):
318         return ', '.join(x for x in sorted(subpacket['features']))
319
320     def _str_embedded_signature_signature_subpacket(self, subpacket):
321         return subpacket['embedded']['signature-type']
322
323     def _str_user_id_packet(self):
324         return self['user']
325
326     def from_bytes(self, data):
327         offset = self._parse_header(data=data)
328         packet = data[offset:offset + self['length']]
329         if len(packet) < self['length']:
330             raise ValueError('packet too short ({} < {})'.format(
331                 len(packet), self['length']))
332         offset += self['length']
333         method_name = '_parse_{}'.format(self._clean_type())
334         method = getattr(self, method_name, None)
335         if not method:
336             raise NotImplementedError(
337                 'cannot parse packet type {!r}'.format(self['type']))
338         method(data=packet)
339         return offset
340
341     def _parse_header(self, data):
342         packet_tag = data[0]
343         offset = 1
344         always_one = packet_tag & 1 << 7
345         if not always_one:
346             raise ValueError('most significant packet tag bit not set')
347         self['new-format'] = packet_tag & 1 << 6
348         if self['new-format']:
349             type_code = packet_tag & 0b111111
350             raise NotImplementedError('new-format packet length')
351         else:
352             type_code = packet_tag >> 2 & 0b1111
353             self['length-type'] = packet_tag & 0b11
354             length_bytes, length_type = self._old_format_packet_length_type[
355                 self['length-type']]
356             if not length_bytes:
357                 raise NotImplementedError(
358                     'old-format packet of indeterminate length')
359             length_format = '>{}'.format(length_type)
360             length_data = data[offset: offset + length_bytes]
361             offset += length_bytes
362             self['length'] = _struct.unpack(length_format, length_data)[0]
363         self['type'] = self._packet_types[type_code]
364         return offset
365
366     @staticmethod
367     def _parse_multiprecision_integer(data):
368         r"""Parse RFC 4880's multiprecision integers
369
370         >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
371         (3, 1)
372         >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
373         (4, 511)
374         """
375         bits = _struct.unpack('>H', data[:2])[0]
376         offset = 2
377         length = (bits + 7) // 8
378         value = 0
379         for i in range(length):
380             value += data[offset + i] * 1 << (8 * (length - i - 1))
381         offset += length
382         return (offset, value)
383
384     def _parse_string_to_key_specifier(self, data):
385         self['string-to-key-type'] = self._string_to_key_types[data[0]]
386         offset = 1
387         if self['string-to-key-type'] == 'simple':
388             self['string-to-key-hash-algorithm'] = self._hash_algorithms[
389                 data[offset]]
390             offset += 1
391         elif self['string-to-key-type'] == 'salted':
392             self['string-to-key-hash-algorithm'] = self._hash_algorithms[
393                 data[offset]]
394             offset += 1
395             self['string-to-key-salt'] = data[offset: offset + 8]
396             offset += 8
397         elif self['string-to-key-type'] == 'iterated and salted':
398             self['string-to-key-hash-algorithm'] = self._hash_algorithms[
399                 data[offset]]
400             offset += 1
401             self['string-to-key-salt'] = data[offset: offset + 8]
402             offset += 8
403             self['string-to-key-coded-count'] = data[offset]
404             offset += 1
405         else:
406             raise NotImplementedError(
407                 'string-to-key type {}'.format(self['string-to-key-type']))
408         return offset
409
410     def _parse_public_key_packet(self, data):
411         self._parse_generic_public_key_packet(data=data)
412
413     def _parse_public_subkey_packet(self, data):
414         self._parse_generic_public_key_packet(data=data)
415
416     def _parse_generic_public_key_packet(self, data):
417         self['key-version'] = data[0]
418         offset = 1
419         if self['key-version'] != 4:
420             raise NotImplementedError(
421                 'public (sub)key packet version {}'.format(
422                     self['key-version']))
423         length = 5
424         self['creation-time'], algorithm = _struct.unpack(
425             '>IB', data[offset: offset + length])
426         offset += length
427         self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
428         if self['public-key-algorithm'].startswith('rsa '):
429             o, self['public-modulus'] = self._parse_multiprecision_integer(
430                 data[offset:])
431             offset += o
432             o, self['public-exponent'] = self._parse_multiprecision_integer(
433                 data[offset:])
434             offset += o
435         elif self['public-key-algorithm'].startswith('dsa '):
436             o, self['prime'] = self._parse_multiprecision_integer(
437                 data[offset:])
438             offset += o
439             o, self['group-order'] = self._parse_multiprecision_integer(
440                 data[offset:])
441             offset += o
442             o, self['group-generator'] = self._parse_multiprecision_integer(
443                 data[offset:])
444             offset += o
445             o, self['public-key'] = self._parse_multiprecision_integer(
446                 data[offset:])
447             offset += o
448         elif self['public-key-algorithm'].startswith('elgamal '):
449             o, self['prime'] = self._parse_multiprecision_integer(
450                 data[offset:])
451             offset += o
452             o, self['group-generator'] = self._parse_multiprecision_integer(
453                 data[offset:])
454             offset += o
455             o, self['public-key'] = self._parse_multiprecision_integer(
456                 data[offset:])
457             offset += o
458         else:
459             raise NotImplementedError(
460                 'algorithm-specific key fields for {}'.format(
461                     self['public-key-algorithm']))
462         fingerprint = _hashlib.sha1()
463         fingerprint.update(b'\x99')
464         fingerprint.update(_struct.pack('>H', len(data)))
465         fingerprint.update(data)
466         self['fingerprint'] = fingerprint.hexdigest()
467         return offset
468
469     def _parse_secret_key_packet(self, data):
470         self._parse_generic_secret_key_packet(data=data)
471
472     def _parse_secret_subkey_packet(self, data):
473         self._parse_generic_secret_key_packet(data=data)
474
475     def _parse_generic_secret_key_packet(self, data):
476         offset = self._parse_generic_public_key_packet(data=data)
477         string_to_key_usage = data[offset]
478         offset += 1
479         if string_to_key_usage in [255, 254]:
480             self['symmetric-encryption-algorithm'] = (
481                 self._symmetric_key_algorithms[data[offset]])
482             offset += 1
483             offset += self._parse_string_to_key_specifier(data=data[offset:])
484         else:
485             self['symmetric-encryption-algorithm'] = (
486                 self._symmetric_key_algorithms[string_to_key_usage])
487         if string_to_key_usage:
488             block_size_bits = self._cipher_block_size.get(
489                 self['symmetric-encryption-algorithm'], None)
490             if block_size_bits % 8:
491                 raise NotImplementedError(
492                     ('{}-bit block size for {} is not an integer number of bytes'
493                      ).format(
494                          block_size_bits, self['symmetric-encryption-algorithm']))
495             block_size = block_size_bits // 8
496             if not block_size:
497                 raise NotImplementedError(
498                     'unknown block size for {}'.format(
499                         self['symmetric-encryption-algorithm']))
500             self['initial-vector'] = data[offset: offset + block_size]
501             offset += block_size
502         if string_to_key_usage in [0, 255]:
503             key_end = -2
504         else:
505             key_end = 0
506         self['secret-key'] = data[offset:key_end]
507         if key_end:
508             self['secret-key-checksum'] = data[key_end:]
509
510     def _parse_signature_subpackets(self, data):
511         offset = 0
512         while offset < len(data):
513             o, subpacket = self._parse_signature_subpacket(data=data[offset:])
514             offset += o
515             yield subpacket
516
517     def _parse_signature_subpacket(self, data):
518         subpacket = {}
519         first = data[0]
520         offset = 1
521         if first < 192:
522             length = first
523         elif first >= 192 and first < 255:
524             second = data[offset]
525             offset += 1
526             length = ((first - 192) << 8) + second + 192
527         else:
528             length = _struct.unpack(
529                 '>I', data[offset: offset + 4])[0]
530             offset += 4
531         subpacket['type'] = self._signature_subpacket_types[data[offset]]
532         offset += 1
533         subpacket_data = data[offset: offset + length - 1]
534         offset += len(subpacket_data)
535         method_name = '_parse_{}_signature_subpacket'.format(
536             self._clean_type(type=subpacket['type']))
537         method = getattr(self, method_name, None)
538         if not method:
539             raise NotImplementedError(
540                 'cannot parse signature subpacket type {!r}'.format(
541                     subpacket['type']))
542         method(data=subpacket_data, subpacket=subpacket)
543         return (offset, subpacket)
544
545     def _parse_signature_packet(self, data):
546         self['signature-version'] = data[0]
547         offset = 1
548         if self['signature-version'] != 4:
549             raise NotImplementedError(
550                 'signature packet version {}'.format(
551                     self['signature-version']))
552         self['signature-type'] = self._signature_types[data[offset]]
553         offset += 1
554         self['public-key-algorithm'] = self._public_key_algorithms[
555             data[offset]]
556         offset += 1
557         self['hash-algorithm'] = self._hash_algorithms[data[offset]]
558         offset += 1
559         hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
560         offset += 2
561         self['hashed-subpackets'] = list(self._parse_signature_subpackets(
562             data[offset: offset + hashed_count]))
563         offset += hashed_count
564         unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
565         offset += 2
566         self['unhashed-subpackets'] = list(self._parse_signature_subpackets(
567             data=data[offset: offset + unhashed_count]))
568         offset += unhashed_count
569         self['signed-hash-word'] = data[offset: offset + 2]
570         offset += 2
571         self['signature'] = data[offset:]
572
573     def _parse_issuer_signature_subpacket(self, data, subpacket):
574         subpacket['issuer'] = ''.join('{:02x}'.format(byte) for byte in data)
575
576     def _parse_preferred_symmetric_algorithms_signature_subpacket(
577             self, data, subpacket):
578         subpacket['preferred-symmetric-algorithms'] = [
579             self._symmetric_key_algorithms[d] for d in data]
580
581     def _parse_preferred_hash_algorithms_signature_subpacket(
582             self, data, subpacket):
583         subpacket['preferred-hash-algorithms'] = [
584             self._hash_algorithms[d] for d in data]
585
586     def _parse_preferred_compression_algorithms_signature_subpacket(
587             self, data, subpacket):
588         subpacket['preferred-compression-algorithms'] = [
589             self._compression_algorithms[d] for d in data]
590
591     def _parse_key_server_preferences_signature_subpacket(
592             self, data, subpacket):
593         subpacket['key-server-preferences'] = set()
594         if data[0] & 0x80:
595             subpacket['key-server-preferences'].add('no-modify')
596
597     def _parse_key_flags_signature_subpacket(self, data, subpacket):
598         subpacket['key-flags'] = set()
599         if data[0] & 0x1:
600             subpacket['key-flags'].add('can certify')
601         if data[0] & 0x2:
602             subpacket['key-flags'].add('can sign')
603         if data[0] & 0x4:
604             subpacket['key-flags'].add('can encrypt communications')
605         if data[0] & 0x8:
606             subpacket['key-flags'].add('can encrypt storage')
607         if data[0] & 0x10:
608             subpacket['key-flags'].add('private split')
609         if data[0] & 0x20:
610             subpacket['key-flags'].add('can authenticate')
611         if data[0] & 0x80:
612             subpacket['key-flags'].add('private shared')
613
614     def _parse_features_signature_subpacket(self, data, subpacket):
615         subpacket['features'] = set()
616         if data[0] & 0x1:
617             subpacket['features'].add('modification detection')
618
619     def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
620         subpacket['embedded'] = PGPPacket()
621         subpacket['embedded']._parse_signature_packet(data=data)
622
623     def _parse_user_id_packet(self, data):
624         self['user'] = str(data, 'utf-8')
625
626     def to_bytes(self):
627         pass
628
629
630 def packets_from_bytes(data):
631     offset = 0
632     while offset < len(data):
633         packet = PGPPacket()
634         offset += packet.from_bytes(data=data[offset:])
635         yield packet
636
637
638 class PGPKey (object):
639     def __init__(self, fingerprint):
640         self.fingerprint = fingerprint
641         self.public_packets = None
642         self.secret_packets = None
643
644     def __str__(self):
645         lines = ['key: {}'.format(self.fingerprint)]
646         if self.public_packets:
647             lines.append('  public:')
648             for packet in self.public_packets:
649                 lines.extend(self._str_packet(packet=packet, prefix='    '))
650         if self.secret_packets:
651             lines.append('  secret:')
652             for packet in self.secret_packets:
653                 lines.extend(self._str_packet(packet=packet, prefix='    '))
654         return '\n'.join(lines)
655
656     def _str_packet(self, packet, prefix):
657         lines = str(packet).split('\n')
658         return [prefix + line for line in lines]
659
660     def import_from_gpg(self):
661         key_export = _get_stdout(
662             ['gpg', '--export', self.fingerprint])
663         self.public_packets = list(
664             packets_from_bytes(data=key_export))
665         if self.public_packets[0]['type'] != 'public-key packet':
666             raise ValueError(
667                 '{} does not start with a public-key packet'.format(
668                     self.fingerprint))
669         key_secret_export = _get_stdout(
670             ['gpg', '--export-secret-keys', self.fingerprint])
671         self.secret_packets = list(
672             packets_from_bytes(data=key_secret_export))
673
674     def export_to_gpg(self):
675         raise NotImplemetedError('export to gpg')
676
677     def import_from_key(self, key):
678         """Migrate the (sub)keys into this key"""
679         pass
680
681
682 def migrate(old_key, new_key):
683     """Add the old key and sub-keys to the new key
684
685     For example, to upgrade your master key, while preserving old
686     signatures you'd made.  You will lose signature *on* your old key
687     though, since sub-keys can't be signed (I don't think).
688     """
689     old_key = PGPKey(fingerprint=old_key)
690     old_key.import_from_gpg()
691     new_key = PGPKey(fingerprint=new_key)
692     new_key.import_from_gpg()
693     new_key.import_from_key(key=old_key)
694
695     print(old_key)
696     print(new_key)
697
698
699 if __name__ == '__main__':
700     import sys as _sys
701
702     old_key, new_key = _sys.argv[1:3]
703     migrate(old_key=old_key, new_key=new_key)