3 import hashlib as _hashlib
5 import subprocess as _subprocess
6 import struct as _struct
9 def _get_stdout(args, stdin=None):
12 stdin_pipe = _subprocess.PIPE
13 p = _subprocess.Popen(args, stdin=stdin_pipe, stdout=_subprocess.PIPE)
14 stdout, stderr = p.communicate(stdin)
17 raise RuntimeError(status)
21 class PGPPacket (dict):
22 # http://tools.ietf.org/search/rfc4880
23 _old_format_packet_length_type = { # type: (bytes, struct type)
24 0: (1, 'B'), # 1-byte unsigned integer
25 1: (2, 'H'), # 2-byte unsigned integer
26 2: (4, 'I'), # 4-byte unsigned integer
32 1: 'public-key encrypted session key packet',
33 2: 'signature packet',
34 3: 'symmetric-key encrypted session key packet',
35 4: 'one-pass signature packet',
36 5: 'secret-key packet',
37 6: 'public-key packet',
38 7: 'secret-subkey packet',
39 8: 'compressed data packet',
40 9: 'symmetrically encrypted data packet',
42 11: 'literal data packet',
45 14: 'public-subkey packet',
46 17: 'user attribute packet',
47 18: 'sym. encrypted and integrity protected data packet',
48 19: 'modification detection code packet',
55 _public_key_algorithms = {
56 1: 'rsa (encrypt or sign)',
57 2: 'rsa encrypt-only',
59 16: 'elgamal (encrypt-only)',
60 17: 'dsa (digital signature algorithm)',
61 18: 'reserved for elliptic curve',
62 19: 'reserved for ecdsa',
63 20: 'reserved (formerly elgamal encrypt or sign)',
64 21: 'reserved for diffie-hellman',
78 _symmetric_key_algorithms = {
79 0: 'plaintext or unencrypted data',
86 7: 'aes with 128-bit key',
87 8: 'aes with 192-bit key',
88 9: 'aes with 256-bit key',
103 _cipher_block_size = { # in bits
104 'aes with 128-bit key': 128,
105 'aes with 192-bit key': 128,
106 'aes with 256-bit key': 128,
110 _compression_algorithms = {
153 _string_to_key_types = {
157 3: 'iterated and salted',
172 0x00: 'binary document',
173 0x01: 'canonical text document',
175 0x10: 'generic user id and public-key packet',
176 0x11: 'persona user id and public-key packet',
177 0x12: 'casual user id and public-key packet',
178 0x13: 'postitive user id and public-key packet',
179 0x18: 'subkey binding',
180 0x19: 'primary key binding',
182 0x20: 'key revocation',
183 0x28: 'subkey revocation',
184 0x30: 'certification revocation',
186 0x50: 'third-party confirmation',
189 _signature_subpacket_types = {
192 2: 'signature creation time',
193 3: 'signature expiration time',
194 4: 'exportable certification',
195 5: 'trust signature',
196 6: 'regular expression',
199 9: 'key expiration time',
200 10: 'placeholder for backward compatibility',
201 11: 'preferred symmetric algorithms',
202 12: 'revocation key',
211 21: 'preferred hash algorithms',
212 22: 'preferred compression algorithms',
213 23: 'key server preferences',
214 24: 'preferred key server',
215 25: 'primary user id',
218 28: 'signer user id',
219 29: 'reason for revocation',
221 31: 'signature target',
222 32: 'embedded signature',
236 _clean_type_regex = _re.compile('\W+')
238 def _clean_type(self, type=None):
241 return self._clean_type_regex.sub('_', type)
244 method_name = '_str_{}'.format(self._clean_type())
245 method = getattr(self, method_name, None)
249 return '{}: {}'.format(self['type'], details)
251 def _str_public_key_packet(self):
252 return self._str_generic_key_packet()
254 def _str_public_subkey_packet(self):
255 return self._str_generic_key_packet()
257 def _str_secret_key_packet(self):
258 return self._str_generic_key_packet()
260 def _str_secret_subkey_packet(self):
261 return self._str_generic_key_packet()
263 def _str_generic_key_packet(self):
264 return self['fingerprint'][-8:].upper()
266 def _str_signature_packet(self):
267 lines = [self['signature-type']]
268 if self['hashed-subpackets']:
269 lines.append(' hashed subpackets:')
270 lines.extend(self._str_signature_subpackets(
271 self['hashed-subpackets'], prefix=' '))
272 if self['unhashed-subpackets']:
273 lines.append(' unhashed subpackets:')
274 lines.extend(self._str_signature_subpackets(
275 self['unhashed-subpackets'], prefix=' '))
276 return '\n'.join(lines)
278 def _str_signature_subpackets(self, subpackets, prefix):
280 for subpacket in subpackets:
281 method_name = '_str_{}_signature_subpacket'.format(
282 self._clean_type(type=subpacket['type']))
283 method = getattr(self, method_name, None)
285 lines.append(' {}: {}'.format(
287 method(subpacket=subpacket)))
289 lines.append(' {}'.format(subpacket['type']))
292 def _str_issuer_signature_subpacket(self, subpacket):
293 return subpacket['issuer'][-8:].upper()
295 def _str_preferred_symmetric_algorithms_signature_subpacket(
298 algo for algo in subpacket['preferred-symmetric-algorithms'])
300 def _str_preferred_hash_algorithms_signature_subpacket(
303 algo for algo in subpacket['preferred-hash-algorithms'])
305 def _str_preferred_compression_algorithms_signature_subpacket(
308 algo for algo in subpacket['preferred-compression-algorithms'])
310 def _str_key_server_preferences_signature_subpacket(self, subpacket):
312 x for x in sorted(subpacket['key-server-preferences']))
314 def _str_key_flags_signature_subpacket(self, subpacket):
315 return ', '.join(x for x in sorted(subpacket['key-flags']))
317 def _str_features_signature_subpacket(self, subpacket):
318 return ', '.join(x for x in sorted(subpacket['features']))
320 def _str_embedded_signature_signature_subpacket(self, subpacket):
321 return subpacket['embedded']['signature-type']
323 def _str_user_id_packet(self):
326 def from_bytes(self, data):
327 offset = self._parse_header(data=data)
328 packet = data[offset:offset + self['length']]
329 if len(packet) < self['length']:
330 raise ValueError('packet too short ({} < {})'.format(
331 len(packet), self['length']))
332 offset += self['length']
333 method_name = '_parse_{}'.format(self._clean_type())
334 method = getattr(self, method_name, None)
336 raise NotImplementedError(
337 'cannot parse packet type {!r}'.format(self['type']))
341 def _parse_header(self, data):
344 always_one = packet_tag & 1 << 7
346 raise ValueError('most significant packet tag bit not set')
347 self['new-format'] = packet_tag & 1 << 6
348 if self['new-format']:
349 type_code = packet_tag & 0b111111
350 raise NotImplementedError('new-format packet length')
352 type_code = packet_tag >> 2 & 0b1111
353 self['length-type'] = packet_tag & 0b11
354 length_bytes, length_type = self._old_format_packet_length_type[
357 raise NotImplementedError(
358 'old-format packet of indeterminate length')
359 length_format = '>{}'.format(length_type)
360 length_data = data[offset: offset + length_bytes]
361 offset += length_bytes
362 self['length'] = _struct.unpack(length_format, length_data)[0]
363 self['type'] = self._packet_types[type_code]
367 def _parse_multiprecision_integer(data):
368 r"""Parse RFC 4880's multiprecision integers
370 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
372 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
375 bits = _struct.unpack('>H', data[:2])[0]
377 length = (bits + 7) // 8
379 for i in range(length):
380 value += data[offset + i] * 1 << (8 * (length - i - 1))
382 return (offset, value)
384 def _parse_string_to_key_specifier(self, data):
385 self['string-to-key-type'] = self._string_to_key_types[data[0]]
387 if self['string-to-key-type'] == 'simple':
388 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
391 elif self['string-to-key-type'] == 'salted':
392 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
395 self['string-to-key-salt'] = data[offset: offset + 8]
397 elif self['string-to-key-type'] == 'iterated and salted':
398 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
401 self['string-to-key-salt'] = data[offset: offset + 8]
403 self['string-to-key-coded-count'] = data[offset]
406 raise NotImplementedError(
407 'string-to-key type {}'.format(self['string-to-key-type']))
410 def _parse_public_key_packet(self, data):
411 self._parse_generic_public_key_packet(data=data)
413 def _parse_public_subkey_packet(self, data):
414 self._parse_generic_public_key_packet(data=data)
416 def _parse_generic_public_key_packet(self, data):
417 self['key-version'] = data[0]
419 if self['key-version'] != 4:
420 raise NotImplementedError(
421 'public (sub)key packet version {}'.format(
422 self['key-version']))
424 self['creation-time'], algorithm = _struct.unpack(
425 '>IB', data[offset: offset + length])
427 self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
428 if self['public-key-algorithm'].startswith('rsa '):
429 o, self['public-modulus'] = self._parse_multiprecision_integer(
432 o, self['public-exponent'] = self._parse_multiprecision_integer(
435 elif self['public-key-algorithm'].startswith('dsa '):
436 o, self['prime'] = self._parse_multiprecision_integer(
439 o, self['group-order'] = self._parse_multiprecision_integer(
442 o, self['group-generator'] = self._parse_multiprecision_integer(
445 o, self['public-key'] = self._parse_multiprecision_integer(
448 elif self['public-key-algorithm'].startswith('elgamal '):
449 o, self['prime'] = self._parse_multiprecision_integer(
452 o, self['group-generator'] = self._parse_multiprecision_integer(
455 o, self['public-key'] = self._parse_multiprecision_integer(
459 raise NotImplementedError(
460 'algorithm-specific key fields for {}'.format(
461 self['public-key-algorithm']))
462 fingerprint = _hashlib.sha1()
463 fingerprint.update(b'\x99')
464 fingerprint.update(_struct.pack('>H', len(data)))
465 fingerprint.update(data)
466 self['fingerprint'] = fingerprint.hexdigest()
469 def _parse_secret_key_packet(self, data):
470 self._parse_generic_secret_key_packet(data=data)
472 def _parse_secret_subkey_packet(self, data):
473 self._parse_generic_secret_key_packet(data=data)
475 def _parse_generic_secret_key_packet(self, data):
476 offset = self._parse_generic_public_key_packet(data=data)
477 string_to_key_usage = data[offset]
479 if string_to_key_usage in [255, 254]:
480 self['symmetric-encryption-algorithm'] = (
481 self._symmetric_key_algorithms[data[offset]])
483 offset += self._parse_string_to_key_specifier(data=data[offset:])
485 self['symmetric-encryption-algorithm'] = (
486 self._symmetric_key_algorithms[string_to_key_usage])
487 if string_to_key_usage:
488 block_size_bits = self._cipher_block_size.get(
489 self['symmetric-encryption-algorithm'], None)
490 if block_size_bits % 8:
491 raise NotImplementedError(
492 ('{}-bit block size for {} is not an integer number of bytes'
494 block_size_bits, self['symmetric-encryption-algorithm']))
495 block_size = block_size_bits // 8
497 raise NotImplementedError(
498 'unknown block size for {}'.format(
499 self['symmetric-encryption-algorithm']))
500 self['initial-vector'] = data[offset: offset + block_size]
502 if string_to_key_usage in [0, 255]:
506 self['secret-key'] = data[offset:key_end]
508 self['secret-key-checksum'] = data[key_end:]
510 def _parse_signature_subpackets(self, data):
512 while offset < len(data):
513 o, subpacket = self._parse_signature_subpacket(data=data[offset:])
517 def _parse_signature_subpacket(self, data):
523 elif first >= 192 and first < 255:
524 second = data[offset]
526 length = ((first - 192) << 8) + second + 192
528 length = _struct.unpack(
529 '>I', data[offset: offset + 4])[0]
531 subpacket['type'] = self._signature_subpacket_types[data[offset]]
533 subpacket_data = data[offset: offset + length - 1]
534 offset += len(subpacket_data)
535 method_name = '_parse_{}_signature_subpacket'.format(
536 self._clean_type(type=subpacket['type']))
537 method = getattr(self, method_name, None)
539 raise NotImplementedError(
540 'cannot parse signature subpacket type {!r}'.format(
542 method(data=subpacket_data, subpacket=subpacket)
543 return (offset, subpacket)
545 def _parse_signature_packet(self, data):
546 self['signature-version'] = data[0]
548 if self['signature-version'] != 4:
549 raise NotImplementedError(
550 'signature packet version {}'.format(
551 self['signature-version']))
552 self['signature-type'] = self._signature_types[data[offset]]
554 self['public-key-algorithm'] = self._public_key_algorithms[
557 self['hash-algorithm'] = self._hash_algorithms[data[offset]]
559 hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
561 self['hashed-subpackets'] = list(self._parse_signature_subpackets(
562 data[offset: offset + hashed_count]))
563 offset += hashed_count
564 unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
566 self['unhashed-subpackets'] = list(self._parse_signature_subpackets(
567 data=data[offset: offset + unhashed_count]))
568 offset += unhashed_count
569 self['signed-hash-word'] = data[offset: offset + 2]
571 self['signature'] = data[offset:]
573 def _parse_issuer_signature_subpacket(self, data, subpacket):
574 subpacket['issuer'] = ''.join('{:02x}'.format(byte) for byte in data)
576 def _parse_preferred_symmetric_algorithms_signature_subpacket(
577 self, data, subpacket):
578 subpacket['preferred-symmetric-algorithms'] = [
579 self._symmetric_key_algorithms[d] for d in data]
581 def _parse_preferred_hash_algorithms_signature_subpacket(
582 self, data, subpacket):
583 subpacket['preferred-hash-algorithms'] = [
584 self._hash_algorithms[d] for d in data]
586 def _parse_preferred_compression_algorithms_signature_subpacket(
587 self, data, subpacket):
588 subpacket['preferred-compression-algorithms'] = [
589 self._compression_algorithms[d] for d in data]
591 def _parse_key_server_preferences_signature_subpacket(
592 self, data, subpacket):
593 subpacket['key-server-preferences'] = set()
595 subpacket['key-server-preferences'].add('no-modify')
597 def _parse_key_flags_signature_subpacket(self, data, subpacket):
598 subpacket['key-flags'] = set()
600 subpacket['key-flags'].add('can certify')
602 subpacket['key-flags'].add('can sign')
604 subpacket['key-flags'].add('can encrypt communications')
606 subpacket['key-flags'].add('can encrypt storage')
608 subpacket['key-flags'].add('private split')
610 subpacket['key-flags'].add('can authenticate')
612 subpacket['key-flags'].add('private shared')
614 def _parse_features_signature_subpacket(self, data, subpacket):
615 subpacket['features'] = set()
617 subpacket['features'].add('modification detection')
def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
    """Parse ``data`` as a signature packet body, stored under 'embedded'.

    A fresh ``PGPPacket`` is placed in ``subpacket['embedded']`` and then
    filled in by its ``_parse_signature_packet`` method.
    """
    embedded = PGPPacket()
    # Store before parsing so the entry exists even if parsing raises.
    subpacket['embedded'] = embedded
    embedded._parse_signature_packet(data=data)
623 def _parse_user_id_packet(self, data):
624 self['user'] = str(data, 'utf-8')
630 def packets_from_bytes(data):
632 while offset < len(data):
634 offset += packet.from_bytes(data=data[offset:])
638 class PGPKey (object):
639 def __init__(self, fingerprint):
640 self.fingerprint = fingerprint
641 self.public_packets = None
642 self.secret_packets = None
645 lines = ['key: {}'.format(self.fingerprint)]
646 if self.public_packets:
647 lines.append(' public:')
648 for packet in self.public_packets:
649 lines.extend(self._str_packet(packet=packet, prefix=' '))
650 if self.secret_packets:
651 lines.append(' secret:')
652 for packet in self.secret_packets:
653 lines.extend(self._str_packet(packet=packet, prefix=' '))
654 return '\n'.join(lines)
656 def _str_packet(self, packet, prefix):
657 lines = str(packet).split('\n')
658 return [prefix + line for line in lines]
660 def import_from_gpg(self):
661 key_export = _get_stdout(
662 ['gpg', '--export', self.fingerprint])
663 self.public_packets = list(
664 packets_from_bytes(data=key_export))
665 if self.public_packets[0]['type'] != 'public-key packet':
667 '{} does not start with a public-key packet'.format(
669 key_secret_export = _get_stdout(
670 ['gpg', '--export-secret-keys', self.fingerprint])
671 self.secret_packets = list(
672 packets_from_bytes(data=key_secret_export))
def export_to_gpg(self):
    """Placeholder for exporting this key to gpg.

    Raises:
        NotImplementedError: always; export is not implemented.
    """
    raise NotImplementedError('export to gpg')
677 def import_from_key(self, key):
678 """Migrate the (sub)keys into this key"""
682 def migrate(old_key, new_key):
683 """Add the old key and sub-keys to the new key
685 For example, to upgrade your master key, while preserving old
686 signatures you'd made. You will lose signature *on* your old key
687 though, since sub-keys can't be signed (I don't think).
689 old_key = PGPKey(fingerprint=old_key)
690 old_key.import_from_gpg()
691 new_key = PGPKey(fingerprint=new_key)
692 new_key.import_from_gpg()
693 new_key.import_from_key(key=old_key)
699 if __name__ == '__main__':
702 old_key, new_key = _sys.argv[1:3]
703 migrate(old_key=old_key, new_key=new_key)