README: update nosetests-3.2 -> nosetests-3.3.
[pyassuan.git] / bin / pinentry.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
4 #
5 # This file is part of pyassuan.
6 #
7 # pyassuan is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along with
17 # pyassuan.  If not, see <http://www.gnu.org/licenses/>.
18
19 """Simple pinentry program for getting pins from a terminal.
20 """
21
22 import copy as _copy
23 import os as _os
24 import os.path as _os_path
25 import pprint as _pprint
26 import re as _re
27 import signal as _signal
28 import sys as _sys
29 import termios as _termios
30
31 from pyassuan import __version__
32 from pyassuan import server as _server
33 from pyassuan import common as _common
34 from pyassuan import error as _error
35
36
37 class PinEntry (_server.AssuanServer):
38     """pinentry protocol server
39
40     See ``pinentry-0.8.0/doc/pinentry.texi`` at::
41
42       ftp://ftp.gnupg.org/gcrypt/pinentry/
43       http://www.gnupg.org/aegypten/
44
45     for details on the pinentry interface.
46
47     Alternatively, you can just watch the logs and guess ;).  Here's a
48     trace when driven by GnuPG 2.0.17 (libgcrypt 1.4.6)::
49
50       S: OK Your orders please
51       C: OPTION grab
52       S: OK
53       C: OPTION ttyname=/dev/pts/6
54       S: OK
55       C: OPTION ttytype=xterm
56       S: OK
57       C: OPTION lc-ctype=en_US.UTF-8
58       S: OK
59       C: OPTION lc-messages=en_US.UTF-8
60       S: OK
61       C: OPTION default-ok=_OK
62       S: OK
63       C: OPTION default-cancel=_Cancel
64       S: OK
65       C: OPTION default-prompt=PIN:
66       S: OK
67       C: OPTION touch-file=/tmp/gpg-7lElMX/S.gpg-agent
68       S: OK
69       C: GETINFO pid
70       S: D 14309
71       S: OK
72       C: SETDESC Enter passphrase%0A
73       S: OK
74       C: SETPROMPT Passphrase
75       S: OK
76       C: GETPIN
77       S: D testing!
78       S: OK
79       C: BYE
80       S: OK closing connection
81     """
82     _digit_regexp = _re.compile(r'\d+')
83
84     # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
85     _tpgrp_regexp = _re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
86
87     def __init__(self, name='pinentry', strict_options=False,
88                  single_request=True, **kwargs):
89         self.strings = {}
90         self.connection = {}
91         super(PinEntry, self).__init__(
92             name=name, strict_options=strict_options,
93             single_request=single_request, **kwargs)
94         self.valid_options.append('ttyname')
95
96     def reset(self):
97         super(PinEntry, self).reset()
98         self.strings.clear()
99         self.connection.clear()
100
101     # user interface
102
103     def _connect(self):
104         self.logger.info('connecting to user')
105         self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
106         tty_name = self.options.get('ttyname', None)
107         if tty_name:
108             self.connection['tpgrp'] = self._get_pgrp(tty_name)
109             self.logger.info(
110                 'open to-user output stream for {}'.format(tty_name))
111             self.connection['to_user'] = open(tty_name, 'w')
112             self.logger.info(
113                 'open from-user input stream for {}'.format(tty_name))
114             self.connection['from_user'] = open(tty_name, 'r')
115             self.logger.info('get current termios line discipline')
116             self.connection['original termios'] = _termios.tcgetattr(
117                 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
118             new_termios = _copy.deepcopy(self.connection['original termios'])
119             # translate carriage return to newline on input
120             new_termios[0] |= _termios.ICRNL
121             # do not ignore carriage return on input
122             new_termios[0] &= ~_termios.IGNCR
123             # do not echo input characters
124             new_termios[3] &= ~_termios.ECHO
125             # echo input characters
126             #new_termios[3] |= _termios.ECHO
127             # echo the NL character even if ECHO is not set
128             new_termios[3] |= _termios.ECHONL
129             # enable canonical mode
130             new_termios[3] |= _termios.ICANON
131             self.logger.info('adjust termios line discipline')
132             _termios.tcsetattr(
133                 self.connection['to_user'], _termios.TCSANOW, new_termios)
134             self.logger.info('send SIGSTOP to pgrp {}'.format(
135                     self.connection['tpgrp']))
136             #_os.killpg(self.connection['tpgrp'], _signal.SIGSTOP)
137             _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
138             self.connection['tpgrp stopped'] = True
139         else:
140             self.logger.info('no TTY name given; use stdin/stdout for I/O')
141             self.connection['to_user'] = _sys.stdout
142             self.connection['from_user'] = _sys.stdin
143         self.logger.info('connected to user')
144         self.connection['to_user'].write('\n')  # give a clean line to work on
145         self.connection['active'] = True
146
147     def _disconnect(self):
148         self.logger.info('disconnecting from user')
149         try:
150             if self.connection.get('original termios', None):
151                 self.logger.info('restore original termios line discipline')
152                 _termios.tcsetattr(
153                     self.connection['to_user'], _termios.TCSANOW,
154                     self.connection['original termios'])
155             if self.connection.get('tpgrp stopped', None) is True:
156                 self.logger.info(
157                     'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
158                 #_os.killpg(self.connection['tpgrp'], _signal.SIGCONT)
159                 _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
160             if self.connection.get('to_user', None) not in [None, _sys.stdout]:
161                 self.logger.info('close to-user output stream')
162                 self.connection['to_user'].close()
163             if self.connection.get('from_user',None) not in [None,_sys.stdout]:
164                 self.logger.info('close from-user input stream')
165                 self.connection['from_user'].close()
166         finally:
167             self.connection = {'active': False}
168             self.logger.info('disconnected from user')
169
170     def _get_pgrp(self, tty_name):
171         self.logger.info('find process group contolling {}'.format(tty_name))
172         proc = '/proc'
173         for name in _os.listdir(proc):
174             path = _os_path.join(proc, name)
175             if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
176                 continue  # not a process directory
177             self.logger.debug('checking process {}'.format(name))
178             fd_path = _os_path.join(path, 'fd', '0')
179             try:
180                 link = _os.readlink(fd_path)
181             except OSError as e:
182                 self.logger.debug('not our process: {}'.format(e))
183                 continue  # permission denied (not one of our processes)
184             if link != tty_name:
185                 self.logger.debug('wrong tty: {}'.format(link))
186                 continue  # not attached to our target tty
187             stat_path = _os_path.join(path, 'stat')
188             stat = open(stat_path, 'r').read()
189             self.logger.debug('check stat for pgrp: {}'.format(stat))
190             match = self._tpgrp_regexp.match(stat)
191             assert match != None, stat
192             pgrp = int(match.group(1))
193             self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
194             return pgrp
195         raise ValueError(tty_name)
196
197     def _write(self, string):
198         "Write text to the user's terminal."
199         self.connection['to_user'].write(string + '\n')
200         self.connection['to_user'].flush()
201
202     def _read(self):
203         "Read and return a line from the user's terminal."
204         # drop trailing newline
205         return self.connection['from_user'].readline()[:-1]
206
207     def _prompt(self, prompt='?', add_colon=True):
208         if add_colon:
209             prompt += ':'
210         self.connection['to_user'].write(prompt)
211         self.connection['to_user'].write(' ')
212         self.connection['to_user'].flush()
213         return self._read()
214
215     # assuan handlers
216
217     def _handle_GETINFO(self, arg):
218         if arg == 'pid':
219             yield _common.Response('D', str(_os.getpid()).encode('ascii'))
220         elif arg == 'version':
221             yield _common.Response('D', __version__.encode('ascii'))
222         else:
223             raise _error.AssuanError(message='Invalid parameter')
224         yield _common.Response('OK')
225
226     def _handle_SETDESC(self, arg):
227         self.strings['description'] = arg
228         yield _common.Response('OK')
229
230     def _handle_SETPROMPT(self, arg):
231         self.strings['prompt'] = arg
232         yield _common.Response('OK')
233
234     def _handle_SETERROR(self, arg):
235         self.strings['error'] = arg
236         yield _common.Response('OK')
237
238     def _handle_SETTITLE(self, arg):
239         self.strings['title'] = arg
240         yield _common.Response('OK')
241
242     def _handle_SETOK(self, arg):
243         self.strings['ok'] = arg
244         yield _common.Response('OK')
245
246     def _handle_SETCANCEL(self, arg):
247         self.strings['cancel'] = arg
248         yield _common.Response('OK')
249
250     def _handle_SETNOTOK(self, arg):
251         self.strings['not ok'] = arg
252         yield _common.Response('OK')
253
254     def _handle_SETQUALITYBAR(self, arg):
255         """Adds a quality indicator to the GETPIN window.
256
257         This indicator is updated as the passphrase is typed.  The
258         clients needs to implement an inquiry named "QUALITY" which
259         gets passed the current passphrase (percent-plus escaped) and
260         should send back a string with a single numerical vauelue
261         between -100 and 100.  Negative values will be displayed in
262         red.
263
264         If a custom label for the quality bar is required, just add
265         that label as an argument as percent escaped string.  You will
266         need this feature to translate the label because pinentry has
267         no internal gettext except for stock strings from the toolkit
268         library.
269
270         If you want to show a tooltip for the quality bar, you may use
271
272             C: SETQUALITYBAR_TT string
273             S: OK
274
275         With STRING being a percent escaped string shown as the tooltip.
276
277         Here is a real world example of these commands in use:
278
279             C: SETQUALITYBAR Quality%3a
280             S: OK
281             C: SETQUALITYBAR_TT The quality of the text entered above.%0aPlease ask your administrator for details about the criteria.
282             S: OK
283         """
284         self.strings['qualitybar'] = arg
285         yield _common.Response('OK')
286
287     def _handle_SETQUALITYBAR_TT(self, arg):
288         self.strings['qualitybar_tooltip'] = arg
289         yield _common.Response('OK')
290
291     def _handle_GETPIN(self, arg):
292         try:
293             self._connect()
294             self._write(self.strings['description'])
295             if 'qualitybar' in self.strings:
296                 self._write(self.strings['qualitybar'])
297             pin = self._prompt(self.strings['prompt'], add_colon=False)
298         finally:
299             self._disconnect()
300         yield _common.Response('D', pin.encode('ascii'))
301         yield _common.Response('OK')
302
303     def _handle_CONFIRM(self, arg):
304         try:
305             self._connect()
306             self._write(self.strings['description'])
307             self._write('1) '+self.strings['ok'])
308             self._write('2) '+self.strings['not ok'])
309             value = self._prompt('?')
310         finally:
311             self._disconnect()
312         if value == '1':
313             yield _common.Response('OK')
314         else:
315             raise _error.AssuanError(message='Not confirmed')
316
317     def _handle_MESSAGE(self, arg):
318         self._write(self.strings['description'])
319         yield _common.Response('OK')
320
321     def _handle_CONFIRM(self, args):
322         assert args == '--one-button', args
323         try:
324             self._connect()
325             self._write(self.strings['description'])
326             self._write('1) '+self.strings['ok'])
327             value = self._prompt('?')
328         finally:
329             self._disconnect()
330         assert value == '1', value
331         yield _common.Response('OK')
332
333
334 if __name__ == '__main__':
335     import argparse
336     import logging
337     import traceback
338
339     parser = argparse.ArgumentParser(description=__doc__)
340     parser.add_argument(
341         '-v', '--version', action='version',
342         version='%(prog)s {}'.format(__version__))
343     parser.add_argument(
344         '-V', '--verbose', action='count', default=0,
345         help='increase verbosity')
346     parser.add_argument(
347         '--display',
348         help='set X display (ignored by this implementation)')
349
350     args = parser.parse_args()
351
352     p = PinEntry()
353
354     if args.verbose:
355         p.logger.setLevel(max(
356                 logging.DEBUG, p.logger.level - 10*args.verbose))
357
358     try:
359         p.run()
360     except:
361         p.logger.error(
362             'exiting due to exception:\n{}'.format(
363                 traceback.format_exc().rstrip()))
364         raise