X-Git-Url: http://git.tremily.us/?a=blobdiff_plain;f=gpg-migrate.py;h=f8c1fd488934357da431f9f17940c46d0486ed6f;hb=309ff8afbf8a592b668c7539583b44656e68501c;hp=a1dbb7abb2aea919a34e0d52e1e96a7b1cf5eea5;hpb=f4477b63f603f998a9b766ead822b9ed5bd82138;p=gpg-migrate.git diff --git a/gpg-migrate.py b/gpg-migrate.py index a1dbb7a..f8c1fd4 100755 --- a/gpg-migrate.py +++ b/gpg-migrate.py @@ -1,10 +1,17 @@ #!/usr/bin/python +import getpass as _getpass import hashlib as _hashlib +import math as _math import re as _re import subprocess as _subprocess import struct as _struct +import Crypto.Cipher.AES as _crypto_cipher_aes +import Crypto.Cipher.Blowfish as _crypto_cipher_blowfish +import Crypto.Cipher.CAST as _crypto_cipher_cast +import Crypto.Cipher.DES3 as _crypto_cipher_des3 + def _get_stdout(args, stdin=None): stdin_pipe = None @@ -107,6 +114,15 @@ class PGPPacket (dict): 'cast5': 64, } + _crypto_module = { + 'aes with 128-bit key': _crypto_cipher_aes, + 'aes with 192-bit key': _crypto_cipher_aes, + 'aes with 256-bit key': _crypto_cipher_aes, + 'blowfish': _crypto_cipher_blowfish, + 'cast5': _crypto_cipher_cast, + 'tripledes': _crypto_cipher_des3, + } + _compression_algorithms = { 0: 'uncompressed', 1: 'zip', @@ -168,6 +184,8 @@ class PGPPacket (dict): 110: 'private', } + _string_to_key_expbias = 6 + _signature_types = { 0x00: 'binary document', 0x01: 'canonical text document', @@ -240,6 +258,15 @@ class PGPPacket (dict): type = self['type'] return self._clean_type_regex.sub('_', type) + @staticmethod + def _reverse(dict, value): + """Reverse lookups in dictionaries + + >>> PGPPacket._reverse(PGPPacket._packet_types, 'public-key packet') + 6 + """ + return [k for k,v in dict.items() if v == value][0] + def __str__(self): method_name = '_str_{}'.format(self._clean_type()) method = getattr(self, method_name, None) @@ -289,17 +316,43 @@ class PGPPacket (dict): lines.append(' {}'.format(subpacket['type'])) return lines + def _str_signature_creation_time_signature_subpacket(self, subpacket): + return str(subpacket['signature-creation-time']) + def _str_issuer_signature_subpacket(self, subpacket): return subpacket['issuer'][-8:].upper() + def _str_key_expiration_time_signature_subpacket(self, subpacket): + return str(subpacket['key-expiration-time']) + def _str_preferred_symmetric_algorithms_signature_subpacket( self, subpacket): return ', '.join( algo for algo in subpacket['preferred-symmetric-algorithms']) + def _str_preferred_hash_algorithms_signature_subpacket( + self, subpacket): + return ', '.join( + algo for algo in subpacket['preferred-hash-algorithms']) + + def _str_preferred_compression_algorithms_signature_subpacket( + self, subpacket): + return ', '.join( + algo for algo in subpacket['preferred-compression-algorithms']) + + def _str_key_server_preferences_signature_subpacket(self, subpacket): + return ', '.join( + x for x in sorted(subpacket['key-server-preferences'])) + + def _str_primary_user_id_signature_subpacket(self, subpacket): + return str(subpacket['primary-user-id']) + def _str_key_flags_signature_subpacket(self, subpacket): return ', '.join(x for x in sorted(subpacket['key-flags'])) + def _str_features_signature_subpacket(self, subpacket): + return ', '.join(x for x in sorted(subpacket['features'])) + def _str_embedded_signature_signature_subpacket(self, subpacket): return subpacket['embedded']['signature-type'] @@ -364,6 +417,15 @@ class PGPPacket (dict): offset += length return (offset, value) + @classmethod + def _decode_string_to_key_count(cls, data): + r"""Decode RFC 4880's string-to-key count + + >>> PGPPacket._decode_string_to_key_count(b'\x97'[0]) + 753664 + """ + return (16 + (data & 15)) << ((data >> 4) + cls._string_to_key_expbias) + def _parse_string_to_key_specifier(self, data): self['string-to-key-type'] = self._string_to_key_types[data[0]] offset = 1 @@ -383,7 +445,8 @@ class PGPPacket (dict): offset += 1 self['string-to-key-salt'] = data[offset: offset + 8] offset += 8 - self['string-to-key-coded-count'] = data[offset] + self['string-to-key-count'] = self._decode_string_to_key_count( + data=data[offset]) offset += 1 else: raise NotImplementedError( @@ -482,13 +545,31 @@ class PGPPacket (dict): self['symmetric-encryption-algorithm'])) self['initial-vector'] = data[offset: offset + block_size] offset += block_size + ciphertext = data[offset:] + offset += len(ciphertext) + decrypted_data = self.decrypt_symmetric_encryption(data=ciphertext) + else: + decrypted_data = data[offset:key_end] if string_to_key_usage in [0, 255]: key_end = -2 + elif string_to_key_usage == 254: + key_end = -20 else: key_end = 0 - self['secret-key'] = data[offset:key_end] + secret_key = decrypted_data[:key_end] if key_end: - self['secret-key-checksum'] = data[key_end:] + secret_key_checksum = decrypted_data[key_end:] + if key_end == -2: + calculated_checksum = sum(secret_key) % 65536 + else: + checksum_hash = _hashlib.sha1() + checksum_hash.update(secret_key) + calculated_checksum = checksum_hash.digest() + if secret_key_checksum != calculated_checksum: + raise ValueError( + 'corrupt secret key (checksum {} != expected {})'.format( + secret_key_checksum, calculated_checksum)) + self['secret-key'] = secret_key def _parse_signature_subpackets(self, data): offset = 0 @@ -553,14 +634,41 @@ class PGPPacket (dict): offset += 2 self['signature'] = data[offset:] + def _parse_signature_creation_time_signature_subpacket( + self, data, subpacket): + subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0] + def _parse_issuer_signature_subpacket(self, data, subpacket): subpacket['issuer'] = ''.join('{:02x}'.format(byte) for byte in data) + def _parse_key_expiration_time_signature_subpacket( + self, data, subpacket): + subpacket['key-expiration-time'] = _struct.unpack('>I', data)[0] + def _parse_preferred_symmetric_algorithms_signature_subpacket( self, data, subpacket): subpacket['preferred-symmetric-algorithms'] = [ self._symmetric_key_algorithms[d] for d in data] + def _parse_preferred_hash_algorithms_signature_subpacket( + self, data, subpacket): + subpacket['preferred-hash-algorithms'] = [ + self._hash_algorithms[d] for d in data] + + def _parse_preferred_compression_algorithms_signature_subpacket( + self, data, subpacket): + subpacket['preferred-compression-algorithms'] = [ + self._compression_algorithms[d] for d in data] + + def _parse_key_server_preferences_signature_subpacket( + self, data, subpacket): + subpacket['key-server-preferences'] = set() + if data[0] & 0x80: + subpacket['key-server-preferences'].add('no-modify') + + def _parse_primary_user_id_signature_subpacket(self, data, subpacket): + subpacket['primary-user-id'] = bool(data[0]) + def _parse_key_flags_signature_subpacket(self, data, subpacket): subpacket['key-flags'] = set() if data[0] & 0x1: @@ -578,6 +686,11 @@ class PGPPacket (dict): if data[0] & 0x80: subpacket['key-flags'].add('private shared') + def _parse_features_signature_subpacket(self, data, subpacket): + subpacket['features'] = set() + if data[0] & 0x1: + subpacket['features'].add('modification detection') + def _parse_embedded_signature_signature_subpacket(self, data, subpacket): subpacket['embedded'] = PGPPacket() subpacket['embedded']._parse_signature_packet(data=data) @@ -586,7 +699,164 @@ class PGPPacket (dict): self['user'] = str(data, 'utf-8') def to_bytes(self): - pass + method_name = '_serialize_{}'.format(self._clean_type()) + method = getattr(self, method_name, None) + if not method: + raise NotImplementedError( + 'cannot serialize packet type {!r}'.format(self['type'])) + body = method() + self['length'] = len(body) + return b''.join([ + self._serialize_header(), + body, + ]) + + def _serialize_header(self): + always_one = 1 + new_format = 0 + type_code = self._reverse(self._packet_types, self['type']) + packet_tag = ( + always_one * (1 << 7) | + new_format * (1 << 6) | + type_code * (1 << 2) | + self['length-type'] + ) + length_bytes, length_type = self._old_format_packet_length_type[ + self['length-type']] + length_format = '>{}'.format(length_type) + length_data = _struct.pack(length_format, self['length']) + return b''.join([ + bytes([packet_tag]), + length_data, + ]) + + @staticmethod + def _serialize_multiprecision_integer(integer): + r"""Serialize RFC 4880's multipricision integers + + >>> PGPPacket._serialize_multiprecision_integer(1) + b'\x00\x01\x01' + >>> PGPPacket._serialize_multiprecision_integer(511) + b'\x00\t\x01\xff' + """ + bit_length = int(_math.log(integer, 2)) + 1 + chunks = [ + _struct.pack('>H', bit_length), + ] + while integer > 0: + chunks.insert(1, bytes([integer & 0xff])) + integer = integer >> 8 + return b''.join(chunks) + + @classmethod + def _encode_string_to_key_count(cls, count): + r"""Encode RFC 4880's string-to-key count + + >>> PGPPacket._encode_string_to_key_count(753664) + b'\x97' + """ + coded_count = 0 + count = count >> cls._string_to_key_expbias + while not count & 1: + count = count >> 1 + coded_count += 1 << 4 + coded_count += count & 15 + return bytes([coded_count]) + + def _serialize_string_to_key_specifier(self): + string_to_key_type = bytes([ + self._reverse( + self._string_to_key_types, self['string-to-key-type']), + ]) + chunks = [string_to_key_type] + if self['string-to-key-type'] == 'simple': + chunks.append(bytes([self._reverse( + self._hash_algorithms, self['string-to-key-hash-algorithm'])])) + elif self['string-to-key-type'] == 'salted': + chunks.append(bytes([self._reverse( + self._hash_algorithms, self['string-to-key-hash-algorithm'])])) + chunks.append(self['string-to-key-salt']) + elif self['string-to-key-type'] == 'iterated and salted': + chunks.append(bytes([self._reverse( + self._hash_algorithms, self['string-to-key-hash-algorithm'])])) + chunks.append(self['string-to-key-salt']) + chunks.append(self._encode_string_to_key_count( + count=self['string-to-key-count'])) + else: + raise NotImplementedError( + 'string-to-key type {}'.format(self['string-to-key-type'])) + return offset + return b''.join(chunks) + + def _serialize_public_key_packet(self): + return self._serialize_generic_public_key_packet() + + def _serialize_public_subkey_packet(self): + return self._serialize_generic_public_key_packet() + + def _serialize_generic_public_key_packet(self): + key_version = bytes([self['key-version']]) + chunks = [key_version] + if self['key-version'] != 4: + raise NotImplementedError( + 'public (sub)key packet version {}'.format( + self['key-version'])) + chunks.append(_struct.pack('>I', self['creation-time'])) + chunks.append(bytes([self._reverse( + self._public_key_algorithms, self['public-key-algorithm'])])) + if self['public-key-algorithm'].startswith('rsa '): + chunks.append(self._serialize_multiprecision_integer( + self['public-modulus'])) + chunks.append(self._serialize_multiprecision_integer( + self['public-exponent'])) + elif self['public-key-algorithm'].startswith('dsa '): + chunks.append(self._serialize_multiprecision_integer( + self['prime'])) + chunks.append(self._serialize_multiprecision_integer( + self['group-order'])) + chunks.append(self._serialize_multiprecision_integer( + self['group-generator'])) + chunks.append(self._serialize_multiprecision_integer( + self['public-key'])) + elif self['public-key-algorithm'].startswith('elgamal '): + chunks.append(self._serialize_multiprecision_integer( + self['prime'])) + chunks.append(self._serialize_multiprecision_integer( + self['group-generator'])) + chunks.append(self._serialize_multiprecision_integer( + self['public-key'])) + else: + raise NotImplementedError( + 'algorithm-specific key fields for {}'.format( + self['public-key-algorithm'])) + return b''.join(chunks) + + def decrypt_symmetric_encryption(self, data): + """Decrypt OpenPGP's Cipher Feedback mode""" + algorithm = self['symmetric-encryption-algorithm'] + module = self._crypto_module[algorithm] + key_size = self._key_size[algorithm] + segment_size_bits = self._cipher_block_size[algorithm] + if segment_size_bits % 8: + raise NotImplementedError( + ('{}-bit segment size for {} is not an integer number of bytes' + ).format(segment_size_bits, algorithm)) + segment_size_bytes = segment_size_bits // 8 + padding = segment_size_bytes - len(data) % segment_size_bytes + if padding: + data += b'\x00' * padding + passphrase = _getpass.getpass( + 'passphrase for {}: '.format(self['fingerprint'][-8:])) + passphrase = passphrase.encode('ascii') + cipher = module.new( + key=passphrase, + mode=module.MODE_CFB, + IV=self['initial-vector'], + segment_size=segment_size_bits) + plaintext = cipher.decrypt(data) + if padding: + plaintext = plaintext[:-padding] + return plaintext def packets_from_bytes(data): @@ -598,6 +868,41 @@ def packets_from_bytes(data): class PGPKey (object): + """An OpenPGP key with public and private parts. + + From RFC 4880 [1]: + + OpenPGP users may transfer public keys. The essential elements + of a transferable public key are as follows: + + - One Public-Key packet + - Zero or more revocation signatures + - One or more User ID packets + - After each User ID packet, zero or more Signature packets + (certifications) + - Zero or more User Attribute packets + - After each User Attribute packet, zero or more Signature + packets (certifications) + - Zero or more Subkey packets + - After each Subkey packet, one Signature packet, plus + optionally a revocation + + Secret keys have a similar packet stream [2]: + + OpenPGP users may transfer secret keys. The format of a + transferable secret key is the same as a transferable public key + except that secret-key and secret-subkey packets are used + instead of the public key and public-subkey packets. + Implementations SHOULD include self-signatures on any user IDs + and subkeys, as this allows for a complete public key to be + automatically extracted from the transferable secret key. + Implementations MAY choose to omit the self-signatures, + especially if a transferable public key accompanies the + transferable secret key. + + [1]: http://tools.ietf.org/search/rfc4880#section-11.1 + [2]: http://tools.ietf.org/search/rfc4880#section-11.2 + """ def __init__(self, fingerprint): self.fingerprint = fingerprint self.public_packets = None @@ -632,6 +937,10 @@ class PGPKey (object): ['gpg', '--export-secret-keys', self.fingerprint]) self.secret_packets = list( packets_from_bytes(data=key_secret_export)) + if self.secret_packets[0]['type'] != 'secret-key packet': + raise ValueError( + '{} does not start with a secret-key packet'.format( + self.fingerprint)) def export_to_gpg(self): raise NotImplemetedError('export to gpg')