3 # Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program. If not, see
17 # <http://www.gnu.org/licenses/>.
19 """Simple pinentry program for getting pins from a terminal.
24 import os.path as _os_path
25 import pprint as _pprint
27 import signal as _signal
29 import termios as _termios
31 from pyassuan import __version__
32 from pyassuan import server as _server
33 from pyassuan import common as _common
34 from pyassuan import error as _error
37 class PinEntry (_server.AssuanServer):
38 """pinentry protocol server
40 See ``pinentry-0.8.0/doc/pinentry.texi`` at::
42 ftp://ftp.gnupg.org/gcrypt/pinentry/
43 http://www.gnupg.org/aegypten/
45 for details on the pinentry interface.
47 Alternatively, you can just watch the logs and guess ;). Here's a
48 trace when driven by GnuPG 2.0.17 (libgcrypt 1.4.6)::
50 S: OK Your orders please
53 C: OPTION ttyname=/dev/pts/6
55 C: OPTION ttytype=xterm
57 C: OPTION lc-ctype=en_US.UTF-8
59 C: OPTION lc-messages=en_US.UTF-8
61 C: OPTION default-ok=_OK
63 C: OPTION default-cancel=_Cancel
65 C: OPTION default-prompt=PIN:
67 C: OPTION touch-file=/tmp/gpg-7lElMX/S.gpg-agent
72 C: SETDESC Enter passphrase%0A
74 C: SETPROMPT Passphrase
80 S: OK closing connection
82 Some drivers (e.g. ``gpgme``) have ``gpg-agent`` set ``ttyname``
83 to the terminal running ``gpgme``. I don't like this, because the
84 pinentry program doesn't always play nicely with whatever is going
85 on in that terminal. I'd rather have a free terminal that had
86 just been running Bash, and I export ``GPG_TTY`` to point to the
87 desired terminal. To ignore the requested ``ttyname`` and use
88 whatever is in ``GPG_TTY``, initialize with ``override_ttyname``
91 _digit_regexp = _re.compile(r'\d+')
93 # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
94 _tpgrp_regexp = _re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
96 def __init__(self, name='pinentry', strict_options=False,
97 single_request=True, override_ttyname=False, **kwargs):
100 super(PinEntry, self).__init__(
101 name=name, strict_options=strict_options,
102 single_request=single_request, **kwargs)
103 self.valid_options.append('ttyname')
104 self.override_ttyname = override_ttyname
107 super(PinEntry, self).reset()
109 self.connection.clear()
114 self.logger.info('connecting to user')
115 self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
117 if self.override_ttyname:
118 tty_name = _os.getenv('TTY_NAME')
119 if not tty_name: # override not requested, or fall back on undefined
120 tty_name = self.options.get('ttyname', None)
122 self.connection['tpgrp'] = self._get_pgrp(tty_name)
124 'open to-user output stream for {}'.format(tty_name))
125 self.connection['to_user'] = open(tty_name, 'w')
127 'open from-user input stream for {}'.format(tty_name))
128 self.connection['from_user'] = open(tty_name, 'r')
129 self.logger.info('get current termios line discipline')
130 self.connection['original termios'] = _termios.tcgetattr(
131 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
132 new_termios = _copy.deepcopy(self.connection['original termios'])
133 # translate carriage return to newline on input
134 new_termios[0] |= _termios.ICRNL
135 # do not ignore carriage return on input
136 new_termios[0] &= ~_termios.IGNCR
137 # do not echo input characters
138 new_termios[3] &= ~_termios.ECHO
139 # echo input characters
140 #new_termios[3] |= _termios.ECHO
141 # echo the NL character even if ECHO is not set
142 new_termios[3] |= _termios.ECHONL
143 # enable canonical mode
144 new_termios[3] |= _termios.ICANON
145 self.logger.info('adjust termios line discipline')
147 self.connection['to_user'], _termios.TCSANOW, new_termios)
148 self.logger.info('send SIGSTOP to pgrp {}'.format(
149 self.connection['tpgrp']))
150 #_os.killpg(self.connection['tpgrp'], _signal.SIGSTOP)
151 _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
152 self.connection['tpgrp stopped'] = True
154 self.logger.info('no TTY name given; use stdin/stdout for I/O')
155 self.connection['to_user'] = _sys.stdout
156 self.connection['from_user'] = _sys.stdin
157 self.logger.info('connected to user')
158 self.connection['to_user'].write('\n') # give a clean line to work on
159 self.connection['active'] = True
161 def _disconnect(self):
162 self.logger.info('disconnecting from user')
164 if self.connection.get('original termios', None):
165 self.logger.info('restore original termios line discipline')
167 self.connection['to_user'], _termios.TCSANOW,
168 self.connection['original termios'])
169 if self.connection.get('tpgrp stopped', None) is True:
171 'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
172 #_os.killpg(self.connection['tpgrp'], _signal.SIGCONT)
173 _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
174 if self.connection.get('to_user', None) not in [None, _sys.stdout]:
175 self.logger.info('close to-user output stream')
176 self.connection['to_user'].close()
177 if self.connection.get('from_user',None) not in [None,_sys.stdout]:
178 self.logger.info('close from-user input stream')
179 self.connection['from_user'].close()
181 self.connection = {'active': False}
182 self.logger.info('disconnected from user')
184 def _get_pgrp(self, tty_name):
185 self.logger.info('find process group contolling {}'.format(tty_name))
187 for name in _os.listdir(proc):
188 path = _os_path.join(proc, name)
189 if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
190 continue # not a process directory
191 self.logger.debug('checking process {}'.format(name))
192 fd_path = _os_path.join(path, 'fd', '0')
194 link = _os.readlink(fd_path)
196 self.logger.debug('not our process: {}'.format(e))
197 continue # permission denied (not one of our processes)
199 self.logger.debug('wrong tty: {}'.format(link))
200 continue # not attached to our target tty
201 stat_path = _os_path.join(path, 'stat')
202 stat = open(stat_path, 'r').read()
203 self.logger.debug('check stat for pgrp: {}'.format(stat))
204 match = self._tpgrp_regexp.match(stat)
205 assert match != None, stat
206 pgrp = int(match.group(1))
207 self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
209 raise ValueError(tty_name)
211 def _write(self, string):
212 "Write text to the user's terminal."
213 self.connection['to_user'].write(string + '\n')
214 self.connection['to_user'].flush()
217 "Read and return a line from the user's terminal."
218 # drop trailing newline
219 return self.connection['from_user'].readline()[:-1]
221 def _prompt(self, prompt='?', add_colon=True):
224 self.connection['to_user'].write(prompt)
225 self.connection['to_user'].write(' ')
226 self.connection['to_user'].flush()
231 def _handle_GETINFO(self, arg):
233 yield _common.Response('D', str(_os.getpid()))
234 elif arg == 'version':
235 yield _common.Response('D', __version__)
237 raise _error.AssuanError(message='Invalid parameter')
238 yield _common.Response('OK')
240 def _handle_SETDESC(self, arg):
241 self.strings['description'] = arg
242 yield _common.Response('OK')
244 def _handle_SETPROMPT(self, arg):
245 self.strings['prompt'] = arg
246 yield _common.Response('OK')
248 def _handle_SETERROR(self, arg):
249 self.strings['error'] = arg
250 yield _common.Response('OK')
252 def _handle_SETTITLE(self, arg):
253 self.strings['title'] = arg
254 yield _common.Response('OK')
256 def _handle_SETOK(self, arg):
257 self.strings['ok'] = arg
258 yield _common.Response('OK')
260 def _handle_SETCANCEL(self, arg):
261 self.strings['cancel'] = arg
262 yield _common.Response('OK')
264 def _handle_SETNOTOK(self, arg):
265 self.strings['not ok'] = arg
266 yield _common.Response('OK')
268 def _handle_SETQUALITYBAR(self, arg):
269 """Adds a quality indicator to the GETPIN window.
271 This indicator is updated as the passphrase is typed. The
272 clients needs to implement an inquiry named "QUALITY" which
273 gets passed the current passpharse (percent-plus escaped) and
274 should send back a string with a single numerical vauelue
275 between -100 and 100. Negative values will be displayed in
278 If a custom label for the quality bar is required, just add
279 that label as an argument as percent escaped string. You will
280 need this feature to translate the label because pinentry has
281 no internal gettext except for stock strings from the toolkit
284 If you want to show a tooltip for the quality bar, you may use
286 C: SETQUALITYBAR_TT string
289 With STRING being a percent escaped string shown as the tooltip.
291 raise NotImplementedError()
293 def _handle_GETPIN(self, arg):
296 self._write(self.strings['description'])
297 pin = self._prompt(self.strings['prompt'], add_colon=False)
300 yield _common.Response('D', pin)
301 yield _common.Response('OK')
303 def _handle_CONFIRM(self, arg):
306 self._write(self.strings['description'])
307 self._write('1) '+self.strings['ok'])
308 self._write('2) '+self.strings['not ok'])
309 value = self._prompt('?')
313 yield _common.Response('OK')
315 raise _error.AssuanError(message='Not confirmed')
317 def _handle_MESSAGE(self, arg):
318 self._write(self.strings['description'])
319 yield _common.Response('OK')
321 def _handle_CONFIRM(self, args):
322 assert args == '--one-button', args
325 self._write(self.strings['description'])
326 self._write('1) '+self.strings['ok'])
327 value = self._prompt('?')
330 assert value == '1', value
331 yield _common.Response('OK')
334 if __name__ == '__main__':
339 parser = argparse.ArgumentParser(description=__doc__, version=__version__)
341 '-V', '--verbose', action='count', default=0,
342 help='increase verbosity')
345 help='set X display (ignored by this implementation)')
347 args = parser.parse_args()
352 p.logger.setLevel(max(
353 logging.DEBUG, p.logger.level - 10*args.verbose))
356 p = PinEntry(override_ttyname=True)
360 'exiting due to exception:\n{}'.format(
361 traceback.format_exc().rstrip()))