Add pyassuan post, and remove pinentry.py in favor of pyassuan.
authorW. Trevor King <wking@drexel.edu>
Sat, 24 Mar 2012 11:22:00 +0000 (07:22 -0400)
committerW. Trevor King <wking@drexel.edu>
Sat, 24 Mar 2012 11:22:00 +0000 (07:22 -0400)
posts/Gentoo_overlay.mdwn
posts/gpg-agent.mdwn
posts/gpg-agent/pinentry.py [deleted file]
posts/pyassuan.mdwn [new file with mode: 0644]

index 88ac37b390f47052d63eb6f4958ccdbc33e0ba54..f26333e3b01d5fbf9d658d9e97279149cc40537d 100644 (file)
@@ -45,6 +45,8 @@ personal `layman.xml`).  The overlay is a fairly strange mix:
   (my [[h5config]] package)
 - dev-python/pgp-mime
   (my [[pgp-mime]] package)
+- dev-python/pyassuan
+  (my [[pyassuan]] package)
 - dev-python/pygrader
   (my [[pygrader]] package)
 - dev-python/pymodbus
index f4df5501b4db70e15af57efa3fbf82c890437cf5..53d2d8bfb5c9eaae09ee0089506ccd4fd5b8788e 100644 (file)
@@ -21,11 +21,7 @@ TTY.  This avoids troublesome issues like pinentry clobbering [[Mutt]]
 if they are both using ncurses.
 
 I didn't like any of the pinentry programs available on my system, so
-I wrote my own: [[pinentry.py]].  To use my script, save it somewhere
-on your system and add a line like the following to your
-`~/.gnupg/gpg-agent.conf`.
-
-    pinentry-program /path/to/pinentry.py
+I wrote my own in [[pyassuan]].
 
 When you are done with the agent, kill it with
 
