3 # Copyright (C) 2011 W. Trevor King <wking@drexel.edu>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Lesser General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program. If not, see
17 # <http://www.gnu.org/licenses/>.
19 """Simple pinentry program for getting pins from a terminal.
24 import logging.handlers
# Module-wide logger.  It starts at WARNING; the command-line verbosity
# handling at the bottom of the file can lower the threshold to INFO or DEBUG.
logger = logging.getLogger('pinentry')
logger.setLevel(logging.WARNING)
# Syslog handler bound to the local /dev/log socket.  The handler itself lets
# everything from DEBUG upwards through, so the logger level above is the
# effective filter.
_h = logging.handlers.SysLogHandler(address='/dev/log')
_h.setLevel(logging.DEBUG)
# Each record is rendered as "<logger name>: <level>: <message>".
# NOTE(review): the statements that attach _f to _h and _h to the logger are
# not visible in this view -- confirm they follow.
_f = logging.Formatter('%(name)s: %(levelname)s: %(message)s')
49 class PinEntry (object):
50 """pinentry protocol server
52 See the `Assuan manual`_ for a description of the protocol.
54 .. _Assuan manual: http://www.gnupg.org/documentation/manuals/assuan/
# Matches a name that starts with digits; used with .match() to pick the
# per-process directories out of the /proc listing.
_digit_regexp = re.compile(r'\d+')

# from proc(5): pid comm state ppid pgrp session tty_nr tpgid
# The single capture group is the eighth field (tpgid).
# NOTE(review): \S+ cannot match a comm field that contains spaces.
_tpgrp_regexp = re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)')

# Characters that get percent-escaped on output: '%', CR and LF.
_assuan_encode_regexp = re.compile(
    '(' + '|'.join(['%', '\r', '\n']) + ')')
# A percent sign followed by exactly two upper-case hexadecimal digits.
_assuan_decode_regexp = re.compile('(%[0-9A-F]{2})')
72 logger.info('---opening pinentry---')
73 logger.info('OK Your orders please')
74 sys.stdout.write('OK Your orders please\n')
78 line = sys.stdin.readline()
81 line = line.rstrip() # dangerous?
83 line = self._decode(line)
84 fields = line.split(' ', 1)
90 handle = getattr(self, '_handle_%s' % cmd, None)
92 for response in handle(arg):
93 response = self._encode(response)
95 sys.stdout.write(response+'\n')
102 raise ValueError(line)
104 logger.info('---closing pinentry---')
109 logger.info('--connecting to user--')
110 logger.debug('options:\n%s' % pprint.pformat(self.options))
111 tty_name = self.options.get('ttyname', None)
113 self.connection['tpgrp'] = self._get_pgrp(tty_name)
114 logger.info('open to-user output stream for %s' % tty_name)
115 self.connection['to_user'] = open(tty_name, 'w')
116 logger.info('open from-user input stream for %s' % tty_name)
117 self.connection['from_user'] = open(tty_name, 'r')
118 logger.info('get current termios line discipline')
119 self.connection['original termios'] = termios.tcgetattr(
120 self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...]
121 new_termios = copy.deepcopy(self.connection['original termios'])
122 # translate carriage return to newline on input
123 new_termios[0] |= termios.ICRNL
124 # do not ignore carriage return on input
125 new_termios[0] &= ~termios.IGNCR
126 # do not echo input characters
127 new_termios[3] &= ~termios.ECHO
128 # echo input characters
129 #new_termios[3] |= termios.ECHO
130 # echo the NL character even if ECHO is not set
131 new_termios[3] |= termios.ECHONL
132 # enable canonical mode
133 new_termios[3] |= termios.ICANON
134 logger.info('adjust termios line discipline')
136 self.connection['to_user'], termios.TCSANOW, new_termios)
137 logger.info('send SIGSTOP to pgrp %d' % self.connection['tpgrp'])
138 #os.killpg(self.connection['tpgrp'], signal.SIGSTOP)
139 os.kill(-self.connection['tpgrp'], signal.SIGSTOP)
140 self.connection['tpgrp stopped'] = True
142 logger.info('no TTY name given; use stdin/stdout for I/O')
143 self.connection['to_user'] = sys.stdout
144 self.connection['from_user'] = sys.stdin
145 logger.info('--connected to user--')
146 self.connection['to_user'].write('\n') # give a clean line to work on
147 self.connection['active'] = True
149 def _disconnect(self):
150 logger.info('--disconnecting from user--')
152 if self.connection.get('original termios', None):
153 logger.info('restore original termios line discipline')
155 self.connection['to_user'], termios.TCSANOW,
156 self.connection['original termios'])
157 if self.connection.get('tpgrp stopped', None) is True:
159 'send SIGCONT to pgrp %d' % self.connection['tpgrp'])
160 #os.killpg(self.connection['tpgrp'], signal.SIGCONT)
161 os.kill(-self.connection['tpgrp'], signal.SIGCONT)
162 if self.connection.get('to_user', None) not in [None, sys.stdout]:
163 logger.info('close to-user output stream')
164 self.connection['to_user'].close()
165 if self.connection.get('from_user', None) not in [None,sys.stdout]:
166 logger.info('close from-user input stream')
167 self.connection['from_user'].close()
169 self.connection = {'active': False}
170 logger.info('--disconnected from user--')
172 def _get_pgrp(self, tty_name):
173 logger.info('find process group contolling %s' % tty_name)
175 for name in os.listdir(proc):
176 path = os.path.join(proc, name)
177 if not (self._digit_regexp.match(name) and os.path.isdir(path)):
178 continue # not a process directory
179 logger.debug('checking process %s' % name)
180 fd_path = os.path.join(path, 'fd', '0')
182 link = os.readlink(fd_path)
184 logger.debug('not our process: %s' % e)
185 continue # permission denied (not one of our processes)
187 logger.debug('wrong tty: %s' % link)
188 continue # not attached to our target tty
189 stat_path = os.path.join(path, 'stat')
190 stat = open(stat_path, 'r').read()
191 logger.debug('check stat for pgrp: %s' % stat)
192 match = self._tpgrp_regexp.match(stat)
193 assert match != None, stat
194 pgrp = int(match.group(1))
195 logger.info('found pgrp %d for %s' % (pgrp, tty_name))
197 raise ValueError(tty_name)
199 def _write(self, string):
200 "Write text to the user's terminal."
201 self.connection['to_user'].write(string + '\n')
202 self.connection['to_user'].flush()
205 "Read and return a line from the user's terminal."
206 # drop trailing newline
207 return self.connection['from_user'].readline()[:-1]
209 def _prompt(self, prompt='?', add_colon=True):
212 self.connection['to_user'].write('%s ' % prompt)
213 self.connection['to_user'].flush()
218 def _encode(self, string):
222 >>> p._encode('It grew by 5%!\\n')
223 'It grew by 5%25!%0A'
225 return self._assuan_encode_regexp.sub(
226 lambda x : self._to_hex(x.group()), string)
228 def _decode(self, string):
232 >>> p._decode('%22Look out!%22%0AWhere%3F')
233 '"Look out!"\\nWhere?'
235 return self._assuan_decode_regexp.sub(
236 lambda x : self._from_hex(x.group()), string)
238 def _from_hex(self, code):
242 >>> p._from_hex('%22')
244 >>> p._from_hex('%0A')
247 return chr(int(code[1:], 16))
249 def _to_hex(self, char):
258 return '%%%02X' % ord(char)
def _handle_BYE(self, arg):
    """Acknowledge the client's BYE command; `arg` is not used."""
    # NOTE(review): nothing visible here tells the command loop to stop
    # reading -- confirm that happens on a line not shown in this view.
    yield 'OK closing connection'
