aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2017-09-09 15:52:11 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2017-09-09 21:42:38 +0000
commiteda5a4db1e88e484d0c45401c345cbc199d633fd (patch)
treee9ad2c76518d81854aa1585b0704a1306ba3c473 /src/python
parent8380090f953c193f9c511ee844c3099d3f7e0766 (diff)
Fix metadata memory leak
The gRPC Core has two styles for passing metadata: as an integer count along with a grpc_metadata* pointer, which is used for passing metadata into the core, and as a grpc_metadata_array, which is used for passing metadata out of the core. The Cython layer of gRPC Python was using a single data structure wrapping grpc_metadata_array for both purposes, but this was complex because the core manages the slices contained in grpc_metadata_array objects (at least those of which it is aware), so the Cython layer had to keep track of whether or not the core was aware of the slices it was using (and it was also defective, leaking slices). This is solved by realigning with the Cython layer’s intended design of mirroring as closely as possible in Python the gRPC Core API: we use one structure for passing metadata into the core (what is now called cygrpc.Metadata) and second, different structure for receiving metadata out of the core (what was called cygrpc.Metadata but is now cygrpc.MetadataArray, reflecting that it wraps the core’s grpc_metadata_array). All bug fixes should contain added tests preventing regression but this doesn't because I don't know at this time how to write a does-not-leak test for Python that fits well into our existing body of tests. Phooey. Thanks to Dominik Janků (djanku@email.cz) for investigation and an earlier draft of a solution.
Diffstat (limited to 'src/python')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi11
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi9
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi1
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi13
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi118
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi2
6 files changed, 83 insertions, 71 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 28c30e5d35..237f430799 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -41,9 +41,8 @@ cdef class CompletionQueue:
cdef object user_tag = None
cdef Call operation_call = None
cdef CallDetails request_call_details = None
- cdef Metadata request_metadata = None
+ cdef object request_metadata = None
cdef Operations batch_operations = None
- cdef Operation batch_operation = None
if event.type == GRPC_QUEUE_TIMEOUT:
return Event(
event.type, False, None, None, None, None, False, None)
@@ -63,14 +62,8 @@ cdef class CompletionQueue:
operation_call = tag.operation_call
request_call_details = tag.request_call_details
if tag.request_metadata is not None:
- request_metadata = tag.request_metadata
- request_metadata._claim_slice_ownership()
+ request_metadata = tuple(tag.request_metadata)
batch_operations = tag.batch_operations
- if tag.batch_operations is not None:
- for op in batch_operations.operations:
- batch_operation = <Operation>op
- if batch_operation._received_metadata is not None:
- batch_operation._received_metadata._claim_slice_ownership()
if tag.is_new_request:
# Stuff in the tag not explicitly handled by us needs to live through
# the life of the call
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index 98d7a9820d..57816f1cab 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -76,7 +76,7 @@ cdef class CredentialsMetadataPlugin:
"""
Args:
plugin_callback (callable): Callback accepting a service URL (str/bytes)
- and callback object (accepting a Metadata,
+ and callback object (accepting a MetadataArray,
grpc_status_code, and a str/bytes error message). This argument
when called should be non-blocking and eventually call the callback
object with the appropriate status code/details and metadata (if
@@ -129,8 +129,7 @@ cdef void plugin_get_metadata(
def python_callback(
Metadata metadata, grpc_status_code status,
bytes error_details):
- cb(user_data, metadata.c_metadata_array.metadata,
- metadata.c_metadata_array.count, status, error_details)
+ cb(user_data, metadata.c_metadata, metadata.c_count, status, error_details)
called_flag[0] = True
cdef CredentialsMetadataPlugin self = <CredentialsMetadataPlugin>state
cdef AuthMetadataContext cy_context = AuthMetadataContext()
@@ -139,8 +138,8 @@ cdef void plugin_get_metadata(
self.plugin_callback(cy_context, python_callback)
except Exception as error:
if not called_flag[0]:
- cb(user_data, Metadata([]).c_metadata_array.metadata,
- 0, StatusCode.unknown, traceback.format_exc().encode())
+ cb(user_data, NULL, 0, StatusCode.unknown,
+ traceback.format_exc().encode())
cdef void plugin_destroy_c_plugin_state(void *state) with gil:
cpython.Py_DECREF(<CredentialsMetadataPlugin>state)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 5950bfa0e6..28cf114451 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -59,6 +59,7 @@ cdef extern from "grpc/grpc.h":
grpc_slice grpc_slice_malloc(size_t length) nogil
grpc_slice grpc_slice_from_copied_string(const char *source) nogil
grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len) nogil
+ grpc_slice grpc_slice_copy(grpc_slice s) nogil
# Declare functions for function-like macros (because Cython)...
void *grpc_slice_start_ptr "GRPC_SLICE_START_PTR" (grpc_slice s) nogil
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 8ace6aeb52..9c40ebf0c2 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -37,7 +37,7 @@ cdef class OperationTag:
cdef Server shutting_down_server
cdef Call operation_call
cdef CallDetails request_call_details
- cdef Metadata request_metadata
+ cdef MetadataArray request_metadata
cdef Operations batch_operations
cdef bint is_new_request
@@ -51,7 +51,7 @@ cdef class Event:
# For Server.request_call
cdef readonly bint is_new_request
cdef readonly CallDetails request_call_details
- cdef readonly Metadata request_metadata
+ cdef readonly object request_metadata
# For server calls
cdef readonly Call operation_call
@@ -92,15 +92,20 @@ cdef class Metadatum:
cdef class Metadata:
+ cdef grpc_metadata *c_metadata
+ cdef readonly size_t c_count
+
+
+cdef class MetadataArray:
+
cdef grpc_metadata_array c_metadata_array
- cdef void _claim_slice_ownership(self)
cdef class Operation:
cdef grpc_op c_op
cdef ByteBuffer _received_message
- cdef Metadata _received_metadata
+ cdef MetadataArray _received_metadata
cdef grpc_status_code _received_status_code
cdef grpc_slice _status_details
cdef int _received_cancelled
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 1b2ddd2469..0a2a6eee05 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -238,7 +238,7 @@ cdef class Event:
def __cinit__(self, grpc_completion_type type, bint success,
object tag, Call operation_call,
CallDetails request_call_details,
- Metadata request_metadata,
+ object request_metadata,
bint is_new_request,
Operations batch_operations):
self.type = type
@@ -437,48 +437,79 @@ cdef class Metadatum:
cdef class _MetadataIterator:
cdef size_t i
- cdef Metadata metadata
+ cdef size_t _length
+ cdef object _metadatum_indexable
- def __cinit__(self, Metadata metadata not None):
+ def __cinit__(self, length, metadatum_indexable):
+ self._length = length
+ self._metadatum_indexable = metadatum_indexable
self.i = 0
- self.metadata = metadata
def __iter__(self):
return self
def __next__(self):
- if self.i < len(self.metadata):
- result = self.metadata[self.i]
+ if self.i < self._length:
+ result = self._metadatum_indexable[self.i]
self.i = self.i + 1
return result
else:
raise StopIteration
+# TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this; just use an
+# ordinary sequence of pairs of bytestrings all the way down to the
+# grpc_call_start_batch call.
cdef class Metadata:
+ """Metadata being passed from application to core."""
def __cinit__(self, metadata_iterable):
+ metadata_sequence = tuple(metadata_iterable)
+ cdef size_t count = len(metadata_sequence)
with nogil:
grpc_init()
- grpc_metadata_array_init(&self.c_metadata_array)
- metadata = list(metadata_iterable)
- for metadatum in metadata:
- if not isinstance(metadatum, Metadatum):
- raise TypeError("expected list of Metadatum")
- self.c_metadata_array.count = len(metadata)
- self.c_metadata_array.capacity = len(metadata)
+ self.c_metadata = <grpc_metadata *>gpr_malloc(
+ count * sizeof(grpc_metadata))
+ self.c_count = count
+ for index, metadatum in enumerate(metadata_sequence):
+ self.c_metadata[index].key = grpc_slice_copy(
+ (<Metadatum>metadatum).c_metadata.key)
+ self.c_metadata[index].value = grpc_slice_copy(
+ (<Metadatum>metadatum).c_metadata.value)
+
+ def __dealloc__(self):
+ with nogil:
+ for index in range(self.c_count):
+ grpc_slice_unref(self.c_metadata[index].key)
+ grpc_slice_unref(self.c_metadata[index].value)
+ gpr_free(self.c_metadata)
+ grpc_shutdown()
+
+ def __len__(self):
+ return self.c_count
+
+ def __getitem__(self, size_t index):
+ if index < self.c_count:
+ key = _slice_bytes(self.c_metadata[index].key)
+ value = _slice_bytes(self.c_metadata[index].value)
+ return Metadatum(key, value)
+ else:
+ raise IndexError()
+
+ def __iter__(self):
+ return _MetadataIterator(self.c_count, self)
+
+
+cdef class MetadataArray:
+ """Metadata being passed from core to application."""
+
+ def __cinit__(self):
with nogil:
- self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
- self.c_metadata_array.count*sizeof(grpc_metadata)
- )
- for i in range(self.c_metadata_array.count):
- (<Metadatum>metadata[i])._copy_metadatum(&self.c_metadata_array.metadata[i])
+ grpc_init()
+ grpc_metadata_array_init(&self.c_metadata_array)
def __dealloc__(self):
with nogil:
- # this frees the allocated memory for the grpc_metadata_array (although
- # it'd be nice if that were documented somewhere...)
- # TODO(atash): document this in the C core
grpc_metadata_array_destroy(&self.c_metadata_array)
grpc_shutdown()
@@ -493,21 +524,7 @@ cdef class Metadata:
return Metadatum(key=key, value=value)
def __iter__(self):
- return _MetadataIterator(self)
-
- cdef void _claim_slice_ownership(self):
- cdef grpc_metadata_array new_c_metadata_array
- grpc_metadata_array_init(&new_c_metadata_array)
- new_c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
- self.c_metadata_array.count*sizeof(grpc_metadata))
- new_c_metadata_array.count = self.c_metadata_array.count
- for i in range(self.c_metadata_array.count):
- new_c_metadata_array.metadata[i].key = _copy_slice(
- self.c_metadata_array.metadata[i].key)
- new_c_metadata_array.metadata[i].value = _copy_slice(
- self.c_metadata_array.metadata[i].value)
- grpc_metadata_array_destroy(&self.c_metadata_array)
- self.c_metadata_array = new_c_metadata_array
+ return _MetadataIterator(self.c_metadata_array.count, self)
cdef class Operation:
@@ -547,14 +564,13 @@ cdef class Operation:
if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
raise TypeError("self must be an operation receiving metadata")
- return self._received_metadata
-
- @property
- def received_metadata_or_none(self):
- if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
- self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
- return None
- return self._received_metadata
+ # TODO(https://github.com/grpc/grpc/issues/7950): Drop the "all Cython
+ # objects must be legitimate for use from Python at any time" policy in
+ # place today, shift the policy toward "Operation objects are only usable
+ # while their calls are active", and move this making-a-copy-because-this-
+ # data-needs-to-live-much-longer-than-the-call-from-which-it-arose to the
+ # lowest Python layer.
+ return tuple(self._received_metadata)
@property
def received_status_code(self):
@@ -601,9 +617,8 @@ def operation_send_initial_metadata(Metadata metadata, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
op.c_op.flags = flags
- op.c_op.data.send_initial_metadata.count = metadata.c_metadata_array.count
- op.c_op.data.send_initial_metadata.metadata = (
- metadata.c_metadata_array.metadata)
+ op.c_op.data.send_initial_metadata.count = metadata.c_count
+ op.c_op.data.send_initial_metadata.metadata = metadata.c_metadata
op.references.append(metadata)
op.is_valid = True
return op
@@ -631,9 +646,8 @@ def operation_send_status_from_server(
op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
op.c_op.flags = flags
op.c_op.data.send_status_from_server.trailing_metadata_count = (
- metadata.c_metadata_array.count)
- op.c_op.data.send_status_from_server.trailing_metadata = (
- metadata.c_metadata_array.metadata)
+ metadata.c_count)
+ op.c_op.data.send_status_from_server.trailing_metadata = metadata.c_metadata
op.c_op.data.send_status_from_server.status = code
grpc_slice_unref(op._status_details)
op._status_details = _slice_from_bytes(details)
@@ -646,7 +660,7 @@ def operation_receive_initial_metadata(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
op.c_op.flags = flags
- op._received_metadata = Metadata([])
+ op._received_metadata = MetadataArray()
op.c_op.data.receive_initial_metadata.receive_initial_metadata = (
&op._received_metadata.c_metadata_array)
op.is_valid = True
@@ -669,7 +683,7 @@ def operation_receive_status_on_client(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
op.c_op.flags = flags
- op._received_metadata = Metadata([])
+ op._received_metadata = MetadataArray()
op.c_op.data.receive_status_on_client.trailing_metadata = (
&op._received_metadata.c_metadata_array)
op.c_op.data.receive_status_on_client.status = (
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index dd276fd57b..b8db27469f 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -44,7 +44,7 @@ cdef class Server:
cdef OperationTag operation_tag = OperationTag(tag)
operation_tag.operation_call = Call()
operation_tag.request_call_details = CallDetails()
- operation_tag.request_metadata = Metadata([])
+ operation_tag.request_metadata = MetadataArray()
operation_tag.references.extend([self, call_queue, server_queue])
operation_tag.is_new_request = True
operation_tag.batch_operations = Operations([])