3 # Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
5 # This file is part of pyassuan.
7 # pyassuan is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 3 of the License, or (at your option) any later
12 # pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License along with
17 # pyassuan. If not, see <http://www.gnu.org/licenses/>.
19 """Simple pinentry program for getting pins from a terminal.
24 import os.path as _os_path
25 import pprint as _pprint
27 import signal as _signal
29 import termios as _termios
31 from pyassuan import __version__
32 from pyassuan import server as _server
33 from pyassuan import common as _common
34 from pyassuan import error as _error
37 class PinEntry (_server.AssuanServer):
38 """pinentry protocol server
40 See ``pinentry-0.8.0/doc/pinentry.texi`` at::
42 ftp://ftp.gnupg.org/gcrypt/pinentry/
43 http://www.gnupg.org/aegypten/
45 for details on the pinentry interface.
47 Alternatively, you can just watch the logs and guess ;). Here's a
48 trace when driven by GnuPG 2.0.17 (libgcrypt 1.4.6)::
50 S: OK Your orders please
53 C: OPTION ttyname=/dev/pts/6
55 C: OPTION ttytype=xterm
57 C: OPTION lc-ctype=en_US.UTF-8
59 C: OPTION lc-messages=en_US.UTF-8
61 C: OPTION default-ok=_OK
63 C: OPTION default-cancel=_Cancel
65 C: OPTION default-prompt=PIN:
67 C: OPTION touch-file=/tmp/gpg-7lElMX/S.gpg-agent
72 C: SETDESC Enter passphrase%0A
74 C: SETPROMPT Passphrase
80 S: OK closing connection
82 _digit_regexp = _re.compile(r'\d+')
84 # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
85 _tpgrp_regexp = _re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
87 def __init__(self, name='pinentry', strict_options=False,
88 single_request=True, **kwargs):
91 super(PinEntry, self).__init__(
92 name=name, strict_options=strict_options,
93 single_request=single_request, **kwargs)
94 self.valid_options.append('ttyname')
97 super(PinEntry, self).reset()
99 self.connection.clear()
104 self.logger.info('connecting to user')
105 self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
106 tty_name = self.options.get('ttyname', None)
108 self.connection['tpgrp'] = self._get_pgrp(tty_name)
110 'open to-user output stream for {}'.format(tty_name))
111 self.connection['to_user'] = open(tty_name, 'w')
113 'open from-user input stream for {}'.format(tty_name))
114 self.connection['from_user'] = open(tty_name, 'r')
115 self.logger.info('get current termios line discipline')
116 self.connection['original termios'] = _termios.tcgetattr(
117 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
118 new_termios = _copy.deepcopy(self.connection['original termios'])
119 # translate carriage return to newline on input
120 new_termios[0] |= _termios.ICRNL
121 # do not ignore carriage return on input
122 new_termios[0] &= ~_termios.IGNCR
123 # do not echo input characters
124 new_termios[3] &= ~_termios.ECHO
125 # echo input characters
126 #new_termios[3] |= _termios.ECHO
127 # echo the NL character even if ECHO is not set
128 new_termios[3] |= _termios.ECHONL
129 # enable canonical mode
130 new_termios[3] |= _termios.ICANON
131 self.logger.info('adjust termios line discipline')
133 self.connection['to_user'], _termios.TCSANOW, new_termios)
134 self.logger.info('send SIGSTOP to pgrp {}'.format(
135 self.connection['tpgrp']))
136 #_os.killpg(self.connection['tpgrp'], _signal.SIGSTOP)
137 _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
138 self.connection['tpgrp stopped'] = True
140 self.logger.info('no TTY name given; use stdin/stdout for I/O')
141 self.connection['to_user'] = _sys.stdout
142 self.connection['from_user'] = _sys.stdin
143 self.logger.info('connected to user')
144 self.connection['to_user'].write('\n') # give a clean line to work on
145 self.connection['active'] = True
147 def _disconnect(self):
148 self.logger.info('disconnecting from user')
150 if self.connection.get('original termios', None):
151 self.logger.info('restore original termios line discipline')
153 self.connection['to_user'], _termios.TCSANOW,
154 self.connection['original termios'])
155 if self.connection.get('tpgrp stopped', None) is True:
157 'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
158 #_os.killpg(self.connection['tpgrp'], _signal.SIGCONT)
159 _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
160 if self.connection.get('to_user', None) not in [None, _sys.stdout]:
161 self.logger.info('close to-user output stream')
162 self.connection['to_user'].close()
163 if self.connection.get('from_user',None) not in [None,_sys.stdout]:
164 self.logger.info('close from-user input stream')
165 self.connection['from_user'].close()
167 self.connection = {'active': False}
168 self.logger.info('disconnected from user')
170 def _get_pgrp(self, tty_name):
171 self.logger.info('find process group contolling {}'.format(tty_name))
173 for name in _os.listdir(proc):
174 path = _os_path.join(proc, name)
175 if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
176 continue # not a process directory
177 self.logger.debug('checking process {}'.format(name))
178 fd_path = _os_path.join(path, 'fd', '0')
180 link = _os.readlink(fd_path)
182 self.logger.debug('not our process: {}'.format(e))
183 continue # permission denied (not one of our processes)
185 self.logger.debug('wrong tty: {}'.format(link))
186 continue # not attached to our target tty
187 stat_path = _os_path.join(path, 'stat')
188 stat = open(stat_path, 'r').read()
189 self.logger.debug('check stat for pgrp: {}'.format(stat))
190 match = self._tpgrp_regexp.match(stat)
191 assert match != None, stat
192 pgrp = int(match.group(1))
193 self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
195 raise ValueError(tty_name)
def _write(self, string):
    """Write one line of text to the user's terminal.

    ``string`` is sent to the ``self.connection['to_user']`` stream
    with a trailing newline appended, and the stream is flushed so
    the text shows up immediately.
    """
    stream = self.connection['to_user']
    stream.write(string + '\n')
    stream.flush()
