import logging as _logging
import os as _os
import os.path as _os_path
+from _socket import socket as _Socket
import socket as _socket
import subprocess as _subprocess
from pyassuan import client as _client
from pyassuan import common as _common
+from . import LOG as _LOG
-#class GPGMEClient(_client.AssuanClient):
-# pass
-#CLIENT = _client.AssuanClient(name='pgp-mime', close_on_disconnect=True)
-#CLIENT.filename = ...
-def connect(client, filename):
+def connect(client, filename, **kwargs):
filename = _os_path.expanduser(filename)
if False:
socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
else:
p = _subprocess.Popen(
filename, stdin=_subprocess.PIPE, stdout=_subprocess.PIPE,
- close_fds=True)
+ close_fds=True, **kwargs)
client.input = p.stdout
client.output = p.stdin
socket = p
client.connect()
return socket
-def get_client():
+def get_client(**kwargs):
client = _client.AssuanClient(name='pgp-mime', close_on_disconnect=True)
client.logger.setLevel(_logging.DEBUG)
- socket = connect(client, '~/src/gpgme/build/src/gpgme-tool')
+ socket = connect(client, '~/src/gpgme/build/src/gpgme-tool', **kwargs)
+ #socket = connect(client, '~/.assuan/S.gpgme-tool', **kwargs)
return (client, socket)
def disconnect(client, socket):
client.make_request(_common.Request('BYE'))
client.disconnect()
- if False:
+ if isinstance(socket, _Socket):
socket.shutdown(_socket.SHUT_RDWR)
socket.close()
else:
responses,data = client.get_responses() # get initial 'OK' from server
client.make_request(_common.Request('ARMOR', 'true'))
+def _read(fd, buffersize=512):
+ d = []
+ while True:
+ try:
+ new = _os.read(fd, buffersize)
+ except Exception as e:
+ _LOG.warn('error while reading: {}'.format(e))
+ break
+ if not new:
+ break
+ d.append(new)
+ return b''.join(d)
+
+def _write(fd, data):
+ i = 0
+ while i < len(data):
+ i += _os.write(fd, data[i:])
+
+
def sign_and_encrypt_bytes(data, signers=None, recipients=None,
always_trust=False, mode='detach'):
r"""Sign ``data`` with ``signers`` and encrypt to ``recipients``.
... # doctest: +ELLIPSIS
b'-----BEGIN PGP MESSAGE-----\n...-----END PGP MESSAGE-----\n'
"""
- client,socket = get_client()
+ input_read,input_write = _os.pipe()
+ output_read,output_write = _os.pipe()
+ client,socket = get_client(pass_fds=(input_read, output_write))
+ _os.close(input_read)
+ _os.close(output_write)
try:
hello(client)
if signers:
if recipients:
for recipient in recipients:
client.make_request(_common.Request('RECIPIENT', recipient))
- with open('/tmp/input', 'wb') as f:
- f.write(data)
- client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
- client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
+ client.make_request(
+ _common.Request('INPUT', 'FD={}'.format(input_read)))
+ client.make_request(
+ _common.Request('OUTPUT', 'FD={}'.format(output_write)))
parameters = []
if signers and recipients:
command = 'SIGN_ENCRYPT'
raise ValueError('must specify at least one signer or recipient')
if always_trust:
parameters.append('--always-trust')
+ _write(input_write, data)
+ _os.close(input_write)
+ input_write = -1
client.make_request(
_common.Request(command, ' '.join(parameters)))
- with open('/tmp/output', 'rb') as f:
- d = f.read()
+ d = _read(output_read)
finally:
disconnect(client, socket)
- try:
- _os.remove('/tmp/input')
- _os.remove('/tmp/output')
- except OSError:
- pass
+ for fd in [input_write, output_read]:
+ if fd >= 0:
+ _os.close(fd)
return d
def decrypt_bytes(data):
>>> decrypt_bytes(b)
b'Success!\n'
"""
- client,socket = get_client()
+ input_read,input_write = _os.pipe()
+ output_read,output_write = _os.pipe()
+ client,socket = get_client(pass_fds=(input_read, output_write))
+ _os.close(input_read)
+ _os.close(output_write)
try:
hello(client)
- with open('/tmp/input', 'wb') as f:
- f.write(data)
- client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
- client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
+ client.make_request(
+ _common.Request('INPUT', 'FD={}'.format(input_read)))
+ client.make_request(
+ _common.Request('OUTPUT', 'FD={}'.format(output_write)))
+ _write(input_write, data)
+ _os.close(input_write)
+ input_write = -1
client.make_request(_common.Request('DECRYPT'))
- with open('/tmp/output', 'rb') as f:
- d = f.read()
+ d = _read(output_read)
finally:
disconnect(client, socket)
- try:
- _os.remove('/tmp/input')
- _os.remove('/tmp/output')
- except OSError:
- pass
+ for fd in [input_write, output_read]:
+ if fd >= 0:
+ _os.close(fd)
return d
def verify_bytes(data, signature=None, always_trust=False):
</gpgme>
<BLANKLINE>
"""
- client,socket = get_client()
+ input_read,input_write = _os.pipe()
+ pass_fds = [input_read]
+ if signature:
+ message_read,message_write = _os.pipe()
+ output_read = -1
+ pass_fds.append(message_read)
+ else:
+ message_write = -1
+ output_read,output_write = _os.pipe()
+ pass_fds.append(output_write)
+ client,socket = get_client(pass_fds=pass_fds)
+ _os.close(input_read)
+ if signature:
+ _os.close(message_read)
+ else:
+ _os.close(output_write)
verified = result = None
try:
hello(client)
+ client.make_request(
+ _common.Request('INPUT', 'FD={}'.format(input_read)))
if signature:
- input_ = signature
- message = data
+ client.make_request(
+ _common.Request('MESSAGE', 'FD={}'.format(message_read)))
else:
- input_ = data
- message = None
- with open('/tmp/input', 'wb') as f:
- f.write(input_)
- client.make_request(_common.Request('INPUT', 'FILE=/tmp/input'))
- if message:
- with open('/tmp/message', 'wb') as f:
- f.write(message)
client.make_request(
- _common.Request('MESSAGE', 'FILE=/tmp/message'))
- if not signature:
- client.make_request(_common.Request('OUTPUT', 'FILE=/tmp/output'))
+ _common.Request('OUTPUT', 'FD={}'.format(output_write)))
+ if signature:
+ _write(input_write, signature)
+ _os.close(input_write)
+ input_write = -1
+ _write(message_write, data)
+ _os.close(message_write)
+ message_write = -1
+ else:
+ _write(input_write, data)
+ _os.close(input_write)
+ input_write = -1
client.make_request(_common.Request('VERIFY'))
+ if signature:
+ plain = data
+ else:
+ plain = _read(output_read)
rs,result = client.make_request(_common.Request('RESULT'))
verified = True
for line in result.splitlines():
verified = False
elif b'<pka-trust' in line and b'0x2' not in line:
verified = False
- if signature:
- plain = data
- else:
- with open('/tmp/output', 'rb') as f:
- plain = f.read()
finally:
disconnect(client, socket)
- try:
- pass
- _os.remove('/tmp/input')
- _os.remove('/tmp/output')
- _os.remove('/tmp/message')
- except OSError:
- pass
+ for fd in [input_write, message_write, output_read]:
+ if fd >= 0:
+ _os.close(fd)
return (plain, verified, result)