3 import getpass as _getpass
4 import hashlib as _hashlib
7 import subprocess as _subprocess
8 import struct as _struct
11 def _get_stdout(args, stdin=None):
14 stdin_pipe = _subprocess.PIPE
15 p = _subprocess.Popen(args, stdin=stdin_pipe, stdout=_subprocess.PIPE)
16 stdout, stderr = p.communicate(stdin)
19 raise RuntimeError(status)
23 class PGPPacket (dict):
24 # http://tools.ietf.org/search/rfc4880
25 _old_format_packet_length_type = { # type: (bytes, struct type)
26 0: (1, 'B'), # 1-byte unsigned integer
27 1: (2, 'H'), # 2-byte unsigned integer
28 2: (4, 'I'), # 4-byte unsigned integer
34 1: 'public-key encrypted session key packet',
35 2: 'signature packet',
36 3: 'symmetric-key encrypted session key packet',
37 4: 'one-pass signature packet',
38 5: 'secret-key packet',
39 6: 'public-key packet',
40 7: 'secret-subkey packet',
41 8: 'compressed data packet',
42 9: 'symmetrically encrypted data packet',
44 11: 'literal data packet',
47 14: 'public-subkey packet',
48 17: 'user attribute packet',
49 18: 'sym. encrypted and integrity protected data packet',
50 19: 'modification detection code packet',
57 _public_key_algorithms = {
58 1: 'rsa (encrypt or sign)',
59 2: 'rsa encrypt-only',
61 16: 'elgamal (encrypt-only)',
62 17: 'dsa (digital signature algorithm)',
63 18: 'reserved for elliptic curve',
64 19: 'reserved for ecdsa',
65 20: 'reserved (formerly elgamal encrypt or sign)',
66 21: 'reserved for diffie-hellman',
80 _symmetric_key_algorithms = {
81 0: 'plaintext or unencrypted data',
88 7: 'aes with 128-bit key',
89 8: 'aes with 192-bit key',
90 9: 'aes with 256-bit key',
105 _cipher_block_size = { # in bits
106 'aes with 128-bit key': 128,
107 'aes with 192-bit key': 128,
108 'aes with 256-bit key': 128,
112 _compression_algorithms = {
155 _string_to_key_types = {
159 3: 'iterated and salted',
174 0x00: 'binary document',
175 0x01: 'canonical text document',
177 0x10: 'generic user id and public-key packet',
178 0x11: 'persona user id and public-key packet',
179 0x12: 'casual user id and public-key packet',
180 0x13: 'postitive user id and public-key packet',
181 0x18: 'subkey binding',
182 0x19: 'primary key binding',
184 0x20: 'key revocation',
185 0x28: 'subkey revocation',
186 0x30: 'certification revocation',
188 0x50: 'third-party confirmation',
191 _signature_subpacket_types = {
194 2: 'signature creation time',
195 3: 'signature expiration time',
196 4: 'exportable certification',
197 5: 'trust signature',
198 6: 'regular expression',
201 9: 'key expiration time',
202 10: 'placeholder for backward compatibility',
203 11: 'preferred symmetric algorithms',
204 12: 'revocation key',
213 21: 'preferred hash algorithms',
214 22: 'preferred compression algorithms',
215 23: 'key server preferences',
216 24: 'preferred key server',
217 25: 'primary user id',
220 28: 'signer user id',
221 29: 'reason for revocation',
223 31: 'signature target',
224 32: 'embedded signature',
238 _clean_type_regex = _re.compile('\W+')
240 def _clean_type(self, type=None):
243 return self._clean_type_regex.sub('_', type)
246 def _reverse(dict, value):
247 """Reverse lookups in dictionaries
249 >>> PGPPacket._reverse(PGPPacket._packet_types, 'public-key packet')
252 return [k for k,v in dict.items() if v == value][0]
255 method_name = '_str_{}'.format(self._clean_type())
256 method = getattr(self, method_name, None)
260 return '{}: {}'.format(self['type'], details)
262 def _str_public_key_packet(self):
263 return self._str_generic_key_packet()
265 def _str_public_subkey_packet(self):
266 return self._str_generic_key_packet()
268 def _str_secret_key_packet(self):
269 return self._str_generic_key_packet()
271 def _str_secret_subkey_packet(self):
272 return self._str_generic_key_packet()
274 def _str_generic_key_packet(self):
275 return self['fingerprint'][-8:].upper()
277 def _str_signature_packet(self):
278 lines = [self['signature-type']]
279 if self['hashed-subpackets']:
280 lines.append(' hashed subpackets:')
281 lines.extend(self._str_signature_subpackets(
282 self['hashed-subpackets'], prefix=' '))
283 if self['unhashed-subpackets']:
284 lines.append(' unhashed subpackets:')
285 lines.extend(self._str_signature_subpackets(
286 self['unhashed-subpackets'], prefix=' '))
287 return '\n'.join(lines)
289 def _str_signature_subpackets(self, subpackets, prefix):
291 for subpacket in subpackets:
292 method_name = '_str_{}_signature_subpacket'.format(
293 self._clean_type(type=subpacket['type']))
294 method = getattr(self, method_name, None)
296 lines.append(' {}: {}'.format(
298 method(subpacket=subpacket)))
300 lines.append(' {}'.format(subpacket['type']))
303 def _str_signature_creation_time_signature_subpacket(self, subpacket):
304 return str(subpacket['signature-creation-time'])
306 def _str_issuer_signature_subpacket(self, subpacket):
307 return subpacket['issuer'][-8:].upper()
309 def _str_key_expiration_time_signature_subpacket(self, subpacket):
310 return str(subpacket['key-expiration-time'])
312 def _str_preferred_symmetric_algorithms_signature_subpacket(
315 algo for algo in subpacket['preferred-symmetric-algorithms'])
317 def _str_preferred_hash_algorithms_signature_subpacket(
320 algo for algo in subpacket['preferred-hash-algorithms'])
322 def _str_preferred_compression_algorithms_signature_subpacket(
325 algo for algo in subpacket['preferred-compression-algorithms'])
327 def _str_key_server_preferences_signature_subpacket(self, subpacket):
329 x for x in sorted(subpacket['key-server-preferences']))
331 def _str_primary_user_id_signature_subpacket(self, subpacket):
332 return str(subpacket['primary-user-id'])
334 def _str_key_flags_signature_subpacket(self, subpacket):
335 return ', '.join(x for x in sorted(subpacket['key-flags']))
337 def _str_features_signature_subpacket(self, subpacket):
338 return ', '.join(x for x in sorted(subpacket['features']))
340 def _str_embedded_signature_signature_subpacket(self, subpacket):
341 return subpacket['embedded']['signature-type']
343 def _str_user_id_packet(self):
346 def from_bytes(self, data):
347 offset = self._parse_header(data=data)
348 packet = data[offset:offset + self['length']]
349 if len(packet) < self['length']:
350 raise ValueError('packet too short ({} < {})'.format(
351 len(packet), self['length']))
352 offset += self['length']
353 method_name = '_parse_{}'.format(self._clean_type())
354 method = getattr(self, method_name, None)
356 raise NotImplementedError(
357 'cannot parse packet type {!r}'.format(self['type']))
361 def _parse_header(self, data):
364 always_one = packet_tag & 1 << 7
366 raise ValueError('most significant packet tag bit not set')
367 self['new-format'] = packet_tag & 1 << 6
368 if self['new-format']:
369 type_code = packet_tag & 0b111111
370 raise NotImplementedError('new-format packet length')
372 type_code = packet_tag >> 2 & 0b1111
373 self['length-type'] = packet_tag & 0b11
374 length_bytes, length_type = self._old_format_packet_length_type[
377 raise NotImplementedError(
378 'old-format packet of indeterminate length')
379 length_format = '>{}'.format(length_type)
380 length_data = data[offset: offset + length_bytes]
381 offset += length_bytes
382 self['length'] = _struct.unpack(length_format, length_data)[0]
383 self['type'] = self._packet_types[type_code]
387 def _parse_multiprecision_integer(data):
388 r"""Parse RFC 4880's multiprecision integers
390 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
392 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
395 bits = _struct.unpack('>H', data[:2])[0]
397 length = (bits + 7) // 8
399 for i in range(length):
400 value += data[offset + i] * 1 << (8 * (length - i - 1))
402 return (offset, value)
404 def _parse_string_to_key_specifier(self, data):
405 self['string-to-key-type'] = self._string_to_key_types[data[0]]
407 if self['string-to-key-type'] == 'simple':
408 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
411 elif self['string-to-key-type'] == 'salted':
412 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
415 self['string-to-key-salt'] = data[offset: offset + 8]
417 elif self['string-to-key-type'] == 'iterated and salted':
418 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
421 self['string-to-key-salt'] = data[offset: offset + 8]
423 self['string-to-key-coded-count'] = data[offset]
426 raise NotImplementedError(
427 'string-to-key type {}'.format(self['string-to-key-type']))
430 def _parse_public_key_packet(self, data):
431 self._parse_generic_public_key_packet(data=data)
433 def _parse_public_subkey_packet(self, data):
434 self._parse_generic_public_key_packet(data=data)
436 def _parse_generic_public_key_packet(self, data):
437 self['key-version'] = data[0]
439 if self['key-version'] != 4:
440 raise NotImplementedError(
441 'public (sub)key packet version {}'.format(
442 self['key-version']))
444 self['creation-time'], algorithm = _struct.unpack(
445 '>IB', data[offset: offset + length])
447 self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
448 if self['public-key-algorithm'].startswith('rsa '):
449 o, self['public-modulus'] = self._parse_multiprecision_integer(
452 o, self['public-exponent'] = self._parse_multiprecision_integer(
455 elif self['public-key-algorithm'].startswith('dsa '):
456 o, self['prime'] = self._parse_multiprecision_integer(
459 o, self['group-order'] = self._parse_multiprecision_integer(
462 o, self['group-generator'] = self._parse_multiprecision_integer(
465 o, self['public-key'] = self._parse_multiprecision_integer(
468 elif self['public-key-algorithm'].startswith('elgamal '):
469 o, self['prime'] = self._parse_multiprecision_integer(
472 o, self['group-generator'] = self._parse_multiprecision_integer(
475 o, self['public-key'] = self._parse_multiprecision_integer(
479 raise NotImplementedError(
480 'algorithm-specific key fields for {}'.format(
481 self['public-key-algorithm']))
482 fingerprint = _hashlib.sha1()
483 fingerprint.update(b'\x99')
484 fingerprint.update(_struct.pack('>H', len(data)))
485 fingerprint.update(data)
486 self['fingerprint'] = fingerprint.hexdigest()
489 def _parse_secret_key_packet(self, data):
490 self._parse_generic_secret_key_packet(data=data)
492 def _parse_secret_subkey_packet(self, data):
493 self._parse_generic_secret_key_packet(data=data)
495 def _parse_generic_secret_key_packet(self, data):
496 offset = self._parse_generic_public_key_packet(data=data)
497 string_to_key_usage = data[offset]
499 if string_to_key_usage in [255, 254]:
500 self['symmetric-encryption-algorithm'] = (
501 self._symmetric_key_algorithms[data[offset]])
503 offset += self._parse_string_to_key_specifier(data=data[offset:])
505 self['symmetric-encryption-algorithm'] = (
506 self._symmetric_key_algorithms[string_to_key_usage])
507 if string_to_key_usage:
508 block_size_bits = self._cipher_block_size.get(
509 self['symmetric-encryption-algorithm'], None)
510 if block_size_bits % 8:
511 raise NotImplementedError(
512 ('{}-bit block size for {} is not an integer number of bytes'
514 block_size_bits, self['symmetric-encryption-algorithm']))
515 block_size = block_size_bits // 8
517 raise NotImplementedError(
518 'unknown block size for {}'.format(
519 self['symmetric-encryption-algorithm']))
520 self['initial-vector'] = data[offset: offset + block_size]
522 ciphertext = data[offset:]
523 offset += len(ciphertext)
524 decrypted_data = self.decrypt_symmetric_encryption(data=ciphertext)
526 decrypted_data = data[offset:key_end]
527 if string_to_key_usage in [0, 255]:
529 elif string_to_key_usage == 254:
533 secret_key = decrypted_data[:key_end]
535 secret_key_checksum = decrypted_data[key_end:]
537 calculated_checksum = sum(secret_key) % 65536
539 checksum_hash = _hashlib.sha1()
540 checksum_hash.update(secret_key)
541 calculated_checksum = checksum_hash.digest()
542 if secret_key_checksum != calculated_checksum:
544 'corrupt secret key (checksum {} != expected {})'.format(
545 secret_key_checksum, calculated_checksum))
546 self['secret-key'] = secret_key
548 def _parse_signature_subpackets(self, data):
550 while offset < len(data):
551 o, subpacket = self._parse_signature_subpacket(data=data[offset:])
555 def _parse_signature_subpacket(self, data):
561 elif first >= 192 and first < 255:
562 second = data[offset]
564 length = ((first - 192) << 8) + second + 192
566 length = _struct.unpack(
567 '>I', data[offset: offset + 4])[0]
569 subpacket['type'] = self._signature_subpacket_types[data[offset]]
571 subpacket_data = data[offset: offset + length - 1]
572 offset += len(subpacket_data)
573 method_name = '_parse_{}_signature_subpacket'.format(
574 self._clean_type(type=subpacket['type']))
575 method = getattr(self, method_name, None)
577 raise NotImplementedError(
578 'cannot parse signature subpacket type {!r}'.format(
580 method(data=subpacket_data, subpacket=subpacket)
581 return (offset, subpacket)
583 def _parse_signature_packet(self, data):
584 self['signature-version'] = data[0]
586 if self['signature-version'] != 4:
587 raise NotImplementedError(
588 'signature packet version {}'.format(
589 self['signature-version']))
590 self['signature-type'] = self._signature_types[data[offset]]
592 self['public-key-algorithm'] = self._public_key_algorithms[
595 self['hash-algorithm'] = self._hash_algorithms[data[offset]]
597 hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
599 self['hashed-subpackets'] = list(self._parse_signature_subpackets(
600 data[offset: offset + hashed_count]))
601 offset += hashed_count
602 unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
604 self['unhashed-subpackets'] = list(self._parse_signature_subpackets(
605 data=data[offset: offset + unhashed_count]))
606 offset += unhashed_count
607 self['signed-hash-word'] = data[offset: offset + 2]
609 self['signature'] = data[offset:]
611 def _parse_signature_creation_time_signature_subpacket(
612 self, data, subpacket):
613 subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0]
615 def _parse_issuer_signature_subpacket(self, data, subpacket):
616 subpacket['issuer'] = ''.join('{:02x}'.format(byte) for byte in data)
618 def _parse_key_expiration_time_signature_subpacket(
619 self, data, subpacket):
620 subpacket['key-expiration-time'] = _struct.unpack('>I', data)[0]
622 def _parse_preferred_symmetric_algorithms_signature_subpacket(
623 self, data, subpacket):
624 subpacket['preferred-symmetric-algorithms'] = [
625 self._symmetric_key_algorithms[d] for d in data]
627 def _parse_preferred_hash_algorithms_signature_subpacket(
628 self, data, subpacket):
629 subpacket['preferred-hash-algorithms'] = [
630 self._hash_algorithms[d] for d in data]
632 def _parse_preferred_compression_algorithms_signature_subpacket(
633 self, data, subpacket):
634 subpacket['preferred-compression-algorithms'] = [
635 self._compression_algorithms[d] for d in data]
637 def _parse_key_server_preferences_signature_subpacket(
638 self, data, subpacket):
639 subpacket['key-server-preferences'] = set()
641 subpacket['key-server-preferences'].add('no-modify')
643 def _parse_primary_user_id_signature_subpacket(self, data, subpacket):
644 subpacket['primary-user-id'] = bool(data[0])
646 def _parse_key_flags_signature_subpacket(self, data, subpacket):
647 subpacket['key-flags'] = set()
649 subpacket['key-flags'].add('can certify')
651 subpacket['key-flags'].add('can sign')
653 subpacket['key-flags'].add('can encrypt communications')
655 subpacket['key-flags'].add('can encrypt storage')
657 subpacket['key-flags'].add('private split')
659 subpacket['key-flags'].add('can authenticate')
661 subpacket['key-flags'].add('private shared')
663 def _parse_features_signature_subpacket(self, data, subpacket):
664 subpacket['features'] = set()
666 subpacket['features'].add('modification detection')
668 def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
669 subpacket['embedded'] = PGPPacket()
670 subpacket['embedded']._parse_signature_packet(data=data)
672 def _parse_user_id_packet(self, data):
673 self['user'] = str(data, 'utf-8')
676 method_name = '_serialize_{}'.format(self._clean_type())
677 method = getattr(self, method_name, None)
679 raise NotImplementedError(
680 'cannot serialize packet type {!r}'.format(self['type']))
682 self['length'] = len(body)
684 self._serialize_header(),
688 def _serialize_header(self):
691 type_code = self._reverse(self._packet_types, self['type'])
693 always_one * (1 << 7) |
694 new_format * (1 << 6) |
695 type_code * (1 << 2) |
698 length_bytes, length_type = self._old_format_packet_length_type[
700 length_format = '>{}'.format(length_type)
701 length_data = _struct.pack(length_format, self['length'])
708 def _serialize_multiprecision_integer(integer):
709 r"""Serialize RFC 4880's multipricision integers
711 >>> PGPPacket._serialize_multiprecision_integer(1)
713 >>> PGPPacket._serialize_multiprecision_integer(511)
716 bit_length = int(_math.log(integer, 2)) + 1
718 _struct.pack('>H', bit_length),
721 chunks.insert(1, bytes([integer & 0xff]))
722 integer = integer >> 8
723 return b''.join(chunks)
725 def _serialize_string_to_key_specifier(self):
726 string_to_key_type = bytes([
728 self._string_to_key_types, self['string-to-key-type']),
730 chunks = [string_to_key_type]
731 if self['string-to-key-type'] == 'simple':
732 chunks.append(bytes([self._reverse(
733 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
734 elif self['string-to-key-type'] == 'salted':
735 chunks.append(bytes([self._reverse(
736 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
737 chunks.append(self['string-to-key-salt'])
738 elif self['string-to-key-type'] == 'iterated and salted':
739 chunks.append(bytes([self._reverse(
740 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
741 chunks.append(self['string-to-key-salt'])
742 chunks.append(bytes([self['string-to-key-coded-count']]))
744 raise NotImplementedError(
745 'string-to-key type {}'.format(self['string-to-key-type']))
747 return b''.join(chunks)
749 def _serialize_public_key_packet(self):
750 return self._serialize_generic_public_key_packet()
752 def _serialize_public_subkey_packet(self):
753 return self._serialize_generic_public_key_packet()
755 def _serialize_generic_public_key_packet(self):
756 key_version = bytes([self['key-version']])
757 chunks = [key_version]
758 if self['key-version'] != 4:
759 raise NotImplementedError(
760 'public (sub)key packet version {}'.format(
761 self['key-version']))
762 chunks.append(_struct.pack('>I', self['creation-time']))
763 chunks.append(bytes([self._reverse(
764 self._public_key_algorithms, self['public-key-algorithm'])]))
765 if self['public-key-algorithm'].startswith('rsa '):
766 chunks.append(self._serialize_multiprecision_integer(
767 self['public-modulus']))
768 chunks.append(self._serialize_multiprecision_integer(
769 self['public-exponent']))
770 elif self['public-key-algorithm'].startswith('dsa '):
771 chunks.append(self._serialize_multiprecision_integer(
773 chunks.append(self._serialize_multiprecision_integer(
774 self['group-order']))
775 chunks.append(self._serialize_multiprecision_integer(
776 self['group-generator']))
777 chunks.append(self._serialize_multiprecision_integer(
779 elif self['public-key-algorithm'].startswith('elgamal '):
780 chunks.append(self._serialize_multiprecision_integer(
782 chunks.append(self._serialize_multiprecision_integer(
783 self['group-generator']))
784 chunks.append(self._serialize_multiprecision_integer(
787 raise NotImplementedError(
788 'algorithm-specific key fields for {}'.format(
789 self['public-key-algorithm']))
790 return b''.join(chunks)
792 def decrypt_symmetric_encryption(self, data):
793 raise NotImplementedError('decrypt symmetric encryption')
796 def packets_from_bytes(data):
798 while offset < len(data):
800 offset += packet.from_bytes(data=data[offset:])
804 class PGPKey (object):
805 """An OpenPGP key with public and private parts.
809 OpenPGP users may transfer public keys. The essential elements
810 of a transferable public key are as follows:
812 - One Public-Key packet
813 - Zero or more revocation signatures
814 - One or more User ID packets
815 - After each User ID packet, zero or more Signature packets
817 - Zero or more User Attribute packets
818 - After each User Attribute packet, zero or more Signature
819 packets (certifications)
820 - Zero or more Subkey packets
821 - After each Subkey packet, one Signature packet, plus
822 optionally a revocation
824 Secret keys have a similar packet stream [2]:
826 OpenPGP users may transfer secret keys. The format of a
827 transferable secret key is the same as a transferable public key
828 except that secret-key and secret-subkey packets are used
829 instead of the public key and public-subkey packets.
830 Implementations SHOULD include self-signatures on any user IDs
831 and subkeys, as this allows for a complete public key to be
832 automatically extracted from the transferable secret key.
833 Implementations MAY choose to omit the self-signatures,
834 especially if a transferable public key accompanies the
835 transferable secret key.
837 [1]: http://tools.ietf.org/search/rfc4880#section-11.1
838 [2]: http://tools.ietf.org/search/rfc4880#section-11.2
840 def __init__(self, fingerprint):
841 self.fingerprint = fingerprint
842 self.public_packets = None
843 self.secret_packets = None
846 lines = ['key: {}'.format(self.fingerprint)]
847 if self.public_packets:
848 lines.append(' public:')
849 for packet in self.public_packets:
850 lines.extend(self._str_packet(packet=packet, prefix=' '))
851 if self.secret_packets:
852 lines.append(' secret:')
853 for packet in self.secret_packets:
854 lines.extend(self._str_packet(packet=packet, prefix=' '))
855 return '\n'.join(lines)
857 def _str_packet(self, packet, prefix):
858 lines = str(packet).split('\n')
859 return [prefix + line for line in lines]
861 def import_from_gpg(self):
862 key_export = _get_stdout(
863 ['gpg', '--export', self.fingerprint])
864 self.public_packets = list(
865 packets_from_bytes(data=key_export))
866 if self.public_packets[0]['type'] != 'public-key packet':
868 '{} does not start with a public-key packet'.format(
870 key_secret_export = _get_stdout(
871 ['gpg', '--export-secret-keys', self.fingerprint])
872 self.secret_packets = list(
873 packets_from_bytes(data=key_secret_export))
874 if self.secret_packets[0]['type'] != 'secret-key packet':
876 '{} does not start with a secret-key packet'.format(
879 def export_to_gpg(self):
880 raise NotImplemetedError('export to gpg')
882 def import_from_key(self, key):
883 """Migrate the (sub)keys into this key"""
887 def migrate(old_key, new_key):
888 """Add the old key and sub-keys to the new key
890 For example, to upgrade your master key, while preserving old
891 signatures you'd made. You will lose signature *on* your old key
892 though, since sub-keys can't be signed (I don't think).
894 old_key = PGPKey(fingerprint=old_key)
895 old_key.import_from_gpg()
896 new_key = PGPKey(fingerprint=new_key)
897 new_key.import_from_gpg()
898 new_key.import_from_key(key=old_key)
904 if __name__ == '__main__':
907 old_key, new_key = _sys.argv[1:3]
908 migrate(old_key=old_key, new_key=new_key)