diff options
Diffstat (limited to 'src/python/grpcio')
6 files changed, 66 insertions, 41 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd index fd562ad75b..1ed5d4b229 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd @@ -37,3 +37,5 @@ cdef class CompletionQueue: cdef bint is_polling cdef bint is_shutting_down cdef bint is_shutdown + + cdef _interpret_event(self, grpc.grpc_event event) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx index 2cf49707b4..635a38fe28 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx @@ -46,35 +46,13 @@ cdef class CompletionQueue: self.poll_condition = threading.Condition() self.is_polling = False - def poll(self, records.Timespec deadline=None): - # We name this 'poll' to avoid problems with CPython's expectations for - # 'special' methods (like next and __next__). - cdef grpc.gpr_timespec c_deadline = grpc.gpr_inf_future( - grpc.GPR_CLOCK_REALTIME) + cdef _interpret_event(self, grpc.grpc_event event): cdef records.OperationTag tag = None cdef object user_tag = None cdef call.Call operation_call = None cdef records.CallDetails request_call_details = None cdef records.Metadata request_metadata = None cdef records.Operations batch_operations = None - if deadline is not None: - c_deadline = deadline.c_time - cdef grpc.grpc_event event - - # Poll within a critical section - # TODO consider making queue polling contention a hard error to enable - # easier bug discovery - with self.poll_condition: - while self.is_polling: - self.poll_condition.wait(float(deadline) - time.time()) - self.is_polling = True - with nogil: - event = grpc.grpc_completion_queue_next( - self.c_completion_queue, c_deadline, NULL) - with self.poll_condition: - self.is_polling = False - self.poll_condition.notify() - if event.type == grpc.GRPC_QUEUE_TIMEOUT: return records.Event( event.type, False, None, None, None, None, False, None) @@ -104,6 +82,54 @@ cdef class CompletionQueue: request_call_details, request_metadata, tag.is_new_request, batch_operations) + def poll(self, records.Timespec deadline=None): + # We name this 'poll' to avoid problems with CPython's expectations for + # 'special' methods (like next and __next__). + cdef grpc.gpr_timespec c_deadline = grpc.gpr_inf_future( + grpc.GPR_CLOCK_REALTIME) + if deadline is not None: + c_deadline = deadline.c_time + cdef grpc.grpc_event event + + # Poll within a critical section + # TODO(atash) consider making queue polling contention a hard error to + # enable easier bug discovery + with self.poll_condition: + while self.is_polling: + self.poll_condition.wait(float(deadline) - time.time()) + self.is_polling = True + with nogil: + event = grpc.grpc_completion_queue_next( + self.c_completion_queue, c_deadline, NULL) + with self.poll_condition: + self.is_polling = False + self.poll_condition.notify() + return self._interpret_event(event) + + def pluck(self, records.OperationTag tag, records.Timespec deadline=None): + # Plucking a 'None' tag is equivalent to passing control to GRPC core until + # the deadline. + cdef grpc.gpr_timespec c_deadline = grpc.gpr_inf_future( + grpc.GPR_CLOCK_REALTIME) + if deadline is not None: + c_deadline = deadline.c_time + cdef grpc.grpc_event event + + # Poll within a critical section + # TODO(atash) consider making queue polling contention a hard error to + # enable easier bug discovery + with self.poll_condition: + while self.is_polling: + self.poll_condition.wait(float(deadline) - time.time()) + self.is_polling = True + with nogil: + event = grpc.grpc_completion_queue_pluck( + self.c_completion_queue, <cpython.PyObject *>tag, c_deadline, NULL) + with self.poll_condition: + self.is_polling = False + self.poll_condition.notify() + return self._interpret_event(event) + def shutdown(self): grpc.grpc_completion_queue_shutdown(self.c_completion_queue) self.is_shutting_down = True diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd index 643cdc9e3d..9b10d2ae75 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd @@ -28,6 +28,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. cimport libc.time +from libc.stdint cimport int64_t, uint32_t, int32_t cdef extern from "grpc/support/alloc.h": @@ -55,15 +56,6 @@ cdef extern from "grpc/support/slice.h": size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) -cdef extern from "grpc/support/port_platform.h": - # As long as the header file gets this type right, we don't need to get this - # type exactly; just close enough that the operations will be supported in the - # underlying C layers. - ctypedef unsigned int gpr_uint32 - ctypedef int gpr_int32 - ctypedef long int gpr_int64 - - cdef extern from "grpc/support/time.h": ctypedef enum gpr_clock_type: @@ -73,8 +65,8 @@ cdef extern from "grpc/support/time.h": GPR_TIMESPAN ctypedef struct gpr_timespec: - gpr_int64 seconds "tv_sec" - gpr_int32 nanoseconds "tv_nsec" + int64_t seconds "tv_sec" + int32_t nanoseconds "tv_nsec" gpr_clock_type clock_type gpr_timespec gpr_time_0(gpr_clock_type type) @@ -282,7 +274,7 @@ cdef extern from "grpc/grpc.h": ctypedef struct grpc_op: grpc_op_type type "op" - gpr_uint32 flags + uint32_t flags grpc_op_data data void grpc_init() @@ -292,6 +284,9 @@ cdef extern from "grpc/grpc.h": grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) nogil + grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, + gpr_timespec deadline, + void *reserved) nogil void grpc_completion_queue_shutdown(grpc_completion_queue *cq) void grpc_completion_queue_destroy(grpc_completion_queue *cq) @@ -310,7 +305,7 @@ cdef extern from "grpc/grpc.h": void *reserved) grpc_call *grpc_channel_create_call(grpc_channel *channel, grpc_call *parent_call, - gpr_uint32 propagation_mask, + uint32_t propagation_mask, grpc_completion_queue *completion_queue, const char *method, const char *host, gpr_timespec deadline, void *reserved) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx index 46df8bf77f..b0bafbc1f4 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx @@ -89,6 +89,8 @@ cdef class Server: self.register_completion_queue(self.backup_shutdown_queue) self.is_started = True grpc.grpc_server_start(self.c_server) + # Ensure the core has gotten a chance to do the start-up work + self.backup_shutdown_queue.pluck(None, records.Timespec(None)) def add_http2_port(self, address, credentials.ServerCredentials server_credentials=None): diff --git a/src/python/grpcio/grpc/framework/interfaces/face/face.py b/src/python/grpcio/grpc/framework/interfaces/face/face.py index 3b402356d2..59da83dc9c 100644 --- a/src/python/grpcio/grpc/framework/interfaces/face/face.py +++ b/src/python/grpcio/grpc/framework/interfaces/face/face.py @@ -378,7 +378,7 @@ class UnaryUnaryMultiCallable(object): metadata: A metadata value to be passed to the service-side of the RPC. with_call: Whether or not to include return a Call for the RPC in addition - to the reponse. + to the response. protocol_options: A value specified by the provider of a Face interface implementation affording custom state and behavior. @@ -496,7 +496,7 @@ class StreamUnaryMultiCallable(object): metadata: A metadata value to be passed to the service-side of the RPC. with_call: Whether or not to include return a Call for the RPC in addition - to the reponse. + to the response. protocol_options: A value specified by the provider of a Face interface implementation affording custom state and behavior. @@ -699,7 +699,7 @@ class GenericStub(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. with_call: Whether or not to include return a Call for the RPC in addition - to the reponse. + to the response. protocol_options: A value specified by the provider of a Face interface implementation affording custom state and behavior. @@ -774,7 +774,7 @@ class GenericStub(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. with_call: Whether or not to include return a Call for the RPC in addition - to the reponse. + to the response. protocol_options: A value specified by the provider of a Face interface implementation affording custom state and behavior. diff --git a/src/python/grpcio/setup.py b/src/python/grpcio/setup.py index b8a98c3d85..366ebe3b3f 100644 --- a/src/python/grpcio/setup.py +++ b/src/python/grpcio/setup.py @@ -146,7 +146,7 @@ TEST_PACKAGE_DATA = { TESTS_REQUIRE = ( 'oauth2client>=1.4.7', - 'protobuf==3.0.0a3', + 'protobuf>=3.0.0a3', 'coverage>=4.0', ) + INSTALL_REQUIRES |