aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_test
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_test')
-rw-r--r--src/python/grpcio_test/grpc_interop/_interop_test_case.py3
-rw-r--r--src/python/grpcio_test/grpc_interop/methods.py23
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py541
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/test.proto139
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_low_test.py17
-rw-r--r--src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py34
-rw-r--r--src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py64
-rw-r--r--src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py45
-rw-r--r--src/python/grpcio_test/setup.py11
10 files changed, 836 insertions, 71 deletions
diff --git a/src/python/grpcio_test/grpc_interop/_interop_test_case.py b/src/python/grpcio_test/grpc_interop/_interop_test_case.py
index ed8f7ef009..b6d06b300d 100644
--- a/src/python/grpcio_test/grpc_interop/_interop_test_case.py
+++ b/src/python/grpcio_test/grpc_interop/_interop_test_case.py
@@ -59,3 +59,6 @@ class InteropTestCase(object):
def testCancelAfterFirstResponse(self):
methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(self.stub, None)
+
+ def testTimeoutOnSleepingServer(self):
+ methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability(self.stub, None)
diff --git a/src/python/grpcio_test/grpc_interop/methods.py b/src/python/grpcio_test/grpc_interop/methods.py
index f4c94685ee..7a831f3cbd 100644
--- a/src/python/grpcio_test/grpc_interop/methods.py
+++ b/src/python/grpcio_test/grpc_interop/methods.py
@@ -33,10 +33,12 @@ import enum
import json
import os
import threading
+import time
from oauth2client import client as oauth2client_client
from grpc.framework.alpha import utilities
+from grpc.framework.alpha import exceptions
from grpc_interop import empty_pb2
from grpc_interop import messages_pb2
@@ -318,6 +320,24 @@ def _cancel_after_first_response(stub):
raise ValueError('expected call to be cancelled')
+def _timeout_on_sleeping_server(stub):
+ request_payload_size = 27182
+ with stub, _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe, 0.001)
+
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ payload=messages_pb2.Payload(body=b'\x00' * request_payload_size))
+ pipe.add(request)
+ time.sleep(0.1)
+ try:
+ next(response_iterator)
+ except exceptions.ExpirationError:
+ pass
+ else:
+ raise ValueError('expected call to exceed deadline')
+
+
def _compute_engine_creds(stub, args):
response = _large_unary_common_behavior(stub, True, True)
if args.default_service_account != response.username:
@@ -351,6 +371,7 @@ class TestCase(enum.Enum):
CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
SERVICE_ACCOUNT_CREDS = 'service_account_creds'
+ TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
def test_interoperability(self, stub, args):
if self is TestCase.EMPTY_UNARY:
@@ -367,6 +388,8 @@ class TestCase(enum.Enum):
_cancel_after_begin(stub)
elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE:
_cancel_after_first_response(stub)
+ elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER:
+ _timeout_on_sleeping_server(stub)
elif self is TestCase.COMPUTE_ENGINE_CREDS:
_compute_engine_creds(stub, args)
elif self is TestCase.SERVICE_ACCOUNT_CREDS:
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/__init__.py b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py
new file mode 100644
index 0000000000..b200d129a9
--- /dev/null
+++ b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py
@@ -0,0 +1,541 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+import contextlib
+import distutils.spawn
+import errno
+import itertools
+import os
+import pkg_resources
+import shutil
+import subprocess
+import sys
+import tempfile
+import threading
+import time
+import unittest
+
+from grpc.framework.alpha import exceptions
+from grpc.framework.foundation import future
+
+# Identifiers of entities we expect to find in the generated module.
+SERVICER_IDENTIFIER = 'EarlyAdopterTestServiceServicer'
+SERVER_IDENTIFIER = 'EarlyAdopterTestServiceServer'
+STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub'
+SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server'
+STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub'
+
+# The timeout used in tests of RPCs that are supposed to expire.
+SHORT_TIMEOUT = 2
+# The timeout used in tests of RPCs that are not supposed to expire. The
+# absurdly large value doesn't matter since no passing execution of this test
+# module will ever wait the duration.
+LONG_TIMEOUT = 600
+NO_DELAY = 0
+
+
+class _ServicerMethods(object):
+
+ def __init__(self, test_pb2, delay):
+ self._condition = threading.Condition()
+ self._delay = delay
+ self._paused = False
+ self._fail = False
+ self._test_pb2 = test_pb2
+
+ @contextlib.contextmanager
+ def pause(self): # pylint: disable=invalid-name
+ with self._condition:
+ self._paused = True
+ yield
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
+
+ @contextlib.contextmanager
+ def fail(self): # pylint: disable=invalid-name
+ with self._condition:
+ self._fail = True
+ yield
+ with self._condition:
+ self._fail = False
+
+ def _control(self): # pylint: disable=invalid-name
+ with self._condition:
+ if self._fail:
+ raise ValueError()
+ while self._paused:
+ self._condition.wait()
+ time.sleep(self._delay)
+
+ def UnaryCall(self, request, unused_rpc_context):
+ response = self._test_pb2.SimpleResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * request.response_size
+ self._control()
+ return response
+
+ def StreamingOutputCall(self, request, unused_rpc_context):
+ for parameter in request.response_parameters:
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * parameter.size
+ self._control()
+ yield response
+
+ def StreamingInputCall(self, request_iter, unused_rpc_context):
+ response = self._test_pb2.StreamingInputCallResponse()
+ aggregated_payload_size = 0
+ for request in request_iter:
+ aggregated_payload_size += len(request.payload.payload_compressable)
+ response.aggregated_payload_size = aggregated_payload_size
+ self._control()
+ return response
+
+ def FullDuplexCall(self, request_iter, unused_rpc_context):
+ for request in request_iter:
+ for parameter in request.response_parameters:
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * parameter.size
+ self._control()
+ yield response
+
+ def HalfDuplexCall(self, request_iter, unused_rpc_context):
+ responses = []
+ for request in request_iter:
+ for parameter in request.response_parameters:
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * parameter.size
+ self._control()
+ responses.append(response)
+ for response in responses:
+ yield response
+
+
+@contextlib.contextmanager
+def _CreateService(test_pb2, delay):
+ """Provides a servicer backend and a stub.
+
+ The servicer is just the implementation
+ of the actual servicer passed to the face player of the python RPC
+ implementation; the two are detached.
+
+ Non-zero delay puts a delay on each call to the servicer, representative of
+ communication latency. Timeout is the default timeout for the stub while
+ waiting for the service.
+
+ Args:
+ test_pb2: The test_pb2 module generated by this test.
+ delay: Delay in seconds per response from the servicer.
+
+ Yields:
+ A (servicer_methods, servicer, stub) three-tuple where servicer_methods is
+ the back-end of the service bound to the stub and the server and stub
+ are both activated and ready for use.
+ """
+ servicer_methods = _ServicerMethods(test_pb2, delay)
+
+ class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+
+ def UnaryCall(self, request, context):
+ return servicer_methods.UnaryCall(request, context)
+
+ def StreamingOutputCall(self, request, context):
+ return servicer_methods.StreamingOutputCall(request, context)
+
+ def StreamingInputCall(self, request_iter, context):
+ return servicer_methods.StreamingInputCall(request_iter, context)
+
+ def FullDuplexCall(self, request_iter, context):
+ return servicer_methods.FullDuplexCall(request_iter, context)
+
+ def HalfDuplexCall(self, request_iter, context):
+ return servicer_methods.HalfDuplexCall(request_iter, context)
+
+ servicer = Servicer()
+ server = getattr(
+ test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0)
+ with server:
+ port = server.port()
+ stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port)
+ with stub:
+ yield servicer_methods, stub, server
+
+
+def _streaming_input_request_iterator(test_pb2):
+ for _ in range(3):
+ request = test_pb2.StreamingInputCallRequest()
+ request.payload.payload_type = test_pb2.COMPRESSABLE
+ request.payload.payload_compressable = 'a'
+ yield request
+
+
+def _streaming_output_request(test_pb2):
+ request = test_pb2.StreamingOutputCallRequest()
+ sizes = [1, 2, 3]
+ request.response_parameters.add(size=sizes[0], interval_us=0)
+ request.response_parameters.add(size=sizes[1], interval_us=0)
+ request.response_parameters.add(size=sizes[2], interval_us=0)
+ return request
+
+
+def _full_duplex_request_iterator(test_pb2):
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=1, interval_us=0)
+ yield request
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=2, interval_us=0)
+ request.response_parameters.add(size=3, interval_us=0)
+ yield request
+
+
+class PythonPluginTest(unittest.TestCase):
+ """Test case for the gRPC Python protoc-plugin.
+
+ While reading these tests, remember that the futures API
+ (`stub.method.async()`) only gives futures for the *non-streaming* responses,
+ else it behaves like its blocking cousin.
+ """
+
+ def setUp(self):
+ # Assume that the appropriate protoc and grpc_python_plugins are on the
+ # path.
+ protoc_command = 'protoc'
+ protoc_plugin_filename = distutils.spawn.find_executable(
+ 'grpc_python_plugin')
+ test_proto_filename = pkg_resources.resource_filename(
+ 'grpc_protoc_plugin', 'test.proto')
+ if not os.path.isfile(protoc_command):
+ # Assume that if we haven't built protoc that it's on the system.
+ protoc_command = 'protoc'
+
+ # Ensure that the output directory exists.
+ self.outdir = tempfile.mkdtemp()
+
+ # Invoke protoc with the plugin.
+ cmd = [
+ protoc_command,
+ '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
+ '-I .',
+ '--python_out=%s' % self.outdir,
+ '--python-grpc_out=%s' % self.outdir,
+ os.path.basename(test_proto_filename),
+ ]
+ subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
+ cwd=os.path.dirname(test_proto_filename))
+ sys.path.append(self.outdir)
+
+ def tearDown(self):
+ try:
+ shutil.rmtree(self.outdir)
+ except OSError as exc:
+ if exc.errno != errno.ENOENT:
+ raise
+
+ # TODO(atash): Figure out which of these tests is hanging flakily with small
+ # probability.
+
+ def testImportAttributes(self):
+ # check that we can access the generated module and its members.
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, SERVER_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))
+
+ def testUpDown(self):
+ import test_pb2
+ with _CreateService(
+ test_pb2, NO_DELAY) as (servicer, stub, unused_server):
+ request = test_pb2.SimpleRequest(response_size=13)
+
+ def testUnaryCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
+ request = test_pb2.SimpleRequest(response_size=13)
+ response = stub.UnaryCall(request, timeout)
+ expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testUnaryCallAsync(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ # Check that the call does not block waiting for the server to respond.
+ with methods.pause():
+ response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
+ response = response_future.result()
+ expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testUnaryCallAsyncExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ request = test_pb2.SimpleRequest(response_size=13)
+ with methods.pause():
+ response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ response_future.result()
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testUnaryCallAsyncCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ response_future = stub.UnaryCall.async(request, 1)
+ response_future.cancel()
+ self.assertTrue(response_future.cancelled())
+
+ def testUnaryCallAsyncFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
+ self.assertIsNotNone(response_future.exception())
+
+ def testStreamingOutputCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ responses = stub.StreamingOutputCall(request, LONG_TIMEOUT)
+ expected_responses = methods.StreamingOutputCall(
+ request, 'not a real RpcContext!')
+ for expected_response, response in itertools.izip_longest(
+ expected_responses, responses):
+ self.assertEqual(expected_response, response)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testStreamingOutputCallExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ list(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testStreamingOutputCallCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ unused_methods, stub, unused_server):
+ responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
+ next(responses)
+ responses.cancel()
+ with self.assertRaises(future.CancelledError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this times out '
+ 'instead of raising the proper error.')
+ def testStreamingOutputCallFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ responses = stub.StreamingOutputCall(request, 1)
+ self.assertIsNotNone(responses)
+ with self.assertRaises(exceptions.ServicerError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testStreamingInputCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ response = stub.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
+ expected_response = methods.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testStreamingInputCallAsync(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
+ response = response_future.result()
+ expected_response = methods.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testStreamingInputCallAsyncExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ response_future.result()
+ self.assertIsInstance(
+ response_future.exception(), exceptions.ExpirationError)
+
+ def testStreamingInputCallAsyncCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), timeout)
+ response_future.cancel()
+ self.assertTrue(response_future.cancelled())
+ with self.assertRaises(future.CancelledError):
+ response_future.result()
+
+ def testStreamingInputCallAsyncFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
+ self.assertIsNotNone(response_future.exception())
+
+ def testFullDuplexCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ responses = stub.FullDuplexCall(
+ _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT)
+ expected_responses = methods.FullDuplexCall(
+ _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
+ for expected_response, response in itertools.izip_longest(
+ expected_responses, responses):
+ self.assertEqual(expected_response, response)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testFullDuplexCallExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ list(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testFullDuplexCallCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
+ next(responses)
+ responses.cancel()
+ with self.assertRaises(future.CancelledError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this hangs forever '
+ 'and fix.')
+ def testFullDuplexCallFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
+ self.assertIsNotNone(responses)
+ with self.assertRaises(exceptions.ServicerError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testHalfDuplexCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ def half_duplex_request_iterator():
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=1, interval_us=0)
+ yield request
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=2, interval_us=0)
+ request.response_parameters.add(size=3, interval_us=0)
+ yield request
+ responses = stub.HalfDuplexCall(
+ half_duplex_request_iterator(), LONG_TIMEOUT)
+ expected_responses = methods.HalfDuplexCall(
+ half_duplex_request_iterator(), 'not a real RpcContext!')
+ for check in itertools.izip_longest(expected_responses, responses):
+ expected_response, response = check
+ self.assertEqual(expected_response, response)
+
+ def testHalfDuplexCallWedged(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ condition = threading.Condition()
+ wait_cell = [False]
+ @contextlib.contextmanager
+ def wait(): # pylint: disable=invalid-name
+ # Where's Python 3's 'nonlocal' statement when you need it?
+ with condition:
+ wait_cell[0] = True
+ yield
+ with condition:
+ wait_cell[0] = False
+ condition.notify_all()
+ def half_duplex_request_iterator():
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=1, interval_us=0)
+ yield request
+ with condition:
+ while wait_cell[0]:
+ condition.wait()
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ with wait():
+ responses = stub.HalfDuplexCall(
+ half_duplex_request_iterator(), SHORT_TIMEOUT)
+ # half-duplex waits for the client to send all info
+ with self.assertRaises(exceptions.ExpirationError):
+ next(responses)
+
+
+if __name__ == '__main__':
+ os.chdir(os.path.dirname(sys.argv[0]))
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/test.proto b/src/python/grpcio_test/grpc_protoc_plugin/test.proto
new file mode 100644
index 0000000000..ed7c6a7b79
--- /dev/null
+++ b/src/python/grpcio_test/grpc_protoc_plugin/test.proto
@@ -0,0 +1,139 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// An integration test service that covers all the method signature permutations
+// of unary/streaming requests/responses.
+// This file is duplicated around the code base. See GitHub issue #526.
+syntax = "proto2";
+
+package grpc.testing;
+
+enum PayloadType {
+ // Compressable text format.
+ COMPRESSABLE= 1;
+
+ // Uncompressable binary format.
+ UNCOMPRESSABLE = 2;
+
+ // Randomly chosen from all other formats defined in this enum.
+ RANDOM = 3;
+}
+
+message Payload {
+ required PayloadType payload_type = 1;
+ oneof payload_body {
+ string payload_compressable = 2;
+ bytes payload_uncompressable = 3;
+ }
+}
+
+message SimpleRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, server randomly chooses one from other formats.
+ optional PayloadType response_type = 1 [default=COMPRESSABLE];
+
+ // Desired payload size in the response from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ optional int32 response_size = 2;
+
+ // Optional input payload sent along with the request.
+ optional Payload payload = 3;
+}
+
+message SimpleResponse {
+ optional Payload payload = 1;
+}
+
+message StreamingInputCallRequest {
+ // Optional input payload sent along with the request.
+ optional Payload payload = 1;
+
+ // Not expecting any payload from the response.
+}
+
+message StreamingInputCallResponse {
+ // Aggregated size of payloads received from the client.
+ optional int32 aggregated_payload_size = 1;
+}
+
+message ResponseParameters {
+ // Desired payload sizes in responses from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ required int32 size = 1;
+
+ // Desired interval between consecutive responses in the response stream in
+ // microseconds.
+ required int32 interval_us = 2;
+}
+
+message StreamingOutputCallRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, the payload from each response in the stream
+ // might be of different types. This is to simulate a mixed type of payload
+ // stream.
+ optional PayloadType response_type = 1 [default=COMPRESSABLE];
+
+ repeated ResponseParameters response_parameters = 2;
+
+ // Optional input payload sent along with the request.
+ optional Payload payload = 3;
+}
+
+message StreamingOutputCallResponse {
+ optional Payload payload = 1;
+}
+
+service TestService {
+ // One request followed by one response.
+ // The server returns the client payload as-is.
+ rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
+
+ // One request followed by a sequence of responses (streamed download).
+ // The server returns the payload with client desired type and sizes.
+ rpc StreamingOutputCall(StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+
+ // A sequence of requests followed by one response (streamed upload).
+ // The server returns the aggregated size of client payload as the result.
+ rpc StreamingInputCall(stream StreamingInputCallRequest)
+ returns (StreamingInputCallResponse);
+
+ // A sequence of requests with each request served by the server immediately.
+ // As one request could lead to multiple responses, this interface
+ // demonstrates the idea of full duplexing.
+ rpc FullDuplexCall(stream StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+
+ // A sequence of requests followed by a sequence of responses.
+ // The server buffers all the client requests and then serves them in order. A
+ // stream of responses are returned to the client when the server starts with
+ // first request.
+ rpc HalfDuplexCall(stream StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+}
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
index 1e3aeb1787..44fe760fbc 100644
--- a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
+++ b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
@@ -31,11 +31,12 @@ import threading
import time
import unittest
+from grpc import _grpcio_metadata
from grpc._adapter import _types
from grpc._adapter import _low
-def WaitForEvents(completion_queues, deadline):
+def wait_for_events(completion_queues, deadline):
"""
Args:
completion_queues: list of completion queues to wait for events on
@@ -62,6 +63,7 @@ def WaitForEvents(completion_queues, deadline):
thread.join()
return results
+
class InsecureServerInsecureClient(unittest.TestCase):
def setUp(self):
@@ -123,16 +125,21 @@ class InsecureServerInsecureClient(unittest.TestCase):
], client_call_tag)
self.assertEquals(_types.CallError.OK, client_start_batch_result)
- client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
+ client_no_event, request_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
self.assertEquals(client_no_event, None)
self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type)
self.assertIsInstance(request_event.call, _low.Call)
self.assertIs(server_request_tag, request_event.tag)
self.assertEquals(1, len(request_event.results))
- got_initial_metadata = dict(request_event.results[0].initial_metadata)
+ received_initial_metadata = dict(request_event.results[0].initial_metadata)
+ # Check that our metadata were transmitted
self.assertEquals(
dict(client_initial_metadata),
- dict((x, got_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+ dict((x, received_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+ # Check that Python's user agent string is a part of the full user agent
+ # string
+ self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__),
+ received_initial_metadata['user-agent'])
self.assertEquals(METHOD, request_event.call_details.method)
self.assertEquals(HOST, request_event.call_details.host)
self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)
@@ -159,7 +166,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
], server_call_tag)
self.assertEquals(_types.CallError.OK, server_start_batch_result)
- client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
+ client_event, server_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
self.assertEquals(6, len(client_event.results))
found_client_op_types = set()
diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py
index 7e1158f96b..251e1eb68e 100644
--- a/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py
+++ b/src/python/grpcio_test/grpc_test/framework/face/testing/blocking_invocation_inline_service_test_case.py
@@ -34,15 +34,13 @@ import abc
import unittest # pylint: disable=unused-import
from grpc.framework.face import exceptions
+from grpc_test.framework.common import test_constants
from grpc_test.framework.face.testing import control
from grpc_test.framework.face.testing import coverage
from grpc_test.framework.face.testing import digest
from grpc_test.framework.face.testing import stock_service
from grpc_test.framework.face.testing import test_case
-_TIMEOUT = 3
-_LONG_TIMEOUT = 45
-
class BlockingInvocationInlineServiceTestCase(
test_case.FaceTestCase, coverage.BlockingCoverage):
@@ -79,7 +77,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response = self.stub.blocking_value_in_value_out(
- name, request, _LONG_TIMEOUT)
+ name, request, test_constants.LONG_TIMEOUT)
test_messages.verify(request, response, self)
@@ -90,7 +88,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _LONG_TIMEOUT)
+ name, request, test_constants.LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
@@ -102,7 +100,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response = self.stub.blocking_stream_in_value_out(
- name, iter(requests), _LONG_TIMEOUT)
+ name, iter(requests), test_constants.LONG_TIMEOUT)
test_messages.verify(requests, response, self)
@@ -113,7 +111,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _LONG_TIMEOUT)
+ name, iter(requests), test_constants.LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)
@@ -126,12 +124,12 @@ class BlockingInvocationInlineServiceTestCase(
second_request = test_messages.request()
first_response = self.stub.blocking_value_in_value_out(
- name, first_request, _TIMEOUT)
+ name, first_request, test_constants.SHORT_TIMEOUT)
test_messages.verify(first_request, first_response, self)
second_response = self.stub.blocking_value_in_value_out(
- name, second_request, _TIMEOUT)
+ name, second_request, test_constants.SHORT_TIMEOUT)
test_messages.verify(second_request, second_response, self)
@@ -144,7 +142,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
multi_callable = self.stub.unary_unary_multi_callable(name)
- multi_callable(request, _TIMEOUT)
+ multi_callable(request, test_constants.SHORT_TIMEOUT)
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -155,7 +153,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testExpiredStreamRequestUnaryResponse(self):
@@ -167,7 +165,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
multi_callable = self.stub.stream_unary_multi_callable(name)
- multi_callable(iter(requests), _TIMEOUT)
+ multi_callable(iter(requests), test_constants.SHORT_TIMEOUT)
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -178,7 +176,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.pause(), self.assertRaises(
exceptions.ExpirationError):
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testFailedUnaryRequestUnaryResponse(self):
@@ -188,7 +186,8 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
- self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
+ self.stub.blocking_value_in_value_out(name, request,
+ test_constants.SHORT_TIMEOUT)
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -198,7 +197,7 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
@@ -208,7 +207,8 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
- self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
+ self.stub.blocking_stream_in_value_out(name, iter(requests),
+ test_constants.SHORT_TIMEOUT)
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -218,5 +218,5 @@ class BlockingInvocationInlineServiceTestCase(
with self.control.fail(), self.assertRaises(exceptions.ServicerError):
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
list(response_iterator)
diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py
index 18eed53d6e..9df77678eb 100644
--- a/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py
+++ b/src/python/grpcio_test/grpc_test/framework/face/testing/event_invocation_synchronous_event_service_test_case.py
@@ -33,6 +33,7 @@ import abc
import unittest
from grpc.framework.face import interfaces
+from grpc_test.framework.common import test_constants
from grpc_test.framework.face.testing import callback as testing_callback
from grpc_test.framework.face.testing import control
from grpc_test.framework.face.testing import coverage
@@ -40,8 +41,6 @@ from grpc_test.framework.face.testing import digest
from grpc_test.framework.face.testing import stock_service
from grpc_test.framework.face.testing import test_case
-_TIMEOUT = 3
-
class EventInvocationSynchronousEventServiceTestCase(
test_case.FaceTestCase, coverage.FullCoverage):
@@ -79,7 +78,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort, _TIMEOUT)
+ name, request, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
response = callback.response()
@@ -93,7 +93,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort, _TIMEOUT)
+ name, request, callback, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
responses = callback.responses()
@@ -107,7 +108,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort, _TIMEOUT)
+ name, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
@@ -124,7 +126,7 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, _TIMEOUT)
+ name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
@@ -147,11 +149,11 @@ class EventInvocationSynchronousEventServiceTestCase(
first_callback.complete(first_response)
self.stub.event_value_in_value_out(
name, second_request, second_callback.complete,
- second_callback.abort, _TIMEOUT)
+ second_callback.abort, test_constants.SHORT_TIMEOUT)
self.stub.event_value_in_value_out(
name, first_request, make_second_invocation, first_callback.abort,
- _TIMEOUT)
+ test_constants.SHORT_TIMEOUT)
second_callback.block_until_terminated()
first_response = first_callback.response()
@@ -168,7 +170,8 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.pause():
self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort, _TIMEOUT)
+ name, request, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
@@ -182,7 +185,8 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.pause():
self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort, _TIMEOUT)
+ name, request, callback, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
@@ -194,7 +198,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort, _TIMEOUT)
+ name, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
@@ -207,7 +212,7 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, _TIMEOUT)
+ name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
callback.block_until_terminated()
@@ -223,10 +228,12 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort, _TIMEOUT)
+ name, request, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
- self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
+ self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
+ callback.abortion())
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -237,10 +244,12 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort, _TIMEOUT)
+ name, request, callback, callback.abort,
+ test_constants.SHORT_TIMEOUT)
callback.block_until_terminated()
- self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
+ self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
+ callback.abortion())
def testFailedStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
@@ -251,13 +260,15 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
unused_call, request_consumer = self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort, _TIMEOUT)
+ name, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
callback.block_until_terminated()
- self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
+ self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
+ callback.abortion())
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -268,7 +279,7 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.fail():
unused_call, request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, _TIMEOUT)
+ name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
request_consumer.terminate()
@@ -287,10 +298,10 @@ class EventInvocationSynchronousEventServiceTestCase(
self.stub.event_value_in_value_out(
name, first_request, first_callback.complete, first_callback.abort,
- _TIMEOUT)
+ test_constants.SHORT_TIMEOUT)
self.stub.event_value_in_value_out(
name, second_request, second_callback.complete,
- second_callback.abort, _TIMEOUT)
+ second_callback.abort, test_constants.SHORT_TIMEOUT)
first_callback.block_until_terminated()
second_callback.block_until_terminated()
@@ -312,7 +323,8 @@ class EventInvocationSynchronousEventServiceTestCase(
with self.control.pause():
call = self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort, _TIMEOUT)
+ name, request, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
call.cancel()
callback.block_until_terminated()
@@ -326,7 +338,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
call = self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort, _TIMEOUT)
+ name, request, callback, callback.abort,
+ test_constants.SHORT_TIMEOUT)
call.cancel()
callback.block_until_terminated()
@@ -340,7 +353,8 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
call, request_consumer = self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort, _TIMEOUT)
+ name, callback.complete, callback.abort,
+ test_constants.SHORT_TIMEOUT)
for request in requests:
request_consumer.consume(request)
call.cancel()
@@ -355,7 +369,7 @@ class EventInvocationSynchronousEventServiceTestCase(
callback = testing_callback.Callback()
call, unused_request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, _TIMEOUT)
+ name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
call.cancel()
callback.block_until_terminated()
diff --git a/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
index 3b42914342..70d86a0422 100644
--- a/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
+++ b/src/python/grpcio_test/grpc_test/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
@@ -37,13 +37,13 @@ import unittest
from grpc.framework.face import exceptions
from grpc.framework.foundation import future
from grpc.framework.foundation import logging_pool
+from grpc_test.framework.common import test_constants
from grpc_test.framework.face.testing import control
from grpc_test.framework.face.testing import coverage
from grpc_test.framework.face.testing import digest
from grpc_test.framework.face.testing import stock_service
from grpc_test.framework.face.testing import test_case
-_TIMEOUT = 3
_MAXIMUM_POOL_SIZE = 10
@@ -110,7 +110,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
request = test_messages.request()
response_future = self.stub.future_value_in_value_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
response = response_future.result()
test_messages.verify(request, response, self)
@@ -122,7 +122,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
@@ -138,7 +138,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# returned to calling code before the iterator yields any requests.
with request_iterator.pause():
response_future = self.stub.future_stream_in_value_out(
- name, request_iterator, _TIMEOUT)
+ name, request_iterator, test_constants.SHORT_TIMEOUT)
response = response_future.result()
test_messages.verify(requests, response, self)
@@ -154,7 +154,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# returned to calling code before the iterator yields any requests.
with request_iterator.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
- name, request_iterator, _TIMEOUT)
+ name, request_iterator, test_constants.SHORT_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)
@@ -167,13 +167,13 @@ class FutureInvocationAsynchronousEventServiceTestCase(
second_request = test_messages.request()
first_response_future = self.stub.future_value_in_value_out(
- name, first_request, _TIMEOUT)
+ name, first_request, test_constants.SHORT_TIMEOUT)
first_response = first_response_future.result()
test_messages.verify(first_request, first_response, self)
second_response_future = self.stub.future_value_in_value_out(
- name, second_request, _TIMEOUT)
+ name, second_request, test_constants.SHORT_TIMEOUT)
second_response = second_response_future.result()
test_messages.verify(second_request, second_response, self)
@@ -186,7 +186,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
multi_callable = self.stub.unary_unary_multi_callable(name)
- response_future = multi_callable.future(request, _TIMEOUT)
+ response_future = multi_callable.future(request,
+ test_constants.SHORT_TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
@@ -200,7 +201,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(response_iterator)
@@ -212,7 +213,8 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
multi_callable = self.stub.stream_unary_multi_callable(name)
- response_future = multi_callable.future(iter(requests), _TIMEOUT)
+ response_future = multi_callable.future(iter(requests),
+ test_constants.SHORT_TIMEOUT)
self.assertIsInstance(
response_future.exception(), exceptions.ExpirationError)
with self.assertRaises(exceptions.ExpirationError):
@@ -226,7 +228,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(response_iterator)
@@ -238,7 +240,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.fail():
response_future = self.stub.future_value_in_value_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is
@@ -261,7 +263,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# expiration of the RPC.
with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testFailedStreamRequestUnaryResponse(self):
@@ -272,7 +274,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.fail():
response_future = self.stub.future_stream_in_value_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
# Because the servicer fails outside of the thread from which the
# servicer-side runtime called into it its failure is
@@ -295,7 +297,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
# expiration of the RPC.
with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
list(response_iterator)
def testParallelInvocations(self):
@@ -305,10 +307,11 @@ class FutureInvocationAsynchronousEventServiceTestCase(
first_request = test_messages.request()
second_request = test_messages.request()
+ # TODO(bug 2039): use LONG_TIMEOUT instead
first_response_future = self.stub.future_value_in_value_out(
- name, first_request, _TIMEOUT)
+ name, first_request, test_constants.SHORT_TIMEOUT)
second_response_future = self.stub.future_value_in_value_out(
- name, second_request, _TIMEOUT)
+ name, second_request, test_constants.SHORT_TIMEOUT)
first_response = first_response_future.result()
second_response = second_response_future.result()
@@ -327,7 +330,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_future = self.stub.future_value_in_value_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
cancel_method_return_value = response_future.cancel()
self.assertFalse(cancel_method_return_value)
@@ -341,7 +344,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, test_constants.SHORT_TIMEOUT)
response_iterator.cancel()
with self.assertRaises(future.CancelledError):
@@ -355,7 +358,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_future = self.stub.future_stream_in_value_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
cancel_method_return_value = response_future.cancel()
self.assertFalse(cancel_method_return_value)
@@ -369,7 +372,7 @@ class FutureInvocationAsynchronousEventServiceTestCase(
with self.control.pause():
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), test_constants.SHORT_TIMEOUT)
response_iterator.cancel()
with self.assertRaises(future.CancelledError):
diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py
index 925c32720f..a6203cae2d 100644
--- a/src/python/grpcio_test/setup.py
+++ b/src/python/grpcio_test/setup.py
@@ -48,8 +48,13 @@ _PACKAGE_DIRECTORIES = {
_PACKAGE_DATA = {
'grpc_interop': [
- 'credentials/ca.pem', 'credentials/server1.key',
- 'credentials/server1.pem',]
+ 'credentials/ca.pem',
+ 'credentials/server1.key',
+ 'credentials/server1.pem',
+ ],
+ 'grpc_protoc_plugin': [
+ 'test.proto',
+ ],
}
_SETUP_REQUIRES = (
@@ -75,5 +80,5 @@ setuptools.setup(
package_data=_PACKAGE_DATA,
install_requires=_INSTALL_REQUIRES + _SETUP_REQUIRES,
setup_requires=_SETUP_REQUIRES,
- cmdclass=_COMMAND_CLASS
+ cmdclass=_COMMAND_CLASS,
)