3 # Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
5 # This file is part of pyassuan.
7 # pyassuan is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 3 of the License, or (at your option) any later
12 # pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License along with
17 # pyassuan. If not, see <http://www.gnu.org/licenses/>.
19 """Simple pinentry program for getting pins from a terminal.
24 import os.path as _os_path
25 import pprint as _pprint
27 import signal as _signal
29 import termios as _termios
31 from pyassuan import __version__
32 from pyassuan import server as _server
33 from pyassuan import common as _common
34 from pyassuan import error as _error
37 class PinEntry (_server.AssuanServer):
38 """pinentry protocol server
40 See ``pinentry-0.8.0/doc/pinentry.texi`` at::
42 ftp://ftp.gnupg.org/gcrypt/pinentry/
43 http://www.gnupg.org/aegypten/
45 for details on the pinentry interface.
47 Alternatively, you can just watch the logs and guess ;). Here's a
48 trace when driven by GnuPG 2.0.28 (libgcrypt 1.6.3)::
50 S: OK Your orders please
53 C: OPTION ttyname=/dev/pts/6
55 C: OPTION ttytype=xterm
57 C: OPTION lc-ctype=en_US.UTF-8
59 C: OPTION lc-messages=en_US.UTF-8
61 C: OPTION allow-external-password-cache
63 C: OPTION default-ok=_OK
65 C: OPTION default-cancel=_Cancel
67 C: OPTION default-yes=_Yes
69 C: OPTION default-no=_No
71 C: OPTION default-prompt=PIN:
73 C: OPTION default-pwmngr=_Save in password manager
75 C: OPTION default-cf-visi=Do you really want to make your passphrase visible on the screen?
77 C: OPTION default-tt-visi=Make passphrase visible
79 C: OPTION default-tt-hide=Hide passphrase
84 C: SETKEYINFO u/S9464F2C2825D2FE3
86 C: SETDESC Enter passphrase%0A
88 C: SETPROMPT Passphrase
94 S: OK closing connection
# Used with ``.match()`` to pick out /proc entries whose name starts with
# digits, i.e. the per-process directories.
_digit_regexp = _re.compile(r'\d+')

# from proc(5): pid comm state ppid pgrp session tty_nr tpgid
# ``comm`` is the executable name in parentheses and may itself contain
# spaces or parentheses (e.g. ``(tmux: server)``), so it is matched with a
# greedy ``.*`` that ends at the ``)`` followed by the numeric fields.
# The single captured group is the eighth field, ``tpgid``.
_tpgrp_regexp = _re.compile(
    r'\d+ \(.*\) . \d+ \d+ \d+ \d+ (\d+)', _re.DOTALL)
101 def __init__(self, name='pinentry', strict_options=False,
102 single_request=True, **kwargs):
105 super(PinEntry, self).__init__(
106 name=name, strict_options=strict_options,
107 single_request=single_request, **kwargs)
108 self.valid_options.append('ttyname')
111 super(PinEntry, self).reset()
113 self.connection.clear()
def _connect(self):
    """Open the channel to the user and record it in ``self.connection``.

    If the ``ttyname`` option is set, the named TTY is opened once for
    writing and once for reading, its current termios attributes are
    saved under ``'original termios'``, and a modified copy is applied
    (CR translated to NL on input, echo off except for the newline,
    canonical mode on).  The process group returned by ``_get_pgrp``
    is then sent ``SIGSTOP`` -- presumably so it does not compete for
    the terminal while we prompt (TODO confirm).  Without ``ttyname``,
    ``sys.stdout`` / ``sys.stdin`` are used unchanged.

    In both cases a newline is written to the user and
    ``self.connection['active']`` is set to ``True``.
    """
    log = self.logger
    link = self.connection
    log.info('connecting to user')
    log.debug('options:\n{}'.format(_pprint.pformat(self.options)))
    tty_name = self.options.get('ttyname', None)
    if not tty_name:
        log.info('no TTY name given; use stdin/stdout for I/O')
        link['to_user'] = _sys.stdout
        link['from_user'] = _sys.stdin
    else:
        link['tpgrp'] = self._get_pgrp(tty_name)
        log.info('open to-user output stream for {}'.format(tty_name))
        link['to_user'] = open(tty_name, 'w')
        log.info('open from-user input stream for {}'.format(tty_name))
        link['from_user'] = open(tty_name, 'r')
        log.info('get current termios line discipline')
        # tcgetattr() returns [iflag, oflag, cflag, lflag, ...]
        saved = _termios.tcgetattr(link['to_user'])
        link['original termios'] = saved
        attrs = _copy.deepcopy(saved)
        iflag, lflag = 0, 3
        # translate carriage return to newline on input
        attrs[iflag] |= _termios.ICRNL
        # do not ignore carriage return on input
        attrs[iflag] &= ~_termios.IGNCR
        # do not echo input characters ...
        attrs[lflag] &= ~_termios.ECHO
        # ... but echo the NL character even though ECHO is not set
        attrs[lflag] |= _termios.ECHONL
        # enable canonical (line-at-a-time) mode
        attrs[lflag] |= _termios.ICANON
        log.info('adjust termios line discipline')
        _termios.tcsetattr(link['to_user'], _termios.TCSANOW, attrs)
        log.info('send SIGSTOP to pgrp {}'.format(link['tpgrp']))
        # a negative pid addresses the whole process group
        _os.kill(-link['tpgrp'], _signal.SIGSTOP)
        link['tpgrp stopped'] = True
    log.info('connected to user')
    link['to_user'].write('\n')  # give a clean line to work on
    link['active'] = True
