aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx72
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd21
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx2
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/face/face.py8
-rw-r--r--src/python/grpcio/setup.py2
6 files changed, 66 insertions, 41 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd
index fd562ad75b..1ed5d4b229 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd
@@ -37,3 +37,5 @@ cdef class CompletionQueue:
cdef bint is_polling
cdef bint is_shutting_down
cdef bint is_shutdown
+
+ cdef _interpret_event(self, grpc.grpc_event event)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx
index 2cf49707b4..635a38fe28 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx
@@ -46,35 +46,13 @@ cdef class CompletionQueue:
self.poll_condition = threading.Condition()
self.is_polling = False
- def poll(self, records.Timespec deadline=None):
- # We name this 'poll' to avoid problems with CPython's expectations for
- # 'special' methods (like next and __next__).
- cdef grpc.gpr_timespec c_deadline = grpc.gpr_inf_future(
- grpc.GPR_CLOCK_REALTIME)
+ cdef _interpret_event(self, grpc.grpc_event event):
cdef records.OperationTag tag = None
cdef object user_tag = None
cdef call.Call operation_call = None
cdef records.CallDetails request_call_details = None
cdef records.Metadata request_metadata = None
cdef records.Operations batch_operations = None
- if deadline is not None:
- c_deadline = deadline.c_time
- cdef grpc.grpc_event event
-
- # Poll within a critical section
- # TODO consider making queue polling contention a hard error to enable
- # easier bug discovery
- with self.poll_condition:
- while self.is_polling:
- self.poll_condition.wait(float(deadline) - time.time())
- self.is_polling = True
- with nogil:
- event = grpc.grpc_completion_queue_next(
- self.c_completion_queue, c_deadline, NULL)
- with self.poll_condition:
- self.is_polling = False
- self.poll_condition.notify()
-
if event.type == grpc.GRPC_QUEUE_TIMEOUT:
return records.Event(
event.type, False, None, None, None, None, False, None)
@@ -104,6 +82,54 @@ cdef class CompletionQueue:
request_call_details, request_metadata, tag.is_new_request,
batch_operations)
+ def poll(self, records.Timespec deadline=None):
+ # We name this 'poll' to avoid problems with CPython's expectations for
+ # 'special' methods (like next and __next__).
+ cdef grpc.gpr_timespec c_deadline = grpc.gpr_inf_future(
+ grpc.GPR_CLOCK_REALTIME)
+ if deadline is not None:
+ c_deadline = deadline.c_time
+ cdef grpc.grpc_event event
+
+ # Poll within a critical section
+ # TODO(atash) consider making queue polling contention a hard error to
+ # enable easier bug discovery
+ with self.poll_condition:
+ while self.is_polling:
+ self.poll_condition.wait(float(deadline) - time.time())
+ self.is_polling = True
+ with nogil:
+ event = grpc.grpc_completion_queue_next(
+ self.c_completion_queue, c_deadline, NULL)
+ with self.poll_condition:
+ self.is_polling = False
+ self.poll_condition.notify()
+ return self._interpret_event(event)
+
+ def pluck(self, records.OperationTag tag, records.Timespec deadline=None):
+ # Plucking a 'None' tag is equivalent to passing control to GRPC core until
+ # the deadline.
+ cdef grpc.gpr_timespec c_deadline = grpc.gpr_inf_future(
+ grpc.GPR_CLOCK_REALTIME)
+ if deadline is not None:
+ c_deadline = deadline.c_time
+ cdef grpc.grpc_event event
+
+ # Poll within a critical section
+ # TODO(atash) consider making queue polling contention a hard error to
+ # enable easier bug discovery
+ with self.poll_condition:
+ while self.is_polling:
+ self.poll_condition.wait(float(deadline) - time.time())
+ self.is_polling = True
+ with nogil:
+ event = grpc.grpc_completion_queue_pluck(
+ self.c_completion_queue, <cpython.PyObject *>tag, c_deadline, NULL)
+ with self.poll_condition:
+ self.is_polling = False
+ self.poll_condition.notify()
+ return self._interpret_event(event)
+
def shutdown(self):
grpc.grpc_completion_queue_shutdown(self.c_completion_queue)
self.is_shutting_down = True
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd
index 643cdc9e3d..9b10d2ae75 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd
@@ -28,6 +28,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
cimport libc.time
+from libc.stdint cimport int64_t, uint32_t, int32_t
cdef extern from "grpc/support/alloc.h":
@@ -55,15 +56,6 @@ cdef extern from "grpc/support/slice.h":
size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s)
-cdef extern from "grpc/support/port_platform.h":
- # As long as the header file gets this type right, we don't need to get this
- # type exactly; just close enough that the operations will be supported in the
- # underlying C layers.
- ctypedef unsigned int gpr_uint32
- ctypedef int gpr_int32
- ctypedef long int gpr_int64
-
-
cdef extern from "grpc/support/time.h":
ctypedef enum gpr_clock_type:
@@ -73,8 +65,8 @@ cdef extern from "grpc/support/time.h":
GPR_TIMESPAN
ctypedef struct gpr_timespec:
- gpr_int64 seconds "tv_sec"
- gpr_int32 nanoseconds "tv_nsec"
+ int64_t seconds "tv_sec"
+ int32_t nanoseconds "tv_nsec"
gpr_clock_type clock_type
gpr_timespec gpr_time_0(gpr_clock_type type)
@@ -282,7 +274,7 @@ cdef extern from "grpc/grpc.h":
ctypedef struct grpc_op:
grpc_op_type type "op"
- gpr_uint32 flags
+ uint32_t flags
grpc_op_data data
void grpc_init()
@@ -292,6 +284,9 @@ cdef extern from "grpc/grpc.h":
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline,
void *reserved) nogil
+ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
+ gpr_timespec deadline,
+ void *reserved) nogil
void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
void grpc_completion_queue_destroy(grpc_completion_queue *cq)
@@ -310,7 +305,7 @@ cdef extern from "grpc/grpc.h":
void *reserved)
grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_call *parent_call,
- gpr_uint32 propagation_mask,
+ uint32_t propagation_mask,
grpc_completion_queue *completion_queue,
const char *method, const char *host,
gpr_timespec deadline, void *reserved)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx
index 46df8bf77f..b0bafbc1f4 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx
@@ -89,6 +89,8 @@ cdef class Server:
self.register_completion_queue(self.backup_shutdown_queue)
self.is_started = True
grpc.grpc_server_start(self.c_server)
+ # Ensure the core has gotten a chance to do the start-up work
+ self.backup_shutdown_queue.pluck(None, records.Timespec(None))
def add_http2_port(self, address,
credentials.ServerCredentials server_credentials=None):
diff --git a/src/python/grpcio/grpc/framework/interfaces/face/face.py b/src/python/grpcio/grpc/framework/interfaces/face/face.py
index 3b402356d2..59da83dc9c 100644
--- a/src/python/grpcio/grpc/framework/interfaces/face/face.py
+++ b/src/python/grpcio/grpc/framework/interfaces/face/face.py
@@ -378,7 +378,7 @@ class UnaryUnaryMultiCallable(object):
metadata: A metadata value to be passed to the service-side of
the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
- to the reponse.
+ to the response.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
@@ -496,7 +496,7 @@ class StreamUnaryMultiCallable(object):
metadata: A metadata value to be passed to the service-side of
the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
- to the reponse.
+ to the response.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
@@ -699,7 +699,7 @@ class GenericStub(object):
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
- to the reponse.
+ to the response.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
@@ -774,7 +774,7 @@ class GenericStub(object):
timeout: A duration of time in seconds to allow for the RPC.
metadata: A metadata value to be passed to the service-side of the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
- to the reponse.
+ to the response.
protocol_options: A value specified by the provider of a Face interface
implementation affording custom state and behavior.
diff --git a/src/python/grpcio/setup.py b/src/python/grpcio/setup.py
index b8a98c3d85..366ebe3b3f 100644
--- a/src/python/grpcio/setup.py
+++ b/src/python/grpcio/setup.py
@@ -146,7 +146,7 @@ TEST_PACKAGE_DATA = {
TESTS_REQUIRE = (
'oauth2client>=1.4.7',
- 'protobuf==3.0.0a3',
+ 'protobuf>=3.0.0a3',
'coverage>=4.0',
) + INSTALL_REQUIRES