3 import getpass as _getpass
4 import hashlib as _hashlib
7 import subprocess as _subprocess
8 import struct as _struct
10 import Crypto.Cipher.AES as _crypto_cipher_aes
11 import Crypto.Cipher.Blowfish as _crypto_cipher_blowfish
12 import Crypto.Cipher.CAST as _crypto_cipher_cast
13 import Crypto.Cipher.DES3 as _crypto_cipher_des3
16 def _get_stdout(args, stdin=None):
19 stdin_pipe = _subprocess.PIPE
20 p = _subprocess.Popen(args, stdin=stdin_pipe, stdout=_subprocess.PIPE)
21 stdout, stderr = p.communicate(stdin)
24 raise RuntimeError(status)
28 class PGPPacket (dict):
29 # http://tools.ietf.org/search/rfc4880
30 _old_format_packet_length_type = { # type: (bytes, struct type)
31 0: (1, 'B'), # 1-byte unsigned integer
32 1: (2, 'H'), # 2-byte unsigned integer
33 2: (4, 'I'), # 4-byte unsigned integer
39 1: 'public-key encrypted session key packet',
40 2: 'signature packet',
41 3: 'symmetric-key encrypted session key packet',
42 4: 'one-pass signature packet',
43 5: 'secret-key packet',
44 6: 'public-key packet',
45 7: 'secret-subkey packet',
46 8: 'compressed data packet',
47 9: 'symmetrically encrypted data packet',
49 11: 'literal data packet',
52 14: 'public-subkey packet',
53 17: 'user attribute packet',
54 18: 'sym. encrypted and integrity protected data packet',
55 19: 'modification detection code packet',
62 _public_key_algorithms = {
63 1: 'rsa (encrypt or sign)',
64 2: 'rsa encrypt-only',
66 16: 'elgamal (encrypt-only)',
67 17: 'dsa (digital signature algorithm)',
68 18: 'reserved for elliptic curve',
69 19: 'reserved for ecdsa',
70 20: 'reserved (formerly elgamal encrypt or sign)',
71 21: 'reserved for diffie-hellman',
85 _symmetric_key_algorithms = {
86 0: 'plaintext or unencrypted data',
93 7: 'aes with 128-bit key',
94 8: 'aes with 192-bit key',
95 9: 'aes with 256-bit key',
110 _cipher_block_size = { # in bits
111 'aes with 128-bit key': 128,
112 'aes with 192-bit key': 128,
113 'aes with 256-bit key': 128,
118 'aes with 128-bit key': _crypto_cipher_aes,
119 'aes with 192-bit key': _crypto_cipher_aes,
120 'aes with 256-bit key': _crypto_cipher_aes,
121 'blowfish': _crypto_cipher_blowfish,
122 'cast5': _crypto_cipher_cast,
123 'tripledes': _crypto_cipher_des3,
126 _key_size = { # in bits
127 'aes with 128-bit key': 128,
128 'aes with 192-bit key': 192,
129 'aes with 256-bit key': 256,
133 _compression_algorithms = {
176 _hashlib_name = { # map OpenPGP-based names to hashlib names
179 'ripe-md/160': 'ripemd160',
186 _string_to_key_types = {
190 3: 'iterated and salted',
204 _string_to_key_expbias = 6
207 0x00: 'binary document',
208 0x01: 'canonical text document',
210 0x10: 'generic user id and public-key packet',
211 0x11: 'persona user id and public-key packet',
212 0x12: 'casual user id and public-key packet',
213 0x13: 'postitive user id and public-key packet',
214 0x18: 'subkey binding',
215 0x19: 'primary key binding',
217 0x20: 'key revocation',
218 0x28: 'subkey revocation',
219 0x30: 'certification revocation',
221 0x50: 'third-party confirmation',
224 _signature_subpacket_types = {
227 2: 'signature creation time',
228 3: 'signature expiration time',
229 4: 'exportable certification',
230 5: 'trust signature',
231 6: 'regular expression',
234 9: 'key expiration time',
235 10: 'placeholder for backward compatibility',
236 11: 'preferred symmetric algorithms',
237 12: 'revocation key',
246 21: 'preferred hash algorithms',
247 22: 'preferred compression algorithms',
248 23: 'key server preferences',
249 24: 'preferred key server',
250 25: 'primary user id',
253 28: 'signer user id',
254 29: 'reason for revocation',
256 31: 'signature target',
257 32: 'embedded signature',
271 _clean_type_regex = _re.compile('\W+')
273 def _clean_type(self, type=None):
276 return self._clean_type_regex.sub('_', type)
279 def _reverse(dict, value):
280 """Reverse lookups in dictionaries
282 >>> PGPPacket._reverse(PGPPacket._packet_types, 'public-key packet')
285 return [k for k,v in dict.items() if v == value][0]
288 method_name = '_str_{}'.format(self._clean_type())
289 method = getattr(self, method_name, None)
293 return '{}: {}'.format(self['type'], details)
295 def _str_public_key_packet(self):
296 return self._str_generic_key_packet()
298 def _str_public_subkey_packet(self):
299 return self._str_generic_key_packet()
301 def _str_generic_key_packet(self):
302 return self['fingerprint'][-8:].upper()
304 def _str_secret_key_packet(self):
305 return self._str_generic_secret_key_packet()
307 def _str_secret_subkey_packet(self):
308 return self._str_generic_secret_key_packet()
310 def _str_generic_secret_key_packet(self):
311 lines = [self._str_generic_key_packet()]
313 ('symmetric encryption',
314 'symmetric-encryption-algorithm'),
315 ('s2k hash', 'string-to-key-hash-algorithm'),
316 ('s2k count', 'string-to-key-count'),
317 ('s2k salt', 'string-to-key-salt'),
318 ('IV', 'initial-vector'),
322 if isinstance(value, bytes):
323 value = ' '.join('{:02x}'.format(byte) for byte in value)
324 lines.append(' {}: {}'.format(label, value))
325 return '\n'.join(lines)
327 def _str_signature_packet(self):
328 lines = [self['signature-type']]
329 if self['hashed-subpackets']:
330 lines.append(' hashed subpackets:')
331 lines.extend(self._str_signature_subpackets(
332 self['hashed-subpackets'], prefix=' '))
333 if self['unhashed-subpackets']:
334 lines.append(' unhashed subpackets:')
335 lines.extend(self._str_signature_subpackets(
336 self['unhashed-subpackets'], prefix=' '))
337 return '\n'.join(lines)
339 def _str_signature_subpackets(self, subpackets, prefix):
341 for subpacket in subpackets:
342 method_name = '_str_{}_signature_subpacket'.format(
343 self._clean_type(type=subpacket['type']))
344 method = getattr(self, method_name, None)
346 lines.append(' {}: {}'.format(
348 method(subpacket=subpacket)))
350 lines.append(' {}'.format(subpacket['type']))
353 def _str_signature_creation_time_signature_subpacket(self, subpacket):
354 return str(subpacket['signature-creation-time'])
356 def _str_issuer_signature_subpacket(self, subpacket):
357 return subpacket['issuer'][-8:].upper()
359 def _str_key_expiration_time_signature_subpacket(self, subpacket):
360 return str(subpacket['key-expiration-time'])
362 def _str_preferred_symmetric_algorithms_signature_subpacket(
365 algo for algo in subpacket['preferred-symmetric-algorithms'])
367 def _str_preferred_hash_algorithms_signature_subpacket(
370 algo for algo in subpacket['preferred-hash-algorithms'])
372 def _str_preferred_compression_algorithms_signature_subpacket(
375 algo for algo in subpacket['preferred-compression-algorithms'])
377 def _str_key_server_preferences_signature_subpacket(self, subpacket):
379 x for x in sorted(subpacket['key-server-preferences']))
381 def _str_primary_user_id_signature_subpacket(self, subpacket):
382 return str(subpacket['primary-user-id'])
384 def _str_key_flags_signature_subpacket(self, subpacket):
385 return ', '.join(x for x in sorted(subpacket['key-flags']))
387 def _str_features_signature_subpacket(self, subpacket):
388 return ', '.join(x for x in sorted(subpacket['features']))
390 def _str_embedded_signature_signature_subpacket(self, subpacket):
391 return subpacket['embedded']['signature-type']
393 def _str_user_id_packet(self):
396 def from_bytes(self, data):
397 offset = self._parse_header(data=data)
398 packet = data[offset:offset + self['length']]
399 if len(packet) < self['length']:
400 raise ValueError('packet too short ({} < {})'.format(
401 len(packet), self['length']))
402 offset += self['length']
403 method_name = '_parse_{}'.format(self._clean_type())
404 method = getattr(self, method_name, None)
406 raise NotImplementedError(
407 'cannot parse packet type {!r}'.format(self['type']))
411 def _parse_header(self, data):
414 always_one = packet_tag & 1 << 7
416 raise ValueError('most significant packet tag bit not set')
417 self['new-format'] = packet_tag & 1 << 6
418 if self['new-format']:
419 type_code = packet_tag & 0b111111
420 raise NotImplementedError('new-format packet length')
422 type_code = packet_tag >> 2 & 0b1111
423 self['length-type'] = packet_tag & 0b11
424 length_bytes, length_type = self._old_format_packet_length_type[
427 raise NotImplementedError(
428 'old-format packet of indeterminate length')
429 length_format = '>{}'.format(length_type)
430 length_data = data[offset: offset + length_bytes]
431 offset += length_bytes
432 self['length'] = _struct.unpack(length_format, length_data)[0]
433 self['type'] = self._packet_types[type_code]
437 def _parse_multiprecision_integer(data):
438 r"""Parse RFC 4880's multiprecision integers
440 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
442 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
445 bits = _struct.unpack('>H', data[:2])[0]
447 length = (bits + 7) // 8
449 for i in range(length):
450 value += data[offset + i] * 1 << (8 * (length - i - 1))
452 return (offset, value)
455 def _decode_string_to_key_count(cls, data):
456 r"""Decode RFC 4880's string-to-key count
458 >>> PGPPacket._decode_string_to_key_count(b'\x97'[0])
461 return (16 + (data & 15)) << ((data >> 4) + cls._string_to_key_expbias)
463 def _parse_string_to_key_specifier(self, data):
464 self['string-to-key-type'] = self._string_to_key_types[data[0]]
466 if self['string-to-key-type'] == 'simple':
467 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
470 elif self['string-to-key-type'] == 'salted':
471 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
474 self['string-to-key-salt'] = data[offset: offset + 8]
476 elif self['string-to-key-type'] == 'iterated and salted':
477 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
480 self['string-to-key-salt'] = data[offset: offset + 8]
482 self['string-to-key-count'] = self._decode_string_to_key_count(
486 raise NotImplementedError(
487 'string-to-key type {}'.format(self['string-to-key-type']))
490 def _parse_public_key_packet(self, data):
491 self._parse_generic_public_key_packet(data=data)
493 def _parse_public_subkey_packet(self, data):
494 self._parse_generic_public_key_packet(data=data)
496 def _parse_generic_public_key_packet(self, data):
497 self['key-version'] = data[0]
499 if self['key-version'] != 4:
500 raise NotImplementedError(
501 'public (sub)key packet version {}'.format(
502 self['key-version']))
504 self['creation-time'], algorithm = _struct.unpack(
505 '>IB', data[offset: offset + length])
507 self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
508 if self['public-key-algorithm'].startswith('rsa '):
509 o, self['public-modulus'] = self._parse_multiprecision_integer(
512 o, self['public-exponent'] = self._parse_multiprecision_integer(
515 elif self['public-key-algorithm'].startswith('dsa '):
516 o, self['prime'] = self._parse_multiprecision_integer(
519 o, self['group-order'] = self._parse_multiprecision_integer(
522 o, self['group-generator'] = self._parse_multiprecision_integer(
525 o, self['public-key'] = self._parse_multiprecision_integer(
528 elif self['public-key-algorithm'].startswith('elgamal '):
529 o, self['prime'] = self._parse_multiprecision_integer(
532 o, self['group-generator'] = self._parse_multiprecision_integer(
535 o, self['public-key'] = self._parse_multiprecision_integer(
539 raise NotImplementedError(
540 'algorithm-specific key fields for {}'.format(
541 self['public-key-algorithm']))
542 fingerprint = _hashlib.sha1()
543 fingerprint.update(b'\x99')
544 fingerprint.update(_struct.pack('>H', len(data)))
545 fingerprint.update(data)
546 self['fingerprint'] = fingerprint.hexdigest()
549 def _parse_secret_key_packet(self, data):
550 self._parse_generic_secret_key_packet(data=data)
552 def _parse_secret_subkey_packet(self, data):
553 self._parse_generic_secret_key_packet(data=data)
555 def _parse_generic_secret_key_packet(self, data):
556 offset = self._parse_generic_public_key_packet(data=data)
557 string_to_key_usage = data[offset]
559 if string_to_key_usage in [255, 254]:
560 self['symmetric-encryption-algorithm'] = (
561 self._symmetric_key_algorithms[data[offset]])
563 offset += self._parse_string_to_key_specifier(data=data[offset:])
565 self['symmetric-encryption-algorithm'] = (
566 self._symmetric_key_algorithms[string_to_key_usage])
567 if string_to_key_usage:
568 block_size_bits = self._cipher_block_size.get(
569 self['symmetric-encryption-algorithm'], None)
570 if block_size_bits % 8:
571 raise NotImplementedError(
572 ('{}-bit block size for {} is not an integer number of bytes'
574 block_size_bits, self['symmetric-encryption-algorithm']))
575 block_size = block_size_bits // 8
577 raise NotImplementedError(
578 'unknown block size for {}'.format(
579 self['symmetric-encryption-algorithm']))
580 self['initial-vector'] = data[offset: offset + block_size]
582 ciphertext = data[offset:]
583 offset += len(ciphertext)
584 decrypted_data = self.decrypt_symmetric_encryption(data=ciphertext)
586 decrypted_data = data[offset:key_end]
587 if string_to_key_usage in [0, 255]:
589 elif string_to_key_usage == 254:
593 secret_key = decrypted_data[:key_end]
595 secret_key_checksum = decrypted_data[key_end:]
597 calculated_checksum = sum(secret_key) % 65536
599 checksum_hash = _hashlib.sha1()
600 checksum_hash.update(secret_key)
601 calculated_checksum = checksum_hash.digest()
602 if secret_key_checksum != calculated_checksum:
604 'corrupt secret key (checksum {} != expected {})'.format(
605 secret_key_checksum, calculated_checksum))
606 self['secret-key'] = secret_key
608 def _parse_signature_subpackets(self, data):
610 while offset < len(data):
611 o, subpacket = self._parse_signature_subpacket(data=data[offset:])
615 def _parse_signature_subpacket(self, data):
621 elif first >= 192 and first < 255:
622 second = data[offset]
624 length = ((first - 192) << 8) + second + 192
626 length = _struct.unpack(
627 '>I', data[offset: offset + 4])[0]
629 subpacket['type'] = self._signature_subpacket_types[data[offset]]
631 subpacket_data = data[offset: offset + length - 1]
632 offset += len(subpacket_data)
633 method_name = '_parse_{}_signature_subpacket'.format(
634 self._clean_type(type=subpacket['type']))
635 method = getattr(self, method_name, None)
637 raise NotImplementedError(
638 'cannot parse signature subpacket type {!r}'.format(
640 method(data=subpacket_data, subpacket=subpacket)
641 return (offset, subpacket)
643 def _parse_signature_packet(self, data):
644 self['signature-version'] = data[0]
646 if self['signature-version'] != 4:
647 raise NotImplementedError(
648 'signature packet version {}'.format(
649 self['signature-version']))
650 self['signature-type'] = self._signature_types[data[offset]]
652 self['public-key-algorithm'] = self._public_key_algorithms[
655 self['hash-algorithm'] = self._hash_algorithms[data[offset]]
657 hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
659 self['hashed-subpackets'] = list(self._parse_signature_subpackets(
660 data[offset: offset + hashed_count]))
661 offset += hashed_count
662 unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
664 self['unhashed-subpackets'] = list(self._parse_signature_subpackets(
665 data=data[offset: offset + unhashed_count]))
666 offset += unhashed_count
667 self['signed-hash-word'] = data[offset: offset + 2]
669 self['signature'] = data[offset:]
671 def _parse_signature_creation_time_signature_subpacket(
672 self, data, subpacket):
673 subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0]
675 def _parse_issuer_signature_subpacket(self, data, subpacket):
676 subpacket['issuer'] = ''.join('{:02x}'.format(byte) for byte in data)
678 def _parse_key_expiration_time_signature_subpacket(
679 self, data, subpacket):
680 subpacket['key-expiration-time'] = _struct.unpack('>I', data)[0]
682 def _parse_preferred_symmetric_algorithms_signature_subpacket(
683 self, data, subpacket):
684 subpacket['preferred-symmetric-algorithms'] = [
685 self._symmetric_key_algorithms[d] for d in data]
687 def _parse_preferred_hash_algorithms_signature_subpacket(
688 self, data, subpacket):
689 subpacket['preferred-hash-algorithms'] = [
690 self._hash_algorithms[d] for d in data]
692 def _parse_preferred_compression_algorithms_signature_subpacket(
693 self, data, subpacket):
694 subpacket['preferred-compression-algorithms'] = [
695 self._compression_algorithms[d] for d in data]
697 def _parse_key_server_preferences_signature_subpacket(
698 self, data, subpacket):
699 subpacket['key-server-preferences'] = set()
701 subpacket['key-server-preferences'].add('no-modify')
703 def _parse_primary_user_id_signature_subpacket(self, data, subpacket):
704 subpacket['primary-user-id'] = bool(data[0])
706 def _parse_key_flags_signature_subpacket(self, data, subpacket):
707 subpacket['key-flags'] = set()
709 subpacket['key-flags'].add('can certify')
711 subpacket['key-flags'].add('can sign')
713 subpacket['key-flags'].add('can encrypt communications')
715 subpacket['key-flags'].add('can encrypt storage')
717 subpacket['key-flags'].add('private split')
719 subpacket['key-flags'].add('can authenticate')
721 subpacket['key-flags'].add('private shared')
723 def _parse_features_signature_subpacket(self, data, subpacket):
724 subpacket['features'] = set()
726 subpacket['features'].add('modification detection')
def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
    """Parse an embedded signature into a fresh ``PGPPacket``.

    The new packet is stored under ``subpacket['embedded']`` before it is
    filled in by its own ``_parse_signature_packet``.
    """
    embedded = PGPPacket()
    subpacket['embedded'] = embedded
    embedded._parse_signature_packet(data=data)
732 def _parse_user_id_packet(self, data):
733 self['user'] = str(data, 'utf-8')
736 method_name = '_serialize_{}'.format(self._clean_type())
737 method = getattr(self, method_name, None)
739 raise NotImplementedError(
740 'cannot serialize packet type {!r}'.format(self['type']))
742 self['length'] = len(body)
744 self._serialize_header(),
748 def _serialize_header(self):
751 type_code = self._reverse(self._packet_types, self['type'])
753 always_one * (1 << 7) |
754 new_format * (1 << 6) |
755 type_code * (1 << 2) |
758 length_bytes, length_type = self._old_format_packet_length_type[
760 length_format = '>{}'.format(length_type)
761 length_data = _struct.pack(length_format, self['length'])
768 def _serialize_multiprecision_integer(integer):
769 r"""Serialize RFC 4880's multipricision integers
771 >>> PGPPacket._serialize_multiprecision_integer(1)
773 >>> PGPPacket._serialize_multiprecision_integer(511)
776 bit_length = int(_math.log(integer, 2)) + 1
778 _struct.pack('>H', bit_length),
781 chunks.insert(1, bytes([integer & 0xff]))
782 integer = integer >> 8
783 return b''.join(chunks)
786 def _encode_string_to_key_count(cls, count):
787 r"""Encode RFC 4880's string-to-key count
789 >>> PGPPacket._encode_string_to_key_count(753664)
793 count = count >> cls._string_to_key_expbias
796 coded_count += 1 << 4
797 coded_count += count & 15
798 return bytes([coded_count])
800 def _serialize_string_to_key_specifier(self):
801 string_to_key_type = bytes([
803 self._string_to_key_types, self['string-to-key-type']),
805 chunks = [string_to_key_type]
806 if self['string-to-key-type'] == 'simple':
807 chunks.append(bytes([self._reverse(
808 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
809 elif self['string-to-key-type'] == 'salted':
810 chunks.append(bytes([self._reverse(
811 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
812 chunks.append(self['string-to-key-salt'])
813 elif self['string-to-key-type'] == 'iterated and salted':
814 chunks.append(bytes([self._reverse(
815 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
816 chunks.append(self['string-to-key-salt'])
817 chunks.append(self._encode_string_to_key_count(
818 count=self['string-to-key-count']))
820 raise NotImplementedError(
821 'string-to-key type {}'.format(self['string-to-key-type']))
823 return b''.join(chunks)
825 def _serialize_public_key_packet(self):
826 return self._serialize_generic_public_key_packet()
828 def _serialize_public_subkey_packet(self):
829 return self._serialize_generic_public_key_packet()
831 def _serialize_generic_public_key_packet(self):
832 key_version = bytes([self['key-version']])
833 chunks = [key_version]
834 if self['key-version'] != 4:
835 raise NotImplementedError(
836 'public (sub)key packet version {}'.format(
837 self['key-version']))
838 chunks.append(_struct.pack('>I', self['creation-time']))
839 chunks.append(bytes([self._reverse(
840 self._public_key_algorithms, self['public-key-algorithm'])]))
841 if self['public-key-algorithm'].startswith('rsa '):
842 chunks.append(self._serialize_multiprecision_integer(
843 self['public-modulus']))
844 chunks.append(self._serialize_multiprecision_integer(
845 self['public-exponent']))
846 elif self['public-key-algorithm'].startswith('dsa '):
847 chunks.append(self._serialize_multiprecision_integer(
849 chunks.append(self._serialize_multiprecision_integer(
850 self['group-order']))
851 chunks.append(self._serialize_multiprecision_integer(
852 self['group-generator']))
853 chunks.append(self._serialize_multiprecision_integer(
855 elif self['public-key-algorithm'].startswith('elgamal '):
856 chunks.append(self._serialize_multiprecision_integer(
858 chunks.append(self._serialize_multiprecision_integer(
859 self['group-generator']))
860 chunks.append(self._serialize_multiprecision_integer(
863 raise NotImplementedError(
864 'algorithm-specific key fields for {}'.format(
865 self['public-key-algorithm']))
866 return b''.join(chunks)
868 def _string_to_key(self, string, key_size):
871 '{}-bit key is not an integer number of bytes'.format(
873 key_size_bytes = key_size // 8
874 hash_name = self._hashlib_name[
875 self['string-to-key-hash-algorithm']]
876 string_hash = _hashlib.new(hash_name)
877 hashes = _math.ceil(key_size_bytes / string_hash.digest_size)
879 if self['string-to-key-type'] == 'simple':
880 update_bytes = string
881 elif self['string-to-key-type'] in [
883 'iterated and salted',
885 update_bytes = self['string-to-key-salt'] + string
886 if self['string-to-key-type'] == 'iterated and salted':
887 count = self['string-to-key-count']
888 if count < len(update_bytes):
889 count = len(update_bytes)
891 raise NotImplementedError(
892 'key calculation for string-to-key type {}'.format(
893 self['string-to-key-type']))
894 for padding in range(hashes):
895 string_hash = _hashlib.new(hash_name)
896 string_hash.update(padding * b'\x00')
897 if self['string-to-key-type'] in [
901 string_hash.update(update_bytes)
902 elif self['string-to-key-type'] == 'iterated and salted':
905 string_hash.update(update_bytes[:remaining])
906 remaining -= len(update_bytes)
907 key += string_hash.digest()
908 key = key[:key_size_bytes]
911 def decrypt_symmetric_encryption(self, data):
912 """Decrypt OpenPGP's Cipher Feedback mode"""
913 algorithm = self['symmetric-encryption-algorithm']
914 module = self._crypto_module[algorithm]
915 key_size = self._key_size[algorithm]
916 segment_size_bits = self._cipher_block_size[algorithm]
917 if segment_size_bits % 8:
918 raise NotImplementedError(
919 ('{}-bit segment size for {} is not an integer number of bytes'
920 ).format(segment_size_bits, algorithm))
921 segment_size_bytes = segment_size_bits // 8
922 padding = segment_size_bytes - len(data) % segment_size_bytes
924 data += b'\x00' * padding
925 passphrase = _getpass.getpass(
926 'passphrase for {}: '.format(self['fingerprint'][-8:]))
927 passphrase = passphrase.encode('ascii')
928 key = self._string_to_key(string=passphrase, key_size=key_size)
931 mode=module.MODE_CFB,
932 IV=self['initial-vector'],
933 segment_size=segment_size_bits)
934 plaintext = cipher.decrypt(data)
936 plaintext = plaintext[:-padding]
940 def packets_from_bytes(data):
942 while offset < len(data):
944 offset += packet.from_bytes(data=data[offset:])
948 class PGPKey (object):
949 """An OpenPGP key with public and private parts.
953 OpenPGP users may transfer public keys. The essential elements
954 of a transferable public key are as follows:
956 - One Public-Key packet
957 - Zero or more revocation signatures
958 - One or more User ID packets
959 - After each User ID packet, zero or more Signature packets
961 - Zero or more User Attribute packets
962 - After each User Attribute packet, zero or more Signature
963 packets (certifications)
964 - Zero or more Subkey packets
965 - After each Subkey packet, one Signature packet, plus
966 optionally a revocation
968 Secret keys have a similar packet stream [2]:
970 OpenPGP users may transfer secret keys. The format of a
971 transferable secret key is the same as a transferable public key
972 except that secret-key and secret-subkey packets are used
973 instead of the public key and public-subkey packets.
974 Implementations SHOULD include self-signatures on any user IDs
975 and subkeys, as this allows for a complete public key to be
976 automatically extracted from the transferable secret key.
977 Implementations MAY choose to omit the self-signatures,
978 especially if a transferable public key accompanies the
979 transferable secret key.
981 [1]: http://tools.ietf.org/search/rfc4880#section-11.1
982 [2]: http://tools.ietf.org/search/rfc4880#section-11.2
984 def __init__(self, fingerprint):
985 self.fingerprint = fingerprint
986 self.public_packets = None
987 self.secret_packets = None
990 lines = ['key: {}'.format(self.fingerprint)]
991 if self.public_packets:
992 lines.append(' public:')
993 for packet in self.public_packets:
994 lines.extend(self._str_packet(packet=packet, prefix=' '))
995 if self.secret_packets:
996 lines.append(' secret:')
997 for packet in self.secret_packets:
998 lines.extend(self._str_packet(packet=packet, prefix=' '))
999 return '\n'.join(lines)
1001 def _str_packet(self, packet, prefix):
1002 lines = str(packet).split('\n')
1003 return [prefix + line for line in lines]
1005 def import_from_gpg(self):
1006 key_export = _get_stdout(
1007 ['gpg', '--export', self.fingerprint])
1008 self.public_packets = list(
1009 packets_from_bytes(data=key_export))
1010 if self.public_packets[0]['type'] != 'public-key packet':
1012 '{} does not start with a public-key packet'.format(
1014 key_secret_export = _get_stdout(
1015 ['gpg', '--export-secret-keys', self.fingerprint])
1016 self.secret_packets = list(
1017 packets_from_bytes(data=key_secret_export))
1018 if self.secret_packets[0]['type'] != 'secret-key packet':
1020 '{} does not start with a secret-key packet'.format(
def export_to_gpg(self):
    """Export this key to gpg (not implemented yet).

    Raises:
        NotImplementedError: always.
    """
    # The exception name must be spelled exactly; a misspelling surfaces
    # as NameError, which callers catching NotImplementedError would miss.
    raise NotImplementedError('export to gpg')
1026 def import_from_key(self, key):
1027 """Migrate the (sub)keys into this key"""
1031 def migrate(old_key, new_key):
1032 """Add the old key and sub-keys to the new key
1034 For example, to upgrade your master key, while preserving old
1035 signatures you'd made. You will lose signature *on* your old key
1036 though, since sub-keys can't be signed (I don't think).
1038 old_key = PGPKey(fingerprint=old_key)
1039 old_key.import_from_gpg()
1040 new_key = PGPKey(fingerprint=new_key)
1041 new_key.import_from_gpg()
1042 new_key.import_from_key(key=old_key)
1048 if __name__ == '__main__':
1051 old_key, new_key = _sys.argv[1:3]
1052 migrate(old_key=old_key, new_key=new_key)