1 # Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
3 # This file is part of pyassuan.
5 # pyassuan is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
10 # pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License along with
15 # pyassuan. If not, see <http://www.gnu.org/licenses/>.
17 """Items common to both the client and server
22 from . import error as _error
25 LINE_LENGTH = 1002 # 1000 + [CR,]LF
26 _ENCODE_PATTERN = '(' + '|'.join(['%', '\r', '\n']) + ')'
27 _ENCODE_STR_REGEXP = _re.compile(_ENCODE_PATTERN)
28 _ENCODE_BYTE_REGEXP = _re.compile(_ENCODE_PATTERN.encode('ascii'))
29 _DECODE_STR_REGEXP = _re.compile('(%[0-9A-F]{2})')
30 _DECODE_BYTE_REGEXP = _re.compile(b'(%[0-9A-F]{2})')
31 _REQUEST_REGEXP = _re.compile('^(\w+)( *)(.*)\Z')
37 >>> encode('It grew by 5%!\n')
39 >>> encode(b'It grew by 5%!\n')
40 b'It grew by 5%25!%0A'
42 if isinstance(data, bytes):
43 regexp = _ENCODE_BYTE_REGEXP
45 regexp = _ENCODE_STR_REGEXP
47 lambda x : to_hex(x.group()), data)
52 >>> decode('%22Look out!%22%0AWhere%3F')
54 >>> decode(b'%22Look out!%22%0AWhere%3F')
55 b'"Look out!"\nWhere?'
57 if isinstance(data, bytes):
58 regexp = _DECODE_BYTE_REGEXP
60 regexp = _DECODE_STR_REGEXP
62 lambda x : from_hex(x.group()), data)
74 c = chr(int(code[1:], 16))
75 if isinstance(code, bytes):
89 hx = '%{:02X}'.format(ord(char))
90 if isinstance(char, bytes):
91 hx = hx.encode('ascii')
95 class Request (object):
98 http://www.gnupg.org/documentation/manuals/assuan/Client-requests.html
100 >>> r = Request(command='BYE')
103 >>> r = Request(command='OPTION', parameters='testing at 5%')
105 'OPTION testing at 5%25'
107 b'OPTION testing at 5%25'
108 >>> r.from_bytes(b'BYE')
111 >>> print(r.parameters)
113 >>> r.from_bytes(b'OPTION testing at 5%25')
116 >>> print(r.parameters)
118 >>> r.from_bytes(b' invalid')
119 Traceback (most recent call last):
121 pyassuan.error.AssuanError: 170 Invalid request
122 >>> r.from_bytes(b'in-valid')
123 Traceback (most recent call last):
125 pyassuan.error.AssuanError: 170 Invalid request
127 def __init__(self, command=None, parameters=None, encoded=False):
128 self.command = command
129 self.parameters = parameters
130 self.encoded = encoded
135 encoded_parameters = self.parameters
137 encoded_parameters = encode(self.parameters)
138 return '{} {}'.format(self.command, encoded_parameters)
144 encoded_parameters = self.parameters
146 encoded_parameters = encode(self.parameters)
147 return '{} {}'.format(
148 self.command, encoded_parameters).encode('utf-8')
149 return self.command.encode('utf-8')
151 def from_bytes(self, line):
152 if len(line) > 1000: # TODO: byte-vs-str and newlines?
153 raise _error.AssuanError(message='Line too long')
154 if line.startswith(b'D '):
156 self.parameters = decode(line[2:])
158 line = str(line, encoding='utf-8')
159 match = _REQUEST_REGEXP.match(line)
161 raise _error.AssuanError(message='Invalid request')
162 self.command = match.group(1)
165 self.parameters = decode(match.group(3))
167 raise _error.AssuanError(message='Invalid request')
169 self.parameters = None
172 class Response (object):
175 http://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
177 >>> r = Response(type='OK')
180 >>> r = Response(type='ERR', parameters='1 General error')
182 'ERR 1 General error'
184 b'ERR 1 General error'
185 >>> r.from_bytes(b'OK')
188 >>> print(r.parameters)
190 >>> r.from_bytes(b'ERR 1 General error')
193 >>> print(r.parameters)
195 >>> r.from_bytes(b' invalid')
196 Traceback (most recent call last):
198 pyassuan.error.AssuanError: 76 Invalid response
199 >>> r.from_bytes(b'in-valid')
200 Traceback (most recent call last):
202 pyassuan.error.AssuanError: 76 Invalid response
213 def __init__(self, type=None, parameters=None):
215 self.parameters = parameters
219 return '{} {}'.format(self.type, encode(self.parameters))
225 return b'{} {}'.format(b'D', self.parameters)
227 return '{} {}'.format(
228 self.type, encode(self.parameters)).encode('utf-8')
229 return self.type.encode('utf-8')
231 def from_bytes(self, line):
232 if len(line) > 1000: # TODO: byte-vs-str and newlines?
233 raise _error.AssuanError(message='Line too long')
234 if line.startswith(b'D'):
235 self.command = t = 'D'
237 line = str(line, encoding='utf-8')
242 raise _error.AssuanError(message='Invalid response')
244 if type == 'D': # data
245 self.parameters = decode(line[2:])
246 elif type == '#': # comment
247 self.parameters = decode(line[2:])
249 match = _REQUEST_REGEXP.match(line)
251 raise _error.AssuanError(message='Invalid request')
254 self.parameters = decode(match.group(3))
256 raise _error.AssuanError(message='Invalid request')
258 self.parameters = None
261 def error_response(error):
264 >>> from pyassuan.error import AssuanError
265 >>> error = AssuanError(1)
266 >>> response = error_response(error)
270 return Response(type='ERR', parameters=str(error))