aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
Diffstat (limited to 'src/python')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi3
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi59
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi6
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi2
4 files changed, 27 insertions, 43 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
index a67c963684..01089c3dc0 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
@@ -31,9 +31,6 @@
cdef class CompletionQueue:
cdef grpc_completion_queue *c_completion_queue
- cdef object pluck_condition
- cdef int num_plucking
- cdef int num_polling
cdef bint is_shutting_down
cdef bint is_shutdown
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index cdae39d519..90266516fe 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -32,6 +32,8 @@ cimport cpython
import threading
import time
+cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
+
cdef class CompletionQueue:
@@ -40,9 +42,6 @@ cdef class CompletionQueue:
self.c_completion_queue = grpc_completion_queue_create(NULL)
self.is_shutting_down = False
self.is_shutdown = False
- self.pluck_condition = threading.Condition()
- self.num_plucking = 0
- self.num_polling = 0
cdef _interpret_event(self, grpc_event event):
cdef OperationTag tag = None
@@ -83,45 +82,27 @@ cdef class CompletionQueue:
def poll(self, Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for
# 'special' methods (like next and __next__).
+ cdef gpr_timespec c_increment
+ cdef gpr_timespec c_timeout
cdef gpr_timespec c_deadline
with nogil:
+ c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
- if deadline is not None:
- c_deadline = deadline.c_time
- cdef grpc_event event
-
- # Poll within a critical section to detect contention
- with self.pluck_condition:
- assert self.num_plucking == 0, 'cannot simultaneously pluck and poll'
- self.num_polling += 1
- with nogil:
- event = grpc_completion_queue_next(
- self.c_completion_queue, c_deadline, NULL)
- with self.pluck_condition:
- self.num_polling -= 1
- return self._interpret_event(event)
-
- def pluck(self, OperationTag tag, Timespec deadline=None):
- # Plucking a 'None' tag is equivalent to passing control to GRPC core until
- # the deadline.
- cdef gpr_timespec c_deadline = gpr_inf_future(
- GPR_CLOCK_REALTIME)
- if deadline is not None:
- c_deadline = deadline.c_time
- cdef grpc_event event
-
- # Pluck within a critical section to detect contention
- with self.pluck_condition:
- assert self.num_polling == 0, 'cannot simultaneously pluck and poll'
- assert self.num_plucking < GRPC_MAX_COMPLETION_QUEUE_PLUCKERS, (
- 'cannot pluck more than {} times simultaneously'.format(
- GRPC_MAX_COMPLETION_QUEUE_PLUCKERS))
- self.num_plucking += 1
- with nogil:
- event = grpc_completion_queue_pluck(
- self.c_completion_queue, <cpython.PyObject *>tag, c_deadline, NULL)
- with self.pluck_condition:
- self.num_plucking -= 1
+ if deadline is not None:
+ c_deadline = deadline.c_time
+
+ while True:
+ c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
+ if gpr_time_cmp(c_timeout, c_deadline) > 0:
+ c_timeout = c_deadline
+ event = grpc_completion_queue_next(
+ self.c_completion_queue, c_timeout, NULL)
+ if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0:
+ break;
+
+ # Handle any signals
+ with gil:
+ cpython.PyErr_CheckSignals()
return self._interpret_event(event)
def shutdown(self):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 05b8886df7..168b9751aa 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -80,6 +80,12 @@ cdef extern from "grpc/_cython/loader.h":
gpr_timespec gpr_convert_clock_type(gpr_timespec t,
gpr_clock_type target_clock) nogil
+ gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type type) nogil
+
+ gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) nogil
+
+ int gpr_time_cmp(gpr_timespec a, gpr_timespec b) nogil
+
ctypedef enum grpc_status_code:
GRPC_STATUS_OK
GRPC_STATUS_CANCELLED
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index 55948755b2..6ce589dbdc 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -99,7 +99,7 @@ cdef class Server:
with nogil:
grpc_server_start(self.c_server)
# Ensure the core has gotten a chance to do the start-up work
- self.backup_shutdown_queue.pluck(None, Timespec(None))
+ self.backup_shutdown_queue.poll(Timespec(None))
def add_http2_port(self, address,
ServerCredentials server_credentials=None):