diff options
author | David Garcia Quintas <dgq@google.com> | 2016-06-02 10:24:51 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-06-02 10:24:51 -0700 |
commit | e7518461c24b7f8745a3f1246081ce35eb4db68b (patch) | |
tree | 742669980975a5d229a1f50e796704a70c706ca4 /src/python | |
parent | 772f4853347860d394a7db08a89c2030cd271513 (diff) | |
parent | f78b822db787421bfd90a747ed6fdbb1d3dd4b80 (diff) |
Merge branch 'master' of github.com:grpc/grpc into lr_hook
Diffstat (limited to 'src/python')
-rw-r--r-- | src/python/grpcio/grpc/__init__.py | 758 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi | 2 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi | 12 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/imports.generated.c | 2 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/imports.generated.h | 3 | ||||
-rw-r--r-- | src/python/grpcio/grpc/framework/foundation/future.py | 2 | ||||
-rw-r--r-- | src/python/grpcio/grpc_core_dependencies.py | 37 | ||||
-rw-r--r-- | src/python/grpcio/tests/tests.json | 1 | ||||
-rw-r--r-- | src/python/grpcio/tests/unit/_cython/_cancel_many_calls_test.py | 222 | ||||
-rw-r--r-- | src/python/grpcio/tests/unit/framework/common/test_control.py | 26 |
10 files changed, 1044 insertions, 21 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index b844a14c48..86447314b6 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -27,5 +27,763 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +"""gRPC's Python API.""" + __import__('pkg_resources').declare_namespace(__name__) +import abc +import enum + +import six + +from grpc._cython import cygrpc as _cygrpc + + +############################## Future Interface ############################### + + +class FutureTimeoutError(Exception): + """Indicates that a method call on a Future timed out.""" + + +class FutureCancelledError(Exception): + """Indicates that the computation underlying a Future was cancelled.""" + + +class Future(six.with_metaclass(abc.ABCMeta)): + """A representation of a computation in another control flow. + + Computations represented by a Future may be yet to be begun, may be ongoing, + or may have already completed. + """ + + @abc.abstractmethod + def cancel(self): + """Attempts to cancel the computation. + + This method does not block. + + Returns: + True if the computation has not yet begun, will not be allowed to take + place, and determination of both was possible without blocking. False + under all other circumstances including but not limited to the + computation's already having begun, the computation's already having + finished, and the computation's having been scheduled for execution on a + remote system for which a determination of whether or not it commenced + before being cancelled cannot be made without blocking. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Describes whether the computation was cancelled. + + This method does not block. + + Returns: + True if the computation was cancelled any time before its result became + immediately available. False under all other circumstances including but + not limited to this object's cancel method not having been called and + the computation's result having become immediately available. + """ + raise NotImplementedError() + + @abc.abstractmethod + def running(self): + """Describes whether the computation is taking place. + + This method does not block. + + Returns: + True if the computation is scheduled to take place in the future or is + taking place now, or False if the computation took place in the past or + was cancelled. + """ + raise NotImplementedError() + + @abc.abstractmethod + def done(self): + """Describes whether the computation has taken place. + + This method does not block. + + Returns: + True if the computation is known to have either completed or have been + unscheduled or interrupted. False if the computation may possibly be + executing or scheduled to execute later. + """ + raise NotImplementedError() + + @abc.abstractmethod + def result(self, timeout=None): + """Accesses the outcome of the computation or raises its exception. + + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + finish or be cancelled, or None if this method should block until the + computation has finished or is cancelled no matter how long that takes. + + Returns: + The return value of the computation. + + Raises: + FutureTimeoutError: If a timeout value is passed and the computation does + not terminate within the allotted time. + FutureCancelledError: If the computation was cancelled. + Exception: If the computation raised an exception, this call will raise + the same exception. + """ + raise NotImplementedError() + + @abc.abstractmethod + def exception(self, timeout=None): + """Return the exception raised by the computation. + + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + + Returns: + The exception raised by the computation, or None if the computation did + not raise an exception. + + Raises: + FutureTimeoutError: If a timeout value is passed and the computation does + not terminate within the allotted time. + FutureCancelledError: If the computation was cancelled. + """ + raise NotImplementedError() + + @abc.abstractmethod + def traceback(self, timeout=None): + """Access the traceback of the exception raised by the computation. + + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + + Returns: + The traceback of the exception raised by the computation, or None if the + computation did not raise an exception. + + Raises: + FutureTimeoutError: If a timeout value is passed and the computation does + not terminate within the allotted time. + FutureCancelledError: If the computation was cancelled. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_done_callback(self, fn): + """Adds a function to be called at completion of the computation. + + The callback will be passed this Future object describing the outcome of + the computation. + + If the computation has already completed, the callback will be called + immediately. + + Args: + fn: A callable taking this Future object as its single parameter. + """ + raise NotImplementedError() + + +################################ gRPC Enums ################################## + + +@enum.unique +class ChannelConnectivity(enum.Enum): + """Mirrors grpc_connectivity_state in the gRPC Core. + + Attributes: + IDLE: The channel is idle. + CONNECTING: The channel is connecting. + READY: The channel is ready to conduct RPCs. + TRANSIENT_FAILURE: The channel has seen a failure from which it expects to + recover. + FATAL_FAILURE: The channel has seen a failure from which it cannot recover. + """ + IDLE = (_cygrpc.ConnectivityState.idle, 'idle') + CONNECTING = (_cygrpc.ConnectivityState.connecting, 'connecting') + READY = (_cygrpc.ConnectivityState.ready, 'ready') + TRANSIENT_FAILURE = ( + _cygrpc.ConnectivityState.transient_failure, 'transient failure') + FATAL_FAILURE = (_cygrpc.ConnectivityState.fatal_failure, 'fatal failure') + + +@enum.unique +class StatusCode(enum.Enum): + """Mirrors grpc_status_code in the gRPC Core.""" + OK = (_cygrpc.StatusCode.ok, 'ok') + CANCELLED = (_cygrpc.StatusCode.cancelled, 'cancelled') + UNKNOWN = (_cygrpc.StatusCode.unknown, 'unknown') + INVALID_ARGUMENT = ( + _cygrpc.StatusCode.invalid_argument, 'invalid argument') + DEADLINE_EXCEEDED = ( + _cygrpc.StatusCode.deadline_exceeded, 'deadline exceeded') + NOT_FOUND = (_cygrpc.StatusCode.not_found, 'not found') + ALREADY_EXISTS = (_cygrpc.StatusCode.already_exists, 'already exists') + PERMISSION_DENIED = ( + _cygrpc.StatusCode.permission_denied, 'permission denied') + RESOURCE_EXHAUSTED = ( + _cygrpc.StatusCode.resource_exhausted, 'resource exhausted') + FAILED_PRECONDITION = ( + _cygrpc.StatusCode.failed_precondition, 'failed precondition') + ABORTED = (_cygrpc.StatusCode.aborted, 'aborted') + OUT_OF_RANGE = (_cygrpc.StatusCode.out_of_range, 'out of range') + UNIMPLEMENTED = (_cygrpc.StatusCode.unimplemented, 'unimplemented') + INTERNAL = (_cygrpc.StatusCode.internal, 'internal') + UNAVAILABLE = (_cygrpc.StatusCode.unavailable, 'unavailable') + DATA_LOSS = (_cygrpc.StatusCode.data_loss, 'data loss') + UNAUTHENTICATED = (_cygrpc.StatusCode.unauthenticated, 'unauthenticated') + + +############################# gRPC Exceptions ################################ + + +class RpcError(Exception): + """Raised by the gRPC library to indicate non-OK-status RPC termination.""" + + +############################## Shared Context ################################ + + +class RpcContext(six.with_metaclass(abc.ABCMeta)): + """Provides RPC-related information and action.""" + + @abc.abstractmethod + def is_active(self): + """Describes whether the RPC is active or has terminated.""" + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + """Describes the length of allowed time remaining for the RPC. + + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the RPC to complete before it is considered to have timed + out, or None if no deadline was specified for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Cancels the RPC. + + Idempotent and has no effect if the RPC has already terminated. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_callback(self, callback): + """Registers a callback to be called on RPC termination. + + Args: + callback: A no-parameter callable to be called on RPC termination. + + Returns: + True if the callback was added and will be called later; False if the + callback was not added and will not later be called (because the RPC + already terminated or some other reason). + """ + raise NotImplementedError() + + +######################### Invocation-Side Context ############################ + + +class Call(six.with_metaclass(abc.ABCMeta, RpcContext)): + """Invocation-side utility object for an RPC.""" + + @abc.abstractmethod + def initial_metadata(self): + """Accesses the initial metadata from the service-side of the RPC. + + This method blocks until the value is available. + + Returns: + The initial metadata as a sequence of pairs of bytes. + """ + raise NotImplementedError() + + @abc.abstractmethod + def trailing_metadata(self): + """Accesses the trailing metadata from the service-side of the RPC. + + This method blocks until the value is available. + + Returns: + The trailing metadata as a sequence of pairs of bytes. + """ + raise NotImplementedError() + + @abc.abstractmethod + def code(self): + """Accesses the status code emitted by the service-side of the RPC. + + This method blocks until the value is available. + + Returns: + The StatusCode value for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def details(self): + """Accesses the details value emitted by the service-side of the RPC. + + This method blocks until the value is available. + + Returns: + The bytes of the details of the RPC. + """ + raise NotImplementedError() + + +######################## Multi-Callable Interfaces ########################### + + +class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): + """Affords invoking a unary-unary RPC.""" + + @abc.abstractmethod + def __call__(self, request, timeout=None, metadata=None, with_call=False): + """Synchronously invokes the underlying RPC. + + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow for the RPC. + metadata: An optional sequence of pairs of bytes to be transmitted to the + service-side of the RPC. + with_call: Whether or not to include return a Call for the RPC in addition + to the response. + + Returns: + The response value for the RPC, and a Call for the RPC if with_call was + set to True at invocation. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ + raise NotImplementedError() + + @abc.abstractmethod + def future(self, request, timeout=None, metadata=None): + """Asynchronously invokes the underlying RPC. + + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow for the RPC. + metadata: An optional sequence of pairs of bytes to be transmitted to the + service-side of the RPC. + + Returns: + An object that is both a Call for the RPC and a Future. In the event of + RPC completion, the return Future's result value will be the response + message of the RPC. Should the event terminate with non-OK status, the + returned Future's exception value will be an RpcError. + """ + raise NotImplementedError() + + +class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): + """Affords invoking a unary-stream RPC.""" + + @abc.abstractmethod + def __call__(self, request, timeout=None, metadata=None): + """Invokes the underlying RPC. + + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow for the RPC. + metadata: An optional sequence of pairs of bytes to be transmitted to the + service-side of the RPC. + + Returns: + An object that is both a Call for the RPC and an iterator of response + values. Drawing response values from the returned iterator may raise + RpcError indicating termination of the RPC with non-OK status. + """ + raise NotImplementedError() + + +class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): + """Affords invoking a stream-unary RPC in any call style.""" + + @abc.abstractmethod + def __call__( + self, request_iterator, timeout=None, metadata=None, with_call=False): + """Synchronously invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: An optional duration of time in seconds to allow for the RPC. + metadata: An optional sequence of pairs of bytes to be transmitted to the + service-side of the RPC. + with_call: Whether or not to include return a Call for the RPC in addition + to the response. + + Returns: + The response value for the RPC, and a Call for the RPC if with_call was + set to True at invocation. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ + raise NotImplementedError() + + @abc.abstractmethod + def future(self, request_iterator, timeout=None, metadata=None): + """Asynchronously invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: An optional duration of time in seconds to allow for the RPC. + metadata: An optional sequence of pairs of bytes to be transmitted to the + service-side of the RPC. + + Returns: + An object that is both a Call for the RPC and a Future. In the event of + RPC completion, the return Future's result value will be the response + message of the RPC. Should the event terminate with non-OK status, the + returned Future's exception value will be an RpcError. + """ + raise NotImplementedError() + + +class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): + """Affords invoking a stream-stream RPC in any call style.""" + + @abc.abstractmethod + def __call__(self, request_iterator, timeout=None, metadata=None): + """Invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: An optional duration of time in seconds to allow for the RPC. + metadata: An optional sequence of pairs of bytes to be transmitted to the + service-side of the RPC. + + Returns: + An object that is both a Call for the RPC and an iterator of response + values. Drawing response values from the returned iterator may raise + RpcError indicating termination of the RPC with non-OK status. + """ + raise NotImplementedError() + + +############################# Channel Interface ############################## + + +class Channel(six.with_metaclass(abc.ABCMeta)): + """Affords RPC invocation via generic methods.""" + + @abc.abstractmethod + def subscribe(self, callback, try_to_connect=False): + """Subscribes to this Channel's connectivity. + + Args: + callback: A callable to be invoked and passed a ChannelConnectivity value + describing this Channel's connectivity. The callable will be invoked + immediately upon subscription and again for every change to this + Channel's connectivity thereafter until it is unsubscribed or this + Channel object goes out of scope. + try_to_connect: A boolean indicating whether or not this Channel should + attempt to connect if it is not already connected and ready to conduct + RPCs. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unsubscribe(self, callback): + """Unsubscribes a callback from this Channel's connectivity. + + Args: + callback: A callable previously registered with this Channel from having + been passed to its "subscribe" method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_unary( + self, method, request_serializer=None, response_deserializer=None): + """Creates a UnaryUnaryMultiCallable for a unary-unary method. + + Args: + method: The name of the RPC method. + + Returns: + A UnaryUnaryMultiCallable value for the named unary-unary method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_stream( + self, method, request_serializer=None, response_deserializer=None): + """Creates a UnaryStreamMultiCallable for a unary-stream method. + + Args: + method: The name of the RPC method. + + Returns: + A UnaryStreamMultiCallable value for the name unary-stream method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_unary( + self, method, request_serializer=None, response_deserializer=None): + """Creates a StreamUnaryMultiCallable for a stream-unary method. + + Args: + method: The name of the RPC method. + + Returns: + A StreamUnaryMultiCallable value for the named stream-unary method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_stream( + self, method, request_serializer=None, response_deserializer=None): + """Creates a StreamStreamMultiCallable for a stream-stream method. + + Args: + method: The name of the RPC method. + + Returns: + A StreamStreamMultiCallable value for the named stream-stream method. + """ + raise NotImplementedError() + + +########################## Service-Side Context ############################## + + +class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)): + """A context object passed to method implementations.""" + + @abc.abstractmethod + def invocation_metadata(self): + """Accesses the metadata from the invocation-side of the RPC. + + Returns: + The invocation metadata object as a sequence of pairs of bytes. + """ + raise NotImplementedError() + + @abc.abstractmethod + def peer(self): + """Identifies the peer that invoked the RPC being serviced. + + Returns: + A string identifying the peer that invoked the RPC being serviced. + """ + raise NotImplementedError() + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the initial metadata value to the invocation-side of the RPC. + + This method need not be called by method implementations if they have no + service-side initial metadata to transmit. + + Args: + initial_metadata: The initial metadata of the RPC as a sequence of pairs + of bytes. + """ + raise NotImplementedError() + + @abc.abstractmethod + def set_trailing_metadata(self, trailing_metadata): + """Accepts the trailing metadata value of the RPC. + + This method need not be called by method implementations if they have no + service-side trailing metadata to transmit. + + Args: + trailing_metadata: The trailing metadata of the RPC as a sequence of pairs + of bytes. + """ + raise NotImplementedError() + + @abc.abstractmethod + def set_code(self, code): + """Accepts the status code of the RPC. + + This method need not be called by method implementations if they wish the + gRPC runtime to determine the status code of the RPC. + + Args: + code: The integer status code of the RPC to be transmitted to the + invocation side of the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def set_details(self, details): + """Accepts the service-side details of the RPC. + + This method need not be called by method implementations if they have no + details to transmit. + + Args: + details: The details bytes of the RPC to be transmitted to + the invocation side of the RPC. + """ + raise NotImplementedError() + + +##################### Service-Side Handler Interfaces ######################## + + +class RpcMethodHandler(six.with_metaclass(abc.ABCMeta)): + """An implementation of a single RPC method. + + Attributes: + request_streaming: Whether the RPC supports exactly one request message or + any arbitrary number of request messages. + response_streaming: Whether the RPC supports exactly one response message or + any arbitrary number of response messages. + request_deserializer: A callable behavior that accepts a byte string and + returns an object suitable to be passed to this object's business logic, + or None to indicate that this object's business logic should be passed the + raw request bytes. + response_serializer: A callable behavior that accepts an object produced by + this object's business logic and returns a byte string, or None to + indicate that the byte strings produced by this object's business logic + should be transmitted on the wire as they are. + unary_unary: This object's application-specific business logic as a callable + value that takes a request value and a ServicerContext object and returns + a response value. Only non-None if both request_streaming and + response_streaming are False. + unary_stream: This object's application-specific business logic as a + callable value that takes a request value and a ServicerContext object and + returns an iterator of response values. Only non-None if request_streaming + is False and response_streaming is True. + stream_unary: This object's application-specific business logic as a + callable value that takes an iterator of request values and a + ServicerContext object and returns a response value. Only non-None if + request_streaming is True and response_streaming is False. + stream_stream: This object's application-specific business logic as a + callable value that takes an iterator of request values and a + ServicerContext object and returns an iterator of response values. Only + non-None if request_streaming and response_streaming are both True. + """ + + +class HandlerCallDetails(six.with_metaclass(abc.ABCMeta)): + """Describes an RPC that has just arrived for service. + + Attributes: + method: The method name of the RPC. + invocation_metadata: The metadata from the invocation side of the RPC. + """ + + +class GenericRpcHandler(six.with_metaclass(abc.ABCMeta)): + """An implementation of arbitrarily many RPC methods.""" + + @abc.abstractmethod + def service(self, handler_call_details): + """Services an RPC (or not). + + Args: + handler_call_details: A HandlerCallDetails describing the RPC. + + Returns: + An RpcMethodHandler with which the RPC may be serviced, or None to + indicate that this object will not be servicing the RPC. + """ + raise NotImplementedError() + + +############################# Server Interface ############################### + + +class Server(six.with_metaclass(abc.ABCMeta)): + """Services RPCs.""" + + @abc.abstractmethod + def add_generic_rpc_handlers(self, generic_rpc_handlers): + """Registers GenericRpcHandlers with this Server. + + This method is only safe to call before the server is started. + + Args: + generic_rpc_handlers: An iterable of GenericRpcHandlers that will be used + to service RPCs after this Server is started. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_insecure_port(self, address): + """Reserves a port for insecure RPC service once this Server becomes active. + + This method may only be called before calling this Server's start method is + called. + + Args: + address: The address for which to open a port. + + Returns: + An integer port on which RPCs will be serviced after this link has been + started. This is typically the same number as the port number contained + in the passed address, but will likely be different if the port number + contained in the passed address was zero. + """ + raise NotImplementedError() + + @abc.abstractmethod + def start(self): + """Starts this Server's service of RPCs. + + This method may only be called while the server is not serving RPCs (i.e. it + is not idempotent). + """ + raise NotImplementedError() + + @abc.abstractmethod + def stop(self, grace): + """Stops this Server's service of RPCs. + + All calls to this method immediately stop service of new RPCs. When existing + RPCs are aborted is controlled by the grace period parameter passed to this + method. + + This method may be called at any time and is idempotent. Passing a smaller + grace value than has been passed in a previous call will have the effect of + stopping the Server sooner. Passing a larger grace value than has been + passed in a previous call will not have the effect of stopping the server + later. + + Args: + grace: A duration of time in seconds to allow existing RPCs to complete + before being aborted by this Server's stopping. If None, this method + will block until the server is completely stopped. + + Returns: + A threading.Event that will be set when this Server has completely + stopped. The returned event may not be set until after the full grace + period (if some ongoing RPC continues for the full length of the period) + of it may be set much sooner (such as if this Server had no RPCs underway + at the time it was stopped or if all RPCs that it had underway completed + very early in the grace period). + """ + raise NotImplementedError() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 66e6e6b549..d42c58050f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -336,6 +336,8 @@ cdef extern from "grpc/_cython/loader.h": void grpc_server_register_completion_queue(grpc_server *server, grpc_completion_queue *cq, void *reserved) nogil + void grpc_server_register_non_listening_completion_queue( + grpc_server *server, grpc_completion_queue *cq, void *reserved) nogil int grpc_server_add_insecure_http2_port( grpc_server *server, const char *addr) nogil void grpc_server_start(grpc_server *server) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 8419a59068..55948755b2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -81,11 +81,20 @@ cdef class Server: self.c_server, queue.c_completion_queue, NULL) self.registered_completion_queues.append(queue) + def register_non_listening_completion_queue( + self, CompletionQueue queue not None): + if self.is_started: + raise ValueError("cannot register completion queues after start") + with nogil: + grpc_server_register_non_listening_completion_queue( + self.c_server, queue.c_completion_queue, NULL) + self.registered_completion_queues.append(queue) + def start(self): if self.is_started: raise ValueError("the server has already started") self.backup_shutdown_queue = CompletionQueue() - self.register_completion_queue(self.backup_shutdown_queue) + self.register_non_listening_completion_queue(self.backup_shutdown_queue) self.is_started = True with nogil: grpc_server_start(self.c_server) @@ -169,4 +178,3 @@ cdef class Server: time.sleep(0) with nogil: grpc_server_destroy(self.c_server) - diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c index 09551472b5..f71cf12844 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.c +++ b/src/python/grpcio/grpc/_cython/imports.generated.c @@ -115,6 +115,7 @@ grpc_server_register_method_type grpc_server_register_method_import; grpc_server_request_registered_call_type grpc_server_request_registered_call_import; grpc_server_create_type grpc_server_create_import; grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; +grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import; grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import; grpc_server_start_type grpc_server_start_import; grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import; @@ -386,6 +387,7 @@ void pygrpc_load_imports(HMODULE library) { grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call"); grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create"); grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue"); + grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue"); grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port"); grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start"); grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify"); diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index 6de295414a..a364075e9e 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -296,6 +296,9 @@ extern grpc_server_create_type grpc_server_create_import; typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved); extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; #define grpc_server_register_completion_queue grpc_server_register_completion_queue_import +typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved); +extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import; +#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr); extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import; #define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import diff --git a/src/python/grpcio/grpc/framework/foundation/future.py b/src/python/grpcio/grpc/framework/foundation/future.py index 9210616150..6fb58eadb6 100644 --- a/src/python/grpcio/grpc/framework/foundation/future.py +++ b/src/python/grpcio/grpc/framework/foundation/future.py @@ -232,6 +232,6 @@ class Future(six.with_metaclass(abc.ABCMeta)): immediately. Args: - fn: A callable taking a this Future object as its single parameter. + fn: A callable taking this Future object as its single parameter. """ raise NotImplementedError() diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index f7a5ed8788..d29d00b2eb 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -183,20 +183,28 @@ CORE_SOURCE_FILES = [ 'src/core/ext/transport/chttp2/transport/writing.c', 'src/core/ext/transport/chttp2/alpn/alpn.c', 'src/core/lib/http/httpcli_security_connector.c', - 'src/core/lib/security/b64.c', - 'src/core/lib/security/client_auth_filter.c', - 'src/core/lib/security/credentials.c', - 'src/core/lib/security/credentials_metadata.c', - 'src/core/lib/security/credentials_posix.c', - 'src/core/lib/security/credentials_win32.c', - 'src/core/lib/security/google_default_credentials.c', - 'src/core/lib/security/handshake.c', - 'src/core/lib/security/json_token.c', - 'src/core/lib/security/jwt_verifier.c', - 'src/core/lib/security/secure_endpoint.c', - 'src/core/lib/security/security_connector.c', - 'src/core/lib/security/security_context.c', - 'src/core/lib/security/server_auth_filter.c', + 'src/core/lib/security/context/security_context.c', + 'src/core/lib/security/credentials/composite/composite_credentials.c', + 'src/core/lib/security/credentials/credentials.c', + 'src/core/lib/security/credentials/credentials_metadata.c', + 'src/core/lib/security/credentials/fake/fake_credentials.c', + 'src/core/lib/security/credentials/google_default/credentials_posix.c', + 'src/core/lib/security/credentials/google_default/credentials_win32.c', + 'src/core/lib/security/credentials/google_default/google_default_credentials.c', + 'src/core/lib/security/credentials/iam/iam_credentials.c', + 'src/core/lib/security/credentials/jwt/json_token.c', + 'src/core/lib/security/credentials/jwt/jwt_credentials.c', + 'src/core/lib/security/credentials/jwt/jwt_verifier.c', + 'src/core/lib/security/credentials/oauth2/oauth2_credentials.c', + 'src/core/lib/security/credentials/plugin/plugin_credentials.c', + 'src/core/lib/security/credentials/ssl/ssl_credentials.c', + 'src/core/lib/security/transport/client_auth_filter.c', + 'src/core/lib/security/transport/handshake.c', + 'src/core/lib/security/transport/secure_endpoint.c', + 'src/core/lib/security/transport/security_connector.c', + 'src/core/lib/security/transport/server_auth_filter.c', + 'src/core/lib/security/util/b64.c', + 'src/core/lib/security/util/json_util.c', 'src/core/lib/surface/init_secure.c', 'src/core/lib/tsi/fake_transport_security.c', 'src/core/lib/tsi/ssl_transport_security.c', @@ -238,6 +246,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/load_reporting/load_reporting.c', 'src/core/ext/load_reporting/load_reporting_filter.c', 'src/core/ext/census/context.c', + 'src/core/ext/census/gen/census.pb.c', 'src/core/ext/census/grpc_context.c', 'src/core/ext/census/grpc_filter.c', 'src/core/ext/census/grpc_plugin.c', diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json index 691062f25a..1beb619f87 100644 --- a/src/python/grpcio/tests/tests.json +++ b/src/python/grpcio/tests/tests.json @@ -5,6 +5,7 @@ "_base_interface_test.SyncPeasyTest", "_beta_features_test.BetaFeaturesTest", "_beta_features_test.ContextManagementAndLifecycleTest", + "_cancel_many_calls_test.CancelManyCallsTest", "_channel_test.ChannelTest", "_connectivity_channel_test.ChannelConnectivityTest", "_core_over_links_base_interface_test.AsyncEasyTest", diff --git a/src/python/grpcio/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio/tests/unit/_cython/_cancel_many_calls_test.py new file mode 100644 index 0000000000..c1de779014 --- /dev/null +++ b/src/python/grpcio/tests/unit/_cython/_cancel_many_calls_test.py @@ -0,0 +1,222 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test making many calls and immediately cancelling most of them.""" + +import threading +import unittest + +from grpc._cython import cygrpc +from grpc.framework.foundation import logging_pool +from tests.unit.framework.common import test_constants + +_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) +_EMPTY_FLAGS = 0 +_EMPTY_METADATA = cygrpc.Metadata(()) + +_SERVER_SHUTDOWN_TAG = 'server_shutdown' +_REQUEST_CALL_TAG = 'request_call' +_RECEIVE_CLOSE_ON_SERVER_TAG = 'receive_close_on_server' +_RECEIVE_MESSAGE_TAG = 'receive_message' +_SERVER_COMPLETE_CALL_TAG = 'server_complete_call' + +_SUCCESS_CALL_FRACTION = 1.0 / 8.0 + + +class _State(object): + + def __init__(self): + self.condition = threading.Condition() + self.handlers_released = False + self.parked_handlers = 0 + self.handled_rpcs = 0 + + +def _is_cancellation_event(event): + return ( + event.tag is _RECEIVE_CLOSE_ON_SERVER_TAG and + event.batch_operations[0].received_cancelled) + + +class _Handler(object): + + def __init__(self, state, completion_queue, rpc_event): + self._state = state + self._lock = threading.Lock() + self._completion_queue = completion_queue + self._call = rpc_event.operation_call + + def __call__(self): + with self._state.condition: + self._state.parked_handlers += 1 + if self._state.parked_handlers == test_constants.THREAD_CONCURRENCY: + self._state.condition.notify_all() + while not self._state.handlers_released: + self._state.condition.wait() + + with self._lock: + self._call.start_batch( + cygrpc.Operations( + (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),)), + _RECEIVE_CLOSE_ON_SERVER_TAG) + self._call.start_batch( + cygrpc.Operations((cygrpc.operation_receive_message(_EMPTY_FLAGS),)), + _RECEIVE_MESSAGE_TAG) + first_event = self._completion_queue.poll() + if _is_cancellation_event(first_event): + self._completion_queue.poll() + else: + with self._lock: + operations = ( + cygrpc.operation_send_initial_metadata( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS), + cygrpc.operation_send_status_from_server( + _EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!', + _EMPTY_FLAGS), + ) + self._call.start_batch( + cygrpc.Operations(operations), _SERVER_COMPLETE_CALL_TAG) + self._completion_queue.poll() + self._completion_queue.poll() + + +def _serve(state, server, server_completion_queue, thread_pool): + for _ in range(test_constants.RPC_CONCURRENCY): + call_completion_queue = cygrpc.CompletionQueue() + server.request_call( + call_completion_queue, server_completion_queue, _REQUEST_CALL_TAG) + rpc_event = server_completion_queue.poll() + thread_pool.submit(_Handler(state, call_completion_queue, rpc_event)) + with state.condition: + state.handled_rpcs += 1 + if test_constants.RPC_CONCURRENCY <= state.handled_rpcs: + state.condition.notify_all() + server_completion_queue.poll() + + +class _QueueDriver(object): + + def __init__(self, condition, completion_queue, due): + self._condition = condition + self._completion_queue = completion_queue + self._due = due + self._events = [] + self._returned = False + + def start(self): + def in_thread(): + while True: + event = self._completion_queue.poll() + with self._condition: + self._events.append(event) + self._due.remove(event.tag) + self._condition.notify_all() + if not self._due: + self._returned = True + return + thread = threading.Thread(target=in_thread) + thread.start() + + def events(self, at_least): + with self._condition: + while len(self._events) < at_least: + self._condition.wait() + return tuple(self._events) + + +class CancelManyCallsTest(unittest.TestCase): + + def testCancelManyCalls(self): + server_thread_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + + server_completion_queue = cygrpc.CompletionQueue() + server = cygrpc.Server() + server.register_completion_queue(server_completion_queue) + port = server.add_http2_port('[::]:0') + server.start() + channel = cygrpc.Channel('localhost:{}'.format(port)) + + state = _State() + + server_thread_args = ( + state, server, server_completion_queue, server_thread_pool,) + server_thread = threading.Thread(target=_serve, args=server_thread_args) + server_thread.start() + + client_condition = threading.Condition() + client_due = set() + client_completion_queue = cygrpc.CompletionQueue() + client_driver = _QueueDriver( + client_condition, client_completion_queue, client_due) + client_driver.start() + + with client_condition: + client_calls = [] + for index in range(test_constants.RPC_CONCURRENCY): + client_call = channel.create_call( + None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None, + _INFINITE_FUTURE) + operations = ( + cygrpc.operation_send_initial_metadata( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS), + cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), + cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), + ) + tag = 'client_complete_call_{0:04d}_tag'.format(index) + client_call.start_batch(cygrpc.Operations(operations), tag) + client_due.add(tag) + client_calls.append(client_call) + + with state.condition: + while True: + if state.parked_handlers < test_constants.THREAD_CONCURRENCY: + state.condition.wait() + elif state.handled_rpcs < test_constants.RPC_CONCURRENCY: + state.condition.wait() + else: + state.handlers_released = True + state.condition.notify_all() + break + + client_driver.events( + test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION) + with client_condition: + for client_call in client_calls: + client_call.cancel() + + with state.condition: + server.shutdown(server_completion_queue, _SERVER_SHUTDOWN_TAG) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/common/test_control.py b/src/python/grpcio/tests/unit/framework/common/test_control.py index ca5ba3a854..088e2f8b88 100644 --- a/src/python/grpcio/tests/unit/framework/common/test_control.py +++ b/src/python/grpcio/tests/unit/framework/common/test_control.py @@ -60,10 +60,16 @@ class Control(six.with_metaclass(abc.ABCMeta)): class PauseFailControl(Control): - """A Control that can be used to pause or fail code under control.""" + """A Control that can be used to pause or fail code under control. + + This object is only safe for use from two threads: one of the system under + test calling control and the other from the test system calling pause, + block_until_paused, and fail. + """ def __init__(self): self._condition = threading.Condition() + self._pause = False self._paused = False self._fail = False @@ -72,19 +78,31 @@ class PauseFailControl(Control): if self._fail: raise Defect() - while self._paused: + while self._pause: + self._paused = True + self._condition.notify_all() self._condition.wait() + self._paused = False @contextlib.contextmanager def pause(self): """Pauses code under control while controlling code is in context.""" with self._condition: - self._paused = True + self._pause = True yield with self._condition: - self._paused = False + self._pause = False self._condition.notify_all() + def block_until_paused(self): + """Blocks controlling code until code under control is paused. + + May only be called within the context of a pause call. + """ + with self._condition: + while not self._paused: + self._condition.wait() + @contextlib.contextmanager def fail(self): """Fails code under control while controlling code is in context.""" |