203 "Read and return a line from the user's terminal."
204 # drop trailing newline
205 return self.connection['from_user'].readline()[:-1]
207 def _prompt(self, prompt='?', add_colon=True):
210 self.connection['to_user'].write(prompt)
211 self.connection['to_user'].write(' ')
212 self.connection['to_user'].flush()
217 def _handle_GETINFO(self, arg):
219 yield _common.Response('D', str(_os.getpid()).encode('ascii'))
220 elif arg == 'version':
221 yield _common.Response('D', __version__.encode('ascii'))
223 raise _error.AssuanError(message='Invalid parameter')
224 yield _common.Response('OK')
def _handle_SETDESC(self, arg):
    """Handle SETDESC: remember ``arg`` as the description text.

    The value is stored under ``self.strings['description']`` and a
    single ``OK`` response is yielded.
    """
    self.strings['description'] = arg
    ack = _common.Response('OK')
    yield ack
def _handle_SETPROMPT(self, arg):
    """Handle SETPROMPT: remember ``arg`` as the prompt text.

    The value is stored under ``self.strings['prompt']`` and a single
    ``OK`` response is yielded.
    """
    self.strings['prompt'] = arg
    ack = _common.Response('OK')
    yield ack
def _handle_SETERROR(self, arg):
    """Handle SETERROR: remember ``arg`` as the error text.

    The value is stored under ``self.strings['error']`` and a single
    ``OK`` response is yielded.
    """
    self.strings['error'] = arg
    ack = _common.Response('OK')
    yield ack
def _handle_SETTITLE(self, arg):
    """Handle SETTITLE: remember ``arg`` as the title text.

    The value is stored under ``self.strings['title']`` and a single
    ``OK`` response is yielded.
    """
    self.strings['title'] = arg
    ack = _common.Response('OK')
    yield ack
def _handle_SETOK(self, arg):
    """Handle SETOK: remember ``arg`` as the label of the OK choice.

    The value is stored under ``self.strings['ok']`` and a single
    ``OK`` response is yielded.
    """
    self.strings['ok'] = arg
    ack = _common.Response('OK')
    yield ack