diff --git a/posts/gpg-agent/pinentry.py b/posts/gpg-agent/pinentry.py
deleted file mode 100755 (executable)
index 99e1bc3..0000000
+++ /dev/null
@@ -1,398 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this program.  If not, see
-# <http://www.gnu.org/licenses/>.
-
-"""Simple pinentry program for getting pins from a terminal.
-"""
-
-import copy
-import logging
-import logging.handlers
-import os
-import os.path
-import pprint
-import re
-import signal
-import sys
-import termios
-import traceback
-
-
-__version__ = '0.1'
-
-
-# create logger
-logger = logging.getLogger('pinentry')
-logger.setLevel(logging.WARNING)
-_h = logging.handlers.SysLogHandler(address='/dev/log')
-_h.setLevel(logging.DEBUG)
-_f = logging.Formatter('%(name)s: %(levelname)s: %(message)s')
-_h.setFormatter(_f)
-logger.addHandler(_h)
-del _h, _f
-
-
-class PinEntry (object):
-    """pinentry protocol server
-
-    See the `Assuan manual`_ for a description of the protocol.
-
-    .. _Assuan manual: http://www.gnupg.org/documentation/manuals/assuan/
-    """
-    _digit_regexp = re.compile(r'\d+')
-
-    # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
-    _tpgrp_regexp = re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
-
-    _assuan_encode_regexp = re.compile(
-        '(' + '|'.join(['%', '\r', '\n']) + ')')
-    _assuan_decode_regexp = re.compile('(%[0-9A-F]{2})')
-
-    def __init__(self):
-        self.stop = False
-        self.options = {}
-        self.strings = {}
-        self.connection = {}
-
-    def run(self):
-        logger.info('---opening pinentry---')
-        logger.info('OK Your orders please')
-        sys.stdout.write('OK Your orders please\n')
-        sys.stdout.flush()
-        try:
-            while not self.stop:
-                line = sys.stdin.readline()
-                if not line:
-                    break  # EOF
-                line = line.rstrip()  # dangerous?
-                logger.info(line)
-                line = self._decode(line)
-                fields = line.split(' ', 1)
-                cmd = fields[0]
-                if len(fields) > 1:
-                    arg = fields[1]
-                else:
-                    arg = None
-                handle = getattr(self, '_handle_%s' % cmd, None)
-                if handle:
-                    for response in handle(arg):
-                        response = self._encode(response)
-                        logger.info(response)
-                        sys.stdout.write(response+'\n')
-                        try:
-                            sys.stdout.flush()
-                        except IOError:
-                            if not self.stop:
-                                raise
-                else:
-                    raise ValueError(line)
-        finally:
-            logger.info('---closing pinentry---')
-
-    # user interface
-
-    def _connect(self):
-        logger.info('--connecting to user--')
-        logger.debug('options:\n%s' % pprint.pformat(self.options))
-        tty_name = self.options.get('ttyname', None)
-        if tty_name:
-            self.connection['tpgrp'] = self._get_pgrp(tty_name)
-            logger.info('open to-user output stream for %s' % tty_name)
-            self.connection['to_user'] = open(tty_name, 'w')
-            logger.info('open from-user input stream for %s' % tty_name)
-            self.connection['from_user'] = open(tty_name, 'r')
-            logger.info('get current termios line discipline')
-            self.connection['original termios'] = termios.tcgetattr(
-                self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
-            new_termios = copy.deepcopy(self.connection['original termios'])
-            # translate carriage return to newline on input
-            new_termios[0] |= termios.ICRNL
-            # do not ignore carriage return on input
-            new_termios[0] &= ~termios.IGNCR
-            # do not echo input characters
-            new_termios[3] &= ~termios.ECHO
-            # echo input characters
-            #new_termios[3] |= termios.ECHO
-            # echo the NL character even if ECHO is not set
-            new_termios[3] |= termios.ECHONL
-            # enable canonical mode
-            new_termios[3] |= termios.ICANON
-            logger.info('adjust termios line discipline')
-            termios.tcsetattr(
-                self.connection['to_user'], termios.TCSANOW, new_termios)
-            logger.info('send SIGSTOP to pgrp %d' % self.connection['tpgrp'])
-            #os.killpg(self.connection['tpgrp'], signal.SIGSTOP)
-            os.kill(-self.connection['tpgrp'], signal.SIGSTOP)
-            self.connection['tpgrp stopped'] = True
-        else:
-            logger.info('no TTY name given; use stdin/stdout for I/O')
-            self.connection['to_user'] = sys.stdout
-            self.connection['from_user'] = sys.stdin
-        logger.info('--connected to user--')
-        self.connection['to_user'].write('\n')  # give a clean line to work on
-        self.connection['active'] = True
-
-    def _disconnect(self):
-        logger.info('--disconnecting from user--')
-        try:
-            if self.connection.get('original termios', None):
-                logger.info('restore original termios line discipline')
-                termios.tcsetattr(
-                    self.connection['to_user'], termios.TCSANOW,
-                    self.connection['original termios'])
-            if self.connection.get('tpgrp stopped', None) is True:
-                logger.info(
-                    'send SIGCONT to pgrp %d' % self.connection['tpgrp'])
-                #os.killpg(self.connection['tpgrp'], signal.SIGCONT)
-                os.kill(-self.connection['tpgrp'], signal.SIGCONT)
-            if self.connection.get('to_user', None) not in [None, sys.stdout]:
-                logger.info('close to-user output stream')
-                self.connection['to_user'].close()
-            if self.connection.get('from_user', None) not in [None,sys.stdout]:
-                logger.info('close from-user input stream')
-                self.connection['from_user'].close()
-        finally:
-            self.connection = {'active': False}
-            logger.info('--disconnected from user--')
-
-    def _get_pgrp(self, tty_name):
-        logger.info('find process group contolling %s' % tty_name)
-        proc = '/proc'
-        for name in os.listdir(proc):
-            path = os.path.join(proc, name)
-            if not (self._digit_regexp.match(name) and os.path.isdir(path)):
-                continue  # not a process directory
-            logger.debug('checking process %s' % name)
-            fd_path = os.path.join(path, 'fd', '0')
-            try:
-                link = os.readlink(fd_path)
-            except OSError, e:
-                logger.debug('not our process: %s' % e)
-                continue  # permission denied (not one of our processes)
-            if link != tty_name:
-                logger.debug('wrong tty: %s' % link)
-                continue  # not attached to our target tty
-            stat_path = os.path.join(path, 'stat')
-            stat = open(stat_path, 'r').read()
-            logger.debug('check stat for pgrp: %s' % stat)
-            match = self._tpgrp_regexp.match(stat)
-            assert match != None, stat
-            pgrp = int(match.group(1))
-            logger.info('found pgrp %d for %s' % (pgrp, tty_name))
-            return pgrp
-        raise ValueError(tty_name)
-
-    def _write(self, string):
-        "Write text to the user's terminal."
-        self.connection['to_user'].write(string + '\n')
-        self.connection['to_user'].flush()
-
-    def _read(self):
-        "Read and return a line from the user's terminal."
-        # drop trailing newline
-        return self.connection['from_user'].readline()[:-1]
-
-    def _prompt(self, prompt='?', add_colon=True):
-        if add_colon:
-            prompt += ':'
-        self.connection['to_user'].write('%s ' % prompt)
-        self.connection['to_user'].flush()
-        return self._read()
-
-    # Assuan utilities
-
-    def _encode(self, string):
-        """
-
-        >>> p = PinEntry()
-        >>> p._encode('It grew by 5%!\\n')
-        'It grew by 5%25!%0A'
-        """   
-        return self._assuan_encode_regexp.sub(
-            lambda x : self._to_hex(x.group()), string)
-
-    def _decode(self, string):
-        """
-
-        >>> p = PinEntry()
-        >>> p._decode('%22Look out!%22%0AWhere%3F')
-        '"Look out!"\\nWhere?'
-        """
-        return self._assuan_decode_regexp.sub(
-            lambda x : self._from_hex(x.group()), string)
-
-    def _from_hex(self, code):
-        """
-
-        >>> p = PinEntry()
-        >>> p._from_hex('%22')
-        '"'
-        >>> p._from_hex('%0A')
-        '\\n'
-        """
-        return chr(int(code[1:], 16))
-
-    def _to_hex(self, char):
-        """
-
-        >>> p = PinEntry()
-        >>> p._to_hex('"')
-        '%22'
-        >>> p._to_hex('\\n')
-        '%0A'
-        """
-        return '%%%02X' % ord(char)
-
-    # handlers
-
-    def _handle_BYE(self, arg):
-        self.stop = True
-        yield 'OK closing connection'
-
-    def _handle_OPTION(self, arg):
-        # ttytype to set TERM
-        fields = arg.split('=', 1)
-        key = fields[0]
-        if len(fields) > 1:
-            value = fields[1]
-        else:
-            value = True
-        self.options[key] = value
-        yield 'OK'
-
-    def _handle_GETINFO(self, arg):
-        if arg == 'pid':
-            yield 'D %d' % os.getpid()
-        else:
-            raise ValueError(arg)
-        yield 'OK'
-
-    def _handle_SETDESC(self, arg):
-        self.strings['description'] = arg
-        yield 'OK'
-
-    def _handle_SETPROMPT(self, arg):
-        self.strings['prompt'] = arg
-        yield 'OK'
-
-    def _handle_SETERROR(self, arg):
-        self.strings['error'] = arg
-        yield 'OK'
-
-    def _handle_SETTITLE(self, arg):
-        self.strings['title'] = arg
-        yield 'OK'
-
-    def _handle_SETOK(self, arg):
-        self.strings['ok'] = arg
-        yield 'OK'
-
-    def _handle_SETCANCEL(self, arg):
-        self.strings['cancel'] = arg
-        yield 'OK'
-
-    def _handle_SETNOTOK(self, arg):
-        self.strings['not ok'] = arg
-        yield 'OK'
-
-    def _handle_SETQUALITYBAR(self, arg):
-        """Adds a quality indicator to the GETPIN window.  This
-     indicator is updated as the passphrase is typed.  The clients
-     needs to implement an inquiry named "QUALITY" which gets passed
-     the current passpharse (percent-plus escaped) and should send
-     back a string with a single numerical vauelue between -100 and
-     100.  Negative values will be displayed in red.
-
-     If a custom label for the quality bar is required, just add that
-     label as an argument as percent escaped string.  You will need
-     this feature to translate the label because pinentry has no
-     internal gettext except for stock strings from the toolkit library.
-
-     If you want to show a tooltip for the quality bar, you may use
-            C: SETQUALITYBAR_TT string
-            S: OK
-
-     With STRING being a percent escaped string shown as the tooltip.
-     """
-        raise NotImplementedError()
-
-    def _handle_GETPIN(self, arg):
-        try:
-            self._connect()
-            self._write(self.strings['description'])
-            pin = self._prompt(self.strings['prompt'], add_colon=False)
-        finally:
-            self._disconnect()
-        yield 'D %s' % pin
-        yield 'OK'
-
-    def _handle_CONFIRM(self, arg):
-        try:
-            self._connect()
-            self._write(self.strings['description'])
-            self._write('1) '+self.strings['ok'])
-            self._write('2) '+self.strings['not ok'])
-            value = self._prompt('?')
-        finally:
-            self._disconnect()
-        if value == '1':
-            yield 'OK'
-        else:
-            yield 'ASSUAN_Not_Confirmed'
-
-    def _handle_MESSAGE(self, arg):
-        self._write(self.strings['description'])
-        yield 'OK'
-
-    def _handle_CONFIRM(self, args):
-        assert args == '--one-button', args
-        try:
-            self._connect()
-            self._write(self.strings['description'])
-            self._write('1) '+self.strings['ok'])
-            value = self._prompt('?')
-        finally:
-            self._disconnect()
-        assert value == '1', value
-        yield 'OK'
-
-
-if __name__ == '__main__':
-    import argparse
-
-    parser = argparse.ArgumentParser(description=__doc__, version=__version__)
-    parser.add_argument(
-        '-V', '--verbose', action='count', default=0,
-        help='increase verbosity')
-    parser.add_argument(
-        '--display',
-        help='set X display (ignored by this implementation)')
-
-    args = parser.parse_args()
-
-    if args.verbose >= 2:
-        logger.setLevel(logging.DEBUG)
-    elif args.verbose >= 1:
-        logger.setLevel(logging.INFO)
-
-    try:
-        p = PinEntry()
-        p.run()
-    except:
-        logger.error('exiting due to exception:\n%s' %
-                     traceback.format_exc().rstrip())
-        raise
diff --git a/posts/pyassuan.mdwn b/posts/pyassuan.mdwn
new file mode 100644 (file)
index 0000000..a6fd81e
--- /dev/null
@@ -0,0 +1,42 @@
+[[!meta  title="pyassuan"]]
+[[!template id=gitrepo repo=pyassuan]]
+
+I've been trying to come up with a clean way to verify detached
+[[PGP]] signatures from [[Python]].  There are [a number of existing
+approaches to this problem][wrappers].  Many of them call [gpg][]
+using Python's `multiprocessing` module, but to verify detached
+signatures, you need to send the signature in [on a separate file
+descriptor][enable-special-filenames], and handling that in a way safe
+from deadlocks is difficult.  The other approach, taken by [PyMe][] is
+to wrap [GPGME][] using [[SWIG]], which is great as far as it goes,
+but development seems to have stalled, and I find the raw GPGME
+interface excessively complicated.
+
+The GnuPG tools themselves often communicate over sockets using the
+[Assuan protocol][assuan], and I'd already written an Assuan server to
+handle pinentry (originally for my [[gpg-agent]] post, not [part of
+pyassuan][pinentry.py]).  I though it would be natural if there was a
+[[gpgme-agent]] which would handle cryptographic tasks over this
+protocol, which would make the [[pgp-mime]] implementation easier.  In
+order to talk to this (currently hypothetical) agent, I turned my
+pinentry script into the more general pyassuan package.  Now using
+Assuan from Python should be as easy (or easier?) than using it from C
+via [libassuan][].
+
+The `README` is posted on the [PyPI page][pypi].
+
+[wrappers]: http://wiki.python.org/moin/GnuPrivacyGuard
+[gpg]: http://www.gnupg.org/
+[enable-special-filenames]: http://lists.gnupg.org/pipermail/gnupg-devel/2002-November/019343.html
+[PyMe]: http://pyme.sourceforge.net/
+[GPGME]: http://www.gnupg.org/related_software/gpgme/
+[assuan]: http://www.gnupg.org/documentation/manuals/assuan/
+[pinentry.py]: http://git.tremily.us/?p=pyassuan.git;a=blob;f=bin/pinentry.py;hb=HEAD
+[libassuan]: http://www.gnupg.org/related_software/libassuan/
+[pypi]: http://pypi.python.org/pypi/pyassuan/
+
+[[!tag tags/code]]
+[[!tag tags/linux]]
+[[!tag tags/programming]]
+[[!tag tags/pypi]]
+[[!tag tags/python]]