crypt: fix get_smtp_params -> get_client_params in doctest.
[pgp-mime.git] / pgp_mime / crypt.py
index aee4c17d0bee72016ffc099fd63f6364118759ac..789a642a2acc35f654691ed0e475d5f205fc5452 100644 (file)
-# Copyright
+# Copyright (C) 2012 W. Trevor King <wking@tremily.us>
+#
+# This file is part of pgp-mime.
+#
+# pgp-mime is free software: you can redistribute it and/or modify it under the
+# terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+#
+# pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# pgp-mime.  If not, see <http://www.gnu.org/licenses/>.
 
 import codecs as _codecs
+import configparser as _configparser
 import logging as _logging
 import os as _os
 import os.path as _os_path
-import socket as _socket
-import subprocess as _subprocess
 
 from pyassuan import client as _client
 from pyassuan import common as _common
 
+from . import LOG as _LOG
+from . import signature as _signature
 
-#class GPGMEClient(_client.AssuanClient):
-#    pass
-#CLIENT = _client.AssuanClient(name='pgp-mime', close_on_disconnect=True)
-#CLIENT.filename = ...
 
-def connect(client, filename):
-    filename = _os_path.expanduser(filename)
-    if False:
-        socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
-        socket.connect(filename)
-        client.input = socket.makefile('rb')
-        client.output = socket.makefile('wb')
-    else:
-        p = _subprocess.Popen(
-            filename, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
-            close_fds=True)
-        client.input = p.stdout
-        client.output = p.stdin
-        socket = p
-    client.connect()
-    return socket
+SOCKET_PATH = _os_path.expanduser(_os_path.join('~', '.gnupg', 'S.gpgme-tool'))
+
+
+def get_client_params(config):
+    r"""Retrieve Assuan client paramters from a config file.
+
+    >>> from configparser import ConfigParser
+    >>> config = ConfigParser()
+    >>> config.read_string('\n'.join([
+    ...             '[gpgme-tool]',
+    ...             'socket-path: /tmp/S.gpgme-tool',
+    ...             ]))
+    >>> get_client_params(config)
+    {'socket_path': '/tmp/S.gpgme-tool'}
+    >>> config = ConfigParser()
+    >>> get_client_params(ConfigParser())
+    {'socket_path': None}
+    """
+    params = {'socket_path': None}
+    try:
+        params['socket_path'] = config.get('gpgme-tool', 'socket-path')
+    except _configparser.NoSectionError:
+        return params
+    except _configparser.NoOptionError:
+        pass
+    return params
 
-def get_client():
-    client = _client.AssuanClient(name='pgp-mime', close_on_disconnect=True)
-    client.logger.setLevel(_logging.DEBUG)
-    socket = connect(client, '~/src/gpgme/build/src/gpgme-tool')
-    return (client, socket)
+def get_client(socket_path=None):
+    if socket_path is None:
+        socket_path = SOCKET_PATH
+    logger = _logging.getLogger('{}.{}'.format(_LOG.name, 'pyassuan'))
+    client = _client.AssuanClient(
+        name='pgp-mime', logger=logger, use_sublogger=False,
+        close_on_disconnect=True)
+    client.connect(socket_path=socket_path)
+    return client
 
-def disconnect(client, socket):
+def disconnect(client):
     client.make_request(_common.Request('BYE'))
     client.disconnect()
-    if False:
-        socket.shutdown(_socket.SHUT_RDWR)
-        socket.close()
-    else:
-        status = socket.wait()
-        assert status == 0, status
 
 def hello(client):
     responses,data = client.get_responses()  # get initial 'OK' from server
     client.make_request(_common.Request('ARMOR', 'true'))
 
+def _read(fd, buffersize=512):
+    d = []
+    while True:
+        try:
+            new = _os.read(fd, buffersize)
+        except Exception as e:
+            _LOG.warn('error while reading: {}'.format(e))
+            break
+        if not new:
+            break
+        d.append(new)
+    return b''.join(d)
+
+def _write(fd, data):
+    i = 0
+    while i < len(data):
+        i += _os.write(fd, data[i:])
+
+
 def sign_and_encrypt_bytes(data, signers=None, recipients=None,
-                           always_trust=False, mode='detach'):
+                           always_trust=False, mode='detach',
+                           allow_default_signer=False, **kwargs):
     r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.
 
