from pyassuan import __version__
from pyassuan import client as _client
from pyassuan import common as _common
+from pyassuan import error as _error
if __name__ == '__main__':
socket = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
socket.connect(args.filename)
- client.input = socket.makefile('r')
- client.output = socket.makefile('w')
+ client.input = socket.makefile('rb')
+ client.output = socket.makefile('wb')
client.connect()
try:
response = client.read_response()
client.make_request(_common.Request('HELP'))
client.make_request(_common.Request('HELP GETINFO'))
for attribute in ['version', 'pid', 'socket_name', 'ssh_socket_name']:
- client.make_request(_common.Request('GETINFO', attribute))
+ try:
+ client.make_request(_common.Request('GETINFO', attribute))
+ except _error.AssuanError as e:
+ if e.message.startswith('No data'):
+ pass
+ else:
+ raise
finally:
client.make_request(_common.Request('BYE'))
client.disconnect()
if not line:
self.raise_error(
_error.AssuanError(message='IPC accept call failed'))
- if not line.endswith('\n'):
+ if len(line) > _common.LINE_LENGTH:
+ self.raise_error(
+ _error.AssuanError(message='Line too long'))
+ if not line.endswith(b'\n'):
+ self.logger.info('S: {}'.format(line))
self.raise_error(
_error.AssuanError(message='Invalid response'))
line = line[:-1] # remove trailing newline
- # TODO, line length?
response = _common.Response()
try:
- response.from_string(line)
+ response.from_bytes(line)
except _error.AssuanError as e:
self.logger.error(str(e))
raise
return response
def _write_request(self, request):
- rstring = str(request)
- self.logger.info('C: {}'.format(rstring))
- self.output.write(rstring)
- self.output.write('\n')
+ self.logger.info('C: {}'.format(request))
+ self.output.write(bytes(request))
+ self.output.write(b'\n')
try:
self.output.flush()
except IOError:
if response.type == 'D':
data.append(response.parameters)
if data:
- data = ''.join(data)
+ data = b''.join(data)
else:
data = None
return (responses, data)
LINE_LENGTH = 1002 # 1000 + [CR,]LF
-_ENCODE_REGEXP = _re.compile(
- '(' + '|'.join(['%', '\r', '\n']) + ')')
-_DECODE_REGEXP = _re.compile('(%[0-9A-F]{2})')
+_ENCODE_PATTERN = '(' + '|'.join(['%', '\r', '\n']) + ')'
+_ENCODE_STR_REGEXP = _re.compile(_ENCODE_PATTERN)
+_ENCODE_BYTE_REGEXP = _re.compile(_ENCODE_PATTERN.encode('ascii'))
+_DECODE_STR_REGEXP = _re.compile('(%[0-9A-F]{2})')
+_DECODE_BYTE_REGEXP = _re.compile(b'(%[0-9A-F]{2})')
_REQUEST_REGEXP = _re.compile('^(\w+)( *)(.*)\Z')
-def encode(string):
+def encode(data):
r"""
>>> encode('It grew by 5%!\n')
'It grew by 5%25!%0A'
- """
- return _ENCODE_REGEXP.sub(
- lambda x : to_hex(x.group()), string)
+ >>> encode(b'It grew by 5%!\n')
+ b'It grew by 5%25!%0A'
+ """
+ if isinstance(data, bytes):
+ regexp = _ENCODE_BYTE_REGEXP
+ else:
+ regexp = _ENCODE_STR_REGEXP
+ return regexp.sub(
+ lambda x : to_hex(x.group()), data)
-def decode(string):
+def decode(data):
r"""
>>> decode('%22Look out!%22%0AWhere%3F')
'"Look out!"\nWhere?'
+ >>> decode(b'%22Look out!%22%0AWhere%3F')
+ b'"Look out!"\nWhere?'
"""
- return _DECODE_REGEXP.sub(
- lambda x : from_hex(x.group()), string)
+ if isinstance(data, bytes):
+ regexp = _DECODE_BYTE_REGEXP
+ else:
+ regexp = _DECODE_STR_REGEXP
+ return regexp.sub(
+ lambda x : from_hex(x.group()), data)
def from_hex(code):
r"""
'"'
>>> from_hex('%0A')
'\n'
+ >>> from_hex(b'%0A')
+ b'\n'
"""
- return chr(int(code[1:], 16))
+ c = chr(int(code[1:], 16))
+ if isinstance(code, bytes):
+ c =c.encode('ascii')
+ return c
def to_hex(char):
r"""
'%22'
>>> to_hex('\n')
'%0A'
+ >>> to_hex(b'\n')
+ b'%0A'
"""
- return '%{:02X}'.format(ord(char))
+ hx = '%{:02X}'.format(ord(char))
+ if isinstance(char, bytes):
+ hx = hx.encode('ascii')
+ return hx
class Request (object):
>>> r = Request(command='OPTION', parameters='testing at 5%')
>>> str(r)
'OPTION testing at 5%25'
- >>> r.from_string('BYE')
+ >>> bytes(r)
+ b'OPTION testing at 5%25'
+ >>> r.from_bytes(b'BYE')
>>> r.command
'BYE'
>>> print(r.parameters)
None
- >>> r.from_string('OPTION testing at 5%25')
+ >>> r.from_bytes(b'OPTION testing at 5%25')
>>> r.command
'OPTION'
>>> print(r.parameters)
testing at 5%
- >>> r.from_string(' invalid')
+ >>> r.from_bytes(b' invalid')
Traceback (most recent call last):
...
pyassuan.error.AssuanError: 170 Invalid request
- >>> r.from_string('in-valid')
+ >>> r.from_bytes(b'in-valid')
Traceback (most recent call last):
...
pyassuan.error.AssuanError: 170 Invalid request
return '{} {}'.format(self.command, encoded_parameters)
return self.command
- def from_string(self, string):
- if len(string) > 1000: # TODO: byte-vs-str and newlines?
+ def __bytes__(self):
+ if self.parameters:
+ if self.encoded:
+ encoded_parameters = self.parameters
+ else:
+ encoded_parameters = encode(self.parameters)
+ return '{} {}'.format(
+ self.command, encoded_parameters).encode('utf-8')
+ return self.command.encode('utf-8')
+
+ def from_bytes(self, line):
+ if len(line) > 1000: # TODO: byte-vs-str and newlines?
raise _error.AssuanError(message='Line too long')
- match = _REQUEST_REGEXP.match(string)
+ if line.startswith(b'D '):
+ self.command = 'D'
+ self.parameters = decode(line[2:])
+ else:
+ line = str(line, encoding='utf-8')
+ match = _REQUEST_REGEXP.match(line)
if not match:
raise _error.AssuanError(message='Invalid request')
self.command = match.group(1)
>>> r = Response(type='ERR', parameters='1 General error')
>>> str(r)
'ERR 1 General error'
- >>> r.from_string('OK')
+ >>> bytes(r)
+ b'ERR 1 General error'
+ >>> r.from_bytes(b'OK')
>>> r.type
'OK'
>>> print(r.parameters)
None
- >>> r.from_string('ERR 1 General error')
+ >>> r.from_bytes(b'ERR 1 General error')
>>> r.type
'ERR'
>>> print(r.parameters)
1 General error
- >>> r.from_string(' invalid')
+ >>> r.from_bytes(b' invalid')
Traceback (most recent call last):
...
pyassuan.error.AssuanError: 76 Invalid response
- >>> r.from_string('in-valid')
+ >>> r.from_bytes(b'in-valid')
Traceback (most recent call last):
...
pyassuan.error.AssuanError: 76 Invalid response
return '{} {}'.format(self.type, encode(self.parameters))
return self.type
- def from_string(self, string):
- if len(string) > 1000: # TODO: byte-vs-str and newlines?
+ def __bytes__(self):
+ if self.parameters:
+ if self.type == 'D':
+ return b'{} {}'.format(b'D', self.parameters)
+ else:
+ return '{} {}'.format(
+ self.type, encode(self.parameters)).encode('utf-8')
+ return self.type.encode('utf-8')
+
+ def from_bytes(self, line):
+ if len(line) > 1000: # TODO: byte-vs-str and newlines?
raise _error.AssuanError(message='Line too long')
+ if line.startswith(b'D'):
+ self.command = t = 'D'
+ else:
+ line = str(line, encoding='utf-8')
+ t = line[0]
try:
- type = self.types[string[0]]
+ type = self.types[t]
except KeyError:
raise _error.AssuanError(message='Invalid response')
self.type = type
if type == 'D': # data
- self.parameters = decode(string[2:])
+ self.parameters = decode(line[2:])
elif type == '#': # comment
- self.parameters = decode(string[2:])
+ self.parameters = decode(line[2:])
else:
- match = _REQUEST_REGEXP.match(string)
+ match = _REQUEST_REGEXP.match(line)
if not match:
raise _error.AssuanError(message='Invalid request')
if match.group(3):