3 import getpass as _getpass
4 import hashlib as _hashlib
7 import subprocess as _subprocess
8 import struct as _struct
10 import Crypto.Cipher.AES as _crypto_cipher_aes
11 import Crypto.Cipher.Blowfish as _crypto_cipher_blowfish
12 import Crypto.Cipher.CAST as _crypto_cipher_cast
13 import Crypto.Cipher.DES3 as _crypto_cipher_des3
14 import Crypto.PublicKey.DSA as _crypto_publickey_dsa
15 import Crypto.PublicKey.ElGamal as _crypto_publickey_elgamal
16 import Crypto.PublicKey.RSA as _crypto_publickey_rsa
19 def _get_stdout(args, stdin=None):
22 stdin_pipe = _subprocess.PIPE
23 p = _subprocess.Popen(args, stdin=stdin_pipe, stdout=_subprocess.PIPE)
24 stdout, stderr = p.communicate(stdin)
27 raise RuntimeError(status)
def byte_string(data, sep=' '):
    r"""Render bytes as two-digit lowercase hex values joined by ``sep``

    >>> byte_string(b'\x12\x34\x56')
    '12 34 56'
    """
    hex_pairs = [format(octet, '02x') for octet in data]
    return sep.join(hex_pairs)
def string_bytes(data, sep=' '):
    r"""Convert byte_string() output back into bytes

    Each value is read as two hex digits; ``sep`` only contributes its
    length, which sets the stride between consecutive values.

    >>> string_bytes('12 fa fb')
    b'\x12\xfa\xfb'
    """
    stride = 2 + len(sep)
    high_digits = data[::stride]
    low_digits = data[1::stride]
    return bytes(
        int(high + low, base=16)
        for high, low in zip(high_digits, low_digits))
