import getpass as _getpass
import hashlib as _hashlib
+import logging as _logging
import math as _math
import re as _re
import subprocess as _subprocess
import Crypto.PublicKey.DSA as _crypto_publickey_dsa
import Crypto.PublicKey.ElGamal as _crypto_publickey_elgamal
import Crypto.PublicKey.RSA as _crypto_publickey_rsa
+import Crypto.Random.random as _crypto_random_random
+import Crypto.Signature.PKCS1_v1_5 as _crypto_signature_pkcs1_v1_5
+import Crypto.Hash.SHA as _crypto_hash_sha
+
+
+LOG = _logging.getLogger('gpg-migrate')
+LOG.addHandler(_logging.StreamHandler())
+LOG.setLevel(_logging.WARNING)
def _get_stdout(args, stdin=None):
'algorithm-specific key fields for {}'.format(
self['public-key-algorithm']))
fingerprint = _hashlib.sha1()
- fingerprint_target = self
- if self['type'] != 'public-key packet':
- fingerprint_target = self.copy()
- fingerprint_target['type'] = 'public-key packet'
fingerprint.update(
- self._serialize_signature_packet_target(target=fingerprint_target))
+ self._serialize_signature_packet_target(target=self))
self['fingerprint'] = fingerprint.hexdigest()
return offset
offset += unhashed_count
self['signed-hash-word'] = data[offset: offset + 2]
offset += 2
- self['signature'] = data[offset:]
+ self['signature'] = []
+ while offset < len(data):
+ o, mpi = self._parse_multiprecision_integer(data=data[offset:])
+ offset += o
+ self['signature'].append(mpi)
+ if self.key.secret_packets:
+ packets = self.key.secret_packets
+ else:
+ packets = self.key.public_packets
if self['signature-type'] == 'standalone':
self['target'] = None
elif self['signature-type'].endswith(' user id and public-key packet'):
self['target'] = [
- [p for p in self.key.public_packets if p['type'] == 'public-key packet'][-1],
- [p for p in self.key.public_packets if p['type'] == 'user id packet'][-1],
+ packets[0],
+ [p for p in packets if p['type'] == 'user id packet'][-1],
+ ]
+ elif self['signature-type'].endswith('key binding'):
+ self['target'] = [
+ packets[0],
+ [p for p in packets if p['type'] == 'public-subkey packet'][-1],
]
else:
raise NotImplementedError(
'target for {}'.format(self['signature-type']))
+ self.verify()
def _parse_signature_creation_time_signature_subpacket(
self, data, subpacket):
def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
subpacket['embedded'] = PGPPacket(key=self.key)
+ subpacket['embedded']['type'] = 'signature packet'
+ subpacket['embedded']['embedded'] = True
subpacket['embedded']._parse_signature_packet(data=data)
+ subpacket['embedded']['raw'] = data
def _parse_user_id_packet(self, data):
self['user'] = str(data, 'utf-8')
elif isinstance(target, bytes):
return target
elif isinstance(target, PGPPacket):
+ if target['type'].endswith('-subkey packet'):
+ target = target.copy()
+ target['type'] = target['type'].replace(
+ '-subkey packet', '-key packet')
serialized = target._serialize_body()
if target['type'] in [
'public-key packet',
self._serialize_signature_packet_target(target=x)
for x in target)
- def _serialize_signature_packet(self):
+ def _serialize_hashed_signature_packet(self):
if self['signature-version'] != 4:
raise NotImplementedError(
'signature packet version {}'.format(
self['signature-version']))
- signature_version = bytes([self['signature-version']])
- chunks = [signature_version]
+ chunks = [bytes([self['signature-version']])]
chunks.append(bytes([self._reverse(
self._signature_types, self['signature-type'])]))
chunks.append(bytes([self._reverse(
self['hashed-subpackets'])
chunks.append(_struct.pack('>H', len(hashed_subpackets)))
chunks.append(hashed_subpackets)
- hashed_signature_data = b''.join(chunks)
- unhashed_subpackets = self._serialize_signature_subpackets(
- self['unhashed-subpackets'])
- chunks.append(_struct.pack('>H', len(unhashed_subpackets)))
- chunks.append(unhashed_subpackets)
+ return b''.join(chunks)
+
+ def _signature_packet_signed_data(self, hashed_signature_data):
target = self._serialize_signature_packet_target(target=self['target'])
- signed_data = b''.join([
+ return b''.join([
target,
hashed_signature_data,
- signature_version,
+ bytes([self['signature-version']]),
b'\xff',
_struct.pack('>I', len(hashed_signature_data)),
])
+
+ def _serialize_signature_packet(self):
+ hashed_signature_data = self._serialize_hashed_signature_packet()
+ chunks = [hashed_signature_data]
+ unhashed_subpackets = self._serialize_signature_subpackets(
+ self['unhashed-subpackets'])
+ chunks.append(_struct.pack('>H', len(unhashed_subpackets)))
+ chunks.append(unhashed_subpackets)
+ signed_data = self._signature_packet_signed_data(
+ hashed_signature_data=hashed_signature_data)
digest, signature = self.key.sign(
data=signed_data, hash_algorithm=self['hash-algorithm'],
signature_algorithm=self['public-key-algorithm'])
- chunks.extend([digest[:2], signature])
+ chunks.append(digest[:2])
+ chunks.extend(
+ self._serialize_multiprecision_integer(integer=integer)
+ for integer in signature)
return b''.join(chunks)
def _serialize_signature_creation_time_signature_subpacket(
self['type'], i, byte_string(data=out_chunk),
byte_string(data=in_chunk)))
+ def verify(self):
+ if self['type'] != 'signature packet':
+ raise NotImplmentedError('verify {}'.format(self['type']))
+ hashed_signature_data = self._serialize_hashed_signature_packet()
+ signed_data = self._signature_packet_signed_data(
+ hashed_signature_data=hashed_signature_data)
+ key_packet = None
+ subpackets = self['hashed-subpackets'] + self['unhashed-subpackets']
+ issuer_subpackets = [p for p in subpackets if p['type'] == 'issuer']
+ if issuer_subpackets:
+ issuer = issuer_subpackets[0]
+ packets = (self.key.public_packets or []) + (
+ self.key.secret_packets or [])
+ keys = [k for k in packets
+ if k.get('fingerprint', '').endswith(issuer['issuer'])]
+ if keys:
+ key_packet = keys[-1]
+ else:
+ LOG.info('no packet found for issuer {}'.format(
+ issuer['issuer'][-8:].upper()))
+ return
+ LOG.debug('verify {} with {}'.format(
+ self['signature-type'],
+ key_packet['fingerprint'][-8:].upper()))
+ verified = self.key.verify(
+ data=signed_data, signature=self['signature'],
+ hash_algorithm=self['hash-algorithm'],
+ signature_algorithm=self['public-key-algorithm'],
+ key_packet=key_packet, digest_check=self['signed-hash-word'])
+ if not verified:
+ raise ValueError('verification failed for {}'.format(self))
+ else:
+ LOG.debug('verified {}'.format(self['signature-type']))
+
class PGPKey (object):
"""An OpenPGP key with public and private parts.
"""Migrate the (sub)keys into this key"""
pass
+ def _get_signer(self, signature_algorithm=None, key_packet=None,
+ secret=False):
+ if key_packet is None:
+ if secret:
+ key_packet = self.secret_packets[0]
+ else:
+ key_packet = self.public_packets[0]
+ elif secret:
+ if 'secret' not in key_packet['type']:
+ raise ValueError(
+ '{} is not a secret key'.format(key_packet['type']))
+ if signature_algorithm is None:
+ signature_algorithm = key_packet['public-key-algorithm']
+ if signature_algorithm != key_packet['public-key-algorithm']:
+ raise ValueError(
+ 'cannot act on a {} signature with a {} key'.format(
+ signature_algorithm, key_packet['public-key-algorithm']))
+ module = key_packet._crypto_module[signature_algorithm]
+ if signature_algorithm.startswith('rsa '):
+ key = module.construct((
+ key_packet['public-modulus'], # n
+ key_packet['public-exponent'], # e
+ ))
+ if secret:
+ LOG.debug('secret')
+ key.d = key_packet['secret-exponent']
+ key.p = key_packet['secret-prime-p']
+ key.q = key_packet['secret-prime-q']
+ key.u = key_packet['secret-inverse-of-p-mod-q']
+ signer = _crypto_signature_pkcs1_v1_5.new(key)
+ elif signature_algorithm.startswith('dsa '):
+ signer = module.construct((
+ key_packet['public-key'], # y
+ key_packet['group-generator'], # g
+ key_packet['prime'], # p
+ key_packet['group-order'], # q
+ ))
+ if secret:
+ signer.x = key_packet['secret-exponent']
+ else:
+ raise NotImplementedError(
+ 'construct {}'.format(signature_algorithm))
+ return (key_packet, signer)
+
+ def _hash(self, data, hash_algorithm, key_packet):
+ hash_name = key_packet._hashlib_name[hash_algorithm]
+ data_hash = _hashlib.new(hash_name)
+ data_hash.update(data)
+ return data_hash
+
+ def verify(self, data, signature, hash_algorithm, signature_algorithm=None,
+ key_packet=None, digest_check=None):
+ key_packet, signer = self._get_signer(
+ signature_algorithm=signature_algorithm, key_packet=key_packet)
+ data_hash = self._hash(
+ data=data, hash_algorithm=hash_algorithm, key_packet=key_packet)
+ digest = data_hash.digest()
+ hexdigest = data_hash.hexdigest()
+ if digest_check and not digest.startswith(digest_check):
+ raise ValueError(
+ 'corrupted hash: {} does not start with {}'.format(
+ byte_string(digest),
+ byte_string(digest_check)))
+ if signature_algorithm.startswith('rsa '):
+ sig_hex = '{:x}'.format(signature[0])
+ signature = string_bytes(data=sig_hex, sep='')
+ elif signature_algorithm.startswith('dsa '):
+ data_hash = digest
+ LOG.debug('verify signature {} on {} with {}'.format(
+ signature, hexdigest, signer))
+ return signer.verify(data_hash, signature)
+
def migrate(old_key, new_key, cache_passphrase=False):
"""Add the old key and sub-keys to the new key