aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi249
1 files changed, 44 insertions, 205 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 4f87261e17..26eaf50eb4 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -220,9 +220,26 @@ cdef class CallDetails:
cdef class OperationTag:
- def __cinit__(self, user_tag):
+ def __cinit__(self, user_tag, operations):
self.user_tag = user_tag
self.references = []
+ self._operations = operations
+
+ cdef void store_ops(self):
+ self.c_nops = 0 if self._operations is None else len(self._operations)
+ if 0 < self.c_nops:
+ self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
+ for index in range(self.c_nops):
+ self.c_ops[index] = (<Operation>(self._operations[index])).c_op
+
+ cdef object release_ops(self):
+ if 0 < self.c_nops:
+ for index, operation in enumerate(self._operations):
+ (<Operation>operation).c_op = self.c_ops[index]
+ gpr_free(self.c_ops)
+ return self._operations
+ else:
+ return ()
cdef class Event:
@@ -232,7 +249,7 @@ cdef class Event:
CallDetails request_call_details,
object request_metadata,
bint is_new_request,
- Operations batch_operations):
+ object batch_operations):
self.type = type
self.success = success
self.tag = tag
@@ -320,7 +337,7 @@ cdef void* copy_ptr(void* ptr):
return ptr
-cdef void destroy_ptr(grpc_exec_ctx* ctx, void* ptr):
+cdef void destroy_ptr(void* ptr):
pass
@@ -348,7 +365,7 @@ cdef class ChannelArg:
elif hasattr(value, '__int__'):
# Pointer objects must override __int__() to return
# the underlying C address (Python ints are word size). The
- # lifecycle of the pointer is fixed to the lifecycle of the
+ # lifecycle of the pointer is fixed to the lifecycle of the
# python object wrapping it.
self.ptr_vtable.copy = &copy_ptr
self.ptr_vtable.destroy = &destroy_ptr
@@ -390,140 +407,13 @@ cdef class ChannelArgs:
return self.args[i]
-cdef class Metadatum:
-
- def __cinit__(self, bytes key, bytes value):
- self.c_metadata.key = _slice_from_bytes(key)
- self.c_metadata.value = _slice_from_bytes(value)
-
- cdef void _copy_metadatum(self, grpc_metadata *destination) nogil:
- destination[0].key = _copy_slice(self.c_metadata.key)
- destination[0].value = _copy_slice(self.c_metadata.value)
-
- @property
- def key(self):
- return _slice_bytes(self.c_metadata.key)
-
- @property
- def value(self):
- return _slice_bytes(self.c_metadata.value)
-
- def __len__(self):
- return 2
-
- def __getitem__(self, size_t i):
- if i == 0:
- return self.key
- elif i == 1:
- return self.value
- else:
- raise IndexError("index must be 0 (key) or 1 (value)")
-
- def __iter__(self):
- return iter((self.key, self.value))
-
- def __dealloc__(self):
- grpc_slice_unref(self.c_metadata.key)
- grpc_slice_unref(self.c_metadata.value)
-
-cdef class _MetadataIterator:
-
- cdef size_t i
- cdef size_t _length
- cdef object _metadatum_indexable
-
- def __cinit__(self, length, metadatum_indexable):
- self._length = length
- self._metadatum_indexable = metadatum_indexable
- self.i = 0
-
- def __iter__(self):
- return self
-
- def __next__(self):
- if self.i < self._length:
- result = self._metadatum_indexable[self.i]
- self.i = self.i + 1
- return result
- else:
- raise StopIteration()
-
-
-# TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this; just use an
-# ordinary sequence of pairs of bytestrings all the way down to the
-# grpc_call_start_batch call.
-cdef class Metadata:
- """Metadata being passed from application to core."""
-
- def __cinit__(self, metadata_iterable):
- metadata_sequence = tuple(metadata_iterable)
- cdef size_t count = len(metadata_sequence)
- with nogil:
- grpc_init()
- self.c_metadata = <grpc_metadata *>gpr_malloc(
- count * sizeof(grpc_metadata))
- self.c_count = count
- for index, metadatum in enumerate(metadata_sequence):
- self.c_metadata[index].key = grpc_slice_copy(
- (<Metadatum>metadatum).c_metadata.key)
- self.c_metadata[index].value = grpc_slice_copy(
- (<Metadatum>metadatum).c_metadata.value)
-
- def __dealloc__(self):
- with nogil:
- for index in range(self.c_count):
- grpc_slice_unref(self.c_metadata[index].key)
- grpc_slice_unref(self.c_metadata[index].value)
- gpr_free(self.c_metadata)
- grpc_shutdown()
-
- def __len__(self):
- return self.c_count
-
- def __getitem__(self, size_t index):
- if index < self.c_count:
- key = _slice_bytes(self.c_metadata[index].key)
- value = _slice_bytes(self.c_metadata[index].value)
- return Metadatum(key, value)
- else:
- raise IndexError()
-
- def __iter__(self):
- return _MetadataIterator(self.c_count, self)
-
-
-cdef class MetadataArray:
- """Metadata being passed from core to application."""
-
- def __cinit__(self):
- with nogil:
- grpc_init()
- grpc_metadata_array_init(&self.c_metadata_array)
-
- def __dealloc__(self):
- with nogil:
- grpc_metadata_array_destroy(&self.c_metadata_array)
- grpc_shutdown()
-
- def __len__(self):
- return self.c_metadata_array.count
-
- def __getitem__(self, size_t i):
- if i >= self.c_metadata_array.count:
- raise IndexError()
- key = _slice_bytes(self.c_metadata_array.metadata[i].key)
- value = _slice_bytes(self.c_metadata_array.metadata[i].value)
- return Metadatum(key=key, value=value)
-
- def __iter__(self):
- return _MetadataIterator(self.c_metadata_array.count, self)
-
-
cdef class Operation:
def __cinit__(self):
grpc_init()
self.references = []
+ self._c_metadata_needs_release = False
+ self._c_metadata_array_needs_destruction = False
self._status_details = grpc_empty_slice()
self.is_valid = False
@@ -556,13 +446,7 @@ cdef class Operation:
if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
raise TypeError("self must be an operation receiving metadata")
- # TODO(https://github.com/grpc/grpc/issues/7950): Drop the "all Cython
- # objects must be legitimate for use from Python at any time" policy in
- # place today, shift the policy toward "Operation objects are only usable
- # while their calls are active", and move this making-a-copy-because-this-
- # data-needs-to-live-much-longer-than-the-call-from-which-it-arose to the
- # lowest Python layer.
- return tuple(self._received_metadata)
+ return _metadata(&self._c_metadata_array)
@property
def received_status_code(self):
@@ -602,16 +486,21 @@ cdef class Operation:
return False if self._received_cancelled == 0 else True
def __dealloc__(self):
+ if self._c_metadata_needs_release:
+ _release_c_metadata(self._c_metadata, self._c_metadata_count)
+ if self._c_metadata_array_needs_destruction:
+ grpc_metadata_array_destroy(&self._c_metadata_array)
grpc_slice_unref(self._status_details)
grpc_shutdown()
-def operation_send_initial_metadata(Metadata metadata, int flags):
+def operation_send_initial_metadata(metadata, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
op.c_op.flags = flags
- op.c_op.data.send_initial_metadata.count = metadata.c_count
- op.c_op.data.send_initial_metadata.metadata = metadata.c_metadata
- op.references.append(metadata)
+ _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
+ op._c_metadata_needs_release = True
+ op.c_op.data.send_initial_metadata.count = op._c_metadata_count
+ op.c_op.data.send_initial_metadata.metadata = op._c_metadata
op.is_valid = True
return op
@@ -633,18 +522,19 @@ def operation_send_close_from_client(int flags):
return op
def operation_send_status_from_server(
- Metadata metadata, grpc_status_code code, bytes details, int flags):
+ metadata, grpc_status_code code, bytes details, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
op.c_op.flags = flags
+ _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
+ op._c_metadata_needs_release = True
op.c_op.data.send_status_from_server.trailing_metadata_count = (
- metadata.c_count)
- op.c_op.data.send_status_from_server.trailing_metadata = metadata.c_metadata
+ op._c_metadata_count)
+ op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata
op.c_op.data.send_status_from_server.status = code
grpc_slice_unref(op._status_details)
op._status_details = _slice_from_bytes(details)
op.c_op.data.send_status_from_server.status_details = &op._status_details
- op.references.append(metadata)
op.is_valid = True
return op
@@ -652,9 +542,10 @@ def operation_receive_initial_metadata(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
op.c_op.flags = flags
- op._received_metadata = MetadataArray()
+ grpc_metadata_array_init(&op._c_metadata_array)
op.c_op.data.receive_initial_metadata.receive_initial_metadata = (
- &op._received_metadata.c_metadata_array)
+ &op._c_metadata_array)
+ op._c_metadata_array_needs_destruction = True
op.is_valid = True
return op
@@ -675,9 +566,10 @@ def operation_receive_status_on_client(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
op.c_op.flags = flags
- op._received_metadata = MetadataArray()
+ grpc_metadata_array_init(&op._c_metadata_array)
op.c_op.data.receive_status_on_client.trailing_metadata = (
- &op._received_metadata.c_metadata_array)
+ &op._c_metadata_array)
+ op._c_metadata_array_needs_destruction = True
op.c_op.data.receive_status_on_client.status = (
&op._received_status_code)
op.c_op.data.receive_status_on_client.status_details = (
@@ -694,59 +586,6 @@ def operation_receive_close_on_server(int flags):
return op
-cdef class _OperationsIterator:
-
- cdef size_t i
- cdef Operations operations
-
- def __cinit__(self, Operations operations not None):
- self.i = 0
- self.operations = operations
-
- def __iter__(self):
- return self
-
- def __next__(self):
- if self.i < len(self.operations):
- result = self.operations[self.i]
- self.i = self.i + 1
- return result
- else:
- raise StopIteration()
-
-
-cdef class Operations:
-
- def __cinit__(self, operations):
- grpc_init()
- self.operations = list(operations) # normalize iterable
- self.c_ops = NULL
- self.c_nops = 0
- for operation in self.operations:
- if not isinstance(operation, Operation):
- raise TypeError("expected operations to be iterable of Operation")
- self.c_nops = len(self.operations)
- with nogil:
- self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops)
- for i in range(self.c_nops):
- self.c_ops[i] = (<Operation>(self.operations[i])).c_op
-
- def __len__(self):
- return self.c_nops
-
- def __getitem__(self, size_t i):
- # self.operations is never stale; it's only updated from this file
- return self.operations[i]
-
- def __dealloc__(self):
- with nogil:
- gpr_free(self.c_ops)
- grpc_shutdown()
-
- def __iter__(self):
- return _OperationsIterator(self)
-
-
cdef class CompressionOptions:
def __cinit__(self):