# pyassuan. If not, see <http://www.gnu.org/licenses/>.
import logging as _logging
+import socket as _socket
import sys as _sys
from . import LOG as _LOG
class AssuanClient (object):
- """A single-threaded Assuan client based on the `devolpment suggestions`_
+ """A single-threaded Assuan client based on the `development suggestions`_
.. _development suggestions:
http://www.gnupg.org/documentation/manuals/assuan/Client-code.html
logger = _logging.getLogger('{}.{}'.format(logger.name, self.name))
self.logger = logger
self.close_on_disconnect = close_on_disconnect
- self.input = self.output = None
+ self.input = self.output = self.socket = None
- def connect(self):
- if not self.input:
- self.logger.info('read from stdin')
- self.input = _sys.stdin.buffer
- if not self.output:
- self.logger.info('write to stdout')
- self.output = _sys.stdout.buffer
+ def connect(self, socket_path=None):
+ if socket_path:
+ self.logger.info(
+ 'connect to Unix socket at {}'.format(socket_path))
+ self.socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
+ self.socket.connect(socket_path)
+ self.input = self.socket.makefile('rb')
+ self.output = self.socket.makefile('wb')
+ else:
+ if not self.input:
+ self.logger.info('read from stdin')
+ self.input = _sys.stdin.buffer
+ if not self.output:
+ self.logger.info('write to stdout')
+ self.output = _sys.stdout.buffer
def disconnect(self):
if self.close_on_disconnect:
self.logger.info('disconnecting')
- self.input = None
- self.output = None
+ if self.input is not None:
+ self.input.close()
+ self.input = None
+ if self.output is not None:
+ self.output.close()
+ self.output = None
+ if self.socket is not None:
+ self.socket.shutdown(_socket.SHUT_RDWR)
+ self.socket.close()
+ self.socket = None
def raise_error(self, error):
self.logger.error(str(error))
self._write_request(request=request)
if response:
return self.get_responses(requests=requests, expect=expect)
+
+ def send_fds(self, fds):
+ """Send a file descriptor over a Unix socket.
+ """
+ msg = '# descriptors in flight: {}\n'.format(fds)
+ self.logger.info('C: {}'.format(msg.rstrip('\n')))
+ msg = msg.encode('ascii')
+ return _common.send_fds(
+ socket=self.socket, msg=msg, fds=fds, logger=None)
+
+ def receive_fds(self, msglen=200, maxfds=10):
+ """Receive file descriptors over a Unix socket.
+ """
+ msg,fds = _common.receive_fds(
+ socket=self.socket, msglen=msglen, maxfds=maxfds, logger=None)
+ msg = str(msg, 'utf-8')
+ self.logger.info('S: {}'.format(msg.rstrip('\n')))
+ return fds