aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2017-07-28 01:24:52 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2017-08-03 22:38:40 +0000
commit2010985ab269c8df0443e4f3782cbdffb083e9d4 (patch)
tree06a2a9a8a3f061542eeeac42edf2089b78c179be
parent69b7231776dc42c87abad33430c66e7b302bf00c (diff)
gRPC Python test infrastructure
(The channel-related second part of it.)
-rw-r--r--.pylintrc3
-rw-r--r--src/python/grpcio_testing/grpc_testing/__init__.py289
-rw-r--r--src/python/grpcio_testing/grpc_testing/_channel/__init__.py23
-rw-r--r--src/python/grpcio_testing/grpc_testing/_channel/_channel.py62
-rw-r--r--src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py119
-rw-r--r--src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py48
-rw-r--r--src/python/grpcio_testing/grpc_testing/_channel/_invocation.py322
-rw-r--r--src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py115
-rw-r--r--src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py193
-rw-r--r--src/python/grpcio_testing/grpc_testing/_common.py92
-rw-r--r--src/python/grpcio_tests/setup.py4
-rw-r--r--src/python/grpcio_tests/tests/testing/_application_common.py36
-rw-r--r--src/python/grpcio_tests/tests/testing/_application_testing_common.py33
-rw-r--r--src/python/grpcio_tests/tests/testing/_client_application.py260
-rw-r--r--src/python/grpcio_tests/tests/testing/_client_test.py306
-rw-r--r--src/python/grpcio_tests/tests/testing/proto/__init__.py13
-rw-r--r--src/python/grpcio_tests/tests/testing/proto/requests.proto29
-rw-r--r--src/python/grpcio_tests/tests/testing/proto/services.proto42
-rw-r--r--src/python/grpcio_tests/tests/tests.json1
19 files changed, 1990 insertions, 0 deletions
diff --git a/.pylintrc b/.pylintrc
index 05b4e685fb..453b45aab5 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -38,6 +38,9 @@ disable=
# TODO(https://github.com/grpc/grpc/issues/261): This doesn't seem to
# work for now? Try with a later pylint?
locally-disabled,
+ # NOTE(nathaniel): What even is this? *Enabling* an inspection results
+ # in a warning? How does that encourage more analysis and coverage?
+ locally-enabled,
# NOTE(nathaniel): We don't write doc strings for most private code
# elements.
missing-docstring,
diff --git a/src/python/grpcio_testing/grpc_testing/__init__.py b/src/python/grpcio_testing/grpc_testing/__init__.py
index c5a17f457a..14e25f09e2 100644
--- a/src/python/grpcio_testing/grpc_testing/__init__.py
+++ b/src/python/grpcio_testing/grpc_testing/__init__.py
@@ -15,11 +15,284 @@
import abc
+from google.protobuf import descriptor
import six
import grpc
+class UnaryUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a unary-unary RPC invoked by a system under test.
+
+ Enables users to "play server" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ """Sends the RPC's initial metadata to the system under test.
+
+ Args:
+ initial_metadata: The RPC's initial metadata to be "sent" to
+ the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancelled(self):
+ """Blocks until the system under test has cancelled the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self, response, trailing_metadata, code, details):
+ """Terminates the RPC.
+
+ Args:
+ response: The response for the RPC.
+ trailing_metadata: The RPC's trailing metadata.
+ code: The RPC's status code.
+ details: The RPC's status details.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamChannelRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a unary-stream RPC invoked by a system under test.
+
+ Enables users to "play server" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ """Sends the RPC's initial metadata to the system under test.
+
+ Args:
+ initial_metadata: The RPC's initial metadata to be "sent" to
+ the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_response(self, response):
+ """Sends a response to the system under test.
+
+ Args:
+ response: A response message to be "sent" to the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancelled(self):
+ """Blocks until the system under test has cancelled the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self, trailing_metadata, code, details):
+ """Terminates the RPC.
+
+ Args:
+ trailing_metadata: The RPC's trailing metadata.
+ code: The RPC's status code.
+ details: The RPC's status details.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryChannelRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a stream-unary RPC invoked by a system under test.
+
+ Enables users to "play server" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ """Sends the RPC's initial metadata to the system under test.
+
+ Args:
+ initial_metadata: The RPC's initial metadata to be "sent" to
+ the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_request(self):
+ """Draws one of the requests added to the RPC by the system under test.
+
+ This method blocks until the system under test has added to the RPC
+ the request to be returned.
+
+ Successive calls to this method return requests in the same order in
+ which the system under test added them to the RPC.
+
+ Returns:
+ A request message added to the RPC by the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests_closed(self):
+ """Blocks until the system under test has closed the request stream."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancelled(self):
+ """Blocks until the system under test has cancelled the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self, response, trailing_metadata, code, details):
+ """Terminates the RPC.
+
+ Args:
+ response: The response for the RPC.
+ trailing_metadata: The RPC's trailing metadata.
+ code: The RPC's status code.
+ details: The RPC's status details.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamChannelRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a stream-stream RPC invoked by a system under test.
+
+ Enables users to "play server" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ """Sends the RPC's initial metadata to the system under test.
+
+ Args:
+ initial_metadata: The RPC's initial metadata to be "sent" to the
+ system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_request(self):
+ """Draws one of the requests added to the RPC by the system under test.
+
+ This method blocks until the system under test has added to the RPC
+ the request to be returned.
+
+ Successive calls to this method return requests in the same order in
+ which the system under test added them to the RPC.
+
+ Returns:
+ A request message added to the RPC by the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_response(self, response):
+ """Sends a response to the system under test.
+
+ Args:
+ response: A response messages to be "sent" to the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests_closed(self):
+ """Blocks until the system under test has closed the request stream."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancelled(self):
+ """Blocks until the system under test has cancelled the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self, trailing_metadata, code, details):
+ """Terminates the RPC.
+
+ Args:
+ trailing_metadata: The RPC's trailing metadata.
+ code: The RPC's status code.
+ details: The RPC's status details.
+ """
+ raise NotImplementedError()
+
+
+class Channel(six.with_metaclass(abc.ABCMeta), grpc.Channel):
+ """A grpc.Channel double with which to test a system that invokes RPCs."""
+
+ @abc.abstractmethod
+ def take_unary_unary(self, method_descriptor):
+ """Draws an RPC currently being made by the system under test.
+
+ If the given descriptor does not identify any RPC currently being made
+ by the system under test, this method blocks until the system under
+ test invokes such an RPC.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a
+ unary-unary RPC method.
+
+ Returns:
+ A (invocation_metadata, request, unary_unary_channel_rpc) tuple of
+ the RPC's invocation metadata, its request, and a
+ UnaryUnaryChannelRpc with which to "play server" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_unary_stream(self, method_descriptor):
+ """Draws an RPC currently being made by the system under test.
+
+ If the given descriptor does not identify any RPC currently being made
+ by the system under test, this method blocks until the system under
+ test invokes such an RPC.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a
+ unary-stream RPC method.
+
+ Returns:
+ A (invocation_metadata, request, unary_stream_channel_rpc) tuple of
+ the RPC's invocation metadata, its request, and a
+ UnaryStreamChannelRpc with which to "play server" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_stream_unary(self, method_descriptor):
+ """Draws an RPC currently being made by the system under test.
+
+ If the given descriptor does not identify any RPC currently being made
+ by the system under test, this method blocks until the system under
+ test invokes such an RPC.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a
+ stream-unary RPC method.
+
+ Returns:
+ A (invocation_metadata, stream_unary_channel_rpc) tuple of the RPC's
+ invocation metadata and a StreamUnaryChannelRpc with which to "play
+ server" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_stream_stream(self, method_descriptor):
+ """Draws an RPC currently being made by the system under test.
+
+ If the given descriptor does not identify any RPC currently being made
+ by the system under test, this method blocks until the system under
+ test invokes such an RPC.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a
+ stream-stream RPC method.
+
+ Returns:
+ A (invocation_metadata, stream_stream_channel_rpc) tuple of the RPC's
+ invocation metadata and a StreamStreamChannelRpc with which to
+ "play server" for the RPC.
+ """
+ raise NotImplementedError()
+
+
class Time(six.with_metaclass(abc.ABCMeta)):
"""A simulation of time.
@@ -117,3 +390,19 @@ def strict_fake_time(now):
"""
from grpc_testing import _time
return _time.StrictFakeTime(now)
+
+
+def channel(service_descriptors, time):
+ """Creates a Channel for use in tests of a gRPC Python-using system.
+
+ Args:
+ service_descriptors: An iterable of descriptor.ServiceDescriptors
+ describing the RPCs that will be made on the returned Channel by the
+ system under test.
+ time: A Time to be used for tests.
+
+ Returns:
+ A Channel for use in tests.
+ """
+ from grpc_testing import _channel
+ return _channel.testing_channel(service_descriptors, time)
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/__init__.py b/src/python/grpcio_testing/grpc_testing/_channel/__init__.py
new file mode 100644
index 0000000000..8011975d0a
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_channel/__init__.py
@@ -0,0 +1,23 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from grpc_testing._channel import _channel
+from grpc_testing._channel import _channel_state
+
+
+# descriptors is reserved for later use.
+# pylint: disable=unused-argument
+def testing_channel(descriptors, time):
+ return _channel.TestingChannel(time, _channel_state.State())
+# pylint: enable=unused-argument
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py
new file mode 100644
index 0000000000..fbd064db88
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py
@@ -0,0 +1,62 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc_testing
+from grpc_testing._channel import _channel_rpc
+from grpc_testing._channel import _multi_callable
+
+
+# All serializer and deserializer parameters are not (yet) used by this
+# test infrastructure.
+# pylint: disable=unused-argument
+class TestingChannel(grpc_testing.Channel):
+
+ def __init__(self, time, state):
+ self._time = time
+ self._state = state
+
+ def subscribe(self, callback, try_to_connect=False):
+ raise NotImplementedError()
+
+ def unsubscribe(self, callback):
+ raise NotImplementedError()
+
+ def unary_unary(
+ self, method, request_serializer=None, response_deserializer=None):
+ return _multi_callable.UnaryUnary(method, self._state)
+
+ def unary_stream(
+ self, method, request_serializer=None, response_deserializer=None):
+ return _multi_callable.UnaryStream(method, self._state)
+
+ def stream_unary(
+ self, method, request_serializer=None, response_deserializer=None):
+ return _multi_callable.StreamUnary(method, self._state)
+
+ def stream_stream(
+ self, method, request_serializer=None, response_deserializer=None):
+ return _multi_callable.StreamStream(method, self._state)
+
+ def take_unary_unary(self, method_descriptor):
+ return _channel_rpc.unary_unary(self._state, method_descriptor)
+
+ def take_unary_stream(self, method_descriptor):
+ return _channel_rpc.unary_stream(self._state, method_descriptor)
+
+ def take_stream_unary(self, method_descriptor):
+ return _channel_rpc.stream_unary(self._state, method_descriptor)
+
+ def take_stream_stream(self, method_descriptor):
+ return _channel_rpc.stream_stream(self._state, method_descriptor)
+# pylint: enable=unused-argument
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py
new file mode 100644
index 0000000000..762b6a035b
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel_rpc.py
@@ -0,0 +1,119 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc_testing
+
+
+class _UnaryUnary(grpc_testing.UnaryUnaryChannelRpc):
+
+ def __init__(self, rpc_state):
+ self._rpc_state = rpc_state
+
+ def send_initial_metadata(self, initial_metadata):
+ self._rpc_state.send_initial_metadata(initial_metadata)
+
+ def cancelled(self):
+ self._rpc_state.cancelled()
+
+ def terminate(self, response, trailing_metadata, code, details):
+ self._rpc_state.terminate_with_response(
+ response, trailing_metadata, code, details)
+
+
+class _UnaryStream(grpc_testing.UnaryStreamChannelRpc):
+
+ def __init__(self, rpc_state):
+ self._rpc_state = rpc_state
+
+ def send_initial_metadata(self, initial_metadata):
+ self._rpc_state.send_initial_metadata(initial_metadata)
+
+ def send_response(self, response):
+ self._rpc_state.send_response(response)
+
+ def cancelled(self):
+ self._rpc_state.cancelled()
+
+ def terminate(self, trailing_metadata, code, details):
+ self._rpc_state.terminate(trailing_metadata, code, details)
+
+
+class _StreamUnary(grpc_testing.StreamUnaryChannelRpc):
+
+ def __init__(self, rpc_state):
+ self._rpc_state = rpc_state
+
+ def send_initial_metadata(self, initial_metadata):
+ self._rpc_state.send_initial_metadata(initial_metadata)
+
+ def take_request(self):
+ return self._rpc_state.take_request()
+
+ def requests_closed(self):
+ return self._rpc_state.requests_closed()
+
+ def cancelled(self):
+ self._rpc_state.cancelled()
+
+ def terminate(self, response, trailing_metadata, code, details):
+ self._rpc_state.terminate_with_response(
+ response, trailing_metadata, code, details)
+
+
+class _StreamStream(grpc_testing.StreamStreamChannelRpc):
+
+ def __init__(self, rpc_state):
+ self._rpc_state = rpc_state
+
+ def send_initial_metadata(self, initial_metadata):
+ self._rpc_state.send_initial_metadata(initial_metadata)
+
+ def take_request(self):
+ return self._rpc_state.take_request()
+
+ def send_response(self, response):
+ self._rpc_state.send_response(response)
+
+ def requests_closed(self):
+ return self._rpc_state.requests_closed()
+
+ def cancelled(self):
+ self._rpc_state.cancelled()
+
+ def terminate(self, trailing_metadata, code, details):
+ self._rpc_state.terminate(trailing_metadata, code, details)
+
+
+def unary_unary(channel_state, method_descriptor):
+ rpc_state = channel_state.take_rpc_state(method_descriptor)
+ invocation_metadata, request = (
+ rpc_state.take_invocation_metadata_and_request())
+ return invocation_metadata, request, _UnaryUnary(rpc_state)
+
+
+def unary_stream(channel_state, method_descriptor):
+ rpc_state = channel_state.take_rpc_state(method_descriptor)
+ invocation_metadata, request = (
+ rpc_state.take_invocation_metadata_and_request())
+ return invocation_metadata, request, _UnaryStream(rpc_state)
+
+
+def stream_unary(channel_state, method_descriptor):
+ rpc_state = channel_state.take_rpc_state(method_descriptor)
+ return rpc_state.take_invocation_metadata(), _StreamUnary(rpc_state)
+
+
+def stream_stream(channel_state, method_descriptor):
+ rpc_state = channel_state.take_rpc_state(method_descriptor)
+ return rpc_state.take_invocation_metadata(), _StreamStream(rpc_state)
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py
new file mode 100644
index 0000000000..569c41d79d
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel_state.py
@@ -0,0 +1,48 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import threading
+
+from grpc_testing import _common
+from grpc_testing._channel import _rpc_state
+
+
+class State(_common.ChannelHandler):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._rpc_states = collections.defaultdict(list)
+
+ def invoke_rpc(
+ self, method_full_rpc_name, invocation_metadata, requests,
+ requests_closed, timeout):
+ rpc_state = _rpc_state.State(
+ invocation_metadata, requests, requests_closed)
+ with self._condition:
+ self._rpc_states[method_full_rpc_name].append(rpc_state)
+ self._condition.notify_all()
+ return rpc_state
+
+ def take_rpc_state(self, method_descriptor):
+ method_full_rpc_name = '/{}/{}'.format(
+ method_descriptor.containing_service.full_name,
+ method_descriptor.name)
+ with self._condition:
+ while True:
+ method_rpc_states = self._rpc_states[method_full_rpc_name]
+ if method_rpc_states:
+ return method_rpc_states.pop(0)
+ else:
+ self._condition.wait()
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py
new file mode 100644
index 0000000000..ebce652eeb
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_invocation.py
@@ -0,0 +1,322 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import threading
+
+import grpc
+
+_NOT_YET_OBSERVED = object()
+
+
+def _cancel(handler):
+ return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!')
+
+
+def _is_active(handler):
+ return handler.is_active()
+
+
+def _time_remaining(unused_handler):
+ raise NotImplementedError()
+
+
+def _add_callback(handler, callback):
+ return handler.add_callback(callback)
+
+
+def _initial_metadata(handler):
+ return handler.initial_metadata()
+
+
+def _trailing_metadata(handler):
+ trailing_metadata, unused_code, unused_details = handler.termination()
+ return trailing_metadata
+
+
+def _code(handler):
+ unused_trailing_metadata, code, unused_details = handler.termination()
+ return code
+
+
+def _details(handler):
+ unused_trailing_metadata, unused_code, details = handler.termination()
+ return details
+
+
+class _Call(grpc.Call):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def cancel(self):
+ _cancel(self._handler)
+
+ def is_active(self):
+ return _is_active(self._handler)
+
+ def time_remaining(self):
+ return _time_remaining(self._handler)
+
+ def add_callback(self, callback):
+ return _add_callback(self._handler, callback)
+
+ def initial_metadata(self):
+ return _initial_metadata(self._handler)
+
+ def trailing_metadata(self):
+ return _trailing_metadata(self._handler)
+
+ def code(self):
+ return _code(self._handler)
+
+ def details(self):
+ return _details(self._handler)
+
+
+class _RpcErrorCall(grpc.RpcError, grpc.Call):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def cancel(self):
+ _cancel(self._handler)
+
+ def is_active(self):
+ return _is_active(self._handler)
+
+ def time_remaining(self):
+ return _time_remaining(self._handler)
+
+ def add_callback(self, callback):
+ return _add_callback(self._handler, callback)
+
+ def initial_metadata(self):
+ return _initial_metadata(self._handler)
+
+ def trailing_metadata(self):
+ return _trailing_metadata(self._handler)
+
+ def code(self):
+ return _code(self._handler)
+
+ def details(self):
+ return _details(self._handler)
+
+
+def _next(handler):
+ read = handler.take_response()
+ if read.code is None:
+ return read.response
+ elif read.code is grpc.StatusCode.OK:
+ raise StopIteration()
+ else:
+ raise _RpcErrorCall(handler)
+
+
+class _HandlerExtras(object):
+
+ def __init__(self):
+ self.condition = threading.Condition()
+ self.unary_response = _NOT_YET_OBSERVED
+ self.cancelled = False
+
+
+def _with_extras_cancel(handler, extras):
+ with extras.condition:
+ if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'):
+ extras.cancelled = True
+ return True
+ else:
+ return False
+
+
+def _extras_without_cancelled(extras):
+ with extras.condition:
+ return extras.cancelled
+
+
+def _running(handler):
+ return handler.is_active()
+
+
+def _done(handler):
+ return not handler.is_active()
+
+
+def _with_extras_unary_response(handler, extras):
+ with extras.condition:
+ if extras.unary_response is _NOT_YET_OBSERVED:
+ read = handler.take_response()
+ if read.code is None:
+ extras.unary_response = read.response
+ return read.response
+ else:
+ raise _RpcErrorCall(handler)
+ else:
+ return extras.unary_response
+
+
+def _exception(unused_handler):
+ raise NotImplementedError('TODO!')
+
+
+def _traceback(unused_handler):
+ raise NotImplementedError('TODO!')
+
+
+def _add_done_callback(handler, callback, future):
+ adapted_callback = lambda: callback(future)
+ if not handler.add_callback(adapted_callback):
+ callback(future)
+
+
+class _FutureCall(grpc.Future, grpc.Call):
+
+ def __init__(self, handler, extras):
+ self._handler = handler
+ self._extras = extras
+
+ def cancel(self):
+ return _with_extras_cancel(self._handler, self._extras)
+
+ def cancelled(self):
+ return _extras_without_cancelled(self._extras)
+
+ def running(self):
+ return _running(self._handler)
+
+ def done(self):
+ return _done(self._handler)
+
+ def result(self):
+ return _with_extras_unary_response(self._handler, self._extras)
+
+ def exception(self):
+ return _exception(self._handler)
+
+ def traceback(self):
+ return _traceback(self._handler)
+
+ def add_done_callback(self, fn):
+ _add_done_callback(self._handler, fn, self)
+
+ def is_active(self):
+ return _is_active(self._handler)
+
+ def time_remaining(self):
+ return _time_remaining(self._handler)
+
+ def add_callback(self, callback):
+ return _add_callback(self._handler, callback)
+
+ def initial_metadata(self):
+ return _initial_metadata(self._handler)
+
+ def trailing_metadata(self):
+ return _trailing_metadata(self._handler)
+
+ def code(self):
+ return _code(self._handler)
+
+ def details(self):
+ return _details(self._handler)
+
+
+def consume_requests(request_iterator, handler):
+
+ def _consume():
+ while True:
+ try:
+ request = next(request_iterator)
+ added = handler.add_request(request)
+ if not added:
+ break
+ except StopIteration:
+ handler.close_requests()
+ break
+ except Exception: # pylint: disable=broad-except
+ details = 'Exception iterating requests!'
+ logging.exception(details)
+ handler.cancel(grpc.StatusCode.UNKNOWN, details)
+
+ consumption = threading.Thread(target=_consume)
+ consumption.start()
+
+
+def blocking_unary_response(handler):
+ read = handler.take_response()
+ if read.code is None:
+ unused_trailing_metadata, code, unused_details = handler.termination()
+ if code is grpc.StatusCode.OK:
+ return read.response
+ else:
+ raise _RpcErrorCall(handler)
+ else:
+ raise _RpcErrorCall(handler)
+
+
+def blocking_unary_response_with_call(handler):
+ read = handler.take_response()
+ if read.code is None:
+ unused_trailing_metadata, code, unused_details = handler.termination()
+ if code is grpc.StatusCode.OK:
+ return read.response, _Call(handler)
+ else:
+ raise _RpcErrorCall(handler)
+ else:
+ raise _RpcErrorCall(handler)
+
+
+def future_call(handler):
+ return _FutureCall(handler, _HandlerExtras())
+
+
+class ResponseIteratorCall(grpc.Call):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return _next(self._handler)
+
+ def next(self):
+ return _next(self._handler)
+
+ def cancel(self):
+ _cancel(self._handler)
+
+ def is_active(self):
+ return _is_active(self._handler)
+
+ def time_remaining(self):
+ return _time_remaining(self._handler)
+
+ def add_callback(self, callback):
+ return _add_callback(self._handler, callback)
+
+ def initial_metadata(self):
+ return _initial_metadata(self._handler)
+
+ def trailing_metadata(self):
+ return _trailing_metadata(self._handler)
+
+ def code(self):
+ return _code(self._handler)
+
+ def details(self):
+ return _details(self._handler)
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py b/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py
new file mode 100644
index 0000000000..fe69257f5b
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py
@@ -0,0 +1,115 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc
+from grpc_testing import _common
+from grpc_testing._channel import _invocation
+
+# All per-call credentials parameters are unused by this test infrastructure.
+# pylint: disable=unused-argument
+class UnaryUnary(grpc.UnaryUnaryMultiCallable):
+
+ def __init__(self, method_full_rpc_name, channel_handler):
+ self._method_full_rpc_name = method_full_rpc_name
+ self._channel_handler = channel_handler
+
+ def __call__(self, request, timeout=None, metadata=None, credentials=None):
+ rpc_handler = self._channel_handler.invoke_rpc(
+ self._method_full_rpc_name, _common.fuss_with_metadata(metadata),
+ [request], True, timeout)
+ return _invocation.blocking_unary_response(rpc_handler)
+
+ def with_call(self, request, timeout=None, metadata=None, credentials=None):
+ rpc_handler = self._channel_handler.invoke_rpc(
+ self._method_full_rpc_name, _common.fuss_with_metadata(metadata),
+ [request], True, timeout)
+ return _invocation.blocking_unary_response_with_call(rpc_handler)
+
+ def future(self, request, timeout=None, metadata=None, credentials=None):
+ rpc_handler = self._channel_handler.invoke_rpc(
+ self._method_full_rpc_name, _common.fuss_with_metadata(metadata),
+ [request], True, timeout)
+ return _invocation.future_call(rpc_handler)
+
+
+class UnaryStream(grpc.StreamStreamMultiCallable):
+
+ def __init__(self, method_full_rpc_name, channel_handler):
+ self._method_full_rpc_name = method_full_rpc_name
+ self._channel_handler = channel_handler
+
+ def __call__(self, request, timeout=None, metadata=None, credentials=None):
+ rpc_handler = self._channel_handler.invoke_rpc(
+ self._method_full_rpc_name,
+ _common.fuss_with_metadata(metadata), [request], True, timeout)
+ return _invocation.ResponseIteratorCall(rpc_handler)
+
+
+class StreamUnary(grpc.StreamUnaryMultiCallable):
+
+ def __init__(self, method_full_rpc_name, channel_handler):
+ self._method_full_rpc_name = method_full_rpc_name
+ self._channel_handler = channel_handler
+
+ def __call__(self,
+ request_iterator,
+ timeout=None,
+ metadata=None,
+ credentials=None):
+ rpc_handler = self._channel_handler.invoke_rpc(
+ self._method_full_rpc_name,
+ _common.fuss_with_metadata(metadata), [], False, timeout)
+ _invocation.consume_requests(request_iterator, rpc_handler)
+ return _invocation.blocking_unary_response(rpc_handler)
+
+ def with_call(self,
+ request_iterator,
+ timeout=None,
+ metadata=None,
+ credentials=None):
+ rpc_handler = self._channel_handler.invoke_rpc(
+ self._method_full_rpc_name,
+ _common.fuss_with_metadata(metadata), [], False, timeout)
+ _invocation.consume_requests(request_iterator, rpc_handler)
+ return _invocation.blocking_unary_response_with_call(rpc_handler)
+
+ def future(self,
+ request_iterator,
+ timeout=None,
+ metadata=None,
+ credentials=None):
+ rpc_handler = self._channel_handler.invoke_rpc(
+ self._method_full_rpc_name,
+ _common.fuss_with_metadata(metadata), [], False, timeout)
+ _invocation.consume_requests(request_iterator, rpc_handler)
+ return _invocation.future_call(rpc_handler)
+
+
+class StreamStream(grpc.StreamStreamMultiCallable):
+
+ def __init__(self, method_full_rpc_name, channel_handler):
+ self._method_full_rpc_name = method_full_rpc_name
+ self._channel_handler = channel_handler
+
+ def __call__(self,
+ request_iterator,
+ timeout=None,
+ metadata=None,
+ credentials=None):
+ rpc_handler = self._channel_handler.invoke_rpc(
+ self._method_full_rpc_name,
+ _common.fuss_with_metadata(metadata), [], False, timeout)
+ _invocation.consume_requests(request_iterator, rpc_handler)
+ return _invocation.ResponseIteratorCall(rpc_handler)
+# pylint: enable=unused-argument
diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py
new file mode 100644
index 0000000000..e1fa49a2a8
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py
@@ -0,0 +1,193 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+
+import grpc
+from grpc_testing import _common
+
+
+class State(_common.ChannelRpcHandler):
+
+ def __init__(self, invocation_metadata, requests, requests_closed):
+ self._condition = threading.Condition()
+ self._invocation_metadata = invocation_metadata
+ self._requests = requests
+ self._requests_closed = requests_closed
+ self._initial_metadata = None
+ self._responses = []
+ self._trailing_metadata = None
+ self._code = None
+ self._details = None
+
+ def initial_metadata(self):
+ with self._condition:
+ while True:
+ if self._initial_metadata is None:
+ if self._code is None:
+ self._condition.wait()
+ else:
+ return _common.FUSSED_EMPTY_METADATA
+ else:
+ return self._initial_metadata
+
+ def add_request(self, request):
+ with self._condition:
+ if self._code is None and not self._requests_closed:
+ self._requests.append(request)
+ self._condition.notify_all()
+ return True
+ else:
+ return False
+
+ def close_requests(self):
+ with self._condition:
+ if self._code is None and not self._requests_closed:
+ self._requests_closed = True
+ self._condition.notify_all()
+
+ def take_response(self):
+ with self._condition:
+ while True:
+ if self._code is grpc.StatusCode.OK:
+ if self._responses:
+ response = self._responses.pop(0)
+ return _common.ChannelRpcRead(
+ response, None, None, None)
+ else:
+ return _common.ChannelRpcRead(
+ None, self._trailing_metadata,
+ grpc.StatusCode.OK, self._details)
+ elif self._code is None:
+ if self._responses:
+ response = self._responses.pop(0)
+ return _common.ChannelRpcRead(
+ response, None, None, None)
+ else:
+ self._condition.wait()
+ else:
+ return _common.ChannelRpcRead(
+ None, self._trailing_metadata, self._code,
+ self._details)
+
+ def termination(self):
+ with self._condition:
+ while True:
+ if self._code is None:
+ self._condition.wait()
+ else:
+ return self._trailing_metadata, self._code, self._details
+
+ def cancel(self, code, details):
+ with self._condition:
+ if self._code is None:
+ if self._initial_metadata is None:
+ self._initial_metadata = _common.FUSSED_EMPTY_METADATA
+ self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
+ self._code = code
+ self._details = details
+ self._condition.notify_all()
+ return True
+ else:
+ return False
+
+ def take_invocation_metadata(self):
+ with self._condition:
+ if self._invocation_metadata is None:
+ raise ValueError('Expected invocation metadata!')
+ else:
+ invocation_metadata = self._invocation_metadata
+ self._invocation_metadata = None
+ return invocation_metadata
+
+ def take_invocation_metadata_and_request(self):
+ with self._condition:
+ if self._invocation_metadata is None:
+ raise ValueError('Expected invocation metadata!')
+ elif not self._requests:
+ raise ValueError('Expected at least one request!')
+ else:
+ invocation_metadata = self._invocation_metadata
+ self._invocation_metadata = None
+ return invocation_metadata, self._requests.pop(0)
+
+ def send_initial_metadata(self, initial_metadata):
+ with self._condition:
+ self._initial_metadata = _common.fuss_with_metadata(
+ initial_metadata)
+ self._condition.notify_all()
+
+ def take_request(self):
+ with self._condition:
+ while True:
+ if self._requests:
+ return self._requests.pop(0)
+ else:
+ self._condition.wait()
+
+ def requests_closed(self):
+ with self._condition:
+ while True:
+ if self._requests_closed:
+ return
+ else:
+ self._condition.wait()
+
+ def send_response(self, response):
+ with self._condition:
+ if self._code is None:
+ self._responses.append(response)
+ self._condition.notify_all()
+
+ def terminate_with_response(
+ self, response, trailing_metadata, code, details):
+ with self._condition:
+ if self._initial_metadata is None:
+ self._initial_metadata = _common.FUSSED_EMPTY_METADATA
+ self._responses.append(response)
+ self._trailing_metadata = _common.fuss_with_metadata(
+ trailing_metadata)
+ self._code = code
+ self._details = details
+ self._condition.notify_all()
+
+ def terminate(self, trailing_metadata, code, details):
+ with self._condition:
+ if self._initial_metadata is None:
+ self._initial_metadata = _common.FUSSED_EMPTY_METADATA
+ self._trailing_metadata = _common.fuss_with_metadata(
+ trailing_metadata)
+ self._code = code
+ self._details = details
+ self._condition.notify_all()
+
+ def cancelled(self):
+ with self._condition:
+ while True:
+ if self._code is grpc.StatusCode.CANCELLED:
+ return
+ elif self._code is None:
+ self._condition.wait()
+ else:
+ raise ValueError(
+ 'Status code unexpectedly {}!'.format(self._code))
+
+ def is_active(self):
+ raise NotImplementedError()
+
+ def time_remaining(self):
+ raise NotImplementedError()
+
+ def add_callback(self, callback):
+ raise NotImplementedError()
diff --git a/src/python/grpcio_testing/grpc_testing/_common.py b/src/python/grpcio_testing/grpc_testing/_common.py
new file mode 100644
index 0000000000..cb4a7f5fa2
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_common.py
@@ -0,0 +1,92 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Common interfaces and implementation."""
+
+import abc
+import collections
+
+import six
+
+
+def _fuss(tuplified_metadata):
+ return tuplified_metadata + (
+ (
+ 'grpc.metadata_added_by_runtime',
+ 'gRPC is allowed to add metadata in transmission and does so.',
+ ),
+ )
+
+FUSSED_EMPTY_METADATA = _fuss(())
+
+
+def fuss_with_metadata(metadata):
+ if metadata is None:
+ return FUSSED_EMPTY_METADATA
+ else:
+ return _fuss(tuple(metadata))
+
+
+class ChannelRpcRead(
+ collections.namedtuple(
+ 'ChannelRpcRead',
+ ('response', 'trailing_metadata', 'code', 'details',))):
+ pass
+
+
+class ChannelRpcHandler(six.with_metaclass(abc.ABCMeta)):
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_request(self, request):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def close_requests(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_response(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self, code, details):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def termination(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def is_active(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def time_remaining(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_callback(self, callback):
+ raise NotImplementedError()
+
+
+class ChannelHandler(six.with_metaclass(abc.ABCMeta)):
+
+ @abc.abstractmethod
+ def invoke_rpc(
+ self, method_full_rpc_name, invocation_metadata, requests,
+ requests_closed, timeout):
+ raise NotImplementedError()
diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py
index adc909ccdc..debe14c40e 100644
--- a/src/python/grpcio_tests/setup.py
+++ b/src/python/grpcio_tests/setup.py
@@ -68,6 +68,10 @@ PACKAGE_DATA = {
'tests.protoc_plugin.protos.invocation_testing.split_services': [
'services.proto',
],
+ 'tests.testing.proto': [
+ 'requests.proto',
+ 'services.proto',
+ ],
'tests.unit': [
'credentials/ca.pem',
'credentials/server1.key',
diff --git a/src/python/grpcio_tests/tests/testing/_application_common.py b/src/python/grpcio_tests/tests/testing/_application_common.py
new file mode 100644
index 0000000000..4e98879607
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/_application_common.py
@@ -0,0 +1,36 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""An example gRPC Python-using application's common code elements."""
+
+from tests.testing.proto import requests_pb2
+from tests.testing.proto import services_pb2
+
+SERVICE_NAME = 'tests_of_grpc_testing.FirstService'
+UNARY_UNARY_METHOD_NAME = 'UnUn'
+UNARY_STREAM_METHOD_NAME = 'UnStre'
+STREAM_UNARY_METHOD_NAME = 'StreUn'
+STREAM_STREAM_METHOD_NAME = 'StreStre'
+
+UNARY_UNARY_REQUEST = requests_pb2.Up(first_up_field=2)
+ERRONEOUS_UNARY_UNARY_REQUEST = requests_pb2.Up(first_up_field=3)
+UNARY_UNARY_RESPONSE = services_pb2.Down(first_down_field=5)
+ERRONEOUS_UNARY_UNARY_RESPONSE = services_pb2.Down(first_down_field=7)
+UNARY_STREAM_REQUEST = requests_pb2.Charm(first_charm_field=11)
+STREAM_UNARY_REQUEST = requests_pb2.Charm(first_charm_field=13)
+STREAM_UNARY_RESPONSE = services_pb2.Strange(first_strange_field=17)
+STREAM_STREAM_REQUEST = requests_pb2.Top(first_top_field=19)
+STREAM_STREAM_RESPONSE = services_pb2.Bottom(first_bottom_field=23)
+TWO_STREAM_STREAM_RESPONSES = (STREAM_STREAM_RESPONSE,) * 2
+
+INFINITE_REQUEST_STREAM_TIMEOUT = 0.2
diff --git a/src/python/grpcio_tests/tests/testing/_application_testing_common.py b/src/python/grpcio_tests/tests/testing/_application_testing_common.py
new file mode 100644
index 0000000000..9c9e485a78
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/_application_testing_common.py
@@ -0,0 +1,33 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc_testing
+
+from tests.testing.proto import requests_pb2
+from tests.testing.proto import services_pb2
+
+# TODO(https://github.com/grpc/grpc/issues/11657): Eliminate this entirely.
+# TODO(https://github.com/google/protobuf/issues/3452): Eliminate this if/else.
+if services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None:
+ FIRST_SERVICE = 'Fix protobuf issue 3452!'
+ FIRST_SERVICE_UNUN = 'Fix protobuf issue 3452!'
+ FIRST_SERVICE_UNSTRE = 'Fix protobuf issue 3452!'
+ FIRST_SERVICE_STREUN = 'Fix protobuf issue 3452!'
+ FIRST_SERVICE_STRESTRE = 'Fix protobuf issue 3452!'
+else:
+ FIRST_SERVICE = services_pb2.DESCRIPTOR.services_by_name['FirstService']
+ FIRST_SERVICE_UNUN = FIRST_SERVICE.methods_by_name['UnUn']
+ FIRST_SERVICE_UNSTRE = FIRST_SERVICE.methods_by_name['UnStre']
+ FIRST_SERVICE_STREUN = FIRST_SERVICE.methods_by_name['StreUn']
+ FIRST_SERVICE_STRESTRE = FIRST_SERVICE.methods_by_name['StreStre']
diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py
new file mode 100644
index 0000000000..aff32fb4dc
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/_client_application.py
@@ -0,0 +1,260 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""An example gRPC Python-using client-side application."""
+
+import collections
+import enum
+import threading
+import time
+
+import grpc
+from tests.unit.framework.common import test_constants
+
+from tests.testing.proto import requests_pb2
+from tests.testing.proto import services_pb2
+from tests.testing.proto import services_pb2_grpc
+
+from tests.testing import _application_common
+
+
+@enum.unique
+class Scenario(enum.Enum):
+ UNARY_UNARY = 'unary unary'
+ UNARY_STREAM = 'unary stream'
+ STREAM_UNARY = 'stream unary'
+ STREAM_STREAM = 'stream stream'
+ CONCURRENT_STREAM_UNARY = 'concurrent stream unary'
+ CONCURRENT_STREAM_STREAM = 'concurrent stream stream'
+ CANCEL_UNARY_UNARY = 'cancel unary unary'
+ CANCEL_UNARY_STREAM = 'cancel unary stream'
+ INFINITE_REQUEST_STREAM = 'infinite request stream'
+
+
+class Outcome(collections.namedtuple('Outcome', ('kind', 'code', 'details'))):
+ """Outcome of a client application scenario.
+
+ Attributes:
+ kind: A Kind value describing the overall kind of scenario execution.
+ code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR.
+ details: A status details string. Only valid if kind is Kind.RPC_ERROR.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ SATISFACTORY = 'satisfactory'
+ UNSATISFACTORY = 'unsatisfactory'
+ RPC_ERROR = 'rpc error'
+
+
+_SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None)
+_UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None)
+
+
+class _Pipe(object):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._values = []
+ self._open = True
+
+ def __iter__(self):
+ return self
+
+ def _next(self):
+ with self._condition:
+ while True:
+ if self._values:
+ return self._values.pop(0)
+ elif not self._open:
+ raise StopIteration()
+ else:
+ self._condition.wait()
+
+ def __next__(self): # (Python 3 Iterator Protocol)
+ return self._next()
+
+ def next(self): # (Python 2 Iterator Protocol)
+ return self._next()
+
+ def add(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._condition.notify_all()
+
+ def close(self):
+ with self._condition:
+ self._open = False
+ self._condition.notify_all()
+
+
+def _run_unary_unary(stub):
+ response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
+ if _application_common.UNARY_UNARY_RESPONSE == response:
+ return _SATISFACTORY_OUTCOME
+ else:
+ return _UNSATISFACTORY_OUTCOME
+
+
+def _run_unary_stream(stub):
+ response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
+ try:
+ next(response_iterator)
+ except StopIteration:
+ return _SATISFACTORY_OUTCOME
+ else:
+ return _UNSATISFACTORY_OUTCOME
+
+
+def _run_stream_unary(stub):
+ response, call = stub.StreUn.with_call(
+ iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
+ if (_application_common.STREAM_UNARY_RESPONSE == response and
+ call.code() is grpc.StatusCode.OK):
+ return _SATISFACTORY_OUTCOME
+ else:
+ return _UNSATISFACTORY_OUTCOME
+
+
+def _run_stream_stream(stub):
+ request_pipe = _Pipe()
+ response_iterator = stub.StreStre(iter(request_pipe))
+ request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
+ first_responses = next(response_iterator), next(response_iterator),
+ request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
+ second_responses = next(response_iterator), next(response_iterator),
+ request_pipe.close()
+ try:
+ next(response_iterator)
+ except StopIteration:
+ unexpected_extra_response = False
+ else:
+ unexpected_extra_response = True
+ if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES and
+ second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
+ and not unexpected_extra_response):
+ return _SATISFACTORY_OUTCOME
+ else:
+ return _UNSATISFACTORY_OUTCOME
+
+
+def _run_concurrent_stream_unary(stub):
+ future_calls = tuple(
+ stub.StreUn.future(
+ iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
+ for _ in range(test_constants.THREAD_CONCURRENCY))
+ for future_call in future_calls:
+ if future_call.code() is grpc.StatusCode.OK:
+ response = future_call.result()
+ if _application_common.STREAM_UNARY_RESPONSE != response:
+ return _UNSATISFACTORY_OUTCOME
+ else:
+ return _UNSATISFACTORY_OUTCOME
+ else:
+ return _SATISFACTORY_OUTCOME
+
+
+def _run_concurrent_stream_stream(stub):
+ condition = threading.Condition()
+ outcomes = [None] * test_constants.RPC_CONCURRENCY
+
+ def run_stream_stream(index):
+ outcome = _run_stream_stream(stub)
+ with condition:
+ outcomes[index] = outcome
+ condition.notify()
+
+ for index in range(test_constants.RPC_CONCURRENCY):
+ thread = threading.Thread(target=run_stream_stream, args=(index,))
+ thread.start()
+ with condition:
+ while True:
+ if all(outcomes):
+ for outcome in outcomes:
+ if outcome.kind is not Outcome.Kind.SATISFACTORY:
+ return _UNSATISFACTORY_OUTCOME
+ else:
+ return _SATISFACTORY_OUTCOME
+ else:
+ condition.wait()
+
+
+def _run_cancel_unary_unary(stub):
+ response_future_call = stub.UnUn.future(
+ _application_common.UNARY_UNARY_REQUEST)
+ initial_metadata = response_future_call.initial_metadata()
+ cancelled = response_future_call.cancel()
+ if initial_metadata is not None and cancelled:
+ return _SATISFACTORY_OUTCOME
+ else:
+ return _UNSATISFACTORY_OUTCOME
+
+
+def _run_infinite_request_stream(stub):
+
+ def infinite_request_iterator():
+ while True:
+ yield _application_common.STREAM_UNARY_REQUEST
+
+ response_future_call = stub.StreUn.future(
+ infinite_request_iterator(),
+ timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
+ if response_future_call.code() is grpc.StatusCode.DEADLINE_EXCEEDED:
+ return _SATISFACTORY_OUTCOME
+ else:
+ return _UNSATISFACTORY_OUTCOME
+
+
+def run(scenario, channel):
+ stub = services_pb2_grpc.FirstServiceStub(channel)
+ try:
+ if scenario is Scenario.UNARY_UNARY:
+ return _run_unary_unary(stub)
+ elif scenario is Scenario.UNARY_STREAM:
+ return _run_unary_stream(stub)
+ elif scenario is Scenario.STREAM_UNARY:
+ return _run_stream_unary(stub)
+ elif scenario is Scenario.STREAM_STREAM:
+ return _run_stream_stream(stub)
+ elif scenario is Scenario.CONCURRENT_STREAM_UNARY:
+ return _run_concurrent_stream_unary(stub)
+ elif scenario is Scenario.CONCURRENT_STREAM_STREAM:
+ return _run_concurrent_stream_stream(stub)
+ elif scenario is Scenario.CANCEL_UNARY_UNARY:
+ return _run_cancel_unary_unary(stub)
+ elif scenario is Scenario.INFINITE_REQUEST_STREAM:
+ return _run_infinite_request_stream(stub)
+ except grpc.RpcError as rpc_error:
+ return Outcome(Outcome.Kind.RPC_ERROR,
+ rpc_error.code(), rpc_error.details())
+
+
+_IMPLEMENTATIONS = {
+ Scenario.UNARY_UNARY: _run_unary_unary,
+ Scenario.UNARY_STREAM: _run_unary_stream,
+ Scenario.STREAM_UNARY: _run_stream_unary,
+ Scenario.STREAM_STREAM: _run_stream_stream,
+ Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
+ Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
+ Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
+ Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
+}
+
+
+def run(scenario, channel):
+ stub = services_pb2_grpc.FirstServiceStub(channel)
+ try:
+ return _IMPLEMENTATIONS[scenario](stub)
+ except grpc.RpcError as rpc_error:
+ return Outcome(Outcome.Kind.RPC_ERROR,
+ rpc_error.code(), rpc_error.details())
diff --git a/src/python/grpcio_tests/tests/testing/_client_test.py b/src/python/grpcio_tests/tests/testing/_client_test.py
new file mode 100644
index 0000000000..172f386d7b
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/_client_test.py
@@ -0,0 +1,306 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from concurrent import futures
+import time
+import unittest
+
+import grpc
+from grpc.framework.foundation import logging_pool
+from tests.unit.framework.common import test_constants
+import grpc_testing
+
+from tests.testing import _application_common
+from tests.testing import _application_testing_common
+from tests.testing import _client_application
+from tests.testing.proto import requests_pb2
+from tests.testing.proto import services_pb2
+
+
+# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip.
+@unittest.skipIf(
+ services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None,
+ 'Fix protobuf issue 3452!')
+class ClientTest(unittest.TestCase):
+
+ def setUp(self):
+ # In this test the client-side application under test executes in
+ # a separate thread while we retain use of the test thread to "play
+ # server".
+ self._client_execution_thread_pool = logging_pool.pool(1)
+
+ self._fake_time = grpc_testing.strict_fake_time(time.time())
+ self._real_time = grpc_testing.strict_real_time()
+ self._fake_time_channel = grpc_testing.channel(
+ services_pb2.DESCRIPTOR.services_by_name.values(), self._fake_time)
+ self._real_time_channel = grpc_testing.channel(
+ services_pb2.DESCRIPTOR.services_by_name.values(), self._real_time)
+
+ def tearDown(self):
+ self._client_execution_thread_pool.shutdown(wait=True)
+
+ def test_successful_unary_unary(self):
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run, _client_application.Scenario.UNARY_UNARY,
+ self._real_time_channel)
+ invocation_metadata, request, rpc = (
+ self._real_time_channel.take_unary_unary(
+ _application_testing_common.FIRST_SERVICE_UNUN))
+ rpc.send_initial_metadata(())
+ rpc.terminate(_application_common.UNARY_UNARY_RESPONSE, (),
+ grpc.StatusCode.OK, '')
+ application_return_value = application_future.result()
+
+ self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.SATISFACTORY)
+
+ def test_successful_unary_stream(self):
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run, _client_application.Scenario.UNARY_STREAM,
+ self._fake_time_channel)
+ invocation_metadata, request, rpc = (
+ self._fake_time_channel.take_unary_stream(
+ _application_testing_common.FIRST_SERVICE_UNSTRE))
+ rpc.send_initial_metadata(())
+ rpc.terminate((), grpc.StatusCode.OK, '')
+ application_return_value = application_future.result()
+
+ self.assertEqual(_application_common.UNARY_STREAM_REQUEST, request)
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.SATISFACTORY)
+
+ def test_successful_stream_unary(self):
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run, _client_application.Scenario.STREAM_UNARY,
+ self._real_time_channel)
+ invocation_metadata, rpc = self._real_time_channel.take_stream_unary(
+ _application_testing_common.FIRST_SERVICE_STREUN)
+ rpc.send_initial_metadata(())
+ first_request = rpc.take_request()
+ second_request = rpc.take_request()
+ third_request = rpc.take_request()
+ rpc.requests_closed()
+ rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
+ grpc.StatusCode.OK, '')
+ application_return_value = application_future.result()
+
+ self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
+ first_request)
+ self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
+ second_request)
+ self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
+ third_request)
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.SATISFACTORY)
+
+ def test_successful_stream_stream(self):
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run, _client_application.Scenario.STREAM_STREAM,
+ self._fake_time_channel)
+ invocation_metadata, rpc = self._fake_time_channel.take_stream_stream(
+ _application_testing_common.FIRST_SERVICE_STRESTRE)
+ first_request = rpc.take_request()
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ second_request = rpc.take_request()
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.requests_closed()
+ rpc.terminate((), grpc.StatusCode.OK, '')
+ application_return_value = application_future.result()
+
+ self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
+ first_request)
+ self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
+ second_request)
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.SATISFACTORY)
+
+ def test_concurrent_stream_stream(self):
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run,
+ _client_application.Scenario.CONCURRENT_STREAM_STREAM,
+ self._real_time_channel)
+ rpcs = []
+ for _ in range(test_constants.RPC_CONCURRENCY):
+ invocation_metadata, rpc = (
+ self._real_time_channel.take_stream_stream(
+ _application_testing_common.FIRST_SERVICE_STRESTRE))
+ rpcs.append(rpc)
+ requests = {}
+ for rpc in rpcs:
+ requests[rpc] = [rpc.take_request()]
+ for rpc in rpcs:
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ for rpc in rpcs:
+ requests[rpc].append(rpc.take_request())
+ for rpc in rpcs:
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ for rpc in rpcs:
+ rpc.requests_closed()
+ for rpc in rpcs:
+ rpc.terminate((), grpc.StatusCode.OK, '')
+ application_return_value = application_future.result()
+
+ for requests_of_one_rpc in requests.values():
+ for request in requests_of_one_rpc:
+ self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
+ request)
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.SATISFACTORY)
+
+ def test_cancelled_unary_unary(self):
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run,
+ _client_application.Scenario.CANCEL_UNARY_UNARY,
+ self._fake_time_channel)
+ invocation_metadata, request, rpc = (
+ self._fake_time_channel.take_unary_unary(
+ _application_testing_common.FIRST_SERVICE_UNUN))
+ rpc.send_initial_metadata(())
+ rpc.cancelled()
+ application_return_value = application_future.result()
+
+ self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.SATISFACTORY)
+
+ def test_status_stream_unary(self):
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run,
+ _client_application.Scenario.CONCURRENT_STREAM_UNARY,
+ self._fake_time_channel)
+ rpcs = tuple(
+ self._fake_time_channel.take_stream_unary(
+ _application_testing_common.FIRST_SERVICE_STREUN)[1]
+ for _ in range(test_constants.THREAD_CONCURRENCY))
+ for rpc in rpcs:
+ rpc.take_request()
+ rpc.take_request()
+ rpc.take_request()
+ rpc.requests_closed()
+ rpc.send_initial_metadata((
+ ('my_metadata_key', 'My Metadata Value!',),))
+ for rpc in rpcs[:-1]:
+ rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
+ grpc.StatusCode.OK, '')
+ rpcs[-1].terminate(_application_common.STREAM_UNARY_RESPONSE, (),
+ grpc.StatusCode.RESOURCE_EXHAUSTED,
+ 'nope; not able to handle all those RPCs!')
+ application_return_value = application_future.result()
+
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.UNSATISFACTORY)
+
+ def test_status_stream_stream(self):
+ code = grpc.StatusCode.DEADLINE_EXCEEDED
+ details = 'test deadline exceeded!'
+
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run, _client_application.Scenario.STREAM_STREAM,
+ self._real_time_channel)
+ invocation_metadata, rpc = self._real_time_channel.take_stream_stream(
+ _application_testing_common.FIRST_SERVICE_STRESTRE)
+ first_request = rpc.take_request()
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ second_request = rpc.take_request()
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.requests_closed()
+ rpc.terminate((), code, details)
+ application_return_value = application_future.result()
+
+ self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
+ first_request)
+ self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
+ second_request)
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.RPC_ERROR)
+ self.assertIs(application_return_value.code, code)
+ self.assertEqual(application_return_value.details, details)
+
+ def test_misbehaving_server_unary_unary(self):
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run, _client_application.Scenario.UNARY_UNARY,
+ self._fake_time_channel)
+ invocation_metadata, request, rpc = (
+ self._fake_time_channel.take_unary_unary(
+ _application_testing_common.FIRST_SERVICE_UNUN))
+ rpc.send_initial_metadata(())
+ rpc.terminate(_application_common.ERRONEOUS_UNARY_UNARY_RESPONSE, (),
+ grpc.StatusCode.OK, '')
+ application_return_value = application_future.result()
+
+ self.assertEqual(_application_common.UNARY_UNARY_REQUEST, request)
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.UNSATISFACTORY)
+
+ def test_misbehaving_server_stream_stream(self):
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run, _client_application.Scenario.STREAM_STREAM,
+ self._real_time_channel)
+ invocation_metadata, rpc = self._real_time_channel.take_stream_stream(
+ _application_testing_common.FIRST_SERVICE_STRESTRE)
+ first_request = rpc.take_request()
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ second_request = rpc.take_request()
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.send_response(_application_common.STREAM_STREAM_RESPONSE)
+ rpc.requests_closed()
+ rpc.terminate((), grpc.StatusCode.OK, '')
+ application_return_value = application_future.result()
+
+ self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
+ first_request)
+ self.assertEqual(_application_common.STREAM_STREAM_REQUEST,
+ second_request)
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.UNSATISFACTORY)
+
+ def test_infinite_request_stream_real_time(self):
+ application_future = self._client_execution_thread_pool.submit(
+ _client_application.run,
+ _client_application.Scenario.INFINITE_REQUEST_STREAM,
+ self._real_time_channel)
+ invocation_metadata, rpc = self._real_time_channel.take_stream_unary(
+ _application_testing_common.FIRST_SERVICE_STREUN)
+ rpc.send_initial_metadata(())
+ first_request = rpc.take_request()
+ second_request = rpc.take_request()
+ third_request = rpc.take_request()
+ self._real_time.sleep_for(
+ _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
+ rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (),
+ grpc.StatusCode.DEADLINE_EXCEEDED, '')
+ application_return_value = application_future.result()
+
+ self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
+ first_request)
+ self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
+ second_request)
+ self.assertEqual(_application_common.STREAM_UNARY_REQUEST,
+ third_request)
+ self.assertIs(application_return_value.kind,
+ _client_application.Outcome.Kind.SATISFACTORY)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/testing/proto/__init__.py b/src/python/grpcio_tests/tests/testing/proto/__init__.py
new file mode 100644
index 0000000000..1e120359cf
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/proto/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/python/grpcio_tests/tests/testing/proto/requests.proto b/src/python/grpcio_tests/tests/testing/proto/requests.proto
new file mode 100644
index 0000000000..54a60bff86
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/proto/requests.proto
@@ -0,0 +1,29 @@
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package tests_of_grpc_testing;
+
+message Up {
+ int32 first_up_field = 1;
+}
+
+message Charm {
+ int32 first_charm_field = 1;
+}
+
+message Top {
+ int32 first_top_field = 1;
+}
diff --git a/src/python/grpcio_tests/tests/testing/proto/services.proto b/src/python/grpcio_tests/tests/testing/proto/services.proto
new file mode 100644
index 0000000000..cb15c0d1ce
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/proto/services.proto
@@ -0,0 +1,42 @@
+// Copyright 2017 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "tests/testing/proto/requests.proto";
+
+package tests_of_grpc_testing;
+
+message Down {
+ int32 first_down_field = 1;
+}
+
+message Strange {
+ int32 first_strange_field = 1;
+}
+
+message Bottom {
+ int32 first_bottom_field = 1;
+}
+
+service FirstService {
+ rpc UnUn(Up) returns (Down);
+ rpc UnStre(Charm) returns (stream Strange);
+ rpc StreUn(stream Charm) returns (Strange);
+ rpc StreStre(stream Top) returns (stream Bottom);
+}
+
+service SecondService {
+ rpc UnStre(Strange) returns (stream Charm);
+}
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index f86eeb76c7..c10719b86f 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -9,6 +9,7 @@
"protoc_plugin._split_definitions_test.SplitSeparateTest",
"protoc_plugin.beta_python_plugin_test.PythonPluginTest",
"reflection._reflection_servicer_test.ReflectionServicerTest",
+ "testing._client_test.ClientTest",
"testing._time_test.StrictFakeTimeTest",
"testing._time_test.StrictRealTimeTest",
"unit._api_test.AllTest",