From 2010985ab269c8df0443e4f3782cbdffb083e9d4 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Fri, 28 Jul 2017 01:24:52 +0000 Subject: gRPC Python test infrastructure (The channel-related second part of it.) --- .pylintrc | 3 + src/python/grpcio_testing/grpc_testing/__init__.py | 289 ++++++++++++++++++ .../grpc_testing/_channel/__init__.py | 23 ++ .../grpc_testing/_channel/_channel.py | 62 ++++ .../grpc_testing/_channel/_channel_rpc.py | 119 ++++++++ .../grpc_testing/_channel/_channel_state.py | 48 +++ .../grpc_testing/_channel/_invocation.py | 322 +++++++++++++++++++++ .../grpc_testing/_channel/_multi_callable.py | 115 ++++++++ .../grpc_testing/_channel/_rpc_state.py | 193 ++++++++++++ src/python/grpcio_testing/grpc_testing/_common.py | 92 ++++++ src/python/grpcio_tests/setup.py | 4 + .../tests/testing/_application_common.py | 36 +++ .../tests/testing/_application_testing_common.py | 33 +++ .../tests/testing/_client_application.py | 260 +++++++++++++++++ .../grpcio_tests/tests/testing/_client_test.py | 306 ++++++++++++++++++++ .../grpcio_tests/tests/testing/proto/__init__.py | 13 + .../tests/testing/proto/requests.proto | 29 ++ .../tests/testing/proto/services.proto | 42 +++ src/python/grpcio_tests/tests/tests.json | 1 + 19 files changed, 1990 insertions(+) create mode 100644 src/python/grpcio_testing/grpc_testing/_channel/__init__.py create mode 100644 src/python/grpcio_testing/grpc_testing/_channel/_channel.py create mode 100644 src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py create mode 100644 src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py create mode 100644 src/python/grpcio_testing/grpc_testing/_channel/_invocation.py create mode 100644 src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py create mode 100644 src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py create mode 100644 src/python/grpcio_testing/grpc_testing/_common.py create mode 100644 src/python/grpcio_tests/tests/testing/_application_common.py create mode 100644 src/python/grpcio_tests/tests/testing/_application_testing_common.py create mode 100644 src/python/grpcio_tests/tests/testing/_client_application.py create mode 100644 src/python/grpcio_tests/tests/testing/_client_test.py create mode 100644 src/python/grpcio_tests/tests/testing/proto/__init__.py create mode 100644 src/python/grpcio_tests/tests/testing/proto/requests.proto create mode 100644 src/python/grpcio_tests/tests/testing/proto/services.proto diff --git a/.pylintrc b/.pylintrc index 05b4e685fb..453b45aab5 100644 --- a/.pylintrc +++ b/.pylintrc @@ -38,6 +38,9 @@ disable= # TODO(https://github.com/grpc/grpc/issues/261): This doesn't seem to # work for now? Try with a later pylint? locally-disabled, + # NOTE(nathaniel): What even is this? *Enabling* an inspection results + # in a warning? How does that encourage more analysis and coverage? + locally-enabled, # NOTE(nathaniel): We don't write doc strings for most private code # elements. missing-docstring, diff --git a/src/python/grpcio_testing/grpc_testing/__init__.py b/src/python/grpcio_testing/grpc_testing/__init__.py index c5a17f457a..14e25f09e2 100644 --- a/src/python/grpcio_testing/grpc_testing/__init__.py +++ b/src/python/grpcio_testing/grpc_testing/__init__.py @@ -15,11 +15,284 @@ import abc +from google.protobuf import descriptor import six import grpc +class UnaryUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a unary-unary RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to + the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, response, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + response: The response for the RPC. + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class UnaryStreamChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a unary-stream RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to + the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def send_response(self, response): + """Sends a response to the system under test. + + Args: + response: A response message to be "sent" to the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class StreamUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a stream-unary RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to + the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_request(self): + """Draws one of the requests added to the RPC by the system under test. + + This method blocks until the system under test has added to the RPC + the request to be returned. + + Successive calls to this method return requests in the same order in + which the system under test added them to the RPC. + + Returns: + A request message added to the RPC by the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def requests_closed(self): + """Blocks until the system under test has closed the request stream.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, response, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + response: The response for the RPC. + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class StreamStreamChannelRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a stream-stream RPC invoked by a system under test. + + Enables users to "play server" for the RPC. + """ + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + """Sends the RPC's initial metadata to the system under test. + + Args: + initial_metadata: The RPC's initial metadata to be "sent" to the + system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_request(self): + """Draws one of the requests added to the RPC by the system under test. + + This method blocks until the system under test has added to the RPC + the request to be returned. + + Successive calls to this method return requests in the same order in + which the system under test added them to the RPC. + + Returns: + A request message added to the RPC by the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def send_response(self, response): + """Sends a response to the system under test. + + Args: + response: A response messages to be "sent" to the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def requests_closed(self): + """Blocks until the system under test has closed the request stream.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Blocks until the system under test has cancelled the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self, trailing_metadata, code, details): + """Terminates the RPC. + + Args: + trailing_metadata: The RPC's trailing metadata. + code: The RPC's status code. + details: The RPC's status details. + """ + raise NotImplementedError() + + +class Channel(six.with_metaclass(abc.ABCMeta), grpc.Channel): + """A grpc.Channel double with which to test a system that invokes RPCs.""" + + @abc.abstractmethod + def take_unary_unary(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + unary-unary RPC method. + + Returns: + A (invocation_metadata, request, unary_unary_channel_rpc) tuple of + the RPC's invocation metadata, its request, and a + UnaryUnaryChannelRpc with which to "play server" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_unary_stream(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + unary-stream RPC method. + + Returns: + A (invocation_metadata, request, unary_stream_channel_rpc) tuple of + the RPC's invocation metadata, its request, and a + UnaryStreamChannelRpc with which to "play server" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_stream_unary(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + stream-unary RPC method. + + Returns: + A (invocation_metadata, stream_unary_channel_rpc) tuple of the RPC's + invocation metadata and a StreamUnaryChannelRpc with which to "play + server" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_stream_stream(self, method_descriptor): + """Draws an RPC currently being made by the system under test. + + If the given descriptor does not identify any RPC currently being made + by the system under test, this method blocks until the system under + test invokes such an RPC. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a + stream-stream RPC method. + + Returns: + A (invocation_metadata, stream_stream_channel_rpc) tuple of the RPC's + invocation metadata and a StreamStreamChannelRpc with which to + "play server" for the RPC. + """ + raise NotImplementedError() + + class Time(six.with_metaclass(abc.ABCMeta)): """A simulation of time. @@ -117,3 +390,19 @@ def strict_fake_time(now): """ from grpc_testing import _time return _time.StrictFakeTime(now) + + +def channel(service_descriptors, time): + """Creates a Channel for use in tests of a gRPC Python-using system. + + Args: + service_descriptors: An iterable of descriptor.ServiceDescriptors + describing the RPCs that will be made on the returned Channel by the + system under test. + time: A Time to be used for tests. + + Returns: + A Channel for use in tests. + """ + from grpc_testing import _channel + return _channel.testing_channel(service_descriptors, time) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/__init__.py b/src/python/grpcio_testing/grpc_testing/_channel/__init__.py new file mode 100644 index 0000000000..8011975d0a --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/__init__.py @@ -0,0 +1,23 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from grpc_testing._channel import _channel +from grpc_testing._channel import _channel_state + + +# descriptors is reserved for later use. +# pylint: disable=unused-argument +def testing_channel(descriptors, time): + return _channel.TestingChannel(time, _channel_state.State()) +# pylint: enable=unused-argument diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py new file mode 100644 index 0000000000..fbd064db88 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py @@ -0,0 +1,62 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc_testing +from grpc_testing._channel import _channel_rpc +from grpc_testing._channel import _multi_callable + + +# All serializer and deserializer parameters are not (yet) used by this +# test infrastructure. +# pylint: disable=unused-argument +class TestingChannel(grpc_testing.Channel): + + def __init__(self, time, state): + self._time = time + self._state = state + + def subscribe(self, callback, try_to_connect=False): + raise NotImplementedError() + + def unsubscribe(self, callback): + raise NotImplementedError() + + def unary_unary( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.UnaryUnary(method, self._state) + + def unary_stream( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.UnaryStream(method, self._state) + + def stream_unary( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.StreamUnary(method, self._state) + + def stream_stream( + self, method, request_serializer=None, response_deserializer=None): + return _multi_callable.StreamStream(method, self._state) + + def take_unary_unary(self, method_descriptor): + return _channel_rpc.unary_unary(self._state, method_descriptor) + + def take_unary_stream(self, method_descriptor): + return _channel_rpc.unary_stream(self._state, method_descriptor) + + def take_stream_unary(self, method_descriptor): + return _channel_rpc.stream_unary(self._state, method_descriptor) + + def take_stream_stream(self, method_descriptor): + return _channel_rpc.stream_stream(self._state, method_descriptor) +# pylint: enable=unused-argument diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py new file mode 100644 index 0000000000..762b6a035b --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py @@ -0,0 +1,119 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc_testing + + +class _UnaryUnary(grpc_testing.UnaryUnaryChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, response, trailing_metadata, code, details): + self._rpc_state.terminate_with_response( + response, trailing_metadata, code, details) + + +class _UnaryStream(grpc_testing.UnaryStreamChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def send_response(self, response): + self._rpc_state.send_response(response) + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, trailing_metadata, code, details): + self._rpc_state.terminate(trailing_metadata, code, details) + + +class _StreamUnary(grpc_testing.StreamUnaryChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def take_request(self): + return self._rpc_state.take_request() + + def requests_closed(self): + return self._rpc_state.requests_closed() + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, response, trailing_metadata, code, details): + self._rpc_state.terminate_with_response( + response, trailing_metadata, code, details) + + +class _StreamStream(grpc_testing.StreamStreamChannelRpc): + + def __init__(self, rpc_state): + self._rpc_state = rpc_state + + def send_initial_metadata(self, initial_metadata): + self._rpc_state.send_initial_metadata(initial_metadata) + + def take_request(self): + return self._rpc_state.take_request() + + def send_response(self, response): + self._rpc_state.send_response(response) + + def requests_closed(self): + return self._rpc_state.requests_closed() + + def cancelled(self): + self._rpc_state.cancelled() + + def terminate(self, trailing_metadata, code, details): + self._rpc_state.terminate(trailing_metadata, code, details) + + +def unary_unary(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + invocation_metadata, request = ( + rpc_state.take_invocation_metadata_and_request()) + return invocation_metadata, request, _UnaryUnary(rpc_state) + + +def unary_stream(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + invocation_metadata, request = ( + rpc_state.take_invocation_metadata_and_request()) + return invocation_metadata, request, _UnaryStream(rpc_state) + + +def stream_unary(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + return rpc_state.take_invocation_metadata(), _StreamUnary(rpc_state) + + +def stream_stream(channel_state, method_descriptor): + rpc_state = channel_state.take_rpc_state(method_descriptor) + return rpc_state.take_invocation_metadata(), _StreamStream(rpc_state) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py new file mode 100644 index 0000000000..569c41d79d --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py @@ -0,0 +1,48 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import threading + +from grpc_testing import _common +from grpc_testing._channel import _rpc_state + + +class State(_common.ChannelHandler): + + def __init__(self): + self._condition = threading.Condition() + self._rpc_states = collections.defaultdict(list) + + def invoke_rpc( + self, method_full_rpc_name, invocation_metadata, requests, + requests_closed, timeout): + rpc_state = _rpc_state.State( + invocation_metadata, requests, requests_closed) + with self._condition: + self._rpc_states[method_full_rpc_name].append(rpc_state) + self._condition.notify_all() + return rpc_state + + def take_rpc_state(self, method_descriptor): + method_full_rpc_name = '/{}/{}'.format( + method_descriptor.containing_service.full_name, + method_descriptor.name) + with self._condition: + while True: + method_rpc_states = self._rpc_states[method_full_rpc_name] + if method_rpc_states: + return method_rpc_states.pop(0) + else: + self._condition.wait() diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py new file mode 100644 index 0000000000..ebce652eeb --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py @@ -0,0 +1,322 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading + +import grpc + +_NOT_YET_OBSERVED = object() + + +def _cancel(handler): + return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!') + + +def _is_active(handler): + return handler.is_active() + + +def _time_remaining(unused_handler): + raise NotImplementedError() + + +def _add_callback(handler, callback): + return handler.add_callback(callback) + + +def _initial_metadata(handler): + return handler.initial_metadata() + + +def _trailing_metadata(handler): + trailing_metadata, unused_code, unused_details = handler.termination() + return trailing_metadata + + +def _code(handler): + unused_trailing_metadata, code, unused_details = handler.termination() + return code + + +def _details(handler): + unused_trailing_metadata, unused_code, details = handler.termination() + return details + + +class _Call(grpc.Call): + + def __init__(self, handler): + self._handler = handler + + def cancel(self): + _cancel(self._handler) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) + + +class _RpcErrorCall(grpc.RpcError, grpc.Call): + + def __init__(self, handler): + self._handler = handler + + def cancel(self): + _cancel(self._handler) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) + + +def _next(handler): + read = handler.take_response() + if read.code is None: + return read.response + elif read.code is grpc.StatusCode.OK: + raise StopIteration() + else: + raise _RpcErrorCall(handler) + + +class _HandlerExtras(object): + + def __init__(self): + self.condition = threading.Condition() + self.unary_response = _NOT_YET_OBSERVED + self.cancelled = False + + +def _with_extras_cancel(handler, extras): + with extras.condition: + if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'): + extras.cancelled = True + return True + else: + return False + + +def _extras_without_cancelled(extras): + with extras.condition: + return extras.cancelled + + +def _running(handler): + return handler.is_active() + + +def _done(handler): + return not handler.is_active() + + +def _with_extras_unary_response(handler, extras): + with extras.condition: + if extras.unary_response is _NOT_YET_OBSERVED: + read = handler.take_response() + if read.code is None: + extras.unary_response = read.response + return read.response + else: + raise _RpcErrorCall(handler) + else: + return extras.unary_response + + +def _exception(unused_handler): + raise NotImplementedError('TODO!') + + +def _traceback(unused_handler): + raise NotImplementedError('TODO!') + + +def _add_done_callback(handler, callback, future): + adapted_callback = lambda: callback(future) + if not handler.add_callback(adapted_callback): + callback(future) + + +class _FutureCall(grpc.Future, grpc.Call): + + def __init__(self, handler, extras): + self._handler = handler + self._extras = extras + + def cancel(self): + return _with_extras_cancel(self._handler, self._extras) + + def cancelled(self): + return _extras_without_cancelled(self._extras) + + def running(self): + return _running(self._handler) + + def done(self): + return _done(self._handler) + + def result(self): + return _with_extras_unary_response(self._handler, self._extras) + + def exception(self): + return _exception(self._handler) + + def traceback(self): + return _traceback(self._handler) + + def add_done_callback(self, fn): + _add_done_callback(self._handler, fn, self) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) + + +def consume_requests(request_iterator, handler): + + def _consume(): + while True: + try: + request = next(request_iterator) + added = handler.add_request(request) + if not added: + break + except StopIteration: + handler.close_requests() + break + except Exception: # pylint: disable=broad-except + details = 'Exception iterating requests!' + logging.exception(details) + handler.cancel(grpc.StatusCode.UNKNOWN, details) + + consumption = threading.Thread(target=_consume) + consumption.start() + + +def blocking_unary_response(handler): + read = handler.take_response() + if read.code is None: + unused_trailing_metadata, code, unused_details = handler.termination() + if code is grpc.StatusCode.OK: + return read.response + else: + raise _RpcErrorCall(handler) + else: + raise _RpcErrorCall(handler) + + +def blocking_unary_response_with_call(handler): + read = handler.take_response() + if read.code is None: + unused_trailing_metadata, code, unused_details = handler.termination() + if code is grpc.StatusCode.OK: + return read.response, _Call(handler) + else: + raise _RpcErrorCall(handler) + else: + raise _RpcErrorCall(handler) + + +def future_call(handler): + return _FutureCall(handler, _HandlerExtras()) + + +class ResponseIteratorCall(grpc.Call): + + def __init__(self, handler): + self._handler = handler + + def __iter__(self): + return self + + def __next__(self): + return _next(self._handler) + + def next(self): + return _next(self._handler) + + def cancel(self): + _cancel(self._handler) + + def is_active(self): + return _is_active(self._handler) + + def time_remaining(self): + return _time_remaining(self._handler) + + def add_callback(self, callback): + return _add_callback(self._handler, callback) + + def initial_metadata(self): + return _initial_metadata(self._handler) + + def trailing_metadata(self): + return _trailing_metadata(self._handler) + + def code(self): + return _code(self._handler) + + def details(self): + return _details(self._handler) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py b/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py new file mode 100644 index 0000000000..fe69257f5b --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py @@ -0,0 +1,115 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc +from grpc_testing import _common +from grpc_testing._channel import _invocation + +# All per-call credentials parameters are unused by this test infrastructure. +# pylint: disable=unused-argument +class UnaryUnary(grpc.UnaryUnaryMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) + return _invocation.blocking_unary_response(rpc_handler) + + def with_call(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) + return _invocation.blocking_unary_response_with_call(rpc_handler) + + def future(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) + return _invocation.future_call(rpc_handler) + + +class UnaryStream(grpc.StreamStreamMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [request], True, timeout) + return _invocation.ResponseIteratorCall(rpc_handler) + + +class StreamUnary(grpc.StreamUnaryMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.blocking_unary_response(rpc_handler) + + def with_call(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.blocking_unary_response_with_call(rpc_handler) + + def future(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.future_call(rpc_handler) + + +class StreamStream(grpc.StreamStreamMultiCallable): + + def __init__(self, method_full_rpc_name, channel_handler): + self._method_full_rpc_name = method_full_rpc_name + self._channel_handler = channel_handler + + def __call__(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + rpc_handler = self._channel_handler.invoke_rpc( + self._method_full_rpc_name, + _common.fuss_with_metadata(metadata), [], False, timeout) + _invocation.consume_requests(request_iterator, rpc_handler) + return _invocation.ResponseIteratorCall(rpc_handler) +# pylint: enable=unused-argument diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py new file mode 100644 index 0000000000..e1fa49a2a8 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py @@ -0,0 +1,193 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading + +import grpc +from grpc_testing import _common + + +class State(_common.ChannelRpcHandler): + + def __init__(self, invocation_metadata, requests, requests_closed): + self._condition = threading.Condition() + self._invocation_metadata = invocation_metadata + self._requests = requests + self._requests_closed = requests_closed + self._initial_metadata = None + self._responses = [] + self._trailing_metadata = None + self._code = None + self._details = None + + def initial_metadata(self): + with self._condition: + while True: + if self._initial_metadata is None: + if self._code is None: + self._condition.wait() + else: + return _common.FUSSED_EMPTY_METADATA + else: + return self._initial_metadata + + def add_request(self, request): + with self._condition: + if self._code is None and not self._requests_closed: + self._requests.append(request) + self._condition.notify_all() + return True + else: + return False + + def close_requests(self): + with self._condition: + if self._code is None and not self._requests_closed: + self._requests_closed = True + self._condition.notify_all() + + def take_response(self): + with self._condition: + while True: + if self._code is grpc.StatusCode.OK: + if self._responses: + response = self._responses.pop(0) + return _common.ChannelRpcRead( + response, None, None, None) + else: + return _common.ChannelRpcRead( + None, self._trailing_metadata, + grpc.StatusCode.OK, self._details) + elif self._code is None: + if self._responses: + response = self._responses.pop(0) + return _common.ChannelRpcRead( + response, None, None, None) + else: + self._condition.wait() + else: + return _common.ChannelRpcRead( + None, self._trailing_metadata, self._code, + self._details) + + def termination(self): + with self._condition: + while True: + if self._code is None: + self._condition.wait() + else: + return self._trailing_metadata, self._code, self._details + + def cancel(self, code, details): + with self._condition: + if self._code is None: + if self._initial_metadata is None: + self._initial_metadata = _common.FUSSED_EMPTY_METADATA + self._trailing_metadata = _common.FUSSED_EMPTY_METADATA + self._code = code + self._details = details + self._condition.notify_all() + return True + else: + return False + + def take_invocation_metadata(self): + with self._condition: + if self._invocation_metadata is None: + raise ValueError('Expected invocation metadata!') + else: + invocation_metadata = self._invocation_metadata + self._invocation_metadata = None + return invocation_metadata + + def take_invocation_metadata_and_request(self): + with self._condition: + if self._invocation_metadata is None: + raise ValueError('Expected invocation metadata!') + elif not self._requests: + raise ValueError('Expected at least one request!') + else: + invocation_metadata = self._invocation_metadata + self._invocation_metadata = None + return invocation_metadata, self._requests.pop(0) + + def send_initial_metadata(self, initial_metadata): + with self._condition: + self._initial_metadata = _common.fuss_with_metadata( + initial_metadata) + self._condition.notify_all() + + def take_request(self): + with self._condition: + while True: + if self._requests: + return self._requests.pop(0) + else: + self._condition.wait() + + def requests_closed(self): + with self._condition: + while True: + if self._requests_closed: + return + else: + self._condition.wait() + + def send_response(self, response): + with self._condition: + if self._code is None: + self._responses.append(response) + self._condition.notify_all() + + def terminate_with_response( + self, response, trailing_metadata, code, details): + with self._condition: + if self._initial_metadata is None: + self._initial_metadata = _common.FUSSED_EMPTY_METADATA + self._responses.append(response) + self._trailing_metadata = _common.fuss_with_metadata( + trailing_metadata) + self._code = code + self._details = details + self._condition.notify_all() + + def terminate(self, trailing_metadata, code, details): + with self._condition: + if self._initial_metadata is None: + self._initial_metadata = _common.FUSSED_EMPTY_METADATA + self._trailing_metadata = _common.fuss_with_metadata( + trailing_metadata) + self._code = code + self._details = details + self._condition.notify_all() + + def cancelled(self): + with self._condition: + while True: + if self._code is grpc.StatusCode.CANCELLED: + return + elif self._code is None: + self._condition.wait() + else: + raise ValueError( + 'Status code unexpectedly {}!'.format(self._code)) + + def is_active(self): + raise NotImplementedError() + + def time_remaining(self): + raise NotImplementedError() + + def add_callback(self, callback): + raise NotImplementedError() diff --git a/src/python/grpcio_testing/grpc_testing/_common.py b/src/python/grpcio_testing/grpc_testing/_common.py new file mode 100644 index 0000000000..cb4a7f5fa2 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_common.py @@ -0,0 +1,92 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Common interfaces and implementation.""" + +import abc +import collections + +import six + + +def _fuss(tuplified_metadata): + return tuplified_metadata + ( + ( + 'grpc.metadata_added_by_runtime', + 'gRPC is allowed to add metadata in transmission and does so.', + ), + ) + +FUSSED_EMPTY_METADATA = _fuss(()) + + +def fuss_with_metadata(metadata): + if metadata is None: + return FUSSED_EMPTY_METADATA + else: + return _fuss(tuple(metadata)) + + +class ChannelRpcRead( + collections.namedtuple( + 'ChannelRpcRead', + ('response', 'trailing_metadata', 'code', 'details',))): + pass + + +class ChannelRpcHandler(six.with_metaclass(abc.ABCMeta)): + + @abc.abstractmethod + def initial_metadata(self): + raise NotImplementedError() + + @abc.abstractmethod + def add_request(self, request): + raise NotImplementedError() + + @abc.abstractmethod + def close_requests(self): + raise NotImplementedError() + + @abc.abstractmethod + def take_response(self): + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self, code, details): + raise NotImplementedError() + + @abc.abstractmethod + def termination(self): + raise NotImplementedError() + + @abc.abstractmethod + def is_active(self): + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + raise NotImplementedError() + + @abc.abstractmethod + def add_callback(self, callback): + raise NotImplementedError() + + +class ChannelHandler(six.with_metaclass(abc.ABCMeta)): + + @abc.abstractmethod + def invoke_rpc( + self, method_full_rpc_name, invocation_metadata, requests, + requests_closed, timeout): + raise NotImplementedError() diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index adc909ccdc..debe14c40e 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -68,6 +68,10 @@ PACKAGE_DATA = { 'tests.protoc_plugin.protos.invocation_testing.split_services': [ 'services.proto', ], + 'tests.testing.proto': [ + 'requests.proto', + 'services.proto', + ], 'tests.unit': [ 'credentials/ca.pem', 'credentials/server1.key', diff --git a/src/python/grpcio_tests/tests/testing/_application_common.py b/src/python/grpcio_tests/tests/testing/_application_common.py new file mode 100644 index 0000000000..4e98879607 --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_application_common.py @@ -0,0 +1,36 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""An example gRPC Python-using application's common code elements.""" + +from tests.testing.proto import requests_pb2 +from tests.testing.proto import services_pb2 + +SERVICE_NAME = 'tests_of_grpc_testing.FirstService' +UNARY_UNARY_METHOD_NAME = 'UnUn' +UNARY_STREAM_METHOD_NAME = 'UnStre' +STREAM_UNARY_METHOD_NAME = 'StreUn' +STREAM_STREAM_METHOD_NAME = 'StreStre' + +UNARY_UNARY_REQUEST = requests_pb2.Up(first_up_field=2) +ERRONEOUS_UNARY_UNARY_REQUEST = requests_pb2.Up(first_up_field=3) +UNARY_UNARY_RESPONSE = services_pb2.Down(first_down_field=5) +ERRONEOUS_UNARY_UNARY_RESPONSE = services_pb2.Down(first_down_field=7) +UNARY_STREAM_REQUEST = requests_pb2.Charm(first_charm_field=11) +STREAM_UNARY_REQUEST = requests_pb2.Charm(first_charm_field=13) +STREAM_UNARY_RESPONSE = services_pb2.Strange(first_strange_field=17) +STREAM_STREAM_REQUEST = requests_pb2.Top(first_top_field=19) +STREAM_STREAM_RESPONSE = services_pb2.Bottom(first_bottom_field=23) +TWO_STREAM_STREAM_RESPONSES = (STREAM_STREAM_RESPONSE,) * 2 + +INFINITE_REQUEST_STREAM_TIMEOUT = 0.2 diff --git a/src/python/grpcio_tests/tests/testing/_application_testing_common.py b/src/python/grpcio_tests/tests/testing/_application_testing_common.py new file mode 100644 index 0000000000..9c9e485a78 --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_application_testing_common.py @@ -0,0 +1,33 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc_testing + +from tests.testing.proto import requests_pb2 +from tests.testing.proto import services_pb2 + +# TODO(https://github.com/grpc/grpc/issues/11657): Eliminate this entirely. +# TODO(https://github.com/google/protobuf/issues/3452): Eliminate this if/else. +if services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None: + FIRST_SERVICE = 'Fix protobuf issue 3452!' + FIRST_SERVICE_UNUN = 'Fix protobuf issue 3452!' + FIRST_SERVICE_UNSTRE = 'Fix protobuf issue 3452!' + FIRST_SERVICE_STREUN = 'Fix protobuf issue 3452!' + FIRST_SERVICE_STRESTRE = 'Fix protobuf issue 3452!' +else: + FIRST_SERVICE = services_pb2.DESCRIPTOR.services_by_name['FirstService'] + FIRST_SERVICE_UNUN = FIRST_SERVICE.methods_by_name['UnUn'] + FIRST_SERVICE_UNSTRE = FIRST_SERVICE.methods_by_name['UnStre'] + FIRST_SERVICE_STREUN = FIRST_SERVICE.methods_by_name['StreUn'] + FIRST_SERVICE_STRESTRE = FIRST_SERVICE.methods_by_name['StreStre'] diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py new file mode 100644 index 0000000000..aff32fb4dc --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_client_application.py @@ -0,0 +1,260 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""An example gRPC Python-using client-side application.""" + +import collections +import enum +import threading +import time + +import grpc +from tests.unit.framework.common import test_constants + +from tests.testing.proto import requests_pb2 +from tests.testing.proto import services_pb2 +from tests.testing.proto import services_pb2_grpc + +from tests.testing import _application_common + + +@enum.unique +class Scenario(enum.Enum): + UNARY_UNARY = 'unary unary' + UNARY_STREAM = 'unary stream' + STREAM_UNARY = 'stream unary' + STREAM_STREAM = 'stream stream' + CONCURRENT_STREAM_UNARY = 'concurrent stream unary' + CONCURRENT_STREAM_STREAM = 'concurrent stream stream' + CANCEL_UNARY_UNARY = 'cancel unary unary' + CANCEL_UNARY_STREAM = 'cancel unary stream' + INFINITE_REQUEST_STREAM = 'infinite request stream' + + +class Outcome(collections.namedtuple('Outcome', ('kind', 'code', 'details'))): + """Outcome of a client application scenario. + + Attributes: + kind: A Kind value describing the overall kind of scenario execution. + code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR. + details: A status details string. Only valid if kind is Kind.RPC_ERROR. + """ + + @enum.unique + class Kind(enum.Enum): + SATISFACTORY = 'satisfactory' + UNSATISFACTORY = 'unsatisfactory' + RPC_ERROR = 'rpc error' + + +_SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None) +_UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None) + + +class _Pipe(object): + + def __init__(self): + self._condition = threading.Condition() + self._values = [] + self._open = True + + def __iter__(self): + return self + + def _next(self): + with self._condition: + while True: + if self._values: + return self._values.pop(0) + elif not self._open: + raise StopIteration() + else: + self._condition.wait() + + def __next__(self): # (Python 3 Iterator Protocol) + return self._next() + + def next(self): # (Python 2 Iterator Protocol) + return self._next() + + def add(self, value): + with self._condition: + self._values.append(value) + self._condition.notify_all() + + def close(self): + with self._condition: + self._open = False + self._condition.notify_all() + + +def _run_unary_unary(stub): + response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST) + if _application_common.UNARY_UNARY_RESPONSE == response: + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def _run_unary_stream(stub): + response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST) + try: + next(response_iterator) + except StopIteration: + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def _run_stream_unary(stub): + response, call = stub.StreUn.with_call( + iter((_application_common.STREAM_UNARY_REQUEST,) * 3)) + if (_application_common.STREAM_UNARY_RESPONSE == response and + call.code() is grpc.StatusCode.OK): + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def _run_stream_stream(stub): + request_pipe = _Pipe() + response_iterator = stub.StreStre(iter(request_pipe)) + request_pipe.add(_application_common.STREAM_STREAM_REQUEST) + first_responses = next(response_iterator), next(response_iterator), + request_pipe.add(_application_common.STREAM_STREAM_REQUEST) + second_responses = next(response_iterator), next(response_iterator), + request_pipe.close() + try: + next(response_iterator) + except StopIteration: + unexpected_extra_response = False + else: + unexpected_extra_response = True + if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES and + second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES + and not unexpected_extra_response): + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def _run_concurrent_stream_unary(stub): + future_calls = tuple( + stub.StreUn.future( + iter((_application_common.STREAM_UNARY_REQUEST,) * 3)) + for _ in range(test_constants.THREAD_CONCURRENCY)) + for future_call in future_calls: + if future_call.code() is grpc.StatusCode.OK: + response = future_call.result() + if _application_common.STREAM_UNARY_RESPONSE != response: + return _UNSATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + else: + return _SATISFACTORY_OUTCOME + + +def _run_concurrent_stream_stream(stub): + condition = threading.Condition() + outcomes = [None] * test_constants.RPC_CONCURRENCY + + def run_stream_stream(index): + outcome = _run_stream_stream(stub) + with condition: + outcomes[index] = outcome + condition.notify() + + for index in range(test_constants.RPC_CONCURRENCY): + thread = threading.Thread(target=run_stream_stream, args=(index,)) + thread.start() + with condition: + while True: + if all(outcomes): + for outcome in outcomes: + if outcome.kind is not Outcome.Kind.SATISFACTORY: + return _UNSATISFACTORY_OUTCOME + else: + return _SATISFACTORY_OUTCOME + else: + condition.wait() + + +def _run_cancel_unary_unary(stub): + response_future_call = stub.UnUn.future( + _application_common.UNARY_UNARY_REQUEST) + initial_metadata = response_future_call.initial_metadata() + cancelled = response_future_call.cancel() + if initial_metadata is not None and cancelled: + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def _run_infinite_request_stream(stub): + + def infinite_request_iterator(): + while True: + yield _application_common.STREAM_UNARY_REQUEST + + response_future_call = stub.StreUn.future( + infinite_request_iterator(), + timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT) + if response_future_call.code() is grpc.StatusCode.DEADLINE_EXCEEDED: + return _SATISFACTORY_OUTCOME + else: + return _UNSATISFACTORY_OUTCOME + + +def run(scenario, channel): + stub = services_pb2_grpc.FirstServiceStub(channel) + try: + if scenario is Scenario.UNARY_UNARY: + return _run_unary_unary(stub) + elif scenario is Scenario.UNARY_STREAM: + return _run_unary_stream(stub) + elif scenario is Scenario.STREAM_UNARY: + return _run_stream_unary(stub) + elif scenario is Scenario.STREAM_STREAM: + return _run_stream_stream(stub) + elif scenario is Scenario.CONCURRENT_STREAM_UNARY: + return _run_concurrent_stream_unary(stub) + elif scenario is Scenario.CONCURRENT_STREAM_STREAM: + return _run_concurrent_stream_stream(stub) + elif scenario is Scenario.CANCEL_UNARY_UNARY: + return _run_cancel_unary_unary(stub) + elif scenario is Scenario.INFINITE_REQUEST_STREAM: + return _run_infinite_request_stream(stub) + except grpc.RpcError as rpc_error: + return Outcome(Outcome.Kind.RPC_ERROR, + rpc_error.code(), rpc_error.details()) + + +_IMPLEMENTATIONS = { + Scenario.UNARY_UNARY: _run_unary_unary, + Scenario.UNARY_STREAM: _run_unary_stream, + Scenario.STREAM_UNARY: _run_stream_unary, + Scenario.STREAM_STREAM: _run_stream_stream, + Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary, + Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream, + Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary, + Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream, +} + + +def run(scenario, channel): + stub = services_pb2_grpc.FirstServiceStub(channel) + try: + return _IMPLEMENTATIONS[scenario](stub) + except grpc.RpcError as rpc_error: + return Outcome(Outcome.Kind.RPC_ERROR, + rpc_error.code(), rpc_error.details()) diff --git a/src/python/grpcio_tests/tests/testing/_client_test.py b/src/python/grpcio_tests/tests/testing/_client_test.py new file mode 100644 index 0000000000..172f386d7b --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_client_test.py @@ -0,0 +1,306 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent import futures +import time +import unittest + +import grpc +from grpc.framework.foundation import logging_pool +from tests.unit.framework.common import test_constants +import grpc_testing + +from tests.testing import _application_common +from tests.testing import _application_testing_common +from tests.testing import _client_application +from tests.testing.proto import requests_pb2 +from tests.testing.proto import services_pb2 + + +# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip. +@unittest.skipIf( + services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None, + 'Fix protobuf issue 3452!') +class ClientTest(unittest.TestCase): + + def setUp(self): + # In this test the client-side application under test executes in + # a separate thread while we retain use of the test thread to "play + # server". + self._client_execution_thread_pool = logging_pool.pool(1) + + self._fake_time = grpc_testing.strict_fake_time(time.time()) + self._real_time = grpc_testing.strict_real_time() + self._fake_time_channel = grpc_testing.channel( + services_pb2.DESCRIPTOR.services_by_name.values(), self._fake_time) + self._real_time_channel = grpc_testing.channel( + services_pb2.DESCRIPTOR.services_by_name.values(), self._real_time) + + def tearDown(self): + self._client_execution_thread_pool.shutdown(wait=True) + + def test_successful_unary_unary(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.UNARY_UNARY, + self._real_time_channel) + invocation_metadata, request, rpc = ( + self._real_time_channel.take_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN)) + rpc.send_initial_metadata(()) + rpc.terminate(_application_common.UNARY_UNARY_RESPONSE, (), + grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_successful_unary_stream(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.UNARY_STREAM, + self._fake_time_channel) + invocation_metadata, request, rpc = ( + self._fake_time_channel.take_unary_stream( + _application_testing_common.FIRST_SERVICE_UNSTRE)) + rpc.send_initial_metadata(()) + rpc.terminate((), grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.UNARY_STREAM_REQUEST, request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_successful_stream_unary(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.STREAM_UNARY, + self._real_time_channel) + invocation_metadata, rpc = self._real_time_channel.take_stream_unary( + _application_testing_common.FIRST_SERVICE_STREUN) + rpc.send_initial_metadata(()) + first_request = rpc.take_request() + second_request = rpc.take_request() + third_request = rpc.take_request() + rpc.requests_closed() + rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (), + grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + first_request) + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + second_request) + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + third_request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_successful_stream_stream(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.STREAM_STREAM, + self._fake_time_channel) + invocation_metadata, rpc = self._fake_time_channel.take_stream_stream( + _application_testing_common.FIRST_SERVICE_STRESTRE) + first_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + second_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.requests_closed() + rpc.terminate((), grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + first_request) + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + second_request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_concurrent_stream_stream(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, + _client_application.Scenario.CONCURRENT_STREAM_STREAM, + self._real_time_channel) + rpcs = [] + for _ in range(test_constants.RPC_CONCURRENCY): + invocation_metadata, rpc = ( + self._real_time_channel.take_stream_stream( + _application_testing_common.FIRST_SERVICE_STRESTRE)) + rpcs.append(rpc) + requests = {} + for rpc in rpcs: + requests[rpc] = [rpc.take_request()] + for rpc in rpcs: + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + for rpc in rpcs: + requests[rpc].append(rpc.take_request()) + for rpc in rpcs: + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + for rpc in rpcs: + rpc.requests_closed() + for rpc in rpcs: + rpc.terminate((), grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + for requests_of_one_rpc in requests.values(): + for request in requests_of_one_rpc: + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_cancelled_unary_unary(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, + _client_application.Scenario.CANCEL_UNARY_UNARY, + self._fake_time_channel) + invocation_metadata, request, rpc = ( + self._fake_time_channel.take_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN)) + rpc.send_initial_metadata(()) + rpc.cancelled() + application_return_value = application_future.result() + + self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + def test_status_stream_unary(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, + _client_application.Scenario.CONCURRENT_STREAM_UNARY, + self._fake_time_channel) + rpcs = tuple( + self._fake_time_channel.take_stream_unary( + _application_testing_common.FIRST_SERVICE_STREUN)[1] + for _ in range(test_constants.THREAD_CONCURRENCY)) + for rpc in rpcs: + rpc.take_request() + rpc.take_request() + rpc.take_request() + rpc.requests_closed() + rpc.send_initial_metadata(( + ('my_metadata_key', 'My Metadata Value!',),)) + for rpc in rpcs[:-1]: + rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (), + grpc.StatusCode.OK, '') + rpcs[-1].terminate(_application_common.STREAM_UNARY_RESPONSE, (), + grpc.StatusCode.RESOURCE_EXHAUSTED, + 'nope; not able to handle all those RPCs!') + application_return_value = application_future.result() + + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.UNSATISFACTORY) + + def test_status_stream_stream(self): + code = grpc.StatusCode.DEADLINE_EXCEEDED + details = 'test deadline exceeded!' + + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.STREAM_STREAM, + self._real_time_channel) + invocation_metadata, rpc = self._real_time_channel.take_stream_stream( + _application_testing_common.FIRST_SERVICE_STRESTRE) + first_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + second_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.requests_closed() + rpc.terminate((), code, details) + application_return_value = application_future.result() + + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + first_request) + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + second_request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.RPC_ERROR) + self.assertIs(application_return_value.code, code) + self.assertEqual(application_return_value.details, details) + + def test_misbehaving_server_unary_unary(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.UNARY_UNARY, + self._fake_time_channel) + invocation_metadata, request, rpc = ( + self._fake_time_channel.take_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN)) + rpc.send_initial_metadata(()) + rpc.terminate(_application_common.ERRONEOUS_UNARY_UNARY_RESPONSE, (), + grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.UNSATISFACTORY) + + def test_misbehaving_server_stream_stream(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, _client_application.Scenario.STREAM_STREAM, + self._real_time_channel) + invocation_metadata, rpc = self._real_time_channel.take_stream_stream( + _application_testing_common.FIRST_SERVICE_STRESTRE) + first_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + second_request = rpc.take_request() + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.send_response(_application_common.STREAM_STREAM_RESPONSE) + rpc.requests_closed() + rpc.terminate((), grpc.StatusCode.OK, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + first_request) + self.assertEqual(_application_common.STREAM_STREAM_REQUEST, + second_request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.UNSATISFACTORY) + + def test_infinite_request_stream_real_time(self): + application_future = self._client_execution_thread_pool.submit( + _client_application.run, + _client_application.Scenario.INFINITE_REQUEST_STREAM, + self._real_time_channel) + invocation_metadata, rpc = self._real_time_channel.take_stream_unary( + _application_testing_common.FIRST_SERVICE_STREUN) + rpc.send_initial_metadata(()) + first_request = rpc.take_request() + second_request = rpc.take_request() + third_request = rpc.take_request() + self._real_time.sleep_for( + _application_common.INFINITE_REQUEST_STREAM_TIMEOUT) + rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (), + grpc.StatusCode.DEADLINE_EXCEEDED, '') + application_return_value = application_future.result() + + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + first_request) + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + second_request) + self.assertEqual(_application_common.STREAM_UNARY_REQUEST, + third_request) + self.assertIs(application_return_value.kind, + _client_application.Outcome.Kind.SATISFACTORY) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/testing/proto/__init__.py b/src/python/grpcio_tests/tests/testing/proto/__init__.py new file mode 100644 index 0000000000..1e120359cf --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/proto/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests/testing/proto/requests.proto b/src/python/grpcio_tests/tests/testing/proto/requests.proto new file mode 100644 index 0000000000..54a60bff86 --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/proto/requests.proto @@ -0,0 +1,29 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package tests_of_grpc_testing; + +message Up { + int32 first_up_field = 1; +} + +message Charm { + int32 first_charm_field = 1; +} + +message Top { + int32 first_top_field = 1; +} diff --git a/src/python/grpcio_tests/tests/testing/proto/services.proto b/src/python/grpcio_tests/tests/testing/proto/services.proto new file mode 100644 index 0000000000..cb15c0d1ce --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/proto/services.proto @@ -0,0 +1,42 @@ +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +import "tests/testing/proto/requests.proto"; + +package tests_of_grpc_testing; + +message Down { + int32 first_down_field = 1; +} + +message Strange { + int32 first_strange_field = 1; +} + +message Bottom { + int32 first_bottom_field = 1; +} + +service FirstService { + rpc UnUn(Up) returns (Down); + rpc UnStre(Charm) returns (stream Strange); + rpc StreUn(stream Charm) returns (Strange); + rpc StreStre(stream Top) returns (stream Bottom); +} + +service SecondService { + rpc UnStre(Strange) returns (stream Charm); +} diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index f86eeb76c7..c10719b86f 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -9,6 +9,7 @@ "protoc_plugin._split_definitions_test.SplitSeparateTest", "protoc_plugin.beta_python_plugin_test.PythonPluginTest", "reflection._reflection_servicer_test.ReflectionServicerTest", + "testing._client_test.ClientTest", "testing._time_test.StrictFakeTimeTest", "testing._time_test.StrictRealTimeTest", "unit._api_test.AllTest", -- cgit v1.2.3