aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2017-09-05 11:23:38 -0700
committerGravatar GitHub <noreply@github.com>2017-09-05 11:23:38 -0700
commit21b71b41b6716e57a716c5219eecd7651d31931e (patch)
tree14a78c1852584132e4ba948d196431c2e8ac4721
parent96a902fb72727efcd761942238b8da65894221b1 (diff)
parent7c85caeaa8cccf3fbdf237013b809ec74e00ce8c (diff)
Merge pull request #11583 from nathanielmanistaatgoogle/grpc_testing
gRPC Python test infrastructure.
-rw-r--r--src/python/grpcio_testing/grpc_testing/__init__.py289
-rw-r--r--src/python/grpcio_testing/grpc_testing/_common.py68
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/__init__.py20
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_handler.py215
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_rpc.py153
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_server.py149
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py93
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_service.py88
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py74
-rw-r--r--src/python/grpcio_tests/tests/testing/_server_application.py66
-rw-r--r--src/python/grpcio_tests/tests/testing/_server_test.py169
-rw-r--r--src/python/grpcio_tests/tests/tests.json1
12 files changed, 1385 insertions, 0 deletions
diff --git a/src/python/grpcio_testing/grpc_testing/__init__.py b/src/python/grpcio_testing/grpc_testing/__init__.py
index 14e25f09e2..917e11808e 100644
--- a/src/python/grpcio_testing/grpc_testing/__init__.py
+++ b/src/python/grpcio_testing/grpc_testing/__init__.py
@@ -293,6 +293,278 @@ class Channel(six.with_metaclass(abc.ABCMeta), grpc.Channel):
raise NotImplementedError()
+class UnaryUnaryServerRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a unary-unary RPC serviced by a system under test.
+
+ Enables users to "play client" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Accesses the initial metadata emitted by the system under test.
+
+ This method blocks until the system under test has added initial
+ metadata to the RPC (or has provided one or more response messages or
+ has terminated the RPC, either of which will cause gRPC Python to
+ synthesize initial metadata for the RPC).
+
+ Returns:
+ The initial metadata for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def termination(self):
+ """Blocks until the system under test has terminated the RPC.
+
+ Returns:
+ A (response, trailing_metadata, code, details) sequence with the RPC's
+ response, trailing metadata, code, and details.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamServerRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a unary-stream RPC serviced by a system under test.
+
+ Enables users to "play client" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Accesses the initial metadata emitted by the system under test.
+
+ This method blocks until the system under test has added initial
+ metadata to the RPC (or has provided one or more response messages or
+ has terminated the RPC, either of which will cause gRPC Python to
+ synthesize initial metadata for the RPC).
+
+ Returns:
+ The initial metadata for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_response(self):
+ """Draws one of the responses added to the RPC by the system under test.
+
+ Successive calls to this method return responses in the same order in
+ which the system under test added them to the RPC.
+
+ Returns:
+ A response message added to the RPC by the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def termination(self):
+ """Blocks until the system under test has terminated the RPC.
+
+ Returns:
+ A (trailing_metadata, code, details) sequence with the RPC's trailing
+ metadata, code, and details.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryServerRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a stream-unary RPC serviced by a system under test.
+
+ Enables users to "play client" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Accesses the initial metadata emitted by the system under test.
+
+ This method blocks until the system under test has added initial
+ metadata to the RPC (or has provided one or more response messages or
+ has terminated the RPC, either of which will cause gRPC Python to
+ synthesize initial metadata for the RPC).
+
+ Returns:
+ The initial metadata for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_request(self, request):
+ """Sends a request to the system under test.
+
+ Args:
+ request: A request message for the RPC to be "sent" to the system
+ under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests_closed(self):
+ """Indicates the end of the RPC's request stream."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def termination(self):
+ """Blocks until the system under test has terminated the RPC.
+
+ Returns:
+ A (response, trailing_metadata, code, details) sequence with the RPC's
+ response, trailing metadata, code, and details.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamServerRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a stream-stream RPC serviced by a system under test.
+
+ Enables users to "play client" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Accesses the initial metadata emitted by the system under test.
+
+ This method blocks until the system under test has added initial
+ metadata to the RPC (or has provided one or more response messages or
+ has terminated the RPC, either of which will cause gRPC Python to
+ synthesize initial metadata for the RPC).
+
+ Returns:
+ The initial metadata for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_request(self, request):
+ """Sends a request to the system under test.
+
+ Args:
+ request: A request message for the RPC to be "sent" to the system
+ under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests_closed(self):
+ """Indicates the end of the RPC's request stream."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_response(self):
+ """Draws one of the responses added to the RPC by the system under test.
+
+ Successive calls to this method return responses in the same order in
+ which the system under test added them to the RPC.
+
+ Returns:
+ A response message added to the RPC by the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def termination(self):
+ """Blocks until the system under test has terminated the RPC.
+
+ Returns:
+ A (trailing_metadata, code, details) sequence with the RPC's trailing
+ metadata, code, and details.
+ """
+ raise NotImplementedError()
+
+
+class Server(six.with_metaclass(abc.ABCMeta)):
+ """A server with which to test a system that services RPCs."""
+
+ @abc.abstractmethod
+ def invoke_unary_unary(
+ self, method_descriptor, invocation_metadata, request, timeout):
+ """Invokes an RPC to be serviced by the system under test.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a unary-unary
+ RPC method.
+ invocation_metadata: The RPC's invocation metadata.
+ request: The RPC's request.
+ timeout: A duration of time in seconds for the RPC or None to
+ indicate that the RPC has no time limit.
+
+ Returns:
+ A UnaryUnaryServerRpc with which to "play client" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_unary_stream(
+ self, method_descriptor, invocation_metadata, request, timeout):
+ """Invokes an RPC to be serviced by the system under test.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a unary-stream
+ RPC method.
+ invocation_metadata: The RPC's invocation metadata.
+ request: The RPC's request.
+ timeout: A duration of time in seconds for the RPC or None to
+ indicate that the RPC has no time limit.
+
+ Returns:
+ A UnaryStreamServerRpc with which to "play client" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_stream_unary(
+ self, method_descriptor, invocation_metadata, timeout):
+ """Invokes an RPC to be serviced by the system under test.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a stream-unary
+ RPC method.
+ invocation_metadata: The RPC's invocation metadata.
+ timeout: A duration of time in seconds for the RPC or None to
+ indicate that the RPC has no time limit.
+
+ Returns:
+ A StreamUnaryServerRpc with which to "play client" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_stream_stream(
+ self, method_descriptor, invocation_metadata, timeout):
+ """Invokes an RPC to be serviced by the system under test.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a stream-stream
+ RPC method.
+ invocation_metadata: The RPC's invocation metadata.
+ timeout: A duration of time in seconds for the RPC or None to
+ indicate that the RPC has no time limit.
+
+ Returns:
+ A StreamStreamServerRpc with which to "play client" for the RPC.
+ """
+ raise NotImplementedError()
+
+
class Time(six.with_metaclass(abc.ABCMeta)):
"""A simulation of time.
@@ -406,3 +678,20 @@ def channel(service_descriptors, time):
"""
from grpc_testing import _channel
return _channel.testing_channel(service_descriptors, time)
+
+
+def server_from_dictionary(descriptors_to_servicers, time):
+ """Creates a Server for use in tests of a gRPC Python-using system.
+
+ Args:
+ descriptors_to_servicers: A dictionary from descriptor.ServiceDescriptors
+ defining RPC services to servicer objects (usually instances of classes
+ that implement "Servicer" interfaces defined in generated "_pb2_grpc"
+ modules) implementing those services.
+ time: A Time to be used for tests.
+
+ Returns:
+ A Server for use in tests.
+ """
+ from grpc_testing import _server
+ return _server.server_from_dictionary(descriptors_to_servicers, time)
diff --git a/src/python/grpcio_testing/grpc_testing/_common.py b/src/python/grpcio_testing/grpc_testing/_common.py
index cb4a7f5fa2..1517434ca7 100644
--- a/src/python/grpcio_testing/grpc_testing/_common.py
+++ b/src/python/grpcio_testing/grpc_testing/_common.py
@@ -37,6 +37,16 @@ def fuss_with_metadata(metadata):
return _fuss(tuple(metadata))
+def rpc_names(service_descriptors):
+ rpc_names_to_descriptors = {}
+ for service_descriptor in service_descriptors:
+ for method_descriptor in service_descriptor.methods_by_name.values():
+ rpc_name = '/{}/{}'.format(
+ service_descriptor.full_name, method_descriptor.name)
+ rpc_names_to_descriptors[rpc_name] = method_descriptor
+ return rpc_names_to_descriptors
+
+
class ChannelRpcRead(
collections.namedtuple(
'ChannelRpcRead',
@@ -90,3 +100,61 @@ class ChannelHandler(six.with_metaclass(abc.ABCMeta)):
self, method_full_rpc_name, invocation_metadata, requests,
requests_closed, timeout):
raise NotImplementedError()
+
+
+class ServerRpcRead(
+ collections.namedtuple('ServerRpcRead',
+ ('request', 'requests_closed', 'terminated',))):
+ pass
+
+
+REQUESTS_CLOSED = ServerRpcRead(None, True, False)
+TERMINATED = ServerRpcRead(None, False, True)
+
+
+class ServerRpcHandler(six.with_metaclass(abc.ABCMeta)):
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_request(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_response(self, response):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_termination(self, trailing_metadata, code, details):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_termination_callback(self, callback):
+ raise NotImplementedError()
+
+
+class Serverish(six.with_metaclass(abc.ABCMeta)):
+
+ @abc.abstractmethod
+ def invoke_unary_unary(
+ self, method_descriptor, handler, invocation_metadata, request,
+ deadline):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_unary_stream(
+ self, method_descriptor, handler, invocation_metadata, request,
+ deadline):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_stream_unary(
+ self, method_descriptor, handler, invocation_metadata, deadline):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_stream_stream(
+ self, method_descriptor, handler, invocation_metadata, deadline):
+ raise NotImplementedError()
diff --git a/src/python/grpcio_testing/grpc_testing/_server/__init__.py b/src/python/grpcio_testing/grpc_testing/_server/__init__.py
new file mode 100644
index 0000000000..759512949a
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/__init__.py
@@ -0,0 +1,20 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from grpc_testing._server import _server
+
+
+def server_from_dictionary(descriptors_to_servicers, time):
+ return _server.server_from_descriptor_to_servicers(
+ descriptors_to_servicers, time)
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_handler.py b/src/python/grpcio_testing/grpc_testing/_server/_handler.py
new file mode 100644
index 0000000000..b47e04c718
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_handler.py
@@ -0,0 +1,215 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+import threading
+
+import grpc
+from grpc_testing import _common
+
+_CLIENT_INACTIVE = object()
+
+
+class Handler(_common.ServerRpcHandler):
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_request(self, request):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_response(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests_closed(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_response_termination(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_response_termination(self):
+ raise NotImplementedError()
+
+
+class _Handler(Handler):
+
+ def __init__(self, requests_closed):
+ self._condition = threading.Condition()
+ self._requests = []
+ self._requests_closed = requests_closed
+ self._initial_metadata = None
+ self._responses = []
+ self._trailing_metadata = None
+ self._code = None
+ self._details = None
+ self._unary_response = None
+ self._expiration_future = None
+ self._termination_callbacks = []
+
+ def send_initial_metadata(self, initial_metadata):
+ with self._condition:
+ self._initial_metadata = initial_metadata
+ self._condition.notify_all()
+
+ def take_request(self):
+ with self._condition:
+ while True:
+ if self._code is None:
+ if self._requests:
+ request = self._requests.pop(0)
+ self._condition.notify_all()
+ return _common.ServerRpcRead(request, False, False)
+ elif self._requests_closed:
+ return _common.REQUESTS_CLOSED
+ else:
+ self._condition.wait()
+ else:
+ return _common.TERMINATED
+
+ def is_active(self):
+ with self._condition:
+ return self._code is None
+
+ def add_response(self, response):
+ with self._condition:
+ self._responses.append(response)
+ self._condition.notify_all()
+
+ def send_termination(self, trailing_metadata, code, details):
+ with self._condition:
+ self._trailing_metadata = trailing_metadata
+ self._code = code
+ self._details = details
+ if self._expiration_future is not None:
+ self._expiration_future.cancel()
+ self._condition.notify_all()
+
+ def add_termination_callback(self, termination_callback):
+ with self._condition:
+ if self._code is None:
+ self._termination_callbacks.append(termination_callback)
+ return True
+ else:
+ return False
+
+ def initial_metadata(self):
+ with self._condition:
+ while True:
+ if self._initial_metadata is None:
+ if self._code is None:
+ self._condition.wait()
+ else:
+ raise ValueError(
+ 'No initial metadata despite status code!')
+ else:
+ return self._initial_metadata
+
+ def add_request(self, request):
+ with self._condition:
+ self._requests.append(request)
+ self._condition.notify_all()
+
+ def take_response(self):
+ with self._condition:
+ while True:
+ if self._responses:
+ response = self._responses.pop(0)
+ self._condition.notify_all()
+ return response
+ elif self._code is None:
+ self._condition.wait()
+ else:
+ raise ValueError('No more responses!')
+
+ def requests_closed(self):
+ with self._condition:
+ self._requests_closed = True
+ self._condition.notify_all()
+
+ def cancel(self):
+ with self._condition:
+ if self._code is None:
+ self._code = _CLIENT_INACTIVE
+ termination_callbacks = self._termination_callbacks
+ self._termination_callbacks = None
+ if self._expiration_future is not None:
+ self._expiration_future.cancel()
+ self._condition.notify_all()
+ for termination_callback in termination_callbacks:
+ termination_callback()
+
+ def unary_response_termination(self):
+ with self._condition:
+ while True:
+ if self._code is _CLIENT_INACTIVE:
+ raise ValueError('Huh? Cancelled but wanting status?')
+ elif self._code is None:
+ self._condition.wait()
+ else:
+ if self._unary_response is None:
+ if self._responses:
+ self._unary_response = self._responses.pop(0)
+ return (
+ self._unary_response, self._trailing_metadata,
+ self._code, self._details,)
+
+
+ def stream_response_termination(self):
+ with self._condition:
+ while True:
+ if self._code is _CLIENT_INACTIVE:
+ raise ValueError('Huh? Cancelled but wanting status?')
+ elif self._code is None:
+ self._condition.wait()
+ else:
+ return self._trailing_metadata, self._code, self._details,
+
+ def expire(self):
+ with self._condition:
+ if self._code is None:
+ if self._initial_metadata is None:
+ self._initial_metadata = _common.FUSSED_EMPTY_METADATA
+ self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
+ self._code = grpc.StatusCode.DEADLINE_EXCEEDED
+ self._details = 'Took too much time!'
+ termination_callbacks = self._termination_callbacks
+ self._termination_callbacks = None
+ self._condition.notify_all()
+ for termination_callback in termination_callbacks:
+ termination_callback()
+
+ def set_expiration_future(self, expiration_future):
+ with self._condition:
+ self._expiration_future = expiration_future
+
+
+def handler_without_deadline(requests_closed):
+ return _Handler(requests_closed)
+
+
+def handler_with_deadline(requests_closed, time, deadline):
+ handler = _Handler(requests_closed)
+ expiration_future = time.call_at(handler.expire, deadline)
+ handler.set_expiration_future(expiration_future)
+ return handler
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_rpc.py b/src/python/grpcio_testing/grpc_testing/_server/_rpc.py
new file mode 100644
index 0000000000..f81876f4b2
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_rpc.py
@@ -0,0 +1,153 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import threading
+
+import grpc
+from grpc_testing import _common
+
+
+class Rpc(object):
+
+ def __init__(self, handler, invocation_metadata):
+ self._condition = threading.Condition()
+ self._handler = handler
+ self._invocation_metadata = invocation_metadata
+ self._initial_metadata_sent = False
+ self._pending_trailing_metadata = None
+ self._pending_code = None
+ self._pending_details = None
+ self._callbacks = []
+ self._active = True
+ self._rpc_errors = []
+
+ def _ensure_initial_metadata_sent(self):
+ if not self._initial_metadata_sent:
+ self._handler.send_initial_metadata(_common.FUSSED_EMPTY_METADATA)
+ self._initial_metadata_sent = True
+
+ def _call_back(self):
+ callbacks = tuple(self._callbacks)
+ self._callbacks = None
+
+ def call_back():
+ for callback in callbacks:
+ try:
+ callback()
+ except Exception: # pylint: disable=broad-except
+ logging.exception('Exception calling server-side callback!')
+
+ callback_calling_thread = threading.Thread(target=call_back)
+ callback_calling_thread.start()
+
+ def _terminate(self, trailing_metadata, code, details):
+ if self._active:
+ self._active = False
+ self._handler.send_termination(trailing_metadata, code, details)
+ self._call_back()
+ self._condition.notify_all()
+
+ def _complete(self):
+ if self._pending_trailing_metadata is None:
+ trailing_metadata = _common.FUSSED_EMPTY_METADATA
+ else:
+ trailing_metadata = self._pending_trailing_metadata
+ if self._pending_code is None:
+ code = grpc.StatusCode.OK
+ else:
+ code = self._pending_code
+ details = '' if self._pending_details is None else self._pending_details
+ self._terminate(trailing_metadata, code, details)
+
+ def _abort(self, code, details):
+ self._terminate(_common.FUSSED_EMPTY_METADATA, code, details)
+
+ def add_rpc_error(self, rpc_error):
+ with self._condition:
+ self._rpc_errors.append(rpc_error)
+
+ def application_cancel(self):
+ with self._condition:
+ self._abort(
+ grpc.StatusCode.CANCELLED,
+ 'Cancelled by server-side application!')
+
+ def application_exception_abort(self, exception):
+ with self._condition:
+ if exception not in self._rpc_errors:
+ logging.exception('Exception calling application!')
+ self._abort(
+ grpc.StatusCode.UNKNOWN,
+ 'Exception calling application: {}'.format(exception))
+
+ def extrinsic_abort(self):
+ with self._condition:
+ if self._active:
+ self._active = False
+ self._call_back()
+ self._condition.notify_all()
+
+ def unary_response_complete(self, response):
+ with self._condition:
+ self._ensure_initial_metadata_sent()
+ self._handler.add_response(response)
+ self._complete()
+
+ def stream_response(self, response):
+ with self._condition:
+ self._ensure_initial_metadata_sent()
+ self._handler.add_response(response)
+
+ def stream_response_complete(self):
+ with self._condition:
+ self._ensure_initial_metadata_sent()
+ self._complete()
+
+ def send_initial_metadata(self, initial_metadata):
+ with self._condition:
+ if self._initial_metadata_sent:
+ return False
+ else:
+ self._handler.send_initial_metadata(initial_metadata)
+ self._initial_metadata_sent = True
+ return True
+
+ def is_active(self):
+ with self._condition:
+ return self._active
+
+ def add_callback(self, callback):
+ with self._condition:
+ if self._callbacks is None:
+ return False
+ else:
+ self._callbacks.append(callback)
+ return True
+
+ def invocation_metadata(self):
+ with self._condition:
+ return self._invocation_metadata
+
+ def set_trailing_metadata(self, trailing_metadata):
+ with self._condition:
+ self._pending_trailing_metadata = trailing_metadata
+
+ def set_code(self, code):
+ with self._condition:
+ self._pending_code = code
+
+ def set_details(self, details):
+ with self._condition:
+ self._pending_details = details
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_server.py b/src/python/grpcio_testing/grpc_testing/_server/_server.py
new file mode 100644
index 0000000000..66bcfc13c0
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_server.py
@@ -0,0 +1,149 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+
+import grpc_testing
+from grpc_testing import _common
+from grpc_testing._server import _handler
+from grpc_testing._server import _rpc
+from grpc_testing._server import _server_rpc
+from grpc_testing._server import _service
+from grpc_testing._server import _servicer_context
+
+
+def _implementation(descriptors_to_servicers, method_descriptor):
+ servicer = descriptors_to_servicers[method_descriptor.containing_service]
+ return getattr(servicer, method_descriptor.name)
+
+
+def _unary_unary_service(request):
+ def service(implementation, rpc, servicer_context):
+ _service.unary_unary(
+ implementation, rpc, request, servicer_context)
+ return service
+
+
+def _unary_stream_service(request):
+ def service(implementation, rpc, servicer_context):
+ _service.unary_stream(
+ implementation, rpc, request, servicer_context)
+ return service
+
+
+def _stream_unary_service(handler):
+ def service(implementation, rpc, servicer_context):
+ _service.stream_unary(implementation, rpc, handler, servicer_context)
+ return service
+
+
+def _stream_stream_service(handler):
+ def service(implementation, rpc, servicer_context):
+ _service.stream_stream(implementation, rpc, handler, servicer_context)
+ return service
+
+
+class _Serverish(_common.Serverish):
+
+ def __init__(self, descriptors_to_servicers, time):
+ self._descriptors_to_servicers = descriptors_to_servicers
+ self._time = time
+
+ def _invoke(
+ self, service_behavior, method_descriptor, handler,
+ invocation_metadata, deadline):
+ implementation = _implementation(
+ self._descriptors_to_servicers, method_descriptor)
+ rpc = _rpc.Rpc(handler, invocation_metadata)
+ if handler.add_termination_callback(rpc.extrinsic_abort):
+ servicer_context = _servicer_context.ServicerContext(
+ rpc, self._time, deadline)
+ service_thread = threading.Thread(
+ target=service_behavior,
+ args=(implementation, rpc, servicer_context,))
+ service_thread.start()
+
+ def invoke_unary_unary(
+ self, method_descriptor, handler, invocation_metadata, request,
+ deadline):
+ self._invoke(
+ _unary_unary_service(request), method_descriptor, handler,
+ invocation_metadata, deadline)
+
+ def invoke_unary_stream(
+ self, method_descriptor, handler, invocation_metadata, request,
+ deadline):
+ self._invoke(
+ _unary_stream_service(request), method_descriptor, handler,
+ invocation_metadata, deadline)
+
+ def invoke_stream_unary(
+ self, method_descriptor, handler, invocation_metadata, deadline):
+ self._invoke(
+ _stream_unary_service(handler), method_descriptor, handler,
+ invocation_metadata, deadline)
+
+ def invoke_stream_stream(
+ self, method_descriptor, handler, invocation_metadata, deadline):
+ self._invoke(
+ _stream_stream_service(handler), method_descriptor, handler,
+ invocation_metadata, deadline)
+
+
+def _deadline_and_handler(requests_closed, time, timeout):
+ if timeout is None:
+ return None, _handler.handler_without_deadline(requests_closed)
+ else:
+ deadline = time.time() + timeout
+ handler = _handler.handler_with_deadline(requests_closed, time, deadline)
+ return deadline, handler
+
+
+class _Server(grpc_testing.Server):
+
+ def __init__(self, serverish, time):
+ self._serverish = serverish
+ self._time = time
+
+ def invoke_unary_unary(
+ self, method_descriptor, invocation_metadata, request, timeout):
+ deadline, handler = _deadline_and_handler(True, self._time, timeout)
+ self._serverish.invoke_unary_unary(
+ method_descriptor, handler, invocation_metadata, request, deadline)
+ return _server_rpc.UnaryUnaryServerRpc(handler)
+
+ def invoke_unary_stream(
+ self, method_descriptor, invocation_metadata, request, timeout):
+ deadline, handler = _deadline_and_handler(True, self._time, timeout)
+ self._serverish.invoke_unary_stream(
+ method_descriptor, handler, invocation_metadata, request, deadline)
+ return _server_rpc.UnaryStreamServerRpc(handler)
+
+ def invoke_stream_unary(
+ self, method_descriptor, invocation_metadata, timeout):
+ deadline, handler = _deadline_and_handler(False, self._time, timeout)
+ self._serverish.invoke_stream_unary(
+ method_descriptor, handler, invocation_metadata, deadline)
+ return _server_rpc.StreamUnaryServerRpc(handler)
+
+ def invoke_stream_stream(
+ self, method_descriptor, invocation_metadata, timeout):
+ deadline, handler = _deadline_and_handler(False, self._time, timeout)
+ self._serverish.invoke_stream_stream(
+ method_descriptor, handler, invocation_metadata, deadline)
+ return _server_rpc.StreamStreamServerRpc(handler)
+
+
+def server_from_descriptor_to_servicers(descriptors_to_servicers, time):
+ return _Server(_Serverish(descriptors_to_servicers, time), time)
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py b/src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py
new file mode 100644
index 0000000000..30de8ff0e2
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py
@@ -0,0 +1,93 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc_testing
+
+
+class UnaryUnaryServerRpc(grpc_testing.UnaryUnaryServerRpc):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def initial_metadata(self):
+ return self._handler.initial_metadata()
+
+ def cancel(self):
+ self._handler.cancel()
+
+ def termination(self):
+ return self._handler.unary_response_termination()
+
+
+class UnaryStreamServerRpc(grpc_testing.UnaryStreamServerRpc):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def initial_metadata(self):
+ return self._handler.initial_metadata()
+
+ def take_response(self):
+ return self._handler.take_response()
+
+ def cancel(self):
+ self._handler.cancel()
+
+ def termination(self):
+ return self._handler.stream_response_termination()
+
+
+class StreamUnaryServerRpc(grpc_testing.StreamUnaryServerRpc):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def initial_metadata(self):
+ return self._handler.initial_metadata()
+
+ def send_request(self, request):
+ self._handler.add_request(request)
+
+ def requests_closed(self):
+ self._handler.requests_closed()
+
+ def cancel(self):
+ self._handler.cancel()
+
+ def termination(self):
+ return self._handler.unary_response_termination()
+
+
+class StreamStreamServerRpc(grpc_testing.StreamStreamServerRpc):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def initial_metadata(self):
+ return self._handler.initial_metadata()
+
+ def send_request(self, request):
+ self._handler.add_request(request)
+
+ def requests_closed(self):
+ self._handler.requests_closed()
+
+ def take_response(self):
+ return self._handler.take_response()
+
+ def cancel(self):
+ self._handler.cancel()
+
+ def termination(self):
+ return self._handler.stream_response_termination()
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_service.py b/src/python/grpcio_testing/grpc_testing/_server/_service.py
new file mode 100644
index 0000000000..36b0a2f7ff
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_service.py
@@ -0,0 +1,88 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc
+
+
+class _RequestIterator(object):
+
+ def __init__(self, rpc, handler):
+ self._rpc = rpc
+ self._handler = handler
+
+ def _next(self):
+ read = self._handler.take_request()
+ if read.requests_closed:
+ raise StopIteration()
+ elif read.terminated:
+ rpc_error = grpc.RpcError()
+ self._rpc.add_rpc_error(rpc_error)
+ raise rpc_error
+ else:
+ return read.request
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return self._next()
+
+ def next(self):
+ return self._next()
+
+
+def _unary_response(argument, implementation, rpc, servicer_context):
+ try:
+ response = implementation(argument, servicer_context)
+ except Exception as exception: # pylint: disable=broad-except
+ rpc.application_exception_abort(exception)
+ else:
+ rpc.unary_response_complete(response)
+
+
+def _stream_response(argument, implementation, rpc, servicer_context):
+ try:
+ response_iterator = implementation(argument, servicer_context)
+ except Exception as exception: # pylint: disable=broad-except
+ rpc.application_exception_abort(exception)
+ else:
+ while True:
+ try:
+ response = next(response_iterator)
+ except StopIteration:
+ rpc.stream_response_complete()
+ break
+ except Exception as exception: # pylint: disable=broad-except
+ rpc.application_exception_abort(exception)
+ break
+ else:
+ rpc.stream_response(response)
+
+
+def unary_unary(implementation, rpc, request, servicer_context):
+ _unary_response(request, implementation, rpc, servicer_context)
+
+
+def unary_stream(implementation, rpc, request, servicer_context):
+ _stream_response(request, implementation, rpc, servicer_context)
+
+
+def stream_unary(implementation, rpc, handler, servicer_context):
+ _unary_response(
+ _RequestIterator(rpc, handler), implementation, rpc, servicer_context)
+
+
+def stream_stream(implementation, rpc, handler, servicer_context):
+ _stream_response(
+ _RequestIterator(rpc, handler), implementation, rpc, servicer_context)
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py b/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py
new file mode 100644
index 0000000000..496689ded0
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py
@@ -0,0 +1,74 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc
+from grpc_testing import _common
+
+
+class ServicerContext(grpc.ServicerContext):
+
+ def __init__(self, rpc, time, deadline):
+ self._rpc = rpc
+ self._time = time
+ self._deadline = deadline
+
+ def is_active(self):
+ return self._rpc.is_active()
+
+ def time_remaining(self):
+ if self._rpc.is_active():
+ if self._deadline is None:
+ return None
+ else:
+ return max(0.0, self._deadline - self._time.time())
+ else:
+ return 0.0
+
+ def cancel(self):
+ self._rpc.application_cancel()
+
+ def add_callback(self, callback):
+ return self._rpc.add_callback(callback)
+
+ def invocation_metadata(self):
+ return self._rpc.invocation_metadata()
+
+ def peer(self):
+ raise NotImplementedError()
+
+ def peer_identities(self):
+ raise NotImplementedError()
+
+ def peer_identity_key(self):
+ raise NotImplementedError()
+
+ def auth_context(self):
+ raise NotImplementedError()
+
+ def send_initial_metadata(self, initial_metadata):
+ initial_metadata_sent = self._rpc.send_initial_metadata(
+ _common.fuss_with_metadata(initial_metadata))
+ if not initial_metadata_sent:
+ raise ValueError(
+ 'ServicerContext.send_initial_metadata called too late!')
+
+ def set_trailing_metadata(self, trailing_metadata):
+ self._rpc.set_trailing_metadata(
+ _common.fuss_with_metadata(trailing_metadata))
+
+ def set_code(self, code):
+ self._rpc.set_code(code)
+
+ def set_details(self, details):
+ self._rpc.set_details(details)
diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py
new file mode 100644
index 0000000000..06f09c8cb4
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/_server_application.py
@@ -0,0 +1,66 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""An example gRPC Python-using server-side application."""
+
+import grpc
+
+# requests_pb2 is a semantic dependency of this module.
+from tests.testing import _application_common
+from tests.testing.proto import requests_pb2 # pylint: disable=unused-import
+from tests.testing.proto import services_pb2
+from tests.testing.proto import services_pb2_grpc
+
+
+class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
+ """Services RPCs."""
+
+ def UnUn(self, request, context):
+ if _application_common.UNARY_UNARY_REQUEST == request:
+ return _application_common.UNARY_UNARY_RESPONSE
+ else:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ context.set_details('Something is wrong with your request!')
+ return services_pb2.Down()
+
+ def UnStre(self, request, context):
+ if _application_common.UNARY_STREAM_REQUEST != request:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ context.set_details('Something is wrong with your request!')
+ return
+ yield services_pb2.Strange()
+
+ def StreUn(self, request_iterator, context):
+ context.send_initial_metadata((
+ ('server_application_metadata_key', 'Hi there!',),))
+ for request in request_iterator:
+ if request != _application_common.STREAM_UNARY_REQUEST:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ context.set_details('Something is wrong with your request!')
+ return services_pb2.Strange()
+ elif not context.is_active():
+ return services_pb2.Strange()
+ else:
+ return _application_common.STREAM_UNARY_RESPONSE
+
+ def StreStre(self, request_iterator, context):
+ for request in request_iterator:
+ if request != _application_common.STREAM_STREAM_REQUEST:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ context.set_details('Something is wrong with your request!')
+ return
+ elif not context.is_active():
+ return
+ else:
+ yield _application_common.STREAM_STREAM_RESPONSE
+ yield _application_common.STREAM_STREAM_RESPONSE
diff --git a/src/python/grpcio_tests/tests/testing/_server_test.py b/src/python/grpcio_tests/tests/testing/_server_test.py
new file mode 100644
index 0000000000..7897bcce01
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/_server_test.py
@@ -0,0 +1,169 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import unittest
+
+import grpc
+import grpc_testing
+
+from tests.testing import _application_common
+from tests.testing import _application_testing_common
+from tests.testing import _server_application
+from tests.testing.proto import services_pb2
+
+
+# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip.
+@unittest.skipIf(
+ services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None,
+ 'Fix protobuf issue 3452!')
+class FirstServiceServicerTest(unittest.TestCase):
+
+ def setUp(self):
+ self._real_time = grpc_testing.strict_real_time()
+ self._fake_time = grpc_testing.strict_fake_time(time.time())
+ servicer = _server_application.FirstServiceServicer()
+ descriptors_to_servicers = {
+ _application_testing_common.FIRST_SERVICE: servicer
+ }
+ self._real_time_server = grpc_testing.server_from_dictionary(
+ descriptors_to_servicers, self._real_time)
+ self._fake_time_server = grpc_testing.server_from_dictionary(
+ descriptors_to_servicers, self._fake_time)
+
+ def test_successful_unary_unary(self):
+ rpc = self._real_time_server.invoke_unary_unary(
+ _application_testing_common.FIRST_SERVICE_UNUN, (),
+ _application_common.UNARY_UNARY_REQUEST, None)
+ initial_metadata = rpc.initial_metadata()
+ response, trailing_metadata, code, details = rpc.termination()
+
+ self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
+ self.assertIs(code, grpc.StatusCode.OK)
+
+ def test_successful_unary_stream(self):
+ rpc = self._real_time_server.invoke_unary_stream(
+ _application_testing_common.FIRST_SERVICE_UNSTRE, (),
+ _application_common.UNARY_STREAM_REQUEST, None)
+ initial_metadata = rpc.initial_metadata()
+ trailing_metadata, code, details = rpc.termination()
+
+ self.assertIs(code, grpc.StatusCode.OK)
+
+ def test_successful_stream_unary(self):
+ rpc = self._real_time_server.invoke_stream_unary(
+ _application_testing_common.FIRST_SERVICE_STREUN, (), None)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.requests_closed()
+ initial_metadata = rpc.initial_metadata()
+ response, trailing_metadata, code, details = rpc.termination()
+
+ self.assertEqual(_application_common.STREAM_UNARY_RESPONSE, response)
+ self.assertIs(code, grpc.StatusCode.OK)
+
+ def test_successful_stream_stream(self):
+ rpc = self._real_time_server.invoke_stream_stream(
+ _application_testing_common.FIRST_SERVICE_STRESTRE, (), None)
+ rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
+ initial_metadata = rpc.initial_metadata()
+ responses = [
+ rpc.take_response(),
+ rpc.take_response(),
+ ]
+ rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
+ rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
+ responses.extend([
+ rpc.take_response(),
+ rpc.take_response(),
+ rpc.take_response(),
+ rpc.take_response(),
+ ])
+ rpc.requests_closed()
+ trailing_metadata, code, details = rpc.termination()
+
+ for response in responses:
+ self.assertEqual(_application_common.STREAM_STREAM_RESPONSE,
+ response)
+ self.assertIs(code, grpc.StatusCode.OK)
+
+ def test_server_rpc_idempotence(self):
+ rpc = self._real_time_server.invoke_unary_unary(
+ _application_testing_common.FIRST_SERVICE_UNUN, (),
+ _application_common.UNARY_UNARY_REQUEST, None)
+ first_initial_metadata = rpc.initial_metadata()
+ second_initial_metadata = rpc.initial_metadata()
+ third_initial_metadata = rpc.initial_metadata()
+ first_termination = rpc.termination()
+ second_termination = rpc.termination()
+ third_termination = rpc.termination()
+
+ for later_initial_metadata in (second_initial_metadata,
+ third_initial_metadata,):
+ self.assertEqual(first_initial_metadata, later_initial_metadata)
+ response = first_termination[0]
+ terminal_metadata = first_termination[1]
+ code = first_termination[2]
+ details = first_termination[3]
+ for later_termination in (second_termination, third_termination,):
+ self.assertEqual(response, later_termination[0])
+ self.assertEqual(terminal_metadata, later_termination[1])
+ self.assertIs(code, later_termination[2])
+ self.assertEqual(details, later_termination[3])
+ self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
+ self.assertIs(code, grpc.StatusCode.OK)
+
+ def test_misbehaving_client_unary_unary(self):
+ rpc = self._real_time_server.invoke_unary_unary(
+ _application_testing_common.FIRST_SERVICE_UNUN, (),
+ _application_common.ERRONEOUS_UNARY_UNARY_REQUEST, None)
+ initial_metadata = rpc.initial_metadata()
+ response, trailing_metadata, code, details = rpc.termination()
+
+ self.assertIsNot(code, grpc.StatusCode.OK)
+
+ def test_infinite_request_stream_real_time(self):
+ rpc = self._real_time_server.invoke_stream_unary(
+ _application_testing_common.FIRST_SERVICE_STREUN, (),
+ _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ initial_metadata = rpc.initial_metadata()
+ self._real_time.sleep_for(
+ _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ response, trailing_metadata, code, details = rpc.termination()
+
+ self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
+
+ def test_infinite_request_stream_fake_time(self):
+ rpc = self._fake_time_server.invoke_stream_unary(
+ _application_testing_common.FIRST_SERVICE_STREUN, (),
+ _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ initial_metadata = rpc.initial_metadata()
+ self._fake_time.sleep_for(
+ _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ response, trailing_metadata, code, details = rpc.termination()
+
+ self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index c10719b86f..d61297b918 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -10,6 +10,7 @@
"protoc_plugin.beta_python_plugin_test.PythonPluginTest",
"reflection._reflection_servicer_test.ReflectionServicerTest",
"testing._client_test.ClientTest",
+ "testing._server_test.FirstServiceServicerTest",
"testing._time_test.StrictFakeTimeTest",
"testing._time_test.StrictRealTimeTest",
"unit._api_test.AllTest",