-# Copyright
+# Copyright (C) 2012 W. Trevor King <wking@tremily.us>
+#
+# This file is part of pgp-mime.
+#
+# pgp-mime is free software: you can redistribute it and/or modify it under the
+# terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+#
+# pgp-mime is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# pgp-mime. If not, see <http://www.gnu.org/licenses/>.
import codecs as _codecs
import logging as _logging
import os as _os
import os.path as _os_path
-from _socket import socket as _Socket
-import socket as _socket
-import subprocess as _subprocess
from pyassuan import client as _client
from pyassuan import common as _common
from . import LOG as _LOG
+from . import signature as _signature
-def connect(client, filename, **kwargs):
- filename = _os_path.expanduser(filename)
- if False:
- socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
- socket.connect(filename)
- client.input = socket.makefile('rb')
- client.output = socket.makefile('wb')
- else:
- p = _subprocess.Popen(
- filename, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
- close_fds=True, **kwargs)
- client.input = p.stdout
- client.output = p.stdin
- socket = p
- client.connect()
- return socket
+SOCKET_PATH = _os_path.expanduser(_os_path.join('~', '.gnupg', 'S.gpgme-tool'))
+
+
+def get_client_params(config):
+ r"""Retrieve Assuan client paramters from a config file.
-def get_client(**kwargs):
- client = _client.AssuanClient(name='pgp-mime', close_on_disconnect=True)
- client.logger.setLevel(_logging.DEBUG)
- socket = connect(client, '~/src/gpgme/build/src/gpgme-tool', **kwargs)
- #socket = connect(client, '~/.assuan/S.gpgme-tool', **kwargs)
- return (client, socket)
+ >>> from configparser import ConfigParser
+ >>> config = ConfigParser()
+ >>> config.read_string('\n'.join([
+ ... '[gpgme-tool]',
+ ... 'socket-path: /tmp/S.gpgme-tool',
+ ... ]))
+ >>> get_client_params(config)
+ {'socket_path': '/tmp/S.gpgme-tool'}
+ >>> config = ConfigParser()
+ >>> get_smtp_params(ConfigParser())
+ {'socket_path': None}
+ """
+ params = {'socket_path': None}
+ try:
+ params['socket_path'] = config.get('gpgme-tool', 'socket-path')
+ except _configparser.NoSectionError:
+ return params
+ except _configparser.NoOptionError:
+ pass
+ return params
-def disconnect(client, socket):
+def get_client(socket_path=None):
+ if socket_path is None:
+ socket_path = socket_path
+ logger = _logging.getLogger('{}.{}'.format(_LOG.name, 'pyassuan'))
+ client = _client.AssuanClient(
+ name='pgp-mime', logger=logger, use_sublogger=False,
+ close_on_disconnect=True)
+ client.connect(socket_path=socket_path)
+ return client
+
+def disconnect(client):
client.make_request(_common.Request('BYE'))
client.disconnect()
- if isinstance(socket, _Socket):
- socket.shutdown(_socket.SHUT_RDWR)
- socket.close()
- else:
- status = socket.wait()
- assert status == 0, status
def hello(client):
responses,data = client.get_responses() # get initial 'OK' from server
def sign_and_encrypt_bytes(data, signers=None, recipients=None,
always_trust=False, mode='detach',
- allow_default_signer=False):
+ allow_default_signer=False, **kwargs):
r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.
- Just sign:
+ Just sign (with a detached signature):
>>> print(sign_and_encrypt_bytes(
... bytes(b'Hello'), signers=['pgp-mime@invalid.com']))
... recipients=['pgp-mime@invalid.com'], always_trust=True)
... # doctest: +ELLIPSIS
b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
+
+ Sign and encrypt with a specific subkey:
+
+ >>> sign_and_encrypt_bytes(
+ ... bytes(b'Hello'), signers=['0x2F73DE2E'],
+ ... recipients=['pgp-mime@invalid.com'], always_trust=True)
+ ... # doctest: +ELLIPSIS
+ b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
"""
input_read,input_write = _os.pipe()
output_read,output_write = _os.pipe()
- client,socket = get_client(pass_fds=(input_read, output_write))
- _os.close(input_read)
- _os.close(output_write)
+ client = get_client(**kwargs)
try:
hello(client)
if signers:
if recipients:
for recipient in recipients:
client.make_request(_common.Request('RECIPIENT', recipient))
- client.make_request(
- _common.Request('INPUT', 'FD={}'.format(input_read)))
- client.make_request(
- _common.Request('OUTPUT', 'FD={}'.format(output_write)))
+ client.send_fds([input_read])
+ client.make_request(_common.Request('INPUT', 'FD'))
+ _os.close(input_read)
+ input_read = -1
+ client.send_fds([output_write])
+ client.make_request(_common.Request('OUTPUT', 'FD'))
+ _os.close(output_write)
+ output_write = -1
parameters = []
if signers or allow_default_signer:
if recipients:
_common.Request(command, ' '.join(parameters)))
d = _read(output_read)
finally:
- disconnect(client, socket)
- for fd in [input_write, output_read]:
+ disconnect(client)
+ for fd in [input_read, input_write, output_read, output_write]:
if fd >= 0:
_os.close(fd)
return d
-def decrypt_bytes(data):
+def decrypt_bytes(data, **kwargs):
r"""Decrypt ``data``.
>>> b = '\n'.join([
"""
input_read,input_write = _os.pipe()
output_read,output_write = _os.pipe()
- client,socket = get_client(pass_fds=(input_read, output_write))
- _os.close(input_read)
- _os.close(output_write)
+ client = get_client(**kwargs)
try:
hello(client)
- client.make_request(
- _common.Request('INPUT', 'FD={}'.format(input_read)))
- client.make_request(
- _common.Request('OUTPUT', 'FD={}'.format(output_write)))
+ client.send_fds([input_read])
+ client.make_request(_common.Request('INPUT', 'FD'))
+ _os.close(input_read)
+ input_read = -1
+ client.send_fds([output_write])
+ client.make_request(_common.Request('OUTPUT', 'FD'))
+ _os.close(output_write)
+ output_write = -1
_write(input_write, data)
_os.close(input_write)
input_write = -1
client.make_request(_common.Request('DECRYPT'))
d = _read(output_read)
finally:
- disconnect(client, socket)
- for fd in [input_write, output_read]:
+ disconnect(client)
+ for fd in [input_read, input_write, output_read, output_write]:
if fd >= 0:
_os.close(fd)
return d
-def verify_bytes(data, signature=None, always_trust=False):
+def verify_bytes(data, signature=None, always_trust=False, **kwargs):
r"""Verify a signature on ``data``, possibly decrypting first.
These tests assume you didn't trust the distributed test key.
... '-----END PGP MESSAGE-----',
... '',
... ]).encode('us-ascii')
- >>> output,verified,result = verify_bytes(b)
+ >>> output,verified,signatures = verify_bytes(b)
>>> output
b'Success!\n'
>>> verified
False
- >>> print(str(result, 'utf-8').replace('\x00', ''))
+ >>> for s in signatures:
+ ... print(s.dumps())
... # doctest: +REPORT_UDIFF
- <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
- <gpgme>
- <verify-result>
- <signatures>
- <signature>
- <summary value="0x0" />
- <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
- <status value="0x0">Success <Unspecified source></status>
- <timestamp unix="1332357237i" />
- <exp-timestamp unix="0i" />
- <wrong-key-usage value="0x0" />
- <pka-trust value="0x0" />
- <chain-model value="0x0" />
- <validity value="0x0" />
- <validity-reason value="0x0">Success <Unspecified source></validity-reason>
- <pubkey-algo value="0x1">RSA</pubkey-algo>
- <hash-algo value="0x8">SHA256</hash-algo>
- </signature>
- </signatures>
- </verify-result>
- </gpgme>
- <BLANKLINE>
+ B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
+ summary:
+ CRL missing: False
+ CRL too old: False
+ bad policy: False
+ green: False
+ key expired: False
+ key missing: False
+ key revoked: False
+ red: False
+ signature expired: False
+ system error: False
+ valid: False
+ status: success
+ timestamp: Wed Mar 21 19:13:57 2012
+ expiration timestamp: None
+ wrong key usage: False
+ pka trust: not available
+ chain model: False
+ validity: unknown
+ validity reason: success
+ public key algorithm: RSA
+ hash algorithm: SHA256
>>> b = b'Success!\n'
>>> signature = '\n'.join([
... '-----BEGIN PGP SIGNATURE-----',
... '-----END PGP SIGNATURE-----',
... '',
... ]).encode('us-ascii')
- >>> output,verified,result = verify_bytes(b, signature=signature)
+ >>> output,verified,signatures = verify_bytes(b, signature=signature)
>>> output
b'Success!\n'
>>> verified
False
- >>> print(str(result, 'utf-8').replace('\x00', ''))
+ >>> for s in signatures:
+ ... print(s.dumps())
... # doctest: +REPORT_UDIFF
- <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
- <gpgme>
- <verify-result>
- <signatures>
- <signature>
- <summary value="0x0" />
- <fpr>B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3</fpr>
- <status value="0x0">Success <Unspecified source></status>
- <timestamp unix="1332358207i" />
- <exp-timestamp unix="0i" />
- <wrong-key-usage value="0x0" />
- <pka-trust value="0x0" />
- <chain-model value="0x0" />
- <validity value="0x0" />
- <validity-reason value="0x0">Success <Unspecified source></validity-reason>
- <pubkey-algo value="0x1">RSA</pubkey-algo>
- <hash-algo value="0x2">SHA1</hash-algo>
- </signature>
- </signatures>
- </verify-result>
- </gpgme>
- <BLANKLINE>
+ B2EDBE0E771A4B8708DD16A7511AEDA64332B6E3 signature:
+ summary:
+ CRL missing: False
+ CRL too old: False
+ bad policy: False
+ green: False
+ key expired: False
+ key missing: False
+ key revoked: False
+ red: False
+ signature expired: False
+ system error: False
+ valid: False
+ status: success
+ timestamp: Wed Mar 21 19:30:07 2012
+ expiration timestamp: None
+ wrong key usage: False
+ pka trust: not available
+ chain model: False
+ validity: unknown
+ validity reason: success
+ public key algorithm: RSA
+ hash algorithm: SHA1
+
+ Data signed by a subkey returns the subkey fingerprint. To find
+ the primary key for a given subkey, use
+ ``pgp_mime.key.lookup_keys()``.
+
+ >>> b = '\n'.join([
+ ... '-----BEGIN PGP MESSAGE-----',
+ ... 'Version: GnuPG v2.0.19 (GNU/Linux)',
+ ... '',
+ ... 'hQEMAxcQCLovc94uAQf9ErTZnr0lYRlLLZIk1VcpNNTHrMro+BmqpFC0jprA4/2m',
+ ... '92klBF4TIS1A9bU5oxzQquaAIDV42P3sXrbxu/YhHLmPGH+dc2JVSfPLL0XOL5GC',
+ ... 'qpQYe5lglRBReFSRktrfhukjHBoXvh3c8T4xYK2r+nIV4gsp+FrSQMIOdhhBoC36',
+ ... 'U1MOk+R+I0JDbWdzZzJONs7ZcAcNDVKqxmAXZUqVgkhPpnGBSBuF9ExKRT3S6e5N',
+ ... 'Rsorb/DjGIUHSZuH2EaWAUz1jJ3nSta7TnveT/avfJiAV7cRS4oVgyyFyuHO5gkI',
+ ... 'o0obeJaut3enVgpq2TUUk0M4L8TX4jjKvDGAYNyuPNLAsQFHLj5eLmJSudGStWuA',
+ ... 'WjKLqBHD0M8/OcwnrTMleJl+h50ZsHO1tvvkXelH+w/jD5SMS+ktxq2Te8Vj7BmM',
+ ... '0WQn3Ys7ViA5PgcSpbqNNLdgc1EMcpPI/sfJAORPKVWRPBKDXX/irY2onAMSe5gH',
+ ... 'teNX6bZd/gaoLWqD/1ZhsOCnlV7LY1R929TJ9vxnJcfKKAKwBDfAaSbecUUMECVw',
+ ... 's4u3ZT1pmNslBmH6XSy3ifLYWu/2xsJuhPradT88BJOBARMGg81gOE6zxGRrMLJa',
+ ... 'KojFgqaF2y4nlZAyaJ1Ld4qCaoQogaL9qE1BbmgtBehZ2FNQiIBSLC0fUUl8A4Py',
+ ... '4d9ZxUoSp7nZmgTN5pUH1N9DIC4ntp/Rak2WnpS7+dRPlp9A2SF0RkeLY+JD9gNm',
+ ... 'j44zBkI79KlgaE/cMt6xUXAF/1ZR/Hv/6GUazGx0l23CnSGuqzLpex2uKOxfKiJt',
+ ... 'jfgyZRhIdFJnRuEXt8dTTDiiYA==',
+ ... '=0o+x',
+ ... '-----END PGP MESSAGE-----',
+ ... '',
+ ... ]).encode('us-ascii')
+ >>> output,verified,signatures = verify_bytes(b)
+ >>> output
+ b'Hello'
+ >>> verified
+ False
+ >>> for s in signatures:
+ ... print(s.dumps())
+ ... # doctest: +REPORT_UDIFF
+ DECC812C8795ADD60538B0CD171008BA2F73DE2E signature:
+ summary:
+ CRL missing: False
+ CRL too old: False
+ bad policy: False
+ green: False
+ key expired: False
+ key missing: False
+ key revoked: False
+ red: False
+ signature expired: False
+ system error: False
+ valid: False
+ status: success
+ timestamp: Thu Sep 20 15:29:28 2012
+ expiration timestamp: None
+ wrong key usage: False
+ pka trust: not available
+ chain model: False
+ validity: unknown
+ validity reason: success
+ public key algorithm: RSA
+ hash algorithm: SHA256
"""
input_read,input_write = _os.pipe()
- pass_fds = [input_read]
if signature:
message_read,message_write = _os.pipe()
- output_read = -1
- pass_fds.append(message_read)
+ output_read = output_write = -1
else:
- message_write = -1
+ message_read = message_write = -1
output_read,output_write = _os.pipe()
- pass_fds.append(output_write)
- client,socket = get_client(pass_fds=pass_fds)
- _os.close(input_read)
- if signature:
- _os.close(message_read)
- else:
- _os.close(output_write)
- verified = result = None
+ client = get_client(**kwargs)
+ verified = None
+ signatures = []
try:
hello(client)
- client.make_request(
- _common.Request('INPUT', 'FD={}'.format(input_read)))
+ client.send_fds([input_read])
+ client.make_request(_common.Request('INPUT', 'FD'))
+ _os.close(input_read)
+ input_read = -1
if signature:
- client.make_request(
- _common.Request('MESSAGE', 'FD={}'.format(message_read)))
+ client.send_fds([message_read])
+ client.make_request(_common.Request('MESSAGE', 'FD'))
+ _os.close(message_read)
+ message_read = -1
else:
- client.make_request(
- _common.Request('OUTPUT', 'FD={}'.format(output_write)))
+ client.send_fds([output_write])
+ client.make_request(_common.Request('OUTPUT', 'FD'))
+ _os.close(output_write)
+ output_write = -1
if signature:
_write(input_write, signature)
_os.close(input_write)
else:
plain = _read(output_read)
rs,result = client.make_request(_common.Request('RESULT'))
+ signatures = list(_signature.verify_result_signatures(result))
verified = True
- for line in result.splitlines():
- if b'<status ' in line and b'Success' not in line:
+ for signature in signatures:
+ if signature.status != 'success':
verified = False
- elif b'<pka-trust' in line and b'0x2' not in line:
+ elif signature.pka_trust != 'good':
verified = False
finally:
- disconnect(client, socket)
- for fd in [input_write, message_write, output_read]:
+ disconnect(client)
+ for fd in [input_read, input_write, message_read, message_write,
+ output_read, output_write]:
if fd >= 0:
_os.close(fd)
- return (plain, verified, result)
+ return (plain, verified, signatures)