From c4d10dfb1b65f6e559db55ad8448ab37fc8d4fa8 Mon Sep 17 00:00:00 2001 From: Masood Malekghassemi Date: Wed, 18 Jan 2017 17:47:52 -0800 Subject: Fix Python memory errors ... but for real this time. --- .../grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi | 19 +-- .../grpc/_cython/_cygrpc/completion_queue.pyx.pxi | 10 +- .../grpcio/grpc/_cython/_cygrpc/records.pxd.pxi | 24 +--- .../grpcio/grpc/_cython/_cygrpc/records.pyx.pxi | 144 +++++++++------------ 4 files changed, 84 insertions(+), 113 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index e4c24a83ab..246e8399bc 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -60,24 +60,25 @@ cdef class Channel: method, host, Timespec deadline not None): if queue.is_shutting_down: raise ValueError("queue must not be shutting down or shutdown") - cdef Slice method_slice = Slice.from_bytes(method) - cdef Slice host_slice - cdef grpc_slice *host_c_slice = NULL + cdef grpc_slice method_slice = _slice_from_bytes(method) + cdef grpc_slice host_slice + cdef grpc_slice *host_slice_ptr = NULL if host is not None: - host_slice = Slice.from_bytes(host) - host_c_slice = &host_slice.c_slice - else: - host_slice = Slice() + host_slice = _slice_from_bytes(host) + host_slice_ptr = &host_slice cdef Call operation_call = Call() - operation_call.references = [self, method_slice, host_slice, queue] + operation_call.references = [self, queue] cdef grpc_call *parent_call = NULL if parent is not None: parent_call = parent.c_call with nogil: operation_call.c_call = grpc_channel_create_call( self.c_channel, parent_call, flags, - queue.c_completion_queue, method_slice.c_slice, host_c_slice, + queue.c_completion_queue, method_slice, host_slice_ptr, deadline.c_time, NULL) + grpc_slice_unref(method_slice) + if host_slice_ptr: + grpc_slice_unref(host_slice) return operation_call def check_connectivity_state(self, bint try_to_connect): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 2b5cce88a4..d8df6c2ef4 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -51,6 +51,7 @@ cdef class CompletionQueue: cdef CallDetails request_call_details = None cdef Metadata request_metadata = None cdef Operations batch_operations = None + cdef Operation batch_operation = None if event.type == GRPC_QUEUE_TIMEOUT: return Event( event.type, False, None, None, None, None, False, None) @@ -69,10 +70,15 @@ cdef class CompletionQueue: user_tag = tag.user_tag operation_call = tag.operation_call request_call_details = tag.request_call_details - request_metadata = tag.request_metadata - if request_metadata is not None: + if tag.request_metadata is not None: + request_metadata = tag.request_metadata request_metadata._claim_slice_ownership() batch_operations = tag.batch_operations + if tag.batch_operations is not None: + for op in batch_operations.operations: + batch_operation = op + if batch_operation._received_metadata is not None: + batch_operation._received_metadata._claim_slice_ownership() if tag.is_new_request: # Stuff in the tag not explicitly handled by us needs to live through # the life of the call diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index e927605384..c4a17118c0 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -28,6 +28,11 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +cdef bytes _slice_bytes(grpc_slice slice) +cdef grpc_slice _copy_slice(grpc_slice slice) nogil +cdef grpc_slice _slice_from_bytes(bytes value) nogil + + cdef class Timespec: cdef gpr_timespec c_time @@ -70,17 +75,6 @@ cdef class Event: cdef readonly Operations batch_operations -cdef class Slice: - - cdef grpc_slice c_slice - - cdef void _assign_slice(self, grpc_slice new_slice) nogil - @staticmethod - cdef Slice from_slice(grpc_slice slice) - @staticmethod - cdef bytes bytes_from_slice(grpc_slice slice) - - cdef class ByteBuffer: cdef grpc_byte_buffer *c_byte_buffer @@ -108,17 +102,13 @@ cdef class ChannelArgs: cdef class Metadatum: cdef grpc_metadata c_metadata - cdef Slice _key, - cdef Slice _value + cdef void _copy_metadatum(self, grpc_metadata *destination) nogil cdef class Metadata: cdef grpc_metadata_array c_metadata_array - cdef bint owns_metadata_slices - cdef object metadata cdef void _claim_slice_ownership(self) - cdef void _drop_slice_ownership(self) cdef class Operation: @@ -127,7 +117,7 @@ cdef class Operation: cdef ByteBuffer _received_message cdef Metadata _received_metadata cdef grpc_status_code _received_status_code - cdef Slice _received_status_details + cdef grpc_slice _status_details cdef int _received_cancelled cdef readonly bint is_valid cdef object references diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index d7a7713332..d052b3f8bc 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -29,6 +29,26 @@ from libc.stdint cimport intptr_t + +cdef bytes _slice_bytes(grpc_slice slice): + cdef void *start = grpc_slice_start_ptr(slice) + cdef size_t length = grpc_slice_length(slice) + return (start)[:length] + +cdef grpc_slice _copy_slice(grpc_slice slice) nogil: + cdef void *start = grpc_slice_start_ptr(slice) + cdef size_t length = grpc_slice_length(slice) + return grpc_slice_from_copied_buffer(start, length) + +cdef grpc_slice _slice_from_bytes(bytes value) nogil: + cdef const char *value_ptr + cdef size_t length + with gil: + value_ptr = value + length = len(value) + return grpc_slice_from_copied_buffer(value_ptr, length) + + class ConnectivityState: idle = GRPC_CHANNEL_IDLE connecting = GRPC_CHANNEL_CONNECTING @@ -189,11 +209,11 @@ cdef class CallDetails: @property def method(self): - return Slice.bytes_from_slice(self.c_details.method) + return _slice_bytes(self.c_details.method) @property def host(self): - return Slice.bytes_from_slice(self.c_details.host) + return _slice_bytes(self.c_details.host) @property def deadline(self): @@ -227,46 +247,6 @@ cdef class Event: self.is_new_request = is_new_request -cdef class Slice: - - def __cinit__(self): - with nogil: - grpc_init() - self.c_slice = grpc_empty_slice() - - cdef void _assign_slice(self, grpc_slice new_slice) nogil: - grpc_slice_unref(self.c_slice) - self.c_slice = new_slice - - @staticmethod - def from_bytes(bytes data): - cdef Slice self = Slice() - self._assign_slice(grpc_slice_from_copied_buffer(data, len(data))) - return self - - @staticmethod - cdef Slice from_slice(grpc_slice slice): - cdef Slice self = Slice() - grpc_slice_ref(slice) - self._assign_slice(slice) - return self - - @staticmethod - cdef bytes bytes_from_slice(grpc_slice slice): - with nogil: - pointer = grpc_slice_start_ptr(slice) - length = grpc_slice_length(slice) - return (pointer)[:length] - - def bytes(self): - return Slice.bytes_from_slice(self.c_slice) - - def __dealloc__(self): - with nogil: - grpc_slice_unref(self.c_slice) - grpc_shutdown() - - cdef class ByteBuffer: def __cinit__(self, bytes data): @@ -416,20 +396,21 @@ cdef class ChannelArgs: cdef class Metadatum: - # TODO(atash) this should just accept Slice objects. def __cinit__(self, bytes key, bytes value): - self._key = Slice.from_bytes(key) - self._value = Slice.from_bytes(value) - self.c_metadata.key = self._key.c_slice - self.c_metadata.value = self._value.c_slice + self.c_metadata.key = _slice_from_bytes(key) + self.c_metadata.value = _slice_from_bytes(value) + + cdef void _copy_metadatum(self, grpc_metadata *destination) nogil: + destination[0].key = _copy_slice(self.c_metadata.key) + destination[0].value = _copy_slice(self.c_metadata.value) @property def key(self): - return self._key.bytes() + return _slice_bytes(self.c_metadata.key) @property def value(self): - return self._value.bytes() + return _slice_bytes(self.c_metadata.value) def __len__(self): return 2 @@ -445,6 +426,9 @@ cdef class Metadatum: def __iter__(self): return iter((self.key, self.value)) + def __dealloc__(self): + grpc_slice_unref(self.c_metadata.key) + grpc_slice_unref(self.c_metadata.value) cdef class _MetadataIterator: @@ -466,34 +450,27 @@ cdef class _MetadataIterator: else: raise StopIteration -cdef grpc_slice _copy_slice(grpc_slice slice) nogil: - cdef void *start = grpc_slice_start_ptr(slice) - cdef size_t length = grpc_slice_length(slice) - return grpc_slice_from_copied_buffer(start, length) cdef class Metadata: - def __cinit__(self, metadata): + def __cinit__(self, metadata_iterable): with nogil: grpc_init() grpc_metadata_array_init(&self.c_metadata_array) - self.owns_metadata_slices = False - self.metadata = list(metadata) - for metadatum in self.metadata: + metadata = list(metadata_iterable) + for metadatum in metadata: if not isinstance(metadatum, Metadatum): raise TypeError("expected list of Metadatum") - self.c_metadata_array.count = len(self.metadata) - self.c_metadata_array.capacity = len(self.metadata) + self.c_metadata_array.count = len(metadata) + self.c_metadata_array.capacity = len(metadata) with nogil: self.c_metadata_array.metadata = gpr_malloc( self.c_metadata_array.count*sizeof(grpc_metadata) ) for i in range(self.c_metadata_array.count): - self.c_metadata_array.metadata[i] = ( - (self.metadata[i]).c_metadata) + (metadata[i])._copy_metadatum(&self.c_metadata_array.metadata[i]) def __dealloc__(self): - self._drop_slice_ownership() with nogil: # this frees the allocated memory for the grpc_metadata_array (although # it'd be nice if that were documented somewhere...) @@ -507,30 +484,26 @@ cdef class Metadata: def __getitem__(self, size_t i): if i >= self.c_metadata_array.count: raise IndexError - return Metadatum( - key=Slice.bytes_from_slice(self.c_metadata_array.metadata[i].key), - value=Slice.bytes_from_slice(self.c_metadata_array.metadata[i].value)) + key = _slice_bytes(self.c_metadata_array.metadata[i].key) + value = _slice_bytes(self.c_metadata_array.metadata[i].value) + return Metadatum(key=key, value=value) def __iter__(self): return _MetadataIterator(self) cdef void _claim_slice_ownership(self): - if self.owns_metadata_slices: - return + cdef grpc_metadata_array new_c_metadata_array + grpc_metadata_array_init(&new_c_metadata_array) + new_c_metadata_array.metadata = gpr_malloc( + self.c_metadata_array.count*sizeof(grpc_metadata)) + new_c_metadata_array.count = self.c_metadata_array.count for i in range(self.c_metadata_array.count): - self.c_metadata_array.metadata[i].key = _copy_slice( + new_c_metadata_array.metadata[i].key = _copy_slice( self.c_metadata_array.metadata[i].key) - self.c_metadata_array.metadata[i].value = _copy_slice( + new_c_metadata_array.metadata[i].value = _copy_slice( self.c_metadata_array.metadata[i].value) - self.owns_metadata_slices = True - - cdef void _drop_slice_ownership(self): - if not self.owns_metadata_slices: - return - for i in range(self.c_metadata_array.count): - grpc_slice_unref(self.c_metadata_array.metadata[i].key) - grpc_slice_unref(self.c_metadata_array.metadata[i].value) - self.owns_metadata_slices = False + grpc_metadata_array_destroy(&self.c_metadata_array) + self.c_metadata_array = new_c_metadata_array cdef class Operation: @@ -538,7 +511,7 @@ cdef class Operation: def __cinit__(self): grpc_init() self.references = [] - self._received_status_details = Slice() + self._status_details = grpc_empty_slice() self.is_valid = False @property @@ -595,13 +568,13 @@ cdef class Operation: def received_status_details(self): if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: raise TypeError("self must be an operation receiving status details") - return self._received_status_details.bytes() + return _slice_bytes(self._status_details) @property def received_status_details_or_none(self): if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: return None - return self._received_status_details.bytes() + return _slice_bytes(self._status_details) @property def received_cancelled(self): @@ -617,6 +590,7 @@ cdef class Operation: return False if self._received_cancelled == 0 else True def __dealloc__(self): + grpc_slice_unref(self._status_details) grpc_shutdown() def operation_send_initial_metadata(Metadata metadata, int flags): @@ -657,10 +631,10 @@ def operation_send_status_from_server( op.c_op.data.send_status_from_server.trailing_metadata = ( metadata.c_metadata_array.metadata) op.c_op.data.send_status_from_server.status = code - cdef Slice details_slice = Slice.from_bytes(details) - op.c_op.data.send_status_from_server.status_details = &details_slice.c_slice + grpc_slice_unref(op._status_details) + op._status_details = _slice_from_bytes(details) + op.c_op.data.send_status_from_server.status_details = &op._status_details op.references.append(metadata) - op.references.append(details_slice) op.is_valid = True return op @@ -696,7 +670,7 @@ def operation_receive_status_on_client(int flags): op.c_op.data.receive_status_on_client.status = ( &op._received_status_code) op.c_op.data.receive_status_on_client.status_details = ( - &op._received_status_details.c_slice) + &op._status_details) op.is_valid = True return op -- cgit v1.2.3