Add signature verification to PGPPacket and PGPKey
authorW. Trevor King <wking@tremily.us>
Tue, 7 Jan 2014 17:33:19 +0000 (09:33 -0800)
committerW. Trevor King <wking@tremily.us>
Tue, 7 Jan 2014 21:17:45 +0000 (13:17 -0800)
From RFC 4880 [1]:

  With RSA signatures, the hash value is encoded using PKCS#1 encoding
  type EMSA-PKCS1-v1_5 as described in Section 9.2 of RFC 3447.  This
  requires inserting the hash value as an octet string into an ASN.1
  structure.  The object identifier for the type of hash being used is
  included in the structure.  The hexadecimal representations for the
  currently defined hash algorithms are as follows: ...

Rather than coding all these object identifiers in myself, I'm
piggybacking on PyCrypto [2,3,4] which already sets up
per-hash-algarithm OIDs.  However, older versions of PyCrypto attached
the OIDs to the PyCrypto-specific hash implementations, and we're
using hashlib's implementations.  Since 59018ff (Hash: Remove "oid"
attributes; add "name" attribute, 2013-02-17, released in PyCrypto
v2.7a1), PyCrypto has been able to handle hashlib hashes, so you'll
need a fairly modern installation to work with this script.

[1]: https://tools.ietf.org/html/rfc4880#section-5.2.2
[2]: http://www.pycrypto.org/
[3]: https://www.dlitz.net/software/pycrypto/
[4]: https://github.com/dlitz/pycrypto/

gpg-migrate.py

index 9ecd9aef929fc4121c1218721edec80738d5b6e9..f25cb3609d2f23c504396e18463b1e6f30330c24 100755 (executable)
@@ -2,6 +2,7 @@
 
 import getpass as _getpass
 import hashlib as _hashlib
+import logging as _logging
 import math as _math
 import re as _re
 import subprocess as _subprocess
@@ -14,6 +15,14 @@ import Crypto.Cipher.DES3 as _crypto_cipher_des3
 import Crypto.PublicKey.DSA as _crypto_publickey_dsa
 import Crypto.PublicKey.ElGamal as _crypto_publickey_elgamal
 import Crypto.PublicKey.RSA as _crypto_publickey_rsa
+import Crypto.Random.random as _crypto_random_random
+import Crypto.Signature.PKCS1_v1_5 as _crypto_signature_pkcs1_v1_5
+import Crypto.Hash.SHA as _crypto_hash_sha
+
+
+LOG = _logging.getLogger('gpg-migrate')
+LOG.addHandler(_logging.StreamHandler())
+LOG.setLevel(_logging.WARNING)
 
 
 def _get_stdout(args, stdin=None):
@@ -762,6 +771,7 @@ class PGPPacket (dict):
         else:
             raise NotImplementedError(
                 'target for {}'.format(self['signature-type']))
+        self.verify()
 
     def _parse_signature_creation_time_signature_subpacket(
             self, data, subpacket):
@@ -1246,6 +1256,40 @@ class PGPPacket (dict):
                             self['type'], i, byte_string(data=out_chunk),
                             byte_string(data=in_chunk)))
 
