Run update-copyright.py.
[pyassuan.git] / bin / pinentry.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
4 #
5 # This file is part of pyassuan.
6 #
7 # pyassuan is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along with
17 # pyassuan.  If not, see <http://www.gnu.org/licenses/>.
18
19 """Simple pinentry program for getting pins from a terminal.
20 """
21
22 import copy as _copy
23 import os as _os
24 import os.path as _os_path
25 import pprint as _pprint
26 import re as _re
27 import signal as _signal
28 import sys as _sys
29 import termios as _termios
30
31 from pyassuan import __version__
32 from pyassuan import server as _server
33 from pyassuan import common as _common
34 from pyassuan import error as _error
35
36
37 class PinEntry (_server.AssuanServer):
38     """pinentry protocol server
39
40     See ``pinentry-0.8.0/doc/pinentry.texi`` at::
41
42       ftp://ftp.gnupg.org/gcrypt/pinentry/
43       http://www.gnupg.org/aegypten/
44
45     for details on the pinentry interface.
46
47     Alternatively, you can just watch the logs and guess ;).  Here's a
48     trace when driven by GnuPG 2.0.17 (libgcrypt 1.4.6)::
49
50       S: OK Your orders please
51       C: OPTION grab
52       S: OK
53       C: OPTION ttyname=/dev/pts/6
54       S: OK
55       C: OPTION ttytype=xterm
56       S: OK
57       C: OPTION lc-ctype=en_US.UTF-8
58       S: OK
59       C: OPTION lc-messages=en_US.UTF-8
60       S: OK
61       C: OPTION default-ok=_OK
62       S: OK
63       C: OPTION default-cancel=_Cancel
64       S: OK
65       C: OPTION default-prompt=PIN:
66       S: OK
67       C: OPTION touch-file=/tmp/gpg-7lElMX/S.gpg-agent
68       S: OK
69       C: GETINFO pid
70       S: D 14309
71       S: OK
72       C: SETDESC Enter passphrase%0A
73       S: OK
74       C: SETPROMPT Passphrase
75       S: OK
76       C: GETPIN
77       S: D testing!
78       S: OK
79       C: BYE
80       S: OK closing connection
81
82     Some drivers (e.g. ``gpgme``) have ``gpg-agent`` set ``ttyname``
83     to the terminal running ``gpgme``.  I don't like this, because the
84     pinentry program doesn't always play nicely with whatever is going
85     on in that terminal.  I'd rather have a free terminal that had
86     just been running Bash, and I export ``GPG_TTY`` to point to the
87     desired terminal.  To ignore the requested ``ttyname`` and use
88     whatever is in ``GPG_TTY``, initialize with ``override_ttyname``
89     set to ``True``.
90     """
91     _digit_regexp = _re.compile(r'\d+')
92
93     # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
94     _tpgrp_regexp = _re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
95
96     def __init__(self, name='pinentry', strict_options=False,
97                  single_request=True, override_ttyname=False, **kwargs):
98         self.strings = {}
99         self.connection = {}
100         super(PinEntry, self).__init__(
101             name=name, strict_options=strict_options,
102             single_request=single_request, **kwargs)
103         self.valid_options.append('ttyname')
104         self.override_ttyname = override_ttyname
105
106     def reset(self):
107         super(PinEntry, self).reset()
108         self.strings.clear()
109         self.connection.clear()
110
111     # user interface
112
113     def _connect(self):
114         self.logger.info('connecting to user')
115         self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
116         tty_name = None
117         if self.override_ttyname:
118             tty_name = _os.getenv('GPG_TTY')
119             if tty_name:
120                 self.logger.debug('override ttyname with {}'.format(tty_name))
121             else:
122                 self.logger.debug(
123                     'GPG_TTY not set, fallback to ttyname option')
124         if not tty_name:
125             tty_name = self.options.get('ttyname', None)
126         if tty_name:
127             self.connection['tpgrp'] = self._get_pgrp(tty_name)
128             self.logger.info(
129                 'open to-user output stream for {}'.format(tty_name))
130             self.connection['to_user'] = open(tty_name, 'w')
131             self.logger.info(
132                 'open from-user input stream for {}'.format(tty_name))
133             self.connection['from_user'] = open(tty_name, 'r')
134             self.logger.info('get current termios line discipline')
135             self.connection['original termios'] = _termios.tcgetattr(
136                 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
137             new_termios = _copy.deepcopy(self.connection['original termios'])
138             # translate carriage return to newline on input
139             new_termios[0] |= _termios.ICRNL
140             # do not ignore carriage return on input
141             new_termios[0] &= ~_termios.IGNCR
142             # do not echo input characters
143             new_termios[3] &= ~_termios.ECHO
144             # echo input characters
145             #new_termios[3] |= _termios.ECHO
146             # echo the NL character even if ECHO is not set
147             new_termios[3] |= _termios.ECHONL
148             # enable canonical mode
149             new_termios[3] |= _termios.ICANON
150             self.logger.info('adjust termios line discipline')
151             _termios.tcsetattr(
152                 self.connection['to_user'], _termios.TCSANOW, new_termios)
153             self.logger.info('send SIGSTOP to pgrp {}'.format(
154                     self.connection['tpgrp']))
155             #_os.killpg(self.connection['tpgrp'], _signal.SIGSTOP)
156             _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
157             self.connection['tpgrp stopped'] = True
158         else:
159             self.logger.info('no TTY name given; use stdin/stdout for I/O')
160             self.connection['to_user'] = _sys.stdout
161             self.connection['from_user'] = _sys.stdin
162         self.logger.info('connected to user')
163         self.connection['to_user'].write('\n')  # give a clean line to work on
164         self.connection['active'] = True
165
166     def _disconnect(self):
167         self.logger.info('disconnecting from user')
168         try:
169             if self.connection.get('original termios', None):
170                 self.logger.info('restore original termios line discipline')
171                 _termios.tcsetattr(
172                     self.connection['to_user'], _termios.TCSANOW,
173                     self.connection['original termios'])
174             if self.connection.get('tpgrp stopped', None) is True:
175                 self.logger.info(
176                     'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
177                 #_os.killpg(self.connection['tpgrp'], _signal.SIGCONT)
178                 _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
179             if self.connection.get('to_user', None) not in [None, _sys.stdout]:
180                 self.logger.info('close to-user output stream')
181                 self.connection['to_user'].close()
182             if self.connection.get('from_user',None) not in [None,_sys.stdout]:
183                 self.logger.info('close from-user input stream')
184                 self.connection['from_user'].close()
185         finally:
186             self.connection = {'active': False}
187             self.logger.info('disconnected from user')
188
189     def _get_pgrp(self, tty_name):
190         self.logger.info('find process group contolling {}'.format(tty_name))
191         proc = '/proc'
192         for name in _os.listdir(proc):
193             path = _os_path.join(proc, name)
194             if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
195                 continue  # not a process directory
196             self.logger.debug('checking process {}'.format(name))
197             fd_path = _os_path.join(path, 'fd', '0')
198             try:
199                 link = _os.readlink(fd_path)
200             except OSError as e:
201                 self.logger.debug('not our process: {}'.format(e))
202                 continue  # permission denied (not one of our processes)
203             if link != tty_name:
204                 self.logger.debug('wrong tty: {}'.format(link))
205                 continue  # not attached to our target tty
206             stat_path = _os_path.join(path, 'stat')
207             stat = open(stat_path, 'r').read()
208             self.logger.debug('check stat for pgrp: {}'.format(stat))
209             match = self._tpgrp_regexp.match(stat)
210             assert match != None, stat
211             pgrp = int(match.group(1))
212             self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
213             return pgrp
214         raise ValueError(tty_name)
215
216     def _write(self, string):
217         "Write text to the user's terminal."
218         self.connection['to_user'].write(string + '\n')
219         self.connection['to_user'].flush()
220
221     def _read(self):
222         "Read and return a line from the user's terminal."
223         # drop trailing newline
224         return self.connection['from_user'].readline()[:-1]
225
226     def _prompt(self, prompt='?', add_colon=True):
227         if add_colon:
228             prompt += ':'
229         self.connection['to_user'].write(prompt)
230         self.connection['to_user'].write(' ')
231         self.connection['to_user'].flush()
232         return self._read()
233
234     # assuan handlers
235
236     def _handle_GETINFO(self, arg):
237         if arg == 'pid':
238             yield _common.Response('D', str(_os.getpid()))
239         elif arg == 'version':
240             yield _common.Response('D', __version__)
241         else:
242             raise _error.AssuanError(message='Invalid parameter')
243         yield _common.Response('OK')
244
245     def _handle_SETDESC(self, arg):
246         self.strings['description'] = arg
247         yield _common.Response('OK')
248
249     def _handle_SETPROMPT(self, arg):
250         self.strings['prompt'] = arg
251         yield _common.Response('OK')
252
253     def _handle_SETERROR(self, arg):
254         self.strings['error'] = arg
255         yield _common.Response('OK')
256
257     def _handle_SETTITLE(self, arg):
258         self.strings['title'] = arg
259         yield _common.Response('OK')
260
261     def _handle_SETOK(self, arg):
262         self.strings['ok'] = arg
263         yield _common.Response('OK')
264
265     def _handle_SETCANCEL(self, arg):
266         self.strings['cancel'] = arg
267         yield _common.Response('OK')
268
269     def _handle_SETNOTOK(self, arg):
270         self.strings['not ok'] = arg
271         yield _common.Response('OK')
272
273     def _handle_SETQUALITYBAR(self, arg):
274         """Adds a quality indicator to the GETPIN window.
275
276         This indicator is updated as the passphrase is typed.  The
277         clients needs to implement an inquiry named "QUALITY" which
278         gets passed the current passpharse (percent-plus escaped) and
279         should send back a string with a single numerical vauelue
280         between -100 and 100.  Negative values will be displayed in
281         red.
282
283         If a custom label for the quality bar is required, just add
284         that label as an argument as percent escaped string.  You will
285         need this feature to translate the label because pinentry has
286         no internal gettext except for stock strings from the toolkit
287         library.
288
289         If you want to show a tooltip for the quality bar, you may use
290
291             C: SETQUALITYBAR_TT string
292             S: OK
293
294         With STRING being a percent escaped string shown as the tooltip.
295         """
296         raise NotImplementedError()
297
298     def _handle_GETPIN(self, arg):
299         try:
300             self._connect()
301             self._write(self.strings['description'])
302             pin = self._prompt(self.strings['prompt'], add_colon=False)
303         finally:
304             self._disconnect()
305         yield _common.Response('D', pin)
306         yield _common.Response('OK')
307
308     def _handle_CONFIRM(self, arg):
309         try:
310             self._connect()
311             self._write(self.strings['description'])
312             self._write('1) '+self.strings['ok'])
313             self._write('2) '+self.strings['not ok'])
314             value = self._prompt('?')
315         finally:
316             self._disconnect()
317         if value == '1':
318             yield _common.Response('OK')
319         else:
320             raise _error.AssuanError(message='Not confirmed')
321
322     def _handle_MESSAGE(self, arg):
323         self._write(self.strings['description'])
324         yield _common.Response('OK')
325
326     def _handle_CONFIRM(self, args):
327         assert args == '--one-button', args
328         try:
329             self._connect()
330             self._write(self.strings['description'])
331             self._write('1) '+self.strings['ok'])
332             value = self._prompt('?')
333         finally:
334             self._disconnect()
335         assert value == '1', value
336         yield _common.Response('OK')
337
338
339 if __name__ == '__main__':
340     import argparse
341     import logging
342     import traceback
343
344     parser = argparse.ArgumentParser(description=__doc__, version=__version__)
345     parser.add_argument(
346         '-V', '--verbose', action='count', default=0,
347         help='increase verbosity')
348     parser.add_argument(
349         '--display',
350         help='set X display (ignored by this implementation)')
351
352     args = parser.parse_args()
353
354     p = PinEntry(override_ttyname=True)
355
356     if args.verbose:
357         p.logger.setLevel(max(
358                 logging.DEBUG, p.logger.level - 10*args.verbose))
359
360     try:
361         p.run()
362     except:
363         p.logger.error(
364             'exiting due to exception:\n{}'.format(
365                 traceback.format_exc().rstrip()))
366         raise