--- /dev/null
+build
+__pycache__
+*.pyc
--- /dev/null
+#!/usr/bin/env python3
+#
+# Copyright
+
+"""Simple pinentry program for getting server info.
+"""
+
+import socket as _socket
+
+from pyassuan import __version__
+from pyassuan import client as _client
+from pyassuan import common as _common
+
+
+if __name__ == '__main__':
+ import argparse
+ import logging
+
+ parser = argparse.ArgumentParser(description=__doc__, version=__version__)
+ parser.add_argument(
+ '-V', '--verbose', action='count', default=0,
+ help='increase verbosity')
+ parser.add_argument(
+ 'filename',
+ help="path to server's unix socket")
+
+ args = parser.parse_args()
+
+ client = _client.AssuanClient(name='get-info', close_on_disconnect=True)
+
+ if args.verbose:
+ client.logger.setLevel(max(
+ logging.DEBUG, client.logger.level - 10*args.verbose))
+
+ socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
+ socket.connect(args.filename)
+ client.input = socket.makefile('r')
+ client.output = socket.makefile('w')
+ client.connect()
+ try:
+ response = client.read_response()
+ assert response.type == 'OK', response
+ responses = client.make_request(_common.Request('HELP'))
+ responses = client.make_request(_common.Request('HELP GETINFO'))
+ for attribute in ['version', 'pid', 'socket_name', 'ssh_socket_name']:
+ responses = client.make_request(
+ _common.Request('GETINFO', attribute))
+ finally:
+ responses = client.make_request(_common.Request('BYE'))
+ client.disconnect()
--- /dev/null
+#!/usr/bin/env python3
+#
+# Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+
+"""Simple pinentry program for getting pins from a terminal.
+"""
+
+import copy as _copy
+import os as _os
+import os.path as _os_path
+import pprint as _pprint
+import re as _re
+import signal as _signal
+import sys as _sys
+import termios as _termios
+
+from pyassuan import __version__
+from pyassuan import server as _server
+from pyassuan import common as _common
+from pyassuan import error as _error
+
+
+class PinEntry (_server.AssuanServer):
+ """pinentry protocol server
+
+ See ``pinentry-0.8.0/doc/pinentry.texi`` at::
+
+ ftp://ftp.gnupg.org/gcrypt/pinentry/
+ http://www.gnupg.org/aegypten/
+
+ for details on the pinentry interface.
+
+ Alternatively, you can just watch the logs and guess ;). Here's a
+ trace when driven by GnuPG 2.0.17 (libgcrypt 1.4.6)::
+
+ S: OK Your orders please
+ C: OPTION grab
+ S: OK
+ C: OPTION ttyname=/dev/pts/6
+ S: OK
+ C: OPTION ttytype=xterm
+ S: OK
+ C: OPTION lc-ctype=en_US.UTF-8
+ S: OK
+ C: OPTION lc-messages=en_US.UTF-8
+ S: OK
+ C: OPTION default-ok=_OK
+ S: OK
+ C: OPTION default-cancel=_Cancel
+ S: OK
+ C: OPTION default-prompt=PIN:
+ S: OK
+ C: OPTION touch-file=/tmp/gpg-7lElMX/S.gpg-agent
+ S: OK
+ C: GETINFO pid
+ S: D 14309
+ S: OK
+ C: SETDESC Enter passphrase%0A
+ S: OK
+ C: SETPROMPT Passphrase
+ S: OK
+ C: GETPIN
+ S: D testing!
+ S: OK
+ C: BYE
+ S: OK closing connection
+ """
+ _digit_regexp = _re.compile(r'\d+')
+
+ # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
+ _tpgrp_regexp = _re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
+
+ def __init__(self, name='pinentry', strict_options=False,
+ single_request=True, **kwargs):
+ self.strings = {}
+ self.connection = {}
+ super(PinEntry, self).__init__(
+ name=name, strict_options=strict_options,
+ single_request=single_request, **kwargs)
+ self.valid_options.append('ttyname')
+
+ def reset(self):
+ super(PinEntry, self).reset()
+ self.strings.clear()
+ self.connection.clear()
+
+ # user interface
+
+ def _connect(self):
+ self.logger.info('connecting to user')
+ self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
+ tty_name = self.options.get('ttyname', None)
+ if tty_name:
+ self.connection['tpgrp'] = self._get_pgrp(tty_name)
+ self.logger.info(
+ 'open to-user output stream for {}'.format(tty_name))
+ self.connection['to_user'] = open(tty_name, 'w')
+ self.logger.info(
+ 'open from-user input stream for {}'.format(tty_name))
+ self.connection['from_user'] = open(tty_name, 'r')
+ self.logger.info('get current termios line discipline')
+ self.connection['original termios'] = _termios.tcgetattr(
+ self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
+ new_termios = _copy.deepcopy(self.connection['original termios'])
+ # translate carriage return to newline on input
+ new_termios[0] |= _termios.ICRNL
+ # do not ignore carriage return on input
+ new_termios[0] &= ~_termios.IGNCR
+ # do not echo input characters
+ new_termios[3] &= ~_termios.ECHO
+ # echo input characters
+ #new_termios[3] |= _termios.ECHO
+ # echo the NL character even if ECHO is not set
+ new_termios[3] |= _termios.ECHONL
+ # enable canonical mode
+ new_termios[3] |= _termios.ICANON
+ self.logger.info('adjust termios line discipline')
+ _termios.tcsetattr(
+ self.connection['to_user'], _termios.TCSANOW, new_termios)
+ self.logger.info('send SIGSTOP to pgrp {}'.format(
+ self.connection['tpgrp']))
+ #_os.killpg(self.connection['tpgrp'], _signal.SIGSTOP)
+ _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
+ self.connection['tpgrp stopped'] = True
+ else:
+ self.logger.info('no TTY name given; use stdin/stdout for I/O')
+ self.connection['to_user'] = _sys.stdout
+ self.connection['from_user'] = _sys.stdin
+ self.logger.info('connected to user')
+ self.connection['to_user'].write('\n') # give a clean line to work on
+ self.connection['active'] = True
+
+ def _disconnect(self):
+ self.logger.info('disconnecting from user')
+ try:
+ if self.connection.get('original termios', None):
+ self.logger.info('restore original termios line discipline')
+ _termios.tcsetattr(
+ self.connection['to_user'], _termios.TCSANOW,
+ self.connection['original termios'])
+ if self.connection.get('tpgrp stopped', None) is True:
+ self.logger.info(
+ 'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
+ #_os.killpg(self.connection['tpgrp'], _signal.SIGCONT)
+ _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
+ if self.connection.get('to_user', None) not in [None, _sys.stdout]:
+ self.logger.info('close to-user output stream')
+ self.connection['to_user'].close()
+ if self.connection.get('from_user',None) not in [None,_sys.stdout]:
+ self.logger.info('close from-user input stream')
+ self.connection['from_user'].close()
+ finally:
+ self.connection = {'active': False}
+ self.logger.info('disconnected from user')
+
+ def _get_pgrp(self, tty_name):
+ self.logger.info('find process group contolling {}'.format(tty_name))
+ proc = '/proc'
+ for name in _os.listdir(proc):
+ path = _os_path.join(proc, name)
+ if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
+ continue # not a process directory
+ self.logger.debug('checking process {}'.format(name))
+ fd_path = _os_path.join(path, 'fd', '0')
+ try:
+ link = _os.readlink(fd_path)
+ except OSError, e:
+ self.logger.debug('not our process: {}'.format(e))
+ continue # permission denied (not one of our processes)
+ if link != tty_name:
+ self.logger.debug('wrong tty: {}'.format(link))
+ continue # not attached to our target tty
+ stat_path = _os_path.join(path, 'stat')
+ stat = open(stat_path, 'r').read()
+ self.logger.debug('check stat for pgrp: {}'.format(stat))
+ match = self._tpgrp_regexp.match(stat)
+ assert match != None, stat
+ pgrp = int(match.group(1))
+ self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
+ return pgrp
+ raise ValueError(tty_name)
+
+ def _write(self, string):
+ "Write text to the user's terminal."
+ self.connection['to_user'].write(string + '\n')
+ self.connection['to_user'].flush()
+
+ def _read(self):
+ "Read and return a line from the user's terminal."
+ # drop trailing newline
+ return self.connection['from_user'].readline()[:-1]
+
+ def _prompt(self, prompt='?', add_colon=True):
+ if add_colon:
+ prompt += ':'
+ self.connection['to_user'].write(prompt)
+ self.connection['to_user'].write(' ')
+ self.connection['to_user'].flush()
+ return self._read()
+
+ # assuan handlers
+
+ def _handle_GETINFO(self, arg):
+ if arg == 'pid':
+ yield _common.Response('D', str(_os.getpid()))
+ elif arg == 'version':
+ yield _common.Response('D', __version__)
+ else:
+ raise _error.AssuanError(message='Invalid parameter')
+ yield _common.Response('OK')
+
+ def _handle_SETDESC(self, arg):
+ self.strings['description'] = arg
+ yield _common.Response('OK')
+
+ def _handle_SETPROMPT(self, arg):
+ self.strings['prompt'] = arg
+ yield _common.Response('OK')
+
+ def _handle_SETERROR(self, arg):
+ self.strings['error'] = arg
+ yield _common.Response('OK')
+
+ def _handle_SETTITLE(self, arg):
+ self.strings['title'] = arg
+ yield _common.Response('OK')
+
+ def _handle_SETOK(self, arg):
+ self.strings['ok'] = arg
+ yield _common.Response('OK')
+
+ def _handle_SETCANCEL(self, arg):
+ self.strings['cancel'] = arg
+ yield _common.Response('OK')
+
+ def _handle_SETNOTOK(self, arg):
+ self.strings['not ok'] = arg
+ yield _common.Response('OK')
+
+ def _handle_SETQUALITYBAR(self, arg):
+ """Adds a quality indicator to the GETPIN window.
+
+ This indicator is updated as the passphrase is typed. The
+ clients needs to implement an inquiry named "QUALITY" which
+ gets passed the current passpharse (percent-plus escaped) and
+ should send back a string with a single numerical vauelue
+ between -100 and 100. Negative values will be displayed in
+ red.
+
+ If a custom label for the quality bar is required, just add
+ that label as an argument as percent escaped string. You will
+ need this feature to translate the label because pinentry has
+ no internal gettext except for stock strings from the toolkit
+ library.
+
+ If you want to show a tooltip for the quality bar, you may use
+
+ C: SETQUALITYBAR_TT string
+ S: OK
+
+ With STRING being a percent escaped string shown as the tooltip.
+ """
+ raise NotImplementedError()
+
+ def _handle_GETPIN(self, arg):
+ try:
+ self._connect()
+ self._write(self.strings['description'])
+ pin = self._prompt(self.strings['prompt'], add_colon=False)
+ finally:
+ self._disconnect()
+ yield _common.Response('D', pin)
+ yield _common.Response('OK')
+
+ def _handle_CONFIRM(self, arg):
+ try:
+ self._connect()
+ self._write(self.strings['description'])
+ self._write('1) '+self.strings['ok'])
+ self._write('2) '+self.strings['not ok'])
+ value = self._prompt('?')
+ finally:
+ self._disconnect()
+ if value == '1':
+ yield _common.Response('OK')
+ else:
+ raise _error.AssuanError(message='Not confirmed')
+
+ def _handle_MESSAGE(self, arg):
+ self._write(self.strings['description'])
+ yield _common.Response('OK')
+
+ def _handle_CONFIRM(self, args):
+ assert args == '--one-button', args
+ try:
+ self._connect()
+ self._write(self.strings['description'])
+ self._write('1) '+self.strings['ok'])
+ value = self._prompt('?')
+ finally:
+ self._disconnect()
+ assert value == '1', value
+ yield _common.Response('OK')
+
+
+if __name__ == '__main__':
+ import argparse
+ import logging
+ import traceback
+
+ parser = argparse.ArgumentParser(description=__doc__, version=__version__)
+ parser.add_argument(
+ '-V', '--verbose', action='count', default=0,
+ help='increase verbosity')
+ parser.add_argument(
+ '--display',
+ help='set X display (ignored by this implementation)')
+
+ args = parser.parse_args()
+
+ p = PinEntry()
+
+ if args.verbose:
+ p.logger.setLevel(max(
+ logging.DEBUG, p.logger.level - 10*args.verbose))
+
+ try:
+ p = PinEntry()
+ p.run()
+ except:
+ p.logger.error(
+ 'exiting due to exception:\n{}'.format(
+ traceback.format_exc().rstrip()))
+ raise
+++ /dev/null
-#!/usr/bin/env python
-#
-# Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this program. If not, see
-# <http://www.gnu.org/licenses/>.
-
-"""Simple pinentry program for getting pins from a terminal.
-"""
-
-import copy
-import logging
-import logging.handlers
-import os
-import os.path
-import pprint
-import re
-import signal
-import sys
-import termios
-import traceback
-
-
-__version__ = '0.1'
-
-
-# create logger
-logger = logging.getLogger('pinentry')
-logger.setLevel(logging.WARNING)
-_h = logging.handlers.SysLogHandler(address='/dev/log')
-_h.setLevel(logging.DEBUG)
-_f = logging.Formatter('%(name)s: %(levelname)s: %(message)s')
-_h.setFormatter(_f)
-logger.addHandler(_h)
-del _h, _f
-
-
-class PinEntry (object):
- """pinentry protocol server
-
- See the `Assuan manual`_ for a description of the protocol.
-
- .. _Assuan manual: http://www.gnupg.org/documentation/manuals/assuan/
- """
- _digit_regexp = re.compile(r'\d+')
-
- # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
- _tpgrp_regexp = re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
-
- _assuan_encode_regexp = re.compile(
- '(' + '|'.join(['%', '\r', '\n']) + ')')
- _assuan_decode_regexp = re.compile('(%[0-9A-F]{2})')
-
- def __init__(self):
- self.stop = False
- self.options = {}
- self.strings = {}
- self.connection = {}
-
- def run(self):
- logger.info('---opening pinentry---')
- logger.info('OK Your orders please')
- sys.stdout.write('OK Your orders please\n')
- sys.stdout.flush()
- try:
- while not self.stop:
- line = sys.stdin.readline()
- if not line:
- break # EOF
- line = line.rstrip() # dangerous?
- logger.info(line)
- line = self._decode(line)
- fields = line.split(' ', 1)
- cmd = fields[0]
- if len(fields) > 1:
- arg = fields[1]
- else:
- arg = None
- handle = getattr(self, '_handle_%s' % cmd, None)
- if handle:
- for response in handle(arg):
- response = self._encode(response)
- logger.info(response)
- sys.stdout.write(response+'\n')
- try:
- sys.stdout.flush()
- except IOError:
- if not self.stop:
- raise
- else:
- raise ValueError(line)
- finally:
- logger.info('---closing pinentry---')
-
- # user interface
-
- def _connect(self):
- logger.info('--connecting to user--')
- logger.debug('options:\n%s' % pprint.pformat(self.options))
- tty_name = self.options.get('ttyname', None)
- if tty_name:
- self.connection['tpgrp'] = self._get_pgrp(tty_name)
- logger.info('open to-user output stream for %s' % tty_name)
- self.connection['to_user'] = open(tty_name, 'w')
- logger.info('open from-user input stream for %s' % tty_name)
- self.connection['from_user'] = open(tty_name, 'r')
- logger.info('get current termios line discipline')
- self.connection['original termios'] = termios.tcgetattr(
- self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
- new_termios = copy.deepcopy(self.connection['original termios'])
- # translate carriage return to newline on input
- new_termios[0] |= termios.ICRNL
- # do not ignore carriage return on input
- new_termios[0] &= ~termios.IGNCR
- # do not echo input characters
- new_termios[3] &= ~termios.ECHO
- # echo input characters
- #new_termios[3] |= termios.ECHO
- # echo the NL character even if ECHO is not set
- new_termios[3] |= termios.ECHONL
- # enable canonical mode
- new_termios[3] |= termios.ICANON
- logger.info('adjust termios line discipline')
- termios.tcsetattr(
- self.connection['to_user'], termios.TCSANOW, new_termios)
- logger.info('send SIGSTOP to pgrp %d' % self.connection['tpgrp'])
- #os.killpg(self.connection['tpgrp'], signal.SIGSTOP)
- os.kill(-self.connection['tpgrp'], signal.SIGSTOP)
- self.connection['tpgrp stopped'] = True
- else:
- logger.info('no TTY name given; use stdin/stdout for I/O')
- self.connection['to_user'] = sys.stdout
- self.connection['from_user'] = sys.stdin
- logger.info('--connected to user--')
- self.connection['to_user'].write('\n') # give a clean line to work on
- self.connection['active'] = True
-
- def _disconnect(self):
- logger.info('--disconnecting from user--')
- try:
- if self.connection.get('original termios', None):
- logger.info('restore original termios line discipline')
- termios.tcsetattr(
- self.connection['to_user'], termios.TCSANOW,
- self.connection['original termios'])
- if self.connection.get('tpgrp stopped', None) is True:
- logger.info(
- 'send SIGCONT to pgrp %d' % self.connection['tpgrp'])
- #os.killpg(self.connection['tpgrp'], signal.SIGCONT)
- os.kill(-self.connection['tpgrp'], signal.SIGCONT)
- if self.connection.get('to_user', None) not in [None, sys.stdout]:
- logger.info('close to-user output stream')
- self.connection['to_user'].close()
- if self.connection.get('from_user', None) not in [None,sys.stdout]:
- logger.info('close from-user input stream')
- self.connection['from_user'].close()
- finally:
- self.connection = {'active': False}
- logger.info('--disconnected from user--')
-
- def _get_pgrp(self, tty_name):
- logger.info('find process group contolling %s' % tty_name)
- proc = '/proc'
- for name in os.listdir(proc):
- path = os.path.join(proc, name)
- if not (self._digit_regexp.match(name) and os.path.isdir(path)):
- continue # not a process directory
- logger.debug('checking process %s' % name)
- fd_path = os.path.join(path, 'fd', '0')
- try:
- link = os.readlink(fd_path)
- except OSError, e:
- logger.debug('not our process: %s' % e)
- continue # permission denied (not one of our processes)
- if link != tty_name:
- logger.debug('wrong tty: %s' % link)
- continue # not attached to our target tty
- stat_path = os.path.join(path, 'stat')
- stat = open(stat_path, 'r').read()
- logger.debug('check stat for pgrp: %s' % stat)
- match = self._tpgrp_regexp.match(stat)
- assert match != None, stat
- pgrp = int(match.group(1))
- logger.info('found pgrp %d for %s' % (pgrp, tty_name))
- return pgrp
- raise ValueError(tty_name)
-
- def _write(self, string):
- "Write text to the user's terminal."
- self.connection['to_user'].write(string + '\n')
- self.connection['to_user'].flush()
-
- def _read(self):
- "Read and return a line from the user's terminal."
- # drop trailing newline
- return self.connection['from_user'].readline()[:-1]
-
- def _prompt(self, prompt='?', add_colon=True):
- if add_colon:
- prompt += ':'
- self.connection['to_user'].write('%s ' % prompt)
- self.connection['to_user'].flush()
- return self._read()
-
- # Assuan utilities
-
- def _encode(self, string):
- """
-
- >>> p = PinEntry()
- >>> p._encode('It grew by 5%!\\n')
- 'It grew by 5%25!%0A'
- """
- return self._assuan_encode_regexp.sub(
- lambda x : self._to_hex(x.group()), string)
-
- def _decode(self, string):
- """
-
- >>> p = PinEntry()
- >>> p._decode('%22Look out!%22%0AWhere%3F')
- '"Look out!"\\nWhere?'
- """
- return self._assuan_decode_regexp.sub(
- lambda x : self._from_hex(x.group()), string)
-
- def _from_hex(self, code):
- """
-
- >>> p = PinEntry()
- >>> p._from_hex('%22')
- '"'
- >>> p._from_hex('%0A')
- '\\n'
- """
- return chr(int(code[1:], 16))
-
- def _to_hex(self, char):
- """
-
- >>> p = PinEntry()
- >>> p._to_hex('"')
- '%22'
- >>> p._to_hex('\\n')
- '%0A'
- """
- return '%%%02X' % ord(char)
-
- # handlers
-
- def _handle_BYE(self, arg):
- self.stop = True
- yield 'OK closing connection'
-
- def _handle_OPTION(self, arg):
- # ttytype to set TERM
- fields = arg.split('=', 1)
- key = fields[0]
- if len(fields) > 1:
- value = fields[1]
- else:
- value = True
- self.options[key] = value
- yield 'OK'
-
- def _handle_GETINFO(self, arg):
- if arg == 'pid':
- yield 'D %d' % os.getpid()
- else:
- raise ValueError(arg)
- yield 'OK'
-
- def _handle_SETDESC(self, arg):
- self.strings['description'] = arg
- yield 'OK'
-
- def _handle_SETPROMPT(self, arg):
- self.strings['prompt'] = arg
- yield 'OK'
-
- def _handle_SETERROR(self, arg):
- self.strings['error'] = arg
- yield 'OK'
-
- def _handle_SETTITLE(self, arg):
- self.strings['title'] = arg
- yield 'OK'
-
- def _handle_SETOK(self, arg):
- self.strings['ok'] = arg
- yield 'OK'
-
- def _handle_SETCANCEL(self, arg):
- self.strings['cancel'] = arg
- yield 'OK'
-
- def _handle_SETNOTOK(self, arg):
- self.strings['not ok'] = arg
- yield 'OK'
-
- def _handle_SETQUALITYBAR(self, arg):
- """Adds a quality indicator to the GETPIN window. This
- indicator is updated as the passphrase is typed. The clients
- needs to implement an inquiry named "QUALITY" which gets passed
- the current passpharse (percent-plus escaped) and should send
- back a string with a single numerical vauelue between -100 and
- 100. Negative values will be displayed in red.
-
- If a custom label for the quality bar is required, just add that
- label as an argument as percent escaped string. You will need
- this feature to translate the label because pinentry has no
- internal gettext except for stock strings from the toolkit library.
-
- If you want to show a tooltip for the quality bar, you may use
- C: SETQUALITYBAR_TT string
- S: OK
-
- With STRING being a percent escaped string shown as the tooltip.
- """
- raise NotImplementedError()
-
- def _handle_GETPIN(self, arg):
- try:
- self._connect()
- self._write(self.strings['description'])
- pin = self._prompt(self.strings['prompt'], add_colon=False)
- finally:
- self._disconnect()
- yield 'D %s' % pin
- yield 'OK'
-
- def _handle_CONFIRM(self, arg):
- try:
- self._connect()
- self._write(self.strings['description'])
- self._write('1) '+self.strings['ok'])
- self._write('2) '+self.strings['not ok'])
- value = self._prompt('?')
- finally:
- self._disconnect()
- if value == '1':
- yield 'OK'
- else:
- yield 'ASSUAN_Not_Confirmed'
-
- def _handle_MESSAGE(self, arg):
- self._write(self.strings['description'])
- yield 'OK'
-
- def _handle_CONFIRM(self, args):
- assert args == '--one-button', args
- try:
- self._connect()
- self._write(self.strings['description'])
- self._write('1) '+self.strings['ok'])
- value = self._prompt('?')
- finally:
- self._disconnect()
- assert value == '1', value
- yield 'OK'
-
-
-if __name__ == '__main__':
- import argparse
-
- parser = argparse.ArgumentParser(description=__doc__, version=__version__)
- parser.add_argument(
- '-V', '--verbose', action='count', default=0,
- help='increase verbosity')
- parser.add_argument(
- '--display',
- help='set X display (ignored by this implementation)')
-
- args = parser.parse_args()
-
- if args.verbose >= 2:
- logger.setLevel(logging.DEBUG)
- elif args.verbose >= 1:
- logger.setLevel(logging.INFO)
-
- try:
- p = PinEntry()
- p.run()
- except:
- logger.error('exiting due to exception:\n%s' %
- traceback.format_exc().rstrip())
- raise
--- /dev/null
+# Copyright
+
+"""A Python implementation of the `Assuan protocol`_.
+
+.. _Assuan protocol: http://www.gnupg.org/documentation/manuals/assuan/
+"""
+
+import logging as _logging
+import logging.handlers as _logging_handlers
+
+
+__version__ = '0.1'
+
+LOG = _logging.getLogger('pyassuan')
+LOG.setLevel(_logging.ERROR)
+LOG.addHandler(_logging.StreamHandler())
+#LOG.addHandler(_logging.FileHandler('/tmp/pinentry.log'))
+#LOG.addHandler(_logging_handlers.SysLogHandler(address='/dev/log'))
+LOG.handlers[0].setFormatter(
+ _logging.Formatter('%(name)s: %(levelname)s: %(message)s'))
--- /dev/null
+# Copyright
+
+import logging as _logging
+import sys as _sys
+
+from . import LOG as _LOG
+from . import common as _common
+from . import error as _error
+
+
+class AssuanClient (object):
+ """A single-threaded Assuan client based on the `devolpment suggestions`_
+
+ .. _development suggestions:
+ http://www.gnupg.org/documentation/manuals/assuan/Client-code.html
+ """
+ def __init__(self, name, logger=_LOG, use_sublogger=True,
+ close_on_disconnect=False):
+ self.name = name
+ if use_sublogger:
+ logger = _logging.getLogger('{}.{}'.format(logger.name, self.name))
+ self.logger = logger
+ self.close_on_disconnect = close_on_disconnect
+ self.input = self.output = None
+
+ def connect(self):
+ if not self.input:
+ self.logger.info('read from stdin')
+ self.input = _sys.stdin
+ if not self.output:
+ self.logger.info('write to stdout')
+ self.output = _sys.stdout
+
+ def disconnect(self):
+ if self.close_on_disconnect:
+ self.logger.info('disconnecting')
+ self.input = None
+ self.output = None
+
+ def raise_error(self, error):
+ self.logger.error(str(error))
+ raise(error)
+
+ def read_response(self):
+ line = self.input.readline()
+ if not line:
+ self.raise_error(
+ _error.AssuanError(message='IPC accept call failed'))
+ if not line.endswith('\n'):
+ self.raise_error(
+ _error.AssuanError(message='Invalid response'))
+ line = line[:-1] # remove trailing newline
+ # TODO, line length?
+ response = _common.Response()
+ try:
+ response.from_string(line)
+ except _error.AssuanError as e:
+ self.logger.error(str(e))
+ raise
+ self.logger.info('S: {}'.format(response))
+ return response
+
+ def make_request(self, request):
+ rstring = str(request)
+ self.logger.info('C: {}'.format(rstring))
+ self.output.write(rstring)
+ self.output.write('\n')
+ try:
+ self.output.flush()
+ except IOError:
+ if not self.stop:
+ raise
+ return list(self.responses())
+
+ def responses(self):
+ while True:
+ response = self.read_response()
+ yield response
+ if response.type in ['OK', 'ERR']:
+ break
--- /dev/null
+# Copyright
+
+"""Items common to both the client and server
+"""
+
+import re as _re
+
+from . import error as _error
+
+
+_ENCODE_REGEXP = _re.compile(
+ '(' + '|'.join(['%', '\r', '\n']) + ')')
+_DECODE_REGEXP = _re.compile('(%[0-9A-F]{2})')
+_REQUEST_REGEXP = _re.compile('^(\w+)( *)(.*)\Z')
+
+
+def encode(string):
+ r"""
+
+ >>> encode('It grew by 5%!\n')
+ 'It grew by 5%25!%0A'
+ """
+ return _ENCODE_REGEXP.sub(
+ lambda x : to_hex(x.group()), string)
+
+def decode(string):
+ r"""
+
+ >>> decode('%22Look out!%22%0AWhere%3F')
+ '"Look out!"\nWhere?'
+ """
+ return _DECODE_REGEXP.sub(
+ lambda x : from_hex(x.group()), string)
+
+def from_hex(code):
+ r"""
+
+ >>> from_hex('%22')
+ '"'
+ >>> from_hex('%0A')
+ '\n'
+ """
+ return chr(int(code[1:], 16))
+
+def to_hex(char):
+ r"""
+
+ >>> to_hex('"')
+ '%22'
+ >>> to_hex('\n')
+ '%0A'
+ """
+ return '%{:02X}'.format(ord(char))
+
+
+class Request (object):
+ """A client request
+
+ http://www.gnupg.org/documentation/manuals/assuan/Client-requests.html
+
+ >>> r = Request(command='BYE')
+ >>> str(r)
+ 'BYE'
+ >>> r = Request(command='OPTION', parameters='testing at 5%')
+ >>> str(r)
+ 'OPTION testing at 5%25'
+ >>> r.from_string('BYE')
+ >>> r.command
+ 'BYE'
+ >>> print(r.parameters)
+ None
+ >>> r.from_string('OPTION testing at 5%25')
+ >>> r.command
+ 'OPTION'
+ >>> print(r.parameters)
+ testing at 5%
+ >>> r.from_string(' invalid')
+ Traceback (most recent call last):
+ ...
+ pyassuan.error.AssuanError: 170 Invalid request
+ >>> r.from_string('in-valid')
+ Traceback (most recent call last):
+ ...
+ pyassuan.error.AssuanError: 170 Invalid request
+ """
+ def __init__(self, command=None, parameters=None):
+ self.command = command
+ self.parameters = parameters
+
+ def __str__(self):
+ if self.parameters:
+ return '{} {}'.format(self.command, encode(self.parameters))
+ return self.command
+
+ def from_string(self, string):
+ if len(string) > 1000: # TODO: byte-vs-str and newlines?
+ raise _error.AssuanError(message='Line too long')
+ match = _REQUEST_REGEXP.match(string)
+ if not match:
+ raise _error.AssuanError(message='Invalid request')
+ self.command = match.group(1)
+ if match.group(3):
+ if match.group(2):
+ self.parameters = decode(match.group(3))
+ else:
+ raise _error.AssuanError(message='Invalid request')
+ else:
+ self.parameters = None
+
+
+class Response (object):
+ """A server response
+
+ http://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
+
+ >>> r = Response(type='OK')
+ >>> str(r)
+ 'OK'
+ >>> r = Response(type='ERR', parameters='1 General error')
+ >>> str(r)
+ 'ERR 1 General error'
+ >>> r.from_string('OK')
+ >>> r.type
+ 'OK'
+ >>> print(r.parameters)
+ None
+ >>> r.from_string('ERR 1 General error')
+ >>> r.type
+ 'ERR'
+ >>> print(r.parameters)
+ 1 General error
+ >>> r.from_string(' invalid')
+ Traceback (most recent call last):
+ ...
+ pyassuan.error.AssuanError: 76 Invalid response
+ >>> r.from_string('in-valid')
+ Traceback (most recent call last):
+ ...
+ pyassuan.error.AssuanError: 76 Invalid response
+ """
+ types = {
+ 'O': 'OK',
+ 'E': 'ERR',
+ 'S': 'S',
+ '#': '#',
+ 'D': 'D',
+ 'I': 'INQUIRE',
+ }
+
+ def __init__(self, type=None, parameters=None):
+ self.type = type
+ self.parameters = parameters
+
+ def __str__(self):
+ if self.parameters:
+ return '{} {}'.format(self.type, encode(self.parameters))
+ return self.type
+
+ def from_string(self, string):
+ if len(string) > 1000: # TODO: byte-vs-str and newlines?
+ raise _error.AssuanError(message='Line too long')
+ try:
+ type = self.types[string[0]]
+ except KeyError:
+ raise _error.AssuanError(message='Invalid response')
+ self.type = type
+ if type == 'D': # data
+ self.parameters = decode(string[2:])
+ elif type == '#': # comment
+ self.parameters = decode(string[2:])
+ else:
+ match = _REQUEST_REGEXP.match(string)
+ if not match:
+ raise _error.AssuanError(message='Invalid request')
+ if match.group(3):
+ if match.group(2):
+ self.parameters = decode(match.group(3))
+ else:
+ raise _error.AssuanError(message='Invalid request')
+ else:
+ self.parameters = None
+
+
+def error_response(error):
+ """
+
+ >>> from pyassuan.error import AssuanError
+ >>> error = AssuanError(1)
+ >>> response = error_response(error)
+ >>> print(response)
+ ERR 1 General error
+ """
+ return Response(type='ERR', parameters=str(error))
--- /dev/null
+# Copyright
+
+"""Assuan errors as defined in `libgpg-error`_.
+
+The Assuan_ docs_ suggest these error codes.
+
+.. _libgpg-error: http://www.gnupg.org/related_software/libgpg-error/
+.. _Assuan:
+ http://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
+.. _docs: http://www.gnupg.org/documentation/manuals/assuan/Error-codes.html
+"""
+
+MESSAGE = { # extracted from libgpg-error-1.10/src/err-codes.h and gpg-error.h
+ 0: 'Success',
+ 1: 'General error',
+ 2: 'Unknown packet',
+ 3: 'Unknown version in packet',
+ 4: 'Invalid public key algorithm',
+ 5: 'Invalid digest algorithm',
+ 6: 'Bad public key',
+ 7: 'Bad secret key',
+ 8: 'Bad signature',
+ 9: 'No public key',
+ 10: 'Checksum error',
+ 11: 'Bad passphrase',
+ 12: 'Invalid cipher algorithm',
+ 13: 'Keyring open',
+ 14: 'Invalid packet',
+ 15: 'Invalid armor',
+ 16: 'No user ID',
+ 17: 'No secret key',
+ 18: 'Wrong secret key used',
+ 19: 'Bad session key',
+ 20: 'Unknown compression algorithm',
+ 21: 'Number is not prime',
+ 22: 'Invalid encoding method',
+ 23: 'Invalid encryption scheme',
+ 24: 'Invalid signature scheme',
+ 25: 'Invalid attribute',
+ 26: 'No value',
+ 27: 'Not found',
+ 28: 'Value not found',
+ 29: 'Syntax error',
+ 30: 'Bad MPI value',
+ 31: 'Invalid passphrase',
+ 32: 'Invalid signature class',
+ 33: 'Resources exhausted',
+ 34: 'Invalid keyring',
+ 35: 'Trust DB error',
+ 36: 'Bad certificate',
+ 37: 'Invalid user ID',
+ 38: 'Unexpected error',
+ 39: 'Time conflict',
+ 40: 'Keyserver error',
+ 41: 'Wrong public key algorithm',
+ 42: 'Tribute to D. A.',
+ 43: 'Weak encryption key',
+ 44: 'Invalid key length',
+ 45: 'Invalid argument',
+ 46: 'Syntax error in URI',
+ 47: 'Invalid URI',
+ 48: 'Network error',
+ 49: 'Unknown host',
+ 50: 'Selftest failed',
+ 51: 'Data not encrypted',
+ 52: 'Data not processed',
+ 53: 'Unusable public key',
+ 54: 'Unusable secret key',
+ 55: 'Invalid value',
+ 56: 'Bad certificate chain',
+ 57: 'Missing certificate',
+ 58: 'No data',
+ 59: 'Bug',
+ 60: 'Not supported',
+ 61: 'Invalid operation code',
+ 62: 'Timeout',
+ 63: 'Internal error',
+ 64: 'EOF (gcrypt)',
+ 65: 'Invalid object',
+ 66: 'Provided object is too short',
+ 67: 'Provided object is too large',
+ 68: 'Missing item in object',
+ 69: 'Not implemented',
+ 70: 'Conflicting use',
+ 71: 'Invalid cipher mode',
+ 72: 'Invalid flag',
+ 73: 'Invalid handle',
+ 74: 'Result truncated',
+ 75: 'Incomplete line',
+ 76: 'Invalid response',
+ 77: 'No agent running',
+ 78: 'agent error',
+ 79: 'Invalid data',
+ 80: 'Unspecific Assuan server fault',
+ 81: 'General Assuan error',
+ 82: 'Invalid session key',
+ 83: 'Invalid S-expression',
+ 84: 'Unsupported algorithm',
+ 85: 'No pinentry',
+ 86: 'pinentry error',
+ 87: 'Bad PIN',
+ 88: 'Invalid name',
+ 89: 'Bad data',
+ 90: 'Invalid parameter',
+ 91: 'Wrong card',
+ 92: 'No dirmngr',
+ 93: 'dirmngr error',
+ 94: 'Certificate revoked',
+ 95: 'No CRL known',
+ 96: 'CRL too old',
+ 97: 'Line too long',
+ 98: 'Not trusted',
+ 99: 'Operation cancelled',
+ 100: 'Bad CA certificate',
+ 101: 'Certificate expired',
+ 102: 'Certificate too young',
+ 103: 'Unsupported certificate',
+ 104: 'Unknown S-expression',
+ 105: 'Unsupported protection',
+ 106: 'Corrupted protection',
+ 107: 'Ambiguous name',
+ 108: 'Card error',
+ 109: 'Card reset required',
+ 110: 'Card removed',
+ 111: 'Invalid card',
+ 112: 'Card not present',
+ 113: 'No PKCS15 application',
+ 114: 'Not confirmed',
+ 115: 'Configuration error',
+ 116: 'No policy match',
+ 117: 'Invalid index',
+ 118: 'Invalid ID',
+ 119: 'No SmartCard daemon',
+ 120: 'SmartCard daemon error',
+ 121: 'Unsupported protocol',
+ 122: 'Bad PIN method',
+ 123: 'Card not initialized',
+ 124: 'Unsupported operation',
+ 125: 'Wrong key usage',
+ 126: 'Nothing found',
+ 127: 'Wrong blob type',
+ 128: 'Missing value',
+ 129: 'Hardware problem',
+ 130: 'PIN blocked',
+ 131: 'Conditions of use not satisfied',
+ 132: 'PINs are not synced',
+ 133: 'Invalid CRL',
+ 134: 'BER error',
+ 135: 'Invalid BER',
+ 136: 'Element not found',
+ 137: 'Identifier not found',
+ 138: 'Invalid tag',
+ 139: 'Invalid length',
+ 140: 'Invalid key info',
+ 141: 'Unexpected tag',
+ 142: 'Not DER encoded',
+ 143: 'No CMS object',
+ 144: 'Invalid CMS object',
+ 145: 'Unknown CMS object',
+ 146: 'Unsupported CMS object',
+ 147: 'Unsupported encoding',
+ 148: 'Unsupported CMS version',
+ 149: 'Unknown algorithm',
+ 150: 'Invalid crypto engine',
+ 151: 'Public key not trusted',
+ 152: 'Decryption failed',
+ 153: 'Key expired',
+ 154: 'Signature expired',
+ 155: 'Encoding problem',
+ 156: 'Invalid state',
+ 157: 'Duplicated value',
+ 158: 'Missing action',
+ 159: 'ASN.1 module not found',
+ 160: 'Invalid OID string',
+ 161: 'Invalid time',
+ 162: 'Invalid CRL object',
+ 163: 'Unsupported CRL version',
+ 164: 'Invalid certificate object',
+ 165: 'Unknown name',
+ 166: 'A locale function failed',
+ 167: 'Not locked',
+ 168: 'Protocol violation',
+ 169: 'Invalid MAC',
+ 170: 'Invalid request',
+ 171: 'Unknown extension',
+ 172: 'Unknown critical extension',
+ 173: 'Locked',
+ 174: 'Unknown option',
+ 175: 'Unknown command',
+ 176: 'Not operational',
+ 177: 'No passphrase given',
+ 178: 'No PIN given',
+ 179: 'Not enabled',
+ 180: 'No crypto engine',
+ 181: 'Missing key',
+ 182: 'Too many objects',
+ 183: 'Limit reached',
+ 184: 'Not initialized',
+ 185: 'Missing issuer certificate',
+ 198: 'Operation fully cancelled',
+ 199: 'Operation not yet finished',
+ 200: 'Buffer too short',
+ 201: 'Invalid length specifier in S-expression',
+ 202: 'String too long in S-expression',
+ 203: 'Unmatched parentheses in S-expression',
+ 204: 'S-expression not canonical',
+ 205: 'Bad character in S-expression',
+ 206: 'Bad quotation in S-expression',
+ 207: 'Zero prefix in S-expression',
+ 208: 'Nested display hints in S-expression',
+ 209: 'Unmatched display hints',
+ 210: 'Unexpected reserved punctuation in S-expression',
+ 211: 'Bad hexadecimal character in S-expression',
+ 212: 'Odd hexadecimal numbers in S-expression',
+ 213: 'Bad octal character in S-expression',
+ 257: 'General IPC error',
+ 258: 'IPC accept call failed',
+ 259: 'IPC connect call failed',
+ 260: 'Invalid IPC response',
+ 261: 'Invalid value passed to IPC',
+ 262: 'Incomplete line passed to IPC',
+ 263: 'Line passed to IPC too long',
+ 264: 'Nested IPC commands',
+ 265: 'No data callback in IPC',
+ 266: 'No inquire callback in IPC',
+ 267: 'Not an IPC server',
+ 268: 'Not an IPC client',
+ 269: 'Problem starting IPC server',
+ 270: 'IPC read error',
+ 271: 'IPC write error',
+ 273: 'Too much data for IPC layer',
+ 274: 'Unexpected IPC command',
+ 275: 'Unknown IPC command',
+ 276: 'IPC syntax error',
+ 277: 'IPC call has been cancelled',
+ 278: 'No input source for IPC',
+ 279: 'No output source for IPC',
+ 280: 'IPC parameter error',
+ 281: 'Unknown IPC inquire',
+ 1024: 'User defined error code 1',
+ 1025: 'User defined error code 2',
+ 1026: 'User defined error code 3',
+ 1027: 'User defined error code 4',
+ 1028: 'User defined error code 5',
+ 1029: 'User defined error code 6',
+ 1030: 'User defined error code 7',
+ 1031: 'User defined error code 8',
+ 1032: 'User defined error code 9',
+ 1033: 'User defined error code 10',
+ 1034: 'User defined error code 11',
+ 1035: 'User defined error code 12',
+ 1036: 'User defined error code 13',
+ 1037: 'User defined error code 14',
+ 1038: 'User defined error code 15',
+ 1039: 'User defined error code 16',
+ 16381: 'System error w/o errno',
+ 16382: 'Unknown system error',
+ 16383: 'End of file',
+ }
+UNKNOWN = 'Unknown error code'
+
+CODE = dict((message,code) for code,message in MESSAGE.items())
+
+# TODO: system errors (GPG_ERR_E2BIG = GPG_ERR_SYSTEM_ERROR | 0, etc.)
+
+class AssuanError (Exception):
+ r"""
+
+ >>> e = AssuanError(1)
+ >>> print(e)
+ 1 General error
+ >>> e = AssuanError(1024, 'testing!')
+ >>> print(e)
+ 1024 testing!
+ >>> e = AssuanError(message='Unknown packet')
+ >>> print(e)
+ 2 Unknown packet
+ """
+ def __init__(self, code=None, message=None):
+ if code is None and message is None:
+ raise ValueError('missing both `code` and `message`')
+ if message is None:
+ message = MESSAGE[code]
+ if code is None:
+ code = CODE.get(message, UNKNOWN)
+ self.code = code
+ self.message = message
+ super(AssuanError, self).__init__('{} {}'.format(code, message))
--- /dev/null
+# Copyright
+
+import logging as _logging
+import re as _re
+import socket as _socket
+import sys as _sys
+import threading as _threading
+import traceback as _traceback
+
+from . import LOG as _LOG
+from . import common as _common
+from . import error as _error
+
+
+_OPTION_REGEXP = _re.compile('^-?-?([-\w]+)( *)(=?) *(.*?) *\Z')
+
+
+class AssuanServer (object):
+ """A single-threaded Assuan server based on the `devolpment suggestions`_
+
+ Extend by subclassing and adding ``_handle_XXX`` methods for each
+ command you want to handle.
+
+ .. _development suggestions:
+ http://www.gnupg.org/documentation/manuals/assuan/Server-code.html
+ """
+ def __init__(self, name, logger=_LOG, use_sublogger=True,
+ valid_options=None, strict_options=True,
+ single_request=False, listen_to_quit=False,
+ close_on_disconnect=False):
+ self.name = name
+ if use_sublogger:
+ logger = _logging.getLogger('{}.{}'.format(logger.name, self.name))
+ self.logger = logger
+ if valid_options is None:
+ valid_options = []
+ self.valid_options = valid_options
+ self.strict_options = strict_options
+ self.single_request = single_request
+ self.listen_to_quit = listen_to_quit
+ self.close_on_disconnect = close_on_disconnect
+ self.input = self.output = None
+ self.options = {}
+ self.reset()
+
+ def reset(self):
+ self.stop = False
+ self.options.clear()
+
+ def run(self):
+ self.reset()
+ self.logger.info('running')
+ self.connect()
+ try:
+ self.handle_requests()
+ finally:
+ self.disconnect()
+ self.logger.info('stopping')
+
+ def connect(self):
+ if not self.input:
+ self.logger.info('read from stdin')
+ self.input = _sys.stdin
+ if not self.output:
+ self.logger.info('write to stdout')
+ self.output = _sys.stdout
+
+ def disconnect(self):
+ if self.close_on_disconnect:
+ self.logger.info('disconnecting')
+ self.input = None
+ self.output = None
+
+ def handle_requests(self):
+ self.send_response(self, _common.Response('OK', 'Your orders please'))
+ self.output.flush()
+ while not self.stop:
+ line = self.input.readline()
+ if not line:
+ break # EOF
+ if not line.endswith('\n'):
+ self.logger.info('C: {}'.format(line))
+ self.send_error_response(
+ _error.AssuanError(message='Invalid request'))
+ continue
+ line = line[:-1] # remove the trailing newline
+ self.logger.info('C: {}'.format(line))
+ request = _common.Request()
+ try:
+ request.from_string(line)
+ except _error.AssuanError as e:
+ self.send_error_response(e)
+ continue
+ self.handle_request(request)
+
+ def handle_request(self, request):
+ try:
+ handle = getattr(
+ self, '_handle_{}'.format(request.command))
+ except AttributeError:
+ self.send_error_response(
+ _error.AssuanError(message='Unknown command'))
+ return
+ try:
+ responses = handle(request.parameters)
+ for response in responses:
+ self.send_response(response)
+ except _error.AssuanError as error:
+ self.send_error_response(error)
+ return
+ except Exception as e:
+ self.logger.error(
+ 'exception while executing {}:\n{}'.format(
+ handle, _traceback.format_exc().rstrip()))
+ self.send_error_response(
+ _error.AssuanError(message='Unspecific Assuan server fault'))
+ return
+
+ def send_response(self, response):
+ """For internal use by ``.handle_requests()``
+ """
+ rstring = str(response)
+ self.logger.info('S: {}'.format(rstring))
+ self.output.write(rstring)
+ self.output.write('\n')
+ try:
+ self.output.flush()
+ except IOError:
+ if not self.stop:
+ raise
+
+ def send_error_response(self, error):
+ """For internal use by ``.handle_requests()``
+ """
+ self.send_response(_common.error_response(error))
+
+ # common commands defined at
+ # http://www.gnupg.org/documentation/manuals/assuan/Client-requests.html
+
+ def _handle_BYE(self, arg):
+ if self.single_request:
+ self.stop = True
+ yield _common.Response('OK', 'closing connection')
+
+ def _handle_RESET(self, arg):
+ self.reset()
+
+ def _handle_END(self, arg):
+ raise _error.AssuanError(
+ code=175, message='Unknown command (reserved)')
+
+ def _handle_HELP(self, arg):
+ raise _error.AssuanError(
+ code=175, message='Unknown command (reserved)')
+
+ def _handle_QUIT(self, arg):
+ if self.listen_to_quit:
+ self.stop = True
+ yield _common.Response('OK', 'stopping the server')
+ raise _error.AssuanError(
+ code=175, message='Unknown command (reserved)')
+
+ def _handle_OPTION(self, arg):
+ """
+
+ >>> s = AssuanServer(name='test', valid_options=['my-op'])
+ >>> list(s._handle_OPTION('my-op = 1 ')) # doctest: +ELLIPSIS
+ [<pyassuan.common.Response object at ...>]
+ >>> s.options
+ {'my-op': '1'}
+ >>> list(s._handle_OPTION('my-op 2')) # doctest: +ELLIPSIS
+ [<pyassuan.common.Response object at ...>]
+ >>> s.options
+ {'my-op': '2'}
+ >>> list(s._handle_OPTION('--my-op 3')) # doctest: +ELLIPSIS
+ [<pyassuan.common.Response object at ...>]
+ >>> s.options
+ {'my-op': '3'}
+ >>> list(s._handle_OPTION('my-op')) # doctest: +ELLIPSIS
+ [<pyassuan.common.Response object at ...>]
+ >>> s.options
+ {'my-op': None}
+ >>> list(s._handle_OPTION('inv'))
+ Traceback (most recent call last):
+ ...
+ pyassuan.error.AssuanError: 174 Unknown option
+ >>> list(s._handle_OPTION('in|valid'))
+ Traceback (most recent call last):
+ ...
+ pyassuan.error.AssuanError: 90 Invalid parameter
+ """
+ match = _OPTION_REGEXP.match(arg)
+ if not match:
+ raise _error.AssuanError(message='Invalid parameter')
+ name,space,equal,value = match.groups()
+ if value and not space and not equal:
+ # need either space or equal to separate value
+ raise _error.AssuanError(message='Invalid parameter')
+ if name not in self.valid_options:
+ if self.strict_options:
+ raise _error.AssuanError(message='Unknown option')
+ else:
+ self.logger.info('skipping invalid option: {}'.format(name))
+ else:
+ if not value:
+ value = None
+ self.options[name] = value
+ yield _common.Response('OK')
+
+ def _handle_CANCEL(self, arg):
+ raise _error.AssuanError(
+ code=175, message='Unknown command (reserved)')
+
+ def _handle_AUTH(self, arg):
+ raise _error.AssuanError(
+ code=175, message='Unknown command (reserved)')
+
+
+class AssuanSocketServer (object):
+ """A threaded server spawning ``AssuanServer``\s for each connection
+ """
+ def __init__(self, name, socket, server, kwargs={}, max_threads=10,
+ logger=_LOG, use_sublogger=True):
+ self.name = name
+ if use_sublogger:
+ logger = _logging.getLogger('{}.{}'.format(logger.name, self.name))
+ self.logger = logger
+ self.socket = socket
+ self.server = server
+ assert 'name' not in kwargs, kwargs['name']
+ assert 'logger' not in kwargs, kwargs['logger']
+ kwargs['logger'] = self.logger
+ assert 'use_sublogger' not in kwargs, kwargs['use_sublogger']
+ kwargs['use_sublogger'] = True
+ if 'close_on_disconnect' in kwargs:
+ assert kwargs['close_on_disconnect'] == True, (
+ kwargs['close_on_disconnect'])
+ else:
+ kwargs['close_on_disconnect'] = True
+ self.kwargs = kwargs
+ self.max_threads = max_threads
+ self.threads = []
+
+ def run(self):
+ self.logger.info('listen on socket')
+ self.socket.listen()
+ thread_index = 0
+ while True:
+ socket,address = self.socket.accept()
+ self.logger.info('connection from {}'.format(address))
+ self.cleanup_threads()
+ if len(threads) > self.max_threads:
+ self.drop_connection(socket, address)
+ self.spawn_thread(
+ 'server-thread-{}'.format(thread_index), socket, address)
+ thread_index = (thread_index + 1) % self.max_threads
+
+ def cleanup_threads(self):
+ i = 0
+ while i < len(self.threads):
+ thread = self.threads[i]
+ thread.join(0)
+ if thread.is_alive():
+ self.logger.info('joined thread {}'.format(thread.name))
+ self.threads.pop(i)
+ thread.socket.shutdown()
+ thread.socket.close()
+ else:
+ i += 1
+
+ def drop_connection(self, socket, address):
+ self.logger.info('drop connection from {}'.format(address))
+ # TODO: proper error to send to the client?
+
+ def spawn_thread(self, name, socket, address):
+ server = self.server(name=name, **self.kwargs)
+ server.input = socket.makefile('r')
+ server.output = socket.makefile('w')
+ thread = _threading.Thread(target=server.run, name=name)
+ thread.start()
+ self.threads.append(thread)