diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/interop/methods.py')
-rw-r--r-- | src/python/grpcio_tests/tests/interop/methods.py | 759 |
1 files changed, 389 insertions, 370 deletions
diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py index 9038ae5751..e1f8722168 100644 --- a/src/python/grpcio_tests/tests/interop/methods.py +++ b/src/python/grpcio_tests/tests/interop/methods.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Implementations of interoperability test methods.""" import enum @@ -46,463 +45,483 @@ from src.proto.grpc.testing import test_pb2 _INITIAL_METADATA_KEY = "x-grpc-test-echo-initial" _TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin" + def _maybe_echo_metadata(servicer_context): - """Copies metadata from request to response if it is present.""" - invocation_metadata = dict(servicer_context.invocation_metadata()) - if _INITIAL_METADATA_KEY in invocation_metadata: - initial_metadatum = ( - _INITIAL_METADATA_KEY, invocation_metadata[_INITIAL_METADATA_KEY]) - servicer_context.send_initial_metadata((initial_metadatum,)) - if _TRAILING_METADATA_KEY in invocation_metadata: - trailing_metadatum = ( - _TRAILING_METADATA_KEY, invocation_metadata[_TRAILING_METADATA_KEY]) - servicer_context.set_trailing_metadata((trailing_metadatum,)) + """Copies metadata from request to response if it is present.""" + invocation_metadata = dict(servicer_context.invocation_metadata()) + if _INITIAL_METADATA_KEY in invocation_metadata: + initial_metadatum = (_INITIAL_METADATA_KEY, + invocation_metadata[_INITIAL_METADATA_KEY]) + servicer_context.send_initial_metadata((initial_metadatum,)) + if _TRAILING_METADATA_KEY in invocation_metadata: + trailing_metadatum = (_TRAILING_METADATA_KEY, + invocation_metadata[_TRAILING_METADATA_KEY]) + servicer_context.set_trailing_metadata((trailing_metadatum,)) + def _maybe_echo_status_and_message(request, servicer_context): - """Sets the response context code and details if the request asks for them""" - if request.HasField('response_status'): - servicer_context.set_code(request.response_status.code) - servicer_context.set_details(request.response_status.message) + """Sets the response context code and details if the request asks for them""" + if request.HasField('response_status'): + servicer_context.set_code(request.response_status.code) + servicer_context.set_details(request.response_status.message) + class TestService(test_pb2.TestServiceServicer): - def EmptyCall(self, request, context): - _maybe_echo_metadata(context) - return empty_pb2.Empty() + def EmptyCall(self, request, context): + _maybe_echo_metadata(context) + return empty_pb2.Empty() - def UnaryCall(self, request, context): - _maybe_echo_metadata(context) - _maybe_echo_status_and_message(request, context) - return messages_pb2.SimpleResponse( - payload=messages_pb2.Payload( + def UnaryCall(self, request, context): + _maybe_echo_metadata(context) + _maybe_echo_status_and_message(request, context) + return messages_pb2.SimpleResponse(payload=messages_pb2.Payload( type=messages_pb2.COMPRESSABLE, body=b'\x00' * request.response_size)) - def StreamingOutputCall(self, request, context): - _maybe_echo_status_and_message(request, context) - for response_parameters in request.response_parameters: - yield messages_pb2.StreamingOutputCallResponse( - payload=messages_pb2.Payload( - type=request.response_type, - body=b'\x00' * response_parameters.size)) - - def StreamingInputCall(self, request_iterator, context): - aggregate_size = 0 - for request in request_iterator: - if request.payload is not None and request.payload.body: - aggregate_size += len(request.payload.body) - return messages_pb2.StreamingInputCallResponse( - aggregated_payload_size=aggregate_size) - - def FullDuplexCall(self, request_iterator, context): - _maybe_echo_metadata(context) - for request in request_iterator: - _maybe_echo_status_and_message(request, context) - for response_parameters in request.response_parameters: - yield messages_pb2.StreamingOutputCallResponse( - payload=messages_pb2.Payload( - type=request.payload.type, - body=b'\x00' * response_parameters.size)) - - # NOTE(nathaniel): Apparently this is the same as the full-duplex call? - # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)... - def HalfDuplexCall(self, request_iterator, context): - return self.FullDuplexCall(request_iterator, context) + def StreamingOutputCall(self, request, context): + _maybe_echo_status_and_message(request, context) + for response_parameters in request.response_parameters: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload( + type=request.response_type, + body=b'\x00' * response_parameters.size)) + + def StreamingInputCall(self, request_iterator, context): + aggregate_size = 0 + for request in request_iterator: + if request.payload is not None and request.payload.body: + aggregate_size += len(request.payload.body) + return messages_pb2.StreamingInputCallResponse( + aggregated_payload_size=aggregate_size) + + def FullDuplexCall(self, request_iterator, context): + _maybe_echo_metadata(context) + for request in request_iterator: + _maybe_echo_status_and_message(request, context) + for response_parameters in request.response_parameters: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload( + type=request.payload.type, + body=b'\x00' * response_parameters.size)) + + # NOTE(nathaniel): Apparently this is the same as the full-duplex call? + # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)... + def HalfDuplexCall(self, request_iterator, context): + return self.FullDuplexCall(request_iterator, context) def _expect_status_code(call, expected_code): - if call.code() != expected_code: - raise ValueError( - 'expected code %s, got %s' % (expected_code, call.code())) + if call.code() != expected_code: + raise ValueError('expected code %s, got %s' % + (expected_code, call.code())) def _expect_status_details(call, expected_details): - if call.details() != expected_details: - raise ValueError( - 'expected message %s, got %s' % (expected_details, call.details())) + if call.details() != expected_details: + raise ValueError('expected message %s, got %s' % + (expected_details, call.details())) def _validate_status_code_and_details(call, expected_code, expected_details): - _expect_status_code(call, expected_code) - _expect_status_details(call, expected_details) + _expect_status_code(call, expected_code) + _expect_status_details(call, expected_details) def _validate_payload_type_and_length(response, expected_type, expected_length): - if response.payload.type is not expected_type: - raise ValueError( - 'expected payload type %s, got %s' % - (expected_type, type(response.payload.type))) - elif len(response.payload.body) != expected_length: - raise ValueError( - 'expected payload body size %d, got %d' % - (expected_length, len(response.payload.body))) - - -def _large_unary_common_behavior( - stub, fill_username, fill_oauth_scope, call_credentials): - size = 314159 - request = messages_pb2.SimpleRequest( - response_type=messages_pb2.COMPRESSABLE, response_size=size, - payload=messages_pb2.Payload(body=b'\x00' * 271828), - fill_username=fill_username, fill_oauth_scope=fill_oauth_scope) - response_future = stub.UnaryCall.future( - request, credentials=call_credentials) - response = response_future.result() - _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) - return response + if response.payload.type is not expected_type: + raise ValueError('expected payload type %s, got %s' % + (expected_type, type(response.payload.type))) + elif len(response.payload.body) != expected_length: + raise ValueError('expected payload body size %d, got %d' % + (expected_length, len(response.payload.body))) + + +def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope, + call_credentials): + size = 314159 + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=size, + payload=messages_pb2.Payload(body=b'\x00' * 271828), + fill_username=fill_username, + fill_oauth_scope=fill_oauth_scope) + response_future = stub.UnaryCall.future( + request, credentials=call_credentials) + response = response_future.result() + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) + return response def _empty_unary(stub): - response = stub.EmptyCall(empty_pb2.Empty()) - if not isinstance(response, empty_pb2.Empty): - raise TypeError( - 'response is of type "%s", not empty_pb2.Empty!', type(response)) + response = stub.EmptyCall(empty_pb2.Empty()) + if not isinstance(response, empty_pb2.Empty): + raise TypeError('response is of type "%s", not empty_pb2.Empty!', + type(response)) def _large_unary(stub): - _large_unary_common_behavior(stub, False, False, None) + _large_unary_common_behavior(stub, False, False, None) def _client_streaming(stub): - payload_body_sizes = (27182, 8, 1828, 45904,) - payloads = ( - messages_pb2.Payload(body=b'\x00' * size) - for size in payload_body_sizes) - requests = ( - messages_pb2.StreamingInputCallRequest(payload=payload) - for payload in payloads) - response = stub.StreamingInputCall(requests) - if response.aggregated_payload_size != 74922: - raise ValueError( - 'incorrect size %d!' % response.aggregated_payload_size) + payload_body_sizes = ( + 27182, + 8, + 1828, + 45904,) + payloads = (messages_pb2.Payload(body=b'\x00' * size) + for size in payload_body_sizes) + requests = (messages_pb2.StreamingInputCallRequest(payload=payload) + for payload in payloads) + response = stub.StreamingInputCall(requests) + if response.aggregated_payload_size != 74922: + raise ValueError('incorrect size %d!' % + response.aggregated_payload_size) def _server_streaming(stub): - sizes = (31415, 9, 2653, 58979,) - - request = messages_pb2.StreamingOutputCallRequest( - response_type=messages_pb2.COMPRESSABLE, - response_parameters=( - messages_pb2.ResponseParameters(size=sizes[0]), - messages_pb2.ResponseParameters(size=sizes[1]), - messages_pb2.ResponseParameters(size=sizes[2]), - messages_pb2.ResponseParameters(size=sizes[3]), - ) - ) - response_iterator = stub.StreamingOutputCall(request) - for index, response in enumerate(response_iterator): - _validate_payload_type_and_length( - response, messages_pb2.COMPRESSABLE, sizes[index]) + sizes = ( + 31415, + 9, + 2653, + 58979,) + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=( + messages_pb2.ResponseParameters(size=sizes[0]), + messages_pb2.ResponseParameters(size=sizes[1]), + messages_pb2.ResponseParameters(size=sizes[2]), + messages_pb2.ResponseParameters(size=sizes[3]),)) + response_iterator = stub.StreamingOutputCall(request) + for index, response in enumerate(response_iterator): + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, + sizes[index]) class _Pipe(object): - def __init__(self): - self._condition = threading.Condition() - self._values = [] - self._open = True + def __init__(self): + self._condition = threading.Condition() + self._values = [] + self._open = True - def __iter__(self): - return self + def __iter__(self): + return self - def __next__(self): - return self.next() + def __next__(self): + return self.next() - def next(self): - with self._condition: - while not self._values and self._open: - self._condition.wait() - if self._values: - return self._values.pop(0) - else: - raise StopIteration() + def next(self): + with self._condition: + while not self._values and self._open: + self._condition.wait() + if self._values: + return self._values.pop(0) + else: + raise StopIteration() - def add(self, value): - with self._condition: - self._values.append(value) - self._condition.notify() + def add(self, value): + with self._condition: + self._values.append(value) + self._condition.notify() - def close(self): - with self._condition: - self._open = False - self._condition.notify() + def close(self): + with self._condition: + self._open = False + self._condition.notify() - def __enter__(self): - return self + def __enter__(self): + return self - def __exit__(self, type, value, traceback): - self.close() + def __exit__(self, type, value, traceback): + self.close() def _ping_pong(stub): - request_response_sizes = (31415, 9, 2653, 58979,) - request_payload_sizes = (27182, 8, 1828, 45904,) - - with _Pipe() as pipe: - response_iterator = stub.FullDuplexCall(pipe) - for response_size, payload_size in zip( - request_response_sizes, request_payload_sizes): - request = messages_pb2.StreamingOutputCallRequest( - response_type=messages_pb2.COMPRESSABLE, - response_parameters=( - messages_pb2.ResponseParameters(size=response_size),), - payload=messages_pb2.Payload(body=b'\x00' * payload_size)) - pipe.add(request) - response = next(response_iterator) - _validate_payload_type_and_length( - response, messages_pb2.COMPRESSABLE, response_size) + request_response_sizes = ( + 31415, + 9, + 2653, + 58979,) + request_payload_sizes = ( + 27182, + 8, + 1828, + 45904,) + + with _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe) + for response_size, payload_size in zip(request_response_sizes, + request_payload_sizes): + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=( + messages_pb2.ResponseParameters(size=response_size),), + payload=messages_pb2.Payload(body=b'\x00' * payload_size)) + pipe.add(request) + response = next(response_iterator) + _validate_payload_type_and_length( + response, messages_pb2.COMPRESSABLE, response_size) def _cancel_after_begin(stub): - with _Pipe() as pipe: - response_future = stub.StreamingInputCall.future(pipe) - response_future.cancel() - if not response_future.cancelled(): - raise ValueError('expected cancelled method to return True') - if response_future.code() is not grpc.StatusCode.CANCELLED: - raise ValueError('expected status code CANCELLED') + with _Pipe() as pipe: + response_future = stub.StreamingInputCall.future(pipe) + response_future.cancel() + if not response_future.cancelled(): + raise ValueError('expected cancelled method to return True') + if response_future.code() is not grpc.StatusCode.CANCELLED: + raise ValueError('expected status code CANCELLED') def _cancel_after_first_response(stub): - request_response_sizes = (31415, 9, 2653, 58979,) - request_payload_sizes = (27182, 8, 1828, 45904,) - with _Pipe() as pipe: - response_iterator = stub.FullDuplexCall(pipe) - - response_size = request_response_sizes[0] - payload_size = request_payload_sizes[0] - request = messages_pb2.StreamingOutputCallRequest( - response_type=messages_pb2.COMPRESSABLE, - response_parameters=( - messages_pb2.ResponseParameters(size=response_size),), - payload=messages_pb2.Payload(body=b'\x00' * payload_size)) - pipe.add(request) - response = next(response_iterator) - # We test the contents of `response` in the Ping Pong test - don't check - # them here. - response_iterator.cancel() - - try: - next(response_iterator) - except grpc.RpcError as rpc_error: - if rpc_error.code() is not grpc.StatusCode.CANCELLED: - raise - else: - raise ValueError('expected call to be cancelled') + request_response_sizes = ( + 31415, + 9, + 2653, + 58979,) + request_payload_sizes = ( + 27182, + 8, + 1828, + 45904,) + with _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe) + + response_size = request_response_sizes[0] + payload_size = request_payload_sizes[0] + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=( + messages_pb2.ResponseParameters(size=response_size),), + payload=messages_pb2.Payload(body=b'\x00' * payload_size)) + pipe.add(request) + response = next(response_iterator) + # We test the contents of `response` in the Ping Pong test - don't check + # them here. + response_iterator.cancel() + + try: + next(response_iterator) + except grpc.RpcError as rpc_error: + if rpc_error.code() is not grpc.StatusCode.CANCELLED: + raise + else: + raise ValueError('expected call to be cancelled') def _timeout_on_sleeping_server(stub): - request_payload_size = 27182 - with _Pipe() as pipe: - response_iterator = stub.FullDuplexCall(pipe, timeout=0.001) - - request = messages_pb2.StreamingOutputCallRequest( - response_type=messages_pb2.COMPRESSABLE, - payload=messages_pb2.Payload(body=b'\x00' * request_payload_size)) - pipe.add(request) - try: - next(response_iterator) - except grpc.RpcError as rpc_error: - if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED: - raise - else: - raise ValueError('expected call to exceed deadline') + request_payload_size = 27182 + with _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe, timeout=0.001) + + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + payload=messages_pb2.Payload(body=b'\x00' * request_payload_size)) + pipe.add(request) + try: + next(response_iterator) + except grpc.RpcError as rpc_error: + if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED: + raise + else: + raise ValueError('expected call to exceed deadline') def _empty_stream(stub): - with _Pipe() as pipe: - response_iterator = stub.FullDuplexCall(pipe) - pipe.close() - try: - next(response_iterator) - raise ValueError('expected exactly 0 responses') - except StopIteration: - pass + with _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe) + pipe.close() + try: + next(response_iterator) + raise ValueError('expected exactly 0 responses') + except StopIteration: + pass def _status_code_and_message(stub): - details = 'test status message' - code = 2 - status = grpc.StatusCode.UNKNOWN # code = 2 - - # Test with a UnaryCall - request = messages_pb2.SimpleRequest( - response_type=messages_pb2.COMPRESSABLE, - response_size=1, - payload=messages_pb2.Payload(body=b'\x00'), - response_status=messages_pb2.EchoStatus(code=code, message=details) - ) - response_future = stub.UnaryCall.future(request) - _validate_status_code_and_details(response_future, status, details) - - # Test with a FullDuplexCall - with _Pipe() as pipe: - response_iterator = stub.FullDuplexCall(pipe) - request = messages_pb2.StreamingOutputCallRequest( + details = 'test status message' + code = 2 + status = grpc.StatusCode.UNKNOWN # code = 2 + + # Test with a UnaryCall + request = messages_pb2.SimpleRequest( response_type=messages_pb2.COMPRESSABLE, - response_parameters=( - messages_pb2.ResponseParameters(size=1),), + response_size=1, payload=messages_pb2.Payload(body=b'\x00'), - response_status=messages_pb2.EchoStatus(code=code, message=details)) - pipe.add(request) # sends the initial request. - # Dropping out of with block closes the pipe - _validate_status_code_and_details(response_iterator, status, details) + response_status=messages_pb2.EchoStatus( + code=code, message=details)) + response_future = stub.UnaryCall.future(request) + _validate_status_code_and_details(response_future, status, details) + + # Test with a FullDuplexCall + with _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe) + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=(messages_pb2.ResponseParameters(size=1),), + payload=messages_pb2.Payload(body=b'\x00'), + response_status=messages_pb2.EchoStatus( + code=code, message=details)) + pipe.add(request) # sends the initial request. + # Dropping out of with block closes the pipe + _validate_status_code_and_details(response_iterator, status, details) def _unimplemented_method(test_service_stub): - response_future = ( - test_service_stub.UnimplementedCall.future(empty_pb2.Empty())) - _expect_status_code(response_future, grpc.StatusCode.UNIMPLEMENTED) + response_future = ( + test_service_stub.UnimplementedCall.future(empty_pb2.Empty())) + _expect_status_code(response_future, grpc.StatusCode.UNIMPLEMENTED) def _unimplemented_service(unimplemented_service_stub): - response_future = ( - unimplemented_service_stub.UnimplementedCall.future(empty_pb2.Empty())) - _expect_status_code(response_future, grpc.StatusCode.UNIMPLEMENTED) + response_future = ( + unimplemented_service_stub.UnimplementedCall.future(empty_pb2.Empty())) + _expect_status_code(response_future, grpc.StatusCode.UNIMPLEMENTED) def _custom_metadata(stub): - initial_metadata_value = "test_initial_metadata_value" - trailing_metadata_value = "\x0a\x0b\x0a\x0b\x0a\x0b" - metadata = ( - (_INITIAL_METADATA_KEY, initial_metadata_value), - (_TRAILING_METADATA_KEY, trailing_metadata_value)) - - def _validate_metadata(response): - initial_metadata = dict(response.initial_metadata()) - if initial_metadata[_INITIAL_METADATA_KEY] != initial_metadata_value: - raise ValueError( - 'expected initial metadata %s, got %s' % ( - initial_metadata_value, initial_metadata[_INITIAL_METADATA_KEY])) - trailing_metadata = dict(response.trailing_metadata()) - if trailing_metadata[_TRAILING_METADATA_KEY] != trailing_metadata_value: - raise ValueError( - 'expected trailing metadata %s, got %s' % ( - trailing_metadata_value, initial_metadata[_TRAILING_METADATA_KEY])) - - # Testing with UnaryCall - request = messages_pb2.SimpleRequest( - response_type=messages_pb2.COMPRESSABLE, - response_size=1, - payload=messages_pb2.Payload(body=b'\x00')) - response_future = stub.UnaryCall.future(request, metadata=metadata) - _validate_metadata(response_future) - - # Testing with FullDuplexCall - with _Pipe() as pipe: - response_iterator = stub.FullDuplexCall(pipe, metadata=metadata) - request = messages_pb2.StreamingOutputCallRequest( + initial_metadata_value = "test_initial_metadata_value" + trailing_metadata_value = "\x0a\x0b\x0a\x0b\x0a\x0b" + metadata = ((_INITIAL_METADATA_KEY, initial_metadata_value), + (_TRAILING_METADATA_KEY, trailing_metadata_value)) + + def _validate_metadata(response): + initial_metadata = dict(response.initial_metadata()) + if initial_metadata[_INITIAL_METADATA_KEY] != initial_metadata_value: + raise ValueError('expected initial metadata %s, got %s' % + (initial_metadata_value, + initial_metadata[_INITIAL_METADATA_KEY])) + trailing_metadata = dict(response.trailing_metadata()) + if trailing_metadata[_TRAILING_METADATA_KEY] != trailing_metadata_value: + raise ValueError('expected trailing metadata %s, got %s' % + (trailing_metadata_value, + initial_metadata[_TRAILING_METADATA_KEY])) + + # Testing with UnaryCall + request = messages_pb2.SimpleRequest( response_type=messages_pb2.COMPRESSABLE, - response_parameters=( - messages_pb2.ResponseParameters(size=1),)) - pipe.add(request) # Sends the request - next(response_iterator) # Causes server to send trailing metadata - # Dropping out of the with block closes the pipe - _validate_metadata(response_iterator) + response_size=1, + payload=messages_pb2.Payload(body=b'\x00')) + response_future = stub.UnaryCall.future(request, metadata=metadata) + _validate_metadata(response_future) + + # Testing with FullDuplexCall + with _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe, metadata=metadata) + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=(messages_pb2.ResponseParameters(size=1),)) + pipe.add(request) # Sends the request + next(response_iterator) # Causes server to send trailing metadata + # Dropping out of the with block closes the pipe + _validate_metadata(response_iterator) + def _compute_engine_creds(stub, args): - response = _large_unary_common_behavior(stub, True, True, None) - if args.default_service_account != response.username: - raise ValueError( - 'expected username %s, got %s' % ( - args.default_service_account, response.username)) + response = _large_unary_common_behavior(stub, True, True, None) + if args.default_service_account != response.username: + raise ValueError('expected username %s, got %s' % + (args.default_service_account, response.username)) def _oauth2_auth_token(stub, args): - json_key_filename = os.environ[ - oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] - wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] - response = _large_unary_common_behavior(stub, True, True, None) - if wanted_email != response.username: - raise ValueError( - 'expected username %s, got %s' % (wanted_email, response.username)) - if args.oauth_scope.find(response.oauth_scope) == -1: - raise ValueError( - 'expected to find oauth scope "{}" in received "{}"'.format( - response.oauth_scope, args.oauth_scope)) + json_key_filename = os.environ[ + oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] + wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] + response = _large_unary_common_behavior(stub, True, True, None) + if wanted_email != response.username: + raise ValueError('expected username %s, got %s' % + (wanted_email, response.username)) + if args.oauth_scope.find(response.oauth_scope) == -1: + raise ValueError('expected to find oauth scope "{}" in received "{}"'. + format(response.oauth_scope, args.oauth_scope)) def _jwt_token_creds(stub, args): - json_key_filename = os.environ[ - oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] - wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] - response = _large_unary_common_behavior(stub, True, False, None) - if wanted_email != response.username: - raise ValueError( - 'expected username %s, got %s' % (wanted_email, response.username)) + json_key_filename = os.environ[ + oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] + wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] + response = _large_unary_common_behavior(stub, True, False, None) + if wanted_email != response.username: + raise ValueError('expected username %s, got %s' % + (wanted_email, response.username)) def _per_rpc_creds(stub, args): - json_key_filename = os.environ[ - oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] - wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] - credentials = oauth2client_client.GoogleCredentials.get_application_default() - scoped_credentials = credentials.create_scoped([args.oauth_scope]) - # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last - # remaining use of the Beta API. - call_credentials = implementations.google_call_credentials( - scoped_credentials) - response = _large_unary_common_behavior(stub, True, False, call_credentials) - if wanted_email != response.username: - raise ValueError( - 'expected username %s, got %s' % (wanted_email, response.username)) + json_key_filename = os.environ[ + oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] + wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] + credentials = oauth2client_client.GoogleCredentials.get_application_default( + ) + scoped_credentials = credentials.create_scoped([args.oauth_scope]) + # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last + # remaining use of the Beta API. + call_credentials = implementations.google_call_credentials( + scoped_credentials) + response = _large_unary_common_behavior(stub, True, False, call_credentials) + if wanted_email != response.username: + raise ValueError('expected username %s, got %s' % + (wanted_email, response.username)) @enum.unique class TestCase(enum.Enum): - EMPTY_UNARY = 'empty_unary' - LARGE_UNARY = 'large_unary' - SERVER_STREAMING = 'server_streaming' - CLIENT_STREAMING = 'client_streaming' - PING_PONG = 'ping_pong' - CANCEL_AFTER_BEGIN = 'cancel_after_begin' - CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' - EMPTY_STREAM = 'empty_stream' - STATUS_CODE_AND_MESSAGE = 'status_code_and_message' - UNIMPLEMENTED_METHOD = 'unimplemented_method' - UNIMPLEMENTED_SERVICE = 'unimplemented_service' - CUSTOM_METADATA = "custom_metadata" - COMPUTE_ENGINE_CREDS = 'compute_engine_creds' - OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' - JWT_TOKEN_CREDS = 'jwt_token_creds' - PER_RPC_CREDS = 'per_rpc_creds' - TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' - - def test_interoperability(self, stub, args): - if self is TestCase.EMPTY_UNARY: - _empty_unary(stub) - elif self is TestCase.LARGE_UNARY: - _large_unary(stub) - elif self is TestCase.SERVER_STREAMING: - _server_streaming(stub) - elif self is TestCase.CLIENT_STREAMING: - _client_streaming(stub) - elif self is TestCase.PING_PONG: - _ping_pong(stub) - elif self is TestCase.CANCEL_AFTER_BEGIN: - _cancel_after_begin(stub) - elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: - _cancel_after_first_response(stub) - elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: - _timeout_on_sleeping_server(stub) - elif self is TestCase.EMPTY_STREAM: - _empty_stream(stub) - elif self is TestCase.STATUS_CODE_AND_MESSAGE: - _status_code_and_message(stub) - elif self is TestCase.UNIMPLEMENTED_METHOD: - _unimplemented_method(stub) - elif self is TestCase.UNIMPLEMENTED_SERVICE: - _unimplemented_service(stub) - elif self is TestCase.CUSTOM_METADATA: - _custom_metadata(stub) - elif self is TestCase.COMPUTE_ENGINE_CREDS: - _compute_engine_creds(stub, args) - elif self is TestCase.OAUTH2_AUTH_TOKEN: - _oauth2_auth_token(stub, args) - elif self is TestCase.JWT_TOKEN_CREDS: - _jwt_token_creds(stub, args) - elif self is TestCase.PER_RPC_CREDS: - _per_rpc_creds(stub, args) - else: - raise NotImplementedError('Test case "%s" not implemented!' % self.name) + EMPTY_UNARY = 'empty_unary' + LARGE_UNARY = 'large_unary' + SERVER_STREAMING = 'server_streaming' + CLIENT_STREAMING = 'client_streaming' + PING_PONG = 'ping_pong' + CANCEL_AFTER_BEGIN = 'cancel_after_begin' + CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' + EMPTY_STREAM = 'empty_stream' + STATUS_CODE_AND_MESSAGE = 'status_code_and_message' + UNIMPLEMENTED_METHOD = 'unimplemented_method' + UNIMPLEMENTED_SERVICE = 'unimplemented_service' + CUSTOM_METADATA = "custom_metadata" + COMPUTE_ENGINE_CREDS = 'compute_engine_creds' + OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' + JWT_TOKEN_CREDS = 'jwt_token_creds' + PER_RPC_CREDS = 'per_rpc_creds' + TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' + + def test_interoperability(self, stub, args): + if self is TestCase.EMPTY_UNARY: + _empty_unary(stub) + elif self is TestCase.LARGE_UNARY: + _large_unary(stub) + elif self is TestCase.SERVER_STREAMING: + _server_streaming(stub) + elif self is TestCase.CLIENT_STREAMING: + _client_streaming(stub) + elif self is TestCase.PING_PONG: + _ping_pong(stub) + elif self is TestCase.CANCEL_AFTER_BEGIN: + _cancel_after_begin(stub) + elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: + _cancel_after_first_response(stub) + elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: + _timeout_on_sleeping_server(stub) + elif self is TestCase.EMPTY_STREAM: + _empty_stream(stub) + elif self is TestCase.STATUS_CODE_AND_MESSAGE: + _status_code_and_message(stub) + elif self is TestCase.UNIMPLEMENTED_METHOD: + _unimplemented_method(stub) + elif self is TestCase.UNIMPLEMENTED_SERVICE: + _unimplemented_service(stub) + elif self is TestCase.CUSTOM_METADATA: + _custom_metadata(stub) + elif self is TestCase.COMPUTE_ENGINE_CREDS: + _compute_engine_creds(stub, args) + elif self is TestCase.OAUTH2_AUTH_TOKEN: + _oauth2_auth_token(stub, args) + elif self is TestCase.JWT_TOKEN_CREDS: + _jwt_token_creds(stub, args) + elif self is TestCase.PER_RPC_CREDS: + _per_rpc_creds(stub, args) + else: + raise NotImplementedError('Test case "%s" not implemented!' % + self.name) |