8ed3b4dcbb21ed539ff84bf44e0d94dfb1570044
[pyassuan.git] / bin / pinentry.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
4 #
5 # This file is part of pyassuan.
6 #
7 # pyassuan is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along with
17 # pyassuan.  If not, see <http://www.gnu.org/licenses/>.
18
19 """Simple pinentry program for getting pins from a terminal.
20 """
21
22 import copy as _copy
23 import os as _os
24 import os.path as _os_path
25 import pprint as _pprint
26 import re as _re
27 import signal as _signal
28 import sys as _sys
29 import termios as _termios
30
31 from pyassuan import __version__
32 from pyassuan import server as _server
33 from pyassuan import common as _common
34 from pyassuan import error as _error
35
36
37 class PinEntry (_server.AssuanServer):
38     """pinentry protocol server
39
40     See ``pinentry-0.8.0/doc/pinentry.texi`` at::
41
42       ftp://ftp.gnupg.org/gcrypt/pinentry/
43       http://www.gnupg.org/aegypten/
44
45     for details on the pinentry interface.
46
47     Alternatively, you can just watch the logs and guess ;).  Here's a
48     trace when driven by GnuPG 2.0.28 (libgcrypt 1.6.3)::
49
50       S: OK Your orders please
51       C: OPTION grab
52       S: OK
53       C: OPTION ttyname=/dev/pts/6
54       S: OK
55       C: OPTION ttytype=xterm
56       S: OK
57       C: OPTION lc-ctype=en_US.UTF-8
58       S: OK
59       C: OPTION lc-messages=en_US.UTF-8
60       S: OK
61       C: OPTION allow-external-password-cache
62       S: OK
63       C: OPTION default-ok=_OK
64       S: OK
65       C: OPTION default-cancel=_Cancel
66       S: OK
67       C: OPTION default-yes=_Yes
68       S: OK
69       C: OPTION default-no=_No
70       S: OK
71       C: OPTION default-prompt=PIN:
72       S: OK
73       C: OPTION default-pwmngr=_Save in password manager
74       S: OK
75       C: OPTION default-cf-visi=Do you really want to make your passphrase visible on the screen?
76       S: OK
77       C: OPTION default-tt-visi=Make passphrase visible
78       S: OK
79       C: OPTION default-tt-hide=Hide passphrase
80       S: OK
81       C: GETINFO pid
82       S: D 14309
83       S: OK
84       C: SETKEYINFO u/S9464F2C2825D2FE3
85       S: OK
86       C: SETDESC Enter passphrase%0A
87       S: OK
88       C: SETPROMPT Passphrase
89       S: OK
90       C: GETPIN
91       S: D testing!
92       S: OK
93       C: BYE
94       S: OK closing connection
95     """
96     _digit_regexp = _re.compile(r'\d+')
97
98     # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
99     _tpgrp_regexp = _re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
100
101     def __init__(self, name='pinentry', strict_options=False,
102                  single_request=True, **kwargs):
103         self.strings = {}
104         self.connection = {}
105         super(PinEntry, self).__init__(
106             name=name, strict_options=strict_options,
107             single_request=single_request, **kwargs)
108         self.valid_options.append('ttyname')
109
110     def reset(self):
111         super(PinEntry, self).reset()
112         self.strings.clear()
113         self.connection.clear()
114
115     # user interface
116
117     def _connect(self):
118         self.logger.info('connecting to user')
119         self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
120         tty_name = self.options.get('ttyname', None)
121         if tty_name:
122             self.connection['tpgrp'] = self._get_pgrp(tty_name)
123             self.logger.info(
124                 'open to-user output stream for {}'.format(tty_name))
125             self.connection['to_user'] = open(tty_name, 'w')
126             self.logger.info(
127                 'open from-user input stream for {}'.format(tty_name))
128             self.connection['from_user'] = open(tty_name, 'r')
129             self.logger.info('get current termios line discipline')
130             self.connection['original termios'] = _termios.tcgetattr(
131                 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
132             new_termios = _copy.deepcopy(self.connection['original termios'])
133             # translate carriage return to newline on input
134             new_termios[0] |= _termios.ICRNL
135             # do not ignore carriage return on input
136             new_termios[0] &= ~_termios.IGNCR
137             # do not echo input characters
138             new_termios[3] &= ~_termios.ECHO
139             # echo input characters
140             #new_termios[3] |= _termios.ECHO
141             # echo the NL character even if ECHO is not set
142             new_termios[3] |= _termios.ECHONL
143             # enable canonical mode
144             new_termios[3] |= _termios.ICANON
145             self.logger.info('adjust termios line discipline')
146             _termios.tcsetattr(
147                 self.connection['to_user'], _termios.TCSANOW, new_termios)
148             self.logger.info('send SIGSTOP to pgrp {}'.format(
149                     self.connection['tpgrp']))
150             #_os.killpg(self.connection['tpgrp'], _signal.SIGSTOP)
151             _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
152             self.connection['tpgrp stopped'] = True
153         else:
154             self.logger.info('no TTY name given; use stdin/stdout for I/O')
155             self.connection['to_user'] = _sys.stdout
156             self.connection['from_user'] = _sys.stdin
157         self.logger.info('connected to user')
158         self.connection['to_user'].write('\n')  # give a clean line to work on
159         self.connection['active'] = True
160
161     def _disconnect(self):
162         self.logger.info('disconnecting from user')
163         try:
164             if self.connection.get('original termios', None):
165                 self.logger.info('restore original termios line discipline')
166                 _termios.tcsetattr(
167                     self.connection['to_user'], _termios.TCSANOW,
168                     self.connection['original termios'])
169             if self.connection.get('tpgrp stopped', None) is True:
170                 self.logger.info(
171                     'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
172                 #_os.killpg(self.connection['tpgrp'], _signal.SIGCONT)
173                 _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
174             if self.connection.get('to_user', None) not in [None, _sys.stdout]:
175                 self.logger.info('close to-user output stream')
176                 self.connection['to_user'].close()
177             if self.connection.get('from_user',None) not in [None,_sys.stdout]:
178                 self.logger.info('close from-user input stream')
179                 self.connection['from_user'].close()
180         finally:
181             self.connection = {'active': False}
182             self.logger.info('disconnected from user')
183
184     def _get_pgrp(self, tty_name):
185         self.logger.info('find process group contolling {}'.format(tty_name))
186         proc = '/proc'
187         for name in _os.listdir(proc):
188             path = _os_path.join(proc, name)
189             if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
190                 continue  # not a process directory
191             self.logger.debug('checking process {}'.format(name))
192             fd_path = _os_path.join(path, 'fd', '0')
193             try:
194                 link = _os.readlink(fd_path)
195             except OSError as e:
196                 self.logger.debug('not our process: {}'.format(e))
197                 continue  # permission denied (not one of our processes)
198             if link != tty_name:
199                 self.logger.debug('wrong tty: {}'.format(link))
200                 continue  # not attached to our target tty
201             stat_path = _os_path.join(path, 'stat')
202             stat = open(stat_path, 'r').read()
203             self.logger.debug('check stat for pgrp: {}'.format(stat))
204             match = self._tpgrp_regexp.match(stat)
205             assert match != None, stat
206             pgrp = int(match.group(1))
207             self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
208             return pgrp
209         raise ValueError(tty_name)
210
211     def _write(self, string):
212         "Write text to the user's terminal."
213         self.connection['to_user'].write(string + '\n')
214         self.connection['to_user'].flush()
215
216     def _read(self):
217         "Read and return a line from the user's terminal."
218         # drop trailing newline
219         return self.connection['from_user'].readline()[:-1]
220
221     def _prompt(self, prompt='?', error=None, add_colon=True):
222         if add_colon:
223             prompt += ':'
224         if error:
225             self.connection['to_user'].write(error)
226             self.connection['to_user'].write('\n')
227         self.connection['to_user'].write(prompt)
228         self.connection['to_user'].write(' ')
229         self.connection['to_user'].flush()
230         return self._read()
231
232     # assuan handlers
233
234     def _handle_GETINFO(self, arg):
235         if arg == 'pid':
236             yield _common.Response('D', str(_os.getpid()).encode('ascii'))
237         elif arg == 'version':
238             yield _common.Response('D', __version__.encode('ascii'))
239         else:
240             raise _error.AssuanError(message='Invalid parameter')
241         yield _common.Response('OK')
242
243     def _handle_SETKEYINFO(self, arg):
244         self.strings['key info'] = arg
245         yield _common.Response('OK')
246
247     def _handle_CLEARPASSPHRASE(self, arg):
248         yield _common.Response('OK')
249
250     def _handle_SETDESC(self, arg):
251         self.strings['description'] = arg
252         yield _common.Response('OK')
253
254     def _handle_SETPROMPT(self, arg):
255         self.strings['prompt'] = arg
256         yield _common.Response('OK')
257
258     def _handle_SETERROR(self, arg):
259         self.strings['error'] = arg
260         yield _common.Response('OK')
261
262     def _handle_SETTITLE(self, arg):
263         self.strings['title'] = arg
264         yield _common.Response('OK')
265
266     def _handle_SETOK(self, arg):
267         self.strings['ok'] = arg
268         yield _common.Response('OK')
269
270     def _handle_SETCANCEL(self, arg):
271         self.strings['cancel'] = arg
272         yield _common.Response('OK')
273
274     def _handle_SETNOTOK(self, arg):
275         self.strings['not ok'] = arg
276         yield _common.Response('OK')
277
278     def _handle_SETQUALITYBAR(self, arg):
279         """Adds a quality indicator to the GETPIN window.
280
281         This indicator is updated as the passphrase is typed.  The
282         clients needs to implement an inquiry named "QUALITY" which
283         gets passed the current passphrase (percent-plus escaped) and
284         should send back a string with a single numerical vauelue
285         between -100 and 100.  Negative values will be displayed in
286         red.
287
288         If a custom label for the quality bar is required, just add
289         that label as an argument as percent escaped string.  You will
290         need this feature to translate the label because pinentry has
291         no internal gettext except for stock strings from the toolkit
292         library.
293
294         If you want to show a tooltip for the quality bar, you may use
295
296             C: SETQUALITYBAR_TT string
297             S: OK
298
299         With STRING being a percent escaped string shown as the tooltip.
300
301         Here is a real world example of these commands in use:
302
303             C: SETQUALITYBAR Quality%3a
304             S: OK
305             C: SETQUALITYBAR_TT The quality of the text entered above.%0aPlease ask your administrator for details about the criteria.
306             S: OK
307         """
308         self.strings['qualitybar'] = arg
309         yield _common.Response('OK')
310
311     def _handle_SETQUALITYBAR_TT(self, arg):
312         self.strings['qualitybar_tooltip'] = arg
313         yield _common.Response('OK')
314
315     def _handle_GETPIN(self, arg):
316         try:
317             self._connect()
318             self._write(self.strings['description'])
319             if 'key info' in self.strings:
320                 self._write('key: {}'.format(self.strings['key info']))
321             if 'qualitybar' in self.strings:
322                 self._write(self.strings['qualitybar'])
323             pin = self._prompt(
324                 prompt=self.strings['prompt'],
325                 error=self.strings.get('error'),
326                 add_colon=False)
327         finally:
328             self._disconnect()
329         yield _common.Response('D', pin.encode('ascii'))
330         yield _common.Response('OK')
331
332     def _handle_CONFIRM(self, arg):
333         try:
334             self._connect()
335             self._write(self.strings['description'])
336             self._write('1) '+self.strings['ok'])
337             self._write('2) '+self.strings['not ok'])
338             value = self._prompt('?')
339         finally:
340             self._disconnect()
341         if value == '1':
342             yield _common.Response('OK')
343         else:
344             raise _error.AssuanError(message='Not confirmed')
345
346     def _handle_MESSAGE(self, arg):
347         self._write(self.strings['description'])
348         yield _common.Response('OK')
349
350     def _handle_CONFIRM(self, args):
351         assert args == '--one-button', args
352         try:
353             self._connect()
354             self._write(self.strings['description'])
355             self._write('1) '+self.strings['ok'])
356             value = self._prompt('?')
357         finally:
358             self._disconnect()
359         assert value == '1', value
360         yield _common.Response('OK')
361
362
363 if __name__ == '__main__':
364     import argparse
365     import logging
366     import traceback
367
368     parser = argparse.ArgumentParser(description=__doc__)
369     parser.add_argument(
370         '-v', '--version', action='version',
371         version='%(prog)s {}'.format(__version__))
372     parser.add_argument(
373         '-V', '--verbose', action='count', default=0,
374         help='increase verbosity')
375     parser.add_argument(
376         '--display',
377         help='set X display (ignored by this implementation)')
378
379     args = parser.parse_args()
380
381     p = PinEntry()
382
383     if args.verbose:
384         p.logger.setLevel(max(
385                 logging.DEBUG, p.logger.level - 10*args.verbose))
386
387     try:
388         p.run()
389     except:
390         p.logger.error(
391             'exiting due to exception:\n{}'.format(
392                 traceback.format_exc().rstrip()))
393         raise