diff options
Diffstat (limited to 'src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi')
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi | 249 |
1 files changed, 44 insertions, 205 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 4f87261e17..26eaf50eb4 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -220,9 +220,26 @@ cdef class CallDetails: cdef class OperationTag: - def __cinit__(self, user_tag): + def __cinit__(self, user_tag, operations): self.user_tag = user_tag self.references = [] + self._operations = operations + + cdef void store_ops(self): + self.c_nops = 0 if self._operations is None else len(self._operations) + if 0 < self.c_nops: + self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops) + for index in range(self.c_nops): + self.c_ops[index] = (<Operation>(self._operations[index])).c_op + + cdef object release_ops(self): + if 0 < self.c_nops: + for index, operation in enumerate(self._operations): + (<Operation>operation).c_op = self.c_ops[index] + gpr_free(self.c_ops) + return self._operations + else: + return () cdef class Event: @@ -232,7 +249,7 @@ cdef class Event: CallDetails request_call_details, object request_metadata, bint is_new_request, - Operations batch_operations): + object batch_operations): self.type = type self.success = success self.tag = tag @@ -320,7 +337,7 @@ cdef void* copy_ptr(void* ptr): return ptr -cdef void destroy_ptr(grpc_exec_ctx* ctx, void* ptr): +cdef void destroy_ptr(void* ptr): pass @@ -348,7 +365,7 @@ cdef class ChannelArg: elif hasattr(value, '__int__'): # Pointer objects must override __int__() to return # the underlying C address (Python ints are word size). The - # lifecycle of the pointer is fixed to the lifecycle of the + # lifecycle of the pointer is fixed to the lifecycle of the # python object wrapping it. self.ptr_vtable.copy = ©_ptr self.ptr_vtable.destroy = &destroy_ptr @@ -390,140 +407,13 @@ cdef class ChannelArgs: return self.args[i] -cdef class Metadatum: - - def __cinit__(self, bytes key, bytes value): - self.c_metadata.key = _slice_from_bytes(key) - self.c_metadata.value = _slice_from_bytes(value) - - cdef void _copy_metadatum(self, grpc_metadata *destination) nogil: - destination[0].key = _copy_slice(self.c_metadata.key) - destination[0].value = _copy_slice(self.c_metadata.value) - - @property - def key(self): - return _slice_bytes(self.c_metadata.key) - - @property - def value(self): - return _slice_bytes(self.c_metadata.value) - - def __len__(self): - return 2 - - def __getitem__(self, size_t i): - if i == 0: - return self.key - elif i == 1: - return self.value - else: - raise IndexError("index must be 0 (key) or 1 (value)") - - def __iter__(self): - return iter((self.key, self.value)) - - def __dealloc__(self): - grpc_slice_unref(self.c_metadata.key) - grpc_slice_unref(self.c_metadata.value) - -cdef class _MetadataIterator: - - cdef size_t i - cdef size_t _length - cdef object _metadatum_indexable - - def __cinit__(self, length, metadatum_indexable): - self._length = length - self._metadatum_indexable = metadatum_indexable - self.i = 0 - - def __iter__(self): - return self - - def __next__(self): - if self.i < self._length: - result = self._metadatum_indexable[self.i] - self.i = self.i + 1 - return result - else: - raise StopIteration() - - -# TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this; just use an -# ordinary sequence of pairs of bytestrings all the way down to the -# grpc_call_start_batch call. -cdef class Metadata: - """Metadata being passed from application to core.""" - - def __cinit__(self, metadata_iterable): - metadata_sequence = tuple(metadata_iterable) - cdef size_t count = len(metadata_sequence) - with nogil: - grpc_init() - self.c_metadata = <grpc_metadata *>gpr_malloc( - count * sizeof(grpc_metadata)) - self.c_count = count - for index, metadatum in enumerate(metadata_sequence): - self.c_metadata[index].key = grpc_slice_copy( - (<Metadatum>metadatum).c_metadata.key) - self.c_metadata[index].value = grpc_slice_copy( - (<Metadatum>metadatum).c_metadata.value) - - def __dealloc__(self): - with nogil: - for index in range(self.c_count): - grpc_slice_unref(self.c_metadata[index].key) - grpc_slice_unref(self.c_metadata[index].value) - gpr_free(self.c_metadata) - grpc_shutdown() - - def __len__(self): - return self.c_count - - def __getitem__(self, size_t index): - if index < self.c_count: - key = _slice_bytes(self.c_metadata[index].key) - value = _slice_bytes(self.c_metadata[index].value) - return Metadatum(key, value) - else: - raise IndexError() - - def __iter__(self): - return _MetadataIterator(self.c_count, self) - - -cdef class MetadataArray: - """Metadata being passed from core to application.""" - - def __cinit__(self): - with nogil: - grpc_init() - grpc_metadata_array_init(&self.c_metadata_array) - - def __dealloc__(self): - with nogil: - grpc_metadata_array_destroy(&self.c_metadata_array) - grpc_shutdown() - - def __len__(self): - return self.c_metadata_array.count - - def __getitem__(self, size_t i): - if i >= self.c_metadata_array.count: - raise IndexError() - key = _slice_bytes(self.c_metadata_array.metadata[i].key) - value = _slice_bytes(self.c_metadata_array.metadata[i].value) - return Metadatum(key=key, value=value) - - def __iter__(self): - return _MetadataIterator(self.c_metadata_array.count, self) - - cdef class Operation: def __cinit__(self): grpc_init() self.references = [] + self._c_metadata_needs_release = False + self._c_metadata_array_needs_destruction = False self._status_details = grpc_empty_slice() self.is_valid = False @@ -556,13 +446,7 @@ cdef class Operation: if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT): raise TypeError("self must be an operation receiving metadata") - # TODO(https://github.com/grpc/grpc/issues/7950): Drop the "all Cython - # objects must be legitimate for use from Python at any time" policy in - # place today, shift the policy toward "Operation objects are only usable - # while their calls are active", and move this making-a-copy-because-this- - # data-needs-to-live-much-longer-than-the-call-from-which-it-arose to the - # lowest Python layer. - return tuple(self._received_metadata) + return _metadata(&self._c_metadata_array) @property def received_status_code(self): @@ -602,16 +486,21 @@ cdef class Operation: return False if self._received_cancelled == 0 else True def __dealloc__(self): + if self._c_metadata_needs_release: + _release_c_metadata(self._c_metadata, self._c_metadata_count) + if self._c_metadata_array_needs_destruction: + grpc_metadata_array_destroy(&self._c_metadata_array) grpc_slice_unref(self._status_details) grpc_shutdown() -def operation_send_initial_metadata(Metadata metadata, int flags): +def operation_send_initial_metadata(metadata, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA op.c_op.flags = flags - op.c_op.data.send_initial_metadata.count = metadata.c_count - op.c_op.data.send_initial_metadata.metadata = metadata.c_metadata - op.references.append(metadata) + _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) + op._c_metadata_needs_release = True + op.c_op.data.send_initial_metadata.count = op._c_metadata_count + op.c_op.data.send_initial_metadata.metadata = op._c_metadata op.is_valid = True return op @@ -633,18 +522,19 @@ def operation_send_close_from_client(int flags): return op def operation_send_status_from_server( - Metadata metadata, grpc_status_code code, bytes details, int flags): + metadata, grpc_status_code code, bytes details, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER op.c_op.flags = flags + _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) + op._c_metadata_needs_release = True op.c_op.data.send_status_from_server.trailing_metadata_count = ( - metadata.c_count) - op.c_op.data.send_status_from_server.trailing_metadata = metadata.c_metadata + op._c_metadata_count) + op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata op.c_op.data.send_status_from_server.status = code grpc_slice_unref(op._status_details) op._status_details = _slice_from_bytes(details) op.c_op.data.send_status_from_server.status_details = &op._status_details - op.references.append(metadata) op.is_valid = True return op @@ -652,9 +542,10 @@ def operation_receive_initial_metadata(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA op.c_op.flags = flags - op._received_metadata = MetadataArray() + grpc_metadata_array_init(&op._c_metadata_array) op.c_op.data.receive_initial_metadata.receive_initial_metadata = ( - &op._received_metadata.c_metadata_array) + &op._c_metadata_array) + op._c_metadata_array_needs_destruction = True op.is_valid = True return op @@ -675,9 +566,10 @@ def operation_receive_status_on_client(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT op.c_op.flags = flags - op._received_metadata = MetadataArray() + grpc_metadata_array_init(&op._c_metadata_array) op.c_op.data.receive_status_on_client.trailing_metadata = ( - &op._received_metadata.c_metadata_array) + &op._c_metadata_array) + op._c_metadata_array_needs_destruction = True op.c_op.data.receive_status_on_client.status = ( &op._received_status_code) op.c_op.data.receive_status_on_client.status_details = ( @@ -694,59 +586,6 @@ def operation_receive_close_on_server(int flags): return op -cdef class _OperationsIterator: - - cdef size_t i - cdef Operations operations - - def __cinit__(self, Operations operations not None): - self.i = 0 - self.operations = operations - - def __iter__(self): - return self - - def __next__(self): - if self.i < len(self.operations): - result = self.operations[self.i] - self.i = self.i + 1 - return result - else: - raise StopIteration() - - -cdef class Operations: - - def __cinit__(self, operations): - grpc_init() - self.operations = list(operations) # normalize iterable - self.c_ops = NULL - self.c_nops = 0 - for operation in self.operations: - if not isinstance(operation, Operation): - raise TypeError("expected operations to be iterable of Operation") - self.c_nops = len(self.operations) - with nogil: - self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops) - for i in range(self.c_nops): - self.c_ops[i] = (<Operation>(self.operations[i])).c_op - - def __len__(self): - return self.c_nops - - def __getitem__(self, size_t i): - # self.operations is never stale; it's only updated from this file - return self.operations[i] - - def __dealloc__(self): - with nogil: - gpr_free(self.c_ops) - grpc_shutdown() - - def __iter__(self): - return _OperationsIterator(self) - - cdef class CompressionOptions: def __cinit__(self): |