3 # Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
5 # This file is part of pyassuan.
7 # pyassuan is free software: you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation, either version 3 of the License, or (at your option) any later
12 # pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License along with
17 # pyassuan. If not, see <http://www.gnu.org/licenses/>.
19 """Simple pinentry program for getting pins from a terminal.
24 import os.path as _os_path
25 import pprint as _pprint
27 import signal as _signal
29 import termios as _termios
31 from pyassuan import __version__
32 from pyassuan import server as _server
33 from pyassuan import common as _common
34 from pyassuan import error as _error
37 class PinEntry (_server.AssuanServer):
38 """pinentry protocol server
40 See ``pinentry-0.8.0/doc/pinentry.texi`` at::
42 ftp://ftp.gnupg.org/gcrypt/pinentry/
43 http://www.gnupg.org/aegypten/
45 for details on the pinentry interface.
47 Alternatively, you can just watch the logs and guess ;). Here's a
48 trace when driven by GnuPG 2.0.17 (libgcrypt 1.4.6)::
50 S: OK Your orders please
53 C: OPTION ttyname=/dev/pts/6
55 C: OPTION ttytype=xterm
57 C: OPTION lc-ctype=en_US.UTF-8
59 C: OPTION lc-messages=en_US.UTF-8
61 C: OPTION default-ok=_OK
63 C: OPTION default-cancel=_Cancel
65 C: OPTION default-prompt=PIN:
67 C: OPTION touch-file=/tmp/gpg-7lElMX/S.gpg-agent
72 C: SETDESC Enter passphrase%0A
74 C: SETPROMPT Passphrase
80 S: OK closing connection
82 _digit_regexp = _re.compile(r'\d+')
84 # from proc(5): pid comm state ppid pgrp session tty_nr tpgid
85 _tpgrp_regexp = _re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')
87 def __init__(self, name='pinentry', strict_options=False,
88 single_request=True, **kwargs):
91 super(PinEntry, self).__init__(
92 name=name, strict_options=strict_options,
93 single_request=single_request, **kwargs)
94 self.valid_options.append('ttyname')
97 super(PinEntry, self).reset()
99 self.connection.clear()
104 self.logger.info('connecting to user')
105 self.logger.debug('options:\n{}'.format(_pprint.pformat(self.options)))
106 tty_name = self.options.get('ttyname', None)
108 self.connection['tpgrp'] = self._get_pgrp(tty_name)
110 'open to-user output stream for {}'.format(tty_name))
111 self.connection['to_user'] = open(tty_name, 'w')
113 'open from-user input stream for {}'.format(tty_name))
114 self.connection['from_user'] = open(tty_name, 'r')
115 self.logger.info('get current termios line discipline')
116 self.connection['original termios'] = _termios.tcgetattr(
117 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
118 new_termios = _copy.deepcopy(self.connection['original termios'])
119 # translate carriage return to newline on input
120 new_termios[0] |= _termios.ICRNL
121 # do not ignore carriage return on input
122 new_termios[0] &= ~_termios.IGNCR
123 # do not echo input characters
124 new_termios[3] &= ~_termios.ECHO
125 # echo input characters
126 #new_termios[3] |= _termios.ECHO
127 # echo the NL character even if ECHO is not set
128 new_termios[3] |= _termios.ECHONL
129 # enable canonical mode
130 new_termios[3] |= _termios.ICANON
131 self.logger.info('adjust termios line discipline')
133 self.connection['to_user'], _termios.TCSANOW, new_termios)
134 self.logger.info('send SIGSTOP to pgrp {}'.format(
135 self.connection['tpgrp']))
136 #_os.killpg(self.connection['tpgrp'], _signal.SIGSTOP)
137 _os.kill(-self.connection['tpgrp'], _signal.SIGSTOP)
138 self.connection['tpgrp stopped'] = True
140 self.logger.info('no TTY name given; use stdin/stdout for I/O')
141 self.connection['to_user'] = _sys.stdout
142 self.connection['from_user'] = _sys.stdin
143 self.logger.info('connected to user')
144 self.connection['to_user'].write('\n') # give a clean line to work on
145 self.connection['active'] = True
147 def _disconnect(self):
148 self.logger.info('disconnecting from user')
150 if self.connection.get('original termios', None):
151 self.logger.info('restore original termios line discipline')
153 self.connection['to_user'], _termios.TCSANOW,
154 self.connection['original termios'])
155 if self.connection.get('tpgrp stopped', None) is True:
157 'send SIGCONT to pgrp {}'.format(self.connection['tpgrp']))
158 #_os.killpg(self.connection['tpgrp'], _signal.SIGCONT)
159 _os.kill(-self.connection['tpgrp'], _signal.SIGCONT)
160 if self.connection.get('to_user', None) not in [None, _sys.stdout]:
161 self.logger.info('close to-user output stream')
162 self.connection['to_user'].close()
163 if self.connection.get('from_user',None) not in [None,_sys.stdout]:
164 self.logger.info('close from-user input stream')
165 self.connection['from_user'].close()
167 self.connection = {'active': False}
168 self.logger.info('disconnected from user')
170 def _get_pgrp(self, tty_name):
171 self.logger.info('find process group contolling {}'.format(tty_name))
173 for name in _os.listdir(proc):
174 path = _os_path.join(proc, name)
175 if not (self._digit_regexp.match(name) and _os_path.isdir(path)):
176 continue # not a process directory
177 self.logger.debug('checking process {}'.format(name))
178 fd_path = _os_path.join(path, 'fd', '0')
180 link = _os.readlink(fd_path)
182 self.logger.debug('not our process: {}'.format(e))
183 continue # permission denied (not one of our processes)
185 self.logger.debug('wrong tty: {}'.format(link))
186 continue # not attached to our target tty
187 stat_path = _os_path.join(path, 'stat')
188 stat = open(stat_path, 'r').read()
189 self.logger.debug('check stat for pgrp: {}'.format(stat))
190 match = self._tpgrp_regexp.match(stat)
191 assert match != None, stat
192 pgrp = int(match.group(1))
193 self.logger.info('found pgrp {} for {}'.format(pgrp, tty_name))
195 raise ValueError(tty_name)
197 def _write(self, string):
198 "Write text to the user's terminal."
199 self.connection['to_user'].write(string + '\n')
200 self.connection['to_user'].flush()
203 "Read and return a line from the user's terminal."
204 # drop trailing newline
205 return self.connection['from_user'].readline()[:-1]
207 def _prompt(self, prompt='?', add_colon=True):
210 self.connection['to_user'].write(prompt)
211 self.connection['to_user'].write(' ')
212 self.connection['to_user'].flush()
217 def _handle_GETINFO(self, arg):
219 yield _common.Response('D', str(_os.getpid()).encode('ascii'))
220 elif arg == 'version':
221 yield _common.Response('D', __version__.encode('ascii'))
223 raise _error.AssuanError(message='Invalid parameter')
224 yield _common.Response('OK')
226 def _handle_SETDESC(self, arg):
227 self.strings['description'] = arg
228 yield _common.Response('OK')
230 def _handle_SETPROMPT(self, arg):
231 self.strings['prompt'] = arg
232 yield _common.Response('OK')
234 def _handle_SETERROR(self, arg):
235 self.strings['error'] = arg
236 yield _common.Response('OK')
238 def _handle_SETTITLE(self, arg):
239 self.strings['title'] = arg
240 yield _common.Response('OK')
242 def _handle_SETOK(self, arg):
243 self.strings['ok'] = arg
244 yield _common.Response('OK')
246 def _handle_SETCANCEL(self, arg):
247 self.strings['cancel'] = arg
248 yield _common.Response('OK')
250 def _handle_SETNOTOK(self, arg):
251 self.strings['not ok'] = arg
252 yield _common.Response('OK')
254 def _handle_SETQUALITYBAR(self, arg):
255 """Adds a quality indicator to the GETPIN window.
257 This indicator is updated as the passphrase is typed. The
258 clients needs to implement an inquiry named "QUALITY" which
259 gets passed the current passpharse (percent-plus escaped) and
260 should send back a string with a single numerical vauelue
261 between -100 and 100. Negative values will be displayed in
264 If a custom label for the quality bar is required, just add
265 that label as an argument as percent escaped string. You will
266 need this feature to translate the label because pinentry has
267 no internal gettext except for stock strings from the toolkit
270 If you want to show a tooltip for the quality bar, you may use
272 C: SETQUALITYBAR_TT string
275 With STRING being a percent escaped string shown as the tooltip.
277 raise NotImplementedError()
279 def _handle_GETPIN(self, arg):
282 self._write(self.strings['description'])
283 pin = self._prompt(self.strings['prompt'], add_colon=False)
286 yield _common.Response('D', pin.encode('ascii'))
287 yield _common.Response('OK')
289 def _handle_CONFIRM(self, arg):
292 self._write(self.strings['description'])
293 self._write('1) '+self.strings['ok'])
294 self._write('2) '+self.strings['not ok'])
295 value = self._prompt('?')
299 yield _common.Response('OK')
301 raise _error.AssuanError(message='Not confirmed')
303 def _handle_MESSAGE(self, arg):
304 self._write(self.strings['description'])
305 yield _common.Response('OK')
307 def _handle_CONFIRM(self, args):
308 assert args == '--one-button', args
311 self._write(self.strings['description'])
312 self._write('1) '+self.strings['ok'])
313 value = self._prompt('?')
316 assert value == '1', value
317 yield _common.Response('OK')
320 if __name__ == '__main__':
325 parser = argparse.ArgumentParser(description=__doc__, version=__version__)
327 '-V', '--verbose', action='count', default=0,
328 help='increase verbosity')
331 help='set X display (ignored by this implementation)')
333 args = parser.parse_args()
338 p.logger.setLevel(max(
339 logging.DEBUG, p.logger.level - 10*args.verbose))
345 'exiting due to exception:\n{}'.format(
346 traceback.format_exc().rstrip()))