def _disconnect(self):
    """Undo the connection setup and reset ``self.connection``.

    In order: restore the termios attributes saved under
    ``'original termios'`` (if any), send ``SIGCONT`` to the process
    group if it was marked as stopped, and close the to-user and
    from-user streams unless they are the process's own
    ``sys.stdout`` / ``sys.stdin``.

    Whatever happens, ``self.connection`` ends up as
    ``{'active': False}``; an exception raised by one of the cleanup
    steps still propagates to the caller.
    """
    self.logger.info('disconnecting from user')
    try:
        if self.connection.get('original termios', None):
            self.logger.info('restore original termios line discipline')
            _termios.tcsetattr(
                self.connection['to_user'], _termios.TCSANOW,
                self.connection['original termios'])
        if self.connection.get('tpgrp stopped', None) is True:
            self.logger.info(
                'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
            # a negative pid addresses the whole process group
            _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
        if self.connection.get('to_user', None) not in [None, _sys.stdout]:
            self.logger.info('close to-user output stream')
            self.connection['to_user'].close()
        # The input side must be compared with stdin (not stdout);
        # otherwise stdin/stdout mode would close the process's own
        # standard input here.
        if self.connection.get('from_user', None) not in [None, _sys.stdin]:
            self.logger.info('close from-user input stream')
            self.connection['from_user'].close()
    finally:
        self.connection = {'active': False}
        self.logger.info('disconnected from user')
def _get_pgrp(self, tty_name):
    """Return the terminal process group of a process attached to a TTY.

    Scans the numeric directories under ``/proc`` for the first process
    whose file descriptor 0 is a symlink to ``tty_name`` and returns
    the value captured by ``self._tpgrp_regexp`` from that process's
    ``stat`` file, as an ``int``.

    Raises ``ValueError`` if no such process is found or if the
    ``stat`` contents cannot be parsed.
    """
    self.logger.info('find process group contolling {}'.format(tty_name))
    proc = '/proc'
    for name in _os.listdir(proc):
        path = _os_path.join(proc, name)
        if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
            continue  # not a process directory
        self.logger.debug('checking process {}'.format(name))
        fd_path = _os_path.join(path, 'fd', '0')
        try:
            link = _os.readlink(fd_path)
        except OSError as e:
            self.logger.debug('not our process: {}'.format(e))
            continue  # permission denied (not one of our processes)
        if link != tty_name:
            self.logger.debug('wrong tty: {}'.format(link))
            continue  # not attached to our target tty
        stat_path = _os_path.join(path, 'stat')
        try:
            # ``with`` closes the handle right away instead of leaving
            # it to the garbage collector.
            with open(stat_path, 'r') as stat_file:
                stat = stat_file.read()
        except OSError as e:
            # the process may have exited since the directory listing
            self.logger.debug('cannot read {}: {}'.format(stat_path, e))
            continue
        self.logger.debug('check stat for pgrp: {}'.format(stat))
        match = self._tpgrp_regexp.match(stat)
        if match is None:
            # explicit error instead of ``assert``, which ``python -O``
            # would strip
            raise ValueError(
                'cannot parse {}: {!r}'.format(stat_path, stat))
        pgrp = int(match.group(1))
        self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
        return pgrp
    raise ValueError(tty_name)
def _write(self, string):
    "Send one line of text to the user's terminal and flush it."
    line = string + '\n'
    self.connection['to_user'].write(line)
    self.connection['to_user'].flush()
def _read(self):
    "Read and return a line from the user's terminal."
    line = self.connection['from_user'].readline()
    # Drop the trailing newline only when there is one: at end of input
    # readline() can return a final line without it, and slicing
    # unconditionally would cut off the last real character.
    if line.endswith('\n'):
        line = line[:-1]
    return line

def _prompt(self, prompt='?', add_colon=True):
    """Show ``prompt`` to the user and return the line typed in reply.

    A ``:`` is appended to ``prompt`` when ``add_colon`` is true.  The
    prompt is followed by a single space (no newline) and flushed
    before the reply is read with ``_read``.
    """
    if add_colon:
        prompt += ':'
    self.connection['to_user'].write(prompt)
    self.connection['to_user'].write(' ')
    self.connection['to_user'].flush()
    return self._read()
def _handle_GETINFO(self, arg):
    """Answer a ``GETINFO`` request with one data line, then ``OK``.

    ``'version'`` is answered with pyassuan's ``__version__``; the
    process-id branch is answered with ``os.getpid()``.  Any other
    argument raises ``AssuanError`` with the message
    ``'Invalid parameter'``.
    """
    # NOTE(review): the 'pid' keyword is inferred from the getpid()
    # reply; confirm it against the pinentry protocol documentation.
    if arg == 'pid':
        info = str(_os.getpid())
    elif arg == 'version':
        info = __version__
    else:
        raise _error.AssuanError(message='Invalid parameter')
    yield _common.Response('D', info.encode('ascii'))
    yield _common.Response('OK')
def _handle_SETKEYINFO(self, arg):
    """Remember the key identifier sent with ``SETKEYINFO``.

    ``arg`` is stored as received under ``self.strings['key info']``;
    ``_handle_GETPIN`` shows it on a ``key:`` line when it is set.
    Yields a single ``OK`` response.
    """
    self.strings['key info'] = arg
    reply = _common.Response('OK')
    yield reply
def _handle_SETDESC(self, arg):
    """Remember the description text sent with ``SETDESC``.

    ``arg`` is stored as received under ``self.strings['description']``,
    which the GETPIN, CONFIRM and MESSAGE handlers write to the user.
    Yields a single ``OK`` response.
    """
    self.strings['description'] = arg
    reply = _common.Response('OK')
    yield reply
def _handle_SETPROMPT(self, arg):
    """Remember the prompt text sent with ``SETPROMPT``.

    ``arg`` is stored as received under ``self.strings['prompt']``,
    which ``_handle_GETPIN`` uses as the prompt for the PIN.
    Yields a single ``OK`` response.
    """
    self.strings['prompt'] = arg
    reply = _common.Response('OK')
    yield reply
def _handle_SETERROR(self, arg):
    """Remember the error text sent with ``SETERROR``.

    ``arg`` is stored as received under ``self.strings['error']``.
    Yields a single ``OK`` response.
    """
    self.strings['error'] = arg
    reply = _common.Response('OK')
    yield reply
def _handle_SETTITLE(self, arg):
    """Remember the title text sent with ``SETTITLE``.

    ``arg`` is stored as received under ``self.strings['title']``.
    Yields a single ``OK`` response.
    """
    self.strings['title'] = arg
    reply = _common.Response('OK')
    yield reply
def _handle_SETOK(self, arg):
    """Remember the label of the "OK" choice sent with ``SETOK``.

    ``arg`` is stored as received under ``self.strings['ok']``, which
    the CONFIRM handler shows as choice ``1)``.
    Yields a single ``OK`` response.
    """
    self.strings['ok'] = arg
    reply = _common.Response('OK')
    yield reply
def _handle_SETCANCEL(self, arg):
    """Remember the label of the "cancel" choice sent with ``SETCANCEL``.

    ``arg`` is stored as received under ``self.strings['cancel']``.
    Yields a single ``OK`` response.
    """
    self.strings['cancel'] = arg
    reply = _common.Response('OK')
    yield reply
def _handle_SETNOTOK(self, arg):
    """Remember the label of the "not OK" choice sent with ``SETNOTOK``.

    ``arg`` is stored as received under ``self.strings['not ok']``,
    which the two-choice CONFIRM dialog shows as choice ``2)``.
    Yields a single ``OK`` response.
    """
    self.strings['not ok'] = arg
    reply = _common.Response('OK')
    yield reply
def _handle_SETQUALITYBAR(self, arg):
    """Remember the label of the passphrase quality indicator.

    In the pinentry protocol this command adds a quality indicator
    to the GETPIN dialog, updated as the passphrase is typed.  The
    client has to implement an inquiry named "QUALITY", which is
    passed the current passphrase (percent-plus escaped) and answers
    with a single numerical value between -100 and 100.  Negative
    values are displayed in red.

    A custom label may be given as a percent-escaped argument.  That
    is how the label gets translated, because pinentry has no gettext
    catalogue of its own beyond the toolkit's stock strings.

    A tooltip for the indicator is set with::

      C: SETQUALITYBAR_TT string
      S: OK

    where STRING is a percent-escaped string.  A real-world example::

      C: SETQUALITYBAR Quality%3a
      S: OK
      C: SETQUALITYBAR_TT The quality of the text entered above.%0aPlease ask your administrator for details about the criteria.
      S: OK

    This handler only stores ``arg`` under
    ``self.strings['qualitybar']`` and yields ``OK``;
    ``_handle_GETPIN`` writes the stored label to the user before
    prompting.
    """
    self.strings['qualitybar'] = arg
    reply = _common.Response('OK')
    yield reply
def _handle_SETQUALITYBAR_TT(self, arg):
    """Remember the quality-bar tooltip sent with ``SETQUALITYBAR_TT``.

    ``arg`` is stored as received under
    ``self.strings['qualitybar_tooltip']``.
    Yields a single ``OK`` response.
    """
    self.strings['qualitybar_tooltip'] = arg
    reply = _common.Response('OK')
    yield reply
def _handle_GETPIN(self, arg):
    """Ask the user for a PIN and return it to the client.

    Writes the stored description, then the key info and quality-bar
    label when they have been set, and prompts with the stored prompt
    text (used verbatim, no colon appended).  The user connection is
    always torn down again, even if prompting fails.

    Yields a ``D`` response carrying the PIN encoded as UTF-8, then
    ``OK``.  Raises ``KeyError`` if no description or prompt has been
    set.
    """
    # NOTE(review): ``_connect`` is assumed to be the counterpart of
    # ``_disconnect`` defined in this class; confirm the name.
    try:
        self._connect()
        self._write(self.strings['description'])
        if 'key info' in self.strings:
            self._write('key: {}'.format(self.strings['key info']))
        if 'qualitybar' in self.strings:
            self._write(self.strings['qualitybar'])
        pin = self._prompt(self.strings['prompt'], add_colon=False)
    finally:
        self._disconnect()
    # UTF-8 rather than ASCII: an ASCII-only PIN encodes to the same
    # bytes, while a passphrase with non-ASCII characters no longer
    # raises UnicodeEncodeError.
    yield _common.Response('D', pin.encode('utf-8'))
    yield _common.Response('OK')
def _handle_CONFIRM(self, arg):
    """Ask the user to confirm (or merely acknowledge) the description.

    With ``arg == '--one-button'`` only the "ok" choice is offered;
    with any other argument the "ok" and "not ok" choices are both
    listed.  The user answers by typing the number of a choice.

    Yields ``OK`` when the user types ``1``.  Any other answer raises
    ``AssuanError`` with the message ``'Not confirmed'``.  Raises
    ``KeyError`` if a string the dialog needs has not been set.
    """
    # A single handler serves both forms: a class can hold only one
    # ``_handle_CONFIRM``, and a second definition would silently
    # replace the first.
    one_button = (arg or '').strip() == '--one-button'
    # NOTE(review): ``_connect`` is assumed to be the counterpart of
    # ``_disconnect`` defined in this class; confirm the name.
    try:
        self._connect()
        self._write(self.strings['description'])
        self._write('1) ' + self.strings['ok'])
        if not one_button:
            self._write('2) ' + self.strings['not ok'])
        value = self._prompt('?')
    finally:
        self._disconnect()
    if value == '1':
        yield _common.Response('OK')
    else:
        # a protocol error instead of ``assert``, which ``python -O``
        # would strip
        raise _error.AssuanError(message='Not confirmed')

def _handle_MESSAGE(self, arg):
    """Show the stored description to the user, then yield ``OK``.

    Nothing is read back from the user.  Raises ``KeyError`` if no
    description has been set.
    """
    # ``_write`` needs ``self.connection['to_user']``, which only
    # exists while connected, so connect around the write like the
    # other interactive handlers do.
    try:
        self._connect()
        self._write(self.strings['description'])
    finally:
        self._disconnect()
    yield _common.Response('OK')
354 if __name__ == '__main__':
359 parser = argparse.ArgumentParser(description=__doc__)
361 '-v', '--version', action='version',
362 version='%(prog)s {}'.format(__version__))
364 '-V', '--verbose', action='count', default=0,
365 help='increase verbosity')
368 help='set X display (ignored by this implementation)')
370 args = parser.parse_args()
375 p.logger.setLevel(max(
376 logging.DEBUG, p.logger.level - 10*args.verbose))
382 'exiting due to exception:\n{}'.format(
383 traceback.format_exc().rstrip()))