+    def verify(self):
+        if self['type'] != 'signature packet':
+            raise NotImplmentedError('verify {}'.format(self['type']))
+        hashed_signature_data = self._serialize_hashed_signature_packet()
+        signed_data = self._signature_packet_signed_data(
+            hashed_signature_data=hashed_signature_data)
+        key_packet = None
+        subpackets = self['hashed-subpackets'] + self['unhashed-subpackets']
+        issuer_subpackets = [p for p in subpackets if p['type'] == 'issuer']
+        if issuer_subpackets:
+            issuer = issuer_subpackets[0]
+            packets = (self.key.public_packets or []) + (
+                self.key.secret_packets or [])
+            keys = [k for k in packets
+                    if k.get('fingerprint', '').endswith(issuer['issuer'])]
+            if keys:
+                key_packet = keys[-1]
+            else:
+                LOG.info('no packet found for issuer {}'.format(
+                    issuer['issuer'][-8:].upper()))
+                return
+        LOG.debug('verify {} with {}'.format(
+            self['signature-type'],
+            key_packet['fingerprint'][-8:].upper()))
+        verified = self.key.verify(
+            data=signed_data, signature=self['signature'],
+            hash_algorithm=self['hash-algorithm'],
+            signature_algorithm=self['public-key-algorithm'],
+            key_packet=key_packet, digest_check=self['signed-hash-word'])
+        if not verified:
+            raise ValueError('verification failed for {}'.format(self))
+        else:
+            LOG.debug('verified {}'.format(self['signature-type']))
+
 
 class PGPKey (object):
     """An OpenPGP key with public and private parts.
@@ -1340,6 +1384,78 @@ class PGPKey (object):
         """Migrate the (sub)keys into this key"""
         pass
 
+    def _get_signer(self, signature_algorithm=None, key_packet=None,
+                    secret=False):
+        if key_packet is None:
+            if secret:
+                key_packet = self.secret_packets[0]
+            else:
+                key_packet = self.public_packets[0]
+        elif secret:
+            if 'secret' not in key_packet['type']:
+                raise ValueError(
+                    '{} is not a secret key'.format(key_packet['type']))
+        if signature_algorithm is None:
+            signature_algorithm = key_packet['public-key-algorithm']
+        if signature_algorithm != key_packet['public-key-algorithm']:
+            raise ValueError(
+                'cannot act on a {} signature with a {} key'.format(
+                    signature_algorithm, key_packet['public-key-algorithm']))
+        module = key_packet._crypto_module[signature_algorithm]
+        if signature_algorithm.startswith('rsa '):
+            key = module.construct((
+                key_packet['public-modulus'],   # n
+                key_packet['public-exponent'],  # e
+                ))
+            if secret:
+                LOG.debug('secret')
+                key.d = key_packet['secret-exponent']
+                key.p = key_packet['secret-prime-p']
+                key.q = key_packet['secret-prime-q']
+                key.u = key_packet['secret-inverse-of-p-mod-q']
+            signer = _crypto_signature_pkcs1_v1_5.new(key)
+        elif signature_algorithm.startswith('dsa '):
+            signer = module.construct((
+                key_packet['public-key'],       # y
+                key_packet['group-generator'],  # g
+                key_packet['prime'],            # p
+                key_packet['group-order'],      # q
+                ))
+            if secret:
+                signer.x = key_packet['secret-exponent']
+        else:
+            raise NotImplementedError(
+                'construct {}'.format(signature_algorithm))
+        return (key_packet, signer)
+
+    def _hash(self, data, hash_algorithm, key_packet):
+        hash_name = key_packet._hashlib_name[hash_algorithm]
+        data_hash = _hashlib.new(hash_name)
+        data_hash.update(data)
+        return data_hash
+
+    def verify(self, data, signature, hash_algorithm, signature_algorithm=None,
+               key_packet=None, digest_check=None):
+        key_packet, signer = self._get_signer(
+            signature_algorithm=signature_algorithm, key_packet=key_packet)
+        data_hash = self._hash(
+            data=data, hash_algorithm=hash_algorithm, key_packet=key_packet)
+        digest = data_hash.digest()
+        hexdigest = data_hash.hexdigest()
+        if digest_check and not digest.startswith(digest_check):
+            raise ValueError(
+                'corrupted hash: {} does not start with {}'.format(
+                    byte_string(digest),
+                    byte_string(digest_check)))
+        if signature_algorithm.startswith('rsa '):
+            sig_hex = '{:x}'.format(signature[0])
+            signature = string_bytes(data=sig_hex, sep='')
+        elif signature_algorithm.startswith('dsa '):
+            data_hash = digest
+        LOG.debug('verify signature {} on {} with {}'.format(
+            signature, hexdigest, signer))
+        return signer.verify(data_hash, signature)
+
 
 def migrate(old_key, new_key, cache_passphrase=False):
     """Add the old key and sub-keys to the new key