3 import getpass as _getpass
4 import hashlib as _hashlib
7 import subprocess as _subprocess
8 import struct as _struct
10 import Crypto.Cipher.AES as _crypto_cipher_aes
11 import Crypto.Cipher.Blowfish as _crypto_cipher_blowfish
12 import Crypto.Cipher.CAST as _crypto_cipher_cast
13 import Crypto.Cipher.DES3 as _crypto_cipher_des3
16 def _get_stdout(args, stdin=None):
19 stdin_pipe = _subprocess.PIPE
20 p = _subprocess.Popen(args, stdin=stdin_pipe, stdout=_subprocess.PIPE)
21 stdout, stderr = p.communicate(stdin)
24 raise RuntimeError(status)
28 class PGPPacket (dict):
29 # http://tools.ietf.org/search/rfc4880
30 _old_format_packet_length_type = { # type: (bytes, struct type)
31 0: (1, 'B'), # 1-byte unsigned integer
32 1: (2, 'H'), # 2-byte unsigned integer
33 2: (4, 'I'), # 4-byte unsigned integer
39 1: 'public-key encrypted session key packet',
40 2: 'signature packet',
41 3: 'symmetric-key encrypted session key packet',
42 4: 'one-pass signature packet',
43 5: 'secret-key packet',
44 6: 'public-key packet',
45 7: 'secret-subkey packet',
46 8: 'compressed data packet',
47 9: 'symmetrically encrypted data packet',
49 11: 'literal data packet',
52 14: 'public-subkey packet',
53 17: 'user attribute packet',
54 18: 'sym. encrypted and integrity protected data packet',
55 19: 'modification detection code packet',
62 _public_key_algorithms = {
63 1: 'rsa (encrypt or sign)',
64 2: 'rsa encrypt-only',
66 16: 'elgamal (encrypt-only)',
67 17: 'dsa (digital signature algorithm)',
68 18: 'reserved for elliptic curve',
69 19: 'reserved for ecdsa',
70 20: 'reserved (formerly elgamal encrypt or sign)',
71 21: 'reserved for diffie-hellman',
85 _symmetric_key_algorithms = {
86 0: 'plaintext or unencrypted data',
93 7: 'aes with 128-bit key',
94 8: 'aes with 192-bit key',
95 9: 'aes with 256-bit key',
110 _cipher_block_size = { # in bits
111 'aes with 128-bit key': 128,
112 'aes with 192-bit key': 128,
113 'aes with 256-bit key': 128,
119 'aes with 128-bit key': _crypto_cipher_aes,
120 'aes with 192-bit key': _crypto_cipher_aes,
121 'aes with 256-bit key': _crypto_cipher_aes,
122 'blowfish': _crypto_cipher_blowfish,
123 'cast5': _crypto_cipher_cast,
124 'tripledes': _crypto_cipher_des3,
127 _key_size = { # in bits
128 'aes with 128-bit key': 128,
129 'aes with 192-bit key': 192,
130 'aes with 256-bit key': 256,
134 _compression_algorithms = {
177 _hashlib_name = { # map OpenPGP-based names to hashlib names
180 'ripe-md/160': 'ripemd160',
187 _string_to_key_types = {
191 3: 'iterated and salted',
205 _string_to_key_expbias = 6
208 0x00: 'binary document',
209 0x01: 'canonical text document',
211 0x10: 'generic user id and public-key packet',
212 0x11: 'persona user id and public-key packet',
213 0x12: 'casual user id and public-key packet',
214 0x13: 'postitive user id and public-key packet',
215 0x18: 'subkey binding',
216 0x19: 'primary key binding',
218 0x20: 'key revocation',
219 0x28: 'subkey revocation',
220 0x30: 'certification revocation',
222 0x50: 'third-party confirmation',
225 _signature_subpacket_types = {
228 2: 'signature creation time',
229 3: 'signature expiration time',
230 4: 'exportable certification',
231 5: 'trust signature',
232 6: 'regular expression',
235 9: 'key expiration time',
236 10: 'placeholder for backward compatibility',
237 11: 'preferred symmetric algorithms',
238 12: 'revocation key',
247 21: 'preferred hash algorithms',
248 22: 'preferred compression algorithms',
249 23: 'key server preferences',
250 24: 'preferred key server',
251 25: 'primary user id',
254 28: 'signer user id',
255 29: 'reason for revocation',
257 31: 'signature target',
258 32: 'embedded signature',
272 _clean_type_regex = _re.compile('\W+')
274 def __init__(self, key=None):
275 super(PGPPacket, self).__init__()
278 def _clean_type(self, type=None):
281 return self._clean_type_regex.sub('_', type)
284 def _reverse(dict, value):
285 """Reverse lookups in dictionaries
287 >>> PGPPacket._reverse(PGPPacket._packet_types, 'public-key packet')
290 return [k for k,v in dict.items() if v == value][0]
293 method_name = '_str_{}'.format(self._clean_type())
294 method = getattr(self, method_name, None)
298 return '{}: {}'.format(self['type'], details)
300 def _str_public_key_packet(self):
301 return self._str_generic_key_packet()
303 def _str_public_subkey_packet(self):
304 return self._str_generic_key_packet()
306 def _str_generic_key_packet(self):
307 return self['fingerprint'][-8:].upper()
309 def _str_secret_key_packet(self):
310 return self._str_generic_secret_key_packet()
312 def _str_secret_subkey_packet(self):
313 return self._str_generic_secret_key_packet()
315 def _str_generic_secret_key_packet(self):
316 lines = [self._str_generic_key_packet()]
318 ('symmetric encryption',
319 'symmetric-encryption-algorithm'),
320 ('s2k hash', 'string-to-key-hash-algorithm'),
321 ('s2k count', 'string-to-key-count'),
322 ('s2k salt', 'string-to-key-salt'),
323 ('IV', 'initial-vector'),
327 if isinstance(value, bytes):
328 value = ' '.join('{:02x}'.format(byte) for byte in value)
329 lines.append(' {}: {}'.format(label, value))
330 return '\n'.join(lines)
332 def _str_signature_packet(self):
333 lines = [self['signature-type']]
334 if self['hashed-subpackets']:
335 lines.append(' hashed subpackets:')
336 lines.extend(self._str_signature_subpackets(
337 self['hashed-subpackets'], prefix=' '))
338 if self['unhashed-subpackets']:
339 lines.append(' unhashed subpackets:')
340 lines.extend(self._str_signature_subpackets(
341 self['unhashed-subpackets'], prefix=' '))
342 return '\n'.join(lines)
344 def _str_signature_subpackets(self, subpackets, prefix):
346 for subpacket in subpackets:
347 method_name = '_str_{}_signature_subpacket'.format(
348 self._clean_type(type=subpacket['type']))
349 method = getattr(self, method_name, None)
351 lines.append(' {}: {}'.format(
353 method(subpacket=subpacket)))
355 lines.append(' {}'.format(subpacket['type']))
358 def _str_signature_creation_time_signature_subpacket(self, subpacket):
359 return str(subpacket['signature-creation-time'])
361 def _str_issuer_signature_subpacket(self, subpacket):
362 return subpacket['issuer'][-8:].upper()
364 def _str_key_expiration_time_signature_subpacket(self, subpacket):
365 return str(subpacket['key-expiration-time'])
367 def _str_preferred_symmetric_algorithms_signature_subpacket(
370 algo for algo in subpacket['preferred-symmetric-algorithms'])
372 def _str_preferred_hash_algorithms_signature_subpacket(
375 algo for algo in subpacket['preferred-hash-algorithms'])
377 def _str_preferred_compression_algorithms_signature_subpacket(
380 algo for algo in subpacket['preferred-compression-algorithms'])
382 def _str_key_server_preferences_signature_subpacket(self, subpacket):
384 x for x in sorted(subpacket['key-server-preferences']))
386 def _str_primary_user_id_signature_subpacket(self, subpacket):
387 return str(subpacket['primary-user-id'])
389 def _str_key_flags_signature_subpacket(self, subpacket):
390 return ', '.join(x for x in sorted(subpacket['key-flags']))
392 def _str_features_signature_subpacket(self, subpacket):
393 return ', '.join(x for x in sorted(subpacket['features']))
395 def _str_embedded_signature_signature_subpacket(self, subpacket):
396 return subpacket['embedded']['signature-type']
398 def _str_user_id_packet(self):
401 def from_bytes(self, data):
402 offset = self._parse_header(data=data)
403 packet = data[offset:offset + self['length']]
404 if len(packet) < self['length']:
405 raise ValueError('packet too short ({} < {})'.format(
406 len(packet), self['length']))
407 offset += self['length']
408 method_name = '_parse_{}'.format(self._clean_type())
409 method = getattr(self, method_name, None)
411 raise NotImplementedError(
412 'cannot parse packet type {!r}'.format(self['type']))
414 self['raw'] = data[:offset]
417 def _parse_header(self, data):
420 always_one = packet_tag & 1 << 7
422 raise ValueError('most significant packet tag bit not set')
423 self['new-format'] = packet_tag & 1 << 6
424 if self['new-format']:
425 type_code = packet_tag & 0b111111
426 raise NotImplementedError('new-format packet length')
428 type_code = packet_tag >> 2 & 0b1111
429 self['length-type'] = packet_tag & 0b11
430 length_bytes, length_type = self._old_format_packet_length_type[
433 raise NotImplementedError(
434 'old-format packet of indeterminate length')
435 length_format = '>{}'.format(length_type)
436 length_data = data[offset: offset + length_bytes]
437 offset += length_bytes
438 self['length'] = _struct.unpack(length_format, length_data)[0]
439 self['type'] = self._packet_types[type_code]
443 def _parse_multiprecision_integer(data):
444 r"""Parse RFC 4880's multiprecision integers
446 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
448 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
451 bits = _struct.unpack('>H', data[:2])[0]
453 length = (bits + 7) // 8
455 for i in range(length):
456 value += data[offset + i] * 1 << (8 * (length - i - 1))
458 return (offset, value)
461 def _decode_string_to_key_count(cls, data):
462 r"""Decode RFC 4880's string-to-key count
464 >>> PGPPacket._decode_string_to_key_count(b'\x97'[0])
467 return (16 + (data & 15)) << ((data >> 4) + cls._string_to_key_expbias)
469 def _parse_string_to_key_specifier(self, data):
470 self['string-to-key-type'] = self._string_to_key_types[data[0]]
472 if self['string-to-key-type'] == 'simple':
473 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
476 elif self['string-to-key-type'] == 'salted':
477 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
480 self['string-to-key-salt'] = data[offset: offset + 8]
482 elif self['string-to-key-type'] == 'iterated and salted':
483 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
486 self['string-to-key-salt'] = data[offset: offset + 8]
488 self['string-to-key-count'] = self._decode_string_to_key_count(
492 raise NotImplementedError(
493 'string-to-key type {}'.format(self['string-to-key-type']))
496 def _parse_public_key_packet(self, data):
497 self._parse_generic_public_key_packet(data=data)
499 def _parse_public_subkey_packet(self, data):
500 self._parse_generic_public_key_packet(data=data)
502 def _parse_generic_public_key_packet(self, data):
503 self['key-version'] = data[0]
505 if self['key-version'] != 4:
506 raise NotImplementedError(
507 'public (sub)key packet version {}'.format(
508 self['key-version']))
510 self['creation-time'], algorithm = _struct.unpack(
511 '>IB', data[offset: offset + length])
513 self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
514 if self['public-key-algorithm'].startswith('rsa '):
515 o, self['public-modulus'] = self._parse_multiprecision_integer(
518 o, self['public-exponent'] = self._parse_multiprecision_integer(
521 elif self['public-key-algorithm'].startswith('dsa '):
522 o, self['prime'] = self._parse_multiprecision_integer(
525 o, self['group-order'] = self._parse_multiprecision_integer(
528 o, self['group-generator'] = self._parse_multiprecision_integer(
531 o, self['public-key'] = self._parse_multiprecision_integer(
534 elif self['public-key-algorithm'].startswith('elgamal '):
535 o, self['prime'] = self._parse_multiprecision_integer(
538 o, self['group-generator'] = self._parse_multiprecision_integer(
541 o, self['public-key'] = self._parse_multiprecision_integer(
545 raise NotImplementedError(
546 'algorithm-specific key fields for {}'.format(
547 self['public-key-algorithm']))
548 fingerprint = _hashlib.sha1()
549 fingerprint.update(b'\x99')
550 fingerprint.update(_struct.pack('>H', len(data)))
551 fingerprint.update(data)
552 self['fingerprint'] = fingerprint.hexdigest()
555 def _parse_secret_key_packet(self, data):
556 self._parse_generic_secret_key_packet(data=data)
558 def _parse_secret_subkey_packet(self, data):
559 self._parse_generic_secret_key_packet(data=data)
561 def _parse_generic_secret_key_packet(self, data):
562 offset = self._parse_generic_public_key_packet(data=data)
563 string_to_key_usage = data[offset]
565 if string_to_key_usage in [255, 254]:
566 self['symmetric-encryption-algorithm'] = (
567 self._symmetric_key_algorithms[data[offset]])
569 offset += self._parse_string_to_key_specifier(data=data[offset:])
571 self['symmetric-encryption-algorithm'] = (
572 self._symmetric_key_algorithms[string_to_key_usage])
573 if string_to_key_usage:
574 block_size_bits = self._cipher_block_size.get(
575 self['symmetric-encryption-algorithm'], None)
576 if block_size_bits % 8:
577 raise NotImplementedError(
578 ('{}-bit block size for {} is not an integer number of bytes'
580 block_size_bits, self['symmetric-encryption-algorithm']))
581 block_size = block_size_bits // 8
583 raise NotImplementedError(
584 'unknown block size for {}'.format(
585 self['symmetric-encryption-algorithm']))
586 self['initial-vector'] = data[offset: offset + block_size]
588 ciphertext = data[offset:]
589 offset += len(ciphertext)
590 decrypted_data = self.decrypt_symmetric_encryption(data=ciphertext)
592 decrypted_data = data[offset:key_end]
593 if string_to_key_usage in [0, 255]:
595 elif string_to_key_usage == 254:
599 secret_key = decrypted_data[:key_end]
602 secret_key_checksum = decrypted_data[key_end:]
604 calculated_checksum = sum(secret_key) % 65536
606 checksum_hash = _hashlib.sha1()
607 checksum_hash.update(secret_key)
608 calculated_checksum = checksum_hash.digest()
609 if secret_key_checksum != calculated_checksum:
611 'corrupt secret key (checksum {} != expected {})'.format(
612 secret_key_checksum, calculated_checksum))
613 if self['public-key-algorithm'].startswith('rsa '):
614 o, self['secret-exponent'] = self._parse_multiprecision_integer(
615 secret_key[secret_offset:])
617 o, self['secret-prime-p'] = self._parse_multiprecision_integer(
618 secret_key[secret_offset:])
620 o, self['secret-prime-q'] = self._parse_multiprecision_integer(
621 secret_key[secret_offset:])
623 o, self['secret-inverse-of-p-mod-q'] = (
624 self._parse_multiprecision_integer(
625 secret_key[secret_offset:]))
627 elif self['public-key-algorithm'].startswith('dsa '):
628 o, self['secret-exponent'] = self._parse_multiprecision_integer(
629 secret_key[secret_offset:])
631 elif self['public-key-algorithm'].startswith('elgamal '):
632 o, self['secret-exponent'] = self._parse_multiprecision_integer(
633 secret_key[secret_offset:])
636 raise NotImplementedError(
637 'algorithm-specific key fields for {}'.format(
638 self['public-key-algorithm']))
639 if secret_offset != len(secret_key):
641 ('parsed {} out of {} bytes of algorithm-specific key fields '
643 secret_offset, len(secret_key),
644 self['public-key-algorithm']))
646 def _parse_signature_subpackets(self, data):
648 while offset < len(data):
649 o, subpacket = self._parse_signature_subpacket(data=data[offset:])
653 def _parse_signature_subpacket(self, data):
659 elif first >= 192 and first < 255:
660 second = data[offset]
662 length = ((first - 192) << 8) + second + 192
664 length = _struct.unpack(
665 '>I', data[offset: offset + 4])[0]
667 subpacket['type'] = self._signature_subpacket_types[data[offset]]
669 subpacket_data = data[offset: offset + length - 1]
670 offset += len(subpacket_data)
671 method_name = '_parse_{}_signature_subpacket'.format(
672 self._clean_type(type=subpacket['type']))
673 method = getattr(self, method_name, None)
675 raise NotImplementedError(
676 'cannot parse signature subpacket type {!r}'.format(
678 method(data=subpacket_data, subpacket=subpacket)
679 return (offset, subpacket)
681 def _parse_signature_packet(self, data):
682 self['signature-version'] = data[0]
684 if self['signature-version'] != 4:
685 raise NotImplementedError(
686 'signature packet version {}'.format(
687 self['signature-version']))
688 self['signature-type'] = self._signature_types[data[offset]]
690 self['public-key-algorithm'] = self._public_key_algorithms[
693 self['hash-algorithm'] = self._hash_algorithms[data[offset]]
695 hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
697 self['hashed-subpackets'] = list(self._parse_signature_subpackets(
698 data[offset: offset + hashed_count]))
699 offset += hashed_count
700 unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
702 self['unhashed-subpackets'] = list(self._parse_signature_subpackets(
703 data=data[offset: offset + unhashed_count]))
704 offset += unhashed_count
705 self['signed-hash-word'] = data[offset: offset + 2]
707 self['signature'] = data[offset:]
709 def _parse_signature_creation_time_signature_subpacket(
710 self, data, subpacket):
711 subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0]
713 def _parse_issuer_signature_subpacket(self, data, subpacket):
714 subpacket['issuer'] = ''.join('{:02x}'.format(byte) for byte in data)
716 def _parse_key_expiration_time_signature_subpacket(
717 self, data, subpacket):
718 subpacket['key-expiration-time'] = _struct.unpack('>I', data)[0]
720 def _parse_preferred_symmetric_algorithms_signature_subpacket(
721 self, data, subpacket):
722 subpacket['preferred-symmetric-algorithms'] = [
723 self._symmetric_key_algorithms[d] for d in data]
725 def _parse_preferred_hash_algorithms_signature_subpacket(
726 self, data, subpacket):
727 subpacket['preferred-hash-algorithms'] = [
728 self._hash_algorithms[d] for d in data]
730 def _parse_preferred_compression_algorithms_signature_subpacket(
731 self, data, subpacket):
732 subpacket['preferred-compression-algorithms'] = [
733 self._compression_algorithms[d] for d in data]
735 def _parse_key_server_preferences_signature_subpacket(
736 self, data, subpacket):
737 subpacket['key-server-preferences'] = set()
739 subpacket['key-server-preferences'].add('no-modify')
741 def _parse_primary_user_id_signature_subpacket(self, data, subpacket):
742 subpacket['primary-user-id'] = bool(data[0])
744 def _parse_key_flags_signature_subpacket(self, data, subpacket):
745 subpacket['key-flags'] = set()
747 subpacket['key-flags'].add('can certify')
749 subpacket['key-flags'].add('can sign')
751 subpacket['key-flags'].add('can encrypt communications')
753 subpacket['key-flags'].add('can encrypt storage')
755 subpacket['key-flags'].add('private split')
757 subpacket['key-flags'].add('can authenticate')
759 subpacket['key-flags'].add('private shared')
761 def _parse_features_signature_subpacket(self, data, subpacket):
762 subpacket['features'] = set()
764 subpacket['features'].add('modification detection')
def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
    """Decode an embedded-signature subpacket body.

    A fresh ``PGPPacket`` sharing this packet's ``key`` is stored under
    ``'embedded'`` and then asked to parse ``data`` as a signature
    packet body.
    """
    embedded = PGPPacket(key=self.key)
    # Store before parsing, so the entry exists even if parsing raises.
    subpacket['embedded'] = embedded
    embedded._parse_signature_packet(data=data)
770 def _parse_user_id_packet(self, data):
771 self['user'] = str(data, 'utf-8')
774 method_name = '_serialize_{}'.format(self._clean_type())
775 method = getattr(self, method_name, None)
777 raise NotImplementedError(
778 'cannot serialize packet type {!r}'.format(self['type']))
780 self['length'] = len(body)
782 self._serialize_header(),
786 def _serialize_header(self):
789 type_code = self._reverse(self._packet_types, self['type'])
791 always_one * (1 << 7) |
792 new_format * (1 << 6) |
793 type_code * (1 << 2) |
796 length_bytes, length_type = self._old_format_packet_length_type[
798 length_format = '>{}'.format(length_type)
799 length_data = _struct.pack(length_format, self['length'])
806 def _serialize_multiprecision_integer(integer):
807 r"""Serialize RFC 4880's multipricision integers
809 >>> PGPPacket._serialize_multiprecision_integer(1)
811 >>> PGPPacket._serialize_multiprecision_integer(511)
814 bit_length = int(_math.log(integer, 2)) + 1
816 _struct.pack('>H', bit_length),
819 chunks.insert(1, bytes([integer & 0xff]))
820 integer = integer >> 8
821 return b''.join(chunks)
824 def _encode_string_to_key_count(cls, count):
825 r"""Encode RFC 4880's string-to-key count
827 >>> PGPPacket._encode_string_to_key_count(753664)
831 count = count >> cls._string_to_key_expbias
834 coded_count += 1 << 4
835 coded_count += count & 15
836 return bytes([coded_count])
838 def _serialize_string_to_key_specifier(self):
839 string_to_key_type = bytes([
841 self._string_to_key_types, self['string-to-key-type']),
843 chunks = [string_to_key_type]
844 if self['string-to-key-type'] == 'simple':
845 chunks.append(bytes([self._reverse(
846 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
847 elif self['string-to-key-type'] == 'salted':
848 chunks.append(bytes([self._reverse(
849 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
850 chunks.append(self['string-to-key-salt'])
851 elif self['string-to-key-type'] == 'iterated and salted':
852 chunks.append(bytes([self._reverse(
853 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
854 chunks.append(self['string-to-key-salt'])
855 chunks.append(self._encode_string_to_key_count(
856 count=self['string-to-key-count']))
858 raise NotImplementedError(
859 'string-to-key type {}'.format(self['string-to-key-type']))
861 return b''.join(chunks)
863 def _serialize_public_key_packet(self):
864 return self._serialize_generic_public_key_packet()
866 def _serialize_public_subkey_packet(self):
867 return self._serialize_generic_public_key_packet()
869 def _serialize_generic_public_key_packet(self):
870 key_version = bytes([self['key-version']])
871 chunks = [key_version]
872 if self['key-version'] != 4:
873 raise NotImplementedError(
874 'public (sub)key packet version {}'.format(
875 self['key-version']))
876 chunks.append(_struct.pack('>I', self['creation-time']))
877 chunks.append(bytes([self._reverse(
878 self._public_key_algorithms, self['public-key-algorithm'])]))
879 if self['public-key-algorithm'].startswith('rsa '):
880 chunks.append(self._serialize_multiprecision_integer(
881 self['public-modulus']))
882 chunks.append(self._serialize_multiprecision_integer(
883 self['public-exponent']))
884 elif self['public-key-algorithm'].startswith('dsa '):
885 chunks.append(self._serialize_multiprecision_integer(
887 chunks.append(self._serialize_multiprecision_integer(
888 self['group-order']))
889 chunks.append(self._serialize_multiprecision_integer(
890 self['group-generator']))
891 chunks.append(self._serialize_multiprecision_integer(
893 elif self['public-key-algorithm'].startswith('elgamal '):
894 chunks.append(self._serialize_multiprecision_integer(
896 chunks.append(self._serialize_multiprecision_integer(
897 self['group-generator']))
898 chunks.append(self._serialize_multiprecision_integer(
901 raise NotImplementedError(
902 'algorithm-specific key fields for {}'.format(
903 self['public-key-algorithm']))
904 return b''.join(chunks)
906 def _string_to_key(self, string, key_size):
909 '{}-bit key is not an integer number of bytes'.format(
911 key_size_bytes = key_size // 8
912 hash_name = self._hashlib_name[
913 self['string-to-key-hash-algorithm']]
914 string_hash = _hashlib.new(hash_name)
915 hashes = _math.ceil(key_size_bytes / string_hash.digest_size)
917 if self['string-to-key-type'] == 'simple':
918 update_bytes = string
919 elif self['string-to-key-type'] in [
921 'iterated and salted',
923 update_bytes = self['string-to-key-salt'] + string
924 if self['string-to-key-type'] == 'iterated and salted':
925 count = self['string-to-key-count']
926 if count < len(update_bytes):
927 count = len(update_bytes)
929 raise NotImplementedError(
930 'key calculation for string-to-key type {}'.format(
931 self['string-to-key-type']))
932 for padding in range(hashes):
933 string_hash = _hashlib.new(hash_name)
934 string_hash.update(padding * b'\x00')
935 if self['string-to-key-type'] in [
939 string_hash.update(update_bytes)
940 elif self['string-to-key-type'] == 'iterated and salted':
943 string_hash.update(update_bytes[:remaining])
944 remaining -= len(update_bytes)
945 key += string_hash.digest()
946 key = key[:key_size_bytes]
949 def decrypt_symmetric_encryption(self, data):
950 """Decrypt OpenPGP's Cipher Feedback mode"""
951 algorithm = self['symmetric-encryption-algorithm']
952 module = self._crypto_module[algorithm]
953 key_size = self._key_size[algorithm]
954 segment_size_bits = self._cipher_block_size[algorithm]
955 if segment_size_bits % 8:
956 raise NotImplementedError(
957 ('{}-bit segment size for {} is not an integer number of bytes'
958 ).format(segment_size_bits, algorithm))
959 segment_size_bytes = segment_size_bits // 8
960 padding = segment_size_bytes - len(data) % segment_size_bytes
962 data += b'\x00' * padding
963 if self.key and self.key._cache_passphrase and self.key._passphrase:
964 passphrase = self.key._passphrase
966 passphrase = _getpass.getpass(
967 'passphrase for {}: '.format(self['fingerprint'][-8:]))
968 passphrase = passphrase.encode('ascii')
969 if self.key and self.key._cache_passphrase:
970 self.key._passphrase = passphrase
971 key = self._string_to_key(string=passphrase, key_size=key_size)
974 mode=module.MODE_CFB,
975 IV=self['initial-vector'],
976 segment_size=segment_size_bits)
977 plaintext = cipher.decrypt(data)
979 plaintext = plaintext[:-padding]
982 def check_roundtrip(self):
983 serialized = self.to_bytes()
985 if serialized != source:
986 if len(serialized) != len(source):
988 ('serialized {} is {} bytes long, '
989 'but input is {} bytes long').format(
990 self['type'], len(serialized), len(source)))
992 for i in range(0, len(source), 8):
993 in_chunk = source[i: i + chunk_size]
994 out_chunk = serialized[i: i + chunk_size]
995 if in_chunk != out_chunk:
997 ('serialized {} differs from input packet: '
998 'at byte {}, {} != {}').format(
1000 ' '.join('{:02x}'.format(byte) for byte in out_chunk),
1001 ' '.join('{:02x}'.format(byte) for byte in in_chunk)))
1004 class PGPKey (object):
1005 """An OpenPGP key with public and private parts.
1009 OpenPGP users may transfer public keys. The essential elements
1010 of a transferable public key are as follows:
1012 - One Public-Key packet
1013 - Zero or more revocation signatures
1014 - One or more User ID packets
1015 - After each User ID packet, zero or more Signature packets
1017 - Zero or more User Attribute packets
1018 - After each User Attribute packet, zero or more Signature
1019 packets (certifications)
1020 - Zero or more Subkey packets
1021 - After each Subkey packet, one Signature packet, plus
1022 optionally a revocation
1024 Secret keys have a similar packet stream [2]:
1026 OpenPGP users may transfer secret keys. The format of a
1027 transferable secret key is the same as a transferable public key
1028 except that secret-key and secret-subkey packets are used
1029 instead of the public key and public-subkey packets.
1030 Implementations SHOULD include self-signatures on any user IDs
1031 and subkeys, as this allows for a complete public key to be
1032 automatically extracted from the transferable secret key.
1033 Implementations MAY choose to omit the self-signatures,
1034 especially if a transferable public key accompanies the
1035 transferable secret key.
1037 [1]: http://tools.ietf.org/search/rfc4880#section-11.1
1038 [2]: http://tools.ietf.org/search/rfc4880#section-11.2
1040 def __init__(self, fingerprint, cache_passphrase=False):
1041 self.fingerprint = fingerprint
1042 self._cache_passphrase = cache_passphrase
1043 self._passphrase = None
1044 self.public_packets = None
1045 self.secret_packets = None
1048 lines = ['key: {}'.format(self.fingerprint)]
1049 if self.public_packets:
1050 lines.append(' public:')
1051 for packet in self.public_packets:
1052 lines.extend(self._str_packet(packet=packet, prefix=' '))
1053 if self.secret_packets:
1054 lines.append(' secret:')
1055 for packet in self.secret_packets:
1056 lines.extend(self._str_packet(packet=packet, prefix=' '))
1057 return '\n'.join(lines)
1059 def _str_packet(self, packet, prefix):
1060 lines = str(packet).split('\n')
1061 return [prefix + line for line in lines]
1063 def import_from_gpg(self):
1064 key_export = _get_stdout(
1065 ['gpg', '--export', self.fingerprint])
1066 self.public_packets = list(
1067 self._packets_from_bytes(data=key_export))
1068 if self.public_packets[0]['type'] != 'public-key packet':
1070 '{} does not start with a public-key packet'.format(
1072 key_secret_export = _get_stdout(
1073 ['gpg', '--export-secret-keys', self.fingerprint])
1074 self.secret_packets = list(
1075 self._packets_from_bytes(data=key_secret_export))
1076 if self.secret_packets[0]['type'] != 'secret-key packet':
1078 '{} does not start with a secret-key packet'.format(
1080 for packet in self.public_packets + self.secret_packets:
1081 packet.check_roundtrip()
1083 def _packets_from_bytes(self, data):
1085 while offset < len(data):
1086 packet = PGPPacket(key=self)
1087 offset += packet.from_bytes(data=data[offset:])
def export_to_gpg(self):
    """Placeholder for exporting this key to GnuPG.

    Raises:
        NotImplementedError: always; exporting is not implemented.
    """
    # The exception name used to be misspelled, so callers got a
    # NameError instead of the intended NotImplementedError.
    raise NotImplementedError('export to gpg')
1093 def import_from_key(self, key):
1094 """Migrate the (sub)keys into this key"""
1098 def migrate(old_key, new_key, cache_passphrase=False):
1099 """Add the old key and sub-keys to the new key
1101 For example, to upgrade your master key, while preserving old
1102 signatures you'd made. You will lose signature *on* your old key
1103 though, since sub-keys can't be signed (I don't think).
1105 old_key = PGPKey(fingerprint=old_key, cache_passphrase=cache_passphrase)
1106 old_key.import_from_gpg()
1107 new_key = PGPKey(fingerprint=new_key, cache_passphrase=cache_passphrase)
1108 new_key.import_from_gpg()
1109 new_key.import_from_key(key=old_key)
1115 if __name__ == '__main__':
1118 old_key, new_key = _sys.argv[1:3]
1119 migrate(old_key=old_key, new_key=new_key, cache_passphrase=True)