3 # Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
5 # This file is part of pyassuan.
7 # pyassuan is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 3 of the License, or (at your option) any later
12 # pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License along with
17 # pyassuan. If not, see <http://www.gnu.org/licenses/>.
19 """Simple pinentry program for getting pins from a terminal.
24 import os.path as _os_path
25 import pprint as _pprint
27 import signal as _signal
29 import termios as _termios
31 from pyassuan import __version__
32 from pyassuan import server as _server
33 from pyassuan import common as _common
34 from pyassuan import error as _error
37 class PinEntry (_server.AssuanServer):
38 """pinentry protocol server
40 See ``pinentry-0.8.0/doc/pinentry.texi`` at::
42 ftp://ftp.gnupg.org/gcrypt/pinentry/
43 http://www.gnupg.org/aegypten/
45 for details on the pinentry interface.
47 Alternatively, you can just watch the logs and guess ;). Here's a
48 trace when driven by GnuPG 2.0.28 (libgcrypt 1.6.3)::
50 S: OK Your orders please
53 C: OPTION ttyname=/dev/pts/6
55 C: OPTION ttytype=xterm
57 C: OPTION lc-ctype=en_US.UTF-8
59 C: OPTION lc-messages=en_US.UTF-8
61 C: OPTION allow-external-password-cache
63 C: OPTION default-ok=_OK
65 C: OPTION default-cancel=_Cancel
67 C: OPTION default-yes=_Yes
69 C: OPTION default-no=_No
71 C: OPTION default-prompt=PIN:
73 C: OPTION default-pwmngr=_Save in password manager
75 C: OPTION default-cf-visi=Do you really want to make your passphrase visible on the screen?
77 C: OPTION default-tt-visi=Make passphrase visible
79 C: OPTION default-tt-hide=Hide passphrase
84 C: SETKEYINFO u/S9464F2C2825D2FE3
86 C: SETDESC Enter passphrase%0A
88 C: SETPROMPT Passphrase
94 S: OK closing connection
96 _digit_regexp = _re.compile(r'\d+')
98 # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
99 _tpgrp_regexp = _re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
101 def __init__(self, name='pinentry', strict_options=False,
102 single_request=True, **kwargs):
105 super(PinEntry, self).__init__(
106 name=name, strict_options=strict_options,
107 single_request=single_request, **kwargs)
108 self.valid_options.append('ttyname')
111 super(PinEntry, self).reset()
113 self.connection.clear()
118 self.logger.info('connecting to user')
119 self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
120 tty_name = self.options.get('ttyname', None)
122 self.connection['tpgrp'] = self._get_pgrp(tty_name)
124 'open to-user output stream for {}'.format(tty_name))
125 self.connection['to_user'] = open(tty_name, 'w')
127 'open from-user input stream for {}'.format(tty_name))
128 self.connection['from_user'] = open(tty_name, 'r')
129 self.logger.info('get current termios line discipline')
130 self.connection['original termios'] = _termios.tcgetattr(
131 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
132 new_termios = _copy.deepcopy(self.connection['original termios'])
133 # translate carriage return to newline on input
134 new_termios[0] |= _termios.ICRNL
135 # do not ignore carriage return on input
136 new_termios[0] &= ~_termios.IGNCR
137 # do not echo input characters
138 new_termios[3] &= ~_termios.ECHO
139 # echo input characters
140 #new_termios[3] |= _termios.ECHO
141 # echo the NL character even if ECHO is not set
142 new_termios[3] |= _termios.ECHONL
143 # enable canonical mode
144 new_termios[3] |= _termios.ICANON
145 self.logger.info('adjust termios line discipline')
147 self.connection['to_user'], _termios.TCSANOW, new_termios)
148 self.logger.info('send SIGSTOP to pgrp {}'.format(
149 self.connection['tpgrp']))
150 #_os.killpg(self.connection['tpgrp'], _signal.SIGSTOP)
151 _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
152 self.connection['tpgrp stopped'] = True
154 self.logger.info('no TTY name given; use stdin/stdout for I/O')
155 self.connection['to_user'] = _sys.stdout
156 self.connection['from_user'] = _sys.stdin
157 self.logger.info('connected to user')
158 self.connection['to_user'].write('\n') # give a clean line to work on
159 self.connection['active'] = True
161 def _disconnect(self):
162 self.logger.info('disconnecting from user')
164 if self.connection.get('original termios', None):
165 self.logger.info('restore original termios line discipline')
167 self.connection['to_user'], _termios.TCSANOW,
168 self.connection['original termios'])
169 if self.connection.get('tpgrp stopped', None) is True:
171 'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
172 #_os.killpg(self.connection['tpgrp'], _signal.SIGCONT)
173 _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
174 if self.connection.get('to_user', None) not in [None, _sys.stdout]:
175 self.logger.info('close to-user output stream')
176 self.connection['to_user'].close()
177 if self.connection.get('from_user',None) not in [None,_sys.stdout]:
178 self.logger.info('close from-user input stream')
179 self.connection['from_user'].close()
181 self.connection = {'active': False}
182 self.logger.info('disconnected from user')
184 def _get_pgrp(self, tty_name):
185 self.logger.info('find process group contolling {}'.format(tty_name))
187 for name in _os.listdir(proc):
188 path = _os_path.join(proc, name)
189 if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
190 continue # not a process directory
191 self.logger.debug('checking process {}'.format(name))
192 fd_path = _os_path.join(path, 'fd', '0')
194 link = _os.readlink(fd_path)
196 self.logger.debug('not our process: {}'.format(e))
197 continue # permission denied (not one of our processes)
199 self.logger.debug('wrong tty: {}'.format(link))
200 continue # not attached to our target tty
201 stat_path = _os_path.join(path, 'stat')
202 stat = open(stat_path, 'r').read()
203 self.logger.debug('check stat for pgrp: {}'.format(stat))
204 match = self._tpgrp_regexp.match(stat)
205 assert match != None, stat
206 pgrp = int(match.group(1))
207 self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
209 raise ValueError(tty_name)
211 def _write(self, string):
212 "Write text to the user's terminal."
213 self.connection['to_user'].write(string + '\n')
214 self.connection['to_user'].flush()
217 "Read and return a line from the user's terminal."
218 # drop trailing newline
219 return self.connection['from_user'].readline()[:-1]
221 def _prompt(self, prompt='?', add_colon=True):
224 self.connection['to_user'].write(prompt)
225 self.connection['to_user'].write(' ')
226 self.connection['to_user'].flush()
231 def _handle_GETINFO(self, arg):
233 yield _common.Response('D', str(_os.getpid()).encode('ascii'))
234 elif arg == 'version':
235 yield _common.Response('D', __version__.encode('ascii'))
237 raise _error.AssuanError(message='Invalid parameter')
238 yield _common.Response('OK')
240 def _handle_SETKEYINFO(self, arg):
241 self.strings['key info'] = arg
242 yield _common.Response('OK')
244 def _handle_CLEARPASSPHRASE(self, arg):
245 yield _common.Response('OK')
247 def _handle_SETDESC(self, arg):
248 self.strings['description'] = arg
249 yield _common.Response('OK')
251 def _handle_SETPROMPT(self, arg):
252 self.strings['prompt'] = arg
253 yield _common.Response('OK')
255 def _handle_SETERROR(self, arg):
256 self.strings['error'] = arg
257 yield _common.Response('OK')
259 def _handle_SETTITLE(self, arg):
260 self.strings['title'] = arg
261 yield _common.Response('OK')
263 def _handle_SETOK(self, arg):
264 self.strings['ok'] = arg
265 yield _common.Response('OK')
267 def _handle_SETCANCEL(self, arg):
268 self.strings['cancel'] = arg
269 yield _common.Response('OK')
271 def _handle_SETNOTOK(self, arg):
272 self.strings['not ok'] = arg
273 yield _common.Response('OK')
275 def _handle_SETQUALITYBAR(self, arg):
276 """Adds a quality indicator to the GETPIN window.
278 This indicator is updated as the passphrase is typed. The
279 clients needs to implement an inquiry named "QUALITY" which
280 gets passed the current passphrase (percent-plus escaped) and
281 should send back a string with a single numerical vauelue
282 between -100 and 100. Negative values will be displayed in
285 If a custom label for the quality bar is required, just add
286 that label as an argument as percent escaped string. You will
287 need this feature to translate the label because pinentry has
288 no internal gettext except for stock strings from the toolkit
291 If you want to show a tooltip for the quality bar, you may use
293 C: SETQUALITYBAR_TT string
296 With STRING being a percent escaped string shown as the tooltip.
298 Here is a real world example of these commands in use:
300 C: SETQUALITYBAR Quality%3a
302 C: SETQUALITYBAR_TT The quality of the text entered above.%0aPlease ask your administrator for details about the criteria.
305 self.strings['qualitybar'] = arg
306 yield _common.Response('OK')
308 def _handle_SETQUALITYBAR_TT(self, arg):
309 self.strings['qualitybar_tooltip'] = arg
310 yield _common.Response('OK')
312 def _handle_GETPIN(self, arg):
315 self._write(self.strings['description'])
316 if 'key info' in self.strings:
317 self._write('key: {}'.format(self.strings['key info']))
318 if 'qualitybar' in self.strings:
319 self._write(self.strings['qualitybar'])
320 pin = self._prompt(self.strings['prompt'], add_colon=False)
323 yield _common.Response('D', pin.encode('ascii'))
324 yield _common.Response('OK')
326 def _handle_CONFIRM(self, arg):
329 self._write(self.strings['description'])
330 self._write('1) '+self.strings['ok'])
331 self._write('2) '+self.strings['not ok'])
332 value = self._prompt('?')
336 yield _common.Response('OK')
338 raise _error.AssuanError(message='Not confirmed')
340 def _handle_MESSAGE(self, arg):
341 self._write(self.strings['description'])
342 yield _common.Response('OK')
344 def _handle_CONFIRM(self, args):
345 assert args == '--one-button', args
348 self._write(self.strings['description'])
349 self._write('1) '+self.strings['ok'])
350 value = self._prompt('?')
353 assert value == '1', value
354 yield _common.Response('OK')
357 if __name__ == '__main__':
362 parser = argparse.ArgumentParser(description=__doc__)
364 '-v', '--version', action='version',
365 version='%(prog)s {}'.format(__version__))
367 '-V', '--verbose', action='count', default=0,
368 help='increase verbosity')
371 help='set X display (ignored by this implementation)')
373 args = parser.parse_args()
378 p.logger.setLevel(max(
379 logging.DEBUG, p.logger.level - 10*args.verbose))
385 'exiting due to exception:\n{}'.format(
386 traceback.format_exc().rstrip()))