diff options
author | Masood Malekghassemi <atash@google.com> | 2016-02-29 14:41:42 -0800 |
---|---|---|
committer | Masood Malekghassemi <atash@google.com> | 2016-02-29 14:48:33 -0800 |
commit | 334e0ee37012752a9747594dbdbde9f3f0dbc8d5 (patch) | |
tree | dab035c86ff4099d0a00923d434841f079eacaa5 /src/python | |
parent | 1df086067c66abeaded61fc725144df780362c0e (diff) |
Address some memory hazards in Cython code
Some __dealloc__ methods were calling Python methods, and some
references were being dropped on the floor instead of threaded through
gRPC core.
Diffstat (limited to 'src/python')
4 files changed, 24 insertions, 15 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index ac67f32d92..f68dfd1b24 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -89,12 +89,13 @@ cdef class Channel: def check_connectivity_state(self, bint try_to_connect): return grpc_channel_check_connectivity_state(self.c_channel, - try_to_connect) + try_to_connect) def watch_connectivity_state( - self, last_observed_state, Timespec deadline not None, - CompletionQueue queue not None, tag): + self, grpc_connectivity_state last_observed_state, + Timespec deadline not None, CompletionQueue queue not None, tag): cdef OperationTag operation_tag = OperationTag(tag) + operation_tag.references = [self, queue] cpython.Py_INCREF(operation_tag) grpc_channel_watch_connectivity_state( self.c_channel, last_observed_state, deadline.c_time, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index bbf8413299..59cfc1f452 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -137,10 +137,14 @@ cdef class CompletionQueue: pass def __dealloc__(self): + cdef gpr_timespec c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) if self.c_completion_queue != NULL: - # Ensure shutdown, pump the queue + # Ensure shutdown if not self.is_shutting_down: - self.shutdown() + grpc_completion_queue_shutdown(self.c_completion_queue) + # Pump the queue while not self.is_shutdown: - self.poll() + event = grpc_completion_queue_next( + self.c_completion_queue, c_deadline, NULL) + self._interpret_event(event) grpc_completion_queue_destroy(self.c_completion_queue) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi index 9db49e4d30..a35eb5ea77 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi @@ -39,4 +39,5 @@ cdef class Server: cdef list references cdef list registered_completion_queues + cdef _c_shutdown(self, CompletionQueue queue, tag) cdef notify_shutdown_complete(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 8b65935c3b..60db447798 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -102,6 +102,16 @@ cdef class Server: else: return grpc_server_add_insecure_http2_port(self.c_server, address) + cdef _c_shutdown(self, CompletionQueue queue, tag): + self.is_shutting_down = True + operation_tag = OperationTag(tag) + operation_tag.shutting_down_server = self + operation_tag.references.extend([self, queue]) + cpython.Py_INCREF(operation_tag) + grpc_server_shutdown_and_notify( + self.c_server, queue.c_completion_queue, + <cpython.PyObject *>operation_tag) + def shutdown(self, CompletionQueue queue not None, tag): cdef OperationTag operation_tag if queue.is_shutting_down: @@ -113,14 +123,7 @@ cdef class Server: elif queue not in self.registered_completion_queues: raise ValueError("expected registered completion queue") else: - self.is_shutting_down = True - operation_tag = OperationTag(tag) - operation_tag.shutting_down_server = self - operation_tag.references.extend([self, queue]) - cpython.Py_INCREF(operation_tag) - grpc_server_shutdown_and_notify( - self.c_server, queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + self._c_shutdown(queue, tag) cdef notify_shutdown_complete(self): # called only by a completion queue on receiving our shutdown operation tag @@ -142,7 +145,7 @@ cdef class Server: pass elif not self.is_shutting_down: # the user didn't call shutdown - use our backup queue - self.shutdown(self.backup_shutdown_queue, None) + self._c_shutdown(self.backup_shutdown_queue, None) # and now we wait while not self.is_shutdown: self.backup_shutdown_queue.poll() |