aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/tests')
-rw-r--r--src/python/grpcio/tests/health_check/__init__.py28
-rw-r--r--src/python/grpcio/tests/health_check/_health_servicer_test.py75
-rw-r--r--src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py295
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto51
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto77
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto47
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto (renamed from src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto)85
-rw-r--r--src/python/grpcio/tests/qps/benchmark_client.py60
-rw-r--r--src/python/grpcio/tests/qps/client_runner.py2
-rw-r--r--src/python/grpcio/tests/qps/worker_server.py5
-rw-r--r--src/python/grpcio/tests/tests.json5
-rw-r--r--src/python/grpcio/tests/unit/_cython/_channel_test.py2
-rw-r--r--src/python/grpcio/tests/unit/_cython/cygrpc_test.py58
-rw-r--r--src/python/grpcio/tests/unit/framework/common/test_constants.py9
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py10
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py8
16 files changed, 548 insertions, 269 deletions
diff --git a/src/python/grpcio/tests/health_check/__init__.py b/src/python/grpcio/tests/health_check/__init__.py
new file mode 100644
index 0000000000..100a624dc9
--- /dev/null
+++ b/src/python/grpcio/tests/health_check/__init__.py
@@ -0,0 +1,28 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/python/grpcio/tests/health_check/_health_servicer_test.py b/src/python/grpcio/tests/health_check/_health_servicer_test.py
new file mode 100644
index 0000000000..1b63388663
--- /dev/null
+++ b/src/python/grpcio/tests/health_check/_health_servicer_test.py
@@ -0,0 +1,75 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of grpc_health.health.v1.health."""
+
+import unittest
+
+from grpc_health.health.v1 import health
+from grpc_health.health.v1 import health_pb2
+
+
+class HealthServicerTest(unittest.TestCase):
+
+ def setUp(self):
+ self.servicer = health.HealthServicer()
+ self.servicer.set('', health_pb2.HealthCheckResponse.SERVING)
+ self.servicer.set('grpc.test.TestServiceServing',
+ health_pb2.HealthCheckResponse.SERVING)
+ self.servicer.set('grpc.test.TestServiceUnknown',
+ health_pb2.HealthCheckResponse.UNKNOWN)
+ self.servicer.set('grpc.test.TestServiceNotServing',
+ health_pb2.HealthCheckResponse.NOT_SERVING)
+
+ def test_empty_service(self):
+ request = health_pb2.HealthCheckRequest()
+ resp = self.servicer.Check(request, None)
+ self.assertEqual(resp.status, health_pb2.HealthCheckResponse.SERVING)
+
+ def test_serving_service(self):
+ request = health_pb2.HealthCheckRequest(
+ service='grpc.test.TestServiceServing')
+ resp = self.servicer.Check(request, None)
+ self.assertEqual(resp.status, health_pb2.HealthCheckResponse.SERVING)
+
+ def test_unknown_serivce(self):
+ request = health_pb2.HealthCheckRequest(
+ service='grpc.test.TestServiceUnknown')
+ resp = self.servicer.Check(request, None)
+ self.assertEqual(resp.status, health_pb2.HealthCheckResponse.UNKNOWN)
+
+ def test_not_serving_service(self):
+ request = health_pb2.HealthCheckRequest(
+ service='grpc.test.TestServiceNotServing')
+ resp = self.servicer.Check(request, None)
+ self.assertEqual(resp.status, health_pb2.HealthCheckResponse.NOT_SERVING)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
index 3dc3042e38..7466f88059 100644
--- a/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
@@ -59,11 +59,12 @@ STUB_FACTORY_IDENTIFIER = 'beta_create_TestService_stub'
class _ServicerMethods(object):
- def __init__(self, test_pb2):
+ def __init__(self, response_pb2, payload_pb2):
self._condition = threading.Condition()
self._paused = False
self._fail = False
- self._test_pb2 = test_pb2
+ self._response_pb2 = response_pb2
+ self._payload_pb2 = payload_pb2
@contextlib.contextmanager
def pause(self): # pylint: disable=invalid-name
@@ -90,22 +91,22 @@ class _ServicerMethods(object):
self._condition.wait()
def UnaryCall(self, request, unused_rpc_context):
- response = self._test_pb2.SimpleResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.SimpleResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * request.response_size
self._control()
return response
def StreamingOutputCall(self, request, unused_rpc_context):
for parameter in request.response_parameters:
- response = self._test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def StreamingInputCall(self, request_iter, unused_rpc_context):
- response = self._test_pb2.StreamingInputCallResponse()
+ response = self._response_pb2.StreamingInputCallResponse()
aggregated_payload_size = 0
for request in request_iter:
aggregated_payload_size += len(request.payload.payload_compressable)
@@ -116,8 +117,8 @@ class _ServicerMethods(object):
def FullDuplexCall(self, request_iter, unused_rpc_context):
for request in request_iter:
for parameter in request.response_parameters:
- response = self._test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
@@ -126,8 +127,8 @@ class _ServicerMethods(object):
responses = []
for request in request_iter:
for parameter in request.response_parameters:
- response = self._test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
responses.append(response)
@@ -136,23 +137,25 @@ class _ServicerMethods(object):
@contextlib.contextmanager
-def _CreateService(test_pb2):
+def _CreateService(service_pb2, response_pb2, payload_pb2):
"""Provides a servicer backend and a stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the python RPC implementation; the two are detached.
Args:
- test_pb2: The test_pb2 module generated by this test.
+ service_pb2: The service_pb2 module generated by this test.
+ response_pb2: The response_pb2 module generated by this test
+ payload_pb2: The payload_pb2 module generated by this test
Yields:
A (servicer_methods, stub) pair where servicer_methods is the back-end of
the service bound to the stub and and stub is the stub on which to invoke
RPCs.
"""
- servicer_methods = _ServicerMethods(test_pb2)
+ servicer_methods = _ServicerMethods(response_pb2, payload_pb2)
- class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+ class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
def UnaryCall(self, request, context):
return servicer_methods.UnaryCall(request, context)
@@ -170,55 +173,52 @@ def _CreateService(test_pb2):
return servicer_methods.HalfDuplexCall(request_iter, context)
servicer = Servicer()
- server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
+ server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
port = server.add_insecure_port('[::]:0')
server.start()
channel = implementations.insecure_channel('localhost', port)
- stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
- yield servicer_methods, stub
+ stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
+ yield (servicer_methods, stub)
server.stop(0)
@contextlib.contextmanager
-def _CreateIncompleteService(test_pb2):
+def _CreateIncompleteService(service_pb2):
"""Provides a servicer backend that fails to implement methods and its stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the python RPC implementation; the two are detached.
-
Args:
- test_pb2: The test_pb2 module generated by this test.
-
+ service_pb2: The service_pb2 module generated by this test.
Yields:
A (servicer_methods, stub) pair where servicer_methods is the back-end of
the service bound to the stub and and stub is the stub on which to invoke
RPCs.
"""
- servicer_methods = _ServicerMethods(test_pb2)
- class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+ class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
pass
servicer = Servicer()
- server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
+ server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
port = server.add_insecure_port('[::]:0')
server.start()
channel = implementations.insecure_channel('localhost', port)
- stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
- yield servicer_methods, stub
+ stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
+ yield None, stub
server.stop(0)
-def _streaming_input_request_iterator(test_pb2):
+def _streaming_input_request_iterator(request_pb2, payload_pb2):
for _ in range(3):
- request = test_pb2.StreamingInputCallRequest()
- request.payload.payload_type = test_pb2.COMPRESSABLE
+ request = request_pb2.StreamingInputCallRequest()
+ request.payload.payload_type = payload_pb2.COMPRESSABLE
request.payload.payload_compressable = 'a'
yield request
-def _streaming_output_request(test_pb2):
- request = test_pb2.StreamingOutputCallRequest()
+def _streaming_output_request(request_pb2):
+ request = request_pb2.StreamingOutputCallRequest()
sizes = [1, 2, 3]
request.response_parameters.add(size=sizes[0], interval_us=0)
request.response_parameters.add(size=sizes[1], interval_us=0)
@@ -226,11 +226,11 @@ def _streaming_output_request(test_pb2):
return request
-def _full_duplex_request_iterator(test_pb2):
- request = test_pb2.StreamingOutputCallRequest()
+def _full_duplex_request_iterator(request_pb2):
+ request = request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
- request = test_pb2.StreamingOutputCallRequest()
+ request = request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
@@ -250,8 +250,6 @@ class PythonPluginTest(unittest.TestCase):
protoc_command = 'protoc'
protoc_plugin_filename = distutils.spawn.find_executable(
'grpc_python_plugin')
- test_proto_filename = pkg_resources.resource_filename(
- 'tests.protoc_plugin', 'protoc_plugin_test.proto')
if not os.path.isfile(protoc_command):
# Assume that if we haven't built protoc that it's on the system.
protoc_command = 'protoc'
@@ -259,19 +257,44 @@ class PythonPluginTest(unittest.TestCase):
# Ensure that the output directory exists.
self.outdir = tempfile.mkdtemp()
+ # Find all proto files
+ paths = []
+ root_dir = os.path.dirname(os.path.realpath(__file__))
+ proto_dir = os.path.join(root_dir, 'protos')
+ for walk_root, _, filenames in os.walk(proto_dir):
+ for filename in filenames:
+ if filename.endswith('.proto'):
+ path = os.path.join(walk_root, filename)
+ paths.append(path)
+
# Invoke protoc with the plugin.
cmd = [
protoc_command,
'--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
- '-I .',
+ '-I %s' % root_dir,
'--python_out=%s' % self.outdir,
- '--python-grpc_out=%s' % self.outdir,
- os.path.basename(test_proto_filename),
- ]
+ '--python-grpc_out=%s' % self.outdir
+ ] + paths
subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
- cwd=os.path.dirname(test_proto_filename))
+ cwd=os.path.dirname(os.path.realpath(__file__)))
+
+ # Generated proto directories dont include __init__.py, but
+ # these are needed for python package resolution
+ for walk_root, _, _ in os.walk(os.path.join(self.outdir, 'protos')):
+ path = os.path.join(walk_root, '__init__.py')
+ open(path, 'a').close()
+
sys.path.insert(0, self.outdir)
+ import protos.payload.test_payload_pb2 as payload_pb2 # pylint: disable=g-import-not-at-top
+ import protos.requests.r.test_requests_pb2 as request_pb2 # pylint: disable=g-import-not-at-top
+ import protos.responses.test_responses_pb2 as response_pb2 # pylint: disable=g-import-not-at-top
+ import protos.service.test_service_pb2 as service_pb2 # pylint: disable=g-import-not-at-top
+ self._payload_pb2 = payload_pb2
+ self._request_pb2 = request_pb2
+ self._response_pb2 = response_pb2
+ self._service_pb2 = service_pb2
+
def tearDown(self):
try:
shutil.rmtree(self.outdir)
@@ -282,43 +305,40 @@ class PythonPluginTest(unittest.TestCase):
def testImportAttributes(self):
# check that we can access the generated module and its members.
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
- self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
- self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
- self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, SERVICER_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, STUB_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, SERVER_FACTORY_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, STUB_FACTORY_IDENTIFIER, None))
def testUpDown(self):
- import protoc_plugin_test_pb2 as test_pb2
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (servicer, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(
+ self._service_pb2, self._response_pb2, self._payload_pb2):
+ self._request_pb2.SimpleRequest(response_size=13)
def testIncompleteServicer(self):
- import protoc_plugin_test_pb2 as test_pb2
- moves.reload_module(test_pb2)
- with _CreateIncompleteService(test_pb2) as (servicer, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateIncompleteService(self._service_pb2) as (_, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
try:
- response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
+ stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
except face.AbortionError as error:
self.assertEqual(interfaces.StatusCode.UNIMPLEMENTED, error.code)
def testUnaryCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
expected_response = methods.UnaryCall(request, 'not a real context!')
self.assertEqual(expected_response, response)
def testUnaryCallFuture(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
# Check that the call does not block waiting for the server to respond.
with methods.pause():
response_future = stub.UnaryCall.future(
@@ -328,10 +348,9 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testUnaryCallFutureExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.future(
request, test_constants.SHORT_TIMEOUT)
@@ -339,30 +358,27 @@ class PythonPluginTest(unittest.TestCase):
response_future.result()
def testUnaryCallFutureCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.future(request, 1)
response_future.cancel()
self.assertTrue(response_future.cancelled())
def testUnaryCallFutureFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
with methods.fail():
response_future = stub.UnaryCall.future(
request, test_constants.LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testStreamingOutputCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
responses = stub.StreamingOutputCall(
request, test_constants.LONG_TIMEOUT)
expected_responses = methods.StreamingOutputCall(
@@ -372,10 +388,9 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testStreamingOutputCallExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
with methods.pause():
responses = stub.StreamingOutputCall(
request, test_constants.SHORT_TIMEOUT)
@@ -383,10 +398,9 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
def testStreamingOutputCallCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (unused_methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
responses = stub.StreamingOutputCall(
request, test_constants.LONG_TIMEOUT)
next(responses)
@@ -395,10 +409,9 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testStreamingOutputCallFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
with methods.fail():
responses = stub.StreamingOutputCall(request, 1)
self.assertIsNotNone(responses)
@@ -406,36 +419,38 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testStreamingInputCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
response = stub.StreamingInputCall(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
expected_response = methods.StreamingInputCall(
- _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ _streaming_input_request_iterator(self._request_pb2, self._payload_pb2),
+ 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallFuture(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
response = response_future.result()
expected_response = methods.StreamingInputCall(
- _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ _streaming_input_request_iterator(self._request_pb2, self._payload_pb2),
+ 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallFutureExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError):
response_future.result()
@@ -443,12 +458,12 @@ class PythonPluginTest(unittest.TestCase):
response_future.exception(), face.ExpirationError)
def testStreamingInputCallFutureCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
response_future.cancel()
self.assertTrue(response_future.cancelled())
@@ -456,32 +471,32 @@ class PythonPluginTest(unittest.TestCase):
response_future.result()
def testStreamingInputCallFutureFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.fail():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testFullDuplexCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
responses = stub.FullDuplexCall(
- _full_duplex_request_iterator(test_pb2), test_constants.LONG_TIMEOUT)
+ _full_duplex_request_iterator(self._request_pb2),
+ test_constants.LONG_TIMEOUT)
expected_responses = methods.FullDuplexCall(
- _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
+ _full_duplex_request_iterator(self._request_pb2),
+ 'not a real RpcContext!')
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
def testFullDuplexCallExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request_iterator = _full_duplex_request_iterator(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ request_iterator = _full_duplex_request_iterator(self._request_pb2)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
responses = stub.FullDuplexCall(
request_iterator, test_constants.SHORT_TIMEOUT)
@@ -489,10 +504,9 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
def testFullDuplexCallCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
- request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request_iterator = _full_duplex_request_iterator(self._request_pb2)
responses = stub.FullDuplexCall(
request_iterator, test_constants.LONG_TIMEOUT)
next(responses)
@@ -501,10 +515,9 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testFullDuplexCallFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request_iterator = _full_duplex_request_iterator(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ request_iterator = _full_duplex_request_iterator(self._request_pb2)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.fail():
responses = stub.FullDuplexCall(
request_iterator, test_constants.LONG_TIMEOUT)
@@ -513,14 +526,13 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testHalfDuplexCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
def half_duplex_request_iterator():
- request = test_pb2.StreamingOutputCallRequest()
+ request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
- request = test_pb2.StreamingOutputCallRequest()
+ request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
@@ -533,8 +545,6 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testHalfDuplexCallWedged(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
condition = threading.Condition()
wait_cell = [False]
@contextlib.contextmanager
@@ -547,13 +557,14 @@ class PythonPluginTest(unittest.TestCase):
wait_cell[0] = False
condition.notify_all()
def half_duplex_request_iterator():
- request = test_pb2.StreamingOutputCallRequest()
+ request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
with condition:
while wait_cell[0]:
condition.wait()
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with wait():
responses = stub.HalfDuplexCall(
half_duplex_request_iterator(), test_constants.SHORT_TIMEOUT)
@@ -563,5 +574,5 @@ class PythonPluginTest(unittest.TestCase):
if __name__ == '__main__':
- os.chdir(os.path.dirname(sys.argv[0]))
+ #os.chdir(os.path.dirname(sys.argv[0]))
unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto b/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto
new file mode 100644
index 0000000000..457543aa79
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto
@@ -0,0 +1,51 @@
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+package grpc_protoc_plugin;
+
+enum PayloadType {
+ // Compressable text format.
+ COMPRESSABLE= 0;
+
+ // Uncompressable binary format.
+ UNCOMPRESSABLE = 1;
+
+ // Randomly chosen from all other formats defined in this enum.
+ RANDOM = 2;
+}
+
+message Payload {
+ PayloadType payload_type = 1;
+ oneof payload_body {
+ string payload_compressable = 2;
+ bytes payload_uncompressable = 3;
+ }
+}
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto b/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto
new file mode 100644
index 0000000000..54105df6a5
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto
@@ -0,0 +1,77 @@
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+import "protos/payload/test_payload.proto";
+
+package grpc_protoc_plugin;
+
+message SimpleRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, server randomly chooses one from other formats.
+ PayloadType response_type = 1;
+
+ // Desired payload size in the response from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ int32 response_size = 2;
+
+ // input payload sent along with the request.
+ Payload payload = 3;
+}
+
+message StreamingInputCallRequest {
+ // input payload sent along with the request.
+ Payload payload = 1;
+
+ // Not expecting any payload from the response.
+}
+
+message ResponseParameters {
+ // Desired payload sizes in responses from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ int32 size = 1;
+
+ // Desired interval between consecutive responses in the response stream in
+ // microseconds.
+ int32 interval_us = 2;
+}
+
+message StreamingOutputCallRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, the payload from each response in the stream
+ // might be of different types. This is to simulate a mixed type of payload
+ // stream.
+ PayloadType response_type = 1;
+
+ repeated ResponseParameters response_parameters = 2;
+
+ // input payload sent along with the request.
+ Payload payload = 3;
+}
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto b/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto
new file mode 100644
index 0000000000..734fbda86e
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto
@@ -0,0 +1,47 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+import "protos/payload/test_payload.proto";
+
+package grpc_protoc_plugin;
+
+message SimpleResponse {
+ Payload payload = 1;
+}
+
+message StreamingInputCallResponse {
+ // Aggregated size of payloads received from the client.
+ int32 aggregated_payload_size = 1;
+}
+
+message StreamingOutputCallResponse {
+ Payload payload = 1;
+}
diff --git a/src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto b/src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto
index 6762a8e7f3..fe715ee7f9 100644
--- a/src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto
+++ b/src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto
@@ -1,4 +1,4 @@
-// Copyright 2015, Google Inc.
+// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -27,87 +27,12 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-// An integration test service that covers all the method signature permutations
-// of unary/streaming requests/responses.
-// This file is duplicated around the code base. See GitHub issue #526.
-syntax = "proto2";
+syntax = "proto3";
-package grpc_protoc_plugin;
-
-enum PayloadType {
- // Compressable text format.
- COMPRESSABLE= 1;
-
- // Uncompressable binary format.
- UNCOMPRESSABLE = 2;
-
- // Randomly chosen from all other formats defined in this enum.
- RANDOM = 3;
-}
-
-message Payload {
- required PayloadType payload_type = 1;
- oneof payload_body {
- string payload_compressable = 2;
- bytes payload_uncompressable = 3;
- }
-}
-
-message SimpleRequest {
- // Desired payload type in the response from the server.
- // If response_type is RANDOM, server randomly chooses one from other formats.
- optional PayloadType response_type = 1 [default=COMPRESSABLE];
-
- // Desired payload size in the response from the server.
- // If response_type is COMPRESSABLE, this denotes the size before compression.
- optional int32 response_size = 2;
-
- // Optional input payload sent along with the request.
- optional Payload payload = 3;
-}
-
-message SimpleResponse {
- optional Payload payload = 1;
-}
-
-message StreamingInputCallRequest {
- // Optional input payload sent along with the request.
- optional Payload payload = 1;
+import "protos/requests/r/test_requests.proto";
+import "protos/responses/test_responses.proto";
- // Not expecting any payload from the response.
-}
-
-message StreamingInputCallResponse {
- // Aggregated size of payloads received from the client.
- optional int32 aggregated_payload_size = 1;
-}
-
-message ResponseParameters {
- // Desired payload sizes in responses from the server.
- // If response_type is COMPRESSABLE, this denotes the size before compression.
- required int32 size = 1;
-
- // Desired interval between consecutive responses in the response stream in
- // microseconds.
- required int32 interval_us = 2;
-}
-
-message StreamingOutputCallRequest {
- // Desired payload type in the response from the server.
- // If response_type is RANDOM, the payload from each response in the stream
- // might be of different types. This is to simulate a mixed type of payload
- // stream.
- optional PayloadType response_type = 1 [default=COMPRESSABLE];
-
- repeated ResponseParameters response_parameters = 2;
-
- // Optional input payload sent along with the request.
- optional Payload payload = 3;
-}
-
-message StreamingOutputCallResponse {
- optional Payload payload = 1;
-}
+package grpc_protoc_plugin;
service TestService {
// One request followed by one response.
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py
index eed0b0c6da..b372ea01ad 100644
--- a/src/python/grpcio/tests/qps/benchmark_client.py
+++ b/src/python/grpcio/tests/qps/benchmark_client.py
@@ -39,6 +39,7 @@ except ImportError:
from concurrent import futures
from grpc.beta import implementations
+from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import services_pb2
from tests.unit import resources
@@ -141,10 +142,10 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient):
self._stub = None
-class StreamingAsyncBenchmarkClient(BenchmarkClient):
+class StreamingSyncBenchmarkClient(BenchmarkClient):
def __init__(self, server, config, hist):
- super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
+ super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
self._is_streaming = False
self._pool = futures.ThreadPoolExecutor(max_workers=1)
# Use a thread-safe queue to put requests on the stream
@@ -167,12 +168,12 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
def _request_stream(self):
self._is_streaming = True
if self._generic:
- response_stream = self._stub.inline_stream_stream(
- 'grpc.testing.BenchmarkService', 'StreamingCall',
- self._request_generator(), _TIMEOUT)
+ stream_callable = self._stub.stream_stream(
+ 'grpc.testing.BenchmarkService', 'StreamingCall')
else:
- response_stream = self._stub.StreamingCall(self._request_generator(),
- _TIMEOUT)
+ stream_callable = self._stub.StreamingCall
+
+ response_stream = stream_callable(self._request_generator(), _TIMEOUT)
for _ in response_stream:
end_time = time.time()
self._handle_response(end_time - self._send_time_queue.get_nowait())
@@ -184,3 +185,48 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
yield request
except queue.Empty:
pass
+
+
+class AsyncReceiver(face.ResponseReceiver):
+ """Receiver for async stream responses."""
+
+ def __init__(self, send_time_queue, response_handler):
+ self._send_time_queue = send_time_queue
+ self._response_handler = response_handler
+
+ def initial_metadata(self, initial_mdetadata):
+ pass
+
+ def response(self, response):
+ end_time = time.time()
+ self._response_handler(end_time - self._send_time_queue.get_nowait())
+
+ def complete(self, terminal_metadata, code, details):
+ pass
+
+
+class StreamingAsyncBenchmarkClient(BenchmarkClient):
+
+ def __init__(self, server, config, hist):
+ super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
+ self._send_time_queue = queue.Queue()
+ self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response)
+ self._rendezvous = None
+
+ def send_request(self):
+ if self._rendezvous is not None:
+ self._send_time_queue.put(time.time())
+ self._rendezvous.consume(self._request)
+
+ def start(self):
+ if self._generic:
+ stream_callable = self._stub.stream_stream(
+ 'grpc.testing.BenchmarkService', 'StreamingCall')
+ else:
+ stream_callable = self._stub.StreamingCall
+ self._rendezvous = stream_callable.event(
+ self._receiver, lambda *args: None, _TIMEOUT)
+
+ def stop(self):
+ self._rendezvous.terminate()
+ self._rendezvous = None
diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py
index a36c30ccc0..1ede7d2af1 100644
--- a/src/python/grpcio/tests/qps/client_runner.py
+++ b/src/python/grpcio/tests/qps/client_runner.py
@@ -89,9 +89,9 @@ class ClosedLoopClientRunner(ClientRunner):
def start(self):
self._is_running = True
+ self._client.start()
for _ in xrange(self._request_count):
self._client.send_request()
- self._client.start()
def stop(self):
self._is_running = False
diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py
index 0b3acc14e7..1f9af5482c 100644
--- a/src/python/grpcio/tests/qps/worker_server.py
+++ b/src/python/grpcio/tests/qps/worker_server.py
@@ -146,8 +146,9 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer):
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnarySyncBenchmarkClient(
server, config, qps_data)
- else:
- raise Exception('STREAMING SYNC client not supported')
+ elif config.rpc_type == control_pb2.STREAMING:
+ client = benchmark_client.StreamingSyncBenchmarkClient(
+ server, config, qps_data)
elif config.client_type == control_pb2.ASYNC_CLIENT:
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnaryAsyncBenchmarkClient(
diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json
index 84870aaa5c..691062f25a 100644
--- a/src/python/grpcio/tests/tests.json
+++ b/src/python/grpcio/tests/tests.json
@@ -28,7 +28,8 @@
"_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
"_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
+ "_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
+ "_health_servicer_test.HealthServicerTest",
"_implementations_test.ChannelCredentialsTest",
"_insecure_interop_test.InsecureInteropTest",
"_intermediary_low_test.CancellationTest",
@@ -50,4 +51,4 @@
"cygrpc_test.InsecureServerInsecureClient",
"cygrpc_test.SecureServerSecureClient",
"cygrpc_test.TypeSmokeTest"
-] \ No newline at end of file
+]
diff --git a/src/python/grpcio/tests/unit/_cython/_channel_test.py b/src/python/grpcio/tests/unit/_cython/_channel_test.py
index 931cd9083e..3dc7a246ae 100644
--- a/src/python/grpcio/tests/unit/_cython/_channel_test.py
+++ b/src/python/grpcio/tests/unit/_cython/_channel_test.py
@@ -60,7 +60,7 @@ def _create_loop_destroy():
def _in_parallel(behavior, arguments):
threads = tuple(
threading.Thread(target=behavior, args=arguments)
- for _ in range(test_constants.PARALLELISM))
+ for _ in range(test_constants.THREAD_CONCURRENCY))
for thread in threads:
thread.start()
for thread in threads:
diff --git a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py
index 876da88de9..0a511101f0 100644
--- a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py
@@ -40,6 +40,7 @@ from tests.unit import resources
_SSL_HOST_OVERRIDE = 'foo.test.google.fr'
_CALL_CREDENTIALS_METADATA_KEY = 'call-creds-key'
_CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value'
+_EMPTY_FLAGS = 0
def _metadata_plugin_callback(context, callback):
callback(cygrpc.Metadata(
@@ -76,7 +77,7 @@ class TypeSmokeTest(unittest.TestCase):
def testOperationsIteration(self):
operations = cygrpc.Operations([
- cygrpc.operation_send_message('asdf')])
+ cygrpc.operation_send_message('asdf', _EMPTY_FLAGS)])
iterator = iter(operations)
operation = next(iterator)
self.assertIsInstance(operation, cygrpc.Operation)
@@ -85,6 +86,11 @@ class TypeSmokeTest(unittest.TestCase):
with self.assertRaises(StopIteration):
next(iterator)
+ def testOperationFlags(self):
+ operation = cygrpc.operation_send_message('asdf',
+ cygrpc.WriteFlag.no_compress)
+ self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags)
+
def testTimespec(self):
now = time.time()
timespec = cygrpc.Timespec(now)
@@ -188,12 +194,13 @@ class InsecureServerInsecureClient(unittest.TestCase):
CLIENT_METADATA_ASCII_VALUE),
cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)])
client_start_batch_result = client_call.start_batch(cygrpc.Operations([
- cygrpc.operation_send_initial_metadata(client_initial_metadata),
- cygrpc.operation_send_message(REQUEST),
- cygrpc.operation_send_close_from_client(),
- cygrpc.operation_receive_initial_metadata(),
- cygrpc.operation_receive_message(),
- cygrpc.operation_receive_status_on_client()
+ cygrpc.operation_send_initial_metadata(client_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
+ cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
]), client_call_tag)
self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
client_event_future = test_utilities.CompletionQueuePollFuture(
@@ -223,12 +230,14 @@ class InsecureServerInsecureClient(unittest.TestCase):
cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY,
SERVER_TRAILING_METADATA_VALUE)])
server_start_batch_result = server_call.start_batch([
- cygrpc.operation_send_initial_metadata(server_initial_metadata),
- cygrpc.operation_receive_message(),
- cygrpc.operation_send_message(RESPONSE),
- cygrpc.operation_receive_close_on_server(),
+ cygrpc.operation_send_initial_metadata(server_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
+ cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
- server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
+ server_trailing_metadata, SERVER_STATUS_CODE,
+ SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
], server_call_tag)
self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
@@ -349,12 +358,13 @@ class SecureServerSecureClient(unittest.TestCase):
CLIENT_METADATA_ASCII_VALUE),
cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)])
client_start_batch_result = client_call.start_batch(cygrpc.Operations([
- cygrpc.operation_send_initial_metadata(client_initial_metadata),
- cygrpc.operation_send_message(REQUEST),
- cygrpc.operation_send_close_from_client(),
- cygrpc.operation_receive_initial_metadata(),
- cygrpc.operation_receive_message(),
- cygrpc.operation_receive_status_on_client()
+ cygrpc.operation_send_initial_metadata(client_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
+ cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
]), client_call_tag)
self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
client_event_future = test_utilities.CompletionQueuePollFuture(
@@ -387,12 +397,14 @@ class SecureServerSecureClient(unittest.TestCase):
cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY,
SERVER_TRAILING_METADATA_VALUE)])
server_start_batch_result = server_call.start_batch([
- cygrpc.operation_send_initial_metadata(server_initial_metadata),
- cygrpc.operation_receive_message(),
- cygrpc.operation_send_message(RESPONSE),
- cygrpc.operation_receive_close_on_server(),
+ cygrpc.operation_send_initial_metadata(server_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
+ cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
- server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
+ server_trailing_metadata, SERVER_STATUS_CODE,
+ SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
], server_call_tag)
self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
diff --git a/src/python/grpcio/tests/unit/framework/common/test_constants.py b/src/python/grpcio/tests/unit/framework/common/test_constants.py
index 8d89101e09..b6682d396c 100644
--- a/src/python/grpcio/tests/unit/framework/common/test_constants.py
+++ b/src/python/grpcio/tests/unit/framework/common/test_constants.py
@@ -49,8 +49,13 @@ STREAM_LENGTH = 200
# The size of payloads to transmit in tests.
PAYLOAD_SIZE = 256 * 1024 + 17
-# The parallelism to use in tests of parallel RPCs.
-PARALLELISM = 200
+# The concurrency to use in tests of concurrent RPCs that will not create as
+# many threads as RPCs.
+RPC_CONCURRENCY = 200
+
+# The concurrency to use in tests of concurrent RPCs that will create as many
+# threads as RPCs.
+THREAD_CONCURRENCY = 25
# The size of thread pools to use in tests.
POOL_SIZE = 10
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
index 649892463a..e338aaa396 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -146,13 +146,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
test_messages.verify(second_request, second_response, self)
def testParallelInvocations(self):
- pool = logging_pool.pool(test_constants.PARALLELISM)
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures = []
- for _ in range(test_constants.PARALLELISM):
+ for _ in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = pool.submit(
self._invoker.blocking(group, method), request,
@@ -168,13 +168,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
pool.shutdown(wait=True)
def testWaitingForSomeButNotAllParallelInvocations(self):
- pool = logging_pool.pool(test_constants.PARALLELISM)
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures_to_indices = {}
- for index in range(test_constants.PARALLELISM):
+ for index in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = pool.submit(
self._invoker.blocking(group, method), request,
@@ -184,7 +184,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices),
- test_constants.PARALLELISM // 2)
+ test_constants.THREAD_CONCURRENCY // 2)
for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future]
test_messages.verify(requests[index], response_future.result(), self)
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
index c3813d5f3a..791620307b 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -249,7 +249,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
for test_messages in test_messages_sequence:
requests = []
response_futures = []
- for _ in range(test_constants.PARALLELISM):
+ for _ in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
@@ -263,13 +263,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
test_messages.verify(request, response, self)
def testWaitingForSomeButNotAllParallelInvocations(self):
- pool = logging_pool.pool(test_constants.PARALLELISM)
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures_to_indices = {}
- for index in range(test_constants.PARALLELISM):
+ for index in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
inner_response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
@@ -279,7 +279,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices),
- test_constants.PARALLELISM // 2)
+ test_constants.THREAD_CONCURRENCY // 2)
for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future]
test_messages.verify(requests[index], response_future.result(), self)