From 7d4ff835519e6a8fa4273c364b571c2874bd31d5 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Thu, 20 Sep 2012 10:12:38 -0400 Subject: [PATCH] signature: add Signature class for more Pythonic verification. Now verify_bytes() returns a list of `Signature`s instead of XML. This should be much easier for callers to handle, and it provides a layer of insulation between the gpgme-tool output and Python code. --- pgp_mime/crypt.py | 113 +++++++------ pgp_mime/pgp.py | 150 +++++++++-------- pgp_mime/signature.py | 384 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 522 insertions(+), 125 deletions(-) create mode 100644 pgp_mime/signature.py diff --git a/pgp_mime/crypt.py b/pgp_mime/crypt.py index 82a0cac..fc0cea2 100644 --- a/pgp_mime/crypt.py +++ b/pgp_mime/crypt.py @@ -26,6 +26,7 @@ from pyassuan import client as _client from pyassuan import common as _common from . import LOG as _LOG +from . import signature as _signature def connect(client, filename, **kwargs): @@ -230,35 +231,37 @@ def verify_bytes(data, signature=None, always_trust=False): ... '-----END PGP MESSAGE-----', ... '', ... ]).encode('us-ascii') - >>> output,verified,result = verify_bytes(b) + >>> output,verified,signatures = verify_bytes(b) >>> output b'Success!\n' >>> verified False - >>> print(str(result, 'utf-8').replace('\x00', '')) + >>> for s in signatures: + ... print(s.dumps()) ... # doctest: +REPORT_UDIFF - - - - - - - B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 - Success <Unspecified source> - - - - - - - Success <Unspecified source> - RSA - SHA256 - - - - - + B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature: + summary: + CRL missing: False + CRL too old: False + bad policy: False + green: False + key expired: False + key missing: False + key revoked: False + red: False + signature expired: False + system error: False + valid: False + status: success + timestamp: Wed Mar 21 19:13:57 2012 + expiration timestamp: None + wrong key usage: False + pka trust: not available + chain model: False + validity: unknown + validity reason: success + public key algorithm: RSA + hash algorithm: SHA256 >>> b = b'Success!\n' >>> signature = '\n'.join([ ... '-----BEGIN PGP SIGNATURE-----', @@ -274,35 +277,37 @@ def verify_bytes(data, signature=None, always_trust=False): ... '-----END PGP SIGNATURE-----', ... '', ... ]).encode('us-ascii') - >>> output,verified,result = verify_bytes(b, signature=signature) + >>> output,verified,signatures = verify_bytes(b, signature=signature) >>> output b'Success!\n' >>> verified False - >>> print(str(result, 'utf-8').replace('\x00', '')) + >>> for s in signatures: + ... print(s.dumps()) ... # doctest: +REPORT_UDIFF - - - - - - - B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 - Success <Unspecified source> - - - - - - - Success <Unspecified source> - RSA - SHA1 - - - - - + B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature: + summary: + CRL missing: False + CRL too old: False + bad policy: False + green: False + key expired: False + key missing: False + key revoked: False + red: False + signature expired: False + system error: False + valid: False + status: success + timestamp: Wed Mar 21 19:30:07 2012 + expiration timestamp: None + wrong key usage: False + pka trust: not available + chain model: False + validity: unknown + validity reason: success + public key algorithm: RSA + hash algorithm: SHA1 """ input_read,input_write = _os.pipe() pass_fds = [input_read] @@ -320,7 +325,8 @@ def verify_bytes(data, signature=None, always_trust=False): _os.close(message_read) else: _os.close(output_write) - verified = result = None + verified = None + signatures = [] try: hello(client) client.make_request( @@ -348,15 +354,16 @@ def verify_bytes(data, signature=None, always_trust=False): else: plain = _read(output_read) rs,result = client.make_request(_common.Request('RESULT')) + signatures = list(_signature.verify_result_signatures(result)) verified = True - for line in result.splitlines(): - if b'= 0: _os.close(fd) - return (plain, verified, result) + return (plain, verified, signatures) diff --git a/pgp_mime/pgp.py b/pgp_mime/pgp.py index 37ffe49..fa45bda 100644 --- a/pgp_mime/pgp.py +++ b/pgp_mime/pgp.py @@ -433,7 +433,7 @@ def verify(message): >>> message['To'] = 'pgp-mime-test ' >>> encrypted = sign_and_encrypt(message, signers=['pgp-mime@invalid.com'], ... always_trust=True) - >>> decrypted,verified,result = verify(encrypted) + >>> decrypted,verified,signatures = verify(encrypted) >>> print(decrypted.as_string().replace('\r\n', '\n')) ... # doctest: +ELLIPSIS, +REPORT_UDIFF Content-Type: text/plain; charset="us-ascii" @@ -446,37 +446,39 @@ def verify(message): Bye >>> verified False - >>> print(str(result, 'utf-8').replace('\x00', '')) + >>> for s in signatures: + ... print(s.dumps()) # doctest: +REPORT_UDIFF ... # doctest: +REPORT_UDIFF, +ELLIPSIS - - - - - - - B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 - Success <Unspecified source> - - - - - - - Success <Unspecified source> - RSA - SHA256 - - - - - + B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature: + summary: + CRL missing: False + CRL too old: False + bad policy: False + green: False + key expired: False + key missing: False + key revoked: False + red: False + signature expired: False + system error: False + valid: False + status: success + timestamp: ... + expiration timestamp: None + wrong key usage: False + pka trust: not available + chain model: False + validity: unknown + validity reason: success + public key algorithm: RSA + hash algorithm: SHA256 >>> from email.mime.multipart import MIMEMultipart >>> message = MIMEMultipart() >>> message.attach(encodedMIMEText('Part A')) >>> message.attach(encodedMIMEText('Part B')) >>> signed = sign(message, signers=['pgp-mime@invalid.com']) - >>> decrypted,verified,result = verify(signed) + >>> decrypted,verified,signatures = verify(signed) >>> decrypted.set_boundary('boundsep') >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF Content-Type: multipart/mixed; boundary="boundsep" @@ -499,30 +501,32 @@ def verify(message): --boundsep-- >>> verified False - >>> print(str(result, 'utf-8').replace('\x00', '')) + >>> for s in signatures: + ... print(s.dumps()) # doctest: +REPORT_UDIFF ... # doctest: +REPORT_UDIFF, +ELLIPSIS - - - - - - - B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 - Success <Unspecified source> - - - - - - - Success <Unspecified source> - RSA - SHA1 - - - - - + B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature: + summary: + CRL missing: False + CRL too old: False + bad policy: False + green: False + key expired: False + key missing: False + key revoked: False + red: False + signature expired: False + system error: False + valid: False + status: success + timestamp: ... + expiration timestamp: None + wrong key usage: False + pka trust: not available + chain model: False + validity: unknown + validity reason: success + public key algorithm: RSA + hash algorithm: SHA1 Test a message generated by Mutt (for sanity): @@ -569,7 +573,7 @@ def verify(message): ... b'--kORqDWCi7qDJ0mEj--', ... b'']) >>> message = message_from_bytes(message_bytes) - >>> decrypted,verified,result = verify(message) + >>> decrypted,verified,signatures = verify(message) >>> print(decrypted.as_string()) # doctest: +ELLIPSIS, +REPORT_UDIFF Content-Type: text/plain; charset=us-ascii Content-Disposition: inline @@ -578,30 +582,32 @@ def verify(message): >>> verified False - >>> print(str(result, 'utf-8').replace('\x00', '')) + >>> for s in signatures: + ... print(s.dumps()) # doctest: +REPORT_UDIFF ... # doctest: +REPORT_UDIFF, +ELLIPSIS - - - - - - - B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 - Success <Unspecified source> - - - - - - - Success <Unspecified source> - RSA - SHA1 - - - - - + B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature: + summary: + CRL missing: False + CRL too old: False + bad policy: False + green: False + key expired: False + key missing: False + key revoked: False + red: False + signature expired: False + system error: False + valid: False + status: success + timestamp: Tue Apr 24 23:46:57 2012 + expiration timestamp: None + wrong key usage: False + pka trust: not available + chain model: False + validity: unknown + validity reason: success + public key algorithm: RSA + hash algorithm: SHA1 """ ct = message.get_content_type() if ct == 'multipart/encrypted': diff --git a/pgp_mime/signature.py b/pgp_mime/signature.py new file mode 100644 index 0000000..f34f748 --- /dev/null +++ b/pgp_mime/signature.py @@ -0,0 +1,384 @@ +# Copyright + +"""A Python version of GPGME verification signatures. + +See the `GPGME manual`_ for details. + +.. GPGME manual: http://www.gnupg.org/documentation/manuals/gpgme/Verify.html +""" + +import pprint as _pprint +import time as _time +import xml.etree.ElementTree as _etree + + +class Signature (object): + """Python version of ``gpgme_signature_t`` + + >>> from pprint import pprint + >>> s = Signature() + + You can set flag fields using their integer value (from C). + + >>> s.set_summary(0x3) + + This sets up a convenient dictionary. + + >>> pprint(s.summary) + {'CRL missing': False, + 'CRL too old': False, + 'bad policy': False, + 'green': True, + 'key expired': False, + 'key missing': False, + 'key revoked': False, + 'red': False, + 'signature expired': False, + 'system error': False, + 'valid': True} + + If you alter the dictionary, it's easy to convert back to the + equivalent integer value. + + >>> s.summary['green'] = s.summary['valid'] = False + >>> s.summary['red'] = s.summary['key expired'] = True + >>> type(s.get_summary()) + + >>> '0x{:x}'.format(s.get_summary()) + '0x24' + + If you try and parse a flag field, but have some wonky input, you + get a helpful error. + + >>> s.set_summary(-1) + Traceback (most recent call last): + ... + ValueError: invalid flags for summary (-1) + >>> s.set_summary(0x1024) + Traceback (most recent call last): + ... + ValueError: unknown flags for summary (0x1000) + + You can set enumerated fields using their integer value. + + >>> s.set_status(94) + >>> s.status + 'certificate revoked' + >>> s.status = 'bad signature' + >>> s.get_status() + 8 + + >>> s.fingerprint = 'ABCDEFG' + >>> print(s.dumps()) # doctest: +REPORT_UDIFF + ABCDEFG signature: + summary: + CRL missing: False + CRL too old: False + bad policy: False + green: False + key expired: True + key missing: False + key revoked: False + red: True + signature expired: False + system error: False + valid: False + status: bad signature + >>> print(s.dumps(prefix='xx')) # doctest: +REPORT_UDIFF + xxABCDEFG signature: + xx summary: + xx CRL missing: False + xx CRL too old: False + xx bad policy: False + xx green: False + xx key expired: True + xx key missing: False + xx key revoked: False + xx red: True + xx signature expired: False + xx system error: False + xx valid: False + xx status: bad signature + """ + _error_enum = { # GPG_ERR_* in gpg-error.h + 0: 'success', + 1: 'general error', + 8: 'bad signature', + 9: 'no public key', + 94: 'certificate revoked', + 153: 'key expired', + 154: 'signature expired', + # lots more, to be included as they occur in the wild + } + _error_enum_inv = dict((v,k) for k,v in _error_enum.items()) + + _summary_flags = { # GPGME_SIGSUM_* in gpgme.h + 0x001: 'valid', + 0x002: 'green', + 0x004: 'red', + 0x008: 'key revoked', + 0x020: 'key expired', + 0x040: 'signature expired', + 0x080: 'key missing', + 0x100: 'CRL missing', + 0x200: 'CRL too old', + 0x400: 'bad policy', + 0x800: 'system error', + } + + _pka_trust_enum = { # struct _gpgme_signature in gpgme.h + 0: 'not available', + 1: 'bad', + 2: 'good', + 3: 'reserved', + } + _pka_trust_enum_inv = dict((v,k) for k,v in _pka_trust_enum.items()) + + _validity_enum = { # GPGME_VALIDITY_* in gpgme.h + 0: 'unknown', + 1: 'undefined', + 2: 'never', + 3: 'marginal', + 4: 'full', + 5: 'ultimate', + } + _validity_enum_inv = dict((v,k) for k,v in _validity_enum.items()) + + _public_key_algorithm_enum = { # GPGME_PK_* in gpgme.h + 1: 'RSA', # Rivest, Shamir, Adleman + 2: 'RSA for encryption and decryption only', + 3: 'RSA for signing and verification only', + 16: 'ELGamal in GnuPG', + 17: 'DSA', # Digital Signature Algorithm + 20: 'ELGamal', + 301: 'ECDSA', # Elliptic Curve Digital Signature Algorithm + 302: 'ECDH', # Elliptic curve Diffie-Hellman + } + _public_key_algorithm_enum_inv = dict( + (v,k) for k,v in _public_key_algorithm_enum.items()) + + _hash_algorithm_enum = { # GPGME_MD_* in gpgme.h + 0: 'none', + 1: 'MD5', + 2: 'SHA1', + 3: 'RMD160', + 5: 'MD2', + 6: 'TIGER/192', + 7: 'HAVAL, 5 pass, 160 bit', + 8: 'SHA256', + 9: 'SHA384', + 10: 'SHA512', + 301: 'MD4', + 302: 'CRC32', + 303: 'CRC32 RFC1510', + 304: 'CRC24 RFC2440', + } + _hash_algorithm_enum_inv = dict( + (v,k) for k,v in _hash_algorithm_enum.items()) + + def __init__(self, summary=None, fingerprint=None, status=None, + notations=None, timestamp=None, expiration_timestamp=None, + wrong_key_usage=None, pka_trust=None, chain_model=None, + validity=None, validity_reason=None, + public_key_algorithm=None, hash_algorithm=None): + self.summary = summary + self.fingerprint = fingerprint + self.status = status + self.notations = notations + self.timestamp = timestamp + self.expiration_timestamp = expiration_timestamp + self.wrong_key_usage = wrong_key_usage + self.pka_trust = pka_trust + self.chain_model = chain_model + self.validity = validity + self.validity_reason = validity_reason + self.public_key_algorithm = public_key_algorithm + self.hash_algorithm = hash_algorithm + + def _set_flags(self, attribute, value, flags): + if value < 0: + raise ValueError( + 'invalid flags for {} ({})'.format(attribute, value)) + d = {} + for flag,name in flags.items(): + x = flag & value + d[name] = bool(x) + value -= x + if value: + raise ValueError( + 'unknown flags for {} (0x{:x})'.format(attribute, value)) + setattr(self, attribute, d) + + def _get_flags(self, attribute, flags): + value = 0 + d = getattr(self, attribute) + for flag,name in flags.items(): + if d[name]: + value |= flag + return value + + def set_summary(self, value): + self._set_flags('summary', value, self._summary_flags) + + def get_summary(self): + return self._get_flags('summary', self._summary_flags) + + def set_status(self, value): + self.status = self._error_enum[value] + + def get_status(self): + return self._error_enum_inv[self.status] + + def set_pka_trust(self, value): + self.pka_trust = self._pka_trust_enum[value] + + def get_pka_trust(self): + return self._pka_trust_enum_inv[self.pka_trust] + + def set_validity(self, value): + self.validity = self._validity_enum[value] + + def get_validity(self): + return self._error_validity_inv[self.validity] + + def set_validity_reason(self, value): + self.validity_reason = self._error_enum[value] + + def get_validity_reason(self): + return self._error_enum_inv[self.validity_reason] + + def set_public_key_algorithm(self, value): + self.public_key_algorithm = self._public_key_algorithm_enum[value] + + def get_public_key_algorithm(self): + return self._error_pubkey_algorithm_inv[self.public_key_algorithm] + + def set_hash_algorithm(self, value): + self.hash_algorithm = self._hash_algorithm_enum[value] + + def get_hash_algorithm(self): + return self._error_hash_algorithm_inv[self.hash_algorithm] + + def dumps(self, prefix=''): + lines = ['{}{} signature:'.format(prefix, self.fingerprint)] + for attribute in ['summary', 'status', 'notations', 'timestamp', + 'expiration_timestamp', 'wrong_key_usage', + 'pka_trust', 'chain_model', 'validity', + 'validity_reason', 'public_key_algorithm', + 'hash_algorithm']: + label = attribute.replace('_', ' ') + value = getattr(self, attribute) + if value is None: + continue # no information + elif attribute.endswith('timestamp'): + if value == 0 and attribute == 'expiration_timestamp': + value = None + else: + value = _time.asctime(_time.gmtime(value)) + if isinstance(value, dict): # flag field + lines.append(' {}:'.format(label)) + lines.extend( + [' {}: {}'.format(k,v) + for k,v in sorted(value.items())]) + else: + lines.append(' {}: {}'.format(label, value)) + sep = '\n{}'.format(prefix) + return sep.join(lines) + + +def verify_result_signatures(result): + """ + >>> from pprint import pprint + >>> result = b'\\n'.join([ + ... b'', + ... b'', + ... b' ', + ... b' ', + ... b' ', + ... b' ', + ... b' B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3', + ... b' Success <Unspecified source>', + ... b' ', + ... b' ', + ... b' ', + ... b' ', + ... b' ', + ... b' ', + ... b' Success <Unspecified source>', + ... b' RSA', + ... b' SHA1', + ... b' ', + ... b' ', + ... b' ', + ... b'', + ... b'', + ... ]) + >>> signatures = list(verify_result_signatures(result)) + >>> signatures # doctest: +ELLIPSIS + [] + >>> for s in signatures: + ... print(s.dumps()) # doctest: +REPORT_UDIFF + B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature: + summary: + CRL missing: False + CRL too old: False + bad policy: False + green: False + key expired: False + key missing: False + key revoked: False + red: False + signature expired: False + system error: False + valid: False + status: success + timestamp: Wed Mar 21 19:30:07 2012 + expiration timestamp: None + wrong key usage: False + pka trust: not available + chain model: False + validity: unknown + validity reason: success + public key algorithm: RSA + hash algorithm: SHA1 + """ + tag_mapping = { + 'exp-timestamp': 'expiration_timestamp', + 'fpr': 'fingerprint', + 'pubkey-algo': 'public_key_algorithm', + 'hash-algo': 'hash_algorithm', + } + tree = _etree.fromstring(result.replace(b'\x00', b'')) + for signature in tree.findall('.//signature'): + s = Signature() + for child in signature.iter(): + if child == signature: # iter() includes the root element + continue + attribute = tag_mapping.get(child.tag, child.tag.replace('-', '_')) + if child.tag in ['summary', 'wrong-key-usage', 'pka-trust', + 'chain-model', 'validity', 'pubkey-algo', + 'hash-algo']: + value = child.get('value') + if not value.startswith('0x'): + raise NotImplementedError('summary value {}'.format(value)) + value = int(value, 16) + if attribute in ['wrong_key_usage', 'chain_model']: + value = bool(value) # boolean + else: # flags or enum + setter = getattr(s, 'set_{}'.format(attribute)) + setter(value) + continue + elif child.tag in ['timestamp', 'exp-timestamp']: + value = child.get('unix') + if value.endswith('i'): + value = int(value[:-1]) + else: + raise NotImplementedError('timestamp value {}'.format(value)) + elif child.tag in ['fpr', 'status', 'validity-reason']: + value = child.text + if value.endswith(' '): + value = value[:-len(' ')].lower() + else: + raise NotImplementedError(child.tag) + setattr(s, attribute, value) + yield s -- 2.26.2