aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2018-04-20 02:49:34 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2018-05-02 18:24:47 +0000
commitca7ba4d0ac3ab452c5db60befc8be37fd6e2339b (patch)
tree813d64334b810569555737be39f3d81bc9f4dc7c /src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
parentd3eaf64687416c19177ba6a078f9f11599c063b9 (diff)
Keep Core memory inside cygrpc.Channel objects
This removes invocation-side completion queues from the _cygrpc API. Invocation-side calls are changed to no longer share the same lifetime as Core calls. Illegal metadata is now detected on invocation rather than at the start of a batch (so passing illegal metadata to a response-streaming method will now raise an exception immediately rather than later on when attempting to read the first response message). It is no longer possible to create a call without immediately starting at least one batch of operations on it. Only tests are affected by this change; there are no real use cases in which one wants to start a call but wait a little while before learning that the server has rejected it. It is now required that code above cygrpc.Channel spend threads on next_event whenever events are pending. A cygrpc.Channel.close method is introduced, but it merely blocks until the cygrpc.Channel's completion queues are drained; it does not itself drain them. Noteworthy here is that we drop the cygrpc.Channel.__dealloc__ method. It is not the same as __del__ (which is not something that can be added to cygrpc.Channel) and there is no guarantee that __dealloc__ will be called at all or that it will be called while the cygrpc.Channel instance's Python attributes are intact (in testing, I saw both in different environments). This commit does not knowingly break any garbage-collection-based memory management working (or "happening to appear to work in some circumstances"), though if it does, the proper remedy is to call grpc.Channel.close... which is the objective towards which this commit builds.
Diffstat (limited to 'src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi93
1 files changed, 54 insertions, 39 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 40496d1124..a2d765546a 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -20,6 +20,53 @@ import time
cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
+cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline):
+ cdef gpr_timespec c_increment
+ cdef gpr_timespec c_timeout
+ cdef gpr_timespec c_deadline
+ c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
+ if deadline is None:
+ c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
+ else:
+ c_deadline = _timespec_from_time(deadline)
+
+ with nogil:
+ while True:
+ c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
+ if gpr_time_cmp(c_timeout, c_deadline) > 0:
+ c_timeout = c_deadline
+ c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL)
+ if (c_event.type != GRPC_QUEUE_TIMEOUT or
+ gpr_time_cmp(c_timeout, c_deadline) == 0):
+ break
+
+ # Handle any signals
+ with gil:
+ cpython.PyErr_CheckSignals()
+ return c_event
+
+
+cdef _interpret_event(grpc_event c_event):
+ cdef _Tag tag
+ if c_event.type == GRPC_QUEUE_TIMEOUT:
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
+ elif c_event.type == GRPC_QUEUE_SHUTDOWN:
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None)
+ else:
+ tag = <_Tag>c_event.tag
+ # We receive event tags only after they've been inc-ref'd elsewhere in
+ # the code.
+ cpython.Py_DECREF(tag)
+ return tag, tag.event(c_event)
+
+
+cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
+ cdef grpc_event c_event = _next(c_completion_queue, deadline)
+ return _interpret_event(c_event)
+
+
cdef class CompletionQueue:
def __cinit__(self, shutdown_cq=False):
@@ -36,48 +83,16 @@ cdef class CompletionQueue:
self.is_shutting_down = False
self.is_shutdown = False
- cdef _interpret_event(self, grpc_event event):
- cdef _Tag tag = None
- if event.type == GRPC_QUEUE_TIMEOUT:
- # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
- return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
- elif event.type == GRPC_QUEUE_SHUTDOWN:
+ cdef _interpret_event(self, grpc_event c_event):
+ unused_tag, event = _interpret_event(c_event)
+ if event.completion_type == GRPC_QUEUE_SHUTDOWN:
self.is_shutdown = True
- # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
- return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None)
- else:
- tag = <_Tag>event.tag
- # We receive event tags only after they've been inc-ref'd elsewhere in
- # the code.
- cpython.Py_DECREF(tag)
- return tag.event(event)
+ return event
+ # We name this 'poll' to avoid problems with CPython's expectations for
+ # 'special' methods (like next and __next__).
def poll(self, deadline=None):
- # We name this 'poll' to avoid problems with CPython's expectations for
- # 'special' methods (like next and __next__).
- cdef gpr_timespec c_increment
- cdef gpr_timespec c_timeout
- cdef gpr_timespec c_deadline
- if deadline is None:
- c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
- else:
- c_deadline = _timespec_from_time(deadline)
- with nogil:
- c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
-
- while True:
- c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
- if gpr_time_cmp(c_timeout, c_deadline) > 0:
- c_timeout = c_deadline
- event = grpc_completion_queue_next(
- self.c_completion_queue, c_timeout, NULL)
- if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0:
- break;
-
- # Handle any signals
- with gil:
- cpython.PyErr_CheckSignals()
- return self._interpret_event(event)
+ return self._interpret_event(_next(self.c_completion_queue, deadline))
def shutdown(self):
with nogil: