3 import getpass as _getpass
4 import hashlib as _hashlib
7 import subprocess as _subprocess
8 import struct as _struct
10 import Crypto.Cipher.AES as _crypto_cipher_aes
11 import Crypto.Cipher.Blowfish as _crypto_cipher_blowfish
12 import Crypto.Cipher.CAST as _crypto_cipher_cast
13 import Crypto.Cipher.DES3 as _crypto_cipher_des3
16 def _get_stdout(args, stdin=None):
19 stdin_pipe = _subprocess.PIPE
20 p = _subprocess.Popen(args, stdin=stdin_pipe, stdout=_subprocess.PIPE)
21 stdout, stderr = p.communicate(stdin)
24 raise RuntimeError(status)
28 def byte_string(data, sep=' '):
29 r"""Convert a byte-string to human readable form
31 >>> byte_string(b'\x12\x34\x56')
34 return sep.join('{:02x}'.format(byte) for byte in data)
37 def string_bytes(data, sep=' '):
38 r"""Reverse byte_string()
40 >>> string_bytes('12 fa fb')
44 int(c1+c2, base=16) for c1,c2 in
45 zip(data[::2 + len(sep)], data[1::2 + len(sep)]))
48 class PGPPacket (dict):
49 # http://tools.ietf.org/search/rfc4880
50 _old_format_packet_length_type = { # type: (bytes, struct type)
51 0: (1, 'B'), # 1-byte unsigned integer
52 1: (2, 'H'), # 2-byte unsigned integer
53 2: (4, 'I'), # 4-byte unsigned integer
59 1: 'public-key encrypted session key packet',
60 2: 'signature packet',
61 3: 'symmetric-key encrypted session key packet',
62 4: 'one-pass signature packet',
63 5: 'secret-key packet',
64 6: 'public-key packet',
65 7: 'secret-subkey packet',
66 8: 'compressed data packet',
67 9: 'symmetrically encrypted data packet',
69 11: 'literal data packet',
72 14: 'public-subkey packet',
73 17: 'user attribute packet',
74 18: 'sym. encrypted and integrity protected data packet',
75 19: 'modification detection code packet',
82 _public_key_algorithms = {
83 1: 'rsa (encrypt or sign)',
84 2: 'rsa encrypt-only',
86 16: 'elgamal (encrypt-only)',
87 17: 'dsa (digital signature algorithm)',
88 18: 'reserved for elliptic curve',
89 19: 'reserved for ecdsa',
90 20: 'reserved (formerly elgamal encrypt or sign)',
91 21: 'reserved for diffie-hellman',
105 _symmetric_key_algorithms = {
106 0: 'plaintext or unencrypted data',
113 7: 'aes with 128-bit key',
114 8: 'aes with 192-bit key',
115 9: 'aes with 256-bit key',
130 _cipher_block_size = { # in bits
131 'aes with 128-bit key': 128,
132 'aes with 192-bit key': 128,
133 'aes with 256-bit key': 128,
139 'aes with 128-bit key': _crypto_cipher_aes,
140 'aes with 192-bit key': _crypto_cipher_aes,
141 'aes with 256-bit key': _crypto_cipher_aes,
142 'blowfish': _crypto_cipher_blowfish,
143 'cast5': _crypto_cipher_cast,
144 'tripledes': _crypto_cipher_des3,
147 _key_size = { # in bits
148 'aes with 128-bit key': 128,
149 'aes with 192-bit key': 192,
150 'aes with 256-bit key': 256,
154 _compression_algorithms = {
197 _hashlib_name = { # map OpenPGP-based names to hashlib names
200 'ripe-md/160': 'ripemd160',
207 _string_to_key_types = {
211 3: 'iterated and salted',
225 _string_to_key_expbias = 6
228 0x00: 'binary document',
229 0x01: 'canonical text document',
231 0x10: 'generic user id and public-key packet',
232 0x11: 'persona user id and public-key packet',
233 0x12: 'casual user id and public-key packet',
234 0x13: 'postitive user id and public-key packet',
235 0x18: 'subkey binding',
236 0x19: 'primary key binding',
238 0x20: 'key revocation',
239 0x28: 'subkey revocation',
240 0x30: 'certification revocation',
242 0x50: 'third-party confirmation',
245 _signature_subpacket_types = {
248 2: 'signature creation time',
249 3: 'signature expiration time',
250 4: 'exportable certification',
251 5: 'trust signature',
252 6: 'regular expression',
255 9: 'key expiration time',
256 10: 'placeholder for backward compatibility',
257 11: 'preferred symmetric algorithms',
258 12: 'revocation key',
267 21: 'preferred hash algorithms',
268 22: 'preferred compression algorithms',
269 23: 'key server preferences',
270 24: 'preferred key server',
271 25: 'primary user id',
274 28: 'signer user id',
275 29: 'reason for revocation',
277 31: 'signature target',
278 32: 'embedded signature',
292 _clean_type_regex = _re.compile('\W+')
294 def __init__(self, key=None):
295 super(PGPPacket, self).__init__()
298 def _clean_type(self, type=None):
301 return self._clean_type_regex.sub('_', type)
304 def _reverse(dict, value):
305 """Reverse lookups in dictionaries
307 >>> PGPPacket._reverse(PGPPacket._packet_types, 'public-key packet')
310 return [k for k,v in dict.items() if v == value][0]
313 packet = PGPPacket(key=self.key)
318 method_name = '_str_{}'.format(self._clean_type())
319 method = getattr(self, method_name, None)
323 return '{}: {}'.format(self['type'], details)
325 def _str_public_key_packet(self):
326 return self._str_generic_key_packet()
328 def _str_public_subkey_packet(self):
329 return self._str_generic_key_packet()
331 def _str_generic_key_packet(self):
332 return self['fingerprint'][-8:].upper()
334 def _str_secret_key_packet(self):
335 return self._str_generic_secret_key_packet()
337 def _str_secret_subkey_packet(self):
338 return self._str_generic_secret_key_packet()
340 def _str_generic_secret_key_packet(self):
341 lines = [self._str_generic_key_packet()]
343 ('symmetric encryption',
344 'symmetric-encryption-algorithm'),
345 ('s2k hash', 'string-to-key-hash-algorithm'),
346 ('s2k count', 'string-to-key-count'),
347 ('s2k salt', 'string-to-key-salt'),
348 ('IV', 'initial-vector'),
352 if isinstance(value, bytes):
353 value = byte_string(data=value)
354 lines.append(' {}: {}'.format(label, value))
355 return '\n'.join(lines)
357 def _str_signature_packet(self):
358 lines = [self['signature-type']]
359 if self['hashed-subpackets']:
360 lines.append(' hashed subpackets:')
361 lines.extend(self._str_signature_subpackets(
362 self['hashed-subpackets'], prefix=' '))
363 if self['unhashed-subpackets']:
364 lines.append(' unhashed subpackets:')
365 lines.extend(self._str_signature_subpackets(
366 self['unhashed-subpackets'], prefix=' '))
367 return '\n'.join(lines)
369 def _str_signature_subpackets(self, subpackets, prefix):
371 for subpacket in subpackets:
372 method_name = '_str_{}_signature_subpacket'.format(
373 self._clean_type(type=subpacket['type']))
374 method = getattr(self, method_name, None)
376 lines.append(' {}: {}'.format(
378 method(subpacket=subpacket)))
380 lines.append(' {}'.format(subpacket['type']))
383 def _str_signature_creation_time_signature_subpacket(self, subpacket):
384 return str(subpacket['signature-creation-time'])
386 def _str_issuer_signature_subpacket(self, subpacket):
387 return subpacket['issuer'][-8:].upper()
389 def _str_key_expiration_time_signature_subpacket(self, subpacket):
390 return str(subpacket['key-expiration-time'])
392 def _str_preferred_symmetric_algorithms_signature_subpacket(
395 algo for algo in subpacket['preferred-symmetric-algorithms'])
397 def _str_preferred_hash_algorithms_signature_subpacket(
400 algo for algo in subpacket['preferred-hash-algorithms'])
402 def _str_preferred_compression_algorithms_signature_subpacket(
405 algo for algo in subpacket['preferred-compression-algorithms'])
407 def _str_key_server_preferences_signature_subpacket(self, subpacket):
409 x for x in sorted(subpacket['key-server-preferences']))
411 def _str_primary_user_id_signature_subpacket(self, subpacket):
412 return str(subpacket['primary-user-id'])
414 def _str_key_flags_signature_subpacket(self, subpacket):
415 return ', '.join(x for x in sorted(subpacket['key-flags']))
417 def _str_features_signature_subpacket(self, subpacket):
418 return ', '.join(x for x in sorted(subpacket['features']))
420 def _str_embedded_signature_signature_subpacket(self, subpacket):
421 return subpacket['embedded']['signature-type']
423 def _str_user_id_packet(self):
426 def from_bytes(self, data):
427 offset = self._parse_header(data=data)
428 packet = data[offset:offset + self['length']]
429 if len(packet) < self['length']:
430 raise ValueError('packet too short ({} < {})'.format(
431 len(packet), self['length']))
432 offset += self['length']
433 method_name = '_parse_{}'.format(self._clean_type())
434 method = getattr(self, method_name, None)
436 raise NotImplementedError(
437 'cannot parse packet type {!r}'.format(self['type']))
439 self['raw'] = data[:offset]
442 def _parse_header(self, data):
445 always_one = packet_tag & 1 << 7
447 raise ValueError('most significant packet tag bit not set')
448 self['new-format'] = packet_tag & 1 << 6
449 if self['new-format']:
450 type_code = packet_tag & 0b111111
451 raise NotImplementedError('new-format packet length')
453 type_code = packet_tag >> 2 & 0b1111
454 self['length-type'] = packet_tag & 0b11
455 length_bytes, length_type = self._old_format_packet_length_type[
458 raise NotImplementedError(
459 'old-format packet of indeterminate length')
460 length_format = '>{}'.format(length_type)
461 length_data = data[offset: offset + length_bytes]
462 offset += length_bytes
463 self['length'] = _struct.unpack(length_format, length_data)[0]
464 self['type'] = self._packet_types[type_code]
468 def _parse_multiprecision_integer(data):
469 r"""Parse RFC 4880's multiprecision integers
471 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
473 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
476 bits = _struct.unpack('>H', data[:2])[0]
478 length = (bits + 7) // 8
480 for i in range(length):
481 value += data[offset + i] * 1 << (8 * (length - i - 1))
483 return (offset, value)
486 def _decode_string_to_key_count(cls, data):
487 r"""Decode RFC 4880's string-to-key count
489 >>> PGPPacket._decode_string_to_key_count(b'\x97'[0])
492 return (16 + (data & 15)) << ((data >> 4) + cls._string_to_key_expbias)
494 def _parse_string_to_key_specifier(self, data):
495 self['string-to-key-type'] = self._string_to_key_types[data[0]]
497 if self['string-to-key-type'] == 'simple':
498 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
501 elif self['string-to-key-type'] == 'salted':
502 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
505 self['string-to-key-salt'] = data[offset: offset + 8]
507 elif self['string-to-key-type'] == 'iterated and salted':
508 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
511 self['string-to-key-salt'] = data[offset: offset + 8]
513 self['string-to-key-count'] = self._decode_string_to_key_count(
517 raise NotImplementedError(
518 'string-to-key type {}'.format(self['string-to-key-type']))
521 def _parse_public_key_packet(self, data):
522 self._parse_generic_public_key_packet(data=data)
524 def _parse_public_subkey_packet(self, data):
525 self._parse_generic_public_key_packet(data=data)
527 def _parse_generic_public_key_packet(self, data):
528 self['key-version'] = data[0]
530 if self['key-version'] != 4:
531 raise NotImplementedError(
532 'public (sub)key packet version {}'.format(
533 self['key-version']))
535 self['creation-time'], algorithm = _struct.unpack(
536 '>IB', data[offset: offset + length])
538 self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
539 if self['public-key-algorithm'].startswith('rsa '):
540 o, self['public-modulus'] = self._parse_multiprecision_integer(
543 o, self['public-exponent'] = self._parse_multiprecision_integer(
546 elif self['public-key-algorithm'].startswith('dsa '):
547 o, self['prime'] = self._parse_multiprecision_integer(
550 o, self['group-order'] = self._parse_multiprecision_integer(
553 o, self['group-generator'] = self._parse_multiprecision_integer(
556 o, self['public-key'] = self._parse_multiprecision_integer(
559 elif self['public-key-algorithm'].startswith('elgamal '):
560 o, self['prime'] = self._parse_multiprecision_integer(
563 o, self['group-generator'] = self._parse_multiprecision_integer(
566 o, self['public-key'] = self._parse_multiprecision_integer(
570 raise NotImplementedError(
571 'algorithm-specific key fields for {}'.format(
572 self['public-key-algorithm']))
573 fingerprint = _hashlib.sha1()
574 fingerprint_target = self
575 if self['type'] != 'public-key packet':
576 fingerprint_target = self.copy()
577 fingerprint_target['type'] = 'public-key packet'
579 self._serialize_signature_packet_target(target=fingerprint_target))
580 self['fingerprint'] = fingerprint.hexdigest()
583 def _parse_secret_key_packet(self, data):
584 self._parse_generic_secret_key_packet(data=data)
586 def _parse_secret_subkey_packet(self, data):
587 self._parse_generic_secret_key_packet(data=data)
589 def _parse_generic_secret_key_packet(self, data):
590 offset = self._parse_generic_public_key_packet(data=data)
591 string_to_key_usage = data[offset]
593 if string_to_key_usage in [255, 254]:
594 self['symmetric-encryption-algorithm'] = (
595 self._symmetric_key_algorithms[data[offset]])
597 offset += self._parse_string_to_key_specifier(data=data[offset:])
599 self['symmetric-encryption-algorithm'] = (
600 self._symmetric_key_algorithms[string_to_key_usage])
601 if string_to_key_usage:
602 block_size_bits = self._cipher_block_size.get(
603 self['symmetric-encryption-algorithm'], None)
604 if block_size_bits % 8:
605 raise NotImplementedError(
606 ('{}-bit block size for {} is not an integer number of bytes'
608 block_size_bits, self['symmetric-encryption-algorithm']))
609 block_size = block_size_bits // 8
611 raise NotImplementedError(
612 'unknown block size for {}'.format(
613 self['symmetric-encryption-algorithm']))
614 self['initial-vector'] = data[offset: offset + block_size]
616 ciphertext = data[offset:]
617 offset += len(ciphertext)
618 decrypted_data = self.decrypt_symmetric_encryption(data=ciphertext)
620 decrypted_data = data[offset:key_end]
621 if string_to_key_usage in [0, 255]:
623 elif string_to_key_usage == 254:
627 secret_key = decrypted_data[:key_end]
630 secret_key_checksum = decrypted_data[key_end:]
632 calculated_checksum = sum(secret_key) % 65536
634 checksum_hash = _hashlib.sha1()
635 checksum_hash.update(secret_key)
636 calculated_checksum = checksum_hash.digest()
637 if secret_key_checksum != calculated_checksum:
639 'corrupt secret key (checksum {} != expected {})'.format(
640 secret_key_checksum, calculated_checksum))
641 if self['public-key-algorithm'].startswith('rsa '):
642 o, self['secret-exponent'] = self._parse_multiprecision_integer(
643 secret_key[secret_offset:])
645 o, self['secret-prime-p'] = self._parse_multiprecision_integer(
646 secret_key[secret_offset:])
648 o, self['secret-prime-q'] = self._parse_multiprecision_integer(
649 secret_key[secret_offset:])
651 o, self['secret-inverse-of-p-mod-q'] = (
652 self._parse_multiprecision_integer(
653 secret_key[secret_offset:]))
655 elif self['public-key-algorithm'].startswith('dsa '):
656 o, self['secret-exponent'] = self._parse_multiprecision_integer(
657 secret_key[secret_offset:])
659 elif self['public-key-algorithm'].startswith('elgamal '):
660 o, self['secret-exponent'] = self._parse_multiprecision_integer(
661 secret_key[secret_offset:])
664 raise NotImplementedError(
665 'algorithm-specific key fields for {}'.format(
666 self['public-key-algorithm']))
667 if secret_offset != len(secret_key):
669 ('parsed {} out of {} bytes of algorithm-specific key fields '
671 secret_offset, len(secret_key),
672 self['public-key-algorithm']))
674 def _parse_signature_subpackets(self, data):
676 while offset < len(data):
677 o, subpacket = self._parse_signature_subpacket(data=data[offset:])
681 def _parse_signature_subpacket(self, data):
687 elif first >= 192 and first < 255:
688 second = data[offset]
690 length = ((first - 192) << 8) + second + 192
692 length = _struct.unpack(
693 '>I', data[offset: offset + 4])[0]
695 subpacket['type'] = self._signature_subpacket_types[data[offset]]
697 subpacket_data = data[offset: offset + length - 1]
698 offset += len(subpacket_data)
699 method_name = '_parse_{}_signature_subpacket'.format(
700 self._clean_type(type=subpacket['type']))
701 method = getattr(self, method_name, None)
703 raise NotImplementedError(
704 'cannot parse signature subpacket type {!r}'.format(
706 method(data=subpacket_data, subpacket=subpacket)
707 return (offset, subpacket)
709 def _parse_signature_packet(self, data):
710 self['signature-version'] = data[0]
712 if self['signature-version'] != 4:
713 raise NotImplementedError(
714 'signature packet version {}'.format(
715 self['signature-version']))
716 self['signature-type'] = self._signature_types[data[offset]]
718 self['public-key-algorithm'] = self._public_key_algorithms[
721 self['hash-algorithm'] = self._hash_algorithms[data[offset]]
723 hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
725 self['hashed-subpackets'] = list(self._parse_signature_subpackets(
726 data[offset: offset + hashed_count]))
727 offset += hashed_count
728 unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
730 self['unhashed-subpackets'] = list(self._parse_signature_subpackets(
731 data=data[offset: offset + unhashed_count]))
732 offset += unhashed_count
733 self['signed-hash-word'] = data[offset: offset + 2]
735 self['signature'] = data[offset:]
736 if self['signature-type'] == 'standalone':
737 self['target'] = None
738 elif self['signature-type'].endswith(' user id and public-key packet'):
740 [p for p in self.key.public_packets if p['type'] == 'public-key packet'][-1],
741 [p for p in self.key.public_packets if p['type'] == 'user id packet'][-1],
744 raise NotImplementedError(
745 'target for {}'.format(self['signature-type']))
747 def _parse_signature_creation_time_signature_subpacket(
748 self, data, subpacket):
749 subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0]
def _parse_issuer_signature_subpacket(self, data, subpacket):
    """Store the issuer bytes on *subpacket* as an unseparated hex string."""
    issuer_hex = byte_string(data=data, sep='')
    subpacket['issuer'] = issuer_hex
754 def _parse_key_expiration_time_signature_subpacket(
755 self, data, subpacket):
756 subpacket['key-expiration-time'] = _struct.unpack('>I', data)[0]
758 def _parse_preferred_symmetric_algorithms_signature_subpacket(
759 self, data, subpacket):
760 subpacket['preferred-symmetric-algorithms'] = [
761 self._symmetric_key_algorithms[d] for d in data]
763 def _parse_preferred_hash_algorithms_signature_subpacket(
764 self, data, subpacket):
765 subpacket['preferred-hash-algorithms'] = [
766 self._hash_algorithms[d] for d in data]
768 def _parse_preferred_compression_algorithms_signature_subpacket(
769 self, data, subpacket):
770 subpacket['preferred-compression-algorithms'] = [
771 self._compression_algorithms[d] for d in data]
773 def _parse_key_server_preferences_signature_subpacket(
774 self, data, subpacket):
775 subpacket['key-server-preferences'] = set()
777 subpacket['key-server-preferences'].add('no-modify')
779 def _parse_primary_user_id_signature_subpacket(self, data, subpacket):
780 subpacket['primary-user-id'] = bool(data[0])
782 def _parse_key_flags_signature_subpacket(self, data, subpacket):
783 subpacket['key-flags'] = set()
785 subpacket['key-flags'].add('can certify')
787 subpacket['key-flags'].add('can sign')
789 subpacket['key-flags'].add('can encrypt communications')
791 subpacket['key-flags'].add('can encrypt storage')
793 subpacket['key-flags'].add('private split')
795 subpacket['key-flags'].add('can authenticate')
797 subpacket['key-flags'].add('private shared')
799 def _parse_features_signature_subpacket(self, data, subpacket):
800 subpacket['features'] = set()
802 subpacket['features'].add('modification detection')
def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
    """Parse *data* as a signature packet and attach it as 'embedded'.

    A new PGPPacket sharing this packet's key is stored on *subpacket*
    first, then *data* is handed to its signature-packet parser.
    """
    embedded = PGPPacket(key=self.key)
    subpacket['embedded'] = embedded
    embedded._parse_signature_packet(data=data)
808 def _parse_user_id_packet(self, data):
809 self['user'] = str(data, 'utf-8')
812 body = self._serialize_body()
814 raise ValueError(method)
815 self['length'] = len(body)
817 self._serialize_header(),
821 def _serialize_header(self):
824 type_code = self._reverse(self._packet_types, self['type'])
826 always_one * (1 << 7) |
827 new_format * (1 << 6) |
828 type_code * (1 << 2) |
831 length_bytes, length_type = self._old_format_packet_length_type[
833 length_format = '>{}'.format(length_type)
834 length_data = _struct.pack(length_format, self['length'])
840 def _serialize_body(self):
841 method_name = '_serialize_{}'.format(self._clean_type())
842 method = getattr(self, method_name, None)
844 raise NotImplementedError(
845 'cannot serialize packet type {!r}'.format(self['type']))
849 def _serialize_multiprecision_integer(integer):
850 r"""Serialize RFC 4880's multipricision integers
852 >>> PGPPacket._serialize_multiprecision_integer(1)
854 >>> PGPPacket._serialize_multiprecision_integer(511)
857 bit_length = int(_math.log(integer, 2)) + 1
859 _struct.pack('>H', bit_length),
862 chunks.insert(1, bytes([integer & 0xff]))
863 integer = integer >> 8
864 return b''.join(chunks)
867 def _encode_string_to_key_count(cls, count):
868 r"""Encode RFC 4880's string-to-key count
870 >>> PGPPacket._encode_string_to_key_count(753664)
874 count = count >> cls._string_to_key_expbias
877 coded_count += 1 << 4
878 coded_count += count & 15
879 return bytes([coded_count])
881 def _serialize_string_to_key_specifier(self):
882 string_to_key_type = bytes([
884 self._string_to_key_types, self['string-to-key-type']),
886 chunks = [string_to_key_type]
887 if self['string-to-key-type'] == 'simple':
888 chunks.append(bytes([self._reverse(
889 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
890 elif self['string-to-key-type'] == 'salted':
891 chunks.append(bytes([self._reverse(
892 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
893 chunks.append(self['string-to-key-salt'])
894 elif self['string-to-key-type'] == 'iterated and salted':
895 chunks.append(bytes([self._reverse(
896 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
897 chunks.append(self['string-to-key-salt'])
898 chunks.append(self._encode_string_to_key_count(
899 count=self['string-to-key-count']))
901 raise NotImplementedError(
902 'string-to-key type {}'.format(self['string-to-key-type']))
904 return b''.join(chunks)
906 def _serialize_public_key_packet(self):
907 return self._serialize_generic_public_key_packet()
909 def _serialize_public_subkey_packet(self):
910 return self._serialize_generic_public_key_packet()
912 def _serialize_generic_public_key_packet(self):
913 key_version = bytes([self['key-version']])
914 chunks = [key_version]
915 if self['key-version'] != 4:
916 raise NotImplementedError(
917 'public (sub)key packet version {}'.format(
918 self['key-version']))
919 chunks.append(_struct.pack('>I', self['creation-time']))
920 chunks.append(bytes([self._reverse(
921 self._public_key_algorithms, self['public-key-algorithm'])]))
922 if self['public-key-algorithm'].startswith('rsa '):
923 chunks.append(self._serialize_multiprecision_integer(
924 self['public-modulus']))
925 chunks.append(self._serialize_multiprecision_integer(
926 self['public-exponent']))
927 elif self['public-key-algorithm'].startswith('dsa '):
928 chunks.append(self._serialize_multiprecision_integer(
930 chunks.append(self._serialize_multiprecision_integer(
931 self['group-order']))
932 chunks.append(self._serialize_multiprecision_integer(
933 self['group-generator']))
934 chunks.append(self._serialize_multiprecision_integer(
936 elif self['public-key-algorithm'].startswith('elgamal '):
937 chunks.append(self._serialize_multiprecision_integer(
939 chunks.append(self._serialize_multiprecision_integer(
940 self['group-generator']))
941 chunks.append(self._serialize_multiprecision_integer(
944 raise NotImplementedError(
945 'algorithm-specific key fields for {}'.format(
946 self['public-key-algorithm']))
947 return b''.join(chunks)
949 def _serialize_signature_subpackets(self, subpackets):
951 self._serialize_signature_subpacket(subpacket=subpacket)
952 for subpacket in subpackets)
954 def _serialize_signature_subpacket(self, subpacket):
955 method_name = '_serialize_{}_signature_subpacket'.format(
956 self._clean_type(type=subpacket['type']))
957 method = getattr(self, method_name, None)
959 raise NotImplementedError(
960 'cannot serialize signature subpacket type {!r}'.format(
962 body = method(subpacket=subpacket)
963 length = len(body) + 1
966 chunks.append(bytes([length]))
968 first = ((length - 192) >> 8) + 192
972 (length - 192) % 256,
975 chunks.append(_struct.pack('>I', length))
976 chunks.append(bytes([self._reverse(
977 self._signature_subpacket_types, subpacket['type'])]))
979 return b''.join(chunks)
981 def _serialize_signature_packet_target(self, target):
984 elif isinstance(target, bytes):
986 elif isinstance(target, PGPPacket):
987 serialized = target._serialize_body()
988 if target['type'] in [
990 'public-subkey packet'
992 'secret-subkey packet'
994 serialized = b''.join([
996 _struct.pack('>H', len(serialized)),
999 elif target['type'] == 'user id packet':
1000 serialized = b''.join([
1002 _struct.pack('>I', len(serialized)),
1005 elif target['type'] == 'user attribute packet':
1006 serialized = b''.join([
1008 _struct.pack('>I', len(serialized)),
1012 elif isinstance(target, list):
1014 self._serialize_signature_packet_target(target=x)
1017 def _serialize_signature_packet(self):
1018 if self['signature-version'] != 4:
1019 raise NotImplementedError(
1020 'signature packet version {}'.format(
1021 self['signature-version']))
1022 signature_version = bytes([self['signature-version']])
1023 chunks = [signature_version]
1024 chunks.append(bytes([self._reverse(
1025 self._signature_types, self['signature-type'])]))
1026 chunks.append(bytes([self._reverse(
1027 self._public_key_algorithms, self['public-key-algorithm'])]))
1028 chunks.append(bytes([self._reverse(
1029 self._hash_algorithms, self['hash-algorithm'])]))
1030 hashed_subpackets = self._serialize_signature_subpackets(
1031 self['hashed-subpackets'])
1032 chunks.append(_struct.pack('>H', len(hashed_subpackets)))
1033 chunks.append(hashed_subpackets)
1034 hashed_signature_data = b''.join(chunks)
1035 unhashed_subpackets = self._serialize_signature_subpackets(
1036 self['unhashed-subpackets'])
1037 chunks.append(_struct.pack('>H', len(unhashed_subpackets)))
1038 chunks.append(unhashed_subpackets)
1039 target = self._serialize_signature_packet_target(target=self['target'])
1040 signed_data = b''.join([
1042 hashed_signature_data,
1045 _struct.pack('>I', len(hashed_signature_data)),
1047 digest, signature = self.key.sign(
1048 data=signed_data, hash_algorithm=self['hash-algorithm'],
1049 signature_algorithm=self['public-key-algorithm'])
1050 chunks.extend([digest[:2], signature])
1051 return b''.join(chunks)
1053 def _serialize_signature_creation_time_signature_subpacket(
1055 return _struct.pack('>I', subpacket['signature-creation-time'])
def _serialize_issuer_signature_subpacket(self, subpacket):
    """Convert the stored issuer string back to bytes via string_bytes()."""
    issuer_hex = subpacket['issuer']
    return string_bytes(data=issuer_hex, sep='')
1060 def _serialize_key_expiration_time_signature_subpacket(self, subpacket):
1061 return _struct.pack('>I', subpacket['key-expiration-time'])
1063 def _serialize_preferred_symmetric_algorithms_signature_subpacket(
1066 self._reverse(self._symmetric_key_algorithms, a)
1067 for a in subpacket['preferred-symmetric-algorithms'])
1069 def _serialize_preferred_hash_algorithms_signature_subpacket(
1072 self._reverse(self._hash_algorithms, a)
1073 for a in subpacket['preferred-hash-algorithms'])
1075 def _serialize_preferred_compression_algorithms_signature_subpacket(
1078 self._reverse(self._compression_algorithms, a)
1079 for a in subpacket['preferred-compression-algorithms'])
1081 def _serialize_key_server_preferences_signature_subpacket(self, subpacket):
1083 0x80 * ('no-modify' in subpacket['key-server-preferences']) |
1087 def _serialize_primary_user_id_signature_subpacket(self, subpacket):
1088 return bytes([0x1 * subpacket['primary-user-id']])
1090 def _serialize_key_flags_signature_subpacket(self, subpacket):
1092 0x1 * ('can certify' in subpacket['key-flags']) |
1093 0x2 * ('can sign' in subpacket['key-flags']) |
1094 0x4 * ('can encrypt communications' in subpacket['key-flags']) |
1095 0x8 * ('can encrypt storage' in subpacket['key-flags']) |
1096 0x10 * ('private split' in subpacket['key-flags']) |
1097 0x20 * ('can authenticate' in subpacket['key-flags']) |
1098 0x80 * ('private shated' in subpacket['key-flags']) |
1102 def _serialize_features_signature_subpacket(self, subpacket):
1104 0x1 * ('modification detection' in subpacket['features']) |
1108 def _serialize_embedded_signature_signature_subpacket(self, subpacket):
1109 return subpacket['embedded'].to_bytes()
1111 def _serialize_user_id_packet(self):
1112 return self['user'].encode('utf-8')
1114 def _string_to_key(self, string, key_size):
1117 '{}-bit key is not an integer number of bytes'.format(
1119 key_size_bytes = key_size // 8
1120 hash_name = self._hashlib_name[
1121 self['string-to-key-hash-algorithm']]
1122 string_hash = _hashlib.new(hash_name)
1123 hashes = _math.ceil(key_size_bytes / string_hash.digest_size)
1125 if self['string-to-key-type'] == 'simple':
1126 update_bytes = string
1127 elif self['string-to-key-type'] in [
1129 'iterated and salted',
1131 update_bytes = self['string-to-key-salt'] + string
1132 if self['string-to-key-type'] == 'iterated and salted':
1133 count = self['string-to-key-count']
1134 if count < len(update_bytes):
1135 count = len(update_bytes)
1137 raise NotImplementedError(
1138 'key calculation for string-to-key type {}'.format(
1139 self['string-to-key-type']))
1140 for padding in range(hashes):
1141 string_hash = _hashlib.new(hash_name)
1142 string_hash.update(padding * b'\x00')
1143 if self['string-to-key-type'] in [
1147 string_hash.update(update_bytes)
1148 elif self['string-to-key-type'] == 'iterated and salted':
1150 while remaining > 0:
1151 string_hash.update(update_bytes[:remaining])
1152 remaining -= len(update_bytes)
1153 key += string_hash.digest()
1154 key = key[:key_size_bytes]
1157 def decrypt_symmetric_encryption(self, data):
1158 """Decrypt OpenPGP's Cipher Feedback mode"""
1159 algorithm = self['symmetric-encryption-algorithm']
1160 module = self._crypto_module[algorithm]
1161 key_size = self._key_size[algorithm]
1162 segment_size_bits = self._cipher_block_size[algorithm]
1163 if segment_size_bits % 8:
1164 raise NotImplementedError(
1165 ('{}-bit segment size for {} is not an integer number of bytes'
1166 ).format(segment_size_bits, algorithm))
1167 segment_size_bytes = segment_size_bits // 8
1168 padding = segment_size_bytes - len(data) % segment_size_bytes
1170 data += b'\x00' * padding
1171 if self.key and self.key._cache_passphrase and self.key._passphrase:
1172 passphrase = self.key._passphrase
1174 passphrase = _getpass.getpass(
1175 'passphrase for {}: '.format(self['fingerprint'][-8:]))
1176 passphrase = passphrase.encode('ascii')
1177 if self.key and self.key._cache_passphrase:
1178 self.key._passphrase = passphrase
1179 key = self._string_to_key(string=passphrase, key_size=key_size)
1180 cipher = module.new(
1182 mode=module.MODE_CFB,
1183 IV=self['initial-vector'],
1184 segment_size=segment_size_bits)
1185 plaintext = cipher.decrypt(data)
1187 plaintext = plaintext[:-padding]
1190 def check_roundtrip(self):
1191 serialized = self.to_bytes()
1192 source = self['raw']
1193 if serialized != source:
1194 if len(serialized) != len(source):
1196 ('serialized {} is {} bytes long, '
1197 'but input is {} bytes long').format(
1198 self['type'], len(serialized), len(source)))
1200 for i in range(0, len(source), 8):
1201 in_chunk = source[i: i + chunk_size]
1202 out_chunk = serialized[i: i + chunk_size]
1203 if in_chunk != out_chunk:
1205 ('serialized {} differs from input packet: '
1206 'at byte {}, {} != {}').format(
1207 self['type'], i, byte_string(data=out_chunk),
1208 byte_string(data=in_chunk)))
1211 class PGPKey (object):
1212 """An OpenPGP key with public and private parts.
1216 OpenPGP users may transfer public keys. The essential elements
1217 of a transferable public key are as follows:
1219 - One Public-Key packet
1220 - Zero or more revocation signatures
1221 - One or more User ID packets
1222 - After each User ID packet, zero or more Signature packets
1224 - Zero or more User Attribute packets
1225 - After each User Attribute packet, zero or more Signature
1226 packets (certifications)
1227 - Zero or more Subkey packets
1228 - After each Subkey packet, one Signature packet, plus
1229 optionally a revocation
1231 Secret keys have a similar packet stream [2]:
1233 OpenPGP users may transfer secret keys. The format of a
1234 transferable secret key is the same as a transferable public key
1235 except that secret-key and secret-subkey packets are used
1236 instead of the public key and public-subkey packets.
1237 Implementations SHOULD include self-signatures on any user IDs
1238 and subkeys, as this allows for a complete public key to be
1239 automatically extracted from the transferable secret key.
1240 Implementations MAY choose to omit the self-signatures,
1241 especially if a transferable public key accompanies the
1242 transferable secret key.
1244 [1]: http://tools.ietf.org/search/rfc4880#section-11.1
1245 [2]: http://tools.ietf.org/search/rfc4880#section-11.2
1247 def __init__(self, fingerprint, cache_passphrase=False):
1248 self.fingerprint = fingerprint
1249 self._cache_passphrase = cache_passphrase
1250 self._passphrase = None
1251 self.public_packets = None
1252 self.secret_packets = None
1255 lines = ['key: {}'.format(self.fingerprint)]
1256 if self.public_packets:
1257 lines.append(' public:')
1258 for packet in self.public_packets:
1259 lines.extend(self._str_packet(packet=packet, prefix=' '))
1260 if self.secret_packets:
1261 lines.append(' secret:')
1262 for packet in self.secret_packets:
1263 lines.extend(self._str_packet(packet=packet, prefix=' '))
1264 return '\n'.join(lines)
1266 def _str_packet(self, packet, prefix):
1267 lines = str(packet).split('\n')
1268 return [prefix + line for line in lines]
1270 def import_from_gpg(self):
1271 key_export = _get_stdout(
1272 ['gpg', '--export', self.fingerprint])
1273 self.public_packets = []
1274 self._packets_from_bytes(list=self.public_packets, data=key_export)
1275 if self.public_packets[0]['type'] != 'public-key packet':
1277 '{} does not start with a public-key packet'.format(
1279 key_secret_export = _get_stdout(
1280 ['gpg', '--export-secret-keys', self.fingerprint])
1281 self.secret_packets = []
1282 self._packets_from_bytes(list=self.secret_packets, data=key_secret_export)
1283 if self.secret_packets[0]['type'] != 'secret-key packet':
1285 '{} does not start with a secret-key packet'.format(
1287 for packet in self.public_packets + self.secret_packets:
1288 packet.check_roundtrip()
1290 def _packets_from_bytes(self, list, data):
1292 while offset < len(data):
1293 packet = PGPPacket(key=self)
1294 offset += packet.from_bytes(data=data[offset:])
def export_to_gpg(self):
    """Export this key to GnuPG (not implemented).

    Raises:
        NotImplementedError: always.  The original spelling
            ``NotImplemetedError`` was an undefined name, so calling
            this raised NameError instead.
    """
    raise NotImplementedError('export to gpg')
1300 def import_from_key(self, key):
1301 """Migrate the (sub)keys into this key"""
1305 def migrate(old_key, new_key, cache_passphrase=False):
1306 """Add the old key and sub-keys to the new key
1308 For example, to upgrade your master key, while preserving old
1309 signatures you'd made. You will lose signature *on* your old key
1310 though, since sub-keys can't be signed (I don't think).
1312 old_key = PGPKey(fingerprint=old_key, cache_passphrase=cache_passphrase)
1313 old_key.import_from_gpg()
1314 new_key = PGPKey(fingerprint=new_key, cache_passphrase=cache_passphrase)
1315 new_key.import_from_gpg()
1316 new_key.import_from_key(key=old_key)
1322 if __name__ == '__main__':
1325 old_key, new_key = _sys.argv[1:3]
1326 migrate(old_key=old_key, new_key=new_key, cache_passphrase=True)