#!/usr/bin/python
+import getpass as _getpass
import hashlib as _hashlib
+import logging as _logging
import math as _math
import re as _re
import subprocess as _subprocess
import struct as _struct
+import Crypto.Cipher.AES as _crypto_cipher_aes
+import Crypto.Cipher.Blowfish as _crypto_cipher_blowfish
+import Crypto.Cipher.CAST as _crypto_cipher_cast
+import Crypto.Cipher.DES3 as _crypto_cipher_des3
+import Crypto.PublicKey.DSA as _crypto_publickey_dsa
+import Crypto.PublicKey.ElGamal as _crypto_publickey_elgamal
+import Crypto.PublicKey.RSA as _crypto_publickey_rsa
+import Crypto.Random.random as _crypto_random_random
+import Crypto.Signature.PKCS1_v1_5 as _crypto_signature_pkcs1_v1_5
+import Crypto.Hash.SHA as _crypto_hash_sha
+
+
+LOG = _logging.getLogger('gpg-migrate')
+LOG.addHandler(_logging.StreamHandler())
+LOG.setLevel(_logging.WARNING)
+
def _get_stdout(args, stdin=None):
stdin_pipe = None
return stdout
+def byte_string(data, sep=' '):
+ r"""Convert a byte-string to human readable form
+
+ >>> byte_string(b'\x12\x34\x56')
+ '12 34 56'
+ """
+ return sep.join('{:02x}'.format(byte) for byte in data)
+
+
+def string_bytes(data, sep=' '):
+ r"""Reverse byte_string()
+
+ >>> string_bytes('12 fa fb')
+ b'\x12\xfa\xfb'
+ """
+ return bytes(
+ int(c1+c2, base=16) for c1,c2 in
+ zip(data[::2 + len(sep)], data[1::2 + len(sep)]))
+
+
class PGPPacket (dict):
# http://tools.ietf.org/search/rfc4880
_old_format_packet_length_type = { # type: (bytes, struct type)
'aes with 192-bit key': 128,
'aes with 256-bit key': 128,
'cast5': 64,
+ 'twofish': 128,
+ }
+
+ _crypto_module = {
+ # symmetric-key encryption
+ 'aes with 128-bit key': _crypto_cipher_aes,
+ 'aes with 192-bit key': _crypto_cipher_aes,
+ 'aes with 256-bit key': _crypto_cipher_aes,
+ 'blowfish': _crypto_cipher_blowfish,
+ 'cast5': _crypto_cipher_cast,
+ 'tripledes': _crypto_cipher_des3,
+ # public-key encryption
+ 'dsa (digital signature algorithm)': _crypto_publickey_dsa,
+ 'elgamal (encrypt-only)': _crypto_publickey_elgamal,
+ 'rsa (encrypt or sign)': _crypto_publickey_rsa,
+ 'rsa encrypt-only': _crypto_publickey_rsa,
+ 'rsa sign-only': _crypto_publickey_rsa,
+ }
+
+ _key_size = { # in bits
+ 'aes with 128-bit key': 128,
+ 'aes with 192-bit key': 192,
+ 'aes with 256-bit key': 256,
+ 'cast5': 128,
}
_compression_algorithms = {
110: 'private',
}
+ _hashlib_name = { # map OpenPGP-based names to hashlib names
+ 'md5': 'md5',
+ 'sha-1': 'sha1',
+ 'ripe-md/160': 'ripemd160',
+ 'sha256': 'sha256',
+ 'sha384': 'sha384',
+ 'sha512': 'sha512',
+ 'sha224': 'sha224',
+ }
+
_string_to_key_types = {
0: 'simple',
1: 'salted',
110: 'private',
}
+ _string_to_key_expbias = 6
+
_signature_types = {
0x00: 'binary document',
0x01: 'canonical text document',
_clean_type_regex = _re.compile('\W+')
+ def __init__(self, key=None):
+ super(PGPPacket, self).__init__()
+ self.key = key
+
def _clean_type(self, type=None):
if type is None:
type = self['type']
"""
return [k for k,v in dict.items() if v == value][0]
+ def copy(self):
+ packet = PGPPacket(key=self.key)
+ packet.update(self)
+ return packet
+
def __str__(self):
method_name = '_str_{}'.format(self._clean_type())
method = getattr(self, method_name, None)
def _str_public_subkey_packet(self):
return self._str_generic_key_packet()
+ def _str_generic_key_packet(self):
+ return self['fingerprint'][-8:].upper()
+
def _str_secret_key_packet(self):
- return self._str_generic_key_packet()
+ return self._str_generic_secret_key_packet()
def _str_secret_subkey_packet(self):
- return self._str_generic_key_packet()
-
- def _str_generic_key_packet(self):
- return self['fingerprint'][-8:].upper()
+ return self._str_generic_secret_key_packet()
+
+ def _str_generic_secret_key_packet(self):
+ lines = [self._str_generic_key_packet()]
+ for label, key in [
+ ('symmetric encryption',
+ 'symmetric-encryption-algorithm'),
+ ('s2k hash', 'string-to-key-hash-algorithm'),
+ ('s2k count', 'string-to-key-count'),
+ ('s2k salt', 'string-to-key-salt'),
+ ('IV', 'initial-vector'),
+ ]:
+ if key in self:
+ value = self[key]
+ if isinstance(value, bytes):
+ value = byte_string(data=value)
+ lines.append(' {}: {}'.format(label, value))
+ return '\n'.join(lines)
def _str_signature_packet(self):
lines = [self['signature-type']]
raise NotImplementedError(
'cannot parse packet type {!r}'.format(self['type']))
method(data=packet)
+ self['raw'] = data[:offset]
return offset
def _parse_header(self, data):
offset += length
return (offset, value)
+ @classmethod
+ def _decode_string_to_key_count(cls, data):
+ r"""Decode RFC 4880's string-to-key count
+
+ >>> PGPPacket._decode_string_to_key_count(b'\x97'[0])
+ 753664
+ """
+ return (16 + (data & 15)) << ((data >> 4) + cls._string_to_key_expbias)
+
def _parse_string_to_key_specifier(self, data):
self['string-to-key-type'] = self._string_to_key_types[data[0]]
offset = 1
offset += 1
self['string-to-key-salt'] = data[offset: offset + 8]
offset += 8
- self['string-to-key-coded-count'] = data[offset]
+ self['string-to-key-count'] = self._decode_string_to_key_count(
+ data=data[offset])
offset += 1
else:
raise NotImplementedError(
'algorithm-specific key fields for {}'.format(
self['public-key-algorithm']))
fingerprint = _hashlib.sha1()
- fingerprint.update(b'\x99')
- fingerprint.update(_struct.pack('>H', len(data)))
- fingerprint.update(data)
+ fingerprint.update(
+ self._serialize_signature_packet_target(target=self))
self['fingerprint'] = fingerprint.hexdigest()
return offset
self['symmetric-encryption-algorithm']))
self['initial-vector'] = data[offset: offset + block_size]
offset += block_size
+ ciphertext = data[offset:]
+ offset += len(ciphertext)
+ decrypted_data = self.decrypt_symmetric_encryption(data=ciphertext)
+ else:
+ decrypted_data = data[offset:key_end]
if string_to_key_usage in [0, 255]:
key_end = -2
+ elif string_to_key_usage == 254:
+ key_end = -20
else:
key_end = 0
- secret_key = data[offset:key_end]
+ secret_key = decrypted_data[:key_end]
+ secret_offset = 0
if key_end:
- secret_key_checksum = data[key_end:]
- calculated_checksum = sum(secret_key) % 65536
+ secret_key_checksum = decrypted_data[key_end:]
+ if key_end == -2:
+ calculated_checksum = sum(secret_key) % 65536
+ else:
+ checksum_hash = _hashlib.sha1()
+ checksum_hash.update(secret_key)
+ calculated_checksum = checksum_hash.digest()
if secret_key_checksum != calculated_checksum:
raise ValueError(
'corrupt secret key (checksum {} != expected {})'.format(
secret_key_checksum, calculated_checksum))
- self['secret-key'] = secret_key
+ if self['public-key-algorithm'].startswith('rsa '):
+ o, self['secret-exponent'] = self._parse_multiprecision_integer(
+ secret_key[secret_offset:])
+ secret_offset += o
+ o, self['secret-prime-p'] = self._parse_multiprecision_integer(
+ secret_key[secret_offset:])
+ secret_offset += o
+ o, self['secret-prime-q'] = self._parse_multiprecision_integer(
+ secret_key[secret_offset:])
+ secret_offset += o
+ o, self['secret-inverse-of-p-mod-q'] = (
+ self._parse_multiprecision_integer(
+ secret_key[secret_offset:]))
+ secret_offset += o
+ elif self['public-key-algorithm'].startswith('dsa '):
+ o, self['secret-exponent'] = self._parse_multiprecision_integer(
+ secret_key[secret_offset:])
+ secret_offset += o
+ elif self['public-key-algorithm'].startswith('elgamal '):
+ o, self['secret-exponent'] = self._parse_multiprecision_integer(
+ secret_key[secret_offset:])
+ secret_offset += o
+ else:
+ raise NotImplementedError(
+ 'algorithm-specific key fields for {}'.format(
+ self['public-key-algorithm']))
+ if secret_offset != len(secret_key):
+ raise ValueError(
+ ('parsed {} out of {} bytes of algorithm-specific key fields '
+ 'for {}').format(
+ secret_offset, len(secret_key),
+ self['public-key-algorithm']))
def _parse_signature_subpackets(self, data):
offset = 0
offset += unhashed_count
self['signed-hash-word'] = data[offset: offset + 2]
offset += 2
- self['signature'] = data[offset:]
+ self['signature'] = []
+ while offset < len(data):
+ o, mpi = self._parse_multiprecision_integer(data=data[offset:])
+ offset += o
+ self['signature'].append(mpi)
+ if self.key.secret_packets:
+ packets = self.key.secret_packets
+ else:
+ packets = self.key.public_packets
+ if self['signature-type'] == 'standalone':
+ self['target'] = None
+ elif self['signature-type'].endswith(' user id and public-key packet'):
+ self['target'] = [
+ packets[0],
+ [p for p in packets if p['type'] == 'user id packet'][-1],
+ ]
+ elif self['signature-type'].endswith('key binding'):
+ self['target'] = [
+ packets[0],
+ [p for p in packets if p['type'] == 'public-subkey packet'][-1],
+ ]
+ else:
+ raise NotImplementedError(
+ 'target for {}'.format(self['signature-type']))
+ self.verify()
def _parse_signature_creation_time_signature_subpacket(
self, data, subpacket):
subpacket['signature-creation-time'] = _struct.unpack('>I', data)[0]
def _parse_issuer_signature_subpacket(self, data, subpacket):
- subpacket['issuer'] = ''.join('{:02x}'.format(byte) for byte in data)
+ subpacket['issuer'] = byte_string(data=data, sep='')
def _parse_key_expiration_time_signature_subpacket(
self, data, subpacket):
subpacket['features'].add('modification detection')
def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
- subpacket['embedded'] = PGPPacket()
+ subpacket['embedded'] = PGPPacket(key=self.key)
+ subpacket['embedded']['type'] = 'signature packet'
+ subpacket['embedded']['embedded'] = True
subpacket['embedded']._parse_signature_packet(data=data)
+ subpacket['embedded']['raw'] = data
def _parse_user_id_packet(self, data):
self['user'] = str(data, 'utf-8')
def to_bytes(self):
- method_name = '_serialize_{}'.format(self._clean_type())
- method = getattr(self, method_name, None)
- if not method:
- raise NotImplementedError(
- 'cannot serialize packet type {!r}'.format(self['type']))
- body = method()
+ body = self._serialize_body()
+ if body is None:
+ raise ValueError(method)
self['length'] = len(body)
return b''.join([
self._serialize_header(),
length_data,
])
+ def _serialize_body(self):
+ method_name = '_serialize_{}'.format(self._clean_type())
+ method = getattr(self, method_name, None)
+ if not method:
+ raise NotImplementedError(
+ 'cannot serialize packet type {!r}'.format(self['type']))
+ return method()
+
@staticmethod
def _serialize_multiprecision_integer(integer):
r"""Serialize RFC 4880's multipricision integers
integer = integer >> 8
return b''.join(chunks)
+ @classmethod
+ def _encode_string_to_key_count(cls, count):
+ r"""Encode RFC 4880's string-to-key count
+
+ >>> PGPPacket._encode_string_to_key_count(753664)
+ b'\x97'
+ """
+ coded_count = 0
+ count = count >> cls._string_to_key_expbias
+ while not count & 1:
+ count = count >> 1
+ coded_count += 1 << 4
+ coded_count += count & 15
+ return bytes([coded_count])
+
def _serialize_string_to_key_specifier(self):
string_to_key_type = bytes([
self._reverse(
chunks.append(bytes([self._reverse(
self._hash_algorithms, self['string-to-key-hash-algorithm'])]))
chunks.append(self['string-to-key-salt'])
- chunks.append(bytes([self['string-to-key-coded-count']]))
+ chunks.append(self._encode_string_to_key_count(
+ count=self['string-to-key-count']))
else:
raise NotImplementedError(
'string-to-key type {}'.format(self['string-to-key-type']))
self['public-key-algorithm']))
return b''.join(chunks)
+ def _serialize_signature_subpackets(self, subpackets):
+ return b''.join(
+ self._serialize_signature_subpacket(subpacket=subpacket)
+ for subpacket in subpackets)
-def packets_from_bytes(data):
- offset = 0
- while offset < len(data):
- packet = PGPPacket()
- offset += packet.from_bytes(data=data[offset:])
- yield packet
+ def _serialize_signature_subpacket(self, subpacket):
+ method_name = '_serialize_{}_signature_subpacket'.format(
+ self._clean_type(type=subpacket['type']))
+ method = getattr(self, method_name, None)
+ if not method:
+ raise NotImplementedError(
+ 'cannot serialize signature subpacket type {!r}'.format(
+ subpacket['type']))
+ body = method(subpacket=subpacket)
+ length = len(body) + 1
+ chunks = []
+ if length < 192:
+ chunks.append(bytes([length]))
+ else:
+ first = ((length - 192) >> 8) + 192
+ if first < 255:
+ chunks.extend([
+ first,
+ (length - 192) % 256,
+ ])
+ else:
+ chunks.append(_struct.pack('>I', length))
+ chunks.append(bytes([self._reverse(
+ self._signature_subpacket_types, subpacket['type'])]))
+ chunks.append(body)
+ return b''.join(chunks)
+
+ def _serialize_signature_packet_target(self, target):
+ if target is None:
+ return b''
+ elif isinstance(target, bytes):
+ return target
+ elif isinstance(target, PGPPacket):
+ if target['type'].endswith('-subkey packet'):
+ target = target.copy()
+ target['type'] = target['type'].replace(
+ '-subkey packet', '-key packet')
+ serialized = target._serialize_body()
+ if target['type'] in [
+ 'public-key packet',
+ 'public-subkey packet'
+ 'secret-key packet',
+ 'secret-subkey packet'
+ ]:
+ serialized = b''.join([
+ b'\x99',
+ _struct.pack('>H', len(serialized)),
+ serialized,
+ ])
+ elif target['type'] == 'user id packet':
+ serialized = b''.join([
+ b'\xb4',
+ _struct.pack('>I', len(serialized)),
+ serialized,
+ ])
+ elif target['type'] == 'user attribute packet':
+ serialized = b''.join([
+ b'\xd1',
+ _struct.pack('>I', len(serialized)),
+ serialized,
+ ])
+ return serialized
+ elif isinstance(target, list):
+ return b''.join(
+ self._serialize_signature_packet_target(target=x)
+ for x in target)
+
+ def _serialize_hashed_signature_packet(self):
+ if self['signature-version'] != 4:
+ raise NotImplementedError(
+ 'signature packet version {}'.format(
+ self['signature-version']))
+ chunks = [bytes([self['signature-version']])]
+ chunks.append(bytes([self._reverse(
+ self._signature_types, self['signature-type'])]))
+ chunks.append(bytes([self._reverse(
+ self._public_key_algorithms, self['public-key-algorithm'])]))
+ chunks.append(bytes([self._reverse(
+ self._hash_algorithms, self['hash-algorithm'])]))
+ hashed_subpackets = self._serialize_signature_subpackets(
+ self['hashed-subpackets'])
+ chunks.append(_struct.pack('>H', len(hashed_subpackets)))
+ chunks.append(hashed_subpackets)
+ return b''.join(chunks)
+
+ def _signature_packet_signed_data(self, hashed_signature_data):
+ target = self._serialize_signature_packet_target(target=self['target'])
+ return b''.join([
+ target,
+ hashed_signature_data,
+ bytes([self['signature-version']]),
+ b'\xff',
+ _struct.pack('>I', len(hashed_signature_data)),
+ ])
+
+ def _serialize_signature_packet(self):
+ hashed_signature_data = self._serialize_hashed_signature_packet()
+ chunks = [hashed_signature_data]
+ unhashed_subpackets = self._serialize_signature_subpackets(
+ self['unhashed-subpackets'])
+ chunks.append(_struct.pack('>H', len(unhashed_subpackets)))
+ chunks.append(unhashed_subpackets)
+ signed_data = self._signature_packet_signed_data(
+ hashed_signature_data=hashed_signature_data)
+ digest, signature = self.key.sign(
+ data=signed_data, hash_algorithm=self['hash-algorithm'],
+ signature_algorithm=self['public-key-algorithm'])
+ chunks.append(digest[:2])
+ chunks.extend(
+ self._serialize_multiprecision_integer(integer=integer)
+ for integer in signature)
+ return b''.join(chunks)
+
+ def _serialize_signature_creation_time_signature_subpacket(
+ self, subpacket):
+ return _struct.pack('>I', subpacket['signature-creation-time'])
+
+ def _serialize_issuer_signature_subpacket(self, subpacket):
+ return string_bytes(data=subpacket['issuer'], sep='')
+
+ def _serialize_key_expiration_time_signature_subpacket(self, subpacket):
+ return _struct.pack('>I', subpacket['key-expiration-time'])
+
+ def _serialize_preferred_symmetric_algorithms_signature_subpacket(
+ self, subpacket):
+ return bytes(
+ self._reverse(self._symmetric_key_algorithms, a)
+ for a in subpacket['preferred-symmetric-algorithms'])
+
+ def _serialize_preferred_hash_algorithms_signature_subpacket(
+ self, subpacket):
+ return bytes(
+ self._reverse(self._hash_algorithms, a)
+ for a in subpacket['preferred-hash-algorithms'])
+
+ def _serialize_preferred_compression_algorithms_signature_subpacket(
+ self, subpacket):
+ return bytes(
+ self._reverse(self._compression_algorithms, a)
+ for a in subpacket['preferred-compression-algorithms'])
+
+ def _serialize_key_server_preferences_signature_subpacket(self, subpacket):
+ return bytes([
+ 0x80 * ('no-modify' in subpacket['key-server-preferences']) |
+ 0,
+ ])
+
+ def _serialize_primary_user_id_signature_subpacket(self, subpacket):
+ return bytes([0x1 * subpacket['primary-user-id']])
+
+ def _serialize_key_flags_signature_subpacket(self, subpacket):
+ return bytes([
+ 0x1 * ('can certify' in subpacket['key-flags']) |
+ 0x2 * ('can sign' in subpacket['key-flags']) |
+ 0x4 * ('can encrypt communications' in subpacket['key-flags']) |
+ 0x8 * ('can encrypt storage' in subpacket['key-flags']) |
+ 0x10 * ('private split' in subpacket['key-flags']) |
+ 0x20 * ('can authenticate' in subpacket['key-flags']) |
+ 0x80 * ('private shated' in subpacket['key-flags']) |
+ 0,
+ ])
+
+ def _serialize_features_signature_subpacket(self, subpacket):
+ return bytes([
+ 0x1 * ('modification detection' in subpacket['features']) |
+ 0,
+ ])
+
+ def _serialize_embedded_signature_signature_subpacket(self, subpacket):
+ return subpacket['embedded'].to_bytes()
+
+ def _serialize_user_id_packet(self):
+ return self['user'].encode('utf-8')
+
+ def _string_to_key(self, string, key_size):
+ if key_size % 8:
+ raise ValueError(
+ '{}-bit key is not an integer number of bytes'.format(
+ key_size))
+ key_size_bytes = key_size // 8
+ hash_name = self._hashlib_name[
+ self['string-to-key-hash-algorithm']]
+ string_hash = _hashlib.new(hash_name)
+ hashes = _math.ceil(key_size_bytes / string_hash.digest_size)
+ key = b''
+ if self['string-to-key-type'] == 'simple':
+ update_bytes = string
+ elif self['string-to-key-type'] in [
+ 'salted',
+ 'iterated and salted',
+ ]:
+ update_bytes = self['string-to-key-salt'] + string
+ if self['string-to-key-type'] == 'iterated and salted':
+ count = self['string-to-key-count']
+ if count < len(update_bytes):
+ count = len(update_bytes)
+ else:
+ raise NotImplementedError(
+ 'key calculation for string-to-key type {}'.format(
+ self['string-to-key-type']))
+ for padding in range(hashes):
+ string_hash = _hashlib.new(hash_name)
+ string_hash.update(padding * b'\x00')
+ if self['string-to-key-type'] in [
+ 'simple',
+ 'salted',
+ ]:
+ string_hash.update(update_bytes)
+ elif self['string-to-key-type'] == 'iterated and salted':
+ remaining = count
+ while remaining > 0:
+ string_hash.update(update_bytes[:remaining])
+ remaining -= len(update_bytes)
+ key += string_hash.digest()
+ key = key[:key_size_bytes]
+ return key
+
+ def decrypt_symmetric_encryption(self, data):
+ """Decrypt OpenPGP's Cipher Feedback mode"""
+ algorithm = self['symmetric-encryption-algorithm']
+ module = self._crypto_module[algorithm]
+ key_size = self._key_size[algorithm]
+ segment_size_bits = self._cipher_block_size[algorithm]
+ if segment_size_bits % 8:
+ raise NotImplementedError(
+ ('{}-bit segment size for {} is not an integer number of bytes'
+ ).format(segment_size_bits, algorithm))
+ segment_size_bytes = segment_size_bits // 8
+ padding = segment_size_bytes - len(data) % segment_size_bytes
+ if padding:
+ data += b'\x00' * padding
+ if self.key and self.key._cache_passphrase and self.key._passphrase:
+ passphrase = self.key._passphrase
+ else:
+ passphrase = _getpass.getpass(
+ 'passphrase for {}: '.format(self['fingerprint'][-8:]))
+ passphrase = passphrase.encode('ascii')
+ if self.key and self.key._cache_passphrase:
+ self.key._passphrase = passphrase
+ key = self._string_to_key(string=passphrase, key_size=key_size)
+ cipher = module.new(
+ key=key,
+ mode=module.MODE_CFB,
+ IV=self['initial-vector'],
+ segment_size=segment_size_bits)
+ plaintext = cipher.decrypt(data)
+ if padding:
+ plaintext = plaintext[:-padding]
+ return plaintext
+
+ def check_roundtrip(self):
+ serialized = self.to_bytes()
+ source = self['raw']
+ if serialized != source:
+ if len(serialized) != len(source):
+ raise ValueError(
+ ('serialized {} is {} bytes long, '
+ 'but input is {} bytes long').format(
+ self['type'], len(serialized), len(source)))
+ chunk_size = 8
+ for i in range(0, len(source), 8):
+ in_chunk = source[i: i + chunk_size]
+ out_chunk = serialized[i: i + chunk_size]
+ if in_chunk != out_chunk:
+ raise ValueError(
+ ('serialized {} differs from input packet: '
+ 'at byte {}, {} != {}').format(
+ self['type'], i, byte_string(data=out_chunk),
+ byte_string(data=in_chunk)))
+
+ def verify(self):
+ if self['type'] != 'signature packet':
+ raise NotImplmentedError('verify {}'.format(self['type']))
+ hashed_signature_data = self._serialize_hashed_signature_packet()
+ signed_data = self._signature_packet_signed_data(
+ hashed_signature_data=hashed_signature_data)
+ key_packet = None
+ subpackets = self['hashed-subpackets'] + self['unhashed-subpackets']
+ issuer_subpackets = [p for p in subpackets if p['type'] == 'issuer']
+ if issuer_subpackets:
+ issuer = issuer_subpackets[0]
+ packets = (self.key.public_packets or []) + (
+ self.key.secret_packets or [])
+ keys = [k for k in packets
+ if k.get('fingerprint', '').endswith(issuer['issuer'])]
+ if keys:
+ key_packet = keys[-1]
+ else:
+ LOG.info('no packet found for issuer {}'.format(
+ issuer['issuer'][-8:].upper()))
+ return
+ LOG.debug('verify {} with {}'.format(
+ self['signature-type'],
+ key_packet['fingerprint'][-8:].upper()))
+ verified = self.key.verify(
+ data=signed_data, signature=self['signature'],
+ hash_algorithm=self['hash-algorithm'],
+ signature_algorithm=self['public-key-algorithm'],
+ key_packet=key_packet, digest_check=self['signed-hash-word'])
+ if not verified:
+ raise ValueError('verification failed for {}'.format(self))
+ else:
+ LOG.debug('verified {}'.format(self['signature-type']))
class PGPKey (object):
[1]: http://tools.ietf.org/search/rfc4880#section-11.1
[2]: http://tools.ietf.org/search/rfc4880#section-11.2
"""
- def __init__(self, fingerprint):
+ def __init__(self, fingerprint, cache_passphrase=False):
self.fingerprint = fingerprint
+ self._cache_passphrase = cache_passphrase
+ self._passphrase = None
self.public_packets = None
self.secret_packets = None
def import_from_gpg(self):
key_export = _get_stdout(
['gpg', '--export', self.fingerprint])
- self.public_packets = list(
- packets_from_bytes(data=key_export))
+ self.public_packets = []
+ self._packets_from_bytes(list=self.public_packets, data=key_export)
if self.public_packets[0]['type'] != 'public-key packet':
raise ValueError(
'{} does not start with a public-key packet'.format(
self.fingerprint))
key_secret_export = _get_stdout(
['gpg', '--export-secret-keys', self.fingerprint])
- self.secret_packets = list(
- packets_from_bytes(data=key_secret_export))
+ self.secret_packets = []
+ self._packets_from_bytes(list=self.secret_packets, data=key_secret_export)
if self.secret_packets[0]['type'] != 'secret-key packet':
raise ValueError(
'{} does not start with a secret-key packet'.format(
self.fingerprint))
+ for packet in self.public_packets + self.secret_packets:
+ packet.check_roundtrip()
+
+ def _packets_from_bytes(self, list, data):
+ offset = 0
+ while offset < len(data):
+ packet = PGPPacket(key=self)
+ offset += packet.from_bytes(data=data[offset:])
+ list.append(packet)
def export_to_gpg(self):
raise NotImplemetedError('export to gpg')
"""Migrate the (sub)keys into this key"""
pass
-
-def migrate(old_key, new_key):
+ def _get_signer(self, signature_algorithm=None, key_packet=None,
+ secret=False):
+ if key_packet is None:
+ if secret:
+ key_packet = self.secret_packets[0]
+ else:
+ key_packet = self.public_packets[0]
+ elif secret:
+ if 'secret' not in key_packet['type']:
+ raise ValueError(
+ '{} is not a secret key'.format(key_packet['type']))
+ if signature_algorithm is None:
+ signature_algorithm = key_packet['public-key-algorithm']
+ if signature_algorithm != key_packet['public-key-algorithm']:
+ raise ValueError(
+ 'cannot act on a {} signature with a {} key'.format(
+ signature_algorithm, key_packet['public-key-algorithm']))
+ module = key_packet._crypto_module[signature_algorithm]
+ if signature_algorithm.startswith('rsa '):
+ key = module.construct((
+ key_packet['public-modulus'], # n
+ key_packet['public-exponent'], # e
+ ))
+ if secret:
+ LOG.debug('secret')
+ key.d = key_packet['secret-exponent']
+ key.p = key_packet['secret-prime-p']
+ key.q = key_packet['secret-prime-q']
+ key.u = key_packet['secret-inverse-of-p-mod-q']
+ signer = _crypto_signature_pkcs1_v1_5.new(key)
+ elif signature_algorithm.startswith('dsa '):
+ signer = module.construct((
+ key_packet['public-key'], # y
+ key_packet['group-generator'], # g
+ key_packet['prime'], # p
+ key_packet['group-order'], # q
+ ))
+ if secret:
+ signer.x = key_packet['secret-exponent']
+ else:
+ raise NotImplementedError(
+ 'construct {}'.format(signature_algorithm))
+ return (key_packet, signer)
+
+ def _hash(self, data, hash_algorithm, key_packet):
+ hash_name = key_packet._hashlib_name[hash_algorithm]
+ data_hash = _hashlib.new(hash_name)
+ data_hash.update(data)
+ return data_hash
+
+ def verify(self, data, signature, hash_algorithm, signature_algorithm=None,
+ key_packet=None, digest_check=None):
+ key_packet, signer = self._get_signer(
+ signature_algorithm=signature_algorithm, key_packet=key_packet)
+ data_hash = self._hash(
+ data=data, hash_algorithm=hash_algorithm, key_packet=key_packet)
+ digest = data_hash.digest()
+ hexdigest = data_hash.hexdigest()
+ if digest_check and not digest.startswith(digest_check):
+ raise ValueError(
+ 'corrupted hash: {} does not start with {}'.format(
+ byte_string(digest),
+ byte_string(digest_check)))
+ if signature_algorithm.startswith('rsa '):
+ sig_hex = '{:x}'.format(signature[0])
+ signature = string_bytes(data=sig_hex, sep='')
+ elif signature_algorithm.startswith('dsa '):
+ data_hash = digest
+ LOG.debug('verify signature {} on {} with {}'.format(
+ signature, hexdigest, signer))
+ return signer.verify(data_hash, signature)
+
+
+def migrate(old_key, new_key, cache_passphrase=False):
"""Add the old key and sub-keys to the new key
For example, to upgrade your master key, while preserving old
signatures you'd made. You will lose signature *on* your old key
though, since sub-keys can't be signed (I don't think).
"""
- old_key = PGPKey(fingerprint=old_key)
+ old_key = PGPKey(fingerprint=old_key, cache_passphrase=cache_passphrase)
old_key.import_from_gpg()
- new_key = PGPKey(fingerprint=new_key)
+ new_key = PGPKey(fingerprint=new_key, cache_passphrase=cache_passphrase)
new_key.import_from_gpg()
new_key.import_from_key(key=old_key)
import sys as _sys
old_key, new_key = _sys.argv[1:3]
- migrate(old_key=old_key, new_key=new_key)
+ migrate(old_key=old_key, new_key=new_key, cache_passphrase=True)