aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
Diffstat (limited to 'src/python')
-rw-r--r--src/python/grpcio/_spawn_patch.py (renamed from src/python/grpcio/_unixccompiler_patch.py)62
-rw-r--r--src/python/grpcio_tests/tests/interop/_insecure_interop_test.py14
-rw-r--r--src/python/grpcio_tests/tests/interop/_secure_interop_test.py24
-rw-r--r--src/python/grpcio_tests/tests/interop/client.py56
-rw-r--r--src/python/grpcio_tests/tests/interop/methods.py270
-rw-r--r--src/python/grpcio_tests/tests/interop/server.py12
-rw-r--r--src/python/grpcio_tests/tests/stress/client.py21
-rw-r--r--src/python/grpcio_tests/tests/stress/metrics_server.py2
8 files changed, 227 insertions, 234 deletions
diff --git a/src/python/grpcio/_unixccompiler_patch.py b/src/python/grpcio/_spawn_patch.py
index 894c3ef395..24306f0dd9 100644
--- a/src/python/grpcio/_unixccompiler_patch.py
+++ b/src/python/grpcio/_spawn_patch.py
@@ -27,51 +27,47 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Covers inadequacies in distutils."""
+"""Patches the spawn() command for windows compilers.
+
+Windows has an 8191 character command line limit, but some compilers
+support an @command_file directive where command_file is a file
+containing the full command line.
+"""
from distutils import ccompiler
-from distutils import errors
-from distutils import unixccompiler
import os
import os.path
-import shlex
import shutil
import sys
import tempfile
-def _unix_commandfile_spawn(self, command):
- """Wrapper around distutils.util.spawn that attempts to use command files.
+MAX_COMMAND_LENGTH = 8191
- Meant to replace the CCompiler method `spawn` on UnixCCompiler and its
- derivatives (e.g. the MinGW32 compiler).
+_classic_spawn = ccompiler.CCompiler.spawn
- Some commands like `gcc` (and friends like `clang`) support command files to
- work around shell command length limits.
- """
- # Sometimes distutils embeds the executables as full strings including some
- # hard-coded flags rather than as lists.
- command = list(shlex.split(command[0])) + list(command[1:])
- command_base = os.path.basename(command[0].strip())
- if command_base == 'ccache':
- command_base = command[:2]
- command_args = command[2:]
- elif command_base.startswith('ccache') or command_base in ['gcc', 'clang', 'clang++', 'g++']:
- command_base = command[:1]
- command_args = command[1:]
+def _commandfile_spawn(self, command):
+ command_length = sum([len(arg) for arg in command])
+ if os.name == 'nt' and command_length > MAX_COMMAND_LENGTH:
+ # Even if this command doesn't support the @command_file, it will
+ # fail as is so we try blindly
+ print('Command line length exceeded, using command file')
+ print(' '.join(command))
+ temporary_directory = tempfile.mkdtemp()
+ command_filename = os.path.abspath(
+ os.path.join(temporary_directory, 'command'))
+ with open(command_filename, 'w') as command_file:
+ escaped_args = ['"' + arg.replace('\\', '\\\\') + '"' for arg in command[1:]]
+ command_file.write(' '.join(escaped_args))
+ modified_command = command[:1] + ['@{}'.format(command_filename)]
+ try:
+ _classic_spawn(self, modified_command)
+ finally:
+ shutil.rmtree(temporary_directory)
else:
- return ccompiler.CCompiler.spawn(self, command)
- temporary_directory = tempfile.mkdtemp()
- command_filename = os.path.abspath(os.path.join(temporary_directory, 'command'))
- with open(command_filename, 'w') as command_file:
- escaped_args = [arg.replace('\\', '\\\\') for arg in command_args]
- command_file.write(' '.join(escaped_args))
- modified_command = command_base + ['@{}'.format(command_filename)]
- result = ccompiler.CCompiler.spawn(self, modified_command)
- shutil.rmtree(temporary_directory)
- return result
+ _classic_spawn(self, command)
-def monkeypatch_unix_compiler():
+def monkeypatch_spawn():
"""Monkeypatching is dumb, but it's either that or we become maintainers of
something much, much bigger."""
- unixccompiler.UnixCCompiler.spawn = _unix_commandfile_spawn
+ ccompiler.CCompiler.spawn = _commandfile_spawn
diff --git a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
index c753d6faf0..936c895bd2 100644
--- a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
@@ -29,9 +29,10 @@
"""Insecure client-server interoperability as a unit test."""
+from concurrent import futures
import unittest
-from grpc.beta import implementations
+import grpc
from src.proto.grpc.testing import test_pb2
from tests.interop import _interop_test_case
@@ -44,14 +45,13 @@ class InsecureInteropTest(
unittest.TestCase):
def setUp(self):
- self.server = test_pb2.beta_create_TestService_server(methods.TestService())
+ self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ test_pb2.add_TestServiceServicer_to_server(
+ methods.TestService(), self.server)
port = self.server.add_insecure_port('[::]:0')
self.server.start()
- self.stub = test_pb2.beta_create_TestService_stub(
- implementations.insecure_channel('localhost', port))
-
- def tearDown(self):
- self.server.stop(0)
+ self.stub = test_pb2.TestServiceStub(
+ grpc.insecure_channel('localhost:{}'.format(port)))
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py b/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
index cb09f54a34..eaca553e1b 100644
--- a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
@@ -29,17 +29,16 @@
"""Secure client-server interoperability as a unit test."""
+from concurrent import futures
import unittest
-from grpc.beta import implementations
+import grpc
from src.proto.grpc.testing import test_pb2
from tests.interop import _interop_test_case
from tests.interop import methods
from tests.interop import resources
-from tests.unit.beta import test_utilities
-
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
@@ -48,19 +47,18 @@ class SecureInteropTest(
unittest.TestCase):
def setUp(self):
- self.server = test_pb2.beta_create_TestService_server(methods.TestService())
+ self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ test_pb2.add_TestServiceServicer_to_server(
+ methods.TestService(), self.server)
port = self.server.add_secure_port(
- '[::]:0', implementations.ssl_server_credentials(
+ '[::]:0', grpc.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain())]))
self.server.start()
- self.stub = test_pb2.beta_create_TestService_stub(
- test_utilities.not_really_secure_channel(
- 'localhost', port, implementations.ssl_channel_credentials(
- resources.test_root_certificates()),
- _SERVER_HOST_OVERRIDE))
-
- def tearDown(self):
- self.server.stop(0)
+ self.stub = test_pb2.TestServiceStub(
+ grpc.secure_channel(
+ 'localhost:{}'.format(port),
+ grpc.ssl_channel_credentials(resources.test_root_certificates()),
+ (('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE,),)))
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py
index 8aa1ce30c1..9d61d18975 100644
--- a/src/python/grpcio_tests/tests/interop/client.py
+++ b/src/python/grpcio_tests/tests/interop/client.py
@@ -32,14 +32,12 @@
import argparse
from oauth2client import client as oauth2client_client
+import grpc
from grpc.beta import implementations
from src.proto.grpc.testing import test_pb2
from tests.interop import methods
from tests.interop import resources
-from tests.unit.beta import test_utilities
-
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
def _args():
@@ -66,41 +64,49 @@ def _args():
return parser.parse_args()
+def _application_default_credentials():
+ return oauth2client_client.GoogleCredentials.get_application_default()
+
+
def _stub(args):
+ target = '{}:{}'.format(args.server_host, args.server_port)
if args.test_case == 'oauth2_auth_token':
- creds = oauth2client_client.GoogleCredentials.get_application_default()
- scoped_creds = creds.create_scoped([args.oauth_scope])
- access_token = scoped_creds.get_access_token().access_token
- call_creds = implementations.access_token_call_credentials(access_token)
+ google_credentials = _application_default_credentials()
+ scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
+ access_token = scoped_credentials.get_access_token().access_token
+ call_credentials = grpc.access_token_call_credentials(access_token)
elif args.test_case == 'compute_engine_creds':
- creds = oauth2client_client.GoogleCredentials.get_application_default()
- scoped_creds = creds.create_scoped([args.oauth_scope])
- call_creds = implementations.google_call_credentials(scoped_creds)
+ google_credentials = _application_default_credentials()
+ scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
+ # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+ # remaining use of the Beta API.
+ call_credentials = implementations.google_call_credentials(
+ scoped_credentials)
elif args.test_case == 'jwt_token_creds':
- creds = oauth2client_client.GoogleCredentials.get_application_default()
- call_creds = implementations.google_call_credentials(creds)
+ google_credentials = _application_default_credentials()
+ # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+ # remaining use of the Beta API.
+ call_credentials = implementations.google_call_credentials(
+ google_credentials)
else:
- call_creds = None
+ call_credentials = None
if args.use_tls:
if args.use_test_ca:
root_certificates = resources.test_root_certificates()
else:
root_certificates = None # will load default roots.
- channel_creds = implementations.ssl_channel_credentials(root_certificates)
- if call_creds is not None:
- channel_creds = implementations.composite_channel_credentials(
- channel_creds, call_creds)
+ channel_credentials = grpc.ssl_channel_credentials(root_certificates)
+ if call_credentials is not None:
+ channel_credentials = grpc.composite_channel_credentials(
+ channel_credentials, call_credentials)
- channel = test_utilities.not_really_secure_channel(
- args.server_host, args.server_port, channel_creds,
- args.server_host_override)
- stub = test_pb2.beta_create_TestService_stub(channel)
+ channel = grpc.secure_channel(
+ target, channel_credentials,
+ (('grpc.ssl_target_name_override', args.server_host_override,),))
else:
- channel = implementations.insecure_channel(
- args.server_host, args.server_port)
- stub = test_pb2.beta_create_TestService_stub(channel)
- return stub
+ channel = grpc.insecure_channel(target)
+ return test_pb2.TestServiceStub(channel)
def _test_case_from_arg(test_case_arg):
diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py
index 97e6c9e27e..7edd75c56c 100644
--- a/src/python/grpcio_tests/tests/interop/methods.py
+++ b/src/python/grpcio_tests/tests/interop/methods.py
@@ -29,8 +29,6 @@
"""Implementations of interoperability test methods."""
-from __future__ import print_function
-
import enum
import json
import os
@@ -41,26 +39,21 @@ from oauth2client import client as oauth2client_client
import grpc
from grpc.beta import implementations
-from grpc.beta import interfaces
-from grpc.framework.common import cardinality
-from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2
-_TIMEOUT = 7
-
-class TestService(test_pb2.BetaTestServiceServicer):
+class TestService(test_pb2.TestServiceServicer):
def EmptyCall(self, request, context):
return empty_pb2.Empty()
def UnaryCall(self, request, context):
if request.HasField('response_status'):
- context.code(request.response_status.code)
- context.details(request.response_status.message)
+ context.set_code(request.response_status.code)
+ context.set_details(request.response_status.message)
return messages_pb2.SimpleResponse(
payload=messages_pb2.Payload(
type=messages_pb2.COMPRESSABLE,
@@ -68,8 +61,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
def StreamingOutputCall(self, request, context):
if request.HasField('response_status'):
- context.code(request.response_status.code)
- context.details(request.response_status.message)
+ context.set_code(request.response_status.code)
+ context.set_details(request.response_status.message)
for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
@@ -79,7 +72,7 @@ class TestService(test_pb2.BetaTestServiceServicer):
def StreamingInputCall(self, request_iterator, context):
aggregate_size = 0
for request in request_iterator:
- if request.payload and request.payload.body:
+ if request.payload is not None and request.payload.body:
aggregate_size += len(request.payload.body)
return messages_pb2.StreamingInputCallResponse(
aggregated_payload_size=aggregate_size)
@@ -87,8 +80,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
def FullDuplexCall(self, request_iterator, context):
for request in request_iterator:
if request.HasField('response_status'):
- context.code(request.response_status.code)
- context.details(request.response_status.message)
+ context.set_code(request.response_status.code)
+ context.set_details(request.response_status.message)
for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
@@ -101,83 +94,80 @@ class TestService(test_pb2.BetaTestServiceServicer):
return self.FullDuplexCall(request_iterator, context)
-def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope,
- protocol_options=None):
- with stub:
- request = messages_pb2.SimpleRequest(
- response_type=messages_pb2.COMPRESSABLE, response_size=314159,
- payload=messages_pb2.Payload(body=b'\x00' * 271828),
- fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
- response_future = stub.UnaryCall.future(request, _TIMEOUT,
- protocol_options=protocol_options)
- response = response_future.result()
- if response.payload.type is not messages_pb2.COMPRESSABLE:
- raise ValueError(
- 'response payload type is "%s"!' % type(response.payload.type))
- if len(response.payload.body) != 314159:
- raise ValueError(
- 'response body of incorrect size %d!' % len(response.payload.body))
+def _large_unary_common_behavior(
+ stub, fill_username, fill_oauth_scope, call_credentials):
+ request = messages_pb2.SimpleRequest(
+ response_type=messages_pb2.COMPRESSABLE, response_size=314159,
+ payload=messages_pb2.Payload(body=b'\x00' * 271828),
+ fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
+ response_future = stub.UnaryCall.future(
+ request, credentials=call_credentials)
+ response = response_future.result()
+ if response.payload.type is not messages_pb2.COMPRESSABLE:
+ raise ValueError(
+ 'response payload type is "%s"!' % type(response.payload.type))
+ elif len(response.payload.body) != 314159:
+ raise ValueError(
+ 'response body of incorrect size %d!' % len(response.payload.body))
+ else:
return response
def _empty_unary(stub):
- with stub:
- response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT)
- if not isinstance(response, empty_pb2.Empty):
- raise TypeError(
- 'response is of type "%s", not empty_pb2.Empty!', type(response))
+ response = stub.EmptyCall(empty_pb2.Empty())
+ if not isinstance(response, empty_pb2.Empty):
+ raise TypeError(
+ 'response is of type "%s", not empty_pb2.Empty!', type(response))
def _large_unary(stub):
- _large_unary_common_behavior(stub, False, False)
+ _large_unary_common_behavior(stub, False, False, None)
def _client_streaming(stub):
- with stub:
- payload_body_sizes = (27182, 8, 1828, 45904)
- payloads = (
- messages_pb2.Payload(body=b'\x00' * size)
- for size in payload_body_sizes)
- requests = (
- messages_pb2.StreamingInputCallRequest(payload=payload)
- for payload in payloads)
- response = stub.StreamingInputCall(requests, _TIMEOUT)
- if response.aggregated_payload_size != 74922:
- raise ValueError(
- 'incorrect size %d!' % response.aggregated_payload_size)
+ payload_body_sizes = (27182, 8, 1828, 45904,)
+ payloads = (
+ messages_pb2.Payload(body=b'\x00' * size)
+ for size in payload_body_sizes)
+ requests = (
+ messages_pb2.StreamingInputCallRequest(payload=payload)
+ for payload in payloads)
+ response = stub.StreamingInputCall(requests)
+ if response.aggregated_payload_size != 74922:
+ raise ValueError(
+ 'incorrect size %d!' % response.aggregated_payload_size)
def _server_streaming(stub):
- sizes = (31415, 9, 2653, 58979)
-
- with stub:
- request = messages_pb2.StreamingOutputCallRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(
- messages_pb2.ResponseParameters(size=sizes[0]),
- messages_pb2.ResponseParameters(size=sizes[1]),
- messages_pb2.ResponseParameters(size=sizes[2]),
- messages_pb2.ResponseParameters(size=sizes[3]),
- ))
- response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
- for index, response in enumerate(response_iterator):
- if response.payload.type != messages_pb2.COMPRESSABLE:
- raise ValueError(
- 'response body of invalid type %s!' % response.payload.type)
- if len(response.payload.body) != sizes[index]:
- raise ValueError(
- 'response body of invalid size %d!' % len(response.payload.body))
+ sizes = (31415, 9, 2653, 58979,)
+
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=sizes[0]),
+ messages_pb2.ResponseParameters(size=sizes[1]),
+ messages_pb2.ResponseParameters(size=sizes[2]),
+ messages_pb2.ResponseParameters(size=sizes[3]),
+ )
+ )
+ response_iterator = stub.StreamingOutputCall(request)
+ for index, response in enumerate(response_iterator):
+ if response.payload.type != messages_pb2.COMPRESSABLE:
+ raise ValueError(
+ 'response body of invalid type %s!' % response.payload.type)
+ elif len(response.payload.body) != sizes[index]:
+ raise ValueError(
+ 'response body of invalid size %d!' % len(response.payload.body))
def _cancel_after_begin(stub):
- with stub:
- sizes = (27182, 8, 1828, 45904)
- payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes]
- requests = [messages_pb2.StreamingInputCallRequest(payload=payload)
- for payload in payloads]
- responses = stub.StreamingInputCall.future(requests, _TIMEOUT)
- responses.cancel()
- if not responses.cancelled():
- raise ValueError('expected call to be cancelled')
+ sizes = (27182, 8, 1828, 45904,)
+ payloads = (messages_pb2.Payload(body=b'\x00' * size) for size in sizes)
+ requests = (messages_pb2.StreamingInputCallRequest(payload=payload)
+ for payload in payloads)
+ response_future = stub.StreamingInputCall.future(requests)
+ response_future.cancel()
+ if not response_future.cancelled():
+ raise ValueError('expected call to be cancelled')
class _Pipe(object):
@@ -220,18 +210,17 @@ class _Pipe(object):
def _ping_pong(stub):
- request_response_sizes = (31415, 9, 2653, 58979)
- request_payload_sizes = (27182, 8, 1828, 45904)
+ request_response_sizes = (31415, 9, 2653, 58979,)
+ request_payload_sizes = (27182, 8, 1828, 45904,)
- with stub, _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
- print('Starting ping-pong with response iterator %s' % response_iterator)
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe)
for response_size, payload_size in zip(
request_response_sizes, request_payload_sizes):
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(messages_pb2.ResponseParameters(
- size=response_size),),
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=response_size),),
payload=messages_pb2.Payload(body=b'\x00' * payload_size))
pipe.add(request)
response = next(response_iterator)
@@ -244,17 +233,17 @@ def _ping_pong(stub):
def _cancel_after_first_response(stub):
- request_response_sizes = (31415, 9, 2653, 58979)
- request_payload_sizes = (27182, 8, 1828, 45904)
- with stub, _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
+ request_response_sizes = (31415, 9, 2653, 58979,)
+ request_payload_sizes = (27182, 8, 1828, 45904,)
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe)
response_size = request_response_sizes[0]
payload_size = request_payload_sizes[0]
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(messages_pb2.ResponseParameters(
- size=response_size),),
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=response_size),),
payload=messages_pb2.Payload(body=b'\x00' * payload_size))
pipe.add(request)
response = next(response_iterator)
@@ -264,16 +253,17 @@ def _cancel_after_first_response(stub):
try:
next(response_iterator)
- except Exception:
- pass
+ except grpc.RpcError as rpc_error:
+ if rpc_error.code() is not grpc.StatusCode.CANCELLED:
+ raise
else:
raise ValueError('expected call to be cancelled')
def _timeout_on_sleeping_server(stub):
request_payload_size = 27182
- with stub, _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe, 0.001)
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe, timeout=0.001)
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
@@ -282,15 +272,16 @@ def _timeout_on_sleeping_server(stub):
time.sleep(0.1)
try:
next(response_iterator)
- except face.ExpirationError:
- pass
+ except grpc.RpcError as rpc_error:
+ if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED:
+ raise
else:
raise ValueError('expected call to exceed deadline')
def _empty_stream(stub):
- with stub, _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe)
pipe.close()
try:
next(response_iterator)
@@ -300,65 +291,64 @@ def _empty_stream(stub):
def _status_code_and_message(stub):
- with stub:
- message = 'test status message'
- code = 2
- status = grpc.StatusCode.UNKNOWN # code = 2
- request = messages_pb2.SimpleRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_size=1,
- payload=messages_pb2.Payload(body=b'\x00'),
- response_status=messages_pb2.EchoStatus(code=code, message=message)
- )
- response_future = stub.UnaryCall.future(request, _TIMEOUT)
- if response_future.code() != status:
- raise ValueError(
- 'expected code %s, got %s' % (status, response_future.code()))
- if response_future.details() != message:
- raise ValueError(
- 'expected message %s, got %s' % (message, response_future.details()))
-
- request = messages_pb2.StreamingOutputCallRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(
- messages_pb2.ResponseParameters(size=1),),
- response_status=messages_pb2.EchoStatus(code=code, message=message))
- response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
- if response_future.code() != status:
- raise ValueError(
- 'expected code %s, got %s' % (status, response_iterator.code()))
- if response_future.details() != message:
- raise ValueError(
- 'expected message %s, got %s' % (message, response_iterator.details()))
+ message = 'test status message'
+ code = 2
+ status = grpc.StatusCode.UNKNOWN # code = 2
+ request = messages_pb2.SimpleRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_size=1,
+ payload=messages_pb2.Payload(body=b'\x00'),
+ response_status=messages_pb2.EchoStatus(code=code, message=message)
+ )
+ response_future = stub.UnaryCall.future(request)
+ if response_future.code() != status:
+ raise ValueError(
+ 'expected code %s, got %s' % (status, response_future.code()))
+ elif response_future.details() != message:
+ raise ValueError(
+ 'expected message %s, got %s' % (message, response_future.details()))
+
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=1),),
+ response_status=messages_pb2.EchoStatus(code=code, message=message))
+ response_iterator = stub.StreamingOutputCall(request)
+ if response_future.code() != status:
+ raise ValueError(
+ 'expected code %s, got %s' % (status, response_iterator.code()))
+ elif response_future.details() != message:
+ raise ValueError(
+ 'expected message %s, got %s' % (message, response_iterator.details()))
def _compute_engine_creds(stub, args):
- response = _large_unary_common_behavior(stub, True, True)
+ response = _large_unary_common_behavior(stub, True, True, None)
if args.default_service_account != response.username:
raise ValueError(
- 'expected username %s, got %s' % (args.default_service_account,
- response.username))
+ 'expected username %s, got %s' % (
+ args.default_service_account, response.username))
def _oauth2_auth_token(stub, args):
json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
- response = _large_unary_common_behavior(stub, True, True)
+ response = _large_unary_common_behavior(stub, True, True, None)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
if args.oauth_scope.find(response.oauth_scope) == -1:
raise ValueError(
- 'expected to find oauth scope "%s" in received "%s"' %
- (response.oauth_scope, args.oauth_scope))
+ 'expected to find oauth scope "{}" in received "{}"'.format(
+ response.oauth_scope, args.oauth_scope))
def _jwt_token_creds(stub, args):
json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
- response = _large_unary_common_behavior(stub, True, False)
+ response = _large_unary_common_behavior(stub, True, False, None)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
@@ -370,11 +360,11 @@ def _per_rpc_creds(stub, args):
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
credentials = oauth2client_client.GoogleCredentials.get_application_default()
scoped_credentials = credentials.create_scoped([args.oauth_scope])
- call_creds = implementations.google_call_credentials(scoped_credentials)
- options = interfaces.grpc_call_options(disable_compression=False,
- credentials=call_creds)
- response = _large_unary_common_behavior(stub, True, False,
- protocol_options=options)
+ # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+ # remaining use of the Beta API.
+ call_credentials = implementations.google_call_credentials(
+ scoped_credentials)
+ response = _large_unary_common_behavior(stub, True, False, call_credentials)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py
index ab2c3c708f..1ae83bc57d 100644
--- a/src/python/grpcio_tests/tests/interop/server.py
+++ b/src/python/grpcio_tests/tests/interop/server.py
@@ -30,10 +30,11 @@
"""The Python implementation of the GRPC interoperability test server."""
import argparse
+from concurrent import futures
import logging
import time
-from grpc.beta import implementations
+import grpc
from src.proto.grpc.testing import test_pb2
from tests.interop import methods
@@ -51,12 +52,13 @@ def serve():
default=False, type=resources.parse_bool)
args = parser.parse_args()
- server = test_pb2.beta_create_TestService_server(methods.TestService())
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server)
if args.use_tls:
private_key = resources.private_key()
certificate_chain = resources.certificate_chain()
- credentials = implementations.ssl_server_credentials(
- [(private_key, certificate_chain)])
+ credentials = grpc.ssl_server_credentials(
+ ((private_key, certificate_chain),))
server.add_secure_port('[::]:{}'.format(args.port), credentials)
else:
server.add_insecure_port('[::]:{}'.format(args.port))
@@ -68,7 +70,7 @@ def serve():
time.sleep(_ONE_DAY_IN_SECONDS)
except BaseException as e:
logging.info('Caught exception "%s"; stopping server...', e)
- server.stop(0)
+ server.stop(None)
logging.info('Server stopped; exiting.')
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py
index 0de2532cd8..975f33b4c1 100644
--- a/src/python/grpcio_tests/tests/stress/client.py
+++ b/src/python/grpcio_tests/tests/stress/client.py
@@ -30,9 +30,10 @@
"""Entry point for running stress tests."""
import argparse
+from concurrent import futures
import threading
-from grpc.beta import implementations
+import grpc
from six.moves import queue
from src.proto.grpc.testing import metrics_pb2
from src.proto.grpc.testing import test_pb2
@@ -92,24 +93,24 @@ def _parse_weighted_test_cases(test_case_args):
def run_test(args):
test_cases = _parse_weighted_test_cases(args.test_cases)
- test_servers = args.server_addresses.split(',')
+ test_server_targets = args.server_addresses.split(',')
# Propagate any client exceptions with a queue
exception_queue = queue.Queue()
stop_event = threading.Event()
hist = histogram.Histogram(1, 1)
runners = []
- server = metrics_pb2.beta_create_MetricsService_server(
- metrics_server.MetricsServer(hist))
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=25))
+ metrics_pb2.add_MetricsServiceServicer_to_server(
+ metrics_server.MetricsServer(hist), server)
server.add_insecure_port('[::]:{}'.format(args.metrics_port))
server.start()
- for test_server in test_servers:
- host, port = test_server.split(':', 1)
+ for test_server_target in test_server_targets:
for _ in xrange(args.num_channels_per_server):
- channel = implementations.insecure_channel(host, int(port))
+ channel = grpc.insecure_channel(test_server_target)
for _ in xrange(args.num_stubs_per_channel):
- stub = test_pb2.beta_create_TestService_stub(channel)
+ stub = test_pb2.TestServiceStub(channel)
runner = test_runner.TestRunner(stub, test_cases, hist,
exception_queue, stop_event)
runners.append(runner)
@@ -128,8 +129,8 @@ def run_test(args):
stop_event.set()
for runner in runners:
runner.join()
- runner = None
- server.stop(0)
+ runner = None
+ server.stop(None)
if __name__ == '__main__':
run_test(_args())
diff --git a/src/python/grpcio_tests/tests/stress/metrics_server.py b/src/python/grpcio_tests/tests/stress/metrics_server.py
index b994e4643e..33dd1d6f2a 100644
--- a/src/python/grpcio_tests/tests/stress/metrics_server.py
+++ b/src/python/grpcio_tests/tests/stress/metrics_server.py
@@ -36,7 +36,7 @@ from src.proto.grpc.testing import metrics_pb2
GAUGE_NAME = 'python_overall_qps'
-class MetricsServer(metrics_pb2.BetaMetricsServiceServicer):
+class MetricsServer(metrics_pb2.MetricsServiceServicer):
def __init__(self, histogram):
self._start_time = time.time()