Preserve public/secret distinction in _serialize_signature_packet_target
[gpg-migrate.git] / gpg-migrate.py
index 13597292ada4e23f1531dd5ee9fdb4ea66171c1c..ad4f0f52b94ac609fc424295dc7e2a265419fbbd 100755 (executable)
@@ -2,6 +2,7 @@
 
 import getpass as _getpass
 import hashlib as _hashlib
+import logging as _logging
 import math as _math
 import re as _re
 import subprocess as _subprocess
@@ -14,6 +15,14 @@ import Crypto.Cipher.DES3 as _crypto_cipher_des3
 import Crypto.PublicKey.DSA as _crypto_publickey_dsa
 import Crypto.PublicKey.ElGamal as _crypto_publickey_elgamal
 import Crypto.PublicKey.RSA as _crypto_publickey_rsa
+import Crypto.Random.random as _crypto_random_random
+import Crypto.Signature.PKCS1_v1_5 as _crypto_signature_pkcs1_v1_5
+import Crypto.Hash.SHA as _crypto_hash_sha
+
+
+LOG = _logging.getLogger('gpg-migrate')
+LOG.addHandler(_logging.StreamHandler())
+LOG.setLevel(_logging.WARNING)
 
 
 def _get_stdout(args, stdin=None):
@@ -581,12 +590,8 @@ class PGPPacket (dict):
                 'algorithm-specific key fields for {}'.format(
                     self['public-key-algorithm']))
         fingerprint = _hashlib.sha1()
-        fingerprint_target = self
-        if self['type'] != 'public-key packet':
-            fingerprint_target = self.copy()
-            fingerprint_target['type'] = 'public-key packet'
         fingerprint.update(
-            self._serialize_signature_packet_target(target=fingerprint_target))
+            self._serialize_signature_packet_target(target=self))
         self['fingerprint'] = fingerprint.hexdigest()
         return offset
 
@@ -742,17 +747,31 @@ class PGPPacket (dict):
         offset += unhashed_count
         self['signed-hash-word'] = data[offset: offset + 2]
         offset += 2
-        self['signature'] = data[offset:]
+        self['signature'] = []
+        while offset < len(data):
+            o, mpi = self._parse_multiprecision_integer(data=data[offset:])
+            offset += o
+            self['signature'].append(mpi)
+        if self.key.secret_packets:
+            packets = self.key.secret_packets
+        else:
+            packets = self.key.public_packets
         if self['signature-type'] == 'standalone':
             self['target'] = None
         elif self['signature-type'].endswith(' user id and public-key packet'):
             self['target'] = [
-                [p for p in self.key.public_packets if p['type'] == 'public-key packet'][-1],
-                [p for p in self.key.public_packets if p['type'] == 'user id packet'][-1],
+                packets[0],
+                [p for p in packets if p['type'] == 'user id packet'][-1],
+                ]
+        elif self['signature-type'].endswith('key binding'):
+            self['target'] = [
+                packets[0],
+                [p for p in packets if p['type'] == 'public-subkey packet'][-1],
                 ]
         else:
             raise NotImplementedError(
                 'target for {}'.format(self['signature-type']))
+        self.verify()
 
     def _parse_signature_creation_time_signature_subpacket(
             self, data, subpacket):
@@ -813,7 +832,10 @@ class PGPPacket (dict):
 
     def _parse_embedded_signature_signature_subpacket(self, data, subpacket):
         subpacket['embedded'] = PGPPacket(key=self.key)
+        subpacket['embedded']['type'] = 'signature packet'
+        subpacket['embedded']['embedded'] = True
         subpacket['embedded']._parse_signature_packet(data=data)
+        subpacket['embedded']['raw'] = data
 
     def _parse_user_id_packet(self, data):
         self['user'] = str(data, 'utf-8')
@@ -994,6 +1016,10 @@ class PGPPacket (dict):
         elif isinstance(target, bytes):
             return target
         elif isinstance(target, PGPPacket):