-    Just sign:
+    Just sign (with a detached signature):
 
     >>> print(sign_and_encrypt_bytes(
     ...     bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
@@ -79,8 +118,18 @@ def sign_and_encrypt_bytes(data, signers=None, recipients=None,
     ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
     ... # doctest: +ELLIPSIS
     b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
+
+    Sign and encrypt with a specific subkey:
+
+    >>> sign_and_encrypt_bytes(
+    ...     bytes(b'Hello'), signers=['0x2F73DE2E'],
+    ...     recipients=['pgp-mime@invalid.com'], always_trust=True)
+    ... # doctest: +ELLIPSIS
+    b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
     """
-    client,socket = get_client()
+    input_read,input_write = _os.pipe()
+    output_read,output_write = _os.pipe()
+    client = get_client(**kwargs)
     try:
         hello(client)
         if signers:
@@ -89,36 +138,41 @@ def sign_and_encrypt_bytes(data, signers=None, recipients=None,
         if recipients:
             for recipient in recipients:
                 client.make_request(_common.Request('RECIPIENT', recipient))
-        with open('/tmp/input', 'wb') as f:
-            f.write(data)
-        client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
-        client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
+        client.send_fds([input_read])
+        client.make_request(_common.Request('INPUT', 'FD'))
+        _os.close(input_read)
+        input_read = -1
+        client.send_fds([output_write])
+        client.make_request(_common.Request('OUTPUT', 'FD'))
+        _os.close(output_write)
+        output_write = -1
         parameters = []
-        if signers and recipients:
-            command = 'SIGN_ENCRYPT'
-        elif signers:
-            command = 'SIGN'
-            parameters.append('--{}'.format(mode))
+        if signers or allow_default_signer:
+            if recipients:
+                command = 'SIGN_ENCRYPT'
+            else:
+                command = 'SIGN'
+                parameters.append('--{}'.format(mode))
         elif recipients:
             command = 'ENCRYPT'
         else:
             raise ValueError('must specify at least one signer or recipient')
         if always_trust:
             parameters.append('--always-trust')
+        _write(input_write, data)
+        _os.close(input_write)
+        input_write = -1
         client.make_request(
             _common.Request(command, ' '.join(parameters)))
-        with open('/tmp/output', 'rb') as f:
-            d = f.read()
+        d = _read(output_read)
     finally:
-        disconnect(client, socket)
-        try:
-            _os.remove('/tmp/input')
-            _os.remove('/tmp/output')
-        except OSError:
-            pass
+        disconnect(client)
+        for fd in [input_read, input_write, output_read, output_write]:
+            if fd >= 0:
+                _os.close(fd)
     return d
 
-def decrypt_bytes(data):
+def decrypt_bytes(data, **kwargs):
     r"""Decrypt ``data``.
 
     >>> b = '\n'.join([
@@ -140,26 +194,32 @@ def decrypt_bytes(data):
     >>> decrypt_bytes(b)
     b'Success!\n'
     """
-    client,socket = get_client()
+    input_read,input_write = _os.pipe()
+    output_read,output_write = _os.pipe()
+    client = get_client(**kwargs)
     try:
         hello(client)
-        with open('/tmp/input', 'wb') as f:
-            f.write(data)
-        client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
-        client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
+        client.send_fds([input_read])
+        client.make_request(_common.Request('INPUT', 'FD'))
+        _os.close(input_read)
+        input_read = -1
+        client.send_fds([output_write])
+        client.make_request(_common.Request('OUTPUT', 'FD'))
+        _os.close(output_write)
+        output_write = -1
+        _write(input_write, data)
+        _os.close(input_write)
+        input_write = -1
         client.make_request(_common.Request('DECRYPT'))
-        with open('/tmp/output', 'rb') as f:
-            d = f.read()
+        d = _read(output_read)
     finally:
-        disconnect(client, socket)
-        try:
-            _os.remove('/tmp/input')
-            _os.remove('/tmp/output')
-        except OSError:
-            pass
+        disconnect(client)
+        for fd in [input_read, input_write, output_read, output_write]:
+            if fd >= 0:
+                _os.close(fd)
     return d
 
-def verify_bytes(data, signature=None, always_trust=False):
+def verify_bytes(data, signature=None, always_trust=False, **kwargs):
     r"""Verify a signature on ``data``, possibly decrypting first.
 
     These tests assume you didn't trust the distributed test key.
@@ -186,35 +246,37 @@ def verify_bytes(data, signature=None, always_trust=False):
     ...     '-----END PGP MESSAGE-----',
     ...     '',
     ...     ]).encode('us-ascii')
-    >>> output,verified,result = verify_bytes(b)
+    >>> output,verified,signatures = verify_bytes(b)
     >>> output
     b'Success!\n'
     >>> verified
     False
-    >>> print(str(result, 'utf-8').replace('\x00', ''))
+    >>> for s in signatures:
+    ...     print(s.dumps())
     ... # doctest: +REPORT_UDIFF
-    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-    <gpgme>
-      <verify-result>
-        <signatures>
-          <signature>
-            <summary value="0x0" />
-            <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
-            <status value="0x0">Success &lt;Unspecified source&gt;</status>
-            <timestamp unix="1332357237i" />
-            <exp-timestamp unix="0i" />
-            <wrong-key-usage value="0x0" />
-            <pka-trust value="0x0" />
-            <chain-model value="0x0" />
-            <validity value="0x0" />
-            <validity-reason value="0x0">Success &lt;Unspecified source&gt;</validity-reason>
-            <pubkey-algo value="0x1">RSA</pubkey-algo>
-            <hash-algo value="0x8">SHA256</hash-algo>
-          </signature>
-        </signatures>
-      </verify-result>
-    </gpgme>
-    <BLANKLINE>
+    B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
+      summary:
+        CRL missing: False
+        CRL too old: False
+        bad policy: False
+        green: False
+        key expired: False
+        key missing: False
+        key revoked: False
+        red: False
+        signature expired: False
+        system error: False
+        valid: False
+      status: success
+      timestamp: Wed Mar 21 19:13:57 2012
+      expiration timestamp: None
+      wrong key usage: False
+      pka trust: not available
+      chain model: False
+      validity: unknown
+      validity reason: success
+      public key algorithm: RSA
+      hash algorithm: SHA256
     >>> b = b'Success!\n'
     >>> signature = '\n'.join([
     ...     '-----BEGIN PGP SIGNATURE-----',
@@ -230,76 +292,150 @@ def verify_bytes(data, signature=None, always_trust=False):
     ...     '-----END PGP SIGNATURE-----',
     ...     '',
     ...     ]).encode('us-ascii')
-    >>> output,verified,result = verify_bytes(b, signature=signature)
+    >>> output,verified,signatures = verify_bytes(b, signature=signature)
     >>> output
     b'Success!\n'
     >>> verified
     False
-    >>> print(str(result, 'utf-8').replace('\x00', ''))
+    >>> for s in signatures:
+    ...     print(s.dumps())
     ... # doctest: +REPORT_UDIFF
-    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-    <gpgme>
-      <verify-result>
-        <signatures>
-          <signature>
-            <summary value="0x0" />
-            <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
-            <status value="0x0">Success &lt;Unspecified source&gt;</status>
-            <timestamp unix="1332358207i" />
-            <exp-timestamp unix="0i" />
-            <wrong-key-usage value="0x0" />
-            <pka-trust value="0x0" />
-            <chain-model value="0x0" />
-            <validity value="0x0" />
-            <validity-reason value="0x0">Success &lt;Unspecified source&gt;</validity-reason>
-            <pubkey-algo value="0x1">RSA</pubkey-algo>
-            <hash-algo value="0x2">SHA1</hash-algo>
-          </signature>
-        </signatures>
-      </verify-result>
-    </gpgme>
-    <BLANKLINE>
+    B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
+      summary:
+        CRL missing: False
+        CRL too old: False
+        bad policy: False
+        green: False
+        key expired: False
+        key missing: False
+        key revoked: False
+        red: False
+        signature expired: False
+        system error: False
+        valid: False
+      status: success
+      timestamp: Wed Mar 21 19:30:07 2012
+      expiration timestamp: None
+      wrong key usage: False
+      pka trust: not available
+      chain model: False
+      validity: unknown
+      validity reason: success
+      public key algorithm: RSA
+      hash algorithm: SHA1
+
+    Data signed by a subkey returns the subkey fingerprint.  To find
+    the primary key for a given subkey, use
+    ``pgp_mime.key.lookup_keys()``.
+
+    >>> b = '\n'.join([
+    ...     '-----BEGIN PGP MESSAGE-----',
+    ...     'Version: GnuPG v2.0.19 (GNU/Linux)',
+    ...     '',
+    ...     'hQEMAxcQCLovc94uAQf9ErTZnr0lYRlLLZIk1VcpNNTHrMro+BmqpFC0jprA4/2m',
+    ...     '92klBF4TIS1A9bU5oxzQquaAIDV42P3sXrbxu/YhHLmPGH+dc2JVSfPLL0XOL5GC',
+    ...     'qpQYe5lglRBReFSRktrfhukjHBoXvh3c8T4xYK2r+nIV4gsp+FrSQMIOdhhBoC36',
+    ...     'U1MOk+R+I0JDbWdzZzJONs7ZcAcNDVKqxmAXZUqVgkhPpnGBSBuF9ExKRT3S6e5N',
+    ...     'Rsorb/DjGIUHSZuH2EaWAUz1jJ3nSta7TnveT/avfJiAV7cRS4oVgyyFyuHO5gkI',
+    ...     'o0obeJaut3enVgpq2TUUk0M4L8TX4jjKvDGAYNyuPNLAsQFHLj5eLmJSudGStWuA',
+    ...     'WjKLqBHD0M8/OcwnrTMleJl+h50ZsHO1tvvkXelH+w/jD5SMS+ktxq2Te8Vj7BmM',
+    ...     '0WQn3Ys7ViA5PgcSpbqNNLdgc1EMcpPI/sfJAORPKVWRPBKDXX/irY2onAMSe5gH',
+    ...     'teNX6bZd/gaoLWqD/1ZhsOCnlV7LY1R929TJ9vxnJcfKKAKwBDfAaSbecUUMECVw',
+    ...     's4u3ZT1pmNslBmH6XSy3ifLYWu/2xsJuhPradT88BJOBARMGg81gOE6zxGRrMLJa',
+    ...     'KojFgqaF2y4nlZAyaJ1Ld4qCaoQogaL9qE1BbmgtBehZ2FNQiIBSLC0fUUl8A4Py',
+    ...     '4d9ZxUoSp7nZmgTN5pUH1N9DIC4ntp/Rak2WnpS7+dRPlp9A2SF0RkeLY+JD9gNm',
+    ...     'j44zBkI79KlgaE/cMt6xUXAF/1ZR/Hv/6GUazGx0l23CnSGuqzLpex2uKOxfKiJt',
+    ...     'jfgyZRhIdFJnRuEXt8dTTDiiYA==',
+    ...     '=0o+x',
+    ...     '-----END PGP MESSAGE-----',
+    ...     '',
+    ...     ]).encode('us-ascii')
+    >>> output,verified,signatures = verify_bytes(b)
+    >>> output
+    b'Hello'
+    >>> verified
+    False
+    >>> for s in signatures:
+    ...     print(s.dumps())
+    ... # doctest: +REPORT_UDIFF
+    DECC812C8795ADD60538B0CD171008BA2F73DE2E signature:
+      summary:
+        CRL missing: False
+        CRL too old: False
+        bad policy: False
+        green: False
+        key expired: False
+        key missing: False
+        key revoked: False
+        red: False
+        signature expired: False
+        system error: False
+        valid: False
+      status: success
+      timestamp: Thu Sep 20 15:29:28 2012
+      expiration timestamp: None
+      wrong key usage: False
+      pka trust: not available
+      chain model: False
+      validity: unknown
+      validity reason: success
+      public key algorithm: RSA
+      hash algorithm: SHA256
     """
-    client,socket = get_client()
-    verified = result = None
+    input_read,input_write = _os.pipe()
+    if signature:
+        message_read,message_write = _os.pipe()
+        output_read = output_write = -1
+    else:
+        message_read = message_write = -1
+        output_read,output_write = _os.pipe()
+    client = get_client(**kwargs)
+    verified = None
+    signatures = []
     try:
         hello(client)
+        client.send_fds([input_read])
+        client.make_request(_common.Request('INPUT', 'FD'))
+        _os.close(input_read)
+        input_read = -1
+        if signature:
+            client.send_fds([message_read])
+            client.make_request(_common.Request('MESSAGE', 'FD'))
+            _os.close(message_read)
+            message_read = -1
+        else:
+            client.send_fds([output_write])
+            client.make_request(_common.Request('OUTPUT', 'FD'))
+            _os.close(output_write)
+            output_write = -1
         if signature:
-            input_ = signature
-            message = data
+            _write(input_write, signature)
+            _os.close(input_write)
+            input_write = -1
+            _write(message_write, data)
+            _os.close(message_write)
+            message_write = -1
         else:
-            input_ = data
-            message = None
-        with open('/tmp/input', 'wb') as f:
-            f.write(input_)
-        client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
-        if message:
-            with open('/tmp/message', 'wb') as f:
-                f.write(message)
-            client.make_request(
-                _common.Request('MESSAGE', 'FILE=/tmp/message'))
-        if not signature:
-            client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
+            _write(input_write, data)
+            _os.close(input_write)
+            input_write = -1
         client.make_request(_common.Request('VERIFY'))
+        if signature:
+            plain = data
+        else:
+            plain = _read(output_read)
         rs,result = client.make_request(_common.Request('RESULT'))
+        signatures = list(_signature.verify_result_signatures(result))
         verified = True
-        for line in result.splitlines():
-            if b'<status ' in line and b'Success' not in line:
+        for signature in signatures:
+            if signature.status != 'success':
                 verified = False
-            elif b'<pka-trust' in line and b'0x2' not in line:
+            elif signature.pka_trust != 'good':
                 verified = False
-        if signature:
-            plain = data
-        else:
-            with open('/tmp/output', 'rb') as f:
-                plain = f.read()
     finally:
-        disconnect(client, socket)
-        try:
-            pass
-            _os.remove('/tmp/input')
-            _os.remove('/tmp/output')
-            _os.remove('/tmp/message')
-        except OSError:
-            pass
-    return (plain, verified, result)
+        disconnect(client)
+        for fd in [input_read, input_write, message_read, message_write,
+                   output_read, output_write]:
+            if fd >= 0:
+                _os.close(fd)
+    return (plain, verified, signatures)