3 # Copyright (C) 2012-2017 W. Trevor King <wking@tremily.us>
5 # This file is part of pyassuan.
7 # pyassuan is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 3 of the License, or (at your option) any later
12 # pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License along with
17 # pyassuan. If not, see <http://www.gnu.org/licenses/>.
19 """Simple pinentry program for getting pins from a terminal.
24 import os.path as _os_path
25 import pprint as _pprint
27 import signal as _signal
29 import termios as _termios
31 from pyassuan import __version__
32 from pyassuan import server as _server
33 from pyassuan import common as _common
34 from pyassuan import error as _error
37 class PinEntry (_server.AssuanServer):
38 """pinentry protocol server
40 See ``pinentry-0.8.0/doc/pinentry.texi`` at::
42 ftp://ftp.gnupg.org/gcrypt/pinentry/
43 http://www.gnupg.org/aegypten/
45 for details on the pinentry interface.
47 Alternatively, you can just watch the logs and guess ;). Here's a
48 trace when driven by GnuPG 2.0.28 (libgcrypt 1.6.3)::
50 S: OK Your orders please
53 C: OPTION ttyname=/dev/pts/6
55 C: OPTION ttytype=xterm
57 C: OPTION lc-ctype=en_US.UTF-8
59 C: OPTION lc-messages=en_US.UTF-8
61 C: OPTION allow-external-password-cache
63 C: OPTION default-ok=_OK
65 C: OPTION default-cancel=_Cancel
67 C: OPTION default-yes=_Yes
69 C: OPTION default-no=_No
71 C: OPTION default-prompt=PIN:
73 C: OPTION default-pwmngr=_Save in password manager
75 C: OPTION default-cf-visi=Do you really want to make your passphrase visible on the screen?
77 C: OPTION default-tt-visi=Make passphrase visible
79 C: OPTION default-tt-hide=Hide passphrase
84 C: SETKEYINFO u/S9464F2C2825D2FE3
86 C: SETDESC Enter passphrase%0A
88 C: SETPROMPT Passphrase
94 S: OK closing connection
96 _digit_regexp = _re.compile(r'\d+')
98 # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
99 _tpgrp_regexp = _re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
def __init__(self, name='pinentry', strict_options=False,
             single_request=True, **kwargs):
    """Set up the pinentry server and accept the ``ttyname`` option.

    ``name``, ``strict_options``, ``single_request`` and any extra
    keyword arguments are forwarded unchanged to the parent class
    initializer.
    """
    # Create the per-session state before calling the parent
    # initializer: reset() clears both dictionaries, so they must
    # already exist if the parent calls reset() during construction
    # (presumably it does -- confirm against AssuanServer).
    self.strings = {}
    self.connection = {}
    super(PinEntry, self).__init__(
        name=name, strict_options=strict_options,
        single_request=single_request, **kwargs)
    self.valid_options.append('ttyname')

def reset(self):
    """Reset the parent server state and forget all per-session data.

    Clears the strings collected by the ``SET*`` handlers and the
    user-connection bookkeeping.
    """
    super(PinEntry, self).reset()
    self.strings.clear()
    self.connection.clear()