+            if target['type'].endswith('-subkey packet'):
+                target = target.copy()
+                target['type'] = target['type'].replace(
+                    '-subkey packet', '-key packet')
             serialized = target._serialize_body()
             if target['type'] in [
                     'public-key packet',
@@ -1024,13 +1050,12 @@ class PGPPacket (dict):
                 self._serialize_signature_packet_target(target=x)
                 for x in target)
 
-    def _serialize_signature_packet(self):
+    def _serialize_hashed_signature_packet(self):
         if self['signature-version'] != 4:
             raise NotImplementedError(
                 'signature packet version {}'.format(
                     self['signature-version']))
-        signature_version = bytes([self['signature-version']])
-        chunks = [signature_version]
+        chunks = [bytes([self['signature-version']])]
         chunks.append(bytes([self._reverse(
             self._signature_types, self['signature-type'])]))
         chunks.append(bytes([self._reverse(
@@ -1041,23 +1066,34 @@ class PGPPacket (dict):
             self['hashed-subpackets'])
         chunks.append(_struct.pack('>H', len(hashed_subpackets)))
         chunks.append(hashed_subpackets)
-        hashed_signature_data = b''.join(chunks)
-        unhashed_subpackets = self._serialize_signature_subpackets(
-            self['unhashed-subpackets'])
-        chunks.append(_struct.pack('>H', len(unhashed_subpackets)))
-        chunks.append(unhashed_subpackets)
+        return b''.join(chunks)
+
+    def _signature_packet_signed_data(self, hashed_signature_data):
         target = self._serialize_signature_packet_target(target=self['target'])
-        signed_data = b''.join([
+        return b''.join([
             target,
             hashed_signature_data,
-            signature_version,
+            bytes([self['signature-version']]),
             b'\xff',
             _struct.pack('>I', len(hashed_signature_data)),
             ])
+
+    def _serialize_signature_packet(self):
+        hashed_signature_data = self._serialize_hashed_signature_packet()
+        chunks = [hashed_signature_data]
+        unhashed_subpackets = self._serialize_signature_subpackets(
+            self['unhashed-subpackets'])
+        chunks.append(_struct.pack('>H', len(unhashed_subpackets)))
+        chunks.append(unhashed_subpackets)
+        signed_data = self._signature_packet_signed_data(
+            hashed_signature_data=hashed_signature_data)
         digest, signature = self.key.sign(
             data=signed_data, hash_algorithm=self['hash-algorithm'],
             signature_algorithm=self['public-key-algorithm'])
-        chunks.extend([digest[:2], signature])
+        chunks.append(digest[:2])
+        chunks.extend(
+            self._serialize_multiprecision_integer(integer=integer)
+            for integer in signature)
         return b''.join(chunks)
 
     def _serialize_signature_creation_time_signature_subpacket(
@@ -1217,6 +1253,40 @@ class PGPPacket (dict):
                             self['type'], i, byte_string(data=out_chunk),
                             byte_string(data=in_chunk)))
 
+    def verify(self):
+        if self['type'] != 'signature packet':
+            raise NotImplmentedError('verify {}'.format(self['type']))
+        hashed_signature_data = self._serialize_hashed_signature_packet()
+        signed_data = self._signature_packet_signed_data(
+            hashed_signature_data=hashed_signature_data)
+        key_packet = None
+        subpackets = self['hashed-subpackets'] + self['unhashed-subpackets']
+        issuer_subpackets = [p for p in subpackets if p['type'] == 'issuer']
+        if issuer_subpackets:
+            issuer = issuer_subpackets[0]
+            packets = (self.key.public_packets or []) + (
+                self.key.secret_packets or [])
+            keys = [k for k in packets
+                    if k.get('fingerprint', '').endswith(issuer['issuer'])]
+            if keys:
+                key_packet = keys[-1]
+            else:
+                LOG.info('no packet found for issuer {}'.format(
+                    issuer['issuer'][-8:].upper()))
+                return
+        LOG.debug('verify {} with {}'.format(
+            self['signature-type'],
+            key_packet['fingerprint'][-8:].upper()))
+        verified = self.key.verify(
+            data=signed_data, signature=self['signature'],
+            hash_algorithm=self['hash-algorithm'],
+            signature_algorithm=self['public-key-algorithm'],
+            key_packet=key_packet, digest_check=self['signed-hash-word'])
+        if not verified:
+            raise ValueError('verification failed for {}'.format(self))
+        else:
+            LOG.debug('verified {}'.format(self['signature-type']))
+
 
 class PGPKey (object):
     """An OpenPGP key with public and private parts.
@@ -1311,6 +1381,78 @@ class PGPKey (object):
         """Migrate the (sub)keys into this key"""
         pass
 
+    def _get_signer(self, signature_algorithm=None, key_packet=None,
+                    secret=False):
+        if key_packet is None:
+            if secret:
+                key_packet = self.secret_packets[0]
+            else:
+                key_packet = self.public_packets[0]
+        elif secret:
+            if 'secret' not in key_packet['type']:
+                raise ValueError(
+                    '{} is not a secret key'.format(key_packet['type']))
+        if signature_algorithm is None:
+            signature_algorithm = key_packet['public-key-algorithm']
+        if signature_algorithm != key_packet['public-key-algorithm']:
+            raise ValueError(
+                'cannot act on a {} signature with a {} key'.format(
+                    signature_algorithm, key_packet['public-key-algorithm']))
+        module = key_packet._crypto_module[signature_algorithm]
+        if signature_algorithm.startswith('rsa '):
+            key = module.construct((
+                key_packet['public-modulus'],   # n
+                key_packet['public-exponent'],  # e
+                ))
+            if secret:
+                LOG.debug('secret')
+                key.d = key_packet['secret-exponent']
+                key.p = key_packet['secret-prime-p']
+                key.q = key_packet['secret-prime-q']
+                key.u = key_packet['secret-inverse-of-p-mod-q']
+            signer = _crypto_signature_pkcs1_v1_5.new(key)
+        elif signature_algorithm.startswith('dsa '):
+            signer = module.construct((
+                key_packet['public-key'],       # y
+                key_packet['group-generator'],  # g
+                key_packet['prime'],            # p
+                key_packet['group-order'],      # q
+                ))
+            if secret:
+                signer.x = key_packet['secret-exponent']
+        else:
+            raise NotImplementedError(
+                'construct {}'.format(signature_algorithm))
+        return (key_packet, signer)
+
+    def _hash(self, data, hash_algorithm, key_packet):
+        hash_name = key_packet._hashlib_name[hash_algorithm]
+        data_hash = _hashlib.new(hash_name)
+        data_hash.update(data)
+        return data_hash
+
+    def verify(self, data, signature, hash_algorithm, signature_algorithm=None,
+               key_packet=None, digest_check=None):
+        key_packet, signer = self._get_signer(
+            signature_algorithm=signature_algorithm, key_packet=key_packet)
+        data_hash = self._hash(
+            data=data, hash_algorithm=hash_algorithm, key_packet=key_packet)
+        digest = data_hash.digest()
+        hexdigest = data_hash.hexdigest()
+        if digest_check and not digest.startswith(digest_check):
+            raise ValueError(
+                'corrupted hash: {} does not start with {}'.format(
+                    byte_string(digest),
+                    byte_string(digest_check)))
+        if signature_algorithm.startswith('rsa '):
+            sig_hex = '{:x}'.format(signature[0])
+            signature = string_bytes(data=sig_hex, sep='')
+        elif signature_algorithm.startswith('dsa '):
+            data_hash = digest
+        LOG.debug('verify signature {} on {} with {}'.format(
+            signature, hexdigest, signer))
+        return signer.verify(data_hash, signature)
+
 
 def migrate(old_key, new_key, cache_passphrase=False):
     """Add the old key and sub-keys to the new key