diff options
author | 2018-04-20 02:49:34 +0000 | |
---|---|---|
committer | 2018-05-02 18:24:47 +0000 | |
commit | ca7ba4d0ac3ab452c5db60befc8be37fd6e2339b (patch) | |
tree | 813d64334b810569555737be39f3d81bc9f4dc7c /src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi | |
parent | d3eaf64687416c19177ba6a078f9f11599c063b9 (diff) |
Keep Core memory inside cygrpc.Channel objects
This removes invocation-side completion queues from the _cygrpc API.
Invocation-side calls are changed to no longer share the same lifetime
as Core calls.
Illegal metadata is now detected on invocation rather than at the start
of a batch (so passing illegal metadata to a response-streaming method
will now raise an exception immediately rather than later on when
attempting to read the first response message).
It is no longer possible to create a call without immediately starting
at least one batch of operations on it. Only tests are affected by this
change; there are no real use cases in which one wants to start a call
but wait a little while before learning that the server has rejected
it.
It is now required that code above cygrpc.Channel spend threads on
next_event whenever events are pending. A cygrpc.Channel.close method
is introduced, but it merely blocks until the cygrpc.Channel's
completion queues are drained; it does not itself drain them.
Noteworthy here is that we drop the cygrpc.Channel.__dealloc__ method.
It is not the same as __del__ (which is not something that can be added
to cygrpc.Channel) and there is no guarantee that __dealloc__ will be
called at all or that it will be called while the cygrpc.Channel
instance's Python attributes are intact (in testing, I saw both in
different environments). This commit does not knowingly break any
garbage-collection-based memory management that currently works (or
"happens to appear to work in some circumstances"), though if it does,
the proper remedy is to call grpc.Channel.close... which is the objective
towards which this commit builds.
Diffstat (limited to 'src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi')
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi | 93 |
1 file changed, 54 insertions, 39 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 40496d1124..a2d765546a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -20,6 +20,53 @@ import time cdef int _INTERRUPT_CHECK_PERIOD_MS = 200 +cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline): + cdef gpr_timespec c_increment + cdef gpr_timespec c_timeout + cdef gpr_timespec c_deadline + c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN) + if deadline is None: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) + else: + c_deadline = _timespec_from_time(deadline) + + with nogil: + while True: + c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) + if gpr_time_cmp(c_timeout, c_deadline) > 0: + c_timeout = c_deadline + c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL) + if (c_event.type != GRPC_QUEUE_TIMEOUT or + gpr_time_cmp(c_timeout, c_deadline) == 0): + break + + # Handle any signals + with gil: + cpython.PyErr_CheckSignals() + return c_event + + +cdef _interpret_event(grpc_event c_event): + cdef _Tag tag + if c_event.type == GRPC_QUEUE_TIMEOUT: + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) + elif c_event.type == GRPC_QUEUE_SHUTDOWN: + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None) + else: + tag = <_Tag>c_event.tag + # We receive event tags only after they've been inc-ref'd elsewhere in + # the code. 
+ cpython.Py_DECREF(tag) + return tag, tag.event(c_event) + + +cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline): + cdef grpc_event c_event = _next(c_completion_queue, deadline) + return _interpret_event(c_event) + + cdef class CompletionQueue: def __cinit__(self, shutdown_cq=False): @@ -36,48 +83,16 @@ cdef class CompletionQueue: self.is_shutting_down = False self.is_shutdown = False - cdef _interpret_event(self, grpc_event event): - cdef _Tag tag = None - if event.type == GRPC_QUEUE_TIMEOUT: - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. - return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) - elif event.type == GRPC_QUEUE_SHUTDOWN: + cdef _interpret_event(self, grpc_event c_event): + unused_tag, event = _interpret_event(c_event) + if event.completion_type == GRPC_QUEUE_SHUTDOWN: self.is_shutdown = True - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. - return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None) - else: - tag = <_Tag>event.tag - # We receive event tags only after they've been inc-ref'd elsewhere in - # the code. - cpython.Py_DECREF(tag) - return tag.event(event) + return event + # We name this 'poll' to avoid problems with CPython's expectations for + # 'special' methods (like next and __next__). def poll(self, deadline=None): - # We name this 'poll' to avoid problems with CPython's expectations for - # 'special' methods (like next and __next__). 
- cdef gpr_timespec c_increment - cdef gpr_timespec c_timeout - cdef gpr_timespec c_deadline - if deadline is None: - c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) - else: - c_deadline = _timespec_from_time(deadline) - with nogil: - c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN) - - while True: - c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) - if gpr_time_cmp(c_timeout, c_deadline) > 0: - c_timeout = c_deadline - event = grpc_completion_queue_next( - self.c_completion_queue, c_timeout, NULL) - if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0: - break; - - # Handle any signals - with gil: - cpython.PyErr_CheckSignals() - return self._interpret_event(event) + return self._interpret_event(_next(self.c_completion_queue, deadline)) def shutdown(self): with nogil: |