aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/interop
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_tests/tests/interop')
-rw-r--r--src/python/grpcio_tests/tests/interop/__init__.py2
-rw-r--r--src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py (renamed from src/python/grpcio_tests/tests/interop/_insecure_interop_test.py)26
-rw-r--r--src/python/grpcio_tests/tests/interop/_intraop_test_case.py (renamed from src/python/grpcio_tests/tests/interop/_interop_test_case.py)40
-rw-r--r--src/python/grpcio_tests/tests/interop/_secure_intraop_test.py (renamed from src/python/grpcio_tests/tests/interop/_secure_interop_test.py)37
-rw-r--r--src/python/grpcio_tests/tests/interop/client.py163
-rw-r--r--src/python/grpcio_tests/tests/interop/methods.py767
-rw-r--r--src/python/grpcio_tests/tests/interop/resources.py21
-rw-r--r--src/python/grpcio_tests/tests/interop/server.py57
8 files changed, 570 insertions, 543 deletions
diff --git a/src/python/grpcio_tests/tests/interop/__init__.py b/src/python/grpcio_tests/tests/interop/__init__.py
index 7086519106..b89398809f 100644
--- a/src/python/grpcio_tests/tests/interop/__init__.py
+++ b/src/python/grpcio_tests/tests/interop/__init__.py
@@ -26,5 +26,3 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
index 936c895bd2..58f3b364ba 100644
--- a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Insecure client-server interoperability as a unit test."""
from concurrent import futures
@@ -35,24 +34,23 @@ import unittest
import grpc
from src.proto.grpc.testing import test_pb2
-from tests.interop import _interop_test_case
+from tests.interop import _intraop_test_case
from tests.interop import methods
from tests.interop import server
-class InsecureInteropTest(
- _interop_test_case.InteropTestCase,
- unittest.TestCase):
+class InsecureIntraopTest(_intraop_test_case.IntraopTestCase,
+ unittest.TestCase):
- def setUp(self):
- self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- test_pb2.add_TestServiceServicer_to_server(
- methods.TestService(), self.server)
- port = self.server.add_insecure_port('[::]:0')
- self.server.start()
- self.stub = test_pb2.TestServiceStub(
- grpc.insecure_channel('localhost:{}'.format(port)))
+ def setUp(self):
+ self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ test_pb2.add_TestServiceServicer_to_server(methods.TestService(),
+ self.server)
+ port = self.server.add_insecure_port('[::]:0')
+ self.server.start()
+ self.stub = test_pb2.TestServiceStub(
+ grpc.insecure_channel('localhost:{}'.format(port)))
if __name__ == '__main__':
- unittest.main(verbosity=2)
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/interop/_interop_test_case.py b/src/python/grpcio_tests/tests/interop/_intraop_test_case.py
index ccea17a66d..424f93980c 100644
--- a/src/python/grpcio_tests/tests/interop/_interop_test_case.py
+++ b/src/python/grpcio_tests/tests/interop/_intraop_test_case.py
@@ -26,39 +26,41 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Common code for unit tests of the interoperability test code."""
from tests.interop import methods
-class InteropTestCase(object):
- """Unit test methods.
+class IntraopTestCase(object):
+ """Unit test methods.
This class must be mixed in with unittest.TestCase and a class that defines
setUp and tearDown methods that manage a stub attribute.
"""
- def testEmptyUnary(self):
- methods.TestCase.EMPTY_UNARY.test_interoperability(self.stub, None)
+ def testEmptyUnary(self):
+ methods.TestCase.EMPTY_UNARY.test_interoperability(self.stub, None)
- def testLargeUnary(self):
- methods.TestCase.LARGE_UNARY.test_interoperability(self.stub, None)
+ def testLargeUnary(self):
+ methods.TestCase.LARGE_UNARY.test_interoperability(self.stub, None)
- def testServerStreaming(self):
- methods.TestCase.SERVER_STREAMING.test_interoperability(self.stub, None)
+ def testServerStreaming(self):
+ methods.TestCase.SERVER_STREAMING.test_interoperability(self.stub, None)
- def testClientStreaming(self):
- methods.TestCase.CLIENT_STREAMING.test_interoperability(self.stub, None)
+ def testClientStreaming(self):
+ methods.TestCase.CLIENT_STREAMING.test_interoperability(self.stub, None)
- def testPingPong(self):
- methods.TestCase.PING_PONG.test_interoperability(self.stub, None)
+ def testPingPong(self):
+ methods.TestCase.PING_PONG.test_interoperability(self.stub, None)
- def testCancelAfterBegin(self):
- methods.TestCase.CANCEL_AFTER_BEGIN.test_interoperability(self.stub, None)
+ def testCancelAfterBegin(self):
+ methods.TestCase.CANCEL_AFTER_BEGIN.test_interoperability(self.stub,
+ None)
- def testCancelAfterFirstResponse(self):
- methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(self.stub, None)
+ def testCancelAfterFirstResponse(self):
+ methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(
+ self.stub, None)
- def testTimeoutOnSleepingServer(self):
- methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability(self.stub, None)
+ def testTimeoutOnSleepingServer(self):
+ methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability(
+ self.stub, None)
diff --git a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
index eaca553e1b..b28406ed3f 100644
--- a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Secure client-server interoperability as a unit test."""
from concurrent import futures
@@ -35,31 +34,31 @@ import unittest
import grpc
from src.proto.grpc.testing import test_pb2
-from tests.interop import _interop_test_case
+from tests.interop import _intraop_test_case
from tests.interop import methods
from tests.interop import resources
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
-class SecureInteropTest(
- _interop_test_case.InteropTestCase,
- unittest.TestCase):
+class SecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase):
- def setUp(self):
- self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- test_pb2.add_TestServiceServicer_to_server(
- methods.TestService(), self.server)
- port = self.server.add_secure_port(
- '[::]:0', grpc.ssl_server_credentials(
- [(resources.private_key(), resources.certificate_chain())]))
- self.server.start()
- self.stub = test_pb2.TestServiceStub(
- grpc.secure_channel(
- 'localhost:{}'.format(port),
- grpc.ssl_channel_credentials(resources.test_root_certificates()),
- (('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE,),)))
+ def setUp(self):
+ self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ test_pb2.add_TestServiceServicer_to_server(methods.TestService(),
+ self.server)
+ port = self.server.add_secure_port(
+ '[::]:0',
+ grpc.ssl_server_credentials(
+ [(resources.private_key(), resources.certificate_chain())]))
+ self.server.start()
+ self.stub = test_pb2.TestServiceStub(
+ grpc.secure_channel('localhost:{}'.format(port),
+ grpc.ssl_channel_credentials(
+ resources.test_root_certificates()), ((
+ 'grpc.ssl_target_name_override',
+ _SERVER_HOST_OVERRIDE,),)))
if __name__ == '__main__':
- unittest.main(verbosity=2)
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py
index afaa466254..f177896e8e 100644
--- a/src/python/grpcio_tests/tests/interop/client.py
+++ b/src/python/grpcio_tests/tests/interop/client.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""The Python implementation of the GRPC interoperability test client."""
import argparse
@@ -41,93 +40,107 @@ from tests.interop import resources
def _args():
- parser = argparse.ArgumentParser()
- parser.add_argument(
- '--server_host', help='the host to which to connect', type=str,
- default="127.0.0.1")
- parser.add_argument(
- '--server_port', help='the port to which to connect', type=int)
- parser.add_argument(
- '--test_case', help='the test case to execute', type=str,
- default="large_unary")
- parser.add_argument(
- '--use_tls', help='require a secure connection', default=False,
- type=resources.parse_bool)
- parser.add_argument(
- '--use_test_ca', help='replace platform root CAs with ca.pem',
- default=False, type=resources.parse_bool)
- parser.add_argument(
- '--server_host_override', default="foo.test.google.fr",
- help='the server host to which to claim to connect', type=str)
- parser.add_argument('--oauth_scope', help='scope for OAuth tokens', type=str)
- parser.add_argument(
- '--default_service_account',
- help='email address of the default service account', type=str)
- return parser.parse_args()
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--server_host',
+ help='the host to which to connect',
+ type=str,
+ default="127.0.0.1")
+ parser.add_argument(
+ '--server_port', help='the port to which to connect', type=int)
+ parser.add_argument(
+ '--test_case',
+ help='the test case to execute',
+ type=str,
+ default="large_unary")
+ parser.add_argument(
+ '--use_tls',
+ help='require a secure connection',
+ default=False,
+ type=resources.parse_bool)
+ parser.add_argument(
+ '--use_test_ca',
+ help='replace platform root CAs with ca.pem',
+ default=False,
+ type=resources.parse_bool)
+ parser.add_argument(
+ '--server_host_override',
+ default="foo.test.google.fr",
+ help='the server host to which to claim to connect',
+ type=str)
+ parser.add_argument(
+ '--oauth_scope', help='scope for OAuth tokens', type=str)
+ parser.add_argument(
+ '--default_service_account',
+ help='email address of the default service account',
+ type=str)
+ return parser.parse_args()
def _application_default_credentials():
- return oauth2client_client.GoogleCredentials.get_application_default()
+ return oauth2client_client.GoogleCredentials.get_application_default()
def _stub(args):
- target = '{}:{}'.format(args.server_host, args.server_port)
- if args.test_case == 'oauth2_auth_token':
- google_credentials = _application_default_credentials()
- scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
- access_token = scoped_credentials.get_access_token().access_token
- call_credentials = grpc.access_token_call_credentials(access_token)
- elif args.test_case == 'compute_engine_creds':
- google_credentials = _application_default_credentials()
- scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
- # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
- # remaining use of the Beta API.
- call_credentials = implementations.google_call_credentials(
- scoped_credentials)
- elif args.test_case == 'jwt_token_creds':
- google_credentials = _application_default_credentials()
- # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
- # remaining use of the Beta API.
- call_credentials = implementations.google_call_credentials(
- google_credentials)
- else:
- call_credentials = None
- if args.use_tls:
- if args.use_test_ca:
- root_certificates = resources.test_root_certificates()
+ target = '{}:{}'.format(args.server_host, args.server_port)
+ if args.test_case == 'oauth2_auth_token':
+ google_credentials = _application_default_credentials()
+ scoped_credentials = google_credentials.create_scoped(
+ [args.oauth_scope])
+ access_token = scoped_credentials.get_access_token().access_token
+ call_credentials = grpc.access_token_call_credentials(access_token)
+ elif args.test_case == 'compute_engine_creds':
+ google_credentials = _application_default_credentials()
+ scoped_credentials = google_credentials.create_scoped(
+ [args.oauth_scope])
+ # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+ # remaining use of the Beta API.
+ call_credentials = implementations.google_call_credentials(
+ scoped_credentials)
+ elif args.test_case == 'jwt_token_creds':
+ google_credentials = _application_default_credentials()
+ # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+ # remaining use of the Beta API.
+ call_credentials = implementations.google_call_credentials(
+ google_credentials)
else:
- root_certificates = None # will load default roots.
-
- channel_credentials = grpc.ssl_channel_credentials(root_certificates)
- if call_credentials is not None:
- channel_credentials = grpc.composite_channel_credentials(
- channel_credentials, call_credentials)
-
- channel = grpc.secure_channel(
- target, channel_credentials,
- (('grpc.ssl_target_name_override', args.server_host_override,),))
- else:
- channel = grpc.insecure_channel(target)
- if args.test_case == "unimplemented_service":
- return test_pb2.UnimplementedServiceStub(channel)
- else:
- return test_pb2.TestServiceStub(channel)
+ call_credentials = None
+ if args.use_tls:
+ if args.use_test_ca:
+ root_certificates = resources.test_root_certificates()
+ else:
+ root_certificates = None # will load default roots.
+
+ channel_credentials = grpc.ssl_channel_credentials(root_certificates)
+ if call_credentials is not None:
+ channel_credentials = grpc.composite_channel_credentials(
+ channel_credentials, call_credentials)
+
+ channel = grpc.secure_channel(target, channel_credentials, ((
+ 'grpc.ssl_target_name_override',
+ args.server_host_override,),))
+ else:
+ channel = grpc.insecure_channel(target)
+ if args.test_case == "unimplemented_service":
+ return test_pb2.UnimplementedServiceStub(channel)
+ else:
+ return test_pb2.TestServiceStub(channel)
def _test_case_from_arg(test_case_arg):
- for test_case in methods.TestCase:
- if test_case_arg == test_case.value:
- return test_case
- else:
- raise ValueError('No test case "%s"!' % test_case_arg)
+ for test_case in methods.TestCase:
+ if test_case_arg == test_case.value:
+ return test_case
+ else:
+ raise ValueError('No test case "%s"!' % test_case_arg)
def test_interoperability():
- args = _args()
- stub = _stub(args)
- test_case = _test_case_from_arg(args.test_case)
- test_case.test_interoperability(stub, args)
+ args = _args()
+ stub = _stub(args)
+ test_case = _test_case_from_arg(args.test_case)
+ test_case.test_interoperability(stub, args)
if __name__ == '__main__':
- test_interoperability()
+ test_interoperability()
diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py
index 52e56f3502..e1f8722168 100644
--- a/src/python/grpcio_tests/tests/interop/methods.py
+++ b/src/python/grpcio_tests/tests/interop/methods.py
@@ -26,14 +26,12 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Implementations of interoperability test methods."""
import enum
import json
import os
import threading
-import time
from oauth2client import client as oauth2client_client
@@ -47,464 +45,483 @@ from src.proto.grpc.testing import test_pb2
_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
+
def _maybe_echo_metadata(servicer_context):
- """Copies metadata from request to response if it is present."""
- invocation_metadata = dict(servicer_context.invocation_metadata())
- if _INITIAL_METADATA_KEY in invocation_metadata:
- initial_metadatum = (
- _INITIAL_METADATA_KEY, invocation_metadata[_INITIAL_METADATA_KEY])
- servicer_context.send_initial_metadata((initial_metadatum,))
- if _TRAILING_METADATA_KEY in invocation_metadata:
- trailing_metadatum = (
- _TRAILING_METADATA_KEY, invocation_metadata[_TRAILING_METADATA_KEY])
- servicer_context.set_trailing_metadata((trailing_metadatum,))
+ """Copies metadata from request to response if it is present."""
+ invocation_metadata = dict(servicer_context.invocation_metadata())
+ if _INITIAL_METADATA_KEY in invocation_metadata:
+ initial_metadatum = (_INITIAL_METADATA_KEY,
+ invocation_metadata[_INITIAL_METADATA_KEY])
+ servicer_context.send_initial_metadata((initial_metadatum,))
+ if _TRAILING_METADATA_KEY in invocation_metadata:
+ trailing_metadatum = (_TRAILING_METADATA_KEY,
+ invocation_metadata[_TRAILING_METADATA_KEY])
+ servicer_context.set_trailing_metadata((trailing_metadatum,))
+
def _maybe_echo_status_and_message(request, servicer_context):
- """Sets the response context code and details if the request asks for them"""
- if request.HasField('response_status'):
- servicer_context.set_code(request.response_status.code)
- servicer_context.set_details(request.response_status.message)
+ """Sets the response context code and details if the request asks for them"""
+ if request.HasField('response_status'):
+ servicer_context.set_code(request.response_status.code)
+ servicer_context.set_details(request.response_status.message)
+
class TestService(test_pb2.TestServiceServicer):
- def EmptyCall(self, request, context):
- _maybe_echo_metadata(context)
- return empty_pb2.Empty()
+ def EmptyCall(self, request, context):
+ _maybe_echo_metadata(context)
+ return empty_pb2.Empty()
- def UnaryCall(self, request, context):
- _maybe_echo_metadata(context)
- _maybe_echo_status_and_message(request, context)
- return messages_pb2.SimpleResponse(
- payload=messages_pb2.Payload(
+ def UnaryCall(self, request, context):
+ _maybe_echo_metadata(context)
+ _maybe_echo_status_and_message(request, context)
+ return messages_pb2.SimpleResponse(payload=messages_pb2.Payload(
type=messages_pb2.COMPRESSABLE,
body=b'\x00' * request.response_size))
- def StreamingOutputCall(self, request, context):
- _maybe_echo_status_and_message(request, context)
- for response_parameters in request.response_parameters:
- yield messages_pb2.StreamingOutputCallResponse(
- payload=messages_pb2.Payload(
- type=request.response_type,
- body=b'\x00' * response_parameters.size))
-
- def StreamingInputCall(self, request_iterator, context):
- aggregate_size = 0
- for request in request_iterator:
- if request.payload is not None and request.payload.body:
- aggregate_size += len(request.payload.body)
- return messages_pb2.StreamingInputCallResponse(
- aggregated_payload_size=aggregate_size)
-
- def FullDuplexCall(self, request_iterator, context):
- _maybe_echo_metadata(context)
- for request in request_iterator:
- _maybe_echo_status_and_message(request, context)
- for response_parameters in request.response_parameters:
- yield messages_pb2.StreamingOutputCallResponse(
- payload=messages_pb2.Payload(
- type=request.payload.type,
- body=b'\x00' * response_parameters.size))
-
- # NOTE(nathaniel): Apparently this is the same as the full-duplex call?
- # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)...
- def HalfDuplexCall(self, request_iterator, context):
- return self.FullDuplexCall(request_iterator, context)
+ def StreamingOutputCall(self, request, context):
+ _maybe_echo_status_and_message(request, context)
+ for response_parameters in request.response_parameters:
+ yield messages_pb2.StreamingOutputCallResponse(
+ payload=messages_pb2.Payload(
+ type=request.response_type,
+ body=b'\x00' * response_parameters.size))
+
+ def StreamingInputCall(self, request_iterator, context):
+ aggregate_size = 0
+ for request in request_iterator:
+ if request.payload is not None and request.payload.body:
+ aggregate_size += len(request.payload.body)
+ return messages_pb2.StreamingInputCallResponse(
+ aggregated_payload_size=aggregate_size)
+
+ def FullDuplexCall(self, request_iterator, context):
+ _maybe_echo_metadata(context)
+ for request in request_iterator:
+ _maybe_echo_status_and_message(request, context)
+ for response_parameters in request.response_parameters:
+ yield messages_pb2.StreamingOutputCallResponse(
+ payload=messages_pb2.Payload(
+ type=request.payload.type,
+ body=b'\x00' * response_parameters.size))
+
+ # NOTE(nathaniel): Apparently this is the same as the full-duplex call?
+ # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)...
+ def HalfDuplexCall(self, request_iterator, context):
+ return self.FullDuplexCall(request_iterator, context)
def _expect_status_code(call, expected_code):
- if call.code() != expected_code:
- raise ValueError(
- 'expected code %s, got %s' % (expected_code, call.code()))
+ if call.code() != expected_code:
+ raise ValueError('expected code %s, got %s' %
+ (expected_code, call.code()))
def _expect_status_details(call, expected_details):
- if call.details() != expected_details:
- raise ValueError(
- 'expected message %s, got %s' % (expected_details, call.details()))
+ if call.details() != expected_details:
+ raise ValueError('expected message %s, got %s' %
+ (expected_details, call.details()))
def _validate_status_code_and_details(call, expected_code, expected_details):
- _expect_status_code(call, expected_code)
- _expect_status_details(call, expected_details)
+ _expect_status_code(call, expected_code)
+ _expect_status_details(call, expected_details)
def _validate_payload_type_and_length(response, expected_type, expected_length):
- if response.payload.type is not expected_type:
- raise ValueError(
- 'expected payload type %s, got %s' %
- (expected_type, type(response.payload.type)))
- elif len(response.payload.body) != expected_length:
- raise ValueError(
- 'expected payload body size %d, got %d' %
- (expected_length, len(response.payload.body)))
-
-
-def _large_unary_common_behavior(
- stub, fill_username, fill_oauth_scope, call_credentials):
- size = 314159
- request = messages_pb2.SimpleRequest(
- response_type=messages_pb2.COMPRESSABLE, response_size=size,
- payload=messages_pb2.Payload(body=b'\x00' * 271828),
- fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
- response_future = stub.UnaryCall.future(
- request, credentials=call_credentials)
- response = response_future.result()
- _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
- return response
+ if response.payload.type is not expected_type:
+ raise ValueError('expected payload type %s, got %s' %
+ (expected_type, type(response.payload.type)))
+ elif len(response.payload.body) != expected_length:
+ raise ValueError('expected payload body size %d, got %d' %
+ (expected_length, len(response.payload.body)))
+
+
+def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope,
+ call_credentials):
+ size = 314159
+ request = messages_pb2.SimpleRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_size=size,
+ payload=messages_pb2.Payload(body=b'\x00' * 271828),
+ fill_username=fill_username,
+ fill_oauth_scope=fill_oauth_scope)
+ response_future = stub.UnaryCall.future(
+ request, credentials=call_credentials)
+ response = response_future.result()
+ _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
+ return response
def _empty_unary(stub):
- response = stub.EmptyCall(empty_pb2.Empty())
- if not isinstance(response, empty_pb2.Empty):
- raise TypeError(
- 'response is of type "%s", not empty_pb2.Empty!', type(response))
+ response = stub.EmptyCall(empty_pb2.Empty())
+ if not isinstance(response, empty_pb2.Empty):
+ raise TypeError('response is of type "%s", not empty_pb2.Empty!',
+ type(response))
def _large_unary(stub):
- _large_unary_common_behavior(stub, False, False, None)
+ _large_unary_common_behavior(stub, False, False, None)
def _client_streaming(stub):
- payload_body_sizes = (27182, 8, 1828, 45904,)
- payloads = (
- messages_pb2.Payload(body=b'\x00' * size)
- for size in payload_body_sizes)
- requests = (
- messages_pb2.StreamingInputCallRequest(payload=payload)
- for payload in payloads)
- response = stub.StreamingInputCall(requests)
- if response.aggregated_payload_size != 74922:
- raise ValueError(
- 'incorrect size %d!' % response.aggregated_payload_size)
+ payload_body_sizes = (
+ 27182,
+ 8,
+ 1828,
+ 45904,)
+ payloads = (messages_pb2.Payload(body=b'\x00' * size)
+ for size in payload_body_sizes)
+ requests = (messages_pb2.StreamingInputCallRequest(payload=payload)
+ for payload in payloads)
+ response = stub.StreamingInputCall(requests)
+ if response.aggregated_payload_size != 74922:
+ raise ValueError('incorrect size %d!' %
+ response.aggregated_payload_size)
def _server_streaming(stub):
- sizes = (31415, 9, 2653, 58979,)
-
- request = messages_pb2.StreamingOutputCallRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(
- messages_pb2.ResponseParameters(size=sizes[0]),
- messages_pb2.ResponseParameters(size=sizes[1]),
- messages_pb2.ResponseParameters(size=sizes[2]),
- messages_pb2.ResponseParameters(size=sizes[3]),
- )
- )
- response_iterator = stub.StreamingOutputCall(request)
- for index, response in enumerate(response_iterator):
- _validate_payload_type_and_length(
- response, messages_pb2.COMPRESSABLE, sizes[index])
-
+ sizes = (
+ 31415,
+ 9,
+ 2653,
+ 58979,)
-def _cancel_after_begin(stub):
- sizes = (27182, 8, 1828, 45904,)
- payloads = (messages_pb2.Payload(body=b'\x00' * size) for size in sizes)
- requests = (messages_pb2.StreamingInputCallRequest(payload=payload)
- for payload in payloads)
- response_future = stub.StreamingInputCall.future(requests)
- response_future.cancel()
- if not response_future.cancelled():
- raise ValueError('expected call to be cancelled')
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=sizes[0]),
+ messages_pb2.ResponseParameters(size=sizes[1]),
+ messages_pb2.ResponseParameters(size=sizes[2]),
+ messages_pb2.ResponseParameters(size=sizes[3]),))
+ response_iterator = stub.StreamingOutputCall(request)
+ for index, response in enumerate(response_iterator):
+ _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
+ sizes[index])
class _Pipe(object):
- def __init__(self):
- self._condition = threading.Condition()
- self._values = []
- self._open = True
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._values = []
+ self._open = True
- def __iter__(self):
- return self
+ def __iter__(self):
+ return self
- def __next__(self):
- return self.next()
+ def __next__(self):
+ return self.next()
- def next(self):
- with self._condition:
- while not self._values and self._open:
- self._condition.wait()
- if self._values:
- return self._values.pop(0)
- else:
- raise StopIteration()
+ def next(self):
+ with self._condition:
+ while not self._values and self._open:
+ self._condition.wait()
+ if self._values:
+ return self._values.pop(0)
+ else:
+ raise StopIteration()
- def add(self, value):
- with self._condition:
- self._values.append(value)
- self._condition.notify()
+ def add(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._condition.notify()
- def close(self):
- with self._condition:
- self._open = False
- self._condition.notify()
+ def close(self):
+ with self._condition:
+ self._open = False
+ self._condition.notify()
- def __enter__(self):
- return self
+ def __enter__(self):
+ return self
- def __exit__(self, type, value, traceback):
- self.close()
+ def __exit__(self, type, value, traceback):
+ self.close()
def _ping_pong(stub):
- request_response_sizes = (31415, 9, 2653, 58979,)
- request_payload_sizes = (27182, 8, 1828, 45904,)
-
- with _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe)
- for response_size, payload_size in zip(
- request_response_sizes, request_payload_sizes):
- request = messages_pb2.StreamingOutputCallRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(
- messages_pb2.ResponseParameters(size=response_size),),
- payload=messages_pb2.Payload(body=b'\x00' * payload_size))
- pipe.add(request)
- response = next(response_iterator)
- _validate_payload_type_and_length(
- response, messages_pb2.COMPRESSABLE, response_size)
+ request_response_sizes = (
+ 31415,
+ 9,
+ 2653,
+ 58979,)
+ request_payload_sizes = (
+ 27182,
+ 8,
+ 1828,
+ 45904,)
+
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe)
+ for response_size, payload_size in zip(request_response_sizes,
+ request_payload_sizes):
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=response_size),),
+ payload=messages_pb2.Payload(body=b'\x00' * payload_size))
+ pipe.add(request)
+ response = next(response_iterator)
+ _validate_payload_type_and_length(
+ response, messages_pb2.COMPRESSABLE, response_size)
-def _cancel_after_first_response(stub):
- request_response_sizes = (31415, 9, 2653, 58979,)
- request_payload_sizes = (27182, 8, 1828, 45904,)
- with _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe)
+def _cancel_after_begin(stub):
+ with _Pipe() as pipe:
+ response_future = stub.StreamingInputCall.future(pipe)
+ response_future.cancel()
+ if not response_future.cancelled():
+ raise ValueError('expected cancelled method to return True')
+ if response_future.code() is not grpc.StatusCode.CANCELLED:
+ raise ValueError('expected status code CANCELLED')
- response_size = request_response_sizes[0]
- payload_size = request_payload_sizes[0]
- request = messages_pb2.StreamingOutputCallRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(
- messages_pb2.ResponseParameters(size=response_size),),
- payload=messages_pb2.Payload(body=b'\x00' * payload_size))
- pipe.add(request)
- response = next(response_iterator)
- # We test the contents of `response` in the Ping Pong test - don't check
- # them here.
- response_iterator.cancel()
-
- try:
- next(response_iterator)
- except grpc.RpcError as rpc_error:
- if rpc_error.code() is not grpc.StatusCode.CANCELLED:
- raise
- else:
- raise ValueError('expected call to be cancelled')
+def _cancel_after_first_response(stub):
+ request_response_sizes = (
+ 31415,
+ 9,
+ 2653,
+ 58979,)
+ request_payload_sizes = (
+ 27182,
+ 8,
+ 1828,
+ 45904,)
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe)
+
+ response_size = request_response_sizes[0]
+ payload_size = request_payload_sizes[0]
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=response_size),),
+ payload=messages_pb2.Payload(body=b'\x00' * payload_size))
+ pipe.add(request)
+ response = next(response_iterator)
+ # We test the contents of `response` in the Ping Pong test - don't check
+ # them here.
+ response_iterator.cancel()
+
+ try:
+ next(response_iterator)
+ except grpc.RpcError as rpc_error:
+ if rpc_error.code() is not grpc.StatusCode.CANCELLED:
+ raise
+ else:
+ raise ValueError('expected call to be cancelled')
-def _timeout_on_sleeping_server(stub):
- request_payload_size = 27182
- with _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe, timeout=0.001)
- request = messages_pb2.StreamingOutputCallRequest(
- response_type=messages_pb2.COMPRESSABLE,
- payload=messages_pb2.Payload(body=b'\x00' * request_payload_size))
- pipe.add(request)
- time.sleep(0.1)
- try:
- next(response_iterator)
- except grpc.RpcError as rpc_error:
- if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED:
- raise
- else:
- raise ValueError('expected call to exceed deadline')
+def _timeout_on_sleeping_server(stub):
+ request_payload_size = 27182
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe, timeout=0.001)
+
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ payload=messages_pb2.Payload(body=b'\x00' * request_payload_size))
+ pipe.add(request)
+ try:
+ next(response_iterator)
+ except grpc.RpcError as rpc_error:
+ if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED:
+ raise
+ else:
+ raise ValueError('expected call to exceed deadline')
def _empty_stream(stub):
- with _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe)
- pipe.close()
- try:
- next(response_iterator)
- raise ValueError('expected exactly 0 responses')
- except StopIteration:
- pass
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe)
+ pipe.close()
+ try:
+ next(response_iterator)
+ raise ValueError('expected exactly 0 responses')
+ except StopIteration:
+ pass
def _status_code_and_message(stub):
- details = 'test status message'
- code = 2
- status = grpc.StatusCode.UNKNOWN # code = 2
-
- # Test with a UnaryCall
- request = messages_pb2.SimpleRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_size=1,
- payload=messages_pb2.Payload(body=b'\x00'),
- response_status=messages_pb2.EchoStatus(code=code, message=details)
- )
- response_future = stub.UnaryCall.future(request)
- _validate_status_code_and_details(response_future, status, details)
-
- # Test with a FullDuplexCall
- with _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe)
- request = messages_pb2.StreamingOutputCallRequest(
+ details = 'test status message'
+ code = 2
+ status = grpc.StatusCode.UNKNOWN # code = 2
+
+ # Test with a UnaryCall
+ request = messages_pb2.SimpleRequest(
response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(
- messages_pb2.ResponseParameters(size=1),),
+ response_size=1,
payload=messages_pb2.Payload(body=b'\x00'),
- response_status=messages_pb2.EchoStatus(code=code, message=details))
- pipe.add(request) # sends the initial request.
- # Dropping out of with block closes the pipe
- _validate_status_code_and_details(response_iterator, status, details)
+ response_status=messages_pb2.EchoStatus(
+ code=code, message=details))
+ response_future = stub.UnaryCall.future(request)
+ _validate_status_code_and_details(response_future, status, details)
+
+ # Test with a FullDuplexCall
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe)
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(messages_pb2.ResponseParameters(size=1),),
+ payload=messages_pb2.Payload(body=b'\x00'),
+ response_status=messages_pb2.EchoStatus(
+ code=code, message=details))
+ pipe.add(request) # sends the initial request.
+ # Dropping out of with block closes the pipe
+ _validate_status_code_and_details(response_iterator, status, details)
def _unimplemented_method(test_service_stub):
- response_future = (
- test_service_stub.UnimplementedCall.future(empty_pb2.Empty()))
- _expect_status_code(response_future, grpc.StatusCode.UNIMPLEMENTED)
+ response_future = (
+ test_service_stub.UnimplementedCall.future(empty_pb2.Empty()))
+ _expect_status_code(response_future, grpc.StatusCode.UNIMPLEMENTED)
def _unimplemented_service(unimplemented_service_stub):
- response_future = (
- unimplemented_service_stub.UnimplementedCall.future(empty_pb2.Empty()))
- _expect_status_code(response_future, grpc.StatusCode.UNIMPLEMENTED)
+ response_future = (
+ unimplemented_service_stub.UnimplementedCall.future(empty_pb2.Empty()))
+ _expect_status_code(response_future, grpc.StatusCode.UNIMPLEMENTED)
def _custom_metadata(stub):
- initial_metadata_value = "test_initial_metadata_value"
- trailing_metadata_value = "\x0a\x0b\x0a\x0b\x0a\x0b"
- metadata = (
- (_INITIAL_METADATA_KEY, initial_metadata_value),
- (_TRAILING_METADATA_KEY, trailing_metadata_value))
-
- def _validate_metadata(response):
- initial_metadata = dict(response.initial_metadata())
- if initial_metadata[_INITIAL_METADATA_KEY] != initial_metadata_value:
- raise ValueError(
- 'expected initial metadata %s, got %s' % (
- initial_metadata_value, initial_metadata[_INITIAL_METADATA_KEY]))
- trailing_metadata = dict(response.trailing_metadata())
- if trailing_metadata[_TRAILING_METADATA_KEY] != trailing_metadata_value:
- raise ValueError(
- 'expected trailing metadata %s, got %s' % (
- trailing_metadata_value, initial_metadata[_TRAILING_METADATA_KEY]))
-
- # Testing with UnaryCall
- request = messages_pb2.SimpleRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_size=1,
- payload=messages_pb2.Payload(body=b'\x00'))
- response_future = stub.UnaryCall.future(request, metadata=metadata)
- _validate_metadata(response_future)
-
- # Testing with FullDuplexCall
- with _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe, metadata=metadata)
- request = messages_pb2.StreamingOutputCallRequest(
+ initial_metadata_value = "test_initial_metadata_value"
+ trailing_metadata_value = "\x0a\x0b\x0a\x0b\x0a\x0b"
+ metadata = ((_INITIAL_METADATA_KEY, initial_metadata_value),
+ (_TRAILING_METADATA_KEY, trailing_metadata_value))
+
+ def _validate_metadata(response):
+ initial_metadata = dict(response.initial_metadata())
+ if initial_metadata[_INITIAL_METADATA_KEY] != initial_metadata_value:
+ raise ValueError('expected initial metadata %s, got %s' %
+ (initial_metadata_value,
+ initial_metadata[_INITIAL_METADATA_KEY]))
+ trailing_metadata = dict(response.trailing_metadata())
+ if trailing_metadata[_TRAILING_METADATA_KEY] != trailing_metadata_value:
+ raise ValueError('expected trailing metadata %s, got %s' %
+ (trailing_metadata_value,
+ initial_metadata[_TRAILING_METADATA_KEY]))
+
+ # Testing with UnaryCall
+ request = messages_pb2.SimpleRequest(
response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(
- messages_pb2.ResponseParameters(size=1),))
- pipe.add(request) # Sends the request
- next(response_iterator) # Causes server to send trailing metadata
- # Dropping out of the with block closes the pipe
- _validate_metadata(response_iterator)
+ response_size=1,
+ payload=messages_pb2.Payload(body=b'\x00'))
+ response_future = stub.UnaryCall.future(request, metadata=metadata)
+ _validate_metadata(response_future)
+
+ # Testing with FullDuplexCall
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe, metadata=metadata)
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(messages_pb2.ResponseParameters(size=1),))
+ pipe.add(request) # Sends the request
+ next(response_iterator) # Causes server to send trailing metadata
+ # Dropping out of the with block closes the pipe
+ _validate_metadata(response_iterator)
+
def _compute_engine_creds(stub, args):
- response = _large_unary_common_behavior(stub, True, True, None)
- if args.default_service_account != response.username:
- raise ValueError(
- 'expected username %s, got %s' % (
- args.default_service_account, response.username))
+ response = _large_unary_common_behavior(stub, True, True, None)
+ if args.default_service_account != response.username:
+ raise ValueError('expected username %s, got %s' %
+ (args.default_service_account, response.username))
def _oauth2_auth_token(stub, args):
- json_key_filename = os.environ[
- oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
- wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
- response = _large_unary_common_behavior(stub, True, True, None)
- if wanted_email != response.username:
- raise ValueError(
- 'expected username %s, got %s' % (wanted_email, response.username))
- if args.oauth_scope.find(response.oauth_scope) == -1:
- raise ValueError(
- 'expected to find oauth scope "{}" in received "{}"'.format(
- response.oauth_scope, args.oauth_scope))
+ json_key_filename = os.environ[
+ oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
+ wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
+ response = _large_unary_common_behavior(stub, True, True, None)
+ if wanted_email != response.username:
+ raise ValueError('expected username %s, got %s' %
+ (wanted_email, response.username))
+ if args.oauth_scope.find(response.oauth_scope) == -1:
+ raise ValueError('expected to find oauth scope "{}" in received "{}"'.
+ format(response.oauth_scope, args.oauth_scope))
def _jwt_token_creds(stub, args):
- json_key_filename = os.environ[
- oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
- wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
- response = _large_unary_common_behavior(stub, True, False, None)
- if wanted_email != response.username:
- raise ValueError(
- 'expected username %s, got %s' % (wanted_email, response.username))
+ json_key_filename = os.environ[
+ oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
+ wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
+ response = _large_unary_common_behavior(stub, True, False, None)
+ if wanted_email != response.username:
+ raise ValueError('expected username %s, got %s' %
+ (wanted_email, response.username))
def _per_rpc_creds(stub, args):
- json_key_filename = os.environ[
- oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
- wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
- credentials = oauth2client_client.GoogleCredentials.get_application_default()
- scoped_credentials = credentials.create_scoped([args.oauth_scope])
- # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
- # remaining use of the Beta API.
- call_credentials = implementations.google_call_credentials(
- scoped_credentials)
- response = _large_unary_common_behavior(stub, True, False, call_credentials)
- if wanted_email != response.username:
- raise ValueError(
- 'expected username %s, got %s' % (wanted_email, response.username))
+ json_key_filename = os.environ[
+ oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
+ wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
+ credentials = oauth2client_client.GoogleCredentials.get_application_default(
+ )
+ scoped_credentials = credentials.create_scoped([args.oauth_scope])
+ # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+ # remaining use of the Beta API.
+ call_credentials = implementations.google_call_credentials(
+ scoped_credentials)
+ response = _large_unary_common_behavior(stub, True, False, call_credentials)
+ if wanted_email != response.username:
+ raise ValueError('expected username %s, got %s' %
+ (wanted_email, response.username))
@enum.unique
class TestCase(enum.Enum):
- EMPTY_UNARY = 'empty_unary'
- LARGE_UNARY = 'large_unary'
- SERVER_STREAMING = 'server_streaming'
- CLIENT_STREAMING = 'client_streaming'
- PING_PONG = 'ping_pong'
- CANCEL_AFTER_BEGIN = 'cancel_after_begin'
- CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
- EMPTY_STREAM = 'empty_stream'
- STATUS_CODE_AND_MESSAGE = 'status_code_and_message'
- UNIMPLEMENTED_METHOD = 'unimplemented_method'
- UNIMPLEMENTED_SERVICE = 'unimplemented_service'
- CUSTOM_METADATA = "custom_metadata"
- COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
- OAUTH2_AUTH_TOKEN = 'oauth2_auth_token'
- JWT_TOKEN_CREDS = 'jwt_token_creds'
- PER_RPC_CREDS = 'per_rpc_creds'
- TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
-
- def test_interoperability(self, stub, args):
- if self is TestCase.EMPTY_UNARY:
- _empty_unary(stub)
- elif self is TestCase.LARGE_UNARY:
- _large_unary(stub)
- elif self is TestCase.SERVER_STREAMING:
- _server_streaming(stub)
- elif self is TestCase.CLIENT_STREAMING:
- _client_streaming(stub)
- elif self is TestCase.PING_PONG:
- _ping_pong(stub)
- elif self is TestCase.CANCEL_AFTER_BEGIN:
- _cancel_after_begin(stub)
- elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE:
- _cancel_after_first_response(stub)
- elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER:
- _timeout_on_sleeping_server(stub)
- elif self is TestCase.EMPTY_STREAM:
- _empty_stream(stub)
- elif self is TestCase.STATUS_CODE_AND_MESSAGE:
- _status_code_and_message(stub)
- elif self is TestCase.UNIMPLEMENTED_METHOD:
- _unimplemented_method(stub)
- elif self is TestCase.UNIMPLEMENTED_SERVICE:
- _unimplemented_service(stub)
- elif self is TestCase.CUSTOM_METADATA:
- _custom_metadata(stub)
- elif self is TestCase.COMPUTE_ENGINE_CREDS:
- _compute_engine_creds(stub, args)
- elif self is TestCase.OAUTH2_AUTH_TOKEN:
- _oauth2_auth_token(stub, args)
- elif self is TestCase.JWT_TOKEN_CREDS:
- _jwt_token_creds(stub, args)
- elif self is TestCase.PER_RPC_CREDS:
- _per_rpc_creds(stub, args)
- else:
- raise NotImplementedError('Test case "%s" not implemented!' % self.name)
+ EMPTY_UNARY = 'empty_unary'
+ LARGE_UNARY = 'large_unary'
+ SERVER_STREAMING = 'server_streaming'
+ CLIENT_STREAMING = 'client_streaming'
+ PING_PONG = 'ping_pong'
+ CANCEL_AFTER_BEGIN = 'cancel_after_begin'
+ CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
+ EMPTY_STREAM = 'empty_stream'
+ STATUS_CODE_AND_MESSAGE = 'status_code_and_message'
+ UNIMPLEMENTED_METHOD = 'unimplemented_method'
+ UNIMPLEMENTED_SERVICE = 'unimplemented_service'
+ CUSTOM_METADATA = "custom_metadata"
+ COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
+ OAUTH2_AUTH_TOKEN = 'oauth2_auth_token'
+ JWT_TOKEN_CREDS = 'jwt_token_creds'
+ PER_RPC_CREDS = 'per_rpc_creds'
+ TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
+
+ def test_interoperability(self, stub, args):
+ if self is TestCase.EMPTY_UNARY:
+ _empty_unary(stub)
+ elif self is TestCase.LARGE_UNARY:
+ _large_unary(stub)
+ elif self is TestCase.SERVER_STREAMING:
+ _server_streaming(stub)
+ elif self is TestCase.CLIENT_STREAMING:
+ _client_streaming(stub)
+ elif self is TestCase.PING_PONG:
+ _ping_pong(stub)
+ elif self is TestCase.CANCEL_AFTER_BEGIN:
+ _cancel_after_begin(stub)
+ elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE:
+ _cancel_after_first_response(stub)
+ elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER:
+ _timeout_on_sleeping_server(stub)
+ elif self is TestCase.EMPTY_STREAM:
+ _empty_stream(stub)
+ elif self is TestCase.STATUS_CODE_AND_MESSAGE:
+ _status_code_and_message(stub)
+ elif self is TestCase.UNIMPLEMENTED_METHOD:
+ _unimplemented_method(stub)
+ elif self is TestCase.UNIMPLEMENTED_SERVICE:
+ _unimplemented_service(stub)
+ elif self is TestCase.CUSTOM_METADATA:
+ _custom_metadata(stub)
+ elif self is TestCase.COMPUTE_ENGINE_CREDS:
+ _compute_engine_creds(stub, args)
+ elif self is TestCase.OAUTH2_AUTH_TOKEN:
+ _oauth2_auth_token(stub, args)
+ elif self is TestCase.JWT_TOKEN_CREDS:
+ _jwt_token_creds(stub, args)
+ elif self is TestCase.PER_RPC_CREDS:
+ _per_rpc_creds(stub, args)
+ else:
+ raise NotImplementedError('Test case "%s" not implemented!' %
+ self.name)
diff --git a/src/python/grpcio_tests/tests/interop/resources.py b/src/python/grpcio_tests/tests/interop/resources.py
index c424385cf6..2ec2eb92b4 100644
--- a/src/python/grpcio_tests/tests/interop/resources.py
+++ b/src/python/grpcio_tests/tests/interop/resources.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Constants and functions for data used in interoperability testing."""
import argparse
@@ -40,22 +39,22 @@ _CERTIFICATE_CHAIN_RESOURCE_PATH = 'credentials/server1.pem'
def test_root_certificates():
- return pkg_resources.resource_string(
- __name__, _ROOT_CERTIFICATES_RESOURCE_PATH)
+ return pkg_resources.resource_string(__name__,
+ _ROOT_CERTIFICATES_RESOURCE_PATH)
def private_key():
- return pkg_resources.resource_string(__name__, _PRIVATE_KEY_RESOURCE_PATH)
+ return pkg_resources.resource_string(__name__, _PRIVATE_KEY_RESOURCE_PATH)
def certificate_chain():
- return pkg_resources.resource_string(
- __name__, _CERTIFICATE_CHAIN_RESOURCE_PATH)
+ return pkg_resources.resource_string(__name__,
+ _CERTIFICATE_CHAIN_RESOURCE_PATH)
def parse_bool(value):
- if value == 'true':
- return True
- if value == 'false':
- return False
- raise argparse.ArgumentTypeError('Only true/false allowed')
+ if value == 'true':
+ return True
+ if value == 'false':
+ return False
+ raise argparse.ArgumentTypeError('Only true/false allowed')
diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py
index 1ae83bc57d..65f1604eb8 100644
--- a/src/python/grpcio_tests/tests/interop/server.py
+++ b/src/python/grpcio_tests/tests/interop/server.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""The Python implementation of the GRPC interoperability test server."""
import argparse
@@ -44,34 +43,36 @@ _ONE_DAY_IN_SECONDS = 60 * 60 * 24
def serve():
- parser = argparse.ArgumentParser()
- parser.add_argument(
- '--port', help='the port on which to serve', type=int)
- parser.add_argument(
- '--use_tls', help='require a secure connection',
- default=False, type=resources.parse_bool)
- args = parser.parse_args()
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--port', help='the port on which to serve', type=int)
+ parser.add_argument(
+ '--use_tls',
+ help='require a secure connection',
+ default=False,
+ type=resources.parse_bool)
+ args = parser.parse_args()
+
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server)
+ if args.use_tls:
+ private_key = resources.private_key()
+ certificate_chain = resources.certificate_chain()
+ credentials = grpc.ssl_server_credentials((
+ (private_key, certificate_chain),))
+ server.add_secure_port('[::]:{}'.format(args.port), credentials)
+ else:
+ server.add_insecure_port('[::]:{}'.format(args.port))
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server)
- if args.use_tls:
- private_key = resources.private_key()
- certificate_chain = resources.certificate_chain()
- credentials = grpc.ssl_server_credentials(
- ((private_key, certificate_chain),))
- server.add_secure_port('[::]:{}'.format(args.port), credentials)
- else:
- server.add_insecure_port('[::]:{}'.format(args.port))
+ server.start()
+ logging.info('Server serving.')
+ try:
+ while True:
+ time.sleep(_ONE_DAY_IN_SECONDS)
+ except BaseException as e:
+ logging.info('Caught exception "%s"; stopping server...', e)
+ server.stop(None)
+ logging.info('Server stopped; exiting.')
- server.start()
- logging.info('Server serving.')
- try:
- while True:
- time.sleep(_ONE_DAY_IN_SECONDS)
- except BaseException as e:
- logging.info('Caught exception "%s"; stopping server...', e)
- server.stop(None)
- logging.info('Server stopped; exiting.')
if __name__ == '__main__':
- serve()
+ serve()