def _handle_OPTION(self, arg):
    """Record a client-supplied option in ``self.options``.

    `arg` is split at the first '=' into at most two fields.
    """
    # ttytype to set TERM
    fields = arg.split('=', 1)
    # NOTE(review): `key` and `value` are not assigned in the visible
    # lines; presumably they are taken from `fields` -- confirm.
    self.options[key] = value
277 def _handle_GETINFO(self, arg):
279 yield 'D %d' % os.getpid()
281 raise ValueError(arg)
284 def _handle_SETDESC(self, arg):
285 self.strings['description'] = arg
288 def _handle_SETPROMPT(self, arg):
289 self.strings['prompt'] = arg
292 def _handle_SETERROR(self, arg):
293 self.strings['error'] = arg
296 def _handle_SETTITLE(self, arg):
297 self.strings['title'] = arg
300 def _handle_SETOK(self, arg):
301 self.strings['ok'] = arg
304 def _handle_SETCANCEL(self, arg):
305 self.strings['cancel'] = arg
308 def _handle_SETNOTOK(self, arg):
309 self.strings['not ok'] = arg
def _handle_SETQUALITYBAR(self, arg):
    """Adds a quality indicator to the GETPIN window.  This
    indicator is updated as the passphrase is typed.  The client
    needs to implement an inquiry named "QUALITY" which gets passed
    the current passphrase (percent-plus escaped) and should send
    back a string with a single numerical value between -100 and
    100.  Negative values will be displayed in red.

    If a custom label for the quality bar is required, just add that
    label as an argument as percent escaped string.  You will need
    this feature to translate the label because pinentry has no
    internal gettext except for stock strings from the toolkit library.

    If you want to show a tooltip for the quality bar, you may use

        C: SETQUALITYBAR_TT string

    With STRING being a percent escaped string shown as the tooltip.

    This implementation does not support the command: calling the
    handler raises NotImplementedError.
    """
    raise NotImplementedError()
333 def _handle_GETPIN(self, arg):
336 self._write(self.strings['description'])
337 pin = self._prompt(self.strings['prompt'], add_colon=False)
343 def _handle_CONFIRM(self, arg):
346 self._write(self.strings['description'])
347 self._write('1) '+self.strings['ok'])
348 self._write('2) '+self.strings['not ok'])
349 value = self._prompt('?')
355 yield 'ASSUAN_Not_Confirmed'
357 def _handle_MESSAGE(self, arg):
358 self._write(self.strings['description'])
361 def _handle_CONFIRM(self, args):
362 assert args == '--one-button', args
365 self._write(self.strings['description'])
366 self._write('1) '+self.strings['ok'])
367 value = self._prompt('?')
370 assert value == '1', value
374 if __name__ == '__main__':
377 parser = argparse.ArgumentParser(description=__doc__, version=__version__)
379 '-V', '--verbose', action='count', default=0,
380 help='increase verbosity')
383 help='set X display (ignored by this implementation)')
385 args = parser.parse_args()
387 if args.verbose >= 2:
388 logger.setLevel(logging.DEBUG)
389 elif args.verbose >= 1:
390 logger.setLevel(logging.INFO)
396 logger.error('exiting due to exception:\n%s' %
397 traceback.format_exc().rstrip())