aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_test/grpc_interop/methods.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_test/grpc_interop/methods.py')
-rw-r--r--src/python/grpcio_test/grpc_interop/methods.py375
1 files changed, 375 insertions, 0 deletions
diff --git a/src/python/grpcio_test/grpc_interop/methods.py b/src/python/grpcio_test/grpc_interop/methods.py
new file mode 100644
index 0000000000..f4c94685ee
--- /dev/null
+++ b/src/python/grpcio_test/grpc_interop/methods.py
@@ -0,0 +1,375 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementations of interoperability test methods."""
+
+import enum
+import json
+import os
+import threading
+
+from oauth2client import client as oauth2client_client
+
+from grpc.framework.alpha import utilities
+
+from grpc_interop import empty_pb2
+from grpc_interop import messages_pb2
+
+_TIMEOUT = 7
+
+
+def _empty_call(request, unused_context):
+ return empty_pb2.Empty()
+
+_CLIENT_EMPTY_CALL = utilities.unary_unary_invocation_description(
+ empty_pb2.Empty.SerializeToString, empty_pb2.Empty.FromString)
+_SERVER_EMPTY_CALL = utilities.unary_unary_service_description(
+ _empty_call, empty_pb2.Empty.FromString,
+ empty_pb2.Empty.SerializeToString)
+
+
+def _unary_call(request, unused_context):
+ return messages_pb2.SimpleResponse(
+ payload=messages_pb2.Payload(
+ type=messages_pb2.COMPRESSABLE,
+ body=b'\x00' * request.response_size))
+
+_CLIENT_UNARY_CALL = utilities.unary_unary_invocation_description(
+ messages_pb2.SimpleRequest.SerializeToString,
+ messages_pb2.SimpleResponse.FromString)
+_SERVER_UNARY_CALL = utilities.unary_unary_service_description(
+ _unary_call, messages_pb2.SimpleRequest.FromString,
+ messages_pb2.SimpleResponse.SerializeToString)
+
+
+def _streaming_output_call(request, unused_context):
+ for response_parameters in request.response_parameters:
+ yield messages_pb2.StreamingOutputCallResponse(
+ payload=messages_pb2.Payload(
+ type=request.response_type,
+ body=b'\x00' * response_parameters.size))
+
+_CLIENT_STREAMING_OUTPUT_CALL = utilities.unary_stream_invocation_description(
+ messages_pb2.StreamingOutputCallRequest.SerializeToString,
+ messages_pb2.StreamingOutputCallResponse.FromString)
+_SERVER_STREAMING_OUTPUT_CALL = utilities.unary_stream_service_description(
+ _streaming_output_call,
+ messages_pb2.StreamingOutputCallRequest.FromString,
+ messages_pb2.StreamingOutputCallResponse.SerializeToString)
+
+
+def _streaming_input_call(request_iterator, unused_context):
+ aggregate_size = 0
+ for request in request_iterator:
+ if request.payload and request.payload.body:
+ aggregate_size += len(request.payload.body)
+ return messages_pb2.StreamingInputCallResponse(
+ aggregated_payload_size=aggregate_size)
+
+_CLIENT_STREAMING_INPUT_CALL = utilities.stream_unary_invocation_description(
+ messages_pb2.StreamingInputCallRequest.SerializeToString,
+ messages_pb2.StreamingInputCallResponse.FromString)
+_SERVER_STREAMING_INPUT_CALL = utilities.stream_unary_service_description(
+ _streaming_input_call,
+ messages_pb2.StreamingInputCallRequest.FromString,
+ messages_pb2.StreamingInputCallResponse.SerializeToString)
+
+
+def _full_duplex_call(request_iterator, unused_context):
+ for request in request_iterator:
+ yield messages_pb2.StreamingOutputCallResponse(
+ payload=messages_pb2.Payload(
+ type=request.payload.type,
+ body=b'\x00' * request.response_parameters[0].size))
+
+_CLIENT_FULL_DUPLEX_CALL = utilities.stream_stream_invocation_description(
+ messages_pb2.StreamingOutputCallRequest.SerializeToString,
+ messages_pb2.StreamingOutputCallResponse.FromString)
+_SERVER_FULL_DUPLEX_CALL = utilities.stream_stream_service_description(
+ _full_duplex_call,
+ messages_pb2.StreamingOutputCallRequest.FromString,
+ messages_pb2.StreamingOutputCallResponse.SerializeToString)
+
+# NOTE(nathaniel): Apparently this is the same as the full-duplex call?
+_CLIENT_HALF_DUPLEX_CALL = utilities.stream_stream_invocation_description(
+ messages_pb2.StreamingOutputCallRequest.SerializeToString,
+ messages_pb2.StreamingOutputCallResponse.FromString)
+_SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_service_description(
+ _full_duplex_call,
+ messages_pb2.StreamingOutputCallRequest.FromString,
+ messages_pb2.StreamingOutputCallResponse.SerializeToString)
+
+
+SERVICE_NAME = 'grpc.testing.TestService'
+
+_EMPTY_CALL_METHOD_NAME = 'EmptyCall'
+_UNARY_CALL_METHOD_NAME = 'UnaryCall'
+_STREAMING_OUTPUT_CALL_METHOD_NAME = 'StreamingOutputCall'
+_STREAMING_INPUT_CALL_METHOD_NAME = 'StreamingInputCall'
+_FULL_DUPLEX_CALL_METHOD_NAME = 'FullDuplexCall'
+_HALF_DUPLEX_CALL_METHOD_NAME = 'HalfDuplexCall'
+
+CLIENT_METHODS = {
+ _EMPTY_CALL_METHOD_NAME: _CLIENT_EMPTY_CALL,
+ _UNARY_CALL_METHOD_NAME: _CLIENT_UNARY_CALL,
+ _STREAMING_OUTPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_OUTPUT_CALL,
+ _STREAMING_INPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_INPUT_CALL,
+ _FULL_DUPLEX_CALL_METHOD_NAME: _CLIENT_FULL_DUPLEX_CALL,
+ _HALF_DUPLEX_CALL_METHOD_NAME: _CLIENT_HALF_DUPLEX_CALL,
+}
+
+SERVER_METHODS = {
+ _EMPTY_CALL_METHOD_NAME: _SERVER_EMPTY_CALL,
+ _UNARY_CALL_METHOD_NAME: _SERVER_UNARY_CALL,
+ _STREAMING_OUTPUT_CALL_METHOD_NAME: _SERVER_STREAMING_OUTPUT_CALL,
+ _STREAMING_INPUT_CALL_METHOD_NAME: _SERVER_STREAMING_INPUT_CALL,
+ _FULL_DUPLEX_CALL_METHOD_NAME: _SERVER_FULL_DUPLEX_CALL,
+ _HALF_DUPLEX_CALL_METHOD_NAME: _SERVER_HALF_DUPLEX_CALL,
+}
+
+
+def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope):
+ with stub:
+ request = messages_pb2.SimpleRequest(
+ response_type=messages_pb2.COMPRESSABLE, response_size=314159,
+ payload=messages_pb2.Payload(body=b'\x00' * 271828),
+ fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
+ response_future = stub.UnaryCall.async(request, _TIMEOUT)
+ response = response_future.result()
+ if response.payload.type is not messages_pb2.COMPRESSABLE:
+ raise ValueError(
+ 'response payload type is "%s"!' % type(response.payload.type))
+ if len(response.payload.body) != 314159:
+ raise ValueError(
+ 'response body of incorrect size %d!' % len(response.payload.body))
+ return response
+
+
+def _empty_unary(stub):
+ with stub:
+ response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT)
+ if not isinstance(response, empty_pb2.Empty):
+ raise TypeError(
+ 'response is of type "%s", not empty_pb2.Empty!', type(response))
+
+
+def _large_unary(stub):
+ _large_unary_common_behavior(stub, False, False)
+
+
+def _client_streaming(stub):
+ with stub:
+ payload_body_sizes = (27182, 8, 1828, 45904)
+ payloads = (
+ messages_pb2.Payload(body=b'\x00' * size)
+ for size in payload_body_sizes)
+ requests = (
+ messages_pb2.StreamingInputCallRequest(payload=payload)
+ for payload in payloads)
+ response = stub.StreamingInputCall(requests, _TIMEOUT)
+ if response.aggregated_payload_size != 74922:
+ raise ValueError(
+ 'incorrect size %d!' % response.aggregated_payload_size)
+
+
+def _server_streaming(stub):
+ sizes = (31415, 9, 2653, 58979)
+
+ with stub:
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=sizes[0]),
+ messages_pb2.ResponseParameters(size=sizes[1]),
+ messages_pb2.ResponseParameters(size=sizes[2]),
+ messages_pb2.ResponseParameters(size=sizes[3]),
+ ))
+ response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
+ for index, response in enumerate(response_iterator):
+ if response.payload.type != messages_pb2.COMPRESSABLE:
+ raise ValueError(
+ 'response body of invalid type %s!' % response.payload.type)
+ if len(response.payload.body) != sizes[index]:
+ raise ValueError(
+ 'response body of invalid size %d!' % len(response.payload.body))
+
+def _cancel_after_begin(stub):
+ with stub:
+ sizes = (27182, 8, 1828, 45904)
+ payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes]
+ requests = [messages_pb2.StreamingInputCallRequest(payload=payload)
+ for payload in payloads]
+ responses = stub.StreamingInputCall.async(requests, _TIMEOUT)
+ responses.cancel()
+ if not responses.cancelled():
+ raise ValueError('expected call to be cancelled')
+
+
+class _Pipe(object):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._values = []
+ self._open = True
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ with self._condition:
+ while not self._values and self._open:
+ self._condition.wait()
+ if self._values:
+ return self._values.pop(0)
+ else:
+ raise StopIteration()
+
+ def add(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._condition.notify()
+
+ def close(self):
+ with self._condition:
+ self._open = False
+ self._condition.notify()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self.close()
+
+
+def _ping_pong(stub):
+ request_response_sizes = (31415, 9, 2653, 58979)
+ request_payload_sizes = (27182, 8, 1828, 45904)
+
+ with stub, _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
+ print 'Starting ping-pong with response iterator %s' % response_iterator
+ for response_size, payload_size in zip(
+ request_response_sizes, request_payload_sizes):
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(messages_pb2.ResponseParameters(
+ size=response_size),),
+ payload=messages_pb2.Payload(body=b'\x00' * payload_size))
+ pipe.add(request)
+ response = next(response_iterator)
+ if response.payload.type != messages_pb2.COMPRESSABLE:
+ raise ValueError(
+ 'response body of invalid type %s!' % response.payload.type)
+ if len(response.payload.body) != response_size:
+ raise ValueError(
+ 'response body of invalid size %d!' % len(response.payload.body))
+
+
+def _cancel_after_first_response(stub):
+ request_response_sizes = (31415, 9, 2653, 58979)
+ request_payload_sizes = (27182, 8, 1828, 45904)
+ with stub, _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
+
+ response_size = request_response_sizes[0]
+ payload_size = request_payload_sizes[0]
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(messages_pb2.ResponseParameters(
+ size=response_size),),
+ payload=messages_pb2.Payload(body=b'\x00' * payload_size))
+ pipe.add(request)
+ response = next(response_iterator)
+ # We test the contents of `response` in the Ping Pong test - don't check
+ # them here.
+ response_iterator.cancel()
+
+ try:
+ next(response_iterator)
+ except Exception:
+ pass
+ else:
+ raise ValueError('expected call to be cancelled')
+
+
+def _compute_engine_creds(stub, args):
+ response = _large_unary_common_behavior(stub, True, True)
+ if args.default_service_account != response.username:
+ raise ValueError(
+ 'expected username %s, got %s' % (args.default_service_account,
+ response.username))
+
+
+def _service_account_creds(stub, args):
+ json_key_filename = os.environ[
+ oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
+ wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
+ response = _large_unary_common_behavior(stub, True, True)
+ if wanted_email != response.username:
+ raise ValueError(
+ 'expected username %s, got %s' % (wanted_email, response.username))
+ if args.oauth_scope.find(response.oauth_scope) == -1:
+ raise ValueError(
+ 'expected to find oauth scope "%s" in received "%s"' %
+ (response.oauth_scope, args.oauth_scope))
+
+
+@enum.unique
+class TestCase(enum.Enum):
+ EMPTY_UNARY = 'empty_unary'
+ LARGE_UNARY = 'large_unary'
+ SERVER_STREAMING = 'server_streaming'
+ CLIENT_STREAMING = 'client_streaming'
+ PING_PONG = 'ping_pong'
+ CANCEL_AFTER_BEGIN = 'cancel_after_begin'
+ CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
+ COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
+ SERVICE_ACCOUNT_CREDS = 'service_account_creds'
+
+ def test_interoperability(self, stub, args):
+ if self is TestCase.EMPTY_UNARY:
+ _empty_unary(stub)
+ elif self is TestCase.LARGE_UNARY:
+ _large_unary(stub)
+ elif self is TestCase.SERVER_STREAMING:
+ _server_streaming(stub)
+ elif self is TestCase.CLIENT_STREAMING:
+ _client_streaming(stub)
+ elif self is TestCase.PING_PONG:
+ _ping_pong(stub)
+ elif self is TestCase.CANCEL_AFTER_BEGIN:
+ _cancel_after_begin(stub)
+ elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE:
+ _cancel_after_first_response(stub)
+ elif self is TestCase.COMPUTE_ENGINE_CREDS:
+ _compute_engine_creds(stub, args)
+ elif self is TestCase.SERVICE_ACCOUNT_CREDS:
+ _service_account_creds(stub, args)
+ else:
+ raise NotImplementedError('Test case "%s" not implemented!' % self.name)