Change my email address back to drexel (Verizon blocks incoming port 25).
[blog.git] / posts / gpg-agent / pinentry.py
1 #!/usr/bin/env python
2 #
3 # Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Simple pinentry program for getting pins from a terminal.
20 """
21
22 import copy
23 import logging
24 import logging.handlers
25 import os
26 import os.path
27 import pprint
28 import re
29 import signal
30 import sys
31 import termios
32 import traceback
33
34
35 __version__ = '0.1'
36
37
38 # create logger
39 logger = logging.getLogger('pinentry')
40 logger.setLevel(logging.WARNING)
41 _h = logging.handlers.SysLogHandler(address='/dev/log')
42 _h.setLevel(logging.DEBUG)
43 _f = logging.Formatter('%(name)s: %(levelname)s: %(message)s')
44 _h.setFormatter(_f)
45 logger.addHandler(_h)
46 del _h, _f
47
48
49 class PinEntry (object):
50     """pinentry protocol server
51
52     See the `Assuan manual`_ for a description of the protocol.
53
54     .. _Assuan manual: http://www.gnupg.org/documentation/manuals/assuan/
55     """
56     _digit_regexp = re.compile(r'\d+')
57
58     # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
59     _tpgrp_regexp = re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
60
61     _assuan_encode_regexp = re.compile(
62         '(' + '|'.join(['%', '\r', '\n']) + ')')
63     _assuan_decode_regexp = re.compile('(%[0-9A-F]{2})')
64
65     def __init__(self):
66         self.stop = False
67         self.options = {}
68         self.strings = {}
69         self.connection = {}
70
71     def run(self):
72         logger.info('---opening pinentry---')
73         logger.info('OK Your orders please')
74         sys.stdout.write('OK Your orders please\n')
75         sys.stdout.flush()
76         try:
77             while not self.stop:
78                 line = sys.stdin.readline()
79                 if not line:
80                     break  # EOF
81                 line = line.rstrip()  # dangerous?
82                 logger.info(line)
83                 line = self._decode(line)
84                 fields = line.split(' ', 1)
85                 cmd = fields[0]
86                 if len(fields) > 1:
87                     arg = fields[1]
88                 else:
89                     arg = None
90                 handle = getattr(self, '_handle_%s' % cmd, None)
91                 if handle:
92                     for response in handle(arg):
93                         response = self._encode(response)
94                         logger.info(response)
95                         sys.stdout.write(response+'\n')
96                         try:
97                             sys.stdout.flush()
98                         except IOError:
99                             if not self.stop:
100                                 raise
101                 else:
102                     raise ValueError(line)
103         finally:
104             logger.info('---closing pinentry---')
105
106     # user interface
107
108     def _connect(self):
109         logger.info('--connecting to user--')
110         logger.debug('options:\n%s' % pprint.pformat(self.options))
111         tty_name = self.options.get('ttyname', None)
112         if tty_name:
113             self.connection['tpgrp'] = self._get_pgrp(tty_name)
114             logger.info('open to-user output stream for %s' % tty_name)
115             self.connection['to_user'] = open(tty_name, 'w')
116             logger.info('open from-user input stream for %s' % tty_name)
117             self.connection['from_user'] = open(tty_name, 'r')
118             logger.info('get current termios line discipline')
119             self.connection['original termios'] = termios.tcgetattr(
120                 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
121             new_termios = copy.deepcopy(self.connection['original termios'])
122             # translate carriage return to newline on input
123             new_termios[0] |= termios.ICRNL
124             # do not ignore carriage return on input
125             new_termios[0] &= ~termios.IGNCR
126             # do not echo input characters
127             new_termios[3] &= ~termios.ECHO
128             # echo input characters
129             #new_termios[3] |= termios.ECHO
130             # echo the NL character even if ECHO is not set
131             new_termios[3] |= termios.ECHONL
132             # enable canonical mode
133             new_termios[3] |= termios.ICANON
134             logger.info('adjust termios line discipline')
135             termios.tcsetattr(
136                 self.connection['to_user'], termios.TCSANOW, new_termios)
137             logger.info('send SIGSTOP to pgrp %d' % self.connection['tpgrp'])
138             #os.killpg(self.connection['tpgrp'], signal.SIGSTOP)
139             os.kill(-self.connection['tpgrp'], signal.SIGSTOP)
140             self.connection['tpgrp stopped'] = True
141         else:
142             logger.info('no TTY name given; use stdin/stdout for I/O')
143             self.connection['to_user'] = sys.stdout
144             self.connection['from_user'] = sys.stdin
145         logger.info('--connected to user--')
146         self.connection['to_user'].write('\n')  # give a clean line to work on
147         self.connection['active'] = True
148
149     def _disconnect(self):
150         logger.info('--disconnecting from user--')
151         try:
152             if self.connection.get('original termios', None):
153                 logger.info('restore original termios line discipline')
154                 termios.tcsetattr(
155                     self.connection['to_user'], termios.TCSANOW,
156                     self.connection['original termios'])
157             if self.connection.get('tpgrp stopped', None) is True:
158                 logger.info(
159                     'send SIGCONT to pgrp %d' % self.connection['tpgrp'])
160                 #os.killpg(self.connection['tpgrp'], signal.SIGCONT)
161                 os.kill(-self.connection['tpgrp'], signal.SIGCONT)
162             if self.connection.get('to_user', None) not in [None, sys.stdout]:
163                 logger.info('close to-user output stream')
164                 self.connection['to_user'].close()
165             if self.connection.get('from_user', None) not in [None,sys.stdout]:
166                 logger.info('close from-user input stream')
167                 self.connection['from_user'].close()
168         finally:
169             self.connection = {'active': False}
170             logger.info('--disconnected from user--')
171
172     def _get_pgrp(self, tty_name):
173         logger.info('find process group contolling %s' % tty_name)
174         proc = '/proc'
175         for name in os.listdir(proc):
176             path = os.path.join(proc, name)
177             if not (self._digit_regexp.match(name) and os.path.isdir(path)):
178                 continue  # not a process directory
179             logger.debug('checking process %s' % name)
180             fd_path = os.path.join(path, 'fd', '0')
181             try:
182                 link = os.readlink(fd_path)
183             except OSError, e:
184                 logger.debug('not our process: %s' % e)
185                 continue  # permission denied (not one of our processes)
186             if link != tty_name:
187                 logger.debug('wrong tty: %s' % link)
188                 continue  # not attached to our target tty
189             stat_path = os.path.join(path, 'stat')
190             stat = open(stat_path, 'r').read()
191             logger.debug('check stat for pgrp: %s' % stat)
192             match = self._tpgrp_regexp.match(stat)
193             assert match != None, stat
194             pgrp = int(match.group(1))
195             logger.info('found pgrp %d for %s' % (pgrp, tty_name))
196             return pgrp
197         raise ValueError(tty_name)
198
199     def _write(self, string):
200         "Write text to the user's terminal."
201         self.connection['to_user'].write(string + '\n')
202         self.connection['to_user'].flush()
203
204     def _read(self):
205         "Read and return a line from the user's terminal."
206         # drop trailing newline
207         return self.connection['from_user'].readline()[:-1]
208
209     def _prompt(self, prompt='?', add_colon=True):
210         if add_colon:
211             prompt += ':'
212         self.connection['to_user'].write('%s ' % prompt)
213         self.connection['to_user'].flush()
214         return self._read()
215
216     # Assuan utilities
217
218     def _encode(self, string):
219         """
220
221         >>> p = PinEntry()
222         >>> p._encode('It grew by 5%!\\n')
223         'It grew by 5%25!%0A'
224         """   
225         return self._assuan_encode_regexp.sub(
226             lambda x : self._to_hex(x.group()), string)
227
228     def _decode(self, string):
229         """
230
231         >>> p = PinEntry()
232         >>> p._decode('%22Look out!%22%0AWhere%3F')
233         '"Look out!"\\nWhere?'
234         """
235         return self._assuan_decode_regexp.sub(
236             lambda x : self._from_hex(x.group()), string)
237
238     def _from_hex(self, code):
239         """
240
241         >>> p = PinEntry()
242         >>> p._from_hex('%22')
243         '"'
244         >>> p._from_hex('%0A')
245         '\\n'
246         """
247         return chr(int(code[1:], 16))
248
249     def _to_hex(self, char):
250         """
251
252         >>> p = PinEntry()
253         >>> p._to_hex('"')
254         '%22'
255         >>> p._to_hex('\\n')
256         '%0A'
257         """
258         return '%%%02X' % ord(char)
259
260     # handlers
261
262     def _handle_BYE(self, arg):
263         self.stop = True
264         yield 'OK closing connection'
265
266     def _handle_OPTION(self, arg):
267         # ttytype to set TERM
268         fields = arg.split('=', 1)
269         key = fields[0]
270         if len(fields) > 1:
271             value = fields[1]
272         else:
273             value = True
274         self.options[key] = value
275         yield 'OK'
276
277     def _handle_GETINFO(self, arg):
278         if arg == 'pid':
279             yield 'D %d' % os.getpid()
280         else:
281             raise ValueError(arg)
282         yield 'OK'
283
284     def _handle_SETDESC(self, arg):
285         self.strings['description'] = arg
286         yield 'OK'
287
288     def _handle_SETPROMPT(self, arg):
289         self.strings['prompt'] = arg
290         yield 'OK'
291
292     def _handle_SETERROR(self, arg):
293         self.strings['error'] = arg
294         yield 'OK'
295
296     def _handle_SETTITLE(self, arg):
297         self.strings['title'] = arg
298         yield 'OK'
299
300     def _handle_SETOK(self, arg):
301         self.strings['ok'] = arg
302         yield 'OK'
303
304     def _handle_SETCANCEL(self, arg):
305         self.strings['cancel'] = arg
306         yield 'OK'
307
308     def _handle_SETNOTOK(self, arg):
309         self.strings['not ok'] = arg
310         yield 'OK'
311
312     def _handle_SETQUALITYBAR(self, arg):
313         """Adds a quality indicator to the GETPIN window.  This
314      indicator is updated as the passphrase is typed.  The clients
315      needs to implement an inquiry named "QUALITY" which gets passed
316      the current passpharse (percent-plus escaped) and should send
317      back a string with a single numerical vauelue between -100 and
318      100.  Negative values will be displayed in red.
319
320      If a custom label for the quality bar is required, just add that
321      label as an argument as percent escaped string.  You will need
322      this feature to translate the label because pinentry has no
323      internal gettext except for stock strings from the toolkit library.
324
325      If you want to show a tooltip for the quality bar, you may use
326             C: SETQUALITYBAR_TT string
327             S: OK
328
329      With STRING being a percent escaped string shown as the tooltip.
330      """
331         raise NotImplementedError()
332
333     def _handle_GETPIN(self, arg):
334         try:
335             self._connect()
336             self._write(self.strings['description'])
337             pin = self._prompt(self.strings['prompt'], add_colon=False)
338         finally:
339             self._disconnect()
340         yield 'D %s' % pin
341         yield 'OK'
342
343     def _handle_CONFIRM(self, arg):
344         try:
345             self._connect()
346             self._write(self.strings['description'])
347             self._write('1) '+self.strings['ok'])
348             self._write('2) '+self.strings['not ok'])
349             value = self._prompt('?')
350         finally:
351             self._disconnect()
352         if value == '1':
353             yield 'OK'
354         else:
355             yield 'ASSUAN_Not_Confirmed'
356
357     def _handle_MESSAGE(self, arg):
358         self._write(self.strings['description'])
359         yield 'OK'
360
361     def _handle_CONFIRM(self, args):
362         assert args == '--one-button', args
363         try:
364             self._connect()
365             self._write(self.strings['description'])
366             self._write('1) '+self.strings['ok'])
367             value = self._prompt('?')
368         finally:
369             self._disconnect()
370         assert value == '1', value
371         yield 'OK'
372
373
374 if __name__ == '__main__':
375     import argparse
376
377     parser = argparse.ArgumentParser(description=__doc__, version=__version__)
378     parser.add_argument(
379         '-V', '--verbose', action='count', default=0,
380         help='increase verbosity')
381     parser.add_argument(
382         '--display',
383         help='set X display (ignored by this implementation)')
384
385     args = parser.parse_args()
386
387     if args.verbose >= 2:
388         logger.setLevel(logging.DEBUG)
389     elif args.verbose >= 1:
390         logger.setLevel(logging.INFO)
391
392     try:
393         p = PinEntry()
394         p.run()
395     except:
396         logger.error('exiting due to exception:\n%s' %
397                      traceback.format_exc().rstrip())
398         raise