def _handle_SETCANCEL(self, arg):
    """Handle SETCANCEL: remember ``arg`` as the cancel label.

    The value is stored under ``self.strings['cancel']`` and a single
    ``OK`` response is yielded.
    """
    self.strings['cancel'] = arg
    ack = _common.Response('OK')
    yield ack
def _handle_SETNOTOK(self, arg):
    """Handle SETNOTOK: remember ``arg`` as the "not ok" label.

    The value is stored under ``self.strings['not ok']`` and a single
    ``OK`` response is yielded.
    """
    self.strings['not ok'] = arg
    ack = _common.Response('OK')
    yield ack
def _handle_SETQUALITYBAR(self, arg):
    """Handle SETQUALITYBAR: remember the quality indicator label.

    In the pinentry protocol this command adds a quality indicator to
    the GETPIN window.  The indicator is updated as the passphrase is
    typed.  The client needs to implement an inquiry named "QUALITY"
    which gets passed the current passphrase (percent-plus escaped)
    and should send back a string with a single numerical value
    between -100 and 100.

    If a custom label for the quality bar is required, it is passed
    as the argument, as a percent escaped string.  This is needed to
    translate the label, because pinentry has no internal gettext
    except for stock strings from the toolkit.

    A tooltip for the quality bar may be set with::

      C: SETQUALITYBAR_TT string

    with STRING being a percent escaped string shown as the tooltip.

    Here is a real world example of these commands in use::

      C: SETQUALITYBAR Quality%3a
      C: SETQUALITYBAR_TT The quality of the text entered above.%0aPlease ask your administrator for details about the criteria.

    This handler only stores ``arg`` under
    ``self.strings['qualitybar']`` and yields a single ``OK``
    response.
    """
    self.strings['qualitybar'] = arg
    ack = _common.Response('OK')
    yield ack
def _handle_SETQUALITYBAR_TT(self, arg):
    """Handle SETQUALITYBAR_TT: remember the quality bar tooltip.

    The value is stored under ``self.strings['qualitybar_tooltip']``
    and a single ``OK`` response is yielded.
    """
    self.strings['qualitybar_tooltip'] = arg
    ack = _common.Response('OK')
    yield ack
291 def _handle_GETPIN(self, arg):
294 self._write(self.strings['description'])
295 if 'qualitybar' in self.strings:
296 self._write(self.strings['qualitybar'])
297 pin = self._prompt(self.strings['prompt'], add_colon=False)
300 yield _common.Response('D', pin.encode('ascii'))
301 yield _common.Response('OK')
303 def _handle_CONFIRM(self, arg):
306 self._write(self.strings['description'])
307 self._write('1) '+self.strings['ok'])
308 self._write('2) '+self.strings['not ok'])
309 value = self._prompt('?')
313 yield _common.Response('OK')
315 raise _error.AssuanError(message='Not confirmed')
def _handle_MESSAGE(self, arg):
    """Handle MESSAGE: show the stored description to the user.

    ``arg`` is not used.  The text held in
    ``self.strings['description']`` is passed to ``self._write`` and a
    single ``OK`` response is yielded.
    """
    self._write(self.strings['description'])
    ack = _common.Response('OK')
    yield ack
321 def _handle_CONFIRM(self, args):
322 assert args == '--one-button', args
325 self._write(self.strings['description'])
326 self._write('1) '+self.strings['ok'])
327 value = self._prompt('?')
330 assert value == '1', value
331 yield _common.Response('OK')
334 if __name__ == '__main__':
339 parser = argparse.ArgumentParser(description=__doc__)
341 '-v', '--version', action='version',
342 version='%(prog)s {}'.format(__version__))
344 '-V', '--verbose', action='count', default=0,
345 help='increase verbosity')
348 help='set X display (ignored by this implementation)')
350 args = parser.parse_args()
355 p.logger.setLevel(max(
356 logging.DEBUG, p.logger.level - 10*args.verbose))
362 'exiting due to exception:\n{}'.format(
363 traceback.format_exc().rstrip()))