2b29648cc0169e5f279eed929ea9bd3d9116f386
[pyassuan.git] / pyassuan / common.py
1 # Copyright (C) 2012 W. Trevor King <wking@drexel.edu>
2 #
3 # This file is part of pyassuan.
4 #
5 # pyassuan is free software: you can redistribute it and/or modify it under the
6 # terms of the GNU General Public License as published by the Free Software
7 # Foundation, either version 3 of the License, or (at your option) any later
8 # version.
9 #
10 # pyassuan is distributed in the hope that it will be useful, but WITHOUT ANY
11 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
12 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # pyassuan.  If not, see <http://www.gnu.org/licenses/>.
16
17 """Items common to both the client and server
18 """
19
20 import array as _array
21 import re as _re
22 import socket as _socket
23
24 from . import error as _error
25
26
27 LINE_LENGTH = 1002  # 1000 + [CR,]LF
28 _ENCODE_PATTERN = '(' + '|'.join(['%', '\r', '\n']) + ')'
29 _ENCODE_STR_REGEXP = _re.compile(_ENCODE_PATTERN)
30 _ENCODE_BYTE_REGEXP = _re.compile(_ENCODE_PATTERN.encode('ascii'))    
31 _DECODE_STR_REGEXP = _re.compile('(%[0-9A-Fa-f]{2})')
32 _DECODE_BYTE_REGEXP = _re.compile(b'(%[0-9A-Fa-f]{2})')
33 _REQUEST_REGEXP = _re.compile('^(\w+)( *)(.*)\Z')
34
35
36 def encode(data):
37     r"""
38
39     >>> encode('It grew by 5%!\n')
40     'It grew by 5%25!%0A'
41     >>> encode(b'It grew by 5%!\n')
42     b'It grew by 5%25!%0A'
43     """
44     if isinstance(data, bytes):
45         regexp = _ENCODE_BYTE_REGEXP
46     else:
47         regexp = _ENCODE_STR_REGEXP
48     return regexp.sub(
49         lambda x : to_hex(x.group()), data)
50
51 def decode(data):
52     r"""
53
54     >>> decode('%22Look out!%22%0AWhere%3F')
55     '"Look out!"\nWhere?'
56     >>> decode(b'%22Look out!%22%0AWhere%3F')
57     b'"Look out!"\nWhere?'
58     """
59     if isinstance(data, bytes):
60         regexp = _DECODE_BYTE_REGEXP
61     else:
62         regexp = _DECODE_STR_REGEXP
63     return regexp.sub(
64         lambda x : from_hex(x.group()), data)
65
66 def from_hex(code):
67     r"""
68
69     >>> from_hex('%22')
70     '"'
71     >>> from_hex('%0A')
72     '\n'
73     >>> from_hex(b'%0A')
74     b'\n'
75     """
76     c = chr(int(code[1:], 16))
77     if isinstance(code, bytes):
78         c =c.encode('ascii')
79     return c
80
81 def to_hex(char):
82     r"""
83
84     >>> to_hex('"')
85     '%22'
86     >>> to_hex('\n')
87     '%0A'
88     >>> to_hex(b'\n')
89     b'%0A'
90     """
91     hx = '%{:02X}'.format(ord(char))
92     if isinstance(char, bytes):
93         hx = hx.encode('ascii')
94     return hx
95
96
97 class Request (object):
98     """A client request
99
100     http://www.gnupg.org/documentation/manuals/assuan/Client-requests.html
101
102     >>> r = Request(command='BYE')
103     >>> str(r)
104     'BYE'
105     >>> r = Request(command='OPTION', parameters='testing at 5%')
106     >>> str(r)
107     'OPTION testing at 5%25'
108     >>> bytes(r)
109     b'OPTION testing at 5%25'
110     >>> r.from_bytes(b'BYE')
111     >>> r.command
112     'BYE'
113     >>> print(r.parameters)
114     None
115     >>> r.from_bytes(b'OPTION testing at 5%25')
116     >>> r.command
117     'OPTION'
118     >>> print(r.parameters)
119     testing at 5%
120     >>> r.from_bytes(b' invalid')
121     Traceback (most recent call last):
122       ...
123     pyassuan.error.AssuanError: 170 Invalid request
124     >>> r.from_bytes(b'in-valid')
125     Traceback (most recent call last):
126       ...
127     pyassuan.error.AssuanError: 170 Invalid request
128     """
129     def __init__(self, command=None, parameters=None, encoded=False):
130         self.command = command
131         self.parameters = parameters
132         self.encoded = encoded
133
134     def __str__(self):
135         if self.parameters:
136             if self.encoded:
137                 encoded_parameters = self.parameters
138             else:
139                 encoded_parameters = encode(self.parameters)
140             return '{} {}'.format(self.command, encoded_parameters)
141         return self.command
142
143     def __bytes__(self):
144         if self.parameters:
145             if self.encoded:
146                 encoded_parameters = self.parameters
147             else:
148                 encoded_parameters = encode(self.parameters)
149             return '{} {}'.format(
150                 self.command, encoded_parameters).encode('utf-8')
151         return self.command.encode('utf-8')
152
153     def from_bytes(self, line):
154         if len(line) > 1000:  # TODO: byte-vs-str and newlines?
155             raise _error.AssuanError(message='Line too long')
156         line = str(line, encoding='utf-8')
157         match = _REQUEST_REGEXP.match(line)
158         if not match:
159             raise _error.AssuanError(message='Invalid request')
160         self.command = match.group(1)
161         if match.group(3):
162             if match.group(2):
163                 self.parameters = decode(match.group(3))
164             else:
165                 raise _error.AssuanError(message='Invalid request')
166         else:
167             self.parameters = None
168
169
170 class Response (object):
171     """A server response
172
173     http://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
174
175     >>> r = Response(type='OK')
176     >>> str(r)
177     'OK'
178     >>> r = Response(type='ERR', parameters='1 General error')
179     >>> str(r)
180     'ERR 1 General error'
181     >>> bytes(r)
182     b'ERR 1 General error'
183     >>> r.from_bytes(b'OK')
184     >>> r.type
185     'OK'
186     >>> print(r.parameters)
187     None
188     >>> r.from_bytes(b'ERR 1 General error')
189     >>> r.type
190     'ERR'
191     >>> print(r.parameters)
192     1 General error
193     >>> r.from_bytes(b' invalid')
194     Traceback (most recent call last):
195       ...
196     pyassuan.error.AssuanError: 76 Invalid response
197     >>> r.from_bytes(b'in-valid')
198     Traceback (most recent call last):
199       ...
200     pyassuan.error.AssuanError: 76 Invalid response
201     """
202     types = {
203         'O': 'OK',
204         'E': 'ERR',
205         'S': 'S',
206         '#': '#',
207         'D': 'D',
208         'I': 'INQUIRE',
209         }
210
211     def __init__(self, type=None, parameters=None):
212         self.type = type
213         self.parameters = parameters
214
215     def __str__(self):
216         if self.parameters:
217             return '{} {}'.format(self.type, encode(self.parameters))
218         return self.type
219
220     def __bytes__(self):
221         if self.parameters:
222             if self.type == 'D':
223                 return b' '.join((b'D', self.parameters))
224             else:
225                 return '{} {}'.format(
226                     self.type, encode(self.parameters)).encode('utf-8')
227         return self.type.encode('utf-8')
228
229     def from_bytes(self, line):
230         if len(line) > 1000:  # TODO: byte-vs-str and newlines?
231             raise _error.AssuanError(message='Line too long')
232         if line.startswith(b'D'):
233             self.command = t = 'D'
234         else:
235             line = str(line, encoding='utf-8')
236             t = line[0]
237         try:
238             type = self.types[t]
239         except KeyError:
240             raise _error.AssuanError(message='Invalid response')
241         self.type = type
242         if type == 'D':  # data
243             self.parameters = decode(line[2:])
244         elif type == '#':  # comment
245             self.parameters = decode(line[2:])
246         else:
247             match = _REQUEST_REGEXP.match(line)
248             if not match:
249                 raise _error.AssuanError(message='Invalid request')
250             if match.group(3):
251                 if match.group(2):
252                     self.parameters = decode(match.group(3))
253                 else:
254                     raise _error.AssuanError(message='Invalid request')
255             else:
256                 self.parameters = None
257
258
259 def error_response(error):
260     """
261
262     >>> from pyassuan.error import AssuanError
263     >>> error = AssuanError(1)
264     >>> response = error_response(error)
265     >>> print(response)
266     ERR 1 General error
267     """
268     return Response(type='ERR', parameters=str(error))
269
270
271 def send_fds(socket, msg, fds):
272     """Send a file descriptor over a Unix socket using ``sendmsg``.
273
274     ``sendmsg`` suport requires Python >= 3.3.
275
276     Code from
277     http://docs.python.org/dev/library/socket.html#socket.socket.sendmsg
278
279     Assuan equivalent is
280     http://www.gnupg.org/documentation/manuals/assuan/Client-code.html#function-assuan_005fsendfd
281     """
282     return socket.sendmsg(
283         [msg],
284         [(_socket.SOL_SOCKET, _socket.SCM_RIGHTS, _array.array('i', fds))])
285
286 def receive_fds(socket, msglen, maxfds):
287     """Recieve file descriptors using ``recvmsg``.
288
289     ``recvmsg`` suport requires Python >= 3.3.
290
291     Code from http://docs.python.org/dev/library/socket.html
292
293     Assuan equivalent is
294     http://www.gnupg.org/documentation/manuals/assuan/Client-code.html#fun_002dassuan_005freceivedfd
295     """
296     fds = _array.array('i')   # Array of ints
297     msg,ancdata,flags,addr = socket.recvmsg(
298         msglen, _socket.CMSG_LEN(maxfds * fds.itemsize))
299     for cmsg_level,cmsg_type,cmsg_data in ancdata:
300         if (cmsg_level == _socket.SOL_SOCKET and
301             cmsg_type == _socket.SCM_RIGHTS):
302             # Append data, ignoring any truncated integers at the end.
303             fds.fromstring(
304                 cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
305     return (msg, list(fds))