diff options
Diffstat (limited to 'src/python')
17 files changed, 238 insertions, 206 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index ba60986143..cc3bd7a067 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -34,6 +34,7 @@ cdef class Call: def __cinit__(self): # Create an *empty* call + grpc_init() self.c_call = NULL self.references = [] @@ -106,6 +107,7 @@ cdef class Call: def __dealloc__(self): if self.c_call != NULL: grpc_call_destroy(self.c_call) + grpc_shutdown() # The object *should* always be valid from Python. Used for debugging. @property diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 5416401431..3df937eb14 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -34,6 +34,7 @@ cdef class Channel: def __cinit__(self, bytes target, ChannelArgs arguments=None, ChannelCredentials channel_credentials=None): + grpc_init() cdef grpc_channel_args *c_arguments = NULL cdef char *c_target = NULL self.c_channel = NULL @@ -103,3 +104,4 @@ cdef class Channel: def __dealloc__(self): if self.c_channel != NULL: grpc_channel_destroy(self.c_channel) + grpc_shutdown() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 5955021ceb..a258ba4063 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -38,6 +38,7 @@ cdef int _INTERRUPT_CHECK_PERIOD_MS = 200 cdef class CompletionQueue: def __cinit__(self): + grpc_init() with nogil: self.c_completion_queue = grpc_completion_queue_create(NULL) self.is_shutting_down = False @@ -129,3 +130,4 @@ cdef class CompletionQueue: self.c_completion_queue, c_deadline, NULL) self._interpret_event(event) grpc_completion_queue_destroy(self.c_completion_queue) + grpc_shutdown() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 035ac49a8b..04872b9c09 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -33,6 +33,7 @@ cimport cpython cdef class ChannelCredentials: def __cinit__(self): + grpc_init() self.c_credentials = NULL self.c_ssl_pem_key_cert_pair.private_key = NULL self.c_ssl_pem_key_cert_pair.certificate_chain = NULL @@ -47,11 +48,13 @@ cdef class ChannelCredentials: def __dealloc__(self): if self.c_credentials != NULL: grpc_channel_credentials_release(self.c_credentials) + grpc_shutdown() cdef class CallCredentials: def __cinit__(self): + grpc_init() self.c_credentials = NULL self.references = [] @@ -64,17 +67,20 @@ cdef class CallCredentials: def __dealloc__(self): if self.c_credentials != NULL: grpc_call_credentials_release(self.c_credentials) + grpc_shutdown() cdef class ServerCredentials: def __cinit__(self): + grpc_init() self.c_credentials = NULL self.references = [] def __dealloc__(self): if self.c_credentials != NULL: grpc_server_credentials_release(self.c_credentials) + grpc_shutdown() cdef class CredentialsMetadataPlugin: @@ -90,6 +96,7 @@ cdef class CredentialsMetadataPlugin: successful). name (bytes): Plugin name. """ + grpc_init() if not callable(plugin_callback): raise ValueError('expected callable plugin_callback') self.plugin_callback = plugin_callback @@ -105,10 +112,14 @@ cdef class CredentialsMetadataPlugin: cpython.Py_INCREF(self) return result + def __dealloc__(self): + grpc_shutdown() + cdef class AuthMetadataContext: def __cinit__(self): + grpc_init() self.context.service_url = NULL self.context.method_name = NULL @@ -120,6 +131,9 @@ cdef class AuthMetadataContext: def method_name(self): return self.context.method_name + def __dealloc__(self): + grpc_shutdown() + cdef void plugin_get_metadata( void *state, grpc_auth_metadata_context context, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 54b3d00dfc..834a44123d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -176,12 +176,14 @@ cdef class Timespec: cdef class CallDetails: def __cinit__(self): + grpc_init() with nogil: grpc_call_details_init(&self.c_details) def __dealloc__(self): with nogil: grpc_call_details_destroy(&self.c_details) + grpc_shutdown() @property def method(self): @@ -232,6 +234,7 @@ cdef class Event: cdef class ByteBuffer: def __cinit__(self, bytes data): + grpc_init() if data is None: self.c_byte_buffer = NULL return @@ -288,6 +291,7 @@ cdef class ByteBuffer: def __dealloc__(self): if self.c_byte_buffer != NULL: grpc_byte_buffer_destroy(self.c_byte_buffer) + grpc_shutdown() cdef class SslPemKeyCertPair: @@ -319,6 +323,7 @@ cdef class ChannelArg: cdef class ChannelArgs: def __cinit__(self, args): + grpc_init() self.args = list(args) for arg in self.args: if not isinstance(arg, ChannelArg): @@ -333,6 +338,7 @@ cdef class ChannelArgs: def __dealloc__(self): with nogil: gpr_free(self.c_args.arguments) + grpc_shutdown() def __len__(self): # self.args is never stale; it's only updated from this file @@ -399,6 +405,7 @@ cdef class _MetadataIterator: cdef class Metadata: def __cinit__(self, metadata): + grpc_init() self.metadata = list(metadata) for metadatum in metadata: if not isinstance(metadatum, Metadatum): @@ -420,6 +427,7 @@ cdef class Metadata: # it'd be nice if that were documented somewhere...) # TODO(atash): document this in the C core grpc_metadata_array_destroy(&self.c_metadata_array) + grpc_shutdown() def __len__(self): return self.c_metadata_array.count @@ -437,6 +445,7 @@ cdef class Metadata: cdef class Operation: def __cinit__(self): + grpc_init() self.references = [] self._received_status_details = NULL self._received_status_details_capacity = 0 @@ -529,6 +538,7 @@ cdef class Operation: # This means that we need to clean up after receive_status_on_client. if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT: gpr_free(self._received_status_details) + grpc_shutdown() def operation_send_initial_metadata(Metadata metadata, int flags): cdef Operation op = Operation() @@ -645,6 +655,7 @@ cdef class _OperationsIterator: cdef class Operations: def __cinit__(self, operations): + grpc_init() self.operations = list(operations) # normalize iterable self.c_ops = NULL self.c_nops = 0 @@ -667,6 +678,7 @@ cdef class Operations: def __dealloc__(self): with nogil: gpr_free(self.c_ops) + grpc_shutdown() def __iter__(self): return _OperationsIterator(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 4f2d51b03f..ca2b831114 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -35,6 +35,7 @@ import time cdef class Server: def __cinit__(self, ChannelArgs arguments=None): + grpc_init() cdef grpc_channel_args *c_arguments = NULL self.references = [] self.registered_completion_queues = [] @@ -172,3 +173,4 @@ cdef class Server: while not self.is_shutdown: time.sleep(0) grpc_server_destroy(self.c_server) + grpc_shutdown() diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index a9520b9c0f..08089994a9 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -55,12 +55,8 @@ cdef extern from "Python.h": def _initialize(): - grpc_init() grpc_set_ssl_roots_override_callback( <grpc_ssl_roots_override_callback>ssl_roots_override_callback) - if Py_AtExit(grpc_shutdown) != 0: - raise ImportError('failed to register gRPC library shutdown callbacks') - _initialize() diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index 727d628885..6074175a44 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -50,6 +50,7 @@ SETUP_REQUIRES = ( ) INSTALL_REQUIRES = ( + 'protobuf>=3.0.0', 'grpcio>=0.15.0', ) diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index 0afaf7dfa2..5c60eaca3a 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -63,7 +63,7 @@ INSTALL_REQUIRES = ( 'grpcio>=0.14.0', 'grpcio-health-checking>=0.14.0', 'oauth2client>=1.4.7', - 'protobuf>=3.0.0a3', + 'protobuf>=3.0.0', 'six>=1.10', ) diff --git a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py index c753d6faf0..936c895bd2 100644 --- a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py +++ b/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py @@ -29,9 +29,10 @@ """Insecure client-server interoperability as a unit test.""" +from concurrent import futures import unittest -from grpc.beta import implementations +import grpc from src.proto.grpc.testing import test_pb2 from tests.interop import _interop_test_case @@ -44,14 +45,13 @@ class InsecureInteropTest( unittest.TestCase): def setUp(self): - self.server = test_pb2.beta_create_TestService_server(methods.TestService()) + self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + test_pb2.add_TestServiceServicer_to_server( + methods.TestService(), self.server) port = self.server.add_insecure_port('[::]:0') self.server.start() - self.stub = test_pb2.beta_create_TestService_stub( - implementations.insecure_channel('localhost', port)) - - def tearDown(self): - self.server.stop(0) + self.stub = test_pb2.TestServiceStub( + grpc.insecure_channel('localhost:{}'.format(port))) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py b/src/python/grpcio_tests/tests/interop/_secure_interop_test.py index cb09f54a34..eaca553e1b 100644 --- a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py +++ b/src/python/grpcio_tests/tests/interop/_secure_interop_test.py @@ -29,17 +29,16 @@ """Secure client-server interoperability as a unit test.""" +from concurrent import futures import unittest -from grpc.beta import implementations +import grpc from src.proto.grpc.testing import test_pb2 from tests.interop import _interop_test_case from tests.interop import methods from tests.interop import resources -from tests.unit.beta import test_utilities - _SERVER_HOST_OVERRIDE = 'foo.test.google.fr' @@ -48,19 +47,18 @@ class SecureInteropTest( unittest.TestCase): def setUp(self): - self.server = test_pb2.beta_create_TestService_server(methods.TestService()) + self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + test_pb2.add_TestServiceServicer_to_server( + methods.TestService(), self.server) port = self.server.add_secure_port( - '[::]:0', implementations.ssl_server_credentials( + '[::]:0', grpc.ssl_server_credentials( [(resources.private_key(), resources.certificate_chain())])) self.server.start() - self.stub = test_pb2.beta_create_TestService_stub( - test_utilities.not_really_secure_channel( - 'localhost', port, implementations.ssl_channel_credentials( - resources.test_root_certificates()), - _SERVER_HOST_OVERRIDE)) - - def tearDown(self): - self.server.stop(0) + self.stub = test_pb2.TestServiceStub( + grpc.secure_channel( + 'localhost:{}'.format(port), + grpc.ssl_channel_credentials(resources.test_root_certificates()), + (('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE,),))) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index 8aa1ce30c1..9d61d18975 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -32,14 +32,12 @@ import argparse from oauth2client import client as oauth2client_client +import grpc from grpc.beta import implementations from src.proto.grpc.testing import test_pb2 from tests.interop import methods from tests.interop import resources -from tests.unit.beta import test_utilities - -_ONE_DAY_IN_SECONDS = 60 * 60 * 24 def _args(): @@ -66,41 +64,49 @@ def _args(): return parser.parse_args() +def _application_default_credentials(): + return oauth2client_client.GoogleCredentials.get_application_default() + + def _stub(args): + target = '{}:{}'.format(args.server_host, args.server_port) if args.test_case == 'oauth2_auth_token': - creds = oauth2client_client.GoogleCredentials.get_application_default() - scoped_creds = creds.create_scoped([args.oauth_scope]) - access_token = scoped_creds.get_access_token().access_token - call_creds = implementations.access_token_call_credentials(access_token) + google_credentials = _application_default_credentials() + scoped_credentials = google_credentials.create_scoped([args.oauth_scope]) + access_token = scoped_credentials.get_access_token().access_token + call_credentials = grpc.access_token_call_credentials(access_token) elif args.test_case == 'compute_engine_creds': - creds = oauth2client_client.GoogleCredentials.get_application_default() - scoped_creds = creds.create_scoped([args.oauth_scope]) - call_creds = implementations.google_call_credentials(scoped_creds) + google_credentials = _application_default_credentials() + scoped_credentials = google_credentials.create_scoped([args.oauth_scope]) + # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last + # remaining use of the Beta API. + call_credentials = implementations.google_call_credentials( + scoped_credentials) elif args.test_case == 'jwt_token_creds': - creds = oauth2client_client.GoogleCredentials.get_application_default() - call_creds = implementations.google_call_credentials(creds) + google_credentials = _application_default_credentials() + # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last + # remaining use of the Beta API. + call_credentials = implementations.google_call_credentials( + google_credentials) else: - call_creds = None + call_credentials = None if args.use_tls: if args.use_test_ca: root_certificates = resources.test_root_certificates() else: root_certificates = None # will load default roots. - channel_creds = implementations.ssl_channel_credentials(root_certificates) - if call_creds is not None: - channel_creds = implementations.composite_channel_credentials( - channel_creds, call_creds) + channel_credentials = grpc.ssl_channel_credentials(root_certificates) + if call_credentials is not None: + channel_credentials = grpc.composite_channel_credentials( + channel_credentials, call_credentials) - channel = test_utilities.not_really_secure_channel( - args.server_host, args.server_port, channel_creds, - args.server_host_override) - stub = test_pb2.beta_create_TestService_stub(channel) + channel = grpc.secure_channel( + target, channel_credentials, + (('grpc.ssl_target_name_override', args.server_host_override,),)) else: - channel = implementations.insecure_channel( - args.server_host, args.server_port) - stub = test_pb2.beta_create_TestService_stub(channel) - return stub + channel = grpc.insecure_channel(target) + return test_pb2.TestServiceStub(channel) def _test_case_from_arg(test_case_arg): diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py index 97e6c9e27e..7edd75c56c 100644 --- a/src/python/grpcio_tests/tests/interop/methods.py +++ b/src/python/grpcio_tests/tests/interop/methods.py @@ -29,8 +29,6 @@ """Implementations of interoperability test methods.""" -from __future__ import print_function - import enum import json import os @@ -41,26 +39,21 @@ from oauth2client import client as oauth2client_client import grpc from grpc.beta import implementations -from grpc.beta import interfaces -from grpc.framework.common import cardinality -from grpc.framework.interfaces.face import face from src.proto.grpc.testing import empty_pb2 from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import test_pb2 -_TIMEOUT = 7 - -class TestService(test_pb2.BetaTestServiceServicer): +class TestService(test_pb2.TestServiceServicer): def EmptyCall(self, request, context): return empty_pb2.Empty() def UnaryCall(self, request, context): if request.HasField('response_status'): - context.code(request.response_status.code) - context.details(request.response_status.message) + context.set_code(request.response_status.code) + context.set_details(request.response_status.message) return messages_pb2.SimpleResponse( payload=messages_pb2.Payload( type=messages_pb2.COMPRESSABLE, @@ -68,8 +61,8 @@ class TestService(test_pb2.BetaTestServiceServicer): def StreamingOutputCall(self, request, context): if request.HasField('response_status'): - context.code(request.response_status.code) - context.details(request.response_status.message) + context.set_code(request.response_status.code) + context.set_details(request.response_status.message) for response_parameters in request.response_parameters: yield messages_pb2.StreamingOutputCallResponse( payload=messages_pb2.Payload( @@ -79,7 +72,7 @@ class TestService(test_pb2.BetaTestServiceServicer): def StreamingInputCall(self, request_iterator, context): aggregate_size = 0 for request in request_iterator: - if request.payload and request.payload.body: + if request.payload is not None and request.payload.body: aggregate_size += len(request.payload.body) return messages_pb2.StreamingInputCallResponse( aggregated_payload_size=aggregate_size) @@ -87,8 +80,8 @@ class TestService(test_pb2.BetaTestServiceServicer): def FullDuplexCall(self, request_iterator, context): for request in request_iterator: if request.HasField('response_status'): - context.code(request.response_status.code) - context.details(request.response_status.message) + context.set_code(request.response_status.code) + context.set_details(request.response_status.message) for response_parameters in request.response_parameters: yield messages_pb2.StreamingOutputCallResponse( payload=messages_pb2.Payload( @@ -101,83 +94,80 @@ class TestService(test_pb2.BetaTestServiceServicer): return self.FullDuplexCall(request_iterator, context) -def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope, - protocol_options=None): - with stub: - request = messages_pb2.SimpleRequest( - response_type=messages_pb2.COMPRESSABLE, response_size=314159, - payload=messages_pb2.Payload(body=b'\x00' * 271828), - fill_username=fill_username, fill_oauth_scope=fill_oauth_scope) - response_future = stub.UnaryCall.future(request, _TIMEOUT, - protocol_options=protocol_options) - response = response_future.result() - if response.payload.type is not messages_pb2.COMPRESSABLE: - raise ValueError( - 'response payload type is "%s"!' % type(response.payload.type)) - if len(response.payload.body) != 314159: - raise ValueError( - 'response body of incorrect size %d!' % len(response.payload.body)) +def _large_unary_common_behavior( + stub, fill_username, fill_oauth_scope, call_credentials): + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, response_size=314159, + payload=messages_pb2.Payload(body=b'\x00' * 271828), + fill_username=fill_username, fill_oauth_scope=fill_oauth_scope) + response_future = stub.UnaryCall.future( + request, credentials=call_credentials) + response = response_future.result() + if response.payload.type is not messages_pb2.COMPRESSABLE: + raise ValueError( + 'response payload type is "%s"!' % type(response.payload.type)) + elif len(response.payload.body) != 314159: + raise ValueError( + 'response body of incorrect size %d!' % len(response.payload.body)) + else: return response def _empty_unary(stub): - with stub: - response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT) - if not isinstance(response, empty_pb2.Empty): - raise TypeError( - 'response is of type "%s", not empty_pb2.Empty!', type(response)) + response = stub.EmptyCall(empty_pb2.Empty()) + if not isinstance(response, empty_pb2.Empty): + raise TypeError( + 'response is of type "%s", not empty_pb2.Empty!', type(response)) def _large_unary(stub): - _large_unary_common_behavior(stub, False, False) + _large_unary_common_behavior(stub, False, False, None) def _client_streaming(stub): - with stub: - payload_body_sizes = (27182, 8, 1828, 45904) - payloads = ( - messages_pb2.Payload(body=b'\x00' * size) - for size in payload_body_sizes) - requests = ( - messages_pb2.StreamingInputCallRequest(payload=payload) - for payload in payloads) - response = stub.StreamingInputCall(requests, _TIMEOUT) - if response.aggregated_payload_size != 74922: - raise ValueError( - 'incorrect size %d!' % response.aggregated_payload_size) + payload_body_sizes = (27182, 8, 1828, 45904,) + payloads = ( + messages_pb2.Payload(body=b'\x00' * size) + for size in payload_body_sizes) + requests = ( + messages_pb2.StreamingInputCallRequest(payload=payload) + for payload in payloads) + response = stub.StreamingInputCall(requests) + if response.aggregated_payload_size != 74922: + raise ValueError( + 'incorrect size %d!' % response.aggregated_payload_size) def _server_streaming(stub): - sizes = (31415, 9, 2653, 58979) - - with stub: - request = messages_pb2.StreamingOutputCallRequest( - response_type=messages_pb2.COMPRESSABLE, - response_parameters=( - messages_pb2.ResponseParameters(size=sizes[0]), - messages_pb2.ResponseParameters(size=sizes[1]), - messages_pb2.ResponseParameters(size=sizes[2]), - messages_pb2.ResponseParameters(size=sizes[3]), - )) - response_iterator = stub.StreamingOutputCall(request, _TIMEOUT) - for index, response in enumerate(response_iterator): - if response.payload.type != messages_pb2.COMPRESSABLE: - raise ValueError( - 'response body of invalid type %s!' % response.payload.type) - if len(response.payload.body) != sizes[index]: - raise ValueError( - 'response body of invalid size %d!' % len(response.payload.body)) + sizes = (31415, 9, 2653, 58979,) + + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=( + messages_pb2.ResponseParameters(size=sizes[0]), + messages_pb2.ResponseParameters(size=sizes[1]), + messages_pb2.ResponseParameters(size=sizes[2]), + messages_pb2.ResponseParameters(size=sizes[3]), + ) + ) + response_iterator = stub.StreamingOutputCall(request) + for index, response in enumerate(response_iterator): + if response.payload.type != messages_pb2.COMPRESSABLE: + raise ValueError( + 'response body of invalid type %s!' % response.payload.type) + elif len(response.payload.body) != sizes[index]: + raise ValueError( + 'response body of invalid size %d!' % len(response.payload.body)) def _cancel_after_begin(stub): - with stub: - sizes = (27182, 8, 1828, 45904) - payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes] - requests = [messages_pb2.StreamingInputCallRequest(payload=payload) - for payload in payloads] - responses = stub.StreamingInputCall.future(requests, _TIMEOUT) - responses.cancel() - if not responses.cancelled(): - raise ValueError('expected call to be cancelled') + sizes = (27182, 8, 1828, 45904,) + payloads = (messages_pb2.Payload(body=b'\x00' * size) for size in sizes) + requests = (messages_pb2.StreamingInputCallRequest(payload=payload) + for payload in payloads) + response_future = stub.StreamingInputCall.future(requests) + response_future.cancel() + if not response_future.cancelled(): + raise ValueError('expected call to be cancelled') class _Pipe(object): @@ -220,18 +210,17 @@ class _Pipe(object): def _ping_pong(stub): - request_response_sizes = (31415, 9, 2653, 58979) - request_payload_sizes = (27182, 8, 1828, 45904) + request_response_sizes = (31415, 9, 2653, 58979,) + request_payload_sizes = (27182, 8, 1828, 45904,) - with stub, _Pipe() as pipe: - response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) - print('Starting ping-pong with response iterator %s' % response_iterator) + with _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe) for response_size, payload_size in zip( request_response_sizes, request_payload_sizes): request = messages_pb2.StreamingOutputCallRequest( response_type=messages_pb2.COMPRESSABLE, - response_parameters=(messages_pb2.ResponseParameters( - size=response_size),), + response_parameters=( + messages_pb2.ResponseParameters(size=response_size),), payload=messages_pb2.Payload(body=b'\x00' * payload_size)) pipe.add(request) response = next(response_iterator) @@ -244,17 +233,17 @@ def _ping_pong(stub): def _cancel_after_first_response(stub): - request_response_sizes = (31415, 9, 2653, 58979) - request_payload_sizes = (27182, 8, 1828, 45904) - with stub, _Pipe() as pipe: - response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) + request_response_sizes = (31415, 9, 2653, 58979,) + request_payload_sizes = (27182, 8, 1828, 45904,) + with _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe) response_size = request_response_sizes[0] payload_size = request_payload_sizes[0] request = messages_pb2.StreamingOutputCallRequest( response_type=messages_pb2.COMPRESSABLE, - response_parameters=(messages_pb2.ResponseParameters( - size=response_size),), + response_parameters=( + messages_pb2.ResponseParameters(size=response_size),), payload=messages_pb2.Payload(body=b'\x00' * payload_size)) pipe.add(request) response = next(response_iterator) @@ -264,16 +253,17 @@ def _cancel_after_first_response(stub): try: next(response_iterator) - except Exception: - pass + except grpc.RpcError as rpc_error: + if rpc_error.code() is not grpc.StatusCode.CANCELLED: + raise else: raise ValueError('expected call to be cancelled') def _timeout_on_sleeping_server(stub): request_payload_size = 27182 - with stub, _Pipe() as pipe: - response_iterator = stub.FullDuplexCall(pipe, 0.001) + with _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe, timeout=0.001) request = messages_pb2.StreamingOutputCallRequest( response_type=messages_pb2.COMPRESSABLE, @@ -282,15 +272,16 @@ def _timeout_on_sleeping_server(stub): time.sleep(0.1) try: next(response_iterator) - except face.ExpirationError: - pass + except grpc.RpcError as rpc_error: + if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED: + raise else: raise ValueError('expected call to exceed deadline') def _empty_stream(stub): - with stub, _Pipe() as pipe: - response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) + with _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe) pipe.close() try: next(response_iterator) @@ -300,65 +291,64 @@ def _empty_stream(stub): def _status_code_and_message(stub): - with stub: - message = 'test status message' - code = 2 - status = grpc.StatusCode.UNKNOWN # code = 2 - request = messages_pb2.SimpleRequest( - response_type=messages_pb2.COMPRESSABLE, - response_size=1, - payload=messages_pb2.Payload(body=b'\x00'), - response_status=messages_pb2.EchoStatus(code=code, message=message) - ) - response_future = stub.UnaryCall.future(request, _TIMEOUT) - if response_future.code() != status: - raise ValueError( - 'expected code %s, got %s' % (status, response_future.code())) - if response_future.details() != message: - raise ValueError( - 'expected message %s, got %s' % (message, response_future.details())) - - request = messages_pb2.StreamingOutputCallRequest( - response_type=messages_pb2.COMPRESSABLE, - response_parameters=( - messages_pb2.ResponseParameters(size=1),), - response_status=messages_pb2.EchoStatus(code=code, message=message)) - response_iterator = stub.StreamingOutputCall(request, _TIMEOUT) - if response_future.code() != status: - raise ValueError( - 'expected code %s, got %s' % (status, response_iterator.code())) - if response_future.details() != message: - raise ValueError( - 'expected message %s, got %s' % (message, response_iterator.details())) + message = 'test status message' + code = 2 + status = grpc.StatusCode.UNKNOWN # code = 2 + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=1, + payload=messages_pb2.Payload(body=b'\x00'), + response_status=messages_pb2.EchoStatus(code=code, message=message) + ) + response_future = stub.UnaryCall.future(request) + if response_future.code() != status: + raise ValueError( + 'expected code %s, got %s' % (status, response_future.code())) + elif response_future.details() != message: + raise ValueError( + 'expected message %s, got %s' % (message, response_future.details())) + + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=( + messages_pb2.ResponseParameters(size=1),), + response_status=messages_pb2.EchoStatus(code=code, message=message)) + response_iterator = stub.StreamingOutputCall(request) + if response_future.code() != status: + raise ValueError( + 'expected code %s, got %s' % (status, response_iterator.code())) + elif response_future.details() != message: + raise ValueError( + 'expected message %s, got %s' % (message, response_iterator.details())) def _compute_engine_creds(stub, args): - response = _large_unary_common_behavior(stub, True, True) + response = _large_unary_common_behavior(stub, True, True, None) if args.default_service_account != response.username: raise ValueError( - 'expected username %s, got %s' % (args.default_service_account, - response.username)) + 'expected username %s, got %s' % ( + args.default_service_account, response.username)) def _oauth2_auth_token(stub, args): json_key_filename = os.environ[ oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] - response = _large_unary_common_behavior(stub, True, True) + response = _large_unary_common_behavior(stub, True, True, None) if wanted_email != response.username: raise ValueError( 'expected username %s, got %s' % (wanted_email, response.username)) if args.oauth_scope.find(response.oauth_scope) == -1: raise ValueError( - 'expected to find oauth scope "%s" in received "%s"' % - (response.oauth_scope, args.oauth_scope)) + 'expected to find oauth scope "{}" in received "{}"'.format( + response.oauth_scope, args.oauth_scope)) def _jwt_token_creds(stub, args): json_key_filename = os.environ[ oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] - response = _large_unary_common_behavior(stub, True, False) + response = _large_unary_common_behavior(stub, True, False, None) if wanted_email != response.username: raise ValueError( 'expected username %s, got %s' % (wanted_email, response.username)) @@ -370,11 +360,11 @@ def _per_rpc_creds(stub, args): wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] credentials = oauth2client_client.GoogleCredentials.get_application_default() scoped_credentials = credentials.create_scoped([args.oauth_scope]) - call_creds = implementations.google_call_credentials(scoped_credentials) - options = interfaces.grpc_call_options(disable_compression=False, - credentials=call_creds) - response = _large_unary_common_behavior(stub, True, False, - protocol_options=options) + # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last + # remaining use of the Beta API. + call_credentials = implementations.google_call_credentials( + scoped_credentials) + response = _large_unary_common_behavior(stub, True, False, call_credentials) if wanted_email != response.username: raise ValueError( 'expected username %s, got %s' % (wanted_email, response.username)) diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py index ab2c3c708f..1ae83bc57d 100644 --- a/src/python/grpcio_tests/tests/interop/server.py +++ b/src/python/grpcio_tests/tests/interop/server.py @@ -30,10 +30,11 @@ """The Python implementation of the GRPC interoperability test server.""" import argparse +from concurrent import futures import logging import time -from grpc.beta import implementations +import grpc from src.proto.grpc.testing import test_pb2 from tests.interop import methods @@ -51,12 +52,13 @@ def serve(): default=False, type=resources.parse_bool) args = parser.parse_args() - server = test_pb2.beta_create_TestService_server(methods.TestService()) + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server) if args.use_tls: private_key = resources.private_key() certificate_chain = resources.certificate_chain() - credentials = implementations.ssl_server_credentials( - [(private_key, certificate_chain)]) + credentials = grpc.ssl_server_credentials( + ((private_key, certificate_chain),)) server.add_secure_port('[::]:{}'.format(args.port), credentials) else: server.add_insecure_port('[::]:{}'.format(args.port)) @@ -68,7 +70,7 @@ def serve(): time.sleep(_ONE_DAY_IN_SECONDS) except BaseException as e: logging.info('Caught exception "%s"; stopping server...', e) - server.stop(0) + server.stop(None) logging.info('Server stopped; exiting.') if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py index 0de2532cd8..975f33b4c1 100644 --- a/src/python/grpcio_tests/tests/stress/client.py +++ b/src/python/grpcio_tests/tests/stress/client.py @@ -30,9 +30,10 @@ """Entry point for running stress tests.""" import argparse +from concurrent import futures import threading -from grpc.beta import implementations +import grpc from six.moves import queue from src.proto.grpc.testing import metrics_pb2 from src.proto.grpc.testing import test_pb2 @@ -92,24 +93,24 @@ def _parse_weighted_test_cases(test_case_args): def run_test(args): test_cases = _parse_weighted_test_cases(args.test_cases) - test_servers = args.server_addresses.split(',') + test_server_targets = args.server_addresses.split(',') # Propagate any client exceptions with a queue exception_queue = queue.Queue() stop_event = threading.Event() hist = histogram.Histogram(1, 1) runners = [] - server = metrics_pb2.beta_create_MetricsService_server( - metrics_server.MetricsServer(hist)) + server = grpc.server(futures.ThreadPoolExecutor(max_workers=25)) + metrics_pb2.add_MetricsServiceServicer_to_server( + metrics_server.MetricsServer(hist), server) server.add_insecure_port('[::]:{}'.format(args.metrics_port)) server.start() - for test_server in test_servers: - host, port = test_server.split(':', 1) + for test_server_target in test_server_targets: for _ in xrange(args.num_channels_per_server): - channel = implementations.insecure_channel(host, int(port)) + channel = grpc.insecure_channel(test_server_target) for _ in xrange(args.num_stubs_per_channel): - stub = test_pb2.beta_create_TestService_stub(channel) + stub = test_pb2.TestServiceStub(channel) runner = test_runner.TestRunner(stub, test_cases, hist, exception_queue, stop_event) runners.append(runner) @@ -128,8 +129,8 @@ def run_test(args): stop_event.set() for runner in runners: runner.join() - runner = None - server.stop(0) + runner = None + server.stop(None) if __name__ == '__main__': run_test(_args()) diff --git a/src/python/grpcio_tests/tests/stress/metrics_server.py b/src/python/grpcio_tests/tests/stress/metrics_server.py index b994e4643e..33dd1d6f2a 100644 --- a/src/python/grpcio_tests/tests/stress/metrics_server.py +++ b/src/python/grpcio_tests/tests/stress/metrics_server.py @@ -36,7 +36,7 @@ from src.proto.grpc.testing import metrics_pb2 GAUGE_NAME = 'python_overall_qps' -class MetricsServer(metrics_pb2.BetaMetricsServiceServicer): +class MetricsServer(metrics_pb2.MetricsServiceServicer): def __init__(self, histogram): self._start_time = time.time() diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index f9a8e2401b..2f50263730 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -30,6 +30,7 @@ import time import threading import unittest +import platform from grpc._cython import cygrpc from tests.unit._cython import test_utilities @@ -113,6 +114,9 @@ class TypeSmokeTest(unittest.TestCase): lambda ignored_a, ignored_b: None, b'') del plugin + @unittest.skipIf( + platform.python_implementation() == "PyPy", + 'TODO(issue 7672): figure out why this fails on PyPy') def testCallCredentialsFromPluginUpDown(self): plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback, b'') call_credentials = cygrpc.call_credentials_metadata_plugin(plugin) |