import PyrexTypes
from sets import Set as set
-class PureCFuncNode(Node):
- def __init__(self, pos, cname, type, c_code, visibility='private'):
- self.pos = pos
- self.cname = cname
- self.type = type
- self.c_code = c_code
- self.visibility = visibility
-
- def analyse_types(self, env):
- self.entry = env.declare_cfunction(
- "<pure c function:%s>" % self.cname,
- self.type, self.pos, cname=self.cname,
- defining=True, visibility=self.visibility)
-
- def generate_function_definitions(self, env, code, transforms):
- assert self.type.optional_arg_count == 0
- visibility = self.entry.visibility
- if visibility != 'private':
- storage_class = "%s " % Naming.extern_c_macro
+def get_flags(buffer_aux, buffer_type):
+ flags = 'PyBUF_FORMAT | PyBUF_INDIRECT'
+ if buffer_aux.writable_needed: flags += "| PyBUF_WRITABLE"
+ return flags
+
+def used_buffer_aux_vars(entry):
+ buffer_aux = entry.buffer_aux
+ buffer_aux.buffer_info_var.used = True
+ for s in buffer_aux.shapevars: s.used = True
+ for s in buffer_aux.stridevars: s.used = True
+
+def put_unpack_buffer_aux_into_scope(buffer_aux, code):
+ bufstruct = buffer_aux.buffer_info_var.cname
+
+ code.putln(" ".join(["%s = %s.strides[%d];" %
+ (s.cname, bufstruct, idx)
+ for idx, s in enumerate(buffer_aux.stridevars)]))
+ code.putln(" ".join(["%s = %s.shape[%d];" %
+ (s.cname, bufstruct, idx)
+ for idx, s in enumerate(buffer_aux.shapevars)]))
+
+def put_zero_buffer_aux_into_scope(buffer_aux, code):
+ # If new buffer is None, set up to access 0
+ # for a "safer segfault" on access
+ code.putln("%s.buf = 0;" % buffer_aux.buffer_info_var.cname)
+ code.putln(" ".join(["%s = 0;" % s.cname
+ for s in buffer_aux.stridevars]))
+ code.putln(" ".join(["%s = 0;" % s.cname
+ for s in buffer_aux.shapevars]))
+
+def getbuffer_cond_code(obj_cname, buffer_aux, flags, ndim):
+ bufstruct = buffer_aux.buffer_info_var.cname
+ checker = buffer_aux.tschecker
+ return "PyObject_GetBuffer(%s, &%s, %s) == -1 || %s(&%s, %d) == -1" % (
+ obj_cname, bufstruct, flags, checker, bufstruct, ndim)
+
+def put_acquire_arg_buffer(entry, code, pos):
+ buffer_aux = entry.buffer_aux
+ cname = entry.cname
+ bufstruct = buffer_aux.buffer_info_var.cname
+ flags = get_flags(buffer_aux, entry.type)
+ # Acquire any new buffer
+ code.put('if (%s != Py_None) ' % cname)
+ code.begin_block()
+ code.putln('%s.buf = 0;' % bufstruct) # PEP requirement
+ code.put(code.error_goto_if(getbuffer_cond_code(cname,
+ buffer_aux,
+ flags,
+ entry.type.ndim),
+ pos))
+ # An exception raised in arg parsing cannot be catched, so no
+ # need to do care about the buffer then.
+ put_unpack_buffer_aux_into_scope(buffer_aux, code)
+ code.end_block()
+
+def put_release_buffer(entry, code):
+ code.putln("if (%s != Py_None) PyObject_ReleaseBuffer(%s, &%s);" % (
+ entry.cname, entry.cname, entry.buffer_aux.buffer_info_var.cname))
+
+def put_assign_to_buffer(lhs_cname, rhs_cname, buffer_aux, buffer_type,
+ is_initialized, pos, code):
+ bufstruct = buffer_aux.buffer_info_var.cname
+ flags = get_flags(buffer_aux, buffer_type)
+
+ if is_initialized:
+ # Release any existing buffer
+ code.put('if (%s != Py_None) ' % lhs_cname)
+ code.begin_block();
+ code.putln('PyObject_ReleaseBuffer(%s, &%s);' % (
+ lhs_cname, bufstruct))
+ code.end_block()
+ # Acquire any new buffer
+ code.put('if (%s != Py_None) ' % rhs_cname)
+ code.begin_block()
+ code.putln('%s.buf = 0;' % bufstruct) # PEP requirement
+ code.put('if (%s) ' % code.unlikely(getbuffer_cond_code(rhs_cname, buffer_aux, flags, buffer_type.ndim)))
+ code.begin_block()
+ # If acquisition failed, attempt to reacquire the old buffer
+ # before raising the exception. A failure of reacquisition
+ # will cause the reacquisition exception to be reported, one
+ # can consider working around this later.
+ if is_initialized:
+ put_zero_buffer_aux_into_scope(buffer_aux, code)
+ code.put('if (%s != Py_None && (%s)) ' % (rhs_cname,
+ getbuffer_cond_code(rhs_cname, buffer_aux, flags, buffer_type.ndim)))
+ code.begin_block()
+ put_zero_buffer_aux_into_scope(buffer_aux, code)
+ code.end_block()
+ else:
+ # our entry had no previous value, so set to None when acquisition fails
+ code.putln('%s = Py_None; Py_INCREF(Py_None);' % lhs_cname)
+ code.putln(code.error_goto(pos))
+ code.end_block() # acquisition failure
+ # Unpack indices
+ put_unpack_buffer_aux_into_scope(buffer_aux, code)
+ code.putln('} else {')
+ # If new buffer is None, set up to access 0
+ # for a "safer segfault" on access
+ put_zero_buffer_aux_into_scope(buffer_aux, code)
+ code.end_block()
+
+ # Everything is ok, assign object variable
+ code.putln("%s = %s;" % (lhs_cname, rhs_cname))
+
+
+def put_access(entry, index_types, index_cnames, tmp_cname, pos, code):
+ """Returns a c string which can be used to access the buffer
+ for reading or writing"""
+ bufaux = entry.buffer_aux
+ bufstruct = bufaux.buffer_info_var.cname
+ # Check bounds and fix negative indices
+ boundscheck = True
+ nonegs = True
+ if boundscheck:
+ code.putln("%s = -1;" % tmp_cname)
+ for idx, (type, cname, shape) in enumerate(zip(index_types, index_cnames,
+ bufaux.shapevars)):
+ if type.signed != 0:
+ nonegs = False
+ # not unsigned, deal with negative index
+ code.putln("if (%s < 0) {" % cname)
+ code.putln("%s += %s;" % (cname, shape.cname))
+ if boundscheck:
+ code.putln("if (%s) %s = %d;" % (
+ code.unlikely("%s < 0" % cname), tmp_cname, idx))
+ code.put("} else ")
else:
- storage_class = "static "
- arg_decls = [arg.declaration_code() for arg in self.type.args]
- sig = self.type.return_type.declaration_code(
- self.type.function_header_code(self.cname, ", ".join(arg_decls)))
- code.putln("")
- code.putln("%s%s {" % (storage_class, sig))
- code.put(self.c_code)
- code.putln("}")
+ if idx > 0: code.put("else ")
+ if boundscheck:
+ # check bounds in positive direction
+ code.putln("if (%s) %s = %d;" % (
+ code.unlikely("%s >= %s" % (cname, shape.cname)),
+ tmp_cname, idx))
+# if boundscheck or not nonegs:
+# code.putln("}")
+ if boundscheck:
+ code.put("if (%s) " % code.unlikely("%s != -1" % tmp_cname))
+ code.begin_block()
+ code.putln('__Pyx_BufferIndexError(%s);' % tmp_cname)
+ code.putln(code.error_goto(pos))
+ code.end_block()
+
+ # Create buffer lookup and return it
+
+ offset = " + ".join(["%s * %s" % (idx, stride.cname)
+ for idx, stride in
+ zip(index_cnames, bufaux.stridevars)])
+ ptrcode = "(%s.buf + %s)" % (bufstruct, offset)
+ valuecode = "*%s" % entry.type.buffer_ptr_type.cast_code(ptrcode)
+ return valuecode
+
+
+# Utility function to set the right exception
+# The caller should immediately goto_error
+buffer_boundsfail_error_utility_code = [
+"""\
+static void __Pyx_BufferIndexError(int axis); /*proto*/
+""","""\
+static void __Pyx_BufferIndexError(int axis) {
+ PyErr_Format(PyExc_IndexError,
+ "Out of bounds on buffer access (axis %d)", axis);
+}
+"""]
+
+
+#
+# Buffer type checking. Utility code for checking that acquired
+# buffers match our assumptions. We only need to check ndim and
+# the format string; the access mode/flags is checked by the
+# exporter.
+#
+buffer_check_utility_code = ["""\
+static const char* __Pyx_ConsumeWhitespace(const char* ts); /*proto*/
+static const char* __Pyx_BufferTypestringCheckEndian(const char* ts); /*proto*/
+static void __Pyx_BufferNdimError(Py_buffer* buffer, int expected_ndim); /*proto*/
+""", """
+static const char* __Pyx_ConsumeWhitespace(const char* ts) {
+ while (1) {
+ switch (*ts) {
+ case 10:
+ case 13:
+ case ' ':
+ ++ts;
+ default:
+ return ts;
+ }
+ }
+}
- def generate_execution_code(self, code):
- pass
+static const char* __Pyx_BufferTypestringCheckEndian(const char* ts) {
+ int ok = 1;
+ switch (*ts) {
+ case '@':
+ case '=':
+ ++ts; break;
+ case '<':
+ if (__BYTE_ORDER == __LITTLE_ENDIAN) ++ts;
+ else ok = 0;
+ break;
+ case '>':
+ case '!':
+ if (__BYTE_ORDER == __BIG_ENDIAN) ++ts;
+ else ok = 0;
+ break;
+ }
+ if (!ok) {
+ PyErr_Format(PyExc_ValueError, "Buffer has wrong endianness (rejecting on '%s')", ts);
+ return NULL;
+ }
+ return ts;
+}
+static void __Pyx_BufferNdimError(Py_buffer* buffer, int expected_ndim) {
+ PyErr_Format(PyExc_ValueError,
+ "Buffer has wrong number of dimensions (expected %d, got %d)",
+ expected_ndim, buffer->ndim);
+}
-tschecker_functype = PyrexTypes.CFuncType(
- PyrexTypes.c_char_ptr_type,
- [PyrexTypes.CFuncTypeArg(EncodedString("ts"), PyrexTypes.c_char_ptr_type,
- (0, 0, None), cname="ts")],
- exception_value = "NULL"
-)
+"""]
-tsprefix = "__Pyx_tsc"
+
+class IntroduceBufferAuxiliaryVars(CythonTransform):
+
+ #
+ # Entry point
+ #
+
+ def __call__(self, node):
+ assert isinstance(node, ModuleNode)
- try:
- cymod = self.context.modules[u'__cython__']
- except KeyError:
- # No buffer fun for this module
- return node
- self.bufstruct_type = cymod.entries[u'Py_buffer'].type
+ self.tscheckers = {}
+ self.tsfuncs = set()
+ self.ts_funcs = []
+ self.ts_item_checkers = {}
+ self.module_scope = node.scope
+ self.module_pos = node.pos
+ result = super(IntroduceBufferAuxiliaryVars, self).__call__(node)
+ # Register ts stuff
+ if "endian.h" not in node.scope.include_files:
+ node.scope.include_files.append("endian.h")
+ result.body.stats += self.ts_funcs
+ return result
+
+
+ #
+ # Basic operations for transforms
+ #
+ def handle_scope(self, node, scope):
+ # For all buffers, insert extra variables in the scope.
+ # The variables are also accessible from the buffer_info
+ # on the buffer entry
+ bufvars = [entry for name, entry
+ in scope.entries.iteritems()
+ if entry.type.is_buffer]
+
+ if isinstance(node, ModuleNode) and len(bufvars) > 0:
+ # for now...note that pos is wrong
+ raise CompileError(node.pos, "Buffer vars not allowed in module scope")
+ for entry in bufvars:
+ name = entry.name
+ buftype = entry.type
+
+ # Get or make a type string checker
+ tschecker = self.buffer_type_checker(buftype.dtype, scope)
+
+ # Declare auxiliary vars
+ cname = scope.mangle(Naming.bufstruct_prefix, name)
+ bufinfo = scope.declare_var(name="$%s" % cname, cname=cname,
- type=self.bufstruct_type, pos=node.pos)
++ type=PyrexTypes.c_py_buffer_type, pos=node.pos)
+
+ bufinfo.used = True
+
+ def var(prefix, idx):
+ cname = scope.mangle(prefix, "%d_%s" % (idx, name))
+ result = scope.declare_var("$%s" % cname, PyrexTypes.c_py_ssize_t_type,
+ node.pos, cname=cname, is_cdef=True)
+ result.init = "0"
+ if entry.is_arg:
+ result.used = True
+ return result
+
+ stridevars = [var(Naming.bufstride_prefix, i) for i in range(entry.type.ndim)]
+ shapevars = [var(Naming.bufshape_prefix, i) for i in range(entry.type.ndim)]
+ entry.buffer_aux = Symtab.BufferAux(bufinfo, stridevars, shapevars, tschecker)
+
+ scope.buffer_entries = bufvars
+ self.scope = scope
+
+ def visit_ModuleNode(self, node):
+ node.scope.use_utility_code(buffer_boundsfail_error_utility_code)
+ self.handle_scope(node, node.scope)
+ self.visitchildren(node)
+ return node
+
+ def visit_FuncDefNode(self, node):
+ self.handle_scope(node, node.local_scope)
+ self.visitchildren(node)
+ return node
+
+ #
+ # Utils for creating type string checkers
+ #
+ def mangle_dtype_name(self, dtype):
+ # Use prefixes to seperate user defined types from builtins
+ # (consider "typedef float unsigned_int")
+ if dtype.typestring is None:
+ prefix = "nn_"
+ else:
+ prefix = ""
+ return prefix + dtype.declaration_code("").replace(" ", "_")
+
+ def get_ts_check_item(self, dtype, env):
+ # See if we can consume one (unnamed) dtype as next item
+ # Put native types and structs in seperate namespaces (as one could create a struct named unsigned_int...)
+ name = "__Pyx_BufferTypestringCheck_item_%s" % self.mangle_dtype_name(dtype)
+ funcnode = self.ts_item_checkers.get(dtype)
+ if not name in self.tsfuncs:
+ char = dtype.typestring
+ if char is not None:
+ # Can use direct comparison
+ code = """\
+ if (*ts != '%s') {
+ PyErr_Format(PyExc_ValueError, "Buffer datatype mismatch (rejecting on '%%s')", ts);
+ return NULL;
+ } else return ts + 1;
+""" % char
+ else:
+ # Cannot trust declared size; but rely on int vs float and
+ # signed/unsigned to be correctly declared
+ ctype = dtype.declaration_code("")
+ code = """\
+ int ok;
+ switch (*ts) {"""
+ if dtype.is_int:
+ types = [
+ ('b', 'char'), ('h', 'short'), ('i', 'int'),
+ ('l', 'long'), ('q', 'long long')
+ ]
+ if dtype.signed == 0:
+ code += "".join(["\n case '%s': ok = (sizeof(%s) == sizeof(%s) && (%s)-1 > 0); break;" %
+ (char.upper(), ctype, against, ctype) for char, against in types])
+ else:
+ code += "".join(["\n case '%s': ok = (sizeof(%s) == sizeof(%s) && (%s)-1 < 0); break;" %
+ (char, ctype, against, ctype) for char, against in types])
+ code += """\
+ default: ok = 0;
+ }
+ if (!ok) {
+ PyErr_Format(PyExc_ValueError, "Buffer datatype mismatch (rejecting on '%s')", ts);
+ return NULL;
+ } else return ts + 1;
+"""
+ env.use_utility_code(["""\
+static const char* %s(const char* ts); /*proto*/
+""" % name, """
+static const char* %s(const char* ts) {
+%s
+}
+""" % (name, code)])
+ self.tsfuncs.add(name)
+
+ return name
+
+ def get_ts_check_simple(self, dtype, env):
+ # Check whole string for single unnamed item
+ name = "__Pyx_BufferTypestringCheck_simple_%s" % self.mangle_dtype_name(dtype)
+ if not name in self.tsfuncs:
+ itemchecker = self.get_ts_check_item(dtype, env)
+ utilcode = ["""
+static int %s(Py_buffer* buf, int e_nd); /*proto*/
+""" % name,"""
+static int %(name)s(Py_buffer* buf, int e_nd) {
+ const char* ts = buf->format;
+ if (buf->ndim != e_nd) {
+ __Pyx_BufferNdimError(buf, e_nd);
+ return -1;
+ }
+ ts = __Pyx_ConsumeWhitespace(ts);
+ ts = __Pyx_BufferTypestringCheckEndian(ts);
+ if (!ts) return -1;
+ ts = __Pyx_ConsumeWhitespace(ts);
+ ts = %(itemchecker)s(ts);
+ if (!ts) return -1;
+ ts = __Pyx_ConsumeWhitespace(ts);
+ if (*ts != 0) {
+ PyErr_Format(PyExc_ValueError,
+ "Expected non-struct buffer data type (rejecting on '%%s')", ts);
+ return -1;
+ }
+ return 0;
+}""" % locals()]
+ env.use_utility_code(buffer_check_utility_code)
+ env.use_utility_code(utilcode)
+ self.tsfuncs.add(name)
+ return name
+
+ def buffer_type_checker(self, dtype, env):
+ # Creates a type checker function for the given type.
+ # Each checker is created as utility code. However, as each function
+ # is dynamically constructed we also keep a set self.tsfuncs containing
+ # the right functions for the types that are already created.
+ if dtype.is_struct_or_union:
+ assert False
+ elif dtype.is_int or dtype.is_float:
+ # This includes simple typedef-ed types
+ funcname = self.get_ts_check_simple(dtype, env)
+ else:
+ assert False
+ return funcname
+
+
class BufferTransform(CythonTransform):
"""
--- /dev/null
- cimport __cython__
-
+# Tests the buffer access syntax functionality by constructing
+# mock buffer objects.
+#
+# Note that the buffers are mock objects created for testing
+# the buffer access behaviour -- for instance there is no flag
+# checking in the buffer objects (why test our test case?), rather
+# what we want to test is what is passed into the flags argument.
+#
+
+
+
+cimport stdlib
+cimport python_buffer
+# Add all test_X function docstrings as unit tests
+
+__test__ = {}
+setup_string = """
+ >>> A = IntMockBuffer("A", range(6))
+ >>> B = IntMockBuffer("B", range(6))
+ >>> C = IntMockBuffer("C", range(6), (2,3))
+ >>> E = ErrorBuffer("E")
+
+"""
+
+def testcase(func):
+ __test__[func.__name__] = setup_string + func.__doc__
+ return func
+
+@testcase
+def acquire_release(o1, o2):
+ """
+ >>> acquire_release(A, B)
+ acquired A
+ released A
+ acquired B
+ released B
+ """
+ cdef object[int] buf
+ buf = o1
+ buf = o2
+
+#@testcase
+def acquire_raise(o):
+ """
+ Apparently, doctest won't handle mixed exceptions and print
+ stats, so need to circumvent this.
+
+ >>> A.resetlog()
+ >>> acquire_raise(A)
+ Traceback (most recent call last):
+ ...
+ Exception: on purpose
+ >>> A.printlog()
+ acquired A
+ released A
+
+ """
+ cdef object[int] buf
+ buf = o
+ o.printlog()
+ raise Exception("on purpose")
+
+@testcase
+def as_argument(object[int] bufarg, int n):
+ """
+ >>> as_argument(A, 6)
+ acquired A
+ 0 1 2 3 4 5
+ released A
+ """
+ cdef int i
+ for i in range(n):
+ print bufarg[i],
+ print
+
+@testcase
+def as_argument_defval(object[int] bufarg=IntMockBuffer('default', range(6)), int n=6):
+ """
+ >>> as_argument_defval()
+ acquired default
+ 0 1 2 3 4 5
+ released default
+ >>> as_argument_defval(A, 6)
+ acquired A
+ 0 1 2 3 4 5
+ released A
+ """
+ cdef int i
+ for i in range(n):
+ print bufarg[i],
+ print
+
+@testcase
+def cdef_assignment(obj, n):
+ """
+ >>> cdef_assignment(A, 6)
+ acquired A
+ 0 1 2 3 4 5
+ released A
+
+ """
+ cdef object[int] buf = obj
+ cdef int i
+ for i in range(n):
+ print buf[i],
+ print
+
+@testcase
+def forin_assignment(objs, int pick):
+ """
+ >>> as_argument_defval()
+ acquired default
+ 0 1 2 3 4 5
+ released default
+ >>> as_argument_defval(A, 6)
+ acquired A
+ 0 1 2 3 4 5
+ released A
+ """
+ cdef object[int] buf
+ for buf in objs:
+ print buf[pick]
+
+@testcase
+def cascaded_buffer_assignment(obj):
+ """
+ >>> cascaded_buffer_assignment(A)
+ acquired A
+ acquired A
+ released A
+ released A
+ """
+ cdef object[int] a, b
+ a = b = obj
+
+@testcase
+def tuple_buffer_assignment1(a, b):
+ """
+ >>> tuple_buffer_assignment1(A, B)
+ acquired A
+ acquired B
+ released A
+ released B
+ """
+ cdef object[int] x, y
+ x, y = a, b
+
+@testcase
+def tuple_buffer_assignment2(tup):
+ """
+ >>> tuple_buffer_assignment2((A, B))
+ acquired A
+ acquired B
+ released A
+ released B
+ """
+ cdef object[int] x, y
+ x, y = tup
+
+@testcase
+def printbuf_int_2d(o, shape):
+ """
+ >>> printbuf_int_2d(C, (2,3))
+ acquired C
+ 0 1 2
+ 3 4 5
+ released C
+ """
+ # should make shape builtin
+ cdef object[int, 2] buf
+ buf = o
+ cdef int i, j
+ for i in range(shape[0]):
+ for j in range(shape[1]):
+ print buf[i, j],
+ print
+
+@testcase
+def get_int_2d(object[int, 2] buf, int i, int j):
+ """
+ Check negative indexing:
+ >>> get_int_2d(C, 1, 1)
+ acquired C
+ released C
+ 4
+ >>> get_int_2d(C, -1, 0)
+ acquired C
+ released C
+ 3
+ >>> get_int_2d(C, -1, -2)
+ acquired C
+ released C
+ 4
+ >>> get_int_2d(C, -2, -3)
+ acquired C
+ released C
+ 0
+
+ Out-of-bounds errors:
+ >>> get_int_2d(C, 2, 0)
+ Traceback (most recent call last):
+ ...
+ IndexError: Out of bounds on buffer access (axis 0)
+ >>> get_int_2d(C, 0, -4)
+ Traceback (most recent call last):
+ ...
+ IndexError: Out of bounds on buffer access (axis 1)
+
+ """
+ return buf[i, j]
+
+@testcase
+def get_int_2d_uintindex(object[int, 2] buf, unsigned int i, unsigned int j):
+ """
+ Unsigned indexing:
+ >>> get_int_2d_uintindex(C, 0, 0)
+ acquired C
+ released C
+ 0
+ >>> get_int_2d_uintindex(C, 1, 2)
+ acquired C
+ released C
+ 5
+ """
+ # This is most interesting with regards to the C code
+ # generated.
+ return buf[i, j]
+
+
+#
+# Buffer type mismatch examples. Varying the type and access
+# method simultaneously, the odds of an interaction is virtually
+# zero.
+#
+@testcase
+def fmtst1(buf):
+ """
+ >>> fmtst1(IntMockBuffer("A", range(3)))
+ Traceback (most recent call last):
+ ...
+ ValueError: Buffer datatype mismatch (rejecting on 'i')
+ """
+ cdef object[float] a = buf
+
+@testcase
+def fmtst2(object[int] buf):
+ """
+ >>> fmtst2(FloatMockBuffer("A", range(3)))
+ Traceback (most recent call last):
+ ...
+ ValueError: Buffer datatype mismatch (rejecting on 'f')
+ """
+
+@testcase
+def ndim1(object[int, 2] buf):
+ """
+ >>> ndim1(IntMockBuffer("A", range(3)))
+ Traceback (most recent call last):
+ ...
+ ValueError: Buffer has wrong number of dimensions (expected 2, got 1)
+ """
+
+#
+# Test which flags are passed.
+#
+@testcase
+def readonly(obj):
+ """
+ >>> R = UnsignedShortMockBuffer("R", range(27), shape=(3, 3, 3))
+ >>> readonly(R)
+ acquired R
+ 25
+ released R
+ >>> R.recieved_flags
+ ['FORMAT', 'INDIRECT', 'ND', 'STRIDES']
+ """
+ cdef object[unsigned short int, 3] buf = obj
+ print buf[2, 2, 1]
+
+@testcase
+def writable(obj):
+ """
+ >>> R = UnsignedShortMockBuffer("R", range(27), shape=(3, 3, 3))
+ >>> writable(R)
+ acquired R
+ released R
+ >>> R.recieved_flags
+ ['FORMAT', 'INDIRECT', 'ND', 'STRIDES', 'WRITABLE']
+ """
+ cdef object[unsigned short int, 3] buf = obj
+ buf[2, 2, 1] = 23
+
+
+#
+# Coercions
+#
+@testcase
+def coercions(object[unsigned char] uc):
+ """
+TODO
+ """
+ print type(uc[0])
+ uc[0] = -1
+ print uc[0]
+ uc[0] = <int>3.14
+ print uc[0]
+
+@testcase
+def printbuf_float(o, shape):
+ """
+ >>> printbuf_float(FloatMockBuffer("F", [1.0, 1.25, 0.75, 1.0]), (4,))
+ acquired F
+ 1.0 1.25 0.75 1.0
+ released F
+ """
+
+ # should make shape builtin
+ cdef object[float] buf
+ buf = o
+ cdef int i, j
+ for i in range(shape[0]):
+ print buf[i],
+ print
+
+
+available_flags = (
+ ('FORMAT', python_buffer.PyBUF_FORMAT),
+ ('INDIRECT', python_buffer.PyBUF_INDIRECT),
+ ('ND', python_buffer.PyBUF_ND),
+ ('STRIDES', python_buffer.PyBUF_STRIDES),
+ ('WRITABLE', python_buffer.PyBUF_WRITABLE)
+)
+
+cdef class MockBuffer:
+ cdef object format
+ cdef char* buffer
+ cdef int len, itemsize, ndim
+ cdef Py_ssize_t* strides
+ cdef Py_ssize_t* shape
+ cdef object label, log
+ cdef readonly object recieved_flags
+
+ def __init__(self, label, data, shape=None, strides=None, format=None):
+ self.label = label
+ self.log = ""
+ self.itemsize = self.get_itemsize()
+ if format is None: format = self.get_default_format()
+ if shape is None: shape = (len(data),)
+ if strides is None:
+ strides = []
+ cumprod = 1
+ rshape = list(shape)
+ rshape.reverse()
+ for s in rshape:
+ strides.append(cumprod)
+ cumprod *= s
+ strides.reverse()
+ strides = [x * self.itemsize for x in strides]
+ self.format = format
+ self.len = len(data) * self.itemsize
+ self.buffer = <char*>stdlib.malloc(self.len)
+ self.fill_buffer(data)
+ self.ndim = len(shape)
+ self.strides = <Py_ssize_t*>stdlib.malloc(self.ndim * sizeof(Py_ssize_t))
+ for i, x in enumerate(strides):
+ self.strides[i] = x
+ self.shape = <Py_ssize_t*>stdlib.malloc(self.ndim * sizeof(Py_ssize_t))
+ for i, x in enumerate(shape):
+ self.shape[i] = x
+ def __dealloc__(self):
+ stdlib.free(self.strides)
+ stdlib.free(self.shape)
+
+ def __getbuffer__(MockBuffer self, Py_buffer* buffer, int flags):
+ if buffer is NULL:
+ print u"locking!"
+ return
+
+ self.recieved_flags = []
+ for name, value in available_flags:
+ if (value & flags) == value:
+ self.recieved_flags.append(name)
+
+ buffer.buf = self.buffer
+ buffer.len = self.len
+ buffer.readonly = 0
+ buffer.format = <char*>self.format
+ buffer.ndim = self.ndim
+ buffer.shape = self.shape
+ buffer.strides = self.strides
+ buffer.suboffsets = NULL
+ buffer.itemsize = self.itemsize
+ buffer.internal = NULL
+ msg = "acquired %s" % self.label
+ print msg
+ self.log += msg + "\n"
+
+ def __releasebuffer__(MockBuffer self, Py_buffer* buffer):
+ msg = "released %s" % self.label
+ print msg
+ self.log += msg + "\n"
+
+ cdef fill_buffer(self, object data):
+ cdef char* it = self.buffer
+ for value in data:
+ self.write(it, value)
+ it += self.itemsize
+
+ def printlog(self):
+ print self.log,
+
+ def resetlog(self):
+ self.log = ""
+
+ cdef int write(self, char* buf, object value) except -1: raise Exception()
+ cdef get_itemsize(self):
+ print "ERROR, not subclassed", self.__class__
+ cdef get_default_format(self):
+ print "ERROR, not subclassed", self.__class__
+
+cdef class FloatMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<float*>buf)[0] = <float>value
+ return 0
+ cdef get_itemsize(self): return sizeof(float)
+ cdef get_default_format(self): return "=f"
+
+cdef class IntMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<int*>buf)[0] = <int>value
+ return 0
+ cdef get_itemsize(self): return sizeof(int)
+ cdef get_default_format(self): return "=i"
+
+cdef class UnsignedShortMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<unsigned short*>buf)[0] = <unsigned short>value
+ return 0
+ cdef get_itemsize(self): return sizeof(unsigned short)
+ cdef get_default_format(self): return "=H"
+
+cdef class ErrorBuffer:
+ cdef object label
+
+ def __init__(self, label):
+ self.label = label
+
+ def __getbuffer__(MockBuffer self, Py_buffer* buffer, int flags):
+ raise Exception("acquiring %s" % self.label)
+
+ def __releasebuffer__(MockBuffer self, Py_buffer* buffer):
+ raise Exception("releasing %s" % self.label)
+