3 import hashlib as _hashlib
6 import subprocess as _subprocess
7 import struct as _struct
10 def _get_stdout(args, stdin=None):
13 stdin_pipe = _subprocess.PIPE
14 p = _subprocess.Popen(args, stdin=stdin_pipe, stdout=_subprocess.PIPE)
15 stdout, stderr = p.communicate(stdin)
18 raise RuntimeError(status)
22 class PGPPacket (dict):
23 # http://tools.ietf.org/search/rfc4880
24 _old_format_packet_length_type = { # type: (bytes, struct type)
25 0: (1, 'B'), # 1-byte unsigned integer
26 1: (2, 'H'), # 2-byte unsigned integer
27 2: (4, 'I'), # 4-byte unsigned integer
33 1: 'public-key encrypted session key packet',
34 2: 'signature packet',
35 3: 'symmetric-key encrypted session key packet',
36 4: 'one-pass signature packet',
37 5: 'secret-key packet',
38 6: 'public-key packet',
39 7: 'secret-subkey packet',
40 8: 'compressed data packet',
41 9: 'symmetrically encrypted data packet',
43 11: 'literal data packet',
46 14: 'public-subkey packet',
47 17: 'user attribute packet',
48 18: 'sym. encrypted and integrity protected data packet',
49 19: 'modification detection code packet',
56 _public_key_algorithms = {
57 1: 'rsa (encrypt or sign)',
58 2: 'rsa encrypt-only',
60 16: 'elgamal (encrypt-only)',
61 17: 'dsa (digital signature algorithm)',
62 18: 'reserved for elliptic curve',
63 19: 'reserved for ecdsa',
64 20: 'reserved (formerly elgamal encrypt or sign)',
65 21: 'reserved for diffie-hellman',
79 _symmetric_key_algorithms = {
80 0: 'plaintext or unencrypted data',
87 7: 'aes with 128-bit key',
88 8: 'aes with 192-bit key',
89 9: 'aes with 256-bit key',
104 _cipher_block_size = { # in bits
105 'aes with 128-bit key': 128,
106 'aes with 192-bit key': 128,
107 'aes with 256-bit key': 128,
111 _compression_algorithms = {
154 _string_to_key_types = {
158 3: 'iterated and salted',
173 0x00: 'binary document',
174 0x01: 'canonical text document',
176 0x10: 'generic user id and public-key packet',
177 0x11: 'persona user id and public-key packet',
178 0x12: 'casual user id and public-key packet',
179 0x13: 'postitive user id and public-key packet',
180 0x18: 'subkey binding',
181 0x19: 'primary key binding',
183 0x20: 'key revocation',
184 0x28: 'subkey revocation',
185 0x30: 'certification revocation',
187 0x50: 'third-party confirmation',
190 _signature_subpacket_types = {
193 2: 'signature creation time',
194 3: 'signature expiration time',
195 4: 'exportable certification',
196 5: 'trust signature',
197 6: 'regular expression',
200 9: 'key expiration time',
201 10: 'placeholder for backward compatibility',
202 11: 'preferred symmetric algorithms',
203 12: 'revocation key',
212 21: 'preferred hash algorithms',
213 22: 'preferred compression algorithms',
214 23: 'key server preferences',
215 24: 'preferred key server',
216 25: 'primary user id',
219 28: 'signer user id',
220 29: 'reason for revocation',
222 31: 'signature target',
223 32: 'embedded signature',
237 _clean_type_regex = _re.compile('\W+')
239 def _clean_type(self, type=None):
242 return self._clean_type_regex.sub('_', type)
245 def _reverse(dict, value):
246 """Reverse lookups in dictionaries
248 >>> PGPPacket._reverse(PGPPacket._packet_types, 'public-key packet')
251 return [k for k,v in dict.items() if v == value][0]
254 method_name = '_str_{}'.format(self._clean_type())
255 method = getattr(self, method_name, None)
259 return '{}: {}'.format(self['type'], details)
261 def _str_public_key_packet(self):
262 return self._str_generic_key_packet()
264 def _str_public_subkey_packet(self):
265 return self._str_generic_key_packet()
267 def _str_secret_key_packet(self):
268 return self._str_generic_key_packet()
270 def _str_secret_subkey_packet(self):
271 return self._str_generic_key_packet()
273 def _str_generic_key_packet(self):
274 return self['fingerprint'][-8:].upper()
276 def _str_signature_packet(self):
277 lines = [self['signature-type']]
278 if self['hashed-subpackets']:
279 lines.append(' hashed subpackets:')
280 lines.extend(self._str_signature_subpackets(
281 self['hashed-subpackets'], prefix=' '))
282 if self['unhashed-subpackets']:
283 lines.append(' unhashed subpackets:')
284 lines.extend(self._str_signature_subpackets(
285 self['unhashed-subpackets'], prefix=' '))
286 return '\n'.join(lines)
288 def _str_signature_subpackets(self, subpackets, prefix):
290 for subpacket in subpackets:
291 method_name = '_str_{}_signature_subpacket'.format(
292 self._clean_type(type=subpacket['type']))
293 method = getattr(self, method_name, None)
295 lines.append(' {}: {}'.format(
297 method(subpacket=subpacket)))
299 lines.append(' {}'.format(subpacket['type']))
302 def _str_signature_creation_time_signature_subpacket(self, subpacket):
303 return str(subpacket['signature-creation-time'])
305 def _str_issuer_signature_subpacket(self, subpacket):
306 return subpacket['issuer'][-8:].upper()
308 def _str_key_expiration_time_signature_subpacket(self, subpacket):
309 return str(subpacket['key-expiration-time'])
311 def _str_preferred_symmetric_algorithms_signature_subpacket(
314 algo for algo in subpacket['preferred-symmetric-algorithms'])
316 def _str_preferred_hash_algorithms_signature_subpacket(
319 algo for algo in subpacket['preferred-hash-algorithms'])
321 def _str_preferred_compression_algorithms_signature_subpacket(
324 algo for algo in subpacket['preferred-compression-algorithms'])
326 def _str_key_server_preferences_signature_subpacket(self, subpacket):
328 x for x in sorted(subpacket['key-server-preferences']))
330 def _str_primary_user_id_signature_subpacket(self, subpacket):
331 return str(subpacket['primary-user-id'])
333 def _str_key_flags_signature_subpacket(self, subpacket):
334 return ', '.join(x for x in sorted(subpacket['key-flags']))
336 def _str_features_signature_subpacket(self, subpacket):
337 return ', '.join(x for x in sorted(subpacket['features']))
339 def _str_embedded_signature_signature_subpacket(self, subpacket):
340 return subpacket['embedded']['signature-type']
342 def _str_user_id_packet(self):
345 def from_bytes(self, data):
346 offset = self._parse_header(data=data)
347 packet = data[offset:offset + self['length']]
348 if len(packet) < self['length']:
349 raise ValueError('packet too short ({} < {})'.format(
350 len(packet), self['length']))
351 offset += self['length']
352 method_name = '_parse_{}'.format(self._clean_type())
353 method = getattr(self, method_name, None)
355 raise NotImplementedError(
356 'cannot parse packet type {!r}'.format(self['type']))
360 def _parse_header(self, data):
363 always_one = packet_tag & 1 << 7
365 raise ValueError('most significant packet tag bit not set')
366 self['new-format'] = packet_tag & 1 << 6
367 if self['new-format']:
368 type_code = packet_tag & 0b111111
369 raise NotImplementedError('new-format packet length')
371 type_code = packet_tag >> 2 & 0b1111
372 self['length-type'] = packet_tag & 0b11
373 length_bytes, length_type = self._old_format_packet_length_type[
376 raise NotImplementedError(
377 'old-format packet of indeterminate length')
378 length_format = '>{}'.format(length_type)
379 length_data = data[offset: offset + length_bytes]
380 offset += length_bytes
381 self['length'] = _struct.unpack(length_format, length_data)[0]
382 self['type'] = self._packet_types[type_code]
386 def _parse_multiprecision_integer(data):
387 r"""Parse RFC 4880's multiprecision integers
389 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x01\x01')
391 >>> PGPPacket._parse_multiprecision_integer(b'\x00\x09\x01\xff')
394 bits = _struct.unpack('>H', data[:2])[0]
396 length = (bits + 7) // 8
398 for i in range(length):
399 value += data[offset + i] * 1 << (8 * (length - i - 1))
401 return (offset, value)
403 def _parse_string_to_key_specifier(self, data):
404 self['string-to-key-type'] = self._string_to_key_types[data[0]]
406 if self['string-to-key-type'] == 'simple':
407 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
410 elif self['string-to-key-type'] == 'salted':
411 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
414 self['string-to-key-salt'] = data[offset: offset + 8]
416 elif self['string-to-key-type'] == 'iterated and salted':
417 self['string-to-key-hash-algorithm'] = self._hash_algorithms[
420 self['string-to-key-salt'] = data[offset: offset + 8]
422 self['string-to-key-coded-count'] = data[offset]
425 raise NotImplementedError(
426 'string-to-key type {}'.format(self['string-to-key-type']))
429 def _parse_public_key_packet(self, data):
430 self._parse_generic_public_key_packet(data=data)
432 def _parse_public_subkey_packet(self, data):
433 self._parse_generic_public_key_packet(data=data)
435 def _parse_generic_public_key_packet(self, data):
436 self['key-version'] = data[0]
438 if self['key-version'] != 4:
439 raise NotImplementedError(
440 'public (sub)key packet version {}'.format(
441 self['key-version']))
443 self['creation-time'], algorithm = _struct.unpack(
444 '>IB', data[offset: offset + length])
446 self['public-key-algorithm'] = self._public_key_algorithms[algorithm]
447 if self['public-key-algorithm'].startswith('rsa '):
448 o, self['public-modulus'] = self._parse_multiprecision_integer(
451 o, self['public-exponent'] = self._parse_multiprecision_integer(
454 elif self['public-key-algorithm'].startswith('dsa '):
455 o, self['prime'] = self._parse_multiprecision_integer(
458 o, self['group-order'] = self._parse_multiprecision_integer(
461 o, self['group-generator'] = self._parse_multiprecision_integer(
464 o, self['public-key'] = self._parse_multiprecision_integer(
467 elif self['public-key-algorithm'].startswith('elgamal '):
468 o, self['prime'] = self._parse_multiprecision_integer(
471 o, self['group-generator'] = self._parse_multiprecision_integer(
474 o, self['public-key'] = self._parse_multiprecision_integer(
478 raise NotImplementedError(
479 'algorithm-specific key fields for {}'.format(
480 self['public-key-algorithm']))
481 fingerprint = _hashlib.sha1()
482 fingerprint.update(b'\x99')
483 fingerprint.update(_struct.pack('>H', len(data)))
484 fingerprint.update(data)
485 self['fingerprint'] = fingerprint.hexdigest()
488 def _parse_secret_key_packet(self, data):
489 self._parse_generic_secret_key_packet(data=data)
491 def _parse_secret_subkey_packet(self, data):
492 self._parse_generic_secret_key_packet(data=data)
494 def _parse_generic_secret_key_packet(self, data):
495 offset = self._parse_generic_public_key_packet(data=data)
496 string_to_key_usage = data[offset]
498 if string_to_key_usage in [255, 254]:
499 self['symmetric-encryption-algorithm'] = (
500 self._symmetric_key_algorithms[data[offset]])
502 offset += self._parse_string_to_key_specifier(data=data[offset:])
504 self['symmetric-encryption-algorithm'] = (
505 self._symmetric_key_algorithms[string_to_key_usage])
506 if string_to_key_usage:
507 block_size_bits = self._cipher_block_size.get(
508 self['symmetric-encryption-algorithm'], None)
509 if block_size_bits % 8:
510 raise NotImplementedError(
511 ('{}-bit block size for {} is not an integer number of bytes'
513 block_size_bits, self['symmetric-encryption-algorithm']))
514 block_size = block_size_bits // 8
516 raise NotImplementedError(
517 'unknown block size for {}'.format(
518 self['symmetric-encryption-algorithm']))
519 self['initial-vector'] = data[offset: offset + block_size]
521 if string_to_key_usage in [0, 255]:
525 self['secret-key'] = data[offset:key_end]
527 self['secret-key-checksum'] = data[key_end:]
529 def _parse_signature_subpackets(self, data):
531 while offset < len(data):
532 o, subpacket = self._parse_signature_subpacket(data=data[offset:])
536 def _parse_signature_subpacket(self, data):
542 elif first >= 192 and first < 255:
543 second = data[offset]
545 length = ((first - 192) << 8) + second + 192
547 length = _struct.unpack(
548 '>I', data[offset: offset + 4])[0]
550 subpacket['type'] = self._signature_subpacket_types[data[offset]]
552 subpacket_data = data[offset: offset + length - 1]
553 offset += len(subpacket_data)
554 method_name = '_parse_{}_signature_subpacket'.format(
555 self._clean_type(type=subpacket['type']))
556 method = getattr(self, method_name, None)
558 raise NotImplementedError(
559 'cannot parse signature subpacket type {!r}'.format(
561 method(data=subpacket_data, subpacket=subpacket)
562 return (offset, subpacket)
564 def _parse_signature_packet(self, data):
565 self['signature-version'] = data[0]
567 if self['signature-version'] != 4:
568 raise NotImplementedError(
569 'signature packet version {}'.format(
570 self['signature-version']))
571 self['signature-type'] = self._signature_types[data[offset]]
573 self['public-key-algorithm'] = self._public_key_algorithms[
576 self['hash-algorithm'] = self._hash_algorithms[data[offset]]
578 hashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
580 self['hashed-subpackets'] = list(self._parse_signature_subpackets(
581 data[offset: offset + hashed_count]))
582 offset += hashed_count
583 unhashed_count = _struct.unpack('>H', data[offset: offset + 2])[0]
585 self['unhashed-subpackets'] = list(self._parse_signature_subpackets(
586 data=data[offset: offset + unhashed_count]))
587 offset += unhashed_count
588 self['signed-hash-word'] = data[offset: offset + 2]
590 self['signature'] = data[offset:]
592 def _parse_signature_creation_time_signature_subpacket(
593 self, data, subpacket):
594 subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0]
596 def _parse_issuer_signature_subpacket(self, data, subpacket):
597 subpacket['issuer'] = ''.join('{:02x}'.format(byte) for byte in data)
599 def _parse_key_expiration_time_signature_subpacket(
600 self, data, subpacket):
601 subpacket['key-expiration-time'] = _struct.unpack('>I', data)[0]
603 def _parse_preferred_symmetric_algorithms_signature_subpacket(
604 self, data, subpacket):
605 subpacket['preferred-symmetric-algorithms'] = [
606 self._symmetric_key_algorithms[d] for d in data]
608 def _parse_preferred_hash_algorithms_signature_subpacket(
609 self, data, subpacket):
610 subpacket['preferred-hash-algorithms'] = [
611 self._hash_algorithms[d] for d in data]
613 def _parse_preferred_compression_algorithms_signature_subpacket(
614 self, data, subpacket):
615 subpacket['preferred-compression-algorithms'] = [
616 self._compression_algorithms[d] for d in data]
618 def _parse_key_server_preferences_signature_subpacket(
619 self, data, subpacket):
620 subpacket['key-server-preferences'] = set()
622 subpacket['key-server-preferences'].add('no-modify')
624 def _parse_primary_user_id_signature_subpacket(self, data, subpacket):
625 subpacket['primary-user-id'] = bool(data[0])
627 def _parse_key_flags_signature_subpacket(self, data, subpacket):
628 subpacket['key-flags'] = set()
630 subpacket['key-flags'].add('can certify')
632 subpacket['key-flags'].add('can sign')
634 subpacket['key-flags'].add('can encrypt communications')
636 subpacket['key-flags'].add('can encrypt storage')
638 subpacket['key-flags'].add('private split')
640 subpacket['key-flags'].add('can authenticate')
642 subpacket['key-flags'].add('private shared')
644 def _parse_features_signature_subpacket(self, data, subpacket):
645 subpacket['features'] = set()
647 subpacket['features'].add('modification detection')
649 def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
650 subpacket['embedded'] = PGPPacket()
651 subpacket['embedded']._parse_signature_packet(data=data)
653 def _parse_user_id_packet(self, data):
654 self['user'] = str(data, 'utf-8')
657 method_name = '_serialize_{}'.format(self._clean_type())
658 method = getattr(self, method_name, None)
660 raise NotImplementedError(
661 'cannot serialize packet type {!r}'.format(self['type']))
663 self['length'] = len(body)
665 self._serialize_header(),
669 def _serialize_header(self):
672 type_code = self._reverse(self._packet_types, self['type'])
674 always_one * (1 << 7) |
675 new_format * (1 << 6) |
676 type_code * (1 << 2) |
679 length_bytes, length_type = self._old_format_packet_length_type[
681 length_format = '>{}'.format(length_type)
682 length_data = _struct.pack(length_format, self['length'])
689 def _serialize_multiprecision_integer(integer):
690 r"""Serialize RFC 4880's multipricision integers
692 >>> PGPPacket._serialize_multiprecision_integer(1)
694 >>> PGPPacket._serialize_multiprecision_integer(511)
697 bit_length = int(_math.log(integer, 2)) + 1
699 _struct.pack('>H', bit_length),
702 chunks.insert(1, bytes([integer & 0xff]))
703 integer = integer >> 8
704 return b''.join(chunks)
706 def _serialize_string_to_key_specifier(self):
707 string_to_key_type = bytes([
709 self._string_to_key_types, self['string-to-key-type']),
711 chunks = [string_to_key_type]
712 if self['string-to-key-type'] == 'simple':
713 chunks.append(bytes([self._reverse(
714 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
715 elif self['string-to-key-type'] == 'salted':
716 chunks.append(bytes([self._reverse(
717 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
718 chunks.append(self['string-to-key-salt'])
719 elif self['string-to-key-type'] == 'iterated and salted':
720 chunks.append(bytes([self._reverse(
721 self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
722 chunks.append(self['string-to-key-salt'])
723 chunks.append(bytes([self['string-to-key-coded-count']]))
725 raise NotImplementedError(
726 'string-to-key type {}'.format(self['string-to-key-type']))
728 return b''.join(chunks)
730 def _serialize_public_key_packet(self):
731 return self._serialize_generic_public_key_packet()
733 def _serialize_public_subkey_packet(self):
734 return self._serialize_generic_public_key_packet()
736 def _serialize_generic_public_key_packet(self):
737 key_version = bytes([self['key-version']])
738 chunks = [key_version]
739 if self['key-version'] != 4:
740 raise NotImplementedError(
741 'public (sub)key packet version {}'.format(
742 self['key-version']))
743 chunks.append(_struct.pack('>I', self['creation-time']))
744 chunks.append(bytes([self._reverse(
745 self._public_key_algorithms, self['public-key-algorithm'])]))
746 if self['public-key-algorithm'].startswith('rsa '):
747 chunks.append(self._serialize_multiprecision_integer(
748 self['public-modulus']))
749 chunks.append(self._serialize_multiprecision_integer(
750 self['public-exponent']))
751 elif self['public-key-algorithm'].startswith('dsa '):
752 chunks.append(self._serialize_multiprecision_integer(
754 chunks.append(self._serialize_multiprecision_integer(
755 self['group-order']))
756 chunks.append(self._serialize_multiprecision_integer(
757 self['group-generator']))
758 chunks.append(self._serialize_multiprecision_integer(
760 elif self['public-key-algorithm'].startswith('elgamal '):
761 chunks.append(self._serialize_multiprecision_integer(
763 chunks.append(self._serialize_multiprecision_integer(
764 self['group-generator']))
765 chunks.append(self._serialize_multiprecision_integer(
768 raise NotImplementedError(
769 'algorithm-specific key fields for {}'.format(
770 self['public-key-algorithm']))
771 return b''.join(chunks)
774 def packets_from_bytes(data):
776 while offset < len(data):
778 offset += packet.from_bytes(data=data[offset:])
782 class PGPKey (object):
783 """An OpenPGP key with public and private parts.
787 OpenPGP users may transfer public keys. The essential elements
788 of a transferable public key are as follows:
790 - One Public-Key packet
791 - Zero or more revocation signatures
792 - One or more User ID packets
793 - After each User ID packet, zero or more Signature packets
795 - Zero or more User Attribute packets
796 - After each User Attribute packet, zero or more Signature
797 packets (certifications)
798 - Zero or more Subkey packets
799 - After each Subkey packet, one Signature packet, plus
800 optionally a revocation
802 Secret keys have a similar packet stream [2]:
804 OpenPGP users may transfer secret keys. The format of a
805 transferable secret key is the same as a transferable public key
806 except that secret-key and secret-subkey packets are used
807 instead of the public key and public-subkey packets.
808 Implementations SHOULD include self-signatures on any user IDs
809 and subkeys, as this allows for a complete public key to be
810 automatically extracted from the transferable secret key.
811 Implementations MAY choose to omit the self-signatures,
812 especially if a transferable public key accompanies the
813 transferable secret key.
815 [1]: http://tools.ietf.org/search/rfc4880#section-11.1
816 [2]: http://tools.ietf.org/search/rfc4880#section-11.2
818 def __init__(self, fingerprint):
819 self.fingerprint = fingerprint
820 self.public_packets = None
821 self.secret_packets = None
824 lines = ['key: {}'.format(self.fingerprint)]
825 if self.public_packets:
826 lines.append(' public:')
827 for packet in self.public_packets:
828 lines.extend(self._str_packet(packet=packet, prefix=' '))
829 if self.secret_packets:
830 lines.append(' secret:')
831 for packet in self.secret_packets:
832 lines.extend(self._str_packet(packet=packet, prefix=' '))
833 return '\n'.join(lines)
835 def _str_packet(self, packet, prefix):
836 lines = str(packet).split('\n')
837 return [prefix + line for line in lines]
839 def import_from_gpg(self):
840 key_export = _get_stdout(
841 ['gpg', '--export', self.fingerprint])
842 self.public_packets = list(
843 packets_from_bytes(data=key_export))
844 if self.public_packets[0]['type'] != 'public-key packet':
846 '{} does not start with a public-key packet'.format(
848 key_secret_export = _get_stdout(
849 ['gpg', '--export-secret-keys', self.fingerprint])
850 self.secret_packets = list(
851 packets_from_bytes(data=key_secret_export))
852 if self.secret_packets[0]['type'] != 'secret-key packet':
854 '{} does not start with a secret-key packet'.format(
857 def export_to_gpg(self):
858 raise NotImplemetedError('export to gpg')
860 def import_from_key(self, key):
861 """Migrate the (sub)keys into this key"""
865 def migrate(old_key, new_key):
866 """Add the old key and sub-keys to the new key
868 For example, to upgrade your master key, while preserving old
869 signatures you'd made. You will lose signature *on* your old key
870 though, since sub-keys can't be signed (I don't think).
872 old_key = PGPKey(fingerprint=old_key)
873 old_key.import_from_gpg()
874 new_key = PGPKey(fingerprint=new_key)
875 new_key.import_from_gpg()
876 new_key.import_from_key(key=old_key)
882 if __name__ == '__main__':
885 old_key, new_key = _sys.argv[1:3]
886 migrate(old_key=old_key, new_key=new_key)