"""Simple pinentry program for getting server info.
"""
-import socket as _socket
-
from pyassuan import __version__
from pyassuan import client as _client
from pyassuan import common as _common
client.logger.setLevel(max(
logging.DEBUG, client.logger.level - 10*args.verbose))
- socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
- socket.connect(args.filename)
- client.input = socket.makefile('rb')
- client.output = socket.makefile('wb')
- client.connect()
+ client.connect(socket_path=args.filename)
try:
response = client.read_response()
assert response.type == 'OK', response
finally:
client.make_request(_common.Request('BYE'))
client.disconnect()
- socket.shutdown(_socket.SHUT_RDWR)
- socket.close()
# pyassuan. If not, see <http://www.gnu.org/licenses/>.
import logging as _logging
+import socket as _socket
import sys as _sys
from . import LOG as _LOG
logger = _logging.getLogger('{}.{}'.format(logger.name, self.name))
self.logger = logger
self.close_on_disconnect = close_on_disconnect
- self.input = self.output = None
+ self.input = self.output = self.socket = None
- def connect(self):
- if not self.input:
- self.logger.info('read from stdin')
- self.input = _sys.stdin.buffer
- if not self.output:
- self.logger.info('write to stdout')
- self.output = _sys.stdout.buffer
+ def connect(self, socket_path=None):
+ if socket_path:
+ self.logger.info(
+ 'connect to Unix socket at {}'.format(socket_path))
+ self.socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
+ self.socket.connect(socket_path)
+ self.input = self.socket.makefile('rb')
+ self.output = self.socket.makefile('wb')
+ else:
+ if not self.input:
+ self.logger.info('read from stdin')
+ self.input = _sys.stdin.buffer
+ if not self.output:
+ self.logger.info('write to stdout')
+ self.output = _sys.stdout.buffer
def disconnect(self):
if self.close_on_disconnect:
self.logger.info('disconnecting')
- self.input = None
- self.output = None
+ if self.input is not None:
+ self.input.close()
+ self.input = None
+ if self.output is not None:
+ self.output.close()
+ self.output = None
+ if self.socket is not None:
+ self.socket.shutdown(_socket.SHUT_RDWR)
+ self.socket.close()
+ self.socket = None
def raise_error(self, error):
self.logger.error(str(error))