aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_tests/tests')
-rw-r--r--src/python/grpcio_tests/tests/_sanity/__init__.py (renamed from src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/__init__.py)0
-rw-r--r--src/python/grpcio_tests/tests/_sanity/_sanity_test.py (renamed from src/python/grpcio_tests/tests/unit/_sanity/_sanity_test.py)17
-rw-r--r--src/python/grpcio_tests/tests/http2/negative_http2_client.py4
-rw-r--r--src/python/grpcio_tests/tests/interop/client.py6
-rw-r--r--src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py23
-rw-r--r--src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py519
-rw-r--r--src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py341
-rw-r--r--src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/sub/messages.proto (renamed from src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/messages.proto)0
-rw-r--r--src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_services/__init__.py13
-rw-r--r--src/python/grpcio_tests/tests/qps/benchmark_client.py4
-rw-r--r--src/python/grpcio_tests/tests/qps/benchmark_server.py6
-rw-r--r--src/python/grpcio_tests/tests/stress/client.py4
-rw-r--r--src/python/grpcio_tests/tests/stress/metrics_server.py3
-rw-r--r--src/python/grpcio_tests/tests/tests.json16
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_common.py118
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py131
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py126
-rw-r--r--src/python/grpcio_tests/tests/unit/_sanity/__init__.py13
18 files changed, 933 insertions, 411 deletions
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/__init__.py b/src/python/grpcio_tests/tests/_sanity/__init__.py
index 5772620b60..5772620b60 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/__init__.py
+++ b/src/python/grpcio_tests/tests/_sanity/__init__.py
diff --git a/src/python/grpcio_tests/tests/unit/_sanity/_sanity_test.py b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py
index 19bc8801eb..b4079850ff 100644
--- a/src/python/grpcio_tests/tests/unit/_sanity/_sanity_test.py
+++ b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py
@@ -21,24 +21,25 @@ import six
import tests
-class Sanity(unittest.TestCase):
+class SanityTest(unittest.TestCase):
+
+ maxDiff = 32768
def testTestsJsonUpToDate(self):
"""Autodiscovers all test suites and checks that tests.json is up to date"""
loader = tests.Loader()
loader.loadTestsFromNames(['tests'])
- test_suite_names = [
+ test_suite_names = sorted({
test_case_class.id().rsplit('.', 1)[0]
for test_case_class in tests._loader.iterate_suite_cases(
loader.suite)
- ]
- test_suite_names = sorted(set(test_suite_names))
+ })
tests_json_string = pkg_resources.resource_string('tests', 'tests.json')
- if six.PY3:
- tests_json_string = tests_json_string.decode()
- tests_json = json.loads(tests_json_string)
- self.assertListEqual(test_suite_names, tests_json)
+ tests_json = json.loads(tests_json_string.decode()
+ if six.PY3 else tests_json_string)
+
+ self.assertSequenceEqual(tests_json, test_suite_names)
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/http2/negative_http2_client.py b/src/python/grpcio_tests/tests/http2/negative_http2_client.py
index 6d8a6bce77..8dab5b67f1 100644
--- a/src/python/grpcio_tests/tests/http2/negative_http2_client.py
+++ b/src/python/grpcio_tests/tests/http2/negative_http2_client.py
@@ -17,7 +17,7 @@ import argparse
import grpc
import time
-from src.proto.grpc.testing import test_pb2
+from src.proto.grpc.testing import test_pb2_grpc
from src.proto.grpc.testing import messages_pb2
@@ -147,7 +147,7 @@ def _stub(server_host, server_port):
target = '{}:{}'.format(server_host, server_port)
channel = grpc.insecure_channel(target)
grpc.channel_ready_future(channel).result()
- return test_pb2.TestServiceStub(channel)
+ return test_pb2_grpc.TestServiceStub(channel)
def main():
diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py
index 47ae96472d..e520c08290 100644
--- a/src/python/grpcio_tests/tests/interop/client.py
+++ b/src/python/grpcio_tests/tests/interop/client.py
@@ -19,7 +19,7 @@ import os
from google import auth as google_auth
from google.auth import jwt as google_auth_jwt
import grpc
-from src.proto.grpc.testing import test_pb2
+from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import methods
from tests.interop import resources
@@ -106,9 +106,9 @@ def _stub(args):
else:
channel = grpc.insecure_channel(target)
if args.test_case == "unimplemented_service":
- return test_pb2.UnimplementedServiceStub(channel)
+ return test_pb2_grpc.UnimplementedServiceStub(channel)
else:
- return test_pb2.TestServiceStub(channel)
+ return test_pb2_grpc.TestServiceStub(channel)
def _test_case_from_arg(test_case_arg):
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py
index 71493bfec6..5b84001aab 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py
@@ -33,7 +33,7 @@ from tests.unit.framework.common import test_constants
import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2
import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2
import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2
-import tests.protoc_plugin.protos.service.test_service_pb2 as service_pb2
+import tests.protoc_plugin.protos.service.test_service_pb2_grpc as service_pb2_grpc
# Identifiers of entities we expect to find in the generated module.
STUB_IDENTIFIER = 'TestServiceStub'
@@ -138,7 +138,7 @@ def _CreateService():
"""
servicer_methods = _ServicerMethods()
- class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
+ class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
def UnaryCall(self, request, context):
return servicer_methods.UnaryCall(request, context)
@@ -157,11 +157,12 @@ def _CreateService():
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE))
- getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server)
+ getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
+ server)
port = server.add_insecure_port('[::]:0')
server.start()
channel = grpc.insecure_channel('localhost:{}'.format(port))
- stub = getattr(service_pb2, STUB_IDENTIFIER)(channel)
+ stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
return _Service(servicer_methods, server, stub)
@@ -173,16 +174,17 @@ def _CreateIncompleteService():
servicer_methods implements none of the methods required of it.
"""
- class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
+ class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
pass
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE))
- getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server)
+ getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
+ server)
port = server.add_insecure_port('[::]:0')
server.start()
channel = grpc.insecure_channel('localhost:{}'.format(port))
- stub = getattr(service_pb2, STUB_IDENTIFIER)(channel)
+ stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel)
return _Service(None, server, stub)
@@ -223,10 +225,11 @@ class PythonPluginTest(unittest.TestCase):
def testImportAttributes(self):
# check that we can access the generated module and its members.
- self.assertIsNotNone(getattr(service_pb2, STUB_IDENTIFIER, None))
- self.assertIsNotNone(getattr(service_pb2, SERVICER_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER, None))
self.assertIsNotNone(
- getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER, None))
+ getattr(service_pb2_grpc, SERVICER_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER, None))
def testUpDown(self):
service = _CreateService()
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
index 1aeb62a7c5..7868cdbfb3 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
@@ -12,22 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import collections
+import abc
from concurrent import futures
import contextlib
-import distutils.spawn
-import errno
import importlib
import os
-import os.path
+from os import path
import pkgutil
+import platform
import shutil
-import subprocess
import sys
import tempfile
-import threading
import unittest
-import platform
+
+import six
import grpc
from grpc_tools import protoc
@@ -37,292 +35,285 @@ _MESSAGES_IMPORT = b'import "messages.proto";'
_SPLIT_NAMESPACE = b'package grpc_protoc_plugin.invocation_testing.split;'
_COMMON_NAMESPACE = b'package grpc_protoc_plugin.invocation_testing;'
+_RELATIVE_PROTO_PATH = 'relative_proto_path'
+_RELATIVE_PYTHON_OUT = 'relative_python_out'
+
@contextlib.contextmanager
-def _system_path(path):
+def _system_path(path_insertion):
old_system_path = sys.path[:]
- sys.path = sys.path[0:1] + path + sys.path[1:]
+ sys.path = sys.path[0:1] + path_insertion + sys.path[1:]
yield
sys.path = old_system_path
-class DummySplitServicer(object):
+# NOTE(nathaniel): https://twitter.com/exoplaneteer/status/677259364256747520
+# Life lesson "just always default to idempotence" reinforced.
+def _create_directory_tree(root, path_components_sequence):
+ created = set()
+ for path_components in path_components_sequence:
+ thus_far = ''
+ for path_component in path_components:
+ relative_path = path.join(thus_far, path_component)
+ if relative_path not in created:
+ os.makedirs(path.join(root, relative_path))
+ created.add(relative_path)
+ thus_far = path.join(thus_far, path_component)
+
+
+def _massage_proto_content(proto_content, test_name_bytes,
+ messages_proto_relative_file_name_bytes):
+ package_substitution = (b'package grpc_protoc_plugin.invocation_testing.' +
+ test_name_bytes + b';')
+ common_namespace_substituted = proto_content.replace(_COMMON_NAMESPACE,
+ package_substitution)
+ split_namespace_substituted = common_namespace_substituted.replace(
+ _SPLIT_NAMESPACE, package_substitution)
+ message_import_replaced = split_namespace_substituted.replace(
+ _MESSAGES_IMPORT,
+ b'import "' + messages_proto_relative_file_name_bytes + b'";')
+ return message_import_replaced
+
+
+def _packagify(directory):
+ for subdirectory, _, _ in os.walk(directory):
+ init_file_name = path.join(subdirectory, '__init__.py')
+ with open(init_file_name, 'wb') as init_file:
+ init_file.write(b'')
- def __init__(self, request_class, response_class):
- self.request_class = request_class
- self.response_class = response_class
+
+class _Servicer(object):
+
+ def __init__(self, response_class):
+ self._response_class = response_class
def Call(self, request, context):
- return self.response_class()
+ return self._response_class()
-class SeparateTestMixin(object):
+def _protoc(proto_path, python_out, grpc_python_out_flag, grpc_python_out,
+ absolute_proto_file_names):
+ args = [
+ '',
+ '--proto_path={}'.format(proto_path),
+ ]
+ if python_out is not None:
+ args.append('--python_out={}'.format(python_out))
+ if grpc_python_out is not None:
+ args.append('--grpc_python_out={}:{}'.format(grpc_python_out_flag,
+ grpc_python_out))
+ args.extend(absolute_proto_file_names)
+ return protoc.main(args)
- def testImportAttributes(self):
- with _system_path([self.python_out_directory]):
- pb2 = importlib.import_module(self.pb2_import)
- pb2.Request
- pb2.Response
- if self.should_find_services_in_pb2:
- pb2.TestServiceServicer
- else:
- with self.assertRaises(AttributeError):
- pb2.TestServiceServicer
-
- with _system_path([self.grpc_python_out_directory]):
- pb2_grpc = importlib.import_module(self.pb2_grpc_import)
- pb2_grpc.TestServiceServicer
- with self.assertRaises(AttributeError):
- pb2_grpc.Request
- with self.assertRaises(AttributeError):
- pb2_grpc.Response
-
- def testCall(self):
- with _system_path([self.python_out_directory]):
- pb2 = importlib.import_module(self.pb2_import)
- with _system_path([self.grpc_python_out_directory]):
- pb2_grpc = importlib.import_module(self.pb2_grpc_import)
- server = grpc.server(
- futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE))
- pb2_grpc.add_TestServiceServicer_to_server(
- DummySplitServicer(pb2.Request, pb2.Response), server)
- port = server.add_insecure_port('[::]:0')
- server.start()
- channel = grpc.insecure_channel('localhost:{}'.format(port))
- stub = pb2_grpc.TestServiceStub(channel)
- request = pb2.Request()
- expected_response = pb2.Response()
- response = stub.Call(request)
- self.assertEqual(expected_response, response)
-
-
-class CommonTestMixin(object):
-
- def testImportAttributes(self):
- with _system_path([self.python_out_directory]):
- pb2 = importlib.import_module(self.pb2_import)
- pb2.Request
- pb2.Response
- if self.should_find_services_in_pb2:
- pb2.TestServiceServicer
- else:
- with self.assertRaises(AttributeError):
- pb2.TestServiceServicer
-
- with _system_path([self.grpc_python_out_directory]):
- pb2_grpc = importlib.import_module(self.pb2_grpc_import)
- pb2_grpc.TestServiceServicer
- with self.assertRaises(AttributeError):
- pb2_grpc.Request
- with self.assertRaises(AttributeError):
- pb2_grpc.Response
-
- def testCall(self):
- with _system_path([self.python_out_directory]):
- pb2 = importlib.import_module(self.pb2_import)
- with _system_path([self.grpc_python_out_directory]):
- pb2_grpc = importlib.import_module(self.pb2_grpc_import)
- server = grpc.server(
- futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE))
- pb2_grpc.add_TestServiceServicer_to_server(
- DummySplitServicer(pb2.Request, pb2.Response), server)
- port = server.add_insecure_port('[::]:0')
- server.start()
- channel = grpc.insecure_channel('localhost:{}'.format(port))
- stub = pb2_grpc.TestServiceStub(channel)
- request = pb2.Request()
- expected_response = pb2.Response()
- response = stub.Call(request)
- self.assertEqual(expected_response, response)
-
-
-@unittest.skipIf(platform.python_implementation() == "PyPy",
- "Skip test if run with PyPy")
-class SameSeparateTest(unittest.TestCase, SeparateTestMixin):
- def setUp(self):
- same_proto_contents = pkgutil.get_data(
- 'tests.protoc_plugin.protos.invocation_testing', 'same.proto')
- self.directory = tempfile.mkdtemp(suffix='same_separate', dir='.')
- self.proto_directory = os.path.join(self.directory, 'proto_path')
- self.python_out_directory = os.path.join(self.directory, 'python_out')
- self.grpc_python_out_directory = os.path.join(self.directory,
- 'grpc_python_out')
- os.makedirs(self.proto_directory)
- os.makedirs(self.python_out_directory)
- os.makedirs(self.grpc_python_out_directory)
- same_proto_file = os.path.join(self.proto_directory,
- 'same_separate.proto')
- open(same_proto_file, 'wb').write(
- same_proto_contents.replace(
- _COMMON_NAMESPACE,
- b'package grpc_protoc_plugin.invocation_testing.same_separate;'))
- protoc_result = protoc.main([
- '',
- '--proto_path={}'.format(self.proto_directory),
- '--python_out={}'.format(self.python_out_directory),
- '--grpc_python_out=grpc_2_0:{}'.format(
- self.grpc_python_out_directory),
- same_proto_file,
- ])
- if protoc_result != 0:
- raise Exception("unexpected protoc error")
- open(os.path.join(self.grpc_python_out_directory, '__init__.py'),
- 'w').write('')
- open(os.path.join(self.python_out_directory, '__init__.py'),
- 'w').write('')
- self.pb2_import = 'same_separate_pb2'
- self.pb2_grpc_import = 'same_separate_pb2_grpc'
- self.should_find_services_in_pb2 = False
+class _Mid2016ProtocStyle(object):
- def tearDown(self):
- shutil.rmtree(self.directory)
+ def name(self):
+ return 'Mid2016ProtocStyle'
+ def grpc_in_pb2_expected(self):
+ return True
-@unittest.skipIf(platform.python_implementation() == "PyPy",
- "Skip test if run with PyPy")
-class SameCommonTest(unittest.TestCase, CommonTestMixin):
+ def protoc(self, proto_path, python_out, absolute_proto_file_names):
+ return (_protoc(proto_path, python_out, 'grpc_1_0', python_out,
+ absolute_proto_file_names),)
- def setUp(self):
- same_proto_contents = pkgutil.get_data(
- 'tests.protoc_plugin.protos.invocation_testing', 'same.proto')
- self.directory = tempfile.mkdtemp(suffix='same_common', dir='.')
- self.proto_directory = os.path.join(self.directory, 'proto_path')
- self.python_out_directory = os.path.join(self.directory, 'python_out')
- self.grpc_python_out_directory = self.python_out_directory
- os.makedirs(self.proto_directory)
- os.makedirs(self.python_out_directory)
- same_proto_file = os.path.join(self.proto_directory,
- 'same_common.proto')
- open(same_proto_file, 'wb').write(
- same_proto_contents.replace(
- _COMMON_NAMESPACE,
- b'package grpc_protoc_plugin.invocation_testing.same_common;'))
-
- protoc_result = protoc.main([
- '',
- '--proto_path={}'.format(self.proto_directory),
- '--python_out={}'.format(self.python_out_directory),
- '--grpc_python_out={}'.format(self.grpc_python_out_directory),
- same_proto_file,
- ])
- if protoc_result != 0:
- raise Exception("unexpected protoc error")
- open(os.path.join(self.python_out_directory, '__init__.py'),
- 'w').write('')
- self.pb2_import = 'same_common_pb2'
- self.pb2_grpc_import = 'same_common_pb2_grpc'
- self.should_find_services_in_pb2 = True
- def tearDown(self):
- shutil.rmtree(self.directory)
+class _SingleProtocExecutionProtocStyle(object):
+ def name(self):
+ return 'SingleProtocExecutionProtocStyle'
-@unittest.skipIf(platform.python_implementation() == "PyPy",
- "Skip test if run with PyPy")
-class SplitCommonTest(unittest.TestCase, CommonTestMixin):
+ def grpc_in_pb2_expected(self):
+ return False
- def setUp(self):
- services_proto_contents = pkgutil.get_data(
- 'tests.protoc_plugin.protos.invocation_testing.split_services',
- 'services.proto')
- messages_proto_contents = pkgutil.get_data(
- 'tests.protoc_plugin.protos.invocation_testing.split_messages',
- 'messages.proto')
- self.directory = tempfile.mkdtemp(suffix='split_common', dir='.')
- self.proto_directory = os.path.join(self.directory, 'proto_path')
- self.python_out_directory = os.path.join(self.directory, 'python_out')
- self.grpc_python_out_directory = self.python_out_directory
- os.makedirs(self.proto_directory)
- os.makedirs(self.python_out_directory)
- services_proto_file = os.path.join(self.proto_directory,
- 'split_common_services.proto')
- messages_proto_file = os.path.join(self.proto_directory,
- 'split_common_messages.proto')
- open(services_proto_file, 'wb').write(
- services_proto_contents.replace(
- _MESSAGES_IMPORT, b'import "split_common_messages.proto";')
- .replace(
- _SPLIT_NAMESPACE,
- b'package grpc_protoc_plugin.invocation_testing.split_common;'))
- open(messages_proto_file, 'wb').write(
- messages_proto_contents.replace(
- _SPLIT_NAMESPACE,
- b'package grpc_protoc_plugin.invocation_testing.split_common;'))
- protoc_result = protoc.main([
- '',
- '--proto_path={}'.format(self.proto_directory),
- '--python_out={}'.format(self.python_out_directory),
- '--grpc_python_out={}'.format(self.grpc_python_out_directory),
- services_proto_file,
- messages_proto_file,
- ])
- if protoc_result != 0:
- raise Exception("unexpected protoc error")
- open(os.path.join(self.python_out_directory, '__init__.py'),
- 'w').write('')
- self.pb2_import = 'split_common_messages_pb2'
- self.pb2_grpc_import = 'split_common_services_pb2_grpc'
- self.should_find_services_in_pb2 = False
+ def protoc(self, proto_path, python_out, absolute_proto_file_names):
+ return (_protoc(proto_path, python_out, 'grpc_2_0', python_out,
+ absolute_proto_file_names),)
+
+
+class _ProtoBeforeGrpcProtocStyle(object):
+
+ def name(self):
+ return 'ProtoBeforeGrpcProtocStyle'
+
+ def grpc_in_pb2_expected(self):
+ return False
+
+ def protoc(self, proto_path, python_out, absolute_proto_file_names):
+ pb2_protoc_exit_code = _protoc(proto_path, python_out, None, None,
+ absolute_proto_file_names)
+ pb2_grpc_protoc_exit_code = _protoc(
+ proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names)
+ return pb2_protoc_exit_code, pb2_grpc_protoc_exit_code,
- def tearDown(self):
- shutil.rmtree(self.directory)
+class _GrpcBeforeProtoProtocStyle(object):
-@unittest.skipIf(platform.python_implementation() == "PyPy",
- "Skip test if run with PyPy")
-class SplitSeparateTest(unittest.TestCase, SeparateTestMixin):
+ def name(self):
+ return 'GrpcBeforeProtoProtocStyle'
+
+ def grpc_in_pb2_expected(self):
+ return False
+
+ def protoc(self, proto_path, python_out, absolute_proto_file_names):
+ pb2_grpc_protoc_exit_code = _protoc(
+ proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names)
+ pb2_protoc_exit_code = _protoc(proto_path, python_out, None, None,
+ absolute_proto_file_names)
+ return pb2_grpc_protoc_exit_code, pb2_protoc_exit_code,
+
+
+_PROTOC_STYLES = (_Mid2016ProtocStyle(), _SingleProtocExecutionProtocStyle(),
+ _ProtoBeforeGrpcProtocStyle(), _GrpcBeforeProtoProtocStyle(),)
+
+
+@unittest.skipIf(platform.python_implementation() == 'PyPy',
+ 'Skip test if run with PyPy!')
+class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)):
def setUp(self):
- services_proto_contents = pkgutil.get_data(
- 'tests.protoc_plugin.protos.invocation_testing.split_services',
- 'services.proto')
- messages_proto_contents = pkgutil.get_data(
- 'tests.protoc_plugin.protos.invocation_testing.split_messages',
- 'messages.proto')
- self.directory = tempfile.mkdtemp(suffix='split_separate', dir='.')
- self.proto_directory = os.path.join(self.directory, 'proto_path')
- self.python_out_directory = os.path.join(self.directory, 'python_out')
- self.grpc_python_out_directory = os.path.join(self.directory,
- 'grpc_python_out')
- os.makedirs(self.proto_directory)
- os.makedirs(self.python_out_directory)
- os.makedirs(self.grpc_python_out_directory)
- services_proto_file = os.path.join(self.proto_directory,
- 'split_separate_services.proto')
- messages_proto_file = os.path.join(self.proto_directory,
- 'split_separate_messages.proto')
- open(services_proto_file, 'wb').write(
- services_proto_contents.replace(
- _MESSAGES_IMPORT, b'import "split_separate_messages.proto";')
- .replace(
- _SPLIT_NAMESPACE,
- b'package grpc_protoc_plugin.invocation_testing.split_separate;'
- ))
- open(messages_proto_file, 'wb').write(
- messages_proto_contents.replace(
- _SPLIT_NAMESPACE,
- b'package grpc_protoc_plugin.invocation_testing.split_separate;'
- ))
- protoc_result = protoc.main([
- '',
- '--proto_path={}'.format(self.proto_directory),
- '--python_out={}'.format(self.python_out_directory),
- '--grpc_python_out=grpc_2_0:{}'.format(
- self.grpc_python_out_directory),
- services_proto_file,
- messages_proto_file,
- ])
- if protoc_result != 0:
- raise Exception("unexpected protoc error")
- open(os.path.join(self.python_out_directory, '__init__.py'),
- 'w').write('')
- self.pb2_import = 'split_separate_messages_pb2'
- self.pb2_grpc_import = 'split_separate_services_pb2_grpc'
- self.should_find_services_in_pb2 = False
+ self._directory = tempfile.mkdtemp(suffix=self.NAME, dir='.')
+ self._proto_path = path.join(self._directory, _RELATIVE_PROTO_PATH)
+ self._python_out = path.join(self._directory, _RELATIVE_PYTHON_OUT)
+
+ os.makedirs(self._proto_path)
+ os.makedirs(self._python_out)
+
+ proto_directories_and_names = {
+ (self.MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES,
+ self.MESSAGES_PROTO_FILE_NAME,),
+ (self.SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES,
+ self.SERVICES_PROTO_FILE_NAME,),
+ }
+ messages_proto_relative_file_name_forward_slashes = '/'.join(
+ self.MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES + (
+ self.MESSAGES_PROTO_FILE_NAME,))
+ _create_directory_tree(self._proto_path, (
+ relative_proto_directory_names
+ for relative_proto_directory_names, _ in proto_directories_and_names
+ ))
+ self._absolute_proto_file_names = set()
+ for relative_directory_names, file_name in proto_directories_and_names:
+ absolute_proto_file_name = path.join(
+ self._proto_path, *relative_directory_names + (file_name,))
+ raw_proto_content = pkgutil.get_data(
+ 'tests.protoc_plugin.protos.invocation_testing',
+ path.join(*relative_directory_names + (file_name,)))
+ massaged_proto_content = _massage_proto_content(
+ raw_proto_content,
+ self.NAME.encode(),
+ messages_proto_relative_file_name_forward_slashes.encode())
+ with open(absolute_proto_file_name, 'wb') as proto_file:
+ proto_file.write(massaged_proto_content)
+ self._absolute_proto_file_names.add(absolute_proto_file_name)
def tearDown(self):
- shutil.rmtree(self.directory)
+ shutil.rmtree(self._directory)
+
+ def _protoc(self):
+ protoc_exit_codes = self.PROTOC_STYLE.protoc(
+ self._proto_path, self._python_out, self._absolute_proto_file_names)
+ for protoc_exit_code in protoc_exit_codes:
+ self.assertEqual(0, protoc_exit_code)
+
+ _packagify(self._python_out)
+
+ generated_modules = {}
+ expected_generated_full_module_names = {
+ self.EXPECTED_MESSAGES_PB2,
+ self.EXPECTED_SERVICES_PB2,
+ self.EXPECTED_SERVICES_PB2_GRPC,
+ }
+ with _system_path([self._python_out]):
+ for full_module_name in expected_generated_full_module_names:
+ module = importlib.import_module(full_module_name)
+ generated_modules[full_module_name] = module
+
+ self._messages_pb2 = generated_modules[self.EXPECTED_MESSAGES_PB2]
+ self._services_pb2 = generated_modules[self.EXPECTED_SERVICES_PB2]
+ self._services_pb2_grpc = generated_modules[
+ self.EXPECTED_SERVICES_PB2_GRPC]
+
+ def _services_modules(self):
+ if self.PROTOC_STYLE.grpc_in_pb2_expected():
+ return self._services_pb2, self._services_pb2_grpc,
+ else:
+ return self._services_pb2_grpc,
+
+ def test_imported_attributes(self):
+ self._protoc()
+
+ self._messages_pb2.Request
+ self._messages_pb2.Response
+ self._services_pb2.DESCRIPTOR.services_by_name['TestService']
+ for services_module in self._services_modules():
+ services_module.TestServiceStub
+ services_module.TestServiceServicer
+ services_module.add_TestServiceServicer_to_server
+
+ def test_call(self):
+ self._protoc()
+
+ for services_module in self._services_modules():
+ server = grpc.server(
+ futures.ThreadPoolExecutor(
+ max_workers=test_constants.POOL_SIZE))
+ services_module.add_TestServiceServicer_to_server(
+ _Servicer(self._messages_pb2.Response), server)
+ port = server.add_insecure_port('[::]:0')
+ server.start()
+ channel = grpc.insecure_channel('localhost:{}'.format(port))
+ stub = services_module.TestServiceStub(channel)
+ response = stub.Call(self._messages_pb2.Request())
+ self.assertEqual(self._messages_pb2.Response(), response)
+
+
+def _create_test_case_class(split_proto, protoc_style):
+ attributes = {}
+
+ name = '{}{}'.format('SplitProto' if split_proto else 'SameProto',
+ protoc_style.name())
+ attributes['NAME'] = name
+
+ if split_proto:
+ attributes['MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES'] = (
+ 'split_messages', 'sub',)
+ attributes['MESSAGES_PROTO_FILE_NAME'] = 'messages.proto'
+ attributes['SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES'] = (
+ 'split_services',)
+ attributes['SERVICES_PROTO_FILE_NAME'] = 'services.proto'
+ attributes['EXPECTED_MESSAGES_PB2'] = 'split_messages.sub.messages_pb2'
+ attributes['EXPECTED_SERVICES_PB2'] = 'split_services.services_pb2'
+ attributes['EXPECTED_SERVICES_PB2_GRPC'] = (
+ 'split_services.services_pb2_grpc')
+ else:
+ attributes['MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES'] = ()
+ attributes['MESSAGES_PROTO_FILE_NAME'] = 'same.proto'
+ attributes['SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES'] = ()
+ attributes['SERVICES_PROTO_FILE_NAME'] = 'same.proto'
+ attributes['EXPECTED_MESSAGES_PB2'] = 'same_pb2'
+ attributes['EXPECTED_SERVICES_PB2'] = 'same_pb2'
+ attributes['EXPECTED_SERVICES_PB2_GRPC'] = 'same_pb2_grpc'
+
+ attributes['PROTOC_STYLE'] = protoc_style
+
+ attributes['__module__'] = _Test.__module__
+
+ return type('{}Test'.format(name), (_Test,), attributes)
+
+
+def _create_test_case_classes():
+ for split_proto in (False, True,):
+ for protoc_style in _PROTOC_STYLES:
+ yield _create_test_case_class(split_proto, protoc_style)
+
+
+def load_tests(loader, tests, pattern):
+ tests = tuple(
+ loader.loadTestsFromTestCase(test_case_class)
+ for test_case_class in _create_test_case_classes())
+ return unittest.TestSuite(tests=tests)
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
index 83f21ecbbb..424b153ff8 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py
@@ -12,19 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import argparse
import contextlib
-import distutils.spawn
-import errno
-import itertools
+import importlib
import os
-import pkg_resources
+from os import path
+import pkgutil
import shutil
-import subprocess
import sys
import tempfile
import threading
-import time
import unittest
from six import moves
@@ -33,12 +29,22 @@ from grpc.beta import implementations
from grpc.beta import interfaces
from grpc.framework.foundation import future
from grpc.framework.interfaces.face import face
+from grpc_tools import protoc
from tests.unit.framework.common import test_constants
-import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2
-import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2
-import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2
-import tests.protoc_plugin.protos.service.test_service_pb2 as service_pb2
+_RELATIVE_PROTO_PATH = 'relative_proto_path'
+_RELATIVE_PYTHON_OUT = 'relative_python_out'
+
+_PROTO_FILES_PATH_COMPONENTS = (
+ ('beta_grpc_plugin_test', 'payload', 'test_payload.proto',),
+ ('beta_grpc_plugin_test', 'requests', 'r', 'test_requests.proto',),
+ ('beta_grpc_plugin_test', 'responses', 'test_responses.proto',),
+ ('beta_grpc_plugin_test', 'service', 'test_service.proto',),)
+
+_PAYLOAD_PB2 = 'beta_grpc_plugin_test.payload.test_payload_pb2'
+_REQUESTS_PB2 = 'beta_grpc_plugin_test.requests.r.test_requests_pb2'
+_RESPONSES_PB2 = 'beta_grpc_plugin_test.responses.test_responses_pb2'
+_SERVICE_PB2 = 'beta_grpc_plugin_test.service.test_service_pb2'
# Identifiers of entities we expect to find in the generated module.
SERVICER_IDENTIFIER = 'BetaTestServiceServicer'
@@ -47,12 +53,50 @@ SERVER_FACTORY_IDENTIFIER = 'beta_create_TestService_server'
STUB_FACTORY_IDENTIFIER = 'beta_create_TestService_stub'
+@contextlib.contextmanager
+def _system_path(path_insertion):
+ old_system_path = sys.path[:]
+ sys.path = sys.path[0:1] + path_insertion + sys.path[1:]
+ yield
+ sys.path = old_system_path
+
+
+def _create_directory_tree(root, path_components_sequence):
+ created = set()
+ for path_components in path_components_sequence:
+ thus_far = ''
+ for path_component in path_components:
+ relative_path = path.join(thus_far, path_component)
+ if relative_path not in created:
+ os.makedirs(path.join(root, relative_path))
+ created.add(relative_path)
+ thus_far = path.join(thus_far, path_component)
+
+
+def _massage_proto_content(raw_proto_content):
+ imports_substituted = raw_proto_content.replace(
+ b'import "tests/protoc_plugin/protos/',
+ b'import "beta_grpc_plugin_test/')
+ package_statement_substituted = imports_substituted.replace(
+ b'package grpc_protoc_plugin;', b'package beta_grpc_protoc_plugin;')
+ return package_statement_substituted
+
+
+def _packagify(directory):
+ for subdirectory, _, _ in os.walk(directory):
+ init_file_name = path.join(subdirectory, '__init__.py')
+ with open(init_file_name, 'wb') as init_file:
+ init_file.write(b'')
+
+
class _ServicerMethods(object):
- def __init__(self):
+ def __init__(self, payload_pb2, responses_pb2):
self._condition = threading.Condition()
self._paused = False
self._fail = False
+ self._payload_pb2 = payload_pb2
+ self._responses_pb2 = responses_pb2
@contextlib.contextmanager
def pause(self): # pylint: disable=invalid-name
@@ -79,22 +123,22 @@ class _ServicerMethods(object):
self._condition.wait()
def UnaryCall(self, request, unused_rpc_context):
- response = response_pb2.SimpleResponse()
- response.payload.payload_type = payload_pb2.COMPRESSABLE
+ response = self._responses_pb2.SimpleResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * request.response_size
self._control()
return response
def StreamingOutputCall(self, request, unused_rpc_context):
for parameter in request.response_parameters:
- response = response_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = payload_pb2.COMPRESSABLE
+ response = self._responses_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def StreamingInputCall(self, request_iter, unused_rpc_context):
- response = response_pb2.StreamingInputCallResponse()
+ response = self._responses_pb2.StreamingInputCallResponse()
aggregated_payload_size = 0
for request in request_iter:
aggregated_payload_size += len(request.payload.payload_compressable)
@@ -105,8 +149,8 @@ class _ServicerMethods(object):
def FullDuplexCall(self, request_iter, unused_rpc_context):
for request in request_iter:
for parameter in request.response_parameters:
- response = response_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = payload_pb2.COMPRESSABLE
+ response = self._responses_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
@@ -115,8 +159,8 @@ class _ServicerMethods(object):
responses = []
for request in request_iter:
for parameter in request.response_parameters:
- response = response_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = payload_pb2.COMPRESSABLE
+ response = self._responses_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
responses.append(response)
@@ -125,7 +169,7 @@ class _ServicerMethods(object):
@contextlib.contextmanager
-def _CreateService():
+def _CreateService(payload_pb2, responses_pb2, service_pb2):
"""Provides a servicer backend and a stub.
The servicer is just the implementation of the actual servicer passed to the
@@ -136,7 +180,7 @@ def _CreateService():
the service bound to the stub and and stub is the stub on which to invoke
RPCs.
"""
- servicer_methods = _ServicerMethods()
+ servicer_methods = _ServicerMethods(payload_pb2, responses_pb2)
class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
@@ -161,12 +205,12 @@ def _CreateService():
server.start()
channel = implementations.insecure_channel('localhost', port)
stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
- yield (servicer_methods, stub)
+ yield servicer_methods, stub,
server.stop(0)
@contextlib.contextmanager
-def _CreateIncompleteService():
+def _CreateIncompleteService(service_pb2):
"""Provides a servicer backend that fails to implement methods and its stub.
The servicer is just the implementation of the actual servicer passed to the
@@ -192,16 +236,16 @@ def _CreateIncompleteService():
server.stop(0)
-def _streaming_input_request_iterator():
+def _streaming_input_request_iterator(payload_pb2, requests_pb2):
for _ in range(3):
- request = request_pb2.StreamingInputCallRequest()
+ request = requests_pb2.StreamingInputCallRequest()
request.payload.payload_type = payload_pb2.COMPRESSABLE
request.payload.payload_compressable = 'a'
yield request
-def _streaming_output_request():
- request = request_pb2.StreamingOutputCallRequest()
+def _streaming_output_request(requests_pb2):
+ request = requests_pb2.StreamingOutputCallRequest()
sizes = [1, 2, 3]
request.response_parameters.add(size=sizes[0], interval_us=0)
request.response_parameters.add(size=sizes[1], interval_us=0)
@@ -209,11 +253,11 @@ def _streaming_output_request():
return request
-def _full_duplex_request_iterator():
- request = request_pb2.StreamingOutputCallRequest()
+def _full_duplex_request_iterator(requests_pb2):
+ request = requests_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
- request = request_pb2.StreamingOutputCallRequest()
+ request = requests_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
@@ -227,22 +271,78 @@ class PythonPluginTest(unittest.TestCase):
methods and does not exist for response-streaming methods.
"""
+ def setUp(self):
+ self._directory = tempfile.mkdtemp(dir='.')
+ self._proto_path = path.join(self._directory, _RELATIVE_PROTO_PATH)
+ self._python_out = path.join(self._directory, _RELATIVE_PYTHON_OUT)
+
+ os.makedirs(self._proto_path)
+ os.makedirs(self._python_out)
+
+ directories_path_components = {
+ proto_file_path_components[:-1]
+ for proto_file_path_components in _PROTO_FILES_PATH_COMPONENTS
+ }
+ _create_directory_tree(self._proto_path, directories_path_components)
+ self._proto_file_names = set()
+ for proto_file_path_components in _PROTO_FILES_PATH_COMPONENTS:
+ raw_proto_content = pkgutil.get_data(
+ 'tests.protoc_plugin.protos',
+ path.join(*proto_file_path_components[1:]))
+ massaged_proto_content = _massage_proto_content(raw_proto_content)
+ proto_file_name = path.join(self._proto_path,
+ *proto_file_path_components)
+ with open(proto_file_name, 'wb') as proto_file:
+ proto_file.write(massaged_proto_content)
+ self._proto_file_names.add(proto_file_name)
+
+ def tearDown(self):
+ shutil.rmtree(self._directory)
+
+ def _protoc(self):
+ args = [
+ '',
+ '--proto_path={}'.format(self._proto_path),
+ '--python_out={}'.format(self._python_out),
+ '--grpc_python_out=grpc_1_0:{}'.format(self._python_out),
+ ] + list(self._proto_file_names)
+ protoc_exit_code = protoc.main(args)
+ self.assertEqual(0, protoc_exit_code)
+
+ _packagify(self._python_out)
+
+ with _system_path([
+ self._python_out,
+ ]):
+ self._payload_pb2 = importlib.import_module(_PAYLOAD_PB2)
+ self._requests_pb2 = importlib.import_module(_REQUESTS_PB2)
+ self._responses_pb2 = importlib.import_module(_RESPONSES_PB2)
+ self._service_pb2 = importlib.import_module(_SERVICE_PB2)
+
def testImportAttributes(self):
+ self._protoc()
+
# check that we can access the generated module and its members.
- self.assertIsNotNone(getattr(service_pb2, SERVICER_IDENTIFIER, None))
- self.assertIsNotNone(getattr(service_pb2, STUB_IDENTIFIER, None))
self.assertIsNotNone(
- getattr(service_pb2, SERVER_FACTORY_IDENTIFIER, None))
+ getattr(self._service_pb2, SERVICER_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(self._service_pb2, STUB_IDENTIFIER, None))
self.assertIsNotNone(
- getattr(service_pb2, STUB_FACTORY_IDENTIFIER, None))
+ getattr(self._service_pb2, SERVER_FACTORY_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, STUB_FACTORY_IDENTIFIER, None))
def testUpDown(self):
- with _CreateService():
- request_pb2.SimpleRequest(response_size=13)
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2):
+ self._requests_pb2.SimpleRequest(response_size=13)
def testIncompleteServicer(self):
- with _CreateIncompleteService() as (_, stub):
- request = request_pb2.SimpleRequest(response_size=13)
+ self._protoc()
+
+ with _CreateIncompleteService(self._service_pb2) as (_, stub):
+ request = self._requests_pb2.SimpleRequest(response_size=13)
try:
stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
except face.AbortionError as error:
@@ -250,15 +350,21 @@ class PythonPluginTest(unittest.TestCase):
error.code)
def testUnaryCall(self):
- with _CreateService() as (methods, stub):
- request = request_pb2.SimpleRequest(response_size=13)
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ request = self._requests_pb2.SimpleRequest(response_size=13)
response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
expected_response = methods.UnaryCall(request, 'not a real context!')
self.assertEqual(expected_response, response)
def testUnaryCallFuture(self):
- with _CreateService() as (methods, stub):
- request = request_pb2.SimpleRequest(response_size=13)
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ request = self._requests_pb2.SimpleRequest(response_size=13)
# Check that the call does not block waiting for the server to respond.
with methods.pause():
response_future = stub.UnaryCall.future(
@@ -268,8 +374,11 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testUnaryCallFutureExpired(self):
- with _CreateService() as (methods, stub):
- request = request_pb2.SimpleRequest(response_size=13)
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ request = self._requests_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.future(
request, test_constants.SHORT_TIMEOUT)
@@ -277,24 +386,33 @@ class PythonPluginTest(unittest.TestCase):
response_future.result()
def testUnaryCallFutureCancelled(self):
- with _CreateService() as (methods, stub):
- request = request_pb2.SimpleRequest(response_size=13)
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ request = self._requests_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.future(request, 1)
response_future.cancel()
self.assertTrue(response_future.cancelled())
def testUnaryCallFutureFailed(self):
- with _CreateService() as (methods, stub):
- request = request_pb2.SimpleRequest(response_size=13)
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ request = self._requests_pb2.SimpleRequest(response_size=13)
with methods.fail():
response_future = stub.UnaryCall.future(
request, test_constants.LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testStreamingOutputCall(self):
- with _CreateService() as (methods, stub):
- request = _streaming_output_request()
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ request = _streaming_output_request(self._requests_pb2)
responses = stub.StreamingOutputCall(request,
test_constants.LONG_TIMEOUT)
expected_responses = methods.StreamingOutputCall(
@@ -304,8 +422,11 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testStreamingOutputCallExpired(self):
- with _CreateService() as (methods, stub):
- request = _streaming_output_request()
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ request = _streaming_output_request(self._requests_pb2)
with methods.pause():
responses = stub.StreamingOutputCall(
request, test_constants.SHORT_TIMEOUT)
@@ -313,8 +434,11 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
def testStreamingOutputCallCancelled(self):
- with _CreateService() as (methods, stub):
- request = _streaming_output_request()
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ request = _streaming_output_request(self._requests_pb2)
responses = stub.StreamingOutputCall(request,
test_constants.LONG_TIMEOUT)
next(responses)
@@ -323,8 +447,11 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testStreamingOutputCallFailed(self):
- with _CreateService() as (methods, stub):
- request = _streaming_output_request()
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ request = _streaming_output_request(self._requests_pb2)
with methods.fail():
responses = stub.StreamingOutputCall(request, 1)
self.assertIsNotNone(responses)
@@ -332,30 +459,46 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testStreamingInputCall(self):
- with _CreateService() as (methods, stub):
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
response = stub.StreamingInputCall(
- _streaming_input_request_iterator(),
+ _streaming_input_request_iterator(self._payload_pb2,
+ self._requests_pb2),
test_constants.LONG_TIMEOUT)
expected_response = methods.StreamingInputCall(
- _streaming_input_request_iterator(), 'not a real RpcContext!')
+ _streaming_input_request_iterator(self._payload_pb2,
+ self._requests_pb2),
+ 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallFuture(self):
- with _CreateService() as (methods, stub):
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(),
+ _streaming_input_request_iterator(self._payload_pb2,
+ self._requests_pb2),
test_constants.LONG_TIMEOUT)
response = response_future.result()
expected_response = methods.StreamingInputCall(
- _streaming_input_request_iterator(), 'not a real RpcContext!')
+ _streaming_input_request_iterator(self._payload_pb2,
+ self._requests_pb2),
+ 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallFutureExpired(self):
- with _CreateService() as (methods, stub):
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(),
+ _streaming_input_request_iterator(self._payload_pb2,
+ self._requests_pb2),
test_constants.SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError):
response_future.result()
@@ -363,10 +506,14 @@ class PythonPluginTest(unittest.TestCase):
face.ExpirationError)
def testStreamingInputCallFutureCancelled(self):
- with _CreateService() as (methods, stub):
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(),
+ _streaming_input_request_iterator(self._payload_pb2,
+ self._requests_pb2),
test_constants.LONG_TIMEOUT)
response_future.cancel()
self.assertTrue(response_future.cancelled())
@@ -374,26 +521,38 @@ class PythonPluginTest(unittest.TestCase):
response_future.result()
def testStreamingInputCallFutureFailed(self):
- with _CreateService() as (methods, stub):
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
with methods.fail():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(),
+ _streaming_input_request_iterator(self._payload_pb2,
+ self._requests_pb2),
test_constants.LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testFullDuplexCall(self):
- with _CreateService() as (methods, stub):
- responses = stub.FullDuplexCall(_full_duplex_request_iterator(),
- test_constants.LONG_TIMEOUT)
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ responses = stub.FullDuplexCall(
+ _full_duplex_request_iterator(self._requests_pb2),
+ test_constants.LONG_TIMEOUT)
expected_responses = methods.FullDuplexCall(
- _full_duplex_request_iterator(), 'not a real RpcContext!')
+ _full_duplex_request_iterator(self._requests_pb2),
+ 'not a real RpcContext!')
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
def testFullDuplexCallExpired(self):
- request_iterator = _full_duplex_request_iterator()
- with _CreateService() as (methods, stub):
+ self._protoc()
+
+ request_iterator = _full_duplex_request_iterator(self._requests_pb2)
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
with methods.pause():
responses = stub.FullDuplexCall(request_iterator,
test_constants.SHORT_TIMEOUT)
@@ -401,8 +560,11 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
def testFullDuplexCallCancelled(self):
- with _CreateService() as (methods, stub):
- request_iterator = _full_duplex_request_iterator()
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
+ request_iterator = _full_duplex_request_iterator(self._requests_pb2)
responses = stub.FullDuplexCall(request_iterator,
test_constants.LONG_TIMEOUT)
next(responses)
@@ -411,8 +573,11 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testFullDuplexCallFailed(self):
- request_iterator = _full_duplex_request_iterator()
- with _CreateService() as (methods, stub):
+ self._protoc()
+
+ request_iterator = _full_duplex_request_iterator(self._requests_pb2)
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
with methods.fail():
responses = stub.FullDuplexCall(request_iterator,
test_constants.LONG_TIMEOUT)
@@ -421,13 +586,16 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testHalfDuplexCall(self):
- with _CreateService() as (methods, stub):
+ self._protoc()
+
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
def half_duplex_request_iterator():
- request = request_pb2.StreamingOutputCallRequest()
+ request = self._requests_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
- request = request_pb2.StreamingOutputCallRequest()
+ request = self._requests_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
@@ -441,6 +609,8 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testHalfDuplexCallWedged(self):
+ self._protoc()
+
condition = threading.Condition()
wait_cell = [False]
@@ -455,14 +625,15 @@ class PythonPluginTest(unittest.TestCase):
condition.notify_all()
def half_duplex_request_iterator():
- request = request_pb2.StreamingOutputCallRequest()
+ request = self._requests_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
with condition:
while wait_cell[0]:
condition.wait()
- with _CreateService() as (methods, stub):
+ with _CreateService(self._payload_pb2, self._responses_pb2,
+ self._service_pb2) as (methods, stub):
with wait():
responses = stub.HalfDuplexCall(half_duplex_request_iterator(),
test_constants.SHORT_TIMEOUT)
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/messages.proto b/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/sub/messages.proto
index 1b780c69ba..1b780c69ba 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/messages.proto
+++ b/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/sub/messages.proto
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_services/__init__.py b/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_services/__init__.py
deleted file mode 100644
index 5772620b60..0000000000
--- a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_services/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2016 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py
index 5f4df79c5b..17fa61ea36 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_client.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py
@@ -22,7 +22,7 @@ from six.moves import queue
import grpc
from src.proto.grpc.testing import messages_pb2
-from src.proto.grpc.testing import services_pb2
+from src.proto.grpc.testing import services_pb2_grpc
from tests.unit import resources
from tests.unit import test_common
@@ -58,7 +58,7 @@ class BenchmarkClient:
if config.payload_config.WhichOneof('payload') == 'simple_params':
self._generic = False
- self._stub = services_pb2.BenchmarkServiceStub(channel)
+ self._stub = services_pb2_grpc.BenchmarkServiceStub(channel)
payload = messages_pb2.Payload(
body='\0' * config.payload_config.simple_params.req_size)
self._request = messages_pb2.SimpleRequest(
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py
index 05101fdc6d..bb07844491 100644
--- a/src/python/grpcio_tests/tests/qps/benchmark_server.py
+++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py
@@ -13,10 +13,10 @@
# limitations under the License.
from src.proto.grpc.testing import messages_pb2
-from src.proto.grpc.testing import services_pb2
+from src.proto.grpc.testing import services_pb2_grpc
-class BenchmarkServer(services_pb2.BenchmarkServiceServicer):
+class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer):
"""Synchronous Server implementation for the Benchmark service."""
def UnaryCall(self, request, context):
@@ -29,7 +29,7 @@ class BenchmarkServer(services_pb2.BenchmarkServiceServicer):
yield messages_pb2.SimpleResponse(payload=payload)
-class GenericBenchmarkServer(services_pb2.BenchmarkServiceServicer):
+class GenericBenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer):
"""Generic Server implementation for the Benchmark service."""
def __init__(self, resp_size):
diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py
index d5ff0064fd..40caa3926a 100644
--- a/src/python/grpcio_tests/tests/stress/client.py
+++ b/src/python/grpcio_tests/tests/stress/client.py
@@ -20,7 +20,7 @@ import threading
import grpc
from six.moves import queue
from src.proto.grpc.testing import metrics_pb2_grpc
-from src.proto.grpc.testing import test_pb2
+from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import methods
from tests.interop import resources
@@ -133,7 +133,7 @@ def run_test(args):
for _ in xrange(args.num_channels_per_server):
channel = _get_channel(test_server_target, args)
for _ in xrange(args.num_stubs_per_channel):
- stub = test_pb2.TestServiceStub(channel)
+ stub = test_pb2_grpc.TestServiceStub(channel)
runner = test_runner.TestRunner(stub, test_cases, hist,
exception_queue, stop_event)
runners.append(runner)
diff --git a/src/python/grpcio_tests/tests/stress/metrics_server.py b/src/python/grpcio_tests/tests/stress/metrics_server.py
index 11ab6c3f4e..33a74b4a38 100644
--- a/src/python/grpcio_tests/tests/stress/metrics_server.py
+++ b/src/python/grpcio_tests/tests/stress/metrics_server.py
@@ -16,11 +16,12 @@
import time
from src.proto.grpc.testing import metrics_pb2
+from src.proto.grpc.testing import metrics_pb2_grpc
GAUGE_NAME = 'python_overall_qps'
-class MetricsServer(metrics_pb2.MetricsServiceServicer):
+class MetricsServer(metrics_pb2_grpc.MetricsServiceServicer):
def __init__(self, histogram):
self._start_time = time.time()
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index d61297b918..8512d5b96f 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -1,12 +1,17 @@
[
+ "_sanity._sanity_test.SanityTest",
"health_check._health_servicer_test.HealthServicerTest",
"interop._insecure_intraop_test.InsecureIntraopTest",
"interop._secure_intraop_test.SecureIntraopTest",
"protoc_plugin._python_plugin_test.PythonPluginTest",
- "protoc_plugin._split_definitions_test.SameCommonTest",
- "protoc_plugin._split_definitions_test.SameSeparateTest",
- "protoc_plugin._split_definitions_test.SplitCommonTest",
- "protoc_plugin._split_definitions_test.SplitSeparateTest",
+ "protoc_plugin._split_definitions_test.SameProtoGrpcBeforeProtoProtocStyleTest",
+ "protoc_plugin._split_definitions_test.SameProtoMid2016ProtocStyleTest",
+ "protoc_plugin._split_definitions_test.SameProtoProtoBeforeGrpcProtocStyleTest",
+ "protoc_plugin._split_definitions_test.SameProtoSingleProtocExecutionProtocStyleTest",
+ "protoc_plugin._split_definitions_test.SplitProtoGrpcBeforeProtoProtocStyleTest",
+ "protoc_plugin._split_definitions_test.SplitProtoMid2016ProtocStyleTest",
+ "protoc_plugin._split_definitions_test.SplitProtoProtoBeforeGrpcProtocStyleTest",
+ "protoc_plugin._split_definitions_test.SplitProtoSingleProtocExecutionProtocStyleTest",
"protoc_plugin.beta_python_plugin_test.PythonPluginTest",
"reflection._reflection_servicer_test.ReflectionServicerTest",
"testing._client_test.ClientTest",
@@ -26,6 +31,8 @@
"unit._credentials_test.CredentialsTest",
"unit._cython._cancel_many_calls_test.CancelManyCallsTest",
"unit._cython._channel_test.ChannelTest",
+ "unit._cython._no_messages_server_completion_queue_per_call_test.Test",
+ "unit._cython._no_messages_single_server_completion_queue_test.Test",
"unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
"unit._cython.cygrpc_test.InsecureServerInsecureClient",
"unit._cython.cygrpc_test.SecureServerSecureClient",
@@ -39,7 +46,6 @@
"unit._reconnect_test.ReconnectTest",
"unit._resource_exhausted_test.ResourceExhaustedTest",
"unit._rpc_test.RPCTest",
- "unit._sanity._sanity_test.Sanity",
"unit._thread_cleanup_test.CleanupThreadTest",
"unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py
new file mode 100644
index 0000000000..ac66d1db3d
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py
@@ -0,0 +1,118 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Common utilities for tests of the Cython layer of gRPC Python."""
+
+import collections
+import threading
+
+from grpc._cython import cygrpc
+
+RPC_COUNT = 4000
+
+INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
+EMPTY_FLAGS = 0
+
+INVOCATION_METADATA = cygrpc.Metadata(
+ (cygrpc.Metadatum(b'client-md-key', b'client-md-key'),
+ cygrpc.Metadatum(b'client-md-key-bin', b'\x00\x01' * 3000),))
+
+INITIAL_METADATA = cygrpc.Metadata(
+ (cygrpc.Metadatum(b'server-initial-md-key', b'server-initial-md-value'),
+ cygrpc.Metadatum(b'server-initial-md-key-bin', b'\x00\x02' * 3000),))
+
+TRAILING_METADATA = cygrpc.Metadata(
+ (cygrpc.Metadatum(b'server-trailing-md-key', b'server-trailing-md-value'),
+ cygrpc.Metadatum(b'server-trailing-md-key-bin', b'\x00\x03' * 3000),))
+
+
+class QueueDriver(object):
+
+ def __init__(self, condition, completion_queue):
+ self._condition = condition
+ self._completion_queue = completion_queue
+ self._due = collections.defaultdict(int)
+ self._events = collections.defaultdict(list)
+
+ def add_due(self, tags):
+ if not self._due:
+
+ def in_thread():
+ while True:
+ event = self._completion_queue.poll()
+ with self._condition:
+ self._events[event.tag].append(event)
+ self._due[event.tag] -= 1
+ self._condition.notify_all()
+ if self._due[event.tag] <= 0:
+ self._due.pop(event.tag)
+ if not self._due:
+ return
+
+ thread = threading.Thread(target=in_thread)
+ thread.start()
+ for tag in tags:
+ self._due[tag] += 1
+
+ def event_with_tag(self, tag):
+ with self._condition:
+ while True:
+ if self._events[tag]:
+ return self._events[tag].pop(0)
+ else:
+ self._condition.wait()
+
+
+def execute_many_times(behavior):
+ return tuple(behavior() for _ in range(RPC_COUNT))
+
+
+class OperationResult(
+ collections.namedtuple('OperationResult', (
+ 'start_batch_result', 'completion_type', 'success',))):
+ pass
+
+
+SUCCESSFUL_OPERATION_RESULT = OperationResult(
+ cygrpc.CallError.ok, cygrpc.CompletionType.operation_complete, True)
+
+
+class RpcTest(object):
+
+ def setUp(self):
+ self.server_completion_queue = cygrpc.CompletionQueue()
+ self.server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ self.server.register_completion_queue(self.server_completion_queue)
+ port = self.server.add_http2_port(b'[::]:0')
+ self.server.start()
+ self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(),
+ cygrpc.ChannelArgs([]))
+
+ self._server_shutdown_tag = 'server_shutdown_tag'
+ self.server_condition = threading.Condition()
+ self.server_driver = QueueDriver(self.server_condition,
+ self.server_completion_queue)
+ with self.server_condition:
+ self.server_driver.add_due({
+ self._server_shutdown_tag,
+ })
+
+ self.client_condition = threading.Condition()
+ self.client_completion_queue = cygrpc.CompletionQueue()
+ self.client_driver = QueueDriver(self.client_condition,
+ self.client_completion_queue)
+
+ def tearDown(self):
+ self.server.shutdown(self.server_completion_queue,
+ self._server_shutdown_tag)
+ self.server.cancel_all_calls()
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
new file mode 100644
index 0000000000..14cc66675c
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
@@ -0,0 +1,131 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test a corner-case at the level of the Cython API."""
+
+import threading
+import unittest
+
+from grpc._cython import cygrpc
+
+from tests.unit._cython import _common
+
+
+class Test(_common.RpcTest, unittest.TestCase):
+
+ def _do_rpcs(self):
+ server_call_condition = threading.Condition()
+ server_call_completion_queue = cygrpc.CompletionQueue()
+ server_call_driver = _common.QueueDriver(server_call_condition,
+ server_call_completion_queue)
+
+ server_request_call_tag = 'server_request_call_tag'
+ server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
+ server_complete_rpc_tag = 'server_complete_rpc_tag'
+
+ with self.server_condition:
+ server_request_call_start_batch_result = self.server.request_call(
+ server_call_completion_queue, self.server_completion_queue,
+ server_request_call_tag)
+ self.server_driver.add_due({
+ server_request_call_tag,
+ })
+
+ client_call = self.channel.create_call(
+ None, _common.EMPTY_FLAGS, self.client_completion_queue,
+ b'/twinkies', None, _common.INFINITE_FUTURE)
+ client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
+ client_complete_rpc_tag = 'client_complete_rpc_tag'
+ with self.client_condition:
+ client_receive_initial_metadata_start_batch_result = (
+ client_call.start_client_batch(
+ cygrpc.Operations([
+ cygrpc.operation_receive_initial_metadata(
+ _common.EMPTY_FLAGS),
+ ]), client_receive_initial_metadata_tag))
+ client_complete_rpc_start_batch_result = client_call.start_client_batch(
+ cygrpc.Operations([
+ cygrpc.operation_send_initial_metadata(
+ _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(
+ _common.EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(
+ _common.EMPTY_FLAGS),
+ ]), client_complete_rpc_tag)
+ self.client_driver.add_due({
+ client_receive_initial_metadata_tag,
+ client_complete_rpc_tag,
+ })
+
+ server_request_call_event = self.server_driver.event_with_tag(
+ server_request_call_tag)
+
+ with server_call_condition:
+ server_send_initial_metadata_start_batch_result = (
+ server_request_call_event.operation_call.start_server_batch([
+ cygrpc.operation_send_initial_metadata(
+ _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
+ ], server_send_initial_metadata_tag))
+ server_call_driver.add_due({
+ server_send_initial_metadata_tag,
+ })
+ server_send_initial_metadata_event = server_call_driver.event_with_tag(
+ server_send_initial_metadata_tag)
+
+ with server_call_condition:
+ server_complete_rpc_start_batch_result = (
+ server_request_call_event.operation_call.start_server_batch([
+ cygrpc.operation_receive_close_on_server(
+ _common.EMPTY_FLAGS),
+ cygrpc.operation_send_status_from_server(
+ _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
+ b'test details', _common.EMPTY_FLAGS),
+ ], server_complete_rpc_tag))
+ server_call_driver.add_due({
+ server_complete_rpc_tag,
+ })
+ server_complete_rpc_event = server_call_driver.event_with_tag(
+ server_complete_rpc_tag)
+
+ client_receive_initial_metadata_event = self.client_driver.event_with_tag(
+ client_receive_initial_metadata_tag)
+ client_complete_rpc_event = self.client_driver.event_with_tag(
+ client_complete_rpc_tag)
+
+ return (_common.OperationResult(server_request_call_start_batch_result,
+ server_request_call_event.type,
+ server_request_call_event.success),
+ _common.OperationResult(
+ client_receive_initial_metadata_start_batch_result,
+ client_receive_initial_metadata_event.type,
+ client_receive_initial_metadata_event.success),
+ _common.OperationResult(client_complete_rpc_start_batch_result,
+ client_complete_rpc_event.type,
+ client_complete_rpc_event.success),
+ _common.OperationResult(
+ server_send_initial_metadata_start_batch_result,
+ server_send_initial_metadata_event.type,
+ server_send_initial_metadata_event.success),
+ _common.OperationResult(server_complete_rpc_start_batch_result,
+ server_complete_rpc_event.type,
+ server_complete_rpc_event.success),)
+
+ def test_rpcs(self):
+ expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
+ 5] * _common.RPC_COUNT
+ actuallys = _common.execute_many_times(self._do_rpcs)
+ self.assertSequenceEqual(expecteds, actuallys)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
new file mode 100644
index 0000000000..1e44bcc4dc
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
@@ -0,0 +1,126 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test a corner-case at the level of the Cython API."""
+
+import threading
+import unittest
+
+from grpc._cython import cygrpc
+
+from tests.unit._cython import _common
+
+
+class Test(_common.RpcTest, unittest.TestCase):
+
+ def _do_rpcs(self):
+ server_request_call_tag = 'server_request_call_tag'
+ server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
+ server_complete_rpc_tag = 'server_complete_rpc_tag'
+
+ with self.server_condition:
+ server_request_call_start_batch_result = self.server.request_call(
+ self.server_completion_queue, self.server_completion_queue,
+ server_request_call_tag)
+ self.server_driver.add_due({
+ server_request_call_tag,
+ })
+
+ client_call = self.channel.create_call(
+ None, _common.EMPTY_FLAGS, self.client_completion_queue,
+ b'/twinkies', None, _common.INFINITE_FUTURE)
+ client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
+ client_complete_rpc_tag = 'client_complete_rpc_tag'
+ with self.client_condition:
+ client_receive_initial_metadata_start_batch_result = (
+ client_call.start_client_batch(
+ cygrpc.Operations([
+ cygrpc.operation_receive_initial_metadata(
+ _common.EMPTY_FLAGS),
+ ]), client_receive_initial_metadata_tag))
+ client_complete_rpc_start_batch_result = client_call.start_client_batch(
+ cygrpc.Operations([
+ cygrpc.operation_send_initial_metadata(
+ _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(
+ _common.EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(
+ _common.EMPTY_FLAGS),
+ ]), client_complete_rpc_tag)
+ self.client_driver.add_due({
+ client_receive_initial_metadata_tag,
+ client_complete_rpc_tag,
+ })
+
+ server_request_call_event = self.server_driver.event_with_tag(
+ server_request_call_tag)
+
+ with self.server_condition:
+ server_send_initial_metadata_start_batch_result = (
+ server_request_call_event.operation_call.start_server_batch([
+ cygrpc.operation_send_initial_metadata(
+ _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
+ ], server_send_initial_metadata_tag))
+ self.server_driver.add_due({
+ server_send_initial_metadata_tag,
+ })
+ server_send_initial_metadata_event = self.server_driver.event_with_tag(
+ server_send_initial_metadata_tag)
+
+ with self.server_condition:
+ server_complete_rpc_start_batch_result = (
+ server_request_call_event.operation_call.start_server_batch([
+ cygrpc.operation_receive_close_on_server(
+ _common.EMPTY_FLAGS),
+ cygrpc.operation_send_status_from_server(
+ _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
+ b'test details', _common.EMPTY_FLAGS),
+ ], server_complete_rpc_tag))
+ self.server_driver.add_due({
+ server_complete_rpc_tag,
+ })
+ server_complete_rpc_event = self.server_driver.event_with_tag(
+ server_complete_rpc_tag)
+
+ client_receive_initial_metadata_event = self.client_driver.event_with_tag(
+ client_receive_initial_metadata_tag)
+ client_complete_rpc_event = self.client_driver.event_with_tag(
+ client_complete_rpc_tag)
+
+ return (_common.OperationResult(server_request_call_start_batch_result,
+ server_request_call_event.type,
+ server_request_call_event.success),
+ _common.OperationResult(
+ client_receive_initial_metadata_start_batch_result,
+ client_receive_initial_metadata_event.type,
+ client_receive_initial_metadata_event.success),
+ _common.OperationResult(client_complete_rpc_start_batch_result,
+ client_complete_rpc_event.type,
+ client_complete_rpc_event.success),
+ _common.OperationResult(
+ server_send_initial_metadata_start_batch_result,
+ server_send_initial_metadata_event.type,
+ server_send_initial_metadata_event.success),
+ _common.OperationResult(server_complete_rpc_start_batch_result,
+ server_complete_rpc_event.type,
+ server_complete_rpc_event.success),)
+
+ def test_rpcs(self):
+ expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
+ 5] * _common.RPC_COUNT
+ actuallys = _common.execute_many_times(self._do_rpcs)
+ self.assertSequenceEqual(expecteds, actuallys)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_sanity/__init__.py b/src/python/grpcio_tests/tests/unit/_sanity/__init__.py
deleted file mode 100644
index 5772620b60..0000000000
--- a/src/python/grpcio_tests/tests/unit/_sanity/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2016 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.