118 self.logger.info('connecting to user')
119 self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
120 tty_name = self.options.get('ttyname', None)
122 self.connection['tpgrp'] = self._get_pgrp(tty_name)
124 'open to-user output stream for {}'.format(tty_name))
125 self.connection['to_user'] = open(tty_name, 'w')
127 'open from-user input stream for {}'.format(tty_name))
128 self.connection['from_user'] = open(tty_name, 'r')
129 self.logger.info('get current termios line discipline')
130 self.connection['original termios'] = _termios.tcgetattr(
131 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
132 new_termios = _copy.deepcopy(self.connection['original termios'])
133 # translate carriage return to newline on input
134 new_termios[0] |= _termios.ICRNL
135 # do not ignore carriage return on input
136 new_termios[0] &= ~_termios.IGNCR
137 # do not echo input characters
138 new_termios[3] &= ~_termios.ECHO
139 # echo input characters
140 #new_termios[3] |= _termios.ECHO
141 # echo the NL character even if ECHO is not set
142 new_termios[3] |= _termios.ECHONL
143 # enable canonical mode
144 new_termios[3] |= _termios.ICANON
145 self.logger.info('adjust termios line discipline')
147 self.connection['to_user'], _termios.TCSANOW, new_termios)
148 self.logger.info('send SIGSTOP to pgrp {}'.format(
149 self.connection['tpgrp']))
150 #_os.killpg(self.connection['tpgrp'], _signal.SIGSTOP)
151 _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
152 self.connection['tpgrp stopped'] = True
154 self.logger.info('no TTY name given; use stdin/stdout for I/O')
155 self.connection['to_user'] = _sys.stdout
156 self.connection['from_user'] = _sys.stdin
157 self.logger.info('connected to user')
158 self.connection['to_user'].write('\n') # give a clean line to work on
159 self.connection['active'] = True
161 def _disconnect(self):
162 self.logger.info('disconnecting from user')
164 if self.connection.get('original termios', None):
165 self.logger.info('restore original termios line discipline')
167 self.connection['to_user'], _termios.TCSANOW,
168 self.connection['original termios'])
169 if self.connection.get('tpgrp stopped', None) is True:
171 'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
172 #_os.killpg(self.connection['tpgrp'], _signal.SIGCONT)
173 _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
174 if self.connection.get('to_user', None) not in [None, _sys.stdout]:
175 self.logger.info('close to-user output stream')
176 self.connection['to_user'].close()
177 if self.connection.get('from_user',None) not in [None,_sys.stdout]:
178 self.logger.info('close from-user input stream')
179 self.connection['from_user'].close()
181 self.connection = {'active': False}
182 self.logger.info('disconnected from user')
184 def _get_pgrp(self, tty_name):
185 self.logger.info('find process group contolling {}'.format(tty_name))
187 for name in _os.listdir(proc):
188 path = _os_path.join(proc, name)
189 if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
190 continue # not a process directory
191 self.logger.debug('checking process {}'.format(name))
192 fd_path = _os_path.join(path, 'fd', '0')
194 link = _os.readlink(fd_path)
196 self.logger.debug('not our process: {}'.format(e))
197 continue # permission denied (not one of our processes)
199 self.logger.debug('wrong tty: {}'.format(link))
200 continue # not attached to our target tty
201 stat_path = _os_path.join(path, 'stat')
202 stat = open(stat_path, 'r').read()
203 self.logger.debug('check stat for pgrp: {}'.format(stat))
204 match = self._tpgrp_regexp.match(stat)
205 assert match != None, stat
206 pgrp = int(match.group(1))
207 self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
209 raise ValueError(tty_name)
211 def _write(self, string):
212 "Write text to the user's terminal."
213 self.connection['to_user'].write(string + '\n')
214 self.connection['to_user'].flush()
217 "Read and return a line from the user's terminal."
218 # drop trailing newline
219 return self.connection['from_user'].readline()[:-1]
221 def _prompt(self, prompt='?', error=None, add_colon=True):
225 self.connection['to_user'].write(error)
226 self.connection['to_user'].write('\n')
227 self.connection['to_user'].write(prompt)
228 self.connection['to_user'].write(' ')
229 self.connection['to_user'].flush()
def _handle_GETINFO(self, arg):
    """Handle ``GETINFO``: report information about this pinentry.

    ``arg`` selects the item: ``'pid'`` yields this process's ID and
    ``'version'`` yields the pyassuan version, each as an ASCII ``D``
    response followed by ``OK``.  Any other value raises
    ``AssuanError`` ('Invalid parameter').
    """
    if arg == 'pid':
        yield _common.Response('D', str(_os.getpid()).encode('ascii'))
    elif arg == 'version':
        yield _common.Response('D', __version__.encode('ascii'))
    else:
        raise _error.AssuanError(message='Invalid parameter')
    yield _common.Response('OK')
def _handle_SETKEYINFO(self, arg):
    """Handle ``SETKEYINFO``: remember the key identifier.

    The value is stored as ``self.strings['key info']`` and shown by
    ``GETPIN`` on a ``key: ...`` line.
    """
    self.strings['key info'] = arg
    yield _common.Response('OK')
def _handle_CLEARPASSPHRASE(self, arg):
    """Handle ``CLEARPASSPHRASE``: acknowledge with ``OK`` and do nothing.

    ``arg`` is ignored.
    """
    yield _common.Response('OK')
def _handle_SETDESC(self, arg):
    """Handle ``SETDESC``: remember the description text.

    The value is stored as ``self.strings['description']`` and written
    to the user by ``GETPIN``, ``CONFIRM`` and ``MESSAGE``.
    """
    self.strings['description'] = arg
    yield _common.Response('OK')
def _handle_SETPROMPT(self, arg):
    """Handle ``SETPROMPT``: remember the prompt used by ``GETPIN``.

    The value is stored as ``self.strings['prompt']``.
    """
    self.strings['prompt'] = arg
    yield _common.Response('OK')
def _handle_SETERROR(self, arg):
    """Handle ``SETERROR``: remember an error message.

    The value is stored as ``self.strings['error']``; ``GETPIN`` passes
    it to the prompt so it is shown before the prompt text.
    """
    self.strings['error'] = arg
    yield _common.Response('OK')
def _handle_SETTITLE(self, arg):
    """Handle ``SETTITLE``: remember the window title.

    The value is stored as ``self.strings['title']``.
    """
    self.strings['title'] = arg
    yield _common.Response('OK')
def _handle_SETOK(self, arg):
    """Handle ``SETOK``: remember the label of the OK choice.

    The value is stored as ``self.strings['ok']`` and shown by
    ``CONFIRM`` as choice ``1)``.
    """
    self.strings['ok'] = arg
    yield _common.Response('OK')
