X-Git-Url: http://git.tremily.us/?a=blobdiff_plain;f=pyassuan%2Fcommon.py;h=8b99a4539e65e695a6d0cb17f78657a7e095589b;hb=89a9ab22b2dbb22cf9a22a3d8b9da9e4168b4f25;hp=dbae91a97212a872529693ae7d113d43f8df5c94;hpb=63bf462c5adf8fce0061b36361eb56417dc6624d;p=pyassuan.git diff --git a/pyassuan/common.py b/pyassuan/common.py index dbae91a..8b99a45 100644 --- a/pyassuan/common.py +++ b/pyassuan/common.py @@ -1,4 +1,18 @@ -# Copyright +# Copyright (C) 2012 W. Trevor King +# +# This file is part of pyassuan. +# +# pyassuan is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along with +# pyassuan. If not, see . """Items common to both the client and server """ @@ -8,29 +22,44 @@ import re as _re from . import error as _error -_ENCODE_REGEXP = _re.compile( - '(' + '|'.join(['%', '\r', '\n']) + ')') -_DECODE_REGEXP = _re.compile('(%[0-9A-F]{2})') +LINE_LENGTH = 1002 # 1000 + [CR,]LF +_ENCODE_PATTERN = '(' + '|'.join(['%', '\r', '\n']) + ')' +_ENCODE_STR_REGEXP = _re.compile(_ENCODE_PATTERN) +_ENCODE_BYTE_REGEXP = _re.compile(_ENCODE_PATTERN.encode('ascii')) +_DECODE_STR_REGEXP = _re.compile('(%[0-9A-F]{2})') +_DECODE_BYTE_REGEXP = _re.compile(b'(%[0-9A-F]{2})') _REQUEST_REGEXP = _re.compile('^(\w+)( *)(.*)\Z') -def encode(string): +def encode(data): r""" >>> encode('It grew by 5%!\n') 'It grew by 5%25!%0A' - """ - return _ENCODE_REGEXP.sub( - lambda x : to_hex(x.group()), string) + >>> encode(b'It grew by 5%!\n') + b'It grew by 5%25!%0A' + """ + if isinstance(data, bytes): + regexp = _ENCODE_BYTE_REGEXP + else: + regexp = _ENCODE_STR_REGEXP + return regexp.sub( + lambda x : to_hex(x.group()), data) -def decode(string): +def decode(data): r""" >>> decode('%22Look out!%22%0AWhere%3F') '"Look out!"\nWhere?' + >>> decode(b'%22Look out!%22%0AWhere%3F') + b'"Look out!"\nWhere?' """ - return _DECODE_REGEXP.sub( - lambda x : from_hex(x.group()), string) + if isinstance(data, bytes): + regexp = _DECODE_BYTE_REGEXP + else: + regexp = _DECODE_STR_REGEXP + return regexp.sub( + lambda x : from_hex(x.group()), data) def from_hex(code): r""" @@ -39,8 +68,13 @@ def from_hex(code): '"' >>> from_hex('%0A') '\n' + >>> from_hex(b'%0A') + b'\n' """ - return chr(int(code[1:], 16)) + c = chr(int(code[1:], 16)) + if isinstance(code, bytes): + c =c.encode('ascii') + return c def to_hex(char): r""" @@ -49,8 +83,13 @@ def to_hex(char): '%22' >>> to_hex('\n') '%0A' + >>> to_hex(b'\n') + b'%0A' """ - return '%{:02X}'.format(ord(char)) + hx = '%{:02X}'.format(ord(char)) + if isinstance(char, bytes): + hx = hx.encode('ascii') + return hx class Request (object): @@ -64,38 +103,60 @@ class Request (object): >>> r = Request(command='OPTION', parameters='testing at 5%') >>> str(r) 'OPTION testing at 5%25' - >>> r.from_string('BYE') + >>> bytes(r) + b'OPTION testing at 5%25' + >>> r.from_bytes(b'BYE') >>> r.command 'BYE' >>> print(r.parameters) None - >>> r.from_string('OPTION testing at 5%25') + >>> r.from_bytes(b'OPTION testing at 5%25') >>> r.command 'OPTION' >>> print(r.parameters) testing at 5% - >>> r.from_string(' invalid') + >>> r.from_bytes(b' invalid') Traceback (most recent call last): ... pyassuan.error.AssuanError: 170 Invalid request - >>> r.from_string('in-valid') + >>> r.from_bytes(b'in-valid') Traceback (most recent call last): ... pyassuan.error.AssuanError: 170 Invalid request """ - def __init__(self, command=None, parameters=None): + def __init__(self, command=None, parameters=None, encoded=False): self.command = command self.parameters = parameters + self.encoded = encoded def __str__(self): if self.parameters: - return '{} {}'.format(self.command, encode(self.parameters)) + if self.encoded: + encoded_parameters = self.parameters + else: + encoded_parameters = encode(self.parameters) + return '{} {}'.format(self.command, encoded_parameters) return self.command - def from_string(self, string): - if len(string) > 1000: # TODO: byte-vs-str and newlines? + def __bytes__(self): + if self.parameters: + if self.encoded: + encoded_parameters = self.parameters + else: + encoded_parameters = encode(self.parameters) + return '{} {}'.format( + self.command, encoded_parameters).encode('utf-8') + return self.command.encode('utf-8') + + def from_bytes(self, line): + if len(line) > 1000: # TODO: byte-vs-str and newlines? raise _error.AssuanError(message='Line too long') - match = _REQUEST_REGEXP.match(string) + if line.startswith(b'D '): + self.command = 'D' + self.parameters = decode(line[2:]) + else: + line = str(line, encoding='utf-8') + match = _REQUEST_REGEXP.match(line) if not match: raise _error.AssuanError(message='Invalid request') self.command = match.group(1) @@ -119,21 +180,23 @@ class Response (object): >>> r = Response(type='ERR', parameters='1 General error') >>> str(r) 'ERR 1 General error' - >>> r.from_string('OK') + >>> bytes(r) + b'ERR 1 General error' + >>> r.from_bytes(b'OK') >>> r.type 'OK' >>> print(r.parameters) None - >>> r.from_string('ERR 1 General error') + >>> r.from_bytes(b'ERR 1 General error') >>> r.type 'ERR' >>> print(r.parameters) 1 General error - >>> r.from_string(' invalid') + >>> r.from_bytes(b' invalid') Traceback (most recent call last): ... pyassuan.error.AssuanError: 76 Invalid response - >>> r.from_string('in-valid') + >>> r.from_bytes(b'in-valid') Traceback (most recent call last): ... pyassuan.error.AssuanError: 76 Invalid response @@ -156,20 +219,34 @@ class Response (object): return '{} {}'.format(self.type, encode(self.parameters)) return self.type - def from_string(self, string): - if len(string) > 1000: # TODO: byte-vs-str and newlines? + def __bytes__(self): + if self.parameters: + if self.type == 'D': + return b'{} {}'.format(b'D', self.parameters) + else: + return '{} {}'.format( + self.type, encode(self.parameters)).encode('utf-8') + return self.type.encode('utf-8') + + def from_bytes(self, line): + if len(line) > 1000: # TODO: byte-vs-str and newlines? raise _error.AssuanError(message='Line too long') + if line.startswith(b'D'): + self.command = t = 'D' + else: + line = str(line, encoding='utf-8') + t = line[0] try: - type = self.types[string[0]] + type = self.types[t] except KeyError: raise _error.AssuanError(message='Invalid response') self.type = type if type == 'D': # data - self.parameters = decode(string[2:]) + self.parameters = decode(line[2:]) elif type == '#': # comment - self.parameters = decode(string[2:]) + self.parameters = decode(line[2:]) else: - match = _REQUEST_REGEXP.match(string) + match = _REQUEST_REGEXP.match(line) if not match: raise _error.AssuanError(message='Invalid request') if match.group(3):