From: W. Trevor King Date: Sat, 24 Mar 2012 11:22:00 +0000 (-0400) Subject: Add pyassuan post, and remove pinentry.py in favor of pyassuan. X-Git-Url: http://git.tremily.us/?a=commitdiff_plain;h=78c29555462396ede4f01042202e255fb4c58c8a;p=blog.git Add pyassuan post, and remove pinentry.py in favor of pyassuan. --- diff --git a/posts/Gentoo_overlay.mdwn b/posts/Gentoo_overlay.mdwn index 88ac37b..f26333e 100644 --- a/posts/Gentoo_overlay.mdwn +++ b/posts/Gentoo_overlay.mdwn @@ -45,6 +45,8 @@ personal `layman.xml`). The overlay is a fairly strange mix: (my [[h5config]] package) - dev-python/pgp-mime (my [[pgp-mime]] package) +- dev-python/pyassuan + (my [[pyassuan]] package) - dev-python/pygrader (my [[pygrader]] package) - dev-python/pymodbus diff --git a/posts/gpg-agent.mdwn b/posts/gpg-agent.mdwn index f4df550..53d2d8b 100644 --- a/posts/gpg-agent.mdwn +++ b/posts/gpg-agent.mdwn @@ -21,11 +21,7 @@ TTY. This avoids troublesome issues like pinentry clobbering [[Mutt]] if they are both using ncurses. I didn't like any of the pinentry programs available on my system, so -I wrote my own: [[pinentry.py]]. To use my script, save it somewhere -on your system and add a line like the following to your -`~/.gnupg/gpg-agent.conf`. - - pinentry-program /path/to/pinentry.py +I wrote my own in [[pyassuan]]. When you are done with the agent, kill it with diff --git a/posts/gpg-agent/pinentry.py b/posts/gpg-agent/pinentry.py deleted file mode 100755 index 99e1bc3..0000000 --- a/posts/gpg-agent/pinentry.py +++ /dev/null @@ -1,398 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2011 W. Trevor King -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this program. If not, see -# . - -"""Simple pinentry program for getting pins from a terminal. -""" - -import copy -import logging -import logging.handlers -import os -import os.path -import pprint -import re -import signal -import sys -import termios -import traceback - - -__version__ = '0.1' - - -# create logger -logger = logging.getLogger('pinentry') -logger.setLevel(logging.WARNING) -_h = logging.handlers.SysLogHandler(address='/dev/log') -_h.setLevel(logging.DEBUG) -_f = logging.Formatter('%(name)s: %(levelname)s: %(message)s') -_h.setFormatter(_f) -logger.addHandler(_h) -del _h, _f - - -class PinEntry (object): - """pinentry protocol server - - See the `Assuan manual`_ for a description of the protocol. - - .. _Assuan manual: http://www.gnupg.org/documentation/manuals/assuan/ - """ - _digit_regexp = re.compile(r'\d+') - - # from proc(5): pid comm state ppid pgrp session tty_nr tpgid - _tpgrp_regexp = re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)') - - _assuan_encode_regexp = re.compile( - '(' + '|'.join(['%', '\r', '\n']) + ')') - _assuan_decode_regexp = re.compile('(%[0-9A-F]{2})') - - def __init__(self): - self.stop = False - self.options = {} - self.strings = {} - self.connection = {} - - def run(self): - logger.info('---opening pinentry---') - logger.info('OK Your orders please') - sys.stdout.write('OK Your orders please\n') - sys.stdout.flush() - try: - while not self.stop: - line = sys.stdin.readline() - if not line: - break # EOF - line = line.rstrip() # dangerous? - logger.info(line) - line = self._decode(line) - fields = line.split(' ', 1) - cmd = fields[0] - if len(fields) > 1: - arg = fields[1] - else: - arg = None - handle = getattr(self, '_handle_%s' % cmd, None) - if handle: - for response in handle(arg): - response = self._encode(response) - logger.info(response) - sys.stdout.write(response+'\n') - try: - sys.stdout.flush() - except IOError: - if not self.stop: - raise - else: - raise ValueError(line) - finally: - logger.info('---closing pinentry---') - - # user interface - - def _connect(self): - logger.info('--connecting to user--') - logger.debug('options:\n%s' % pprint.pformat(self.options)) - tty_name = self.options.get('ttyname', None) - if tty_name: - self.connection['tpgrp'] = self._get_pgrp(tty_name) - logger.info('open to-user output stream for %s' % tty_name) - self.connection['to_user'] = open(tty_name, 'w') - logger.info('open from-user input stream for %s' % tty_name) - self.connection['from_user'] = open(tty_name, 'r') - logger.info('get current termios line discipline') - self.connection['original termios'] = termios.tcgetattr( - self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...] - new_termios = copy.deepcopy(self.connection['original termios']) - # translate carriage return to newline on input - new_termios[0] |= termios.ICRNL - # do not ignore carriage return on input - new_termios[0] &= ~termios.IGNCR - # do not echo input characters - new_termios[3] &= ~termios.ECHO - # echo input characters - #new_termios[3] |= termios.ECHO - # echo the NL character even if ECHO is not set - new_termios[3] |= termios.ECHONL - # enable canonical mode - new_termios[3] |= termios.ICANON - logger.info('adjust termios line discipline') - termios.tcsetattr( - self.connection['to_user'], termios.TCSANOW, new_termios) - logger.info('send SIGSTOP to pgrp %d' % self.connection['tpgrp']) - #os.killpg(self.connection['tpgrp'], signal.SIGSTOP) - os.kill(-self.connection['tpgrp'], signal.SIGSTOP) - self.connection['tpgrp stopped'] = True - else: - logger.info('no TTY name given; use stdin/stdout for I/O') - self.connection['to_user'] = sys.stdout - self.connection['from_user'] = sys.stdin - logger.info('--connected to user--') - self.connection['to_user'].write('\n') # give a clean line to work on - self.connection['active'] = True - - def _disconnect(self): - logger.info('--disconnecting from user--') - try: - if self.connection.get('original termios', None): - logger.info('restore original termios line discipline') - termios.tcsetattr( - self.connection['to_user'], termios.TCSANOW, - self.connection['original termios']) - if self.connection.get('tpgrp stopped', None) is True: - logger.info( - 'send SIGCONT to pgrp %d' % self.connection['tpgrp']) - #os.killpg(self.connection['tpgrp'], signal.SIGCONT) - os.kill(-self.connection['tpgrp'], signal.SIGCONT) - if self.connection.get('to_user', None) not in [None, sys.stdout]: - logger.info('close to-user output stream') - self.connection['to_user'].close() - if self.connection.get('from_user', None) not in [None,sys.stdout]: - logger.info('close from-user input stream') - self.connection['from_user'].close() - finally: - self.connection = {'active': False} - logger.info('--disconnected from user--') - - def _get_pgrp(self, tty_name): - logger.info('find process group contolling %s' % tty_name) - proc = '/proc' - for name in os.listdir(proc): - path = os.path.join(proc, name) - if not (self._digit_regexp.match(name) and os.path.isdir(path)): - continue # not a process directory - logger.debug('checking process %s' % name) - fd_path = os.path.join(path, 'fd', '0') - try: - link = os.readlink(fd_path) - except OSError, e: - logger.debug('not our process: %s' % e) - continue # permission denied (not one of our processes) - if link != tty_name: - logger.debug('wrong tty: %s' % link) - continue # not attached to our target tty - stat_path = os.path.join(path, 'stat') - stat = open(stat_path, 'r').read() - logger.debug('check stat for pgrp: %s' % stat) - match = self._tpgrp_regexp.match(stat) - assert match != None, stat - pgrp = int(match.group(1)) - logger.info('found pgrp %d for %s' % (pgrp, tty_name)) - return pgrp - raise ValueError(tty_name) - - def _write(self, string): - "Write text to the user's terminal." - self.connection['to_user'].write(string + '\n') - self.connection['to_user'].flush() - - def _read(self): - "Read and return a line from the user's terminal." - # drop trailing newline - return self.connection['from_user'].readline()[:-1] - - def _prompt(self, prompt='?', add_colon=True): - if add_colon: - prompt += ':' - self.connection['to_user'].write('%s ' % prompt) - self.connection['to_user'].flush() - return self._read() - - # Assuan utilities - - def _encode(self, string): - """ - - >>> p = PinEntry() - >>> p._encode('It grew by 5%!\\n') - 'It grew by 5%25!%0A' - """ - return self._assuan_encode_regexp.sub( - lambda x : self._to_hex(x.group()), string) - - def _decode(self, string): - """ - - >>> p = PinEntry() - >>> p._decode('%22Look out!%22%0AWhere%3F') - '"Look out!"\\nWhere?' - """ - return self._assuan_decode_regexp.sub( - lambda x : self._from_hex(x.group()), string) - - def _from_hex(self, code): - """ - - >>> p = PinEntry() - >>> p._from_hex('%22') - '"' - >>> p._from_hex('%0A') - '\\n' - """ - return chr(int(code[1:], 16)) - - def _to_hex(self, char): - """ - - >>> p = PinEntry() - >>> p._to_hex('"') - '%22' - >>> p._to_hex('\\n') - '%0A' - """ - return '%%%02X' % ord(char) - - # handlers - - def _handle_BYE(self, arg): - self.stop = True - yield 'OK closing connection' - - def _handle_OPTION(self, arg): - # ttytype to set TERM - fields = arg.split('=', 1) - key = fields[0] - if len(fields) > 1: - value = fields[1] - else: - value = True - self.options[key] = value - yield 'OK' - - def _handle_GETINFO(self, arg): - if arg == 'pid': - yield 'D %d' % os.getpid() - else: - raise ValueError(arg) - yield 'OK' - - def _handle_SETDESC(self, arg): - self.strings['description'] = arg - yield 'OK' - - def _handle_SETPROMPT(self, arg): - self.strings['prompt'] = arg - yield 'OK' - - def _handle_SETERROR(self, arg): - self.strings['error'] = arg - yield 'OK' - - def _handle_SETTITLE(self, arg): - self.strings['title'] = arg - yield 'OK' - - def _handle_SETOK(self, arg): - self.strings['ok'] = arg - yield 'OK' - - def _handle_SETCANCEL(self, arg): - self.strings['cancel'] = arg - yield 'OK' - - def _handle_SETNOTOK(self, arg): - self.strings['not ok'] = arg - yield 'OK' - - def _handle_SETQUALITYBAR(self, arg): - """Adds a quality indicator to the GETPIN window. This - indicator is updated as the passphrase is typed. The clients - needs to implement an inquiry named "QUALITY" which gets passed - the current passpharse (percent-plus escaped) and should send - back a string with a single numerical vauelue between -100 and - 100. Negative values will be displayed in red. - - If a custom label for the quality bar is required, just add that - label as an argument as percent escaped string. You will need - this feature to translate the label because pinentry has no - internal gettext except for stock strings from the toolkit library. - - If you want to show a tooltip for the quality bar, you may use - C: SETQUALITYBAR_TT string - S: OK - - With STRING being a percent escaped string shown as the tooltip. - """ - raise NotImplementedError() - - def _handle_GETPIN(self, arg): - try: - self._connect() - self._write(self.strings['description']) - pin = self._prompt(self.strings['prompt'], add_colon=False) - finally: - self._disconnect() - yield 'D %s' % pin - yield 'OK' - - def _handle_CONFIRM(self, arg): - try: - self._connect() - self._write(self.strings['description']) - self._write('1) '+self.strings['ok']) - self._write('2) '+self.strings['not ok']) - value = self._prompt('?') - finally: - self._disconnect() - if value == '1': - yield 'OK' - else: - yield 'ASSUAN_Not_Confirmed' - - def _handle_MESSAGE(self, arg): - self._write(self.strings['description']) - yield 'OK' - - def _handle_CONFIRM(self, args): - assert args == '--one-button', args - try: - self._connect() - self._write(self.strings['description']) - self._write('1) '+self.strings['ok']) - value = self._prompt('?') - finally: - self._disconnect() - assert value == '1', value - yield 'OK' - - -if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser(description=__doc__, version=__version__) - parser.add_argument( - '-V', '--verbose', action='count', default=0, - help='increase verbosity') - parser.add_argument( - '--display', - help='set X display (ignored by this implementation)') - - args = parser.parse_args() - - if args.verbose >= 2: - logger.setLevel(logging.DEBUG) - elif args.verbose >= 1: - logger.setLevel(logging.INFO) - - try: - p = PinEntry() - p.run() - except: - logger.error('exiting due to exception:\n%s' % - traceback.format_exc().rstrip()) - raise diff --git a/posts/pyassuan.mdwn b/posts/pyassuan.mdwn new file mode 100644 index 0000000..a6fd81e --- /dev/null +++ b/posts/pyassuan.mdwn @@ -0,0 +1,42 @@ +[[!meta title="pyassuan"]] +[[!template id=gitrepo repo=pyassuan]] + +I've been trying to come up with a clean way to verify detached +[[PGP]] signatures from [[Python]]. There are [a number of existing +approaches to this problem][wrappers]. Many of them call [gpg][] +using Python's `multiprocessing` module, but to verify detached +signatures, you need to send the signature in [on a separate file +descriptor][enable-special-filenames], and handling that in a way safe +from deadlocks is difficult. The other approach, taken by [PyMe][] is +to wrap [GPGME][] using [[SWIG]], which is great as far as it goes, +but development seems to have stalled, and I find the raw GPGME +interface excessively complicated. + +The GnuPG tools themselves often communicate over sockets using the +[Assuan protocol][assuan], and I'd already written an Assuan server to +handle pinentry (originally for my [[gpg-agent]] post, not [part of +pyassuan][pinentry.py]). I though it would be natural if there was a +[[gpgme-agent]] which would handle cryptographic tasks over this +protocol, which would make the [[pgp-mime]] implementation easier. In +order to talk to this (currently hypothetical) agent, I turned my +pinentry script into the more general pyassuan package. Now using +Assuan from Python should be as easy (or easier?) than using it from C +via [libassuan][]. + +The `README` is posted on the [PyPI page][pypi]. + +[wrappers]: http://wiki.python.org/moin/GnuPrivacyGuard +[gpg]: http://www.gnupg.org/ +[enable-special-filenames]: http://lists.gnupg.org/pipermail/gnupg-devel/2002-November/019343.html +[PyMe]: http://pyme.sourceforge.net/ +[GPGME]: http://www.gnupg.org/related_software/gpgme/ +[assuan]: http://www.gnupg.org/documentation/manuals/assuan/ +[pinentry.py]: http://git.tremily.us/?p=pyassuan.git;a=blob;f=bin/pinentry.py;hb=HEAD +[libassuan]: http://www.gnupg.org/related_software/libassuan/ +[pypi]: http://pypi.python.org/pypi/pyassuan/ + +[[!tag tags/code]] +[[!tag tags/linux]] +[[!tag tags/programming]] +[[!tag tags/pypi]] +[[!tag tags/python]]