3 # Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program. If not, see
17 # <http://www.gnu.org/licenses/>.
"""Simple pinentry program for getting pins from a terminal.
"""
import copy as _copy
import os as _os
import os.path as _os_path
import pprint as _pprint
import re as _re
import signal as _signal
import sys as _sys
import termios as _termios

from pyassuan import __version__
from pyassuan import server as _server
from pyassuan import common as _common
from pyassuan import error as _error
class PinEntry (_server.AssuanServer):
    """pinentry protocol server

    See ``pinentry-0.8.0/doc/pinentry.texi`` at::

      ftp://ftp.gnupg.org/gcrypt/pinentry/
      http://www.gnupg.org/aegypten/

    for details on the pinentry interface.

    Alternatively, you can just watch the logs and guess ;).  Here's a
    trace when driven by GnuPG 2.0.17 (libgcrypt 1.4.6)::

      S: OK Your orders please
      C: OPTION grab
      S: OK
      C: OPTION ttyname=/dev/pts/6
      S: OK
      C: OPTION ttytype=xterm
      S: OK
      C: OPTION lc-ctype=en_US.UTF-8
      S: OK
      C: OPTION lc-messages=en_US.UTF-8
      S: OK
      C: OPTION default-ok=_OK
      S: OK
      C: OPTION default-cancel=_Cancel
      S: OK
      C: OPTION default-prompt=PIN:
      S: OK
      C: OPTION touch-file=/tmp/gpg-7lElMX/S.gpg-agent
      S: OK
      C: GETINFO pid
      S: D <pid>
      S: OK
      C: SETDESC Enter passphrase%0A
      S: OK
      C: SETPROMPT Passphrase
      S: OK
      C: GETPIN
      S: D <passphrase>
      S: OK
      C: BYE
      S: OK closing connection

    Some drivers (e.g. ``gpgme``) have ``gpg-agent`` set ``ttyname``
    to the terminal running ``gpgme``.  I don't like this, because the
    pinentry program doesn't always play nicely with whatever is going
    on in that terminal.  I'd rather have a free terminal that had
    just been running Bash, and I export ``GPG_TTY`` to point to the
    desired terminal.  To ignore the requested ``ttyname`` and use
    whatever is in ``GPG_TTY``, initialize with ``override_ttyname``
    set to ``True``.

    Instance state:

    * ``strings``: texts stored by the ``SET*`` commands, keyed by
      role (``'description'``, ``'prompt'``, ``'ok'``, ...).
    * ``connection``: streams and bookkeeping for the current user
      interaction (``'to_user'``, ``'from_user'``, ``'tpgrp'``, ...).
    """
    _digit_regexp = _re.compile(r'\d+')

    # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
    #
    # ``comm`` may itself contain spaces and parentheses, so match it
    # greedily up to the last ``)`` that is followed by the expected
    # fields.  ``tpgid`` is -1 for processes without a controlling
    # terminal, hence the optional sign.
    _tpgrp_regexp = _re.compile(
        r'\d+ \(.*\) \S \d+ \d+ \d+ \d+ (-?\d+)', _re.DOTALL)

    def __init__(self, name='pinentry', strict_options=False,
                 single_request=True, override_ttyname=False, **kwargs):
        """Set up the server.

        ``name``, ``strict_options``, ``single_request`` and any extra
        keyword arguments are forwarded unchanged to the base class.
        If ``override_ttyname`` is true, the ``GPG_TTY`` environment
        variable takes precedence over the client's ``ttyname`` option
        when choosing the terminal to talk to.
        """
        # Create these before the base initializer runs, so they exist
        # if it calls ``reset()`` (presumably it does; confirm against
        # ``AssuanServer``).
        self.strings = {}
        self.connection = {}
        super(PinEntry, self).__init__(
            name=name, strict_options=strict_options,
            single_request=single_request, **kwargs)
        self.valid_options.append('ttyname')
        self.override_ttyname = override_ttyname

    def reset(self):
        "Reset the base server and forget stored strings and connection."
        super(PinEntry, self).reset()
        self.strings.clear()
        self.connection.clear()

    # user interface

    def _connect(self):
        """Open the channel to the user.

        If a TTY name is available (from ``GPG_TTY`` when
        ``override_ttyname`` is set, otherwise from the ``ttyname``
        option), open it for reading and writing, disable echo, and
        stop the terminal's foreground process group so it does not
        compete for the input.  Without a TTY name, fall back to
        stdin/stdout.
        """
        self.logger.info('connecting to user')
        self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
        tty_name = None
        if self.override_ttyname:
            tty_name = _os.getenv('GPG_TTY')
            if tty_name:
                self.logger.debug('override ttyname with {}'.format(tty_name))
            else:
                self.logger.debug(
                    'GPG_TTY not set, fallback to ttyname option')
        if not tty_name:
            tty_name = self.options.get('ttyname', None)
        if tty_name:
            self.connection['tpgrp'] = self._get_pgrp(tty_name)
            self.logger.info(
                'open to-user output stream for {}'.format(tty_name))
            self.connection['to_user'] = open(tty_name, 'w')
            self.logger.info(
                'open from-user input stream for {}'.format(tty_name))
            self.connection['from_user'] = open(tty_name, 'r')
            self.logger.info('get current termios line discipline')
            self.connection['original termios'] = _termios.tcgetattr(
                self.connection['to_user'])  # [iflag, oflag, cflag, lflag, ...]
            new_termios = _copy.deepcopy(self.connection['original termios'])
            # translate carriage return to newline on input
            new_termios[0] |= _termios.ICRNL
            # do not ignore carriage return on input
            new_termios[0] &= ~_termios.IGNCR
            # do not echo input characters
            new_termios[3] &= ~_termios.ECHO
            # echo the NL character even if ECHO is not set
            new_termios[3] |= _termios.ECHONL
            # enable canonical mode
            new_termios[3] |= _termios.ICANON
            self.logger.info('adjust termios line discipline')
            _termios.tcsetattr(
                self.connection['to_user'], _termios.TCSANOW, new_termios)
            self.logger.info('send SIGSTOP to pgrp {}'.format(
                self.connection['tpgrp']))
            # a negative pid addresses the whole process group
            _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
            self.connection['tpgrp stopped'] = True
        else:
            self.logger.info('no TTY name given; use stdin/stdout for I/O')
            self.connection['to_user'] = _sys.stdout
            self.connection['from_user'] = _sys.stdin
        self.logger.info('connected to user')
        self.connection['to_user'].write('\n')  # give a clean line to work on
        self.connection['active'] = True

    def _disconnect(self):
        """Undo whatever ``_connect`` managed to set up.

        Restores the saved termios settings, resumes the stopped
        process group, and closes any streams opened on the TTY.  The
        connection state is reset even if one of those steps fails.
        """
        self.logger.info('disconnecting from user')
        try:
            if self.connection.get('original termios', None):
                self.logger.info('restore original termios line discipline')
                _termios.tcsetattr(
                    self.connection['to_user'], _termios.TCSANOW,
                    self.connection['original termios'])
            if self.connection.get('tpgrp stopped', None) is True:
                self.logger.info(
                    'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
                # a negative pid addresses the whole process group
                _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
            if self.connection.get('to_user', None) not in [None, _sys.stdout]:
                self.logger.info('close to-user output stream')
                self.connection['to_user'].close()
            # Compare against stdin (not stdout): in the stdin/stdout
            # fallback we must not close the process's own stdin.
            if self.connection.get('from_user', None) not in [
                    None, _sys.stdin]:
                self.logger.info('close from-user input stream')
                self.connection['from_user'].close()
        finally:
            self.connection = {'active': False}
            self.logger.info('disconnected from user')

    def _get_pgrp(self, tty_name):
        """Return the foreground process group id of ``tty_name``.

        Scans ``/proc`` for a readable process whose stdin (fd 0) is
        ``tty_name`` and reads the ``tpgid`` field from its ``stat``
        file.  Raises ``ValueError(tty_name)`` if no such process
        yields a usable (positive) process group.
        """
        self.logger.info('find process group contolling {}'.format(tty_name))
        proc = '/proc'
        for name in _os.listdir(proc):
            path = _os_path.join(proc, name)
            if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
                continue  # not a process directory
            self.logger.debug('checking process {}'.format(name))
            fd_path = _os_path.join(path, 'fd', '0')
            try:
                link = _os.readlink(fd_path)
            except OSError as e:
                self.logger.debug('not our process: {}'.format(e))
                continue  # permission denied (not one of our processes)
            if link != tty_name:
                self.logger.debug('wrong tty: {}'.format(link))
                continue  # not attached to our target tty
            stat_path = _os_path.join(path, 'stat')
            try:
                with open(stat_path, 'r') as stat_file:
                    stat = stat_file.read()
            except (IOError, OSError) as e:
                # the process may have exited since /proc was listed
                self.logger.debug('cannot read stat: {}'.format(e))
                continue
            self.logger.debug('check stat for pgrp: {}'.format(stat))
            match = self._tpgrp_regexp.match(stat)
            if match is None:
                self.logger.debug('unparsable stat: {}'.format(stat))
                continue
            pgrp = int(match.group(1))
            if pgrp <= 0:
                # No foreground group.  Never hand this to _connect():
                # kill(-(-1)) would signal pid 1 and kill(-0) our own
                # process group.
                self.logger.debug('no foreground pgrp: {}'.format(pgrp))
                continue
            self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
            return pgrp
        raise ValueError(tty_name)

    def _write(self, string):
        "Write text to the user's terminal."
        self.connection['to_user'].write(string + '\n')
        self.connection['to_user'].flush()

    def _read(self):
        "Read and return a line from the user's terminal."
        line = self.connection['from_user'].readline()
        # Drop the trailing newline, but only if there is one: input
        # that ends at EOF must not lose its last character.
        if line.endswith('\n'):
            line = line[:-1]
        return line

    def _prompt(self, prompt='?', add_colon=True):
        """Show ``prompt`` and return the user's one-line reply.

        A ``:`` is appended to ``prompt`` unless ``add_colon`` is false.
        """
        if add_colon:
            prompt += ':'
        self.connection['to_user'].write(prompt)
        self.connection['to_user'].write(' ')
        self.connection['to_user'].flush()
        return self._read()

    # assuan handlers

    def _handle_GETINFO(self, arg):
        "Report our process id (``pid``) or the pyassuan ``version``."
        if arg == 'pid':
            yield _common.Response('D', str(_os.getpid()))
        elif arg == 'version':
            yield _common.Response('D', __version__)
        else:
            raise _error.AssuanError(message='Invalid parameter')
        yield _common.Response('OK')

    def _handle_SETDESC(self, arg):
        "Store the description text shown above prompts."
        self.strings['description'] = arg
        yield _common.Response('OK')

    def _handle_SETPROMPT(self, arg):
        "Store the prompt text used by ``GETPIN``."
        self.strings['prompt'] = arg
        yield _common.Response('OK')

    def _handle_SETERROR(self, arg):
        "Store the error text."
        self.strings['error'] = arg
        yield _common.Response('OK')

    def _handle_SETTITLE(self, arg):
        "Store the title text."
        self.strings['title'] = arg
        yield _common.Response('OK')

    def _handle_SETOK(self, arg):
        "Store the label of the affirmative choice."
        self.strings['ok'] = arg
        yield _common.Response('OK')

    def _handle_SETCANCEL(self, arg):
        "Store the label of the cancel choice."
        self.strings['cancel'] = arg
        yield _common.Response('OK')

    def _handle_SETNOTOK(self, arg):
        "Store the label of the negative choice."
        self.strings['not ok'] = arg
        yield _common.Response('OK')

    def _handle_SETQUALITYBAR(self, arg):
        """Adds a quality indicator to the GETPIN window.

        This indicator is updated as the passphrase is typed.  The
        client needs to implement an inquiry named "QUALITY" which
        gets passed the current passphrase (percent-plus escaped) and
        should send back a string with a single numerical value
        between -100 and 100.  Negative values will be displayed in
        red.

        If a custom label for the quality bar is required, just add
        that label as an argument as percent escaped string.  You will
        need this feature to translate the label because pinentry has
        no internal gettext except for stock strings from the toolkit
        library.

        If you want to show a tooltip for the quality bar, you may use

            C: SETQUALITYBAR_TT string
            S: OK

        With STRING being a percent escaped string shown as the tooltip.

        Not implemented by this terminal-based pinentry.
        """
        raise NotImplementedError()

    def _handle_GETPIN(self, arg):
        "Show the description, prompt for a PIN, and send it back."
        try:
            self._connect()
            self._write(self.strings['description'])
            pin = self._prompt(self.strings['prompt'], add_colon=False)
        finally:
            self._disconnect()
        yield _common.Response('D', pin)
        yield _common.Response('OK')

    def _handle_CONFIRM(self, arg):
        """Ask the user to confirm the description.

        Without an argument the user chooses between the ``ok`` and
        ``not ok`` strings.  With ``--one-button`` only the ``ok``
        choice is offered.  Any other argument is rejected as an
        invalid parameter.  Anything but a reply of ``1`` raises a
        "Not confirmed" Assuan error.
        """
        one_button = (arg == '--one-button')
        if arg and not one_button:
            raise _error.AssuanError(message='Invalid parameter')
        try:
            self._connect()
            self._write(self.strings['description'])
            self._write('1) '+self.strings['ok'])
            if not one_button:
                self._write('2) '+self.strings['not ok'])
            value = self._prompt('?')
        finally:
            self._disconnect()
        if value != '1':
            raise _error.AssuanError(message='Not confirmed')
        yield _common.Response('OK')

    def _handle_MESSAGE(self, arg):
        "Show the description to the user."
        # The user channel only exists between _connect() and
        # _disconnect(), so open it for the duration of the write.
        try:
            self._connect()
            self._write(self.strings['description'])
        finally:
            self._disconnect()
        yield _common.Response('OK')
if __name__ == '__main__':
    # Command-line entry point: parse options, then serve a pinentry
    # session using the terminal named by GPG_TTY when it is set.
    import argparse
    import logging
    import traceback

    # ArgumentParser no longer accepts a ``version`` keyword in
    # Python 3, so register the version flag explicitly.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        '-v', '--version', action='version',
        version='%(prog)s {}'.format(__version__))
    parser.add_argument(
        '-V', '--verbose', action='count', default=0,
        help='increase verbosity')
    parser.add_argument(
        '--display',
        help='set X display (ignored by this implementation)')

    args = parser.parse_args()

    p = PinEntry(override_ttyname=True)

    if args.verbose:
        # each -V lowers the threshold by one level, down to DEBUG
        p.logger.setLevel(max(
            logging.DEBUG, p.logger.level - 10*args.verbose))

    try:
        p.run()
    except BaseException:
        # log whatever ended the session, then let it propagate
        p.logger.error(
            'exiting due to exception:\n{}'.format(
                traceback.format_exc().rstrip()))
        raise