3 import getpass as _getpass
4 import hashlib as _hashlib
5 import logging as _logging
8 import subprocess as _subprocess
9 import struct as _struct
11 import Crypto.Cipher.AES as _crypto_cipher_aes
12 import Crypto.Cipher.Blowfish as _crypto_cipher_blowfish
13 import Crypto.Cipher.CAST as _crypto_cipher_cast
14 import Crypto.Cipher.DES3 as _crypto_cipher_des3
15 import Crypto.PublicKey.DSA as _crypto_publickey_dsa
16 import Crypto.PublicKey.ElGamal as _crypto_publickey_elgamal
17 import Crypto.PublicKey.RSA as _crypto_publickey_rsa
18 import Crypto.Random.random as _crypto_random_random
19 import Crypto.Signature.PKCS1_v1_5 as _crypto_signature_pkcs1_v1_5
20 import Crypto.Hash.SHA as _crypto_hash_sha
23 LOG = _logging.getLogger('gpg-migrate')
24 LOG.addHandler(_logging.StreamHandler())
25 LOG.setLevel(_logging.WARNING)
28 def _get_stdout(args, stdin=None):
31 stdin_pipe = _subprocess.PIPE
32 p = _subprocess.Popen(args, stdin=stdin_pipe, stdout=_subprocess.PIPE)
33 stdout, stderr = p.communicate(stdin)
36 raise RuntimeError(status)
40 def byte_string(data, sep=' '):
41 r"""Convert a byte-string to human readable form
43 >>> byte_string(b'\x12\x34\x56')
46 return sep.join('{:02x}'.format(byte) for byte in data)
49 def string_bytes(data, sep=' '):
50 r"""Reverse byte_string()
52 >>> string_bytes('12 fa fb')
56 int(c1+c2, base=16) for c1,c2 in
57 zip(data[::2 + len(sep)], data[1::2 + len(sep)]))
60 class PGPPacket (dict):
61 # http://tools.ietf.org/search/rfc4880
62 _old_format_packet_length_type = { # type: (bytes, struct type)
63 0: (1, 'B'), # 1-byte unsigned integer
64 1: (2, 'H'), # 2-byte unsigned integer
65 2: (4, 'I'), # 4-byte unsigned integer
71 1: 'public-key encrypted session key packet',
72 2: 'signature packet',
73 3: 'symmetric-key encrypted session key packet',
74 4: 'one-pass signature packet',
75 5: 'secret-key packet',
76 6: 'public-key packet',
77 7: 'secret-subkey packet',
78 8: 'compressed data packet',
79 9: 'symmetrically encrypted data packet',
81 11: 'literal data packet',
84 14: 'public-subkey packet',
85 17: 'user attribute packet',
86 18: 'sym. encrypted and integrity protected data packet',
87 19: 'modification detection code packet',
94 _public_key_algorithms = {
95 1: 'rsa (encrypt or sign)',
96 2: 'rsa encrypt-only',
98 16: 'elgamal (encrypt-only)',
99 17: 'dsa (digital signature algorithm)',
100 18: 'reserved for elliptic curve',
101 19: 'reserved for ecdsa',
102 20: 'reserved (formerly elgamal encrypt or sign)',
103 21: 'reserved for diffie-hellman',
117 _symmetric_key_algorithms = {
118 0: 'plaintext or unencrypted data',
125 7: 'aes with 128-bit key',
126 8: 'aes with 192-bit key',
127 9: 'aes with 256-bit key',
142 _cipher_block_size = { # in bits
143 'aes with 128-bit key': 128,
144 'aes with 192-bit key': 128,
145 'aes with 256-bit key': 128,
151 # symmetric-key encryption
152 'aes with 128-bit key': _crypto_cipher_aes,
153 'aes with 192-bit key': _crypto_cipher_aes,
154 'aes with 256-bit key': _crypto_cipher_aes,
155 'blowfish': _crypto_cipher_blowfish,
156 'cast5': _crypto_cipher_cast,
157 'tripledes': _crypto_cipher_des3,
158 # public-key encryption
159 'dsa (digital signature algorithm)': _crypto_publickey_dsa,
160 'elgamal (encrypt-only)': _crypto_publickey_elgamal,
161 'rsa (encrypt or sign)': _crypto_publickey_rsa,
162 'rsa encrypt-only': _crypto_publickey_rsa,
163 'rsa sign-only': _crypto_publickey_rsa,
166 _key_size = { # in bits
167 'aes with 128-bit key': 128,
168 'aes with 192-bit key': 192,
169 'aes with 256-bit key': 256,
173 _compression_algorithms = {
216 _hashlib_name = { # map OpenPGP-based names to hashlib names
219 'ripe-md/160': 'ripemd160',
226 _string_to_key_types = {
230 3: 'iterated and salted',
244 _string_to_key_expbias = 6
247 0x00: 'binary document',
248 0x01: 'canonical text document',
250 0x10: 'generic user id and public-key packet',
251 0x11: 'persona user id and public-key packet',
252 0x12: 'casual user id and public-key packet',
253 0x13: 'postitive user id and public-key packet',
254 0x18: 'subkey binding',
255 0x19: 'primary key binding',
257 0x20: 'key revocation',
258 0x28: 'subkey revocation',
259 0x30: 'certification revocation',
261 0x50: 'third-party confirmation',
264 _signature_subpacket_types = {
267 2: 'signature creation time',
268 3: 'signature expiration time',
269 4: 'exportable certification',
270 5: 'trust signature',
271 6: 'regular expression',
274 9: 'key expiration time',
275 10: 'placeholder for backward compatibility',
276 11: 'preferred symmetric algorithms',
277 12: 'revocation key',
286 21: 'preferred hash algorithms',
287 22: 'preferred compression algorithms',
288 23: 'key server preferences',
289 24: 'preferred key server',
290 25: 'primary user id',
293 28: 'signer user id',
294 29: 'reason for revocation',
296 31: 'signature target',
297 32: 'embedded signature',
311 _clean_type_regex = _re.compile('\W+')
313 def __init__(self, key=None):
314 super(PGPPacket, self).__init__()
317 def _clean_type(self, type=None):
320 return self._clean_type_regex.sub('_', type)
323 def _reverse(dict, value):
324 """Reverse lookups in dictionaries
326 >>> PGPPacket._reverse(PGPPacket._packet_types, 'public-key packet')
329 return [k for k,v in dict.items() if v == value][0]
332 packet = PGPPacket(key=self.key)
337 method_name = '_str_{}'.format(self._clean_type())
338 method = getattr(self, method_name, None)
342 return '{}: {}'.format(self['type'], details)
344 def _str_public_key_packet(self):
345 return self._str_generic_key_packet()
347 def _str_public_subkey_packet(self):
348 return self._str_generic_key_packet()
350 def _str_generic_key_packet(self):
351 return self['fingerprint'][-8:].upper()
353 def _str_secret_key_packet(self):
354 return self._str_generic_secret_key_packet()
356 def _str_secret_subkey_packet(self):
357 return self._str_generic_secret_key_packet()
359 def _str_generic_secret_key_packet(self):
360 lines = [self._str_generic_key_packet()]
362 ('symmetric encryption',
363 'symmetric-encryption-algorithm'),
364 ('s2k hash', 'string-to-key-hash-algorithm'),
365 ('s2k count', 'string-to-key-count'),
366 ('s2k salt', 'string-to-key-salt'),
367 ('IV', 'initial-vector'),
371 if isinstance(value, bytes):
372 value = byte_string(data=value)
373 lines.append(' {}: {}'.format(label, value))
374 return '\n'.join(lines)
376 def _str_signature_packet(self):
377 lines = [self['signature-type']]
378 if self['hashed-subpackets']:
379 lines.append(' hashed subpackets:')
380 lines.extend(self._str_signature_subpackets(
381 self['hashed-subpackets'], prefix=' '))
382 if self['unhashed-subpackets']:
383 lines.append(' unhashed subpackets:')
384 lines.extend(self._str_signature_subpackets(
385 self['unhashed-subpackets'], prefix=' '))
386 return '\n'.join(lines)
388 def _str_signature_subpackets(self, subpackets, prefix):
390 for subpacket in subpackets:
391 method_name = '_str_{}_signature_subpacket'.format(
392 self._clean_type(type=subpacket['type']))
393 method = getattr(self, method_name, None)
395 lines.append(' {}: {}'.format(
397 method(subpacket=subpacket)))
399 lines.append(' {}'.format(subpacket['type']))
402 def _str_signature_creation_time_signature_subpacket(self, subpacket):
403 return str(subpacket['signature-creation-time'])
405 def _str_issuer_signature_subpacket(self, subpacket):
406 return subpacket['issuer'][-8:].upper()
408 def _str_key_expiration_time_signature_subpacket(self, subpacket):
409 return str(subpacket['key-expiration-time'])
411 def _str_preferred_symmetric_algorithms_signature_subpacket(
414 algo for algo in subpacket['preferred-symmetric-algorithms'])
416 def _str_preferred_hash_algorithms_signature_subpacket(
419 algo for algo in subpacket['preferred-hash-algorithms'])
421 def _str_preferred_compression_algorithms_signature_subpacket(
424 algo for algo in subpacket['preferred-compression-algorithms'])
426 def _str_key_server_preferences_signature_subpacket(self, subpacket):
428 x for x in sorted(subpacket['key-server-preferences']))
430 def _str_primary_user_id_signature_subpacket(self, subpacket):
431 return str(subpacket['primary-user-id'])
433 def _str_key_flags_signature_subpacket(self, subpacket):
434 return ', '.join(x for x in sorted(subpacket['key-flags']))
436 def _str_features_signature_subpacket(self, subpacket):
437 return ', '.join(x for x in sorted(subpacket['features']))
439 def _str_embedded_signature_signature_subpacket(self, subpacket):
440 return subpacket['embedded']['signature-type']
442 def _str_user_id_packet(self):
445 def from_bytes(self, data):
446 offset = self._parse_header(data=data)
447 packet = data[offset:offset + self['length']]
448 if len(packet) < self['length']:
449 raise ValueError('packet too short ({} < {})'.format(
450 len(packet), self['length']))
451 offset += self['length']
452 method_name = '_parse_{}'.format(self._clean_type())
453 method = getattr(self, method_name, None)
455 raise NotImplementedError(
456 'cannot parse packet type {!r}'.format(self['type']))
458 self['raw'] = data[:offset]
461 def _parse_header(self, data):
464 always_one = packet_tag & 1 << 7
466 raise ValueError('most significant packet tag bit not set')
467 self['new-format'] = packet_tag & 1 << 6
468 if self['new-format']:
469 type_code = packet_tag & 0b111111
470 raise NotImplementedError('new-format packet length')
472 type_code = packet_tag >> 2 & 0b1111
473 self['length-type'] = packet_tag & 0b11
474 length_bytes, length_type = self._old_format_packet_length_type[
477 raise NotImplementedError(
478 'old-format packet of indeterminate length')
479 length_format = '>{}'.format(length_type)
480 length_data = data[offset: offset + length_bytes]
481 offset += length_bytes
482 self['length'] = _struct.unpack(length_format, length_data)[0]
483 self['type'] = self._packet_types[type_code]
487 def _parse_multiprecision_integer(data):
488 r"""Parse RFC 4880's multiprecision integers
490 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
492 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
495 bits = _struct.unpack('>H', data[:2])[0]
497 length = (bits + 7) // 8
499 for i in range(length):
500 value += data[offset + i] * 1 << (8 * (length - i - 1))
502 return (offset, value)
505 def _decode_string_to_key_count(cls, data):
506 r"""Decode RFC 4880's string-to-key count
508 >>> PGPPacket._decode_string_to_key_count(b'\x97'[0])
511 return (16 + (data & 15)) << ((data >> 4) + cls._string_to_key_expbias)
513 def _parse_string_to_key_specifier(self, data):
514 self['string-to-key-type'] = self._string_to_key_types[data[0]]
516 if self['string-to-key-type'] == 'simple':
517 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
520 elif self['string-to-key-type'] == 'salted':
521 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
524 self['string-to-key-salt'] = data[offset: offset + 8]
526 elif self['string-to-key-type'] == 'iterated and salted':
527 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
530 self['string-to-key-salt'] = data[offset: offset + 8]
532 self['string-to-key-count'] = self._decode_string_to_key_count(
536 raise NotImplementedError(
537 'string-to-key type {}'.format(self['string-to-key-type']))
540 def _parse_public_key_packet(self, data):
541 self._parse_generic_public_key_packet(data=data)
543 def _parse_public_subkey_packet(self, data):
544 self._parse_generic_public_key_packet(data=data)
546 def _parse_generic_public_key_packet(self, data):
547 self['key-version'] = data[0]
549 if self['key-version'] != 4:
550 raise NotImplementedError(
551 'public (sub)key packet version {}'.format(
552 self['key-version']))
554 self['creation-time'], algorithm = _struct.unpack(
555 '>IB', data[offset: offset + length])
557 self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
558 if self['public-key-algorithm'].startswith('rsa '):
559 o, self['public-modulus'] = self._parse_multiprecision_integer(
562 o, self['public-exponent'] = self._parse_multiprecision_integer(
565 elif self['public-key-algorithm'].startswith('dsa '):
566 o, self['prime'] = self._parse_multiprecision_integer(
569 o, self['group-order'] = self._parse_multiprecision_integer(
572 o, self['group-generator'] = self._parse_multiprecision_integer(
575 o, self['public-key'] = self._parse_multiprecision_integer(
578 elif self['public-key-algorithm'].startswith('elgamal '):
579 o, self['prime'] = self._parse_multiprecision_integer(
582 o, self['group-generator'] = self._parse_multiprecision_integer(
585 o, self['public-key'] = self._parse_multiprecision_integer(
589 raise NotImplementedError(
590 'algorithm-specific key fields for {}'.format(
591 self['public-key-algorithm']))
592 fingerprint = _hashlib.sha1()
594 self._serialize_signature_packet_target(target=self))
595 self['fingerprint'] = fingerprint.hexdigest()
598 def _parse_secret_key_packet(self, data):
599 self._parse_generic_secret_key_packet(data=data)
601 def _parse_secret_subkey_packet(self, data):
602 self._parse_generic_secret_key_packet(data=data)
604 def _parse_generic_secret_key_packet(self, data):
605 offset = self._parse_generic_public_key_packet(data=data)
606 string_to_key_usage = data[offset]
608 if string_to_key_usage in [255, 254]:
609 self['symmetric-encryption-algorithm'] = (
610 self._symmetric_key_algorithms[data[offset]])
612 offset += self._parse_string_to_key_specifier(data=data[offset:])
614 self['symmetric-encryption-algorithm'] = (
615 self._symmetric_key_algorithms[string_to_key_usage])
616 if string_to_key_usage:
617 block_size_bits = self._cipher_block_size.get(
618 self['symmetric-encryption-algorithm'], None)
619 if block_size_bits % 8:
620 raise NotImplementedError(
621 ('{}-bit block size for {} is not an integer number of bytes'
623 block_size_bits, self['symmetric-encryption-algorithm']))
624 block_size = block_size_bits // 8
626 raise NotImplementedError(
627 'unknown block size for {}'.format(
628 self['symmetric-encryption-algorithm']))
629 self['initial-vector'] = data[offset: offset + block_size]
631 ciphertext = data[offset:]
632 offset += len(ciphertext)
633 decrypted_data = self.decrypt_symmetric_encryption(data=ciphertext)
635 decrypted_data = data[offset:key_end]
636 if string_to_key_usage in [0, 255]:
638 elif string_to_key_usage == 254:
642 secret_key = decrypted_data[:key_end]
645 secret_key_checksum = decrypted_data[key_end:]
647 calculated_checksum = sum(secret_key) % 65536
649 checksum_hash = _hashlib.sha1()
650 checksum_hash.update(secret_key)
651 calculated_checksum = checksum_hash.digest()
652 if secret_key_checksum != calculated_checksum:
654 'corrupt secret key (checksum {} != expected {})'.format(
655 secret_key_checksum, calculated_checksum))
656 if self['public-key-algorithm'].startswith('rsa '):
657 o, self['secret-exponent'] = self._parse_multiprecision_integer(
658 secret_key[secret_offset:])
660 o, self['secret-prime-p'] = self._parse_multiprecision_integer(
661 secret_key[secret_offset:])
663 o, self['secret-prime-q'] = self._parse_multiprecision_integer(
664 secret_key[secret_offset:])
666 o, self['secret-inverse-of-p-mod-q'] = (
667 self._parse_multiprecision_integer(
668 secret_key[secret_offset:]))
670 elif self['public-key-algorithm'].startswith('dsa '):
671 o, self['secret-exponent'] = self._parse_multiprecision_integer(
672 secret_key[secret_offset:])
674 elif self['public-key-algorithm'].startswith('elgamal '):
675 o, self['secret-exponent'] = self._parse_multiprecision_integer(
676 secret_key[secret_offset:])
679 raise NotImplementedError(
680 'algorithm-specific key fields for {}'.format(
681 self['public-key-algorithm']))
682 if secret_offset != len(secret_key):
684 ('parsed {} out of {} bytes of algorithm-specific key fields '
686 secret_offset, len(secret_key),
687 self['public-key-algorithm']))
689 def _parse_signature_subpackets(self, data):
691 while offset < len(data):
692 o, subpacket = self._parse_signature_subpacket(data=data[offset:])
696 def _parse_signature_subpacket(self, data):
702 elif first >= 192 and first < 255:
703 second = data[offset]
705 length = ((first - 192) << 8) + second + 192
707 length = _struct.unpack(
708 '>I', data[offset: offset + 4])[0]
710 subpacket['type'] = self._signature_subpacket_types[data[offset]]
712 subpacket_data = data[offset: offset + length - 1]
713 offset += len(subpacket_data)
714 method_name = '_parse_{}_signature_subpacket'.format(
715 self._clean_type(type=subpacket['type']))
716 method = getattr(self, method_name, None)
718 raise NotImplementedError(
719 'cannot parse signature subpacket type {!r}'.format(
721 method(data=subpacket_data, subpacket=subpacket)
722 return (offset, subpacket)
724 def _parse_signature_packet(self, data):
725 self['signature-version'] = data[0]
727 if self['signature-version'] != 4:
728 raise NotImplementedError(
729 'signature packet version {}'.format(
730 self['signature-version']))
731 self['signature-type'] = self._signature_types[data[offset]]
733 self['public-key-algorithm'] = self._public_key_algorithms[
736 self['hash-algorithm'] = self._hash_algorithms[data[offset]]
738 hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
740 self['hashed-subpackets'] = list(self._parse_signature_subpackets(
741 data[offset: offset + hashed_count]))
742 offset += hashed_count
743 unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
745 self['unhashed-subpackets'] = list(self._parse_signature_subpackets(
746 data=data[offset: offset + unhashed_count]))
747 offset += unhashed_count
748 self['signed-hash-word'] = data[offset: offset + 2]
750 self['signature'] = []
751 while offset < len(data):
752 o, mpi = self._parse_multiprecision_integer(data=data[offset:])
754 self['signature'].append(mpi)
755 if self.key.secret_packets:
756 packets = self.key.secret_packets
758 packets = self.key.public_packets
759 if self['signature-type'] == 'standalone':
760 self['target'] = None
761 elif self['signature-type'].endswith(' user id and public-key packet'):
764 [p for p in packets if p['type'] == 'user id packet'][-1],
766 elif self['signature-type'].endswith('key binding'):
769 [p for p in packets if p['type'] == 'public-subkey packet'][-1],
772 raise NotImplementedError(
773 'target for {}'.format(self['signature-type']))
776 def _parse_signature_creation_time_signature_subpacket(
777 self, data, subpacket):
778 subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0]
780 def _parse_issuer_signature_subpacket(self, data, subpacket):
781 subpacket['issuer'] = byte_string(data=data, sep='')
783 def _parse_key_expiration_time_signature_subpacket(
784 self, data, subpacket):
785 subpacket['key-expiration-time'] = _struct.unpack('>I', data)[0]
787 def _parse_preferred_symmetric_algorithms_signature_subpacket(
788 self, data, subpacket):
789 subpacket['preferred-symmetric-algorithms'] = [
790 self._symmetric_key_algorithms[d] for d in data]
792 def _parse_preferred_hash_algorithms_signature_subpacket(
793 self, data, subpacket):
794 subpacket['preferred-hash-algorithms'] = [
795 self._hash_algorithms[d] for d in data]
797 def _parse_preferred_compression_algorithms_signature_subpacket(
798 self, data, subpacket):
799 subpacket['preferred-compression-algorithms'] = [
800 self._compression_algorithms[d] for d in data]
802 def _parse_key_server_preferences_signature_subpacket(
803 self, data, subpacket):
804 subpacket['key-server-preferences'] = set()
806 subpacket['key-server-preferences'].add('no-modify')
808 def _parse_primary_user_id_signature_subpacket(self, data, subpacket):
809 subpacket['primary-user-id'] = bool(data[0])
811 def _parse_key_flags_signature_subpacket(self, data, subpacket):
812 subpacket['key-flags'] = set()
814 subpacket['key-flags'].add('can certify')
816 subpacket['key-flags'].add('can sign')
818 subpacket['key-flags'].add('can encrypt communications')
820 subpacket['key-flags'].add('can encrypt storage')
822 subpacket['key-flags'].add('private split')
824 subpacket['key-flags'].add('can authenticate')
826 subpacket['key-flags'].add('private shared')
828 def _parse_features_signature_subpacket(self, data, subpacket):
829 subpacket['features'] = set()
831 subpacket['features'].add('modification detection')
833 def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
834 subpacket['embedded'] = PGPPacket(key=self.key)
835 subpacket['embedded']['type'] = 'signature packet'
836 subpacket['embedded']['embedded'] = True
837 subpacket['embedded']._parse_signature_packet(data=data)
838 subpacket['embedded']['raw'] = data
840 def _parse_user_id_packet(self, data):
841 self['user'] = str(data, 'utf-8')
844 body = self._serialize_body()
846 raise ValueError(method)
847 self['length'] = len(body)
849 self._serialize_header(),
853 def _serialize_header(self):
856 type_code = self._reverse(self._packet_types, self['type'])
858 always_one * (1 << 7) |
859 new_format * (1 << 6) |
860 type_code * (1 << 2) |
863 length_bytes, length_type = self._old_format_packet_length_type[
865 length_format = '>{}'.format(length_type)
866 length_data = _struct.pack(length_format, self['length'])
872 def _serialize_body(self):
873 method_name = '_serialize_{}'.format(self._clean_type())
874 method = getattr(self, method_name, None)
876 raise NotImplementedError(
877 'cannot serialize packet type {!r}'.format(self['type']))
881 def _serialize_multiprecision_integer(integer):
882 r"""Serialize RFC 4880's multipricision integers
884 >>> PGPPacket._serialize_multiprecision_integer(1)
886 >>> PGPPacket._serialize_multiprecision_integer(511)
889 bit_length = int(_math.log(integer, 2)) + 1
891 _struct.pack('>H', bit_length),
894 chunks.insert(1, bytes([integer & 0xff]))
895 integer = integer >> 8
896 return b''.join(chunks)
899 def _encode_string_to_key_count(cls, count):
900 r"""Encode RFC 4880's string-to-key count
902 >>> PGPPacket._encode_string_to_key_count(753664)
906 count = count >> cls._string_to_key_expbias
909 coded_count += 1 << 4
910 coded_count += count & 15
911 return bytes([coded_count])
913 def _serialize_string_to_key_specifier(self):
914 string_to_key_type = bytes([
916 self._string_to_key_types, self['string-to-key-type']),
918 chunks = [string_to_key_type]
919 if self['string-to-key-type'] == 'simple':
920 chunks.append(bytes([self._reverse(
921 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
922 elif self['string-to-key-type'] == 'salted':
923 chunks.append(bytes([self._reverse(
924 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
925 chunks.append(self['string-to-key-salt'])
926 elif self['string-to-key-type'] == 'iterated and salted':
927 chunks.append(bytes([self._reverse(
928 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
929 chunks.append(self['string-to-key-salt'])
930 chunks.append(self._encode_string_to_key_count(
931 count=self['string-to-key-count']))
933 raise NotImplementedError(
934 'string-to-key type {}'.format(self['string-to-key-type']))
936 return b''.join(chunks)
938 def _serialize_public_key_packet(self):
939 return self._serialize_generic_public_key_packet()
941 def _serialize_public_subkey_packet(self):
942 return self._serialize_generic_public_key_packet()
944 def _serialize_generic_public_key_packet(self):
945 key_version = bytes([self['key-version']])
946 chunks = [key_version]
947 if self['key-version'] != 4:
948 raise NotImplementedError(
949 'public (sub)key packet version {}'.format(
950 self['key-version']))
951 chunks.append(_struct.pack('>I', self['creation-time']))
952 chunks.append(bytes([self._reverse(
953 self._public_key_algorithms, self['public-key-algorithm'])]))
954 if self['public-key-algorithm'].startswith('rsa '):
955 chunks.append(self._serialize_multiprecision_integer(
956 self['public-modulus']))
957 chunks.append(self._serialize_multiprecision_integer(
958 self['public-exponent']))
959 elif self['public-key-algorithm'].startswith('dsa '):
960 chunks.append(self._serialize_multiprecision_integer(
962 chunks.append(self._serialize_multiprecision_integer(
963 self['group-order']))
964 chunks.append(self._serialize_multiprecision_integer(
965 self['group-generator']))
966 chunks.append(self._serialize_multiprecision_integer(
968 elif self['public-key-algorithm'].startswith('elgamal '):
969 chunks.append(self._serialize_multiprecision_integer(
971 chunks.append(self._serialize_multiprecision_integer(
972 self['group-generator']))
973 chunks.append(self._serialize_multiprecision_integer(
976 raise NotImplementedError(
977 'algorithm-specific key fields for {}'.format(
978 self['public-key-algorithm']))
979 return b''.join(chunks)
981 def _serialize_signature_subpackets(self, subpackets):
983 self._serialize_signature_subpacket(subpacket=subpacket)
984 for subpacket in subpackets)
986 def _serialize_signature_subpacket(self, subpacket):
987 method_name = '_serialize_{}_signature_subpacket'.format(
988 self._clean_type(type=subpacket['type']))
989 method = getattr(self, method_name, None)
991 raise NotImplementedError(
992 'cannot serialize signature subpacket type {!r}'.format(
994 body = method(subpacket=subpacket)
995 length = len(body) + 1
998 chunks.append(bytes([length]))
1000 first = ((length - 192) >> 8) + 192
1004 (length - 192) % 256,
1007 chunks.append(_struct.pack('>I', length))
1008 chunks.append(bytes([self._reverse(
1009 self._signature_subpacket_types, subpacket['type'])]))
1011 return b''.join(chunks)
1013 def _serialize_signature_packet_target(self, target):
1016 elif isinstance(target, bytes):
1018 elif isinstance(target, PGPPacket):
1019 if target['type'] in [
1020 'public-subkey packet',
1021 'secret-key packet',
1022 'secret-subkey packet',
1024 target = target.copy()
1025 target['type'] = 'public-key packet'
1026 serialized = target._serialize_body()
1027 if target['type'] in [
1028 'public-key packet',
1029 'public-subkey packet'
1030 'secret-key packet',
1031 'secret-subkey packet'
1033 serialized = b''.join([
1035 _struct.pack('>H', len(serialized)),
1038 elif target['type'] == 'user id packet':
1039 serialized = b''.join([
1041 _struct.pack('>I', len(serialized)),
1044 elif target['type'] == 'user attribute packet':
1045 serialized = b''.join([
1047 _struct.pack('>I', len(serialized)),
1051 elif isinstance(target, list):
1053 self._serialize_signature_packet_target(target=x)
1056 def _serialize_hashed_signature_packet(self):
1057 if self['signature-version'] != 4:
1058 raise NotImplementedError(
1059 'signature packet version {}'.format(
1060 self['signature-version']))
1061 chunks = [bytes([self['signature-version']])]
1062 chunks.append(bytes([self._reverse(
1063 self._signature_types, self['signature-type'])]))
1064 chunks.append(bytes([self._reverse(
1065 self._public_key_algorithms, self['public-key-algorithm'])]))
1066 chunks.append(bytes([self._reverse(
1067 self._hash_algorithms, self['hash-algorithm'])]))
1068 hashed_subpackets = self._serialize_signature_subpackets(
1069 self['hashed-subpackets'])
1070 chunks.append(_struct.pack('>H', len(hashed_subpackets)))
1071 chunks.append(hashed_subpackets)
1072 return b''.join(chunks)
1074 def _signature_packet_signed_data(self, hashed_signature_data):
1075 target = self._serialize_signature_packet_target(target=self['target'])
1078 hashed_signature_data,
1079 bytes([self['signature-version']]),
1081 _struct.pack('>I', len(hashed_signature_data)),
1084 def _serialize_signature_packet(self):
1085 hashed_signature_data = self._serialize_hashed_signature_packet()
1086 chunks = [hashed_signature_data]
1087 unhashed_subpackets = self._serialize_signature_subpackets(
1088 self['unhashed-subpackets'])
1089 chunks.append(_struct.pack('>H', len(unhashed_subpackets)))
1090 chunks.append(unhashed_subpackets)
1091 signed_data = self._signature_packet_signed_data(
1092 hashed_signature_data=hashed_signature_data)
1093 digest, signature = self.key.sign(
1094 data=signed_data, hash_algorithm=self['hash-algorithm'],
1095 signature_algorithm=self['public-key-algorithm'])
1096 chunks.append(digest[:2])
1098 self._serialize_multiprecision_integer(integer=integer)
1099 for integer in signature)
1100 return b''.join(chunks)
1102 def _serialize_signature_creation_time_signature_subpacket(
1104 return _struct.pack('>I', subpacket['signature-creation-time'])
1106 def _serialize_issuer_signature_subpacket(self, subpacket):
1107 return string_bytes(data=subpacket['issuer'], sep='')
1109 def _serialize_key_expiration_time_signature_subpacket(self, subpacket):
1110 return _struct.pack('>I', subpacket['key-expiration-time'])
1112 def _serialize_preferred_symmetric_algorithms_signature_subpacket(
1115 self._reverse(self._symmetric_key_algorithms, a)
1116 for a in subpacket['preferred-symmetric-algorithms'])
1118 def _serialize_preferred_hash_algorithms_signature_subpacket(
1121 self._reverse(self._hash_algorithms, a)
1122 for a in subpacket['preferred-hash-algorithms'])
1124 def _serialize_preferred_compression_algorithms_signature_subpacket(
1127 self._reverse(self._compression_algorithms, a)
1128 for a in subpacket['preferred-compression-algorithms'])
1130 def _serialize_key_server_preferences_signature_subpacket(self, subpacket):
1132 0x80 * ('no-modify' in subpacket['key-server-preferences']) |
1136 def _serialize_primary_user_id_signature_subpacket(self, subpacket):
1137 return bytes([0x1 * subpacket['primary-user-id']])
1139 def _serialize_key_flags_signature_subpacket(self, subpacket):
1141 0x1 * ('can certify' in subpacket['key-flags']) |
1142 0x2 * ('can sign' in subpacket['key-flags']) |
1143 0x4 * ('can encrypt communications' in subpacket['key-flags']) |
1144 0x8 * ('can encrypt storage' in subpacket['key-flags']) |
1145 0x10 * ('private split' in subpacket['key-flags']) |
1146 0x20 * ('can authenticate' in subpacket['key-flags']) |
1147 0x80 * ('private shated' in subpacket['key-flags']) |
1151 def _serialize_features_signature_subpacket(self, subpacket):
1153 0x1 * ('modification detection' in subpacket['features']) |
1157 def _serialize_embedded_signature_signature_subpacket(self, subpacket):
1158 return subpacket['embedded'].to_bytes()
1160 def _serialize_user_id_packet(self):
1161 return self['user'].encode('utf-8')
1163 def _string_to_key(self, string, key_size):
1166 '{}-bit key is not an integer number of bytes'.format(
1168 key_size_bytes = key_size // 8
1169 hash_name = self._hashlib_name[
1170 self['string-to-key-hash-algorithm']]
1171 string_hash = _hashlib.new(hash_name)
1172 hashes = _math.ceil(key_size_bytes / string_hash.digest_size)
1174 if self['string-to-key-type'] == 'simple':
1175 update_bytes = string
1176 elif self['string-to-key-type'] in [
1178 'iterated and salted',
1180 update_bytes = self['string-to-key-salt'] + string
1181 if self['string-to-key-type'] == 'iterated and salted':
1182 count = self['string-to-key-count']
1183 if count < len(update_bytes):
1184 count = len(update_bytes)
1186 raise NotImplementedError(
1187 'key calculation for string-to-key type {}'.format(
1188 self['string-to-key-type']))
1189 for padding in range(hashes):
1190 string_hash = _hashlib.new(hash_name)
1191 string_hash.update(padding * b'\x00')
1192 if self['string-to-key-type'] in [
1196 string_hash.update(update_bytes)
1197 elif self['string-to-key-type'] == 'iterated and salted':
1199 while remaining > 0:
1200 string_hash.update(update_bytes[:remaining])
1201 remaining -= len(update_bytes)
1202 key += string_hash.digest()
1203 key = key[:key_size_bytes]
1206 def decrypt_symmetric_encryption(self, data):
1207 """Decrypt OpenPGP's Cipher Feedback mode"""
1208 algorithm = self['symmetric-encryption-algorithm']
1209 module = self._crypto_module[algorithm]
1210 key_size = self._key_size[algorithm]
1211 segment_size_bits = self._cipher_block_size[algorithm]
1212 if segment_size_bits % 8:
1213 raise NotImplementedError(
1214 ('{}-bit segment size for {} is not an integer number of bytes'
1215 ).format(segment_size_bits, algorithm))
1216 segment_size_bytes = segment_size_bits // 8
1217 padding = segment_size_bytes - len(data) % segment_size_bytes
1219 data += b'\x00' * padding
1220 if self.key and self.key._cache_passphrase and self.key._passphrase:
1221 passphrase = self.key._passphrase
1223 passphrase = _getpass.getpass(
1224 'passphrase for {}: '.format(self['fingerprint'][-8:]))
1225 passphrase = passphrase.encode('ascii')
1226 if self.key and self.key._cache_passphrase:
1227 self.key._passphrase = passphrase
1228 key = self._string_to_key(string=passphrase, key_size=key_size)
1229 cipher = module.new(
1231 mode=module.MODE_CFB,
1232 IV=self['initial-vector'],
1233 segment_size=segment_size_bits)
1234 plaintext = cipher.decrypt(data)
1236 plaintext = plaintext[:-padding]
1239 def check_roundtrip(self):
1240 serialized = self.to_bytes()
1241 source = self['raw']
1242 if serialized != source:
1243 if len(serialized) != len(source):
1245 ('serialized {} is {} bytes long, '
1246 'but input is {} bytes long').format(
1247 self['type'], len(serialized), len(source)))
1249 for i in range(0, len(source), 8):
1250 in_chunk = source[i: i + chunk_size]
1251 out_chunk = serialized[i: i + chunk_size]
1252 if in_chunk != out_chunk:
1254 ('serialized {} differs from input packet: '
1255 'at byte {}, {} != {}').format(
1256 self['type'], i, byte_string(data=out_chunk),
1257 byte_string(data=in_chunk)))
1260 if self['type'] != 'signature packet':
1261 raise NotImplmentedError('verify {}'.format(self['type']))
1262 hashed_signature_data = self._serialize_hashed_signature_packet()
1263 signed_data = self._signature_packet_signed_data(
1264 hashed_signature_data=hashed_signature_data)
1266 subpackets = self['hashed-subpackets'] + self['unhashed-subpackets']
1267 issuer_subpackets = [p for p in subpackets if p['type'] == 'issuer']
1268 if issuer_subpackets:
1269 issuer = issuer_subpackets[0]
1270 packets = (self.key.public_packets or []) + (
1271 self.key.secret_packets or [])
1272 keys = [k for k in packets
1273 if k.get('fingerprint', '').endswith(issuer['issuer'])]
1275 key_packet = keys[-1]
1277 LOG.info('no packet found for issuer {}'.format(
1278 issuer['issuer'][-8:].upper()))
1280 LOG.debug('verify {} with {}'.format(
1281 self['signature-type'],
1282 key_packet['fingerprint'][-8:].upper()))
1283 verified = self.key.verify(
1284 data=signed_data, signature=self['signature'],
1285 hash_algorithm=self['hash-algorithm'],
1286 signature_algorithm=self['public-key-algorithm'],
1287 key_packet=key_packet, digest_check=self['signed-hash-word'])
1289 raise ValueError('verification failed for {}'.format(self))
1291 LOG.debug('verified {}'.format(self['signature-type']))
1294 class PGPKey (object):
1295 """An OpenPGP key with public and private parts.
1299 OpenPGP users may transfer public keys. The essential elements
1300 of a transferable public key are as follows:
1302 - One Public-Key packet
1303 - Zero or more revocation signatures
1304 - One or more User ID packets
1305 - After each User ID packet, zero or more Signature packets
1307 - Zero or more User Attribute packets
1308 - After each User Attribute packet, zero or more Signature
1309 packets (certifications)
1310 - Zero or more Subkey packets
1311 - After each Subkey packet, one Signature packet, plus
1312 optionally a revocation
1314 Secret keys have a similar packet stream [2]:
1316 OpenPGP users may transfer secret keys. The format of a
1317 transferable secret key is the same as a transferable public key
1318 except that secret-key and secret-subkey packets are used
1319 instead of the public key and public-subkey packets.
1320 Implementations SHOULD include self-signatures on any user IDs
1321 and subkeys, as this allows for a complete public key to be
1322 automatically extracted from the transferable secret key.
1323 Implementations MAY choose to omit the self-signatures,
1324 especially if a transferable public key accompanies the
1325 transferable secret key.
1327 [1]: http://tools.ietf.org/search/rfc4880#section-11.1
1328 [2]: http://tools.ietf.org/search/rfc4880#section-11.2
1330 def __init__(self, fingerprint, cache_passphrase=False):
1331 self.fingerprint = fingerprint
1332 self._cache_passphrase = cache_passphrase
1333 self._passphrase = None
1334 self.public_packets = None
1335 self.secret_packets = None
1338 lines = ['key: {}'.format(self.fingerprint)]
1339 if self.public_packets:
1340 lines.append(' public:')
1341 for packet in self.public_packets:
1342 lines.extend(self._str_packet(packet=packet, prefix=' '))
1343 if self.secret_packets:
1344 lines.append(' secret:')
1345 for packet in self.secret_packets:
1346 lines.extend(self._str_packet(packet=packet, prefix=' '))
1347 return '\n'.join(lines)
1349 def _str_packet(self, packet, prefix):
1350 lines = str(packet).split('\n')
1351 return [prefix + line for line in lines]
1353 def import_from_gpg(self):
1354 key_export = _get_stdout(
1355 ['gpg', '--export', self.fingerprint])
1356 self.public_packets = []
1357 self._packets_from_bytes(list=self.public_packets, data=key_export)
1358 if self.public_packets[0]['type'] != 'public-key packet':
1360 '{} does not start with a public-key packet'.format(
1362 key_secret_export = _get_stdout(
1363 ['gpg', '--export-secret-keys', self.fingerprint])
1364 self.secret_packets = []
1365 self._packets_from_bytes(list=self.secret_packets, data=key_secret_export)
1366 if self.secret_packets[0]['type'] != 'secret-key packet':
1368 '{} does not start with a secret-key packet'.format(
1370 for packet in self.public_packets + self.secret_packets:
1371 packet.check_roundtrip()
1373 def _packets_from_bytes(self, list, data):
1375 while offset < len(data):
1376 packet = PGPPacket(key=self)
1377 offset += packet.from_bytes(data=data[offset:])
1380 def export_to_gpg(self):
1381 raise NotImplemetedError('export to gpg')
1383 def import_from_key(self, key):
1384 """Migrate the (sub)keys into this key"""
1387 def _get_signer(self, signature_algorithm=None, key_packet=None,
1389 if key_packet is None:
1391 key_packet = self.secret_packets[0]
1393 key_packet = self.public_packets[0]
1395 if 'secret' not in key_packet['type']:
1397 '{} is not a secret key'.format(key_packet['type']))
1398 if signature_algorithm is None:
1399 signature_algorithm = key_packet['public-key-algorithm']
1400 if signature_algorithm != key_packet['public-key-algorithm']:
1402 'cannot act on a {} signature with a {} key'.format(
1403 signature_algorithm, key_packet['public-key-algorithm']))
1404 module = key_packet._crypto_module[signature_algorithm]
1405 if signature_algorithm.startswith('rsa '):
1406 key = module.construct((
1407 key_packet['public-modulus'], # n
1408 key_packet['public-exponent'], # e
1412 key.d = key_packet['secret-exponent']
1413 key.p = key_packet['secret-prime-p']
1414 key.q = key_packet['secret-prime-q']
1415 key.u = key_packet['secret-inverse-of-p-mod-q']
1416 signer = _crypto_signature_pkcs1_v1_5.new(key)
1417 elif signature_algorithm.startswith('dsa '):
1418 signer = module.construct((
1419 key_packet['public-key'], # y
1420 key_packet['group-generator'], # g
1421 key_packet['prime'], # p
1422 key_packet['group-order'], # q
1425 signer.x = key_packet['secret-exponent']
1427 raise NotImplementedError(
1428 'construct {}'.format(signature_algorithm))
1429 return (key_packet, signer)
1431 def _hash(self, data, hash_algorithm, key_packet):
1432 hash_name = key_packet._hashlib_name[hash_algorithm]
1433 data_hash = _hashlib.new(hash_name)
1434 data_hash.update(data)
1437 def verify(self, data, signature, hash_algorithm, signature_algorithm=None,
1438 key_packet=None, digest_check=None):
1439 key_packet, signer = self._get_signer(
1440 signature_algorithm=signature_algorithm, key_packet=key_packet)
1441 data_hash = self._hash(
1442 data=data, hash_algorithm=hash_algorithm, key_packet=key_packet)
1443 digest = data_hash.digest()
1444 hexdigest = data_hash.hexdigest()
1445 if digest_check and not digest.startswith(digest_check):
1447 'corrupted hash: {} does not start with {}'.format(
1448 byte_string(digest),
1449 byte_string(digest_check)))
1450 if signature_algorithm.startswith('rsa '):
1451 sig_hex = '{:x}'.format(signature[0])
1452 signature = string_bytes(data=sig_hex, sep='')
1453 elif signature_algorithm.startswith('dsa '):
1455 LOG.debug('verify signature {} on {} with {}'.format(
1456 signature, hexdigest, signer))
1457 return signer.verify(data_hash, signature)
1460 def migrate(old_key, new_key, cache_passphrase=False):
1461 """Add the old key and sub-keys to the new key
1463 For example, to upgrade your master key, while preserving old
1464 signatures you'd made. You will lose signature *on* your old key
1465 though, since sub-keys can't be signed (I don't think).
1467 old_key = PGPKey(fingerprint=old_key, cache_passphrase=cache_passphrase)
1468 old_key.import_from_gpg()
1469 new_key = PGPKey(fingerprint=new_key, cache_passphrase=cache_passphrase)
1470 new_key.import_from_gpg()
1471 new_key.import_from_key(key=old_key)
1477 if __name__ == '__main__':
1480 old_key, new_key = _sys.argv[1:3]
1481 migrate(old_key=old_key, new_key=new_key, cache_passphrase=True)