def _handle_SETCANCEL(self, arg):
    """Handle ``SETCANCEL``: remember the label of the cancel choice.

    The value is stored as ``self.strings['cancel']``.
    """
    self.strings['cancel'] = arg
    yield _common.Response('OK')
def _handle_SETNOTOK(self, arg):
    """Handle ``SETNOTOK``: remember the label of the not-OK choice.

    The value is stored as ``self.strings['not ok']`` and shown by
    ``CONFIRM`` as choice ``2)``.
    """
    self.strings['not ok'] = arg
    yield _common.Response('OK')
def _handle_SETQUALITYBAR(self, arg):
    """Adds a quality indicator to the GETPIN window.

    This indicator is updated as the passphrase is typed.  The
    client needs to implement an inquiry named "QUALITY" which
    gets passed the current passphrase (percent-plus escaped) and
    should send back a string with a single numerical value
    between -100 and 100.  Negative values will be displayed in
    red.

    If a custom label for the quality bar is required, just add
    that label as an argument as percent escaped string.  You will
    need this feature to translate the label because pinentry has
    no internal gettext except for stock strings from the toolkit
    library.

    If you want to show a tooltip for the quality bar, you may use::

      C: SETQUALITYBAR_TT string
      S: OK

    With STRING being a percent escaped string shown as the tooltip.

    Here is a real world example of these commands in use::

      C: SETQUALITYBAR Quality%3a
      S: OK
      C: SETQUALITYBAR_TT The quality of the text entered above.%0aPlease ask your administrator for details about the criteria.
      S: OK

    This implementation only stores the label as
    ``self.strings['qualitybar']``; ``GETPIN`` prints it before
    prompting.
    """
    self.strings['qualitybar'] = arg
    yield _common.Response('OK')
def _handle_SETQUALITYBAR_TT(self, arg):
    """Handle ``SETQUALITYBAR_TT``: remember the quality-bar tooltip.

    The value is stored as ``self.strings['qualitybar_tooltip']``.
    """
    self.strings['qualitybar_tooltip'] = arg
    yield _common.Response('OK')
def _handle_GETPIN(self, arg):
    """Handle ``GETPIN``: ask the user for a pin and return it.

    Writes the stored description, then the key info and quality-bar
    label if they were set, prompts with the stored prompt (preceded
    by any stored error message), and yields the reply as a ``D``
    response followed by ``OK``.  ``arg`` is ignored.

    Raises ``KeyError`` if no description or prompt has been set.
    """
    try:
        self._connect()
        self._write(self.strings['description'])
        if 'key info' in self.strings:
            self._write('key: {}'.format(self.strings['key info']))
        if 'qualitybar' in self.strings:
            self._write(self.strings['qualitybar'])
        pin = self._prompt(
            prompt=self.strings['prompt'],
            error=self.strings.get('error'),
            add_colon=False)
    finally:
        # always restore the terminal, even if prompting failed
        self._disconnect()
    # NOTE: a pin containing non-ASCII characters raises
    # UnicodeEncodeError here.
    yield _common.Response('D', pin.encode('ascii'))
    yield _common.Response('OK')
def _handle_CONFIRM(self, arg):
    """Handle ``CONFIRM``: ask the user to confirm the description.

    Writes the stored description and the numbered choices, then
    prompts with ``?``.  With ``arg == '--one-button'`` only the OK
    choice is offered; otherwise both the OK and not-OK choices are.

    Yields ``OK`` when the user answers ``1``; any other answer raises
    ``AssuanError`` ('Not confirmed').

    Both variants live in this single method: two separate
    ``_handle_CONFIRM`` definitions would leave only the later one
    bound on the class.
    """
    one_button = (arg == '--one-button')
    try:
        self._connect()
        self._write(self.strings['description'])
        self._write('1) '+self.strings['ok'])
        if not one_button:
            self._write('2) '+self.strings['not ok'])
        value = self._prompt('?')
    finally:
        # always restore the terminal, even if prompting failed
        self._disconnect()
    if value != '1':
        # User input is validated with a protocol error rather than
        # ``assert``, which is stripped when Python runs with -O.
        raise _error.AssuanError(message='Not confirmed')
    yield _common.Response('OK')

def _handle_MESSAGE(self, arg):
    """Handle ``MESSAGE``: show the stored description to the user.

    ``arg`` is ignored.  The user connection is opened for the write
    and closed again afterwards, because ``_write`` needs
    ``self.connection['to_user']``.
    """
    try:
        self._connect()
        self._write(self.strings['description'])
    finally:
        self._disconnect()
    yield _common.Response('OK')
363 if __name__ == '__main__':
368 parser = argparse.ArgumentParser(description=__doc__)
370 '-v', '--version', action='version',
371 version='%(prog)s {}'.format(__version__))
373 '-V', '--verbose', action='count', default=0,
374 help='increase verbosity')
377 help='set X display (ignored by this implementation)')
379 args = parser.parse_args()
384 p.logger.setLevel(max(
385 logging.DEBUG, p.logger.level - 10*args.verbose))
391 'exiting due to exception:\n{}'.format(
392 traceback.format_exc().rstrip()))