51 class PGPPacket (dict):
52 # http://tools.ietf.org/search/rfc4880
53 _old_format_packet_length_type = { # type: (bytes, struct type)
54 0: (1, 'B'), # 1-byte unsigned integer
55 1: (2, 'H'), # 2-byte unsigned integer
56 2: (4, 'I'), # 4-byte unsigned integer
62 1: 'public-key encrypted session key packet',
63 2: 'signature packet',
64 3: 'symmetric-key encrypted session key packet',
65 4: 'one-pass signature packet',
66 5: 'secret-key packet',
67 6: 'public-key packet',
68 7: 'secret-subkey packet',
69 8: 'compressed data packet',
70 9: 'symmetrically encrypted data packet',
72 11: 'literal data packet',
75 14: 'public-subkey packet',
76 17: 'user attribute packet',
77 18: 'sym. encrypted and integrity protected data packet',
78 19: 'modification detection code packet',
85 _public_key_algorithms = {
86 1: 'rsa (encrypt or sign)',
87 2: 'rsa encrypt-only',
89 16: 'elgamal (encrypt-only)',
90 17: 'dsa (digital signature algorithm)',
91 18: 'reserved for elliptic curve',
92 19: 'reserved for ecdsa',
93 20: 'reserved (formerly elgamal encrypt or sign)',
94 21: 'reserved for diffie-hellman',
108 _symmetric_key_algorithms = {
109 0: 'plaintext or unencrypted data',
116 7: 'aes with 128-bit key',
117 8: 'aes with 192-bit key',
118 9: 'aes with 256-bit key',
133 _cipher_block_size = { # in bits
134 'aes with 128-bit key': 128,
135 'aes with 192-bit key': 128,
136 'aes with 256-bit key': 128,
142 # symmetric-key encryption
143 'aes with 128-bit key': _crypto_cipher_aes,
144 'aes with 192-bit key': _crypto_cipher_aes,
145 'aes with 256-bit key': _crypto_cipher_aes,
146 'blowfish': _crypto_cipher_blowfish,
147 'cast5': _crypto_cipher_cast,
148 'tripledes': _crypto_cipher_des3,
149 # public-key encryption
150 'dsa (digital signature algorithm)': _crypto_publickey_dsa,
151 'elgamal (encrypt-only)': _crypto_publickey_elgamal,
152 'rsa (encrypt or sign)': _crypto_publickey_rsa,
153 'rsa encrypt-only': _crypto_publickey_rsa,
154 'rsa sign-only': _crypto_publickey_rsa,
157 _key_size = { # in bits
158 'aes with 128-bit key': 128,
159 'aes with 192-bit key': 192,
160 'aes with 256-bit key': 256,
164 _compression_algorithms = {
207 _hashlib_name = { # map OpenPGP-based names to hashlib names
210 'ripe-md/160': 'ripemd160',
217 _string_to_key_types = {
221 3: 'iterated and salted',
235 _string_to_key_expbias = 6
238 0x00: 'binary document',
239 0x01: 'canonical text document',
241 0x10: 'generic user id and public-key packet',
242 0x11: 'persona user id and public-key packet',
243 0x12: 'casual user id and public-key packet',
244 0x13: 'postitive user id and public-key packet',
245 0x18: 'subkey binding',
246 0x19: 'primary key binding',
248 0x20: 'key revocation',
249 0x28: 'subkey revocation',
250 0x30: 'certification revocation',
252 0x50: 'third-party confirmation',
255 _signature_subpacket_types = {
258 2: 'signature creation time',
259 3: 'signature expiration time',
260 4: 'exportable certification',
261 5: 'trust signature',
262 6: 'regular expression',
265 9: 'key expiration time',
266 10: 'placeholder for backward compatibility',
267 11: 'preferred symmetric algorithms',
268 12: 'revocation key',
277 21: 'preferred hash algorithms',
278 22: 'preferred compression algorithms',
279 23: 'key server preferences',
280 24: 'preferred key server',
281 25: 'primary user id',
284 28: 'signer user id',
285 29: 'reason for revocation',
287 31: 'signature target',
288 32: 'embedded signature',
302 _clean_type_regex = _re.compile('\W+')
304 def __init__(self, key=None):
305 super(PGPPacket, self).__init__()
308 def _clean_type(self, type=None):
311 return self._clean_type_regex.sub('_', type)
314 def _reverse(dict, value):
315 """Reverse lookups in dictionaries
317 >>> PGPPacket._reverse(PGPPacket._packet_types, 'public-key packet')
320 return [k for k,v in dict.items() if v == value][0]
323 packet = PGPPacket(key=self.key)
328 method_name = '_str_{}'.format(self._clean_type())
329 method = getattr(self, method_name, None)
333 return '{}: {}'.format(self['type'], details)
335 def _str_public_key_packet(self):
336 return self._str_generic_key_packet()
338 def _str_public_subkey_packet(self):
339 return self._str_generic_key_packet()
341 def _str_generic_key_packet(self):
342 return self['fingerprint'][-8:].upper()
344 def _str_secret_key_packet(self):
345 return self._str_generic_secret_key_packet()
347 def _str_secret_subkey_packet(self):
348 return self._str_generic_secret_key_packet()
350 def _str_generic_secret_key_packet(self):
351 lines = [self._str_generic_key_packet()]
353 ('symmetric encryption',
354 'symmetric-encryption-algorithm'),
355 ('s2k hash', 'string-to-key-hash-algorithm'),
356 ('s2k count', 'string-to-key-count'),
357 ('s2k salt', 'string-to-key-salt'),
358 ('IV', 'initial-vector'),
362 if isinstance(value, bytes):
363 value = byte_string(data=value)
364 lines.append(' {}: {}'.format(label, value))
365 return '\n'.join(lines)
367 def _str_signature_packet(self):
368 lines = [self['signature-type']]
369 if self['hashed-subpackets']:
370 lines.append(' hashed subpackets:')
371 lines.extend(self._str_signature_subpackets(
372 self['hashed-subpackets'], prefix=' '))
373 if self['unhashed-subpackets']:
374 lines.append(' unhashed subpackets:')
375 lines.extend(self._str_signature_subpackets(
376 self['unhashed-subpackets'], prefix=' '))
377 return '\n'.join(lines)
379 def _str_signature_subpackets(self, subpackets, prefix):
381 for subpacket in subpackets:
382 method_name = '_str_{}_signature_subpacket'.format(
383 self._clean_type(type=subpacket['type']))
384 method = getattr(self, method_name, None)
386 lines.append(' {}: {}'.format(
388 method(subpacket=subpacket)))
390 lines.append(' {}'.format(subpacket['type']))
393 def _str_signature_creation_time_signature_subpacket(self, subpacket):
394 return str(subpacket['signature-creation-time'])
396 def _str_issuer_signature_subpacket(self, subpacket):
397 return subpacket['issuer'][-8:].upper()
399 def _str_key_expiration_time_signature_subpacket(self, subpacket):
400 return str(subpacket['key-expiration-time'])
402 def _str_preferred_symmetric_algorithms_signature_subpacket(
405 algo for algo in subpacket['preferred-symmetric-algorithms'])
407 def _str_preferred_hash_algorithms_signature_subpacket(
410 algo for algo in subpacket['preferred-hash-algorithms'])
412 def _str_preferred_compression_algorithms_signature_subpacket(
415 algo for algo in subpacket['preferred-compression-algorithms'])
417 def _str_key_server_preferences_signature_subpacket(self, subpacket):
419 x for x in sorted(subpacket['key-server-preferences']))
421 def _str_primary_user_id_signature_subpacket(self, subpacket):
422 return str(subpacket['primary-user-id'])
424 def _str_key_flags_signature_subpacket(self, subpacket):
425 return ', '.join(x for x in sorted(subpacket['key-flags']))
427 def _str_features_signature_subpacket(self, subpacket):
428 return ', '.join(x for x in sorted(subpacket['features']))
430 def _str_embedded_signature_signature_subpacket(self, subpacket):
431 return subpacket['embedded']['signature-type']
433 def _str_user_id_packet(self):
436 def from_bytes(self, data):
437 offset = self._parse_header(data=data)
438 packet = data[offset:offset + self['length']]
439 if len(packet) < self['length']:
440 raise ValueError('packet too short ({} < {})'.format(
441 len(packet), self['length']))
442 offset += self['length']
443 method_name = '_parse_{}'.format(self._clean_type())
444 method = getattr(self, method_name, None)
446 raise NotImplementedError(
447 'cannot parse packet type {!r}'.format(self['type']))
449 self['raw'] = data[:offset]
452 def _parse_header(self, data):
455 always_one = packet_tag & 1 << 7
457 raise ValueError('most significant packet tag bit not set')
458 self['new-format'] = packet_tag & 1 << 6
459 if self['new-format']:
460 type_code = packet_tag & 0b111111
461 raise NotImplementedError('new-format packet length')
463 type_code = packet_tag >> 2 & 0b1111
464 self['length-type'] = packet_tag & 0b11
465 length_bytes, length_type = self._old_format_packet_length_type[
468 raise NotImplementedError(
469 'old-format packet of indeterminate length')
470 length_format = '>{}'.format(length_type)
471 length_data = data[offset: offset + length_bytes]
472 offset += length_bytes
473 self['length'] = _struct.unpack(length_format, length_data)[0]
474 self['type'] = self._packet_types[type_code]
478 def _parse_multiprecision_integer(data):
479 r"""Parse RFC 4880's multiprecision integers
481 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
483 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
486 bits = _struct.unpack('>H', data[:2])[0]
488 length = (bits + 7) // 8
490 for i in range(length):
491 value += data[offset + i] * 1 << (8 * (length - i - 1))
493 return (offset, value)
496 def _decode_string_to_key_count(cls, data):
497 r"""Decode RFC 4880's string-to-key count
499 >>> PGPPacket._decode_string_to_key_count(b'\x97'[0])
502 return (16 + (data & 15)) << ((data >> 4) + cls._string_to_key_expbias)
504 def _parse_string_to_key_specifier(self, data):
505 self['string-to-key-type'] = self._string_to_key_types[data[0]]
507 if self['string-to-key-type'] == 'simple':
508 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
511 elif self['string-to-key-type'] == 'salted':
512 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
515 self['string-to-key-salt'] = data[offset: offset + 8]
517 elif self['string-to-key-type'] == 'iterated and salted':
518 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
521 self['string-to-key-salt'] = data[offset: offset + 8]
523 self['string-to-key-count'] = self._decode_string_to_key_count(
527 raise NotImplementedError(
528 'string-to-key type {}'.format(self['string-to-key-type']))
531 def _parse_public_key_packet(self, data):
532 self._parse_generic_public_key_packet(data=data)
534 def _parse_public_subkey_packet(self, data):
535 self._parse_generic_public_key_packet(data=data)
537 def _parse_generic_public_key_packet(self, data):
538 self['key-version'] = data[0]
540 if self['key-version'] != 4:
541 raise NotImplementedError(
542 'public (sub)key packet version {}'.format(
543 self['key-version']))
545 self['creation-time'], algorithm = _struct.unpack(
546 '>IB', data[offset: offset + length])
548 self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
549 if self['public-key-algorithm'].startswith('rsa '):
550 o, self['public-modulus'] = self._parse_multiprecision_integer(
553 o, self['public-exponent'] = self._parse_multiprecision_integer(
556 elif self['public-key-algorithm'].startswith('dsa '):
557 o, self['prime'] = self._parse_multiprecision_integer(
560 o, self['group-order'] = self._parse_multiprecision_integer(
563 o, self['group-generator'] = self._parse_multiprecision_integer(
566 o, self['public-key'] = self._parse_multiprecision_integer(
569 elif self['public-key-algorithm'].startswith('elgamal '):
570 o, self['prime'] = self._parse_multiprecision_integer(
573 o, self['group-generator'] = self._parse_multiprecision_integer(
576 o, self['public-key'] = self._parse_multiprecision_integer(
580 raise NotImplementedError(
581 'algorithm-specific key fields for {}'.format(
582 self['public-key-algorithm']))
583 fingerprint = _hashlib.sha1()
585 self._serialize_signature_packet_target(target=self))
586 self['fingerprint'] = fingerprint.hexdigest()
589 def _parse_secret_key_packet(self, data):
590 self._parse_generic_secret_key_packet(data=data)
592 def _parse_secret_subkey_packet(self, data):
593 self._parse_generic_secret_key_packet(data=data)
595 def _parse_generic_secret_key_packet(self, data):
596 offset = self._parse_generic_public_key_packet(data=data)
597 string_to_key_usage = data[offset]
599 if string_to_key_usage in [255, 254]:
600 self['symmetric-encryption-algorithm'] = (
601 self._symmetric_key_algorithms[data[offset]])
603 offset += self._parse_string_to_key_specifier(data=data[offset:])
605 self['symmetric-encryption-algorithm'] = (
606 self._symmetric_key_algorithms[string_to_key_usage])
607 if string_to_key_usage:
608 block_size_bits = self._cipher_block_size.get(
609 self['symmetric-encryption-algorithm'], None)
610 if block_size_bits % 8:
611 raise NotImplementedError(
612 ('{}-bit block size for {} is not an integer number of bytes'
614 block_size_bits, self['symmetric-encryption-algorithm']))
615 block_size = block_size_bits // 8
617 raise NotImplementedError(
618 'unknown block size for {}'.format(
619 self['symmetric-encryption-algorithm']))
620 self['initial-vector'] = data[offset: offset + block_size]
622 ciphertext = data[offset:]
623 offset += len(ciphertext)
624 decrypted_data = self.decrypt_symmetric_encryption(data=ciphertext)
626 decrypted_data = data[offset:key_end]
627 if string_to_key_usage in [0, 255]:
629 elif string_to_key_usage == 254:
633 secret_key = decrypted_data[:key_end]
636 secret_key_checksum = decrypted_data[key_end:]
638 calculated_checksum = sum(secret_key) % 65536
640 checksum_hash = _hashlib.sha1()
641 checksum_hash.update(secret_key)
642 calculated_checksum = checksum_hash.digest()
643 if secret_key_checksum != calculated_checksum:
645 'corrupt secret key (checksum {} != expected {})'.format(
646 secret_key_checksum, calculated_checksum))
647 if self['public-key-algorithm'].startswith('rsa '):
648 o, self['secret-exponent'] = self._parse_multiprecision_integer(
649 secret_key[secret_offset:])
651 o, self['secret-prime-p'] = self._parse_multiprecision_integer(
652 secret_key[secret_offset:])
654 o, self['secret-prime-q'] = self._parse_multiprecision_integer(
655 secret_key[secret_offset:])
657 o, self['secret-inverse-of-p-mod-q'] = (
658 self._parse_multiprecision_integer(
659 secret_key[secret_offset:]))
661 elif self['public-key-algorithm'].startswith('dsa '):
662 o, self['secret-exponent'] = self._parse_multiprecision_integer(
663 secret_key[secret_offset:])
665 elif self['public-key-algorithm'].startswith('elgamal '):
666 o, self['secret-exponent'] = self._parse_multiprecision_integer(
667 secret_key[secret_offset:])
670 raise NotImplementedError(
671 'algorithm-specific key fields for {}'.format(
672 self['public-key-algorithm']))
673 if secret_offset != len(secret_key):
675 ('parsed {} out of {} bytes of algorithm-specific key fields '
677 secret_offset, len(secret_key),
678 self['public-key-algorithm']))
680 def _parse_signature_subpackets(self, data):
682 while offset < len(data):
683 o, subpacket = self._parse_signature_subpacket(data=data[offset:])
687 def _parse_signature_subpacket(self, data):
693 elif first >= 192 and first < 255:
694 second = data[offset]
696 length = ((first - 192) << 8) + second + 192
698 length = _struct.unpack(
699 '>I', data[offset: offset + 4])[0]
701 subpacket['type'] = self._signature_subpacket_types[data[offset]]
703 subpacket_data = data[offset: offset + length - 1]
704 offset += len(subpacket_data)
705 method_name = '_parse_{}_signature_subpacket'.format(
706 self._clean_type(type=subpacket['type']))
707 method = getattr(self, method_name, None)
709 raise NotImplementedError(
710 'cannot parse signature subpacket type {!r}'.format(
712 method(data=subpacket_data, subpacket=subpacket)
713 return (offset, subpacket)
715 def _parse_signature_packet(self, data):
716 self['signature-version'] = data[0]
718 if self['signature-version'] != 4:
719 raise NotImplementedError(
720 'signature packet version {}'.format(
721 self['signature-version']))
722 self['signature-type'] = self._signature_types[data[offset]]
724 self['public-key-algorithm'] = self._public_key_algorithms[
727 self['hash-algorithm'] = self._hash_algorithms[data[offset]]
729 hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
731 self['hashed-subpackets'] = list(self._parse_signature_subpackets(
732 data[offset: offset + hashed_count]))
733 offset += hashed_count
734 unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
736 self['unhashed-subpackets'] = list(self._parse_signature_subpackets(
737 data=data[offset: offset + unhashed_count]))
738 offset += unhashed_count
739 self['signed-hash-word'] = data[offset: offset + 2]
741 self['signature'] = []
742 while offset < len(data):
743 o, mpi = self._parse_multiprecision_integer(data=data[offset:])
745 self['signature'].append(mpi)
746 if self['signature-type'] == 'standalone':
747 self['target'] = None
748 elif self['signature-type'].endswith(' user id and public-key packet'):
750 [p for p in self.key.public_packets if p['type'] == 'public-key packet'][-1],
751 [p for p in self.key.public_packets if p['type'] == 'user id packet'][-1],
754 raise NotImplementedError(
755 'target for {}'.format(self['signature-type']))
757 def _parse_signature_creation_time_signature_subpacket(
758 self, data, subpacket):
759 subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0]
def _parse_issuer_signature_subpacket(self, data, subpacket):
    """Store ``data`` as an unseparated hex string under ``'issuer'``"""
    issuer_hex = byte_string(data=data, sep='')
    subpacket['issuer'] = issuer_hex
764 def _parse_key_expiration_time_signature_subpacket(
765 self, data, subpacket):
766 subpacket['key-expiration-time'] = _struct.unpack('>I', data)[0]
768 def _parse_preferred_symmetric_algorithms_signature_subpacket(
769 self, data, subpacket):
770 subpacket['preferred-symmetric-algorithms'] = [
771 self._symmetric_key_algorithms[d] for d in data]
773 def _parse_preferred_hash_algorithms_signature_subpacket(
774 self, data, subpacket):
775 subpacket['preferred-hash-algorithms'] = [
776 self._hash_algorithms[d] for d in data]
778 def _parse_preferred_compression_algorithms_signature_subpacket(
779 self, data, subpacket):
780 subpacket['preferred-compression-algorithms'] = [
781 self._compression_algorithms[d] for d in data]
783 def _parse_key_server_preferences_signature_subpacket(
784 self, data, subpacket):
785 subpacket['key-server-preferences'] = set()
787 subpacket['key-server-preferences'].add('no-modify')
789 def _parse_primary_user_id_signature_subpacket(self, data, subpacket):
790 subpacket['primary-user-id'] = bool(data[0])
792 def _parse_key_flags_signature_subpacket(self, data, subpacket):
793 subpacket['key-flags'] = set()
795 subpacket['key-flags'].add('can certify')
797 subpacket['key-flags'].add('can sign')
799 subpacket['key-flags'].add('can encrypt communications')
801 subpacket['key-flags'].add('can encrypt storage')
803 subpacket['key-flags'].add('private split')
805 subpacket['key-flags'].add('can authenticate')
807 subpacket['key-flags'].add('private shared')
809 def _parse_features_signature_subpacket(self, data, subpacket):
810 subpacket['features'] = set()
812 subpacket['features'].add('modification detection')
def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
    """Parse ``data`` as the body of an embedded signature packet

    A new PGPPacket sharing this packet's ``key`` is stored under
    ``subpacket['embedded']``, tagged as an embedded 'signature packet',
    filled in by ``_parse_signature_packet``, and finally given ``data``
    as its ``'raw'`` value.
    """
    embedded = PGPPacket(key=self.key)
    # Attach before parsing so the entry exists even if parsing raises.
    subpacket['embedded'] = embedded
    embedded['type'] = 'signature packet'
    embedded['embedded'] = True
    embedded._parse_signature_packet(data=data)
    embedded['raw'] = data
821 def _parse_user_id_packet(self, data):
822 self['user'] = str(data, 'utf-8')
825 body = self._serialize_body()
827 raise ValueError(method)
828 self['length'] = len(body)
830 self._serialize_header(),
834 def _serialize_header(self):
837 type_code = self._reverse(self._packet_types, self['type'])
839 always_one * (1 << 7) |
840 new_format * (1 << 6) |
841 type_code * (1 << 2) |
844 length_bytes, length_type = self._old_format_packet_length_type[
846 length_format = '>{}'.format(length_type)
847 length_data = _struct.pack(length_format, self['length'])
853 def _serialize_body(self):
854 method_name = '_serialize_{}'.format(self._clean_type())
855 method = getattr(self, method_name, None)
857 raise NotImplementedError(
858 'cannot serialize packet type {!r}'.format(self['type']))
862 def _serialize_multiprecision_integer(integer):
863 r"""Serialize RFC 4880's multipricision integers
865 >>> PGPPacket._serialize_multiprecision_integer(1)
867 >>> PGPPacket._serialize_multiprecision_integer(511)
870 bit_length = int(_math.log(integer, 2)) + 1
872 _struct.pack('>H', bit_length),
875 chunks.insert(1, bytes([integer & 0xff]))
876 integer = integer >> 8
877 return b''.join(chunks)
880 def _encode_string_to_key_count(cls, count):
881 r"""Encode RFC 4880's string-to-key count
883 >>> PGPPacket._encode_string_to_key_count(753664)
887 count = count >> cls._string_to_key_expbias
890 coded_count += 1 << 4
891 coded_count += count & 15
892 return bytes([coded_count])
894 def _serialize_string_to_key_specifier(self):
895 string_to_key_type = bytes([
897 self._string_to_key_types, self['string-to-key-type']),
899 chunks = [string_to_key_type]
900 if self['string-to-key-type'] == 'simple':
901 chunks.append(bytes([self._reverse(
902 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
903 elif self['string-to-key-type'] == 'salted':
904 chunks.append(bytes([self._reverse(
905 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
906 chunks.append(self['string-to-key-salt'])
907 elif self['string-to-key-type'] == 'iterated and salted':
908 chunks.append(bytes([self._reverse(
909 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
910 chunks.append(self['string-to-key-salt'])
911 chunks.append(self._encode_string_to_key_count(
912 count=self['string-to-key-count']))
914 raise NotImplementedError(
915 'string-to-key type {}'.format(self['string-to-key-type']))
917 return b''.join(chunks)
919 def _serialize_public_key_packet(self):
920 return self._serialize_generic_public_key_packet()
922 def _serialize_public_subkey_packet(self):
923 return self._serialize_generic_public_key_packet()
925 def _serialize_generic_public_key_packet(self):
926 key_version = bytes([self['key-version']])
927 chunks = [key_version]
928 if self['key-version'] != 4:
929 raise NotImplementedError(
930 'public (sub)key packet version {}'.format(
931 self['key-version']))
932 chunks.append(_struct.pack('>I', self['creation-time']))
933 chunks.append(bytes([self._reverse(
934 self._public_key_algorithms, self['public-key-algorithm'])]))
935 if self['public-key-algorithm'].startswith('rsa '):
936 chunks.append(self._serialize_multiprecision_integer(
937 self['public-modulus']))
938 chunks.append(self._serialize_multiprecision_integer(
939 self['public-exponent']))
940 elif self['public-key-algorithm'].startswith('dsa '):
941 chunks.append(self._serialize_multiprecision_integer(
943 chunks.append(self._serialize_multiprecision_integer(
944 self['group-order']))
945 chunks.append(self._serialize_multiprecision_integer(
946 self['group-generator']))
947 chunks.append(self._serialize_multiprecision_integer(
949 elif self['public-key-algorithm'].startswith('elgamal '):
950 chunks.append(self._serialize_multiprecision_integer(
952 chunks.append(self._serialize_multiprecision_integer(
953 self['group-generator']))
954 chunks.append(self._serialize_multiprecision_integer(
957 raise NotImplementedError(
958 'algorithm-specific key fields for {}'.format(
959 self['public-key-algorithm']))
960 return b''.join(chunks)
962 def _serialize_signature_subpackets(self, subpackets):
964 self._serialize_signature_subpacket(subpacket=subpacket)
965 for subpacket in subpackets)
967 def _serialize_signature_subpacket(self, subpacket):
968 method_name = '_serialize_{}_signature_subpacket'.format(
969 self._clean_type(type=subpacket['type']))
970 method = getattr(self, method_name, None)
972 raise NotImplementedError(
973 'cannot serialize signature subpacket type {!r}'.format(
975 body = method(subpacket=subpacket)
976 length = len(body) + 1
979 chunks.append(bytes([length]))
981 first = ((length - 192) >> 8) + 192
985 (length - 192) % 256,
988 chunks.append(_struct.pack('>I', length))
989 chunks.append(bytes([self._reverse(
990 self._signature_subpacket_types, subpacket['type'])]))
992 return b''.join(chunks)
def _serialize_signature_packet_target(self, target):
    """Serialize a signature target into the bytes that get hashed

    ``target`` may be None, bytes, a PGPPacket, or a list of targets
    (each serialized recursively and concatenated in order).

    Key packets of any flavour are serialized as public-key packets.
    Key, user id and user attribute bodies are prefixed with a constant
    octet and a big-endian length.

    NOTE(review): the prefix octets (0x99, 0xb4, 0xd1) and the handling
    of None/bytes targets follow RFC 4880 section 5.2.4 -- confirm
    against the project's history before merging.
    """
    if target is None:
        return b''
    elif isinstance(target, bytes):
        return target
    elif isinstance(target, PGPPacket):
        if target['type'] in [
                'public-subkey packet',
                'secret-key packet',
                'secret-subkey packet',
                ]:
            # Work on a copy so the caller's packet keeps its own type.
            target = target.copy()
            target['type'] = 'public-key packet'
        serialized = target._serialize_body()
        if target['type'] in [
                'public-key packet',
                'public-subkey packet',
                'secret-key packet',
                'secret-subkey packet',
                ]:
            serialized = b''.join([
                b'\x99',
                _struct.pack('>H', len(serialized)),
                serialized,
                ])
        elif target['type'] == 'user id packet':
            serialized = b''.join([
                b'\xb4',
                _struct.pack('>I', len(serialized)),
                serialized,
                ])
        elif target['type'] == 'user attribute packet':
            serialized = b''.join([
                b'\xd1',
                _struct.pack('>I', len(serialized)),
                serialized,
                ])
        return serialized
    elif isinstance(target, list):
        return b''.join(
            self._serialize_signature_packet_target(target=x)
            for x in target)
def _serialize_hashed_signature_packet(self):
    """Serialize the hashed leading portion of a version 4 signature

    The output is the version, signature type, public-key algorithm and
    hash algorithm codes (one byte each), then the two-byte big-endian
    length of the serialized hashed subpackets, then those subpackets.

    Raises ``NotImplementedError`` for any signature version other
    than 4.
    """
    version = self['signature-version']
    if version != 4:
        raise NotImplementedError(
            'signature packet version {}'.format(version))
    header = bytes([
        version,
        self._reverse(self._signature_types, self['signature-type']),
        self._reverse(
            self._public_key_algorithms, self['public-key-algorithm']),
        self._reverse(self._hash_algorithms, self['hash-algorithm']),
        ])
    hashed_subpackets = self._serialize_signature_subpackets(
        self['hashed-subpackets'])
    subpackets_length = _struct.pack('>H', len(hashed_subpackets))
    return b''.join([header, subpackets_length, hashed_subpackets])
1055 def _signature_packet_signed_data(self, hashed_signature_data):
1056 target = self._serialize_signature_packet_target(target=self['target'])
1059 hashed_signature_data,
1060 bytes([self['signature-version']]),
1062 _struct.pack('>I', len(hashed_signature_data)),
1065 def _serialize_signature_packet(self):
1066 hashed_signature_data = self._serialize_hashed_signature_packet()
1067 chunks = [hashed_signature_data]
1068 unhashed_subpackets = self._serialize_signature_subpackets(
1069 self['unhashed-subpackets'])
1070 chunks.append(_struct.pack('>H', len(unhashed_subpackets)))
1071 chunks.append(unhashed_subpackets)
1072 signed_data = self._signature_packet_signed_data(
1073 hashed_signature_data=hashed_signature_data)
1074 digest, signature = self.key.sign(
1075 data=signed_data, hash_algorithm=self['hash-algorithm'],
1076 signature_algorithm=self['public-key-algorithm'])
1077 chunks.append(digest[:2])
1079 self._serialize_multiprecision_integer(integer=integer)
1080 for integer in signature)
1081 return b''.join(chunks)
1083 def _serialize_signature_creation_time_signature_subpacket(
1085 return _struct.pack('>I', subpacket['signature-creation-time'])
def _serialize_issuer_signature_subpacket(self, subpacket):
    """Convert the unseparated hex ``'issuer'`` string back into bytes"""
    issuer_hex = subpacket['issuer']
    return string_bytes(data=issuer_hex, sep='')
1090 def _serialize_key_expiration_time_signature_subpacket(self, subpacket):
1091 return _struct.pack('>I', subpacket['key-expiration-time'])
1093 def _serialize_preferred_symmetric_algorithms_signature_subpacket(
1096 self._reverse(self._symmetric_key_algorithms, a)
1097 for a in subpacket['preferred-symmetric-algorithms'])
1099 def _serialize_preferred_hash_algorithms_signature_subpacket(
1102 self._reverse(self._hash_algorithms, a)
1103 for a in subpacket['preferred-hash-algorithms'])
1105 def _serialize_preferred_compression_algorithms_signature_subpacket(
1108 self._reverse(self._compression_algorithms, a)
1109 for a in subpacket['preferred-compression-algorithms'])
1111 def _serialize_key_server_preferences_signature_subpacket(self, subpacket):
1113 0x80 * ('no-modify' in subpacket['key-server-preferences']) |
1117 def _serialize_primary_user_id_signature_subpacket(self, subpacket):
1118 return bytes([0x1 * subpacket['primary-user-id']])
1120 def _serialize_key_flags_signature_subpacket(self, subpacket):
1122 0x1 * ('can certify' in subpacket['key-flags']) |
1123 0x2 * ('can sign' in subpacket['key-flags']) |
1124 0x4 * ('can encrypt communications' in subpacket['key-flags']) |
1125 0x8 * ('can encrypt storage' in subpacket['key-flags']) |
1126 0x10 * ('private split' in subpacket['key-flags']) |
1127 0x20 * ('can authenticate' in subpacket['key-flags']) |
1128 0x80 * ('private shated' in subpacket['key-flags']) |
1132 def _serialize_features_signature_subpacket(self, subpacket):
1134 0x1 * ('modification detection' in subpacket['features']) |
def _serialize_embedded_signature_signature_subpacket(self, subpacket):
    """Serialize an embedded signature as a bare signature packet body

    ``_parse_embedded_signature_signature_subpacket`` hands the
    subpacket data straight to ``_parse_signature_packet``, so the data
    has no packet header.  Serialization therefore emits only the body;
    going through ``to_bytes()`` would also emit a packet header and
    break the round trip.

    Raises ``ValueError`` if the embedded packet serializes to None.
    """
    embedded = subpacket['embedded']
    body = embedded._serialize_body()
    if body is None:
        raise ValueError('embedded signature has no serialized body')
    # Record the body length on the packet, as to_bytes() does.
    embedded['length'] = len(body)
    return body
1141 def _serialize_user_id_packet(self):
1142 return self['user'].encode('utf-8')
1144 def _string_to_key(self, string, key_size):
1147 '{}-bit key is not an integer number of bytes'.format(
1149 key_size_bytes = key_size // 8
1150 hash_name = self._hashlib_name[
1151 self['string-to-key-hash-algorithm']]
1152 string_hash = _hashlib.new(hash_name)
1153 hashes = _math.ceil(key_size_bytes / string_hash.digest_size)
1155 if self['string-to-key-type'] == 'simple':
1156 update_bytes = string
1157 elif self['string-to-key-type'] in [
1159 'iterated and salted',
1161 update_bytes = self['string-to-key-salt'] + string
1162 if self['string-to-key-type'] == 'iterated and salted':
1163 count = self['string-to-key-count']
1164 if count < len(update_bytes):
1165 count = len(update_bytes)
1167 raise NotImplementedError(
1168 'key calculation for string-to-key type {}'.format(
1169 self['string-to-key-type']))
1170 for padding in range(hashes):
1171 string_hash = _hashlib.new(hash_name)
1172 string_hash.update(padding * b'\x00')
1173 if self['string-to-key-type'] in [
1177 string_hash.update(update_bytes)
1178 elif self['string-to-key-type'] == 'iterated and salted':
1180 while remaining > 0:
1181 string_hash.update(update_bytes[:remaining])
1182 remaining -= len(update_bytes)
1183 key += string_hash.digest()
1184 key = key[:key_size_bytes]
def decrypt_symmetric_encryption(self, data):
    """Decrypt OpenPGP's Cipher Feedback mode

    The cipher module, key size and block size are looked up from the
    packet's 'symmetric-encryption-algorithm' entry, and the key is
    derived from the passphrase with ``self._string_to_key``.

    ``data`` is zero-padded up to a whole number of cipher blocks
    before decryption, and the same number of bytes is stripped from
    the result, so the returned plaintext has the same length as
    ``data``.

    The passphrase is taken from the owning key's cache when caching
    is enabled and a passphrase has already been stored; otherwise
    the user is prompted with ``getpass`` (and the answer is cached
    when caching is enabled).

    Raises NotImplementedError if the cipher block size is not a
    whole number of bytes.
    """
    algorithm = self['symmetric-encryption-algorithm']
    module = self._crypto_module[algorithm]
    key_size = self._key_size[algorithm]
    segment_size_bits = self._cipher_block_size[algorithm]
    if segment_size_bits % 8:
        raise NotImplementedError(
            ('{}-bit segment size for {} is not an integer number of bytes'
             ).format(segment_size_bits, algorithm))
    segment_size_bytes = segment_size_bits // 8
    # Number of bytes needed to reach the next block boundary; zero
    # when the input is already aligned, so no throw-away block is
    # appended (and later stripped) in that case.
    padding = -len(data) % segment_size_bytes
    if padding:
        data += b'\x00' * padding
    key_owner = self.key
    caching = bool(key_owner and key_owner._cache_passphrase)
    if caching and key_owner._passphrase:
        passphrase = key_owner._passphrase
    else:
        passphrase = _getpass.getpass(
            'passphrase for {}: '.format(self['fingerprint'][-8:]))
        # UTF-8 is byte-identical to ASCII for ASCII passphrases but
        # does not raise UnicodeEncodeError for other characters.
        # NOTE(review): presumably this matches what gpg hashed for
        # non-ASCII passphrases -- confirm against a real key.
        passphrase = passphrase.encode('utf-8')
        if caching:
            key_owner._passphrase = passphrase
    key = self._string_to_key(string=passphrase, key_size=key_size)
    cipher = module.new(
        key=key,
        mode=module.MODE_CFB,
        IV=self['initial-vector'],
        segment_size=segment_size_bits)
    plaintext = cipher.decrypt(data)
    if padding:
        plaintext = plaintext[:-padding]
    return plaintext
def check_roundtrip(self):
    """Verify that re-serializing the packet reproduces its input

    Compares ``self.to_bytes()`` with the bytes stored under
    ``self['raw']``.  Returns None when they are equal.

    Raises ValueError when the lengths differ, or otherwise for the
    first 8-byte chunk whose contents differ (the message carries the
    chunk's byte offset and both chunks in ``byte_string`` form).
    """
    serialized = self.to_bytes()
    source = self['raw']
    if serialized == source:
        return
    if len(serialized) != len(source):
        raise ValueError(
            ('serialized {} is {} bytes long, '
             'but input is {} bytes long').format(
                 self['type'], len(serialized), len(source)))
    # Walk both byte strings in lock-step; the stride and the slice
    # width share one constant so they cannot drift apart.
    chunk_size = 8
    for i in range(0, len(source), chunk_size):
        in_chunk = source[i: i + chunk_size]
        out_chunk = serialized[i: i + chunk_size]
        if in_chunk != out_chunk:
            raise ValueError(
                ('serialized {} differs from input packet: '
                 'at byte {}, {} != {}').format(
                    self['type'], i, byte_string(data=out_chunk),
                    byte_string(data=in_chunk)))
class PGPKey (object):
    """An OpenPGP key with public and private parts.

    From RFC 4880 [1]:

      OpenPGP users may transfer public keys. The essential elements
      of a transferable public key are as follows:

      - One Public-Key packet
      - Zero or more revocation signatures
      - One or more User ID packets
      - After each User ID packet, zero or more Signature packets
        (certifications)
      - Zero or more User Attribute packets
      - After each User Attribute packet, zero or more Signature
        packets (certifications)
      - Zero or more Subkey packets
      - After each Subkey packet, one Signature packet, plus
        optionally a revocation

    Secret keys have a similar packet stream [2]:

      OpenPGP users may transfer secret keys. The format of a
      transferable secret key is the same as a transferable public key
      except that secret-key and secret-subkey packets are used
      instead of the public key and public-subkey packets.
      Implementations SHOULD include self-signatures on any user IDs
      and subkeys, as this allows for a complete public key to be
      automatically extracted from the transferable secret key.
      Implementations MAY choose to omit the self-signatures,
      especially if a transferable public key accompanies the
      transferable secret key.

    [1]: http://tools.ietf.org/search/rfc4880#section-11.1
    [2]: http://tools.ietf.org/search/rfc4880#section-11.2
    """
    def __init__(self, fingerprint, cache_passphrase=False):
        """Create an empty key for ``fingerprint``

        ``cache_passphrase`` is stored on the instance together with
        an initially empty passphrase slot.  The packet lists stay
        None until ``import_from_gpg`` fills them.
        """
        self.fingerprint = fingerprint
        self._cache_passphrase = cache_passphrase
        self._passphrase = None
        self.public_packets = None
        self.secret_packets = None

    def __str__(self):
        """Return the fingerprint followed by any loaded packets"""
        lines = ['key: {}'.format(self.fingerprint)]
        if self.public_packets:
            lines.append(' public:')
            for packet in self.public_packets:
                lines.extend(self._str_packet(packet=packet, prefix=' '))
        if self.secret_packets:
            lines.append(' secret:')
            for packet in self.secret_packets:
                lines.extend(self._str_packet(packet=packet, prefix=' '))
        return '\n'.join(lines)

    def _str_packet(self, packet, prefix):
        """Return the lines of ``str(packet)``, each led by ``prefix``"""
        lines = str(packet).split('\n')
        return [prefix + line for line in lines]

    def import_from_gpg(self):
        """Load the public and secret packets for this key from gpg

        Runs ``gpg --export`` and ``gpg --export-secret-keys`` for
        ``self.fingerprint``, parses both streams into packets and
        then calls ``check_roundtrip`` on every packet.

        Raises ValueError if either export is empty or does not start
        with the expected public-key / secret-key packet.
        """
        self.public_packets = self._export_packets(
            option='--export', first_type='public-key packet',
            description='public-key packet')
        self.secret_packets = self._export_packets(
            option='--export-secret-keys', first_type='secret-key packet',
            description='secret-key packet')
        for packet in self.public_packets + self.secret_packets:
            packet.check_roundtrip()

    def _export_packets(self, option, first_type, description):
        """Export this key with ``gpg <option>`` and parse the packets

        Returns the list of parsed packets.  Raises ValueError when
        the list is empty or its first packet is not ``first_type``.
        """
        exported = _get_stdout(['gpg', option, self.fingerprint])
        packets = []
        self._packets_from_bytes(list=packets, data=exported)
        # An empty export has no first packet to inspect; report it
        # with the same error as a wrong first packet instead of
        # letting the index lookup fail.
        if not packets or packets[0]['type'] != first_type:
            raise ValueError(
                '{} does not start with a {}'.format(
                    self.fingerprint, description))
        return packets

    def _packets_from_bytes(self, list, data):
        """Parse ``data`` into packets, appending each to ``list``

        The ``list`` parameter shadows the builtin; the name is kept
        because callers pass it by keyword.
        """
        offset = 0
        while offset < len(data):
            packet = PGPPacket(key=self)
            # NOTE(review): relies on from_bytes returning the number
            # of bytes it consumed -- confirm against PGPPacket.
            offset += packet.from_bytes(data=data[offset:])
            list.append(packet)

    def export_to_gpg(self):
        """Not implemented; always raises NotImplementedError"""
        raise NotImplementedError('export to gpg')

    def import_from_key(self, key):
        """Migrate the (sub)keys into this key

        Placeholder: currently does nothing and returns None.
        """
def migrate(old_key, new_key, cache_passphrase=False):
    """Add the old key and sub-keys to the new key

    For example, to upgrade your master key, while preserving old
    signatures you'd made. You will lose signature *on* your old key
    though, since sub-keys can't be signed (I don't think).

    ``old_key`` and ``new_key`` are the fingerprints handed to
    ``PGPKey``; ``cache_passphrase`` is forwarded to both keys.  Both
    keys are loaded from gpg, the old key is imported into the new
    one, and the resulting new ``PGPKey`` is returned.
    """
    # Separate names for the loaded objects, so the fingerprint
    # arguments are not rebound to a different type mid-function.
    source_key = PGPKey(
        fingerprint=old_key, cache_passphrase=cache_passphrase)
    source_key.import_from_gpg()
    target_key = PGPKey(
        fingerprint=new_key, cache_passphrase=cache_passphrase)
    target_key.import_from_gpg()
    target_key.import_from_key(key=source_key)
    return target_key
if __name__ == '__main__':
    # Imported here so the script entry point does not depend on a
    # module-level import of sys.
    import sys as _sys

    # Two fingerprints are required; give a usage message rather than
    # an unpacking error when they are missing.
    if len(_sys.argv) < 3:
        _sys.exit('usage: {} OLD_KEY NEW_KEY'.format(_sys.argv[0]))
    old_key, new_key = _sys.argv[1:3]
    migrate(old_key=old_key, new_key=new_key, cache_passphrase=True)