From: W. Trevor King Date: Tue, 7 Jan 2014 17:33:19 +0000 (-0800) Subject: Add signature verification to PGPPacket and PGPKey X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=f5e936a32fc016c24eabac90ce3b638acb7eafe4;p=gpg-migrate.git Add signature verification to PGPPacket and PGPKey From RFC 4880 [1]: With RSA signatures, the hash value is encoded using PKCS#1 encoding type EMSA-PKCS1-v1_5 as described in Section 9.2 of RFC 3447. This requires inserting the hash value as an octet string into an ASN.1 structure. The object identifier for the type of hash being used is included in the structure. The hexadecimal representations for the currently defined hash algorithms are as follows: ... Rather than coding all these object identifiers in myself, I'm piggybacking on PyCrypto [2,3,4] which already sets up per-hash-algarithm OIDs. However, older versions of PyCrypto attached the OIDs to the PyCrypto-specific hash implementations, and we're using hashlib's implementations. Since 59018ff (Hash: Remove "oid" attributes; add "name" attribute, 2013-02-17, released in PyCrypto v2.7a1), PyCrypto has been able to handle hashlib hashes, so you'll need a fairly modern installation to work with this script. [1]: https://tools.ietf.org/html/rfc4880#section-5.2.2 [2]: http://www.pycrypto.org/ [3]: https://www.dlitz.net/software/pycrypto/ [4]: https://github.com/dlitz/pycrypto/ --- diff --git a/gpg-migrate.py b/gpg-migrate.py index 9ecd9ae..f25cb36 100755 --- a/gpg-migrate.py +++ b/gpg-migrate.py @@ -2,6 +2,7 @@ import getpass as _getpass import hashlib as _hashlib +import logging as _logging import math as _math import re as _re import subprocess as _subprocess @@ -14,6 +15,14 @@ import Crypto.Cipher.DES3 as _crypto_cipher_des3 import Crypto.PublicKey.DSA as _crypto_publickey_dsa import Crypto.PublicKey.ElGamal as _crypto_publickey_elgamal import Crypto.PublicKey.RSA as _crypto_publickey_rsa +import Crypto.Random.random as _crypto_random_random +import Crypto.Signature.PKCS1_v1_5 as _crypto_signature_pkcs1_v1_5 +import Crypto.Hash.SHA as _crypto_hash_sha + + +LOG = _logging.getLogger('gpg-migrate') +LOG.addHandler(_logging.StreamHandler()) +LOG.setLevel(_logging.WARNING) def _get_stdout(args, stdin=None): @@ -762,6 +771,7 @@ class PGPPacket (dict): else: raise NotImplementedError( 'target for {}'.format(self['signature-type'])) + self.verify() def _parse_signature_creation_time_signature_subpacket( self, data, subpacket): @@ -1246,6 +1256,40 @@ class PGPPacket (dict): self['type'], i, byte_string(data=out_chunk), byte_string(data=in_chunk))) + def verify(self): + if self['type'] != 'signature packet': + raise NotImplmentedError('verify {}'.format(self['type'])) + hashed_signature_data = self._serialize_hashed_signature_packet() + signed_data = self._signature_packet_signed_data( + hashed_signature_data=hashed_signature_data) + key_packet = None + subpackets = self['hashed-subpackets'] + self['unhashed-subpackets'] + issuer_subpackets = [p for p in subpackets if p['type'] == 'issuer'] + if issuer_subpackets: + issuer = issuer_subpackets[0] + packets = (self.key.public_packets or []) + ( + self.key.secret_packets or []) + keys = [k for k in packets + if k.get('fingerprint', '').endswith(issuer['issuer'])] + if keys: + key_packet = keys[-1] + else: + LOG.info('no packet found for issuer {}'.format( + issuer['issuer'][-8:].upper())) + return + LOG.debug('verify {} with {}'.format( + self['signature-type'], + key_packet['fingerprint'][-8:].upper())) + verified = self.key.verify( + data=signed_data, signature=self['signature'], + hash_algorithm=self['hash-algorithm'], + signature_algorithm=self['public-key-algorithm'], + key_packet=key_packet, digest_check=self['signed-hash-word']) + if not verified: + raise ValueError('verification failed for {}'.format(self)) + else: + LOG.debug('verified {}'.format(self['signature-type'])) + class PGPKey (object): """An OpenPGP key with public and private parts. @@ -1340,6 +1384,78 @@ class PGPKey (object): """Migrate the (sub)keys into this key""" pass + def _get_signer(self, signature_algorithm=None, key_packet=None, + secret=False): + if key_packet is None: + if secret: + key_packet = self.secret_packets[0] + else: + key_packet = self.public_packets[0] + elif secret: + if 'secret' not in key_packet['type']: + raise ValueError( + '{} is not a secret key'.format(key_packet['type'])) + if signature_algorithm is None: + signature_algorithm = key_packet['public-key-algorithm'] + if signature_algorithm != key_packet['public-key-algorithm']: + raise ValueError( + 'cannot act on a {} signature with a {} key'.format( + signature_algorithm, key_packet['public-key-algorithm'])) + module = key_packet._crypto_module[signature_algorithm] + if signature_algorithm.startswith('rsa '): + key = module.construct(( + key_packet['public-modulus'], # n + key_packet['public-exponent'], # e + )) + if secret: + LOG.debug('secret') + key.d = key_packet['secret-exponent'] + key.p = key_packet['secret-prime-p'] + key.q = key_packet['secret-prime-q'] + key.u = key_packet['secret-inverse-of-p-mod-q'] + signer = _crypto_signature_pkcs1_v1_5.new(key) + elif signature_algorithm.startswith('dsa '): + signer = module.construct(( + key_packet['public-key'], # y + key_packet['group-generator'], # g + key_packet['prime'], # p + key_packet['group-order'], # q + )) + if secret: + signer.x = key_packet['secret-exponent'] + else: + raise NotImplementedError( + 'construct {}'.format(signature_algorithm)) + return (key_packet, signer) + + def _hash(self, data, hash_algorithm, key_packet): + hash_name = key_packet._hashlib_name[hash_algorithm] + data_hash = _hashlib.new(hash_name) + data_hash.update(data) + return data_hash + + def verify(self, data, signature, hash_algorithm, signature_algorithm=None, + key_packet=None, digest_check=None): + key_packet, signer = self._get_signer( + signature_algorithm=signature_algorithm, key_packet=key_packet) + data_hash = self._hash( + data=data, hash_algorithm=hash_algorithm, key_packet=key_packet) + digest = data_hash.digest() + hexdigest = data_hash.hexdigest() + if digest_check and not digest.startswith(digest_check): + raise ValueError( + 'corrupted hash: {} does not start with {}'.format( + byte_string(digest), + byte_string(digest_check))) + if signature_algorithm.startswith('rsa '): + sig_hex = '{:x}'.format(signature[0]) + signature = string_bytes(data=sig_hex, sep='') + elif signature_algorithm.startswith('dsa '): + data_hash = digest + LOG.debug('verify signature {} on {} with {}'.format( + signature, hexdigest, signer)) + return signer.verify(data_hash, signature) + def migrate(old_key, new_key, cache_passphrase=False): """Add the old key and sub-keys to the new key