Add PGPKey with a basic key-level API
[gpg-migrate.git] / gpg-migrate.py
1 #!/usr/bin/python
2
3 import re as _re
4 import subprocess as _subprocess
5 import struct as _struct
6
7
8 def _get_stdout(args, stdin=None):
9     stdin_pipe = None
10     if stdin is not None:
11         stdin_pipe = _subprocess.PIPE
12     p = _subprocess.Popen(args, stdin=stdin_pipe, stdout=_subprocess.PIPE)
13     stdout, stderr = p.communicate(stdin)
14     status = p.wait()
15     if status != 0:
16         raise RuntimeError(status)
17     return stdout
18
19
20 class PGPPacket (dict):
21     # http://tools.ietf.org/search/rfc4880
22     _old_format_packet_length_type = {  # type: (bytes, struct type)
23         0: (1, 'B'),  # 1-byte unsigned integer
24         1: (2, 'H'),  # 2-byte unsigned integer
25         2: (4, 'I'),  # 4-byte unsigned integer
26         3: (None, None),
27         }
28
29     _packet_types = {
30         0: 'reserved',
31         1: 'public-key encrypted session key packet',
32         2: 'signature packet',
33         3: 'symmetric-key encrypted session key packet',
34         4: 'one-pass signature packet',
35         5: 'secret-key packet',
36         6: 'public-key packet',
37         7: 'secret-subkey packet',
38         8: 'compressed data packet',
39         9: 'symmetrically encrypted data packet',
40         10: 'marker packet',
41         11: 'literal data packet',
42         12: 'trust packet',
43         13: 'user id packet',
44         14: 'public-subkey packet',
45         17: 'user attribute packet',
46         18: 'sym. encrypted and integrity protected data packet',
47         19: 'modification detection code packet',
48         60: 'private',
49         61: 'private',
50         62: 'private',
51         63: 'private',
52         }
53
54     _public_key_algorithms = {
55         1: 'rsa (encrypt or sign)',
56         2: 'rsa encrypt-only',
57         3: 'rsa sign-only',
58         16: 'elgamal (encrypt-only)',
59         17: 'dsa (digital signature algorithm)',
60         18: 'reserved for elliptic curve',
61         19: 'reserved for ecdsa',
62         20: 'reserved (formerly elgamal encrypt or sign)',
63         21: 'reserved for diffie-hellman',
64         100: 'private',
65         101: 'private',
66         102: 'private',
67         103: 'private',
68         104: 'private',
69         105: 'private',
70         106: 'private',
71         107: 'private',
72         108: 'private',
73         109: 'private',
74         110: 'private',
75         }
76
77     _symmetric_key_algorithms = {
78         0: 'plaintext or unencrypted data',
79         1: 'idea',
80         2: 'tripledes',
81         3: 'cast5',
82         4: 'blowfish',
83         5: 'reserved',
84         6: 'reserved',
85         7: 'aes with 128-bit key',
86         8: 'aes with 192-bit key',
87         9: 'aes with 256-bit key',
88         10: 'twofish',
89         100: 'private',
90         101: 'private',
91         102: 'private',
92         103: 'private',
93         104: 'private',
94         105: 'private',
95         106: 'private',
96         107: 'private',
97         108: 'private',
98         109: 'private',
99         110: 'private',
100         }
101
102     _cipher_block_size = {  # in bits
103         'aes with 128-bit key': 128,
104         'aes with 192-bit key': 128,
105         'aes with 256-bit key': 128,
106         'cast5': 64,
107         }
108
109     _compression_algorithms = {
110         0: 'uncompressed',
111         1: 'zip',
112         2: 'zlib',
113         3: 'bzip2',
114         100: 'private',
115         101: 'private',
116         102: 'private',
117         103: 'private',
118         104: 'private',
119         105: 'private',
120         106: 'private',
121         107: 'private',
122         108: 'private',
123         109: 'private',
124         110: 'private',
125         }
126
127     _hash_algorithms = {
128         1: 'md5',
129         2: 'sha-1',
130         3: 'ripe-md/160',
131         4: 'reserved',
132         5: 'reserved',
133         6: 'reserved',
134         7: 'reserved',
135         8: 'sha256',
136         9: 'sha384',
137         10: 'sha512',
138         11: 'sha224',
139         100: 'private',
140         101: 'private',
141         102: 'private',
142         103: 'private',
143         104: 'private',
144         105: 'private',
145         106: 'private',
146         107: 'private',
147         108: 'private',
148         109: 'private',
149         110: 'private',
150         }
151
152     _string_to_key_types = {
153         0: 'simple',
154         1: 'salted',
155         2: 'reserved',
156         3: 'iterated and salted',
157         100: 'private',
158         101: 'private',
159         102: 'private',
160         103: 'private',
161         104: 'private',
162         105: 'private',
163         106: 'private',
164         107: 'private',
165         108: 'private',
166         109: 'private',
167         110: 'private',
168         }
169
170     _signature_types = {
171         0x00: 'binary document',
172         0x01: 'canonical text document',
173         0x02: 'standalone',
174         0x10: 'generic user id and public-key packet',
175         0x11: 'persona user id and public-key packet',
176         0x12: 'casual user id and public-key packet',
177         0x13: 'postitive user id and public-key packet',
178         0x18: 'subkey binding',
179         0x19: 'primary key binding',
180         0x1F: 'direct key',
181         0x20: 'key revocation',
182         0x28: 'subkey revocation',
183         0x30: 'certification revocation',
184         0x40: 'timestamp',
185         0x50: 'third-party confirmation',
186         }
187
188     _clean_type_regex = _re.compile('\W+')
189
190     def _clean_type(self):
191         return self._clean_type_regex.sub('_', self['type'])
192
193     def __str__(self):
194         method_name = '_str_{}'.format(self._clean_type())
195         method = getattr(self, method_name, None)
196         if not method:
197             return self['type']
198         details = method()
199         return '{}: {}'.format(self['type'], details)
200
201     def _str_user_id_packet(self):
202         return self['user']
203
204     def from_bytes(self, data):
205         offset = self._parse_header(data=data)
206         packet = data[offset:offset + self['length']]
207         if len(packet) < self['length']:
208             raise ValueError('packet too short ({} < {})'.format(
209                 len(packet), self['length']))
210         offset += self['length']
211         method_name = '_parse_{}'.format(self._clean_type())
212         method = getattr(self, method_name, None)
213         if not method:
214             raise NotImplementedError(
215                 'cannot parse packet type {!r}'.format(self['type']))
216         method(data=packet)
217         return offset
218
219     def _parse_header(self, data):
220         packet_tag = data[0]
221         offset = 1
222         always_one = packet_tag & 1 << 7
223         if not always_one:
224             raise ValueError('most significant packet tag bit not set')
225         self['new-format'] = packet_tag & 1 << 6
226         if self['new-format']:
227             type_code = packet_tag & 0b111111
228             raise NotImplementedError('new-format packet length')
229         else:
230             type_code = packet_tag >> 2 & 0b1111
231             self['length-type'] = packet_tag & 0b11
232             length_bytes, length_type = self._old_format_packet_length_type[
233                 self['length-type']]
234             if not length_bytes:
235                 raise NotImplementedError(
236                     'old-format packet of indeterminate length')
237             length_format = '>{}'.format(length_type)
238             length_data = data[offset: offset + length_bytes]
239             offset += length_bytes
240             self['length'] = _struct.unpack(length_format, length_data)[0]
241         self['type'] = self._packet_types[type_code]
242         return offset
243
244     @staticmethod
245     def _parse_multiprecision_integer(data):
246         r"""Parse RFC 4880's multiprecision integers
247
248         >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
249         (3, 1)
250         >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
251         (4, 511)
252         """
253         bits = _struct.unpack('>H', data[:2])[0]
254         offset = 2
255         length = (bits + 7) // 8
256         value = 0
257         for i in range(length):
258             value += data[offset + i] * 1 << (8 * (length - i - 1))
259         offset += length
260         return (offset, value)
261
262     def _parse_string_to_key_specifier(self, data):
263         self['string-to-key-type'] = self._string_to_key_types[data[0]]
264         offset = 1
265         if self['string-to-key-type'] == 'simple':
266             self['string-to-key-hash-algorithm'] = self._hash_algorithms[
267                 data[offset]]
268             offset += 1
269         elif self['string-to-key-type'] == 'salted':
270             self['string-to-key-hash-algorithm'] = self._hash_algorithms[
271                 data[offset]]
272             offset += 1
273             self['string-to-key-salt'] = data[offset: offset + 8]
274             offset += 8
275         elif self['string-to-key-type'] == 'iterated and salted':
276             self['string-to-key-hash-algorithm'] = self._hash_algorithms[
277                 data[offset]]
278             offset += 1
279             self['string-to-key-salt'] = data[offset: offset + 8]
280             offset += 8
281             self['string-to-key-coded-count'] = data[offset]
282             offset += 1
283         else:
284             raise NotImplementedError(
285                 'string-to-key type {}'.format(self['string-to-key-type']))
286         return offset
287
288     def _parse_public_key_packet(self, data):
289         self._parse_generic_public_key_packet(data=data)
290
291     def _parse_public_subkey_packet(self, data):
292         self._parse_generic_public_key_packet(data=data)
293
294     def _parse_generic_public_key_packet(self, data):
295         self['key-version'] = data[0]
296         offset = 1
297         if self['key-version'] != 4:
298             raise NotImplementedError(
299                 'public (sub)key packet version {}'.format(
300                     self['key-version']))
301         length = 5
302         self['creation-time'], algorithm = _struct.unpack(
303             '>IB', data[offset: offset + length])
304         offset += length
305         self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
306         if self['public-key-algorithm'].startswith('rsa '):
307             o, self['public-modulus'] = self._parse_multiprecision_integer(
308                 data[offset:])
309             offset += o
310             o, self['public-exponent'] = self._parse_multiprecision_integer(
311                 data[offset:])
312             offset += o
313         elif self['public-key-algorithm'].startswith('dsa '):
314             o, self['prime'] = self._parse_multiprecision_integer(
315                 data[offset:])
316             offset += o
317             o, self['group-order'] = self._parse_multiprecision_integer(
318                 data[offset:])
319             offset += o
320             o, self['group-generator'] = self._parse_multiprecision_integer(
321                 data[offset:])
322             offset += o
323             o, self['public-key'] = self._parse_multiprecision_integer(
324                 data[offset:])
325             offset += o
326         elif self['public-key-algorithm'].startswith('elgamal '):
327             o, self['prime'] = self._parse_multiprecision_integer(
328                 data[offset:])
329             offset += o
330             o, self['group-generator'] = self._parse_multiprecision_integer(
331                 data[offset:])
332             offset += o
333             o, self['public-key'] = self._parse_multiprecision_integer(
334                 data[offset:])
335             offset += o
336         else:
337             raise NotImplementedError(
338                 'algorithm-specific key fields for {}'.format(
339                     self['public-key-algorithm']))
340         return offset
341
342     def _parse_secret_key_packet(self, data):
343         self._parse_generic_secret_key_packet(data=data)
344
345     def _parse_secret_subkey_packet(self, data):
346         self._parse_generic_secret_key_packet(data=data)
347
348     def _parse_generic_secret_key_packet(self, data):
349         offset = self._parse_generic_public_key_packet(data=data)
350         string_to_key_usage = data[offset]
351         offset += 1
352         if string_to_key_usage in [255, 254]:
353             self['symmetric-encryption-algorithm'] = (
354                 self._symmetric_key_algorithms[data[offset]])
355             offset += 1
356             offset += self._parse_string_to_key_specifier(data=data[offset:])
357         else:
358             self['symmetric-encryption-algorithm'] = (
359                 self._symmetric_key_algorithms[string_to_key_usage])
360         if string_to_key_usage:
361             block_size_bits = self._cipher_block_size.get(
362                 self['symmetric-encryption-algorithm'], None)
363             if block_size_bits % 8:
364                 raise NotImplementedError(
365                     ('{}-bit block size for {} is not an integer number of bytes'
366                      ).format(
367                          block_size_bits, self['symmetric-encryption-algorithm']))
368             block_size = block_size_bits // 8
369             if not block_size:
370                 raise NotImplementedError(
371                     'unknown block size for {}'.format(
372                         self['symmetric-encryption-algorithm']))
373             self['initial-vector'] = data[offset: offset + block_size]
374             offset += block_size
375         if string_to_key_usage in [0, 255]:
376             key_end = -2
377         else:
378             key_end = 0
379         self['secret-key'] = data[offset:key_end]
380         if key_end:
381             self['secret-key-checksum'] = data[key_end:]
382
383     def _parse_signature_packet(self, data):
384         self['signature-version'] = data[0]
385         offset = 1
386         if self['signature-version'] != 4:
387             raise NotImplementedError(
388                 'signature packet version {}'.format(
389                     self['signature-version']))
390         self['signature-type'] = self._signature_types[data[offset]]
391         offset += 1
392         self['public-key-algorithm'] = self._public_key_algorithms[
393             data[offset]]
394         offset += 1
395         self['hash-algorithm'] = self._hash_algorithms[data[offset]]
396         offset += 1
397         hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
398         offset += 2
399         self['hashed-subpackets'] = data[offset: offset + hashed_count]
400         offset += hashed_count
401         unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
402         offset += 2
403         self['unhashed-subpackets'] = data[offset: offset + unhashed_count]
404         offset += unhashed_count
405         self['signed-hash-word'] = data[offset: offset + 2]
406         offset += 2
407         self['signature'] = data[offset:]
408
409     def _parse_user_id_packet(self, data):
410         self['user'] = str(data, 'utf-8')
411
412     def to_bytes(self):
413         pass
414
415
416 def packets_from_bytes(data):
417     offset = 0
418     while offset < len(data):
419         packet = PGPPacket()
420         offset += packet.from_bytes(data=data[offset:])
421         yield packet
422
423
424 class PGPKey (object):
425     def __init__(self, fingerprint):
426         self.fingerprint = fingerprint
427         self.public_packets = None
428         self.secret_packets = None
429
430     def __str__(self):
431         lines = ['key: {}'.format(self.fingerprint)]
432         if self.public_packets:
433             lines.append('  public:')
434             for packet in self.public_packets:
435                 lines.append('    {}'.format(packet))
436         if self.secret_packets:
437             lines.append('  secret:')
438             for packet in self.secret_packets:
439                 lines.append('    {}'.format(packet))
440         return '\n'.join(lines)
441
442     def import_from_gpg(self):
443         key_export = _get_stdout(
444             ['gpg', '--export', self.fingerprint])
445         self.public_packets = list(
446             packets_from_bytes(data=key_export))
447         if self.public_packets[0]['type'] != 'public-key packet':
448             raise ValueError(
449                 '{} does not start with a public-key packet'.format(
450                     self.fingerprint))
451         key_secret_export = _get_stdout(
452             ['gpg', '--export-secret-keys', self.fingerprint])
453         self.secret_packets = list(
454             packets_from_bytes(data=key_secret_export))
455
456     def export_to_gpg(self):
457         raise NotImplemetedError('export to gpg')
458
459
460 def migrate(old_key, new_key):
461     """Add the old key and sub-keys to the new key
462
463     For example, to upgrade your master key, while preserving old
464     signatures you'd made.  You will lose signature *on* your old key
465     though, since sub-keys can't be signed (I don't think).
466     """
467     old_key = PGPKey(fingerprint=old_key)
468     old_key.import_from_gpg()
469     new_key = PGPKey(fingerprint=new_key)
470     new_key.import_from_gpg()
471
472     print(old_key)
473     print(new_key)
474
475
476 if __name__ == '__main__':
477     import sys as _sys
478
479     old_key, new_key = _sys.argv[1:3]
480     migrate(old_key=old_key, new_key=new_key)