import PyrexTypes
from sets import Set as set
+def get_flags(buffer_aux, buffer_type):
+ flags = 'PyBUF_FORMAT | PyBUF_INDIRECT'
+ if buffer_aux.writable_needed: flags += "| PyBUF_WRITABLE"
+ return flags
+
def used_buffer_aux_vars(entry):
buffer_aux = entry.buffer_aux
buffer_aux.buffer_info_var.used = True
code.putln(" ".join(["%s = 0;" % s.cname
for s in buffer_aux.shapevars]))
+def getbuffer_cond_code(obj_cname, buffer_aux, flags, ndim):
+ bufstruct = buffer_aux.buffer_info_var.cname
+ checker = buffer_aux.tschecker
+ return "PyObject_GetBuffer(%s, &%s, %s) == -1 || %s(&%s, %d) == -1" % (
+ obj_cname, bufstruct, flags, checker, bufstruct, ndim)
+
def put_acquire_arg_buffer(entry, code, pos):
buffer_aux = entry.buffer_aux
cname = entry.cname
bufstruct = buffer_aux.buffer_info_var.cname
- flags = '0'
+ flags = get_flags(buffer_aux, entry.type)
# Acquire any new buffer
code.put('if (%s != Py_None) ' % cname)
code.begin_block()
code.putln('%s.buf = 0;' % bufstruct) # PEP requirement
- code.put(code.error_goto_if(
- 'PyObject_GetBuffer(%s, &%s, %s) == -1 || %s(&%s, %d) == -1' % (
- cname, bufstruct, flags, buffer_aux.tschecker, bufstruct, entry.type.ndim),
- pos))
+ code.put(code.error_goto_if(getbuffer_cond_code(cname,
+ buffer_aux,
+ flags,
+ entry.type.ndim),
+ pos))
# An exception raised in arg parsing cannot be catched, so no
# need to do care about the buffer then.
put_unpack_buffer_aux_into_scope(buffer_aux, code)
def put_assign_to_buffer(lhs_cname, rhs_cname, buffer_aux, buffer_type,
is_initialized, pos, code):
bufstruct = buffer_aux.buffer_info_var.cname
- flags = '0'
+ flags = get_flags(buffer_aux, buffer_type)
if is_initialized:
# Release any existing buffer
code.put('if (%s != Py_None) ' % rhs_cname)
code.begin_block()
code.putln('%s.buf = 0;' % bufstruct) # PEP requirement
- code.put('if (%s) ' % code.unlikely(
- 'PyObject_GetBuffer(%s, &%s, %s) == -1' % (
- rhs_cname,
- bufstruct,
- flags)
- + ' || %s(&%s, %d) == -1' % (
- buffer_aux.tschecker, bufstruct, buffer_type.ndim
- )))
+ code.put('if (%s) ' % code.unlikely(getbuffer_cond_code(rhs_cname, buffer_aux, flags, buffer_type.ndim)))
code.begin_block()
# If acquisition failed, attempt to reacquire the old buffer
# before raising the exception. A failure of reacquisition
# can consider working around this later.
if is_initialized:
put_zero_buffer_aux_into_scope(buffer_aux, code)
- code.put('if (%s != Py_None && PyObject_GetBuffer(%s, &%s, %s) == -1) ' % (
- lhs_cname, lhs_cname, bufstruct, flags))
+ code.put('if (%s != Py_None && (%s)) ' % (rhs_cname,
+ getbuffer_cond_code(rhs_cname, buffer_aux, flags, buffer_type.ndim)))
code.begin_block()
put_zero_buffer_aux_into_scope(buffer_aux, code)
code.end_block()
cimport __cython__
+# Tests the buffer access syntax functionality by constructing
+# mock buffer objects.
+#
+# Note that the buffers are mock objects created for testing
+# the buffer access behaviour -- for instance there is no flag
+# checking in the buffer objects (why test our test case?), rather
+# what we want to test is what is passed into the flags argument.
+#
+
+
cimport stdlib
+cimport python_buffer
# Add all test_X function docstrings as unit tests
__test__ = {}
ValueError: Buffer has wrong number of dimensions (expected 2, got 1)
"""
+#
+# Test which flags are passed.
+#
+@testcase
+def readonly(obj):
+ """
+ >>> R = UnsignedShortMockBuffer("R", range(27), shape=(3, 3, 3))
+ >>> readonly(R)
+ acquired R
+ 25
+ released R
+ >>> R.recieved_flags
+ ['FORMAT', 'INDIRECT', 'ND', 'STRIDES']
+ """
+ cdef object[unsigned short int, 3] buf = obj
+ print buf[2, 2, 1]
+
+@testcase
+def writable(obj):
+ """
+ >>> R = UnsignedShortMockBuffer("R", range(27), shape=(3, 3, 3))
+ >>> writable(R)
+ acquired R
+ released R
+ >>> R.recieved_flags
+ ['FORMAT', 'INDIRECT', 'ND', 'STRIDES', 'WRITABLE']
+ """
+ cdef object[unsigned short int, 3] buf = obj
+ buf[2, 2, 1] = 23
+
+
+#
+# Coercions
+#
+@testcase
+def coercions(object[unsigned char] uc):
+ """
+TODO
+ """
+ print type(uc[0])
+ uc[0] = -1
+ print uc[0]
+ uc[0] = <int>3.14
+ print uc[0]
@testcase
def printbuf_float(o, shape):
print
+available_flags = (
+ ('FORMAT', python_buffer.PyBUF_FORMAT),
+ ('INDIRECT', python_buffer.PyBUF_INDIRECT),
+ ('ND', python_buffer.PyBUF_ND),
+ ('STRIDES', python_buffer.PyBUF_STRIDES),
+ ('WRITABLE', python_buffer.PyBUF_WRITABLE)
+)
+
cdef class MockBuffer:
cdef object format
cdef char* buffer
cdef Py_ssize_t* strides
cdef Py_ssize_t* shape
cdef object label, log
+ cdef readonly object recieved_flags
def __init__(self, label, data, shape=None, strides=None, format=None):
self.label = label
if buffer is NULL:
print u"locking!"
return
+
+ self.recieved_flags = []
+ for name, value in available_flags:
+ if (value & flags) == value:
+ self.recieved_flags.append(name)
+
buffer.buf = self.buffer
buffer.len = self.len
buffer.readonly = 0
return 0
cdef get_itemsize(self): return sizeof(int)
cdef get_default_format(self): return "=i"
+
+cdef class UnsignedShortMockBuffer(MockBuffer):
+ cdef int write(self, char* buf, object value) except -1:
+ (<unsigned short*>buf)[0] = <unsigned short>value
+ return 0
+ cdef get_itemsize(self): return sizeof(unsigned short)
+ cdef get_default_format(self): return "=H"
cdef class ErrorBuffer:
cdef object label