Fix a couple of syntax goofs.
[pyassuan.git] / bin / pinentry.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program.  If not, see
17 # <http://www.gnu.org/licenses/>.
18
19 """Simple pinentry program for getting pins from a terminal.
20 """
21
22 import copy as _copy
23 import os as _os
24 import os.path as _os_path
25 import pprint as _pprint
26 import re as _re
27 import signal as _signal
28 import sys as _sys
29 import termios as _termios
30
31 from pyassuan import __version__
32 from pyassuan import server as _server
33 from pyassuan import common as _common
34 from pyassuan import error as _error
35
36
37 class PinEntry (_server.AssuanServer):
38     """pinentry protocol server
39
40     See ``pinentry-0.8.0/doc/pinentry.texi`` at::
41
42       ftp://ftp.gnupg.org/gcrypt/pinentry/
43       http://www.gnupg.org/aegypten/
44
45     for details on the pinentry interface.
46
47     Alternatively, you can just watch the logs and guess ;).  Here's a
48     trace when driven by GnuPG 2.0.17 (libgcrypt 1.4.6)::
49
50       S: OK Your orders please
51       C: OPTION grab
52       S: OK
53       C: OPTION ttyname=/dev/pts/6
54       S: OK
55       C: OPTION ttytype=xterm
56       S: OK
57       C: OPTION lc-ctype=en_US.UTF-8
58       S: OK
59       C: OPTION lc-messages=en_US.UTF-8
60       S: OK
61       C: OPTION default-ok=_OK
62       S: OK
63       C: OPTION default-cancel=_Cancel
64       S: OK
65       C: OPTION default-prompt=PIN:
66       S: OK
67       C: OPTION touch-file=/tmp/gpg-7lElMX/S.gpg-agent
68       S: OK
69       C: GETINFO pid
70       S: D 14309
71       S: OK
72       C: SETDESC Enter passphrase%0A
73       S: OK
74       C: SETPROMPT Passphrase
75       S: OK
76       C: GETPIN
77       S: D testing!
78       S: OK
79       C: BYE
80       S: OK closing connection
81
82     Some drivers (e.g. ``gpgme``) have ``gpg-agent`` set ``ttyname``
83     to the terminal running ``gpgme``.  I don't like this, because the
84     pinentry program doesn't always play nicely with whatever is going
85     on in that terminal.  I'd rather have a free terminal that had
86     just been running Bash, and I export ``GPG_TTY`` to point to the
87     desired terminal.  To ignore the requested ``ttyname`` and use
88     whatever is in ``GPG_TTY``, initialize with ``override_ttyname``
89     set to ``True``.
90     """
91     _digit_regexp = _re.compile(r'\d+')
92
93     # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
94     _tpgrp_regexp = _re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
95
96     def __init__(self, name='pinentry', strict_options=False,
97                  single_request=True, override_ttyname=False, **kwargs):
98         self.strings = {}
99         self.connection = {}
100         super(PinEntry, self).__init__(
101             name=name, strict_options=strict_options,
102             single_request=single_request, **kwargs)
103         self.valid_options.append('ttyname')
104         self.override_ttyname = override_ttyname
105
106     def reset(self):
107         super(PinEntry, self).reset()
108         self.strings.clear()
109         self.connection.clear()
110
111     # user interface
112
113     def _connect(self):
114         self.logger.info('connecting to user')
115         self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
116         tty_name = None
117         if self.override_ttyname:
118             tty_name = _os.getenv('TTY_NAME')
119         if not tty_name:  # override not requested, or fall back on undefined
120             tty_name = self.options.get('ttyname', None)
121         if tty_name:
122             self.connection['tpgrp'] = self._get_pgrp(tty_name)
123             self.logger.info(
124                 'open to-user output stream for {}'.format(tty_name))
125             self.connection['to_user'] = open(tty_name, 'w')
126             self.logger.info(
127                 'open from-user input stream for {}'.format(tty_name))
128             self.connection['from_user'] = open(tty_name, 'r')
129             self.logger.info('get current termios line discipline')
130             self.connection['original termios'] = _termios.tcgetattr(
131                 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
132             new_termios = _copy.deepcopy(self.connection['original termios'])
133             # translate carriage return to newline on input
134             new_termios[0] |= _termios.ICRNL
135             # do not ignore carriage return on input
136             new_termios[0] &= ~_termios.IGNCR
137             # do not echo input characters
138             new_termios[3] &= ~_termios.ECHO
139             # echo input characters
140             #new_termios[3] |= _termios.ECHO
141             # echo the NL character even if ECHO is not set
142             new_termios[3] |= _termios.ECHONL
143             # enable canonical mode
144             new_termios[3] |= _termios.ICANON
145             self.logger.info('adjust termios line discipline')
146             _termios.tcsetattr(
147                 self.connection['to_user'], _termios.TCSANOW, new_termios)
148             self.logger.info('send SIGSTOP to pgrp {}'.format(
149                     self.connection['tpgrp']))
150             #_os.killpg(self.connection['tpgrp'], _signal.SIGSTOP)
151             _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
152             self.connection['tpgrp stopped'] = True
153         else:
154             self.logger.info('no TTY name given; use stdin/stdout for I/O')
155             self.connection['to_user'] = _sys.stdout
156             self.connection['from_user'] = _sys.stdin
157         self.logger.info('connected to user')
158         self.connection['to_user'].write('\n')  # give a clean line to work on
159         self.connection['active'] = True
160
161     def _disconnect(self):
162         self.logger.info('disconnecting from user')
163         try:
164             if self.connection.get('original termios', None):
165                 self.logger.info('restore original termios line discipline')
166                 _termios.tcsetattr(
167                     self.connection['to_user'], _termios.TCSANOW,
168                     self.connection['original termios'])
169             if self.connection.get('tpgrp stopped', None) is True:
170                 self.logger.info(
171                     'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
172                 #_os.killpg(self.connection['tpgrp'], _signal.SIGCONT)
173                 _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
174             if self.connection.get('to_user', None) not in [None, _sys.stdout]:
175                 self.logger.info('close to-user output stream')
176                 self.connection['to_user'].close()
177             if self.connection.get('from_user',None) not in [None,_sys.stdout]:
178                 self.logger.info('close from-user input stream')
179                 self.connection['from_user'].close()
180         finally:
181             self.connection = {'active': False}
182             self.logger.info('disconnected from user')
183
184     def _get_pgrp(self, tty_name):
185         self.logger.info('find process group contolling {}'.format(tty_name))
186         proc = '/proc'
187         for name in _os.listdir(proc):
188             path = _os_path.join(proc, name)
189             if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
190                 continue  # not a process directory
191             self.logger.debug('checking process {}'.format(name))
192             fd_path = _os_path.join(path, 'fd', '0')
193             try:
194                 link = _os.readlink(fd_path)
195             except OSError as e:
196                 self.logger.debug('not our process: {}'.format(e))
197                 continue  # permission denied (not one of our processes)
198             if link != tty_name:
199                 self.logger.debug('wrong tty: {}'.format(link))
200                 continue  # not attached to our target tty
201             stat_path = _os_path.join(path, 'stat')
202             stat = open(stat_path, 'r').read()
203             self.logger.debug('check stat for pgrp: {}'.format(stat))
204             match = self._tpgrp_regexp.match(stat)
205             assert match != None, stat
206             pgrp = int(match.group(1))
207             self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
208             return pgrp
209         raise ValueError(tty_name)
210
211     def _write(self, string):
212         "Write text to the user's terminal."
213         self.connection['to_user'].write(string + '\n')
214         self.connection['to_user'].flush()
215
216     def _read(self):
217         "Read and return a line from the user's terminal."
218         # drop trailing newline
219         return self.connection['from_user'].readline()[:-1]
220
221     def _prompt(self, prompt='?', add_colon=True):
222         if add_colon:
223             prompt += ':'
224         self.connection['to_user'].write(prompt)
225         self.connection['to_user'].write(' ')
226         self.connection['to_user'].flush()
227         return self._read()
228
229     # assuan handlers
230
231     def _handle_GETINFO(self, arg):
232         if arg == 'pid':
233             yield _common.Response('D', str(_os.getpid()))
234         elif arg == 'version':
235             yield _common.Response('D', __version__)
236         else:
237             raise _error.AssuanError(message='Invalid parameter')
238         yield _common.Response('OK')
239
240     def _handle_SETDESC(self, arg):
241         self.strings['description'] = arg
242         yield _common.Response('OK')
243
244     def _handle_SETPROMPT(self, arg):
245         self.strings['prompt'] = arg
246         yield _common.Response('OK')
247
248     def _handle_SETERROR(self, arg):
249         self.strings['error'] = arg
250         yield _common.Response('OK')
251
252     def _handle_SETTITLE(self, arg):
253         self.strings['title'] = arg
254         yield _common.Response('OK')
255
256     def _handle_SETOK(self, arg):
257         self.strings['ok'] = arg
258         yield _common.Response('OK')
259
260     def _handle_SETCANCEL(self, arg):
261         self.strings['cancel'] = arg
262         yield _common.Response('OK')
263
264     def _handle_SETNOTOK(self, arg):
265         self.strings['not ok'] = arg
266         yield _common.Response('OK')
267
268     def _handle_SETQUALITYBAR(self, arg):
269         """Adds a quality indicator to the GETPIN window.
270
271         This indicator is updated as the passphrase is typed.  The
272         clients needs to implement an inquiry named "QUALITY" which
273         gets passed the current passpharse (percent-plus escaped) and
274         should send back a string with a single numerical vauelue
275         between -100 and 100.  Negative values will be displayed in
276         red.
277
278         If a custom label for the quality bar is required, just add
279         that label as an argument as percent escaped string.  You will
280         need this feature to translate the label because pinentry has
281         no internal gettext except for stock strings from the toolkit
282         library.
283
284         If you want to show a tooltip for the quality bar, you may use
285
286             C: SETQUALITYBAR_TT string
287             S: OK
288
289         With STRING being a percent escaped string shown as the tooltip.
290         """
291         raise NotImplementedError()
292
293     def _handle_GETPIN(self, arg):
294         try:
295             self._connect()
296             self._write(self.strings['description'])
297             pin = self._prompt(self.strings['prompt'], add_colon=False)
298         finally:
299             self._disconnect()
300         yield _common.Response('D', pin)
301         yield _common.Response('OK')
302
303     def _handle_CONFIRM(self, arg):
304         try:
305             self._connect()
306             self._write(self.strings['description'])
307             self._write('1) '+self.strings['ok'])
308             self._write('2) '+self.strings['not ok'])
309             value = self._prompt('?')
310         finally:
311             self._disconnect()
312         if value == '1':
313             yield _common.Response('OK')
314         else:
315             raise _error.AssuanError(message='Not confirmed')
316
317     def _handle_MESSAGE(self, arg):
318         self._write(self.strings['description'])
319         yield _common.Response('OK')
320
321     def _handle_CONFIRM(self, args):
322         assert args == '--one-button', args
323         try:
324             self._connect()
325             self._write(self.strings['description'])
326             self._write('1) '+self.strings['ok'])
327             value = self._prompt('?')
328         finally:
329             self._disconnect()
330         assert value == '1', value
331         yield _common.Response('OK')
332
333
334 if __name__ == '__main__':
335     import argparse
336     import logging
337     import traceback
338
339     parser = argparse.ArgumentParser(description=__doc__, version=__version__)
340     parser.add_argument(
341         '-V', '--verbose', action='count', default=0,
342         help='increase verbosity')
343     parser.add_argument(
344         '--display',
345         help='set X display (ignored by this implementation)')
346
347     args = parser.parse_args()
348
349     p = PinEntry()
350
351     if args.verbose:
352         p.logger.setLevel(max(
353                 logging.DEBUG, p.logger.level - 10*args.verbose))
354
355     try:
356         p = PinEntry(override_ttyname=True)
357         p.run()
358     except:
359         p.logger.error(
360             'exiting due to exception:\n{}'.format(
361                 traceback.format_exc().rstrip()))
362         raise