aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_test/grpc_test
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_test/grpc_test')
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_low_test.py199
-rw-r--r--src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py165
-rw-r--r--src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py2
-rw-r--r--src/python/grpcio_test/grpc_test/_links/_transmission_test.py15
-rw-r--r--src/python/grpcio_test/grpc_test/framework/common/test_constants.py10
-rw-r--r--src/python/grpcio_test/grpc_test/framework/common/test_control.py10
-rw-r--r--src/python/grpcio_test/grpc_test/framework/core/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py96
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py568
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py168
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py55
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py262
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py186
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py250
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_digest.py444
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py377
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py378
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_invocation.py213
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_receiver.py95
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_service.py332
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py396
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/test_cases.py67
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/face/test_interfaces.py229
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py4
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py101
27 files changed, 4650 insertions, 62 deletions
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
index 44fe760fbc..70149127da 100644
--- a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
+++ b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
@@ -52,7 +52,6 @@ def wait_for_events(completion_queues, deadline):
def set_ith_result(i, completion_queue):
result = completion_queue.next(deadline)
with lock:
- print i, completion_queue, result, time.time() - deadline
results[i] = result
for i, completion_queue in enumerate(completion_queues):
thread = threading.Thread(target=set_ith_result,
@@ -80,10 +79,12 @@ class InsecureServerInsecureClient(unittest.TestCase):
del self.client_channel
self.client_completion_queue.shutdown()
- while self.client_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN:
+ while (self.client_completion_queue.next().type !=
+ _types.EventType.QUEUE_SHUTDOWN):
pass
self.server_completion_queue.shutdown()
- while self.server_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN:
+ while (self.server_completion_queue.next().type !=
+ _types.EventType.QUEUE_SHUTDOWN):
pass
del self.client_completion_queue
@@ -91,58 +92,68 @@ class InsecureServerInsecureClient(unittest.TestCase):
del self.server
def testEcho(self):
- DEADLINE = time.time()+5
- DEADLINE_TOLERANCE = 0.25
- CLIENT_METADATA_ASCII_KEY = 'key'
- CLIENT_METADATA_ASCII_VALUE = 'val'
- CLIENT_METADATA_BIN_KEY = 'key-bin'
- CLIENT_METADATA_BIN_VALUE = b'\0'*1000
- SERVER_INITIAL_METADATA_KEY = 'init_me_me_me'
- SERVER_INITIAL_METADATA_VALUE = 'whodawha?'
- SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought'
- SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
- SERVER_STATUS_CODE = _types.StatusCode.OK
- SERVER_STATUS_DETAILS = 'our work is never over'
- REQUEST = 'in death a member of project mayhem has a name'
- RESPONSE = 'his name is robert paulson'
- METHOD = 'twinkies'
- HOST = 'hostess'
+ deadline = time.time() + 5
+ event_time_tolerance = 2
+ deadline_tolerance = 0.25
+ client_metadata_ascii_key = 'key'
+ client_metadata_ascii_value = 'val'
+ client_metadata_bin_key = 'key-bin'
+ client_metadata_bin_value = b'\0'*1000
+ server_initial_metadata_key = 'init_me_me_me'
+ server_initial_metadata_value = 'whodawha?'
+ server_trailing_metadata_key = 'california_is_in_a_drought'
+ server_trailing_metadata_value = 'zomg it is'
+ server_status_code = _types.StatusCode.OK
+ server_status_details = 'our work is never over'
+ request = 'blarghaflargh'
+ response = 'his name is robert paulson'
+ method = 'twinkies'
+ host = 'hostess'
server_request_tag = object()
- request_call_result = self.server.request_call(self.server_completion_queue, server_request_tag)
+ request_call_result = self.server.request_call(self.server_completion_queue,
+ server_request_tag)
- self.assertEquals(_types.CallError.OK, request_call_result)
+ self.assertEqual(_types.CallError.OK, request_call_result)
client_call_tag = object()
- client_call = self.client_channel.create_call(self.client_completion_queue, METHOD, HOST, DEADLINE)
- client_initial_metadata = [(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]
+ client_call = self.client_channel.create_call(
+ self.client_completion_queue, method, host, deadline)
+ client_initial_metadata = [
+ (client_metadata_ascii_key, client_metadata_ascii_value),
+ (client_metadata_bin_key, client_metadata_bin_value)
+ ]
client_start_batch_result = client_call.start_batch([
_types.OpArgs.send_initial_metadata(client_initial_metadata),
- _types.OpArgs.send_message(REQUEST, 0),
+ _types.OpArgs.send_message(request, 0),
_types.OpArgs.send_close_from_client(),
_types.OpArgs.recv_initial_metadata(),
_types.OpArgs.recv_message(),
_types.OpArgs.recv_status_on_client()
], client_call_tag)
- self.assertEquals(_types.CallError.OK, client_start_batch_result)
+ self.assertEqual(_types.CallError.OK, client_start_batch_result)
- client_no_event, request_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
- self.assertEquals(client_no_event, None)
- self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type)
+ client_no_event, request_event, = wait_for_events(
+ [self.client_completion_queue, self.server_completion_queue],
+ time.time() + event_time_tolerance)
+ self.assertEqual(client_no_event, None)
+ self.assertEqual(_types.EventType.OP_COMPLETE, request_event.type)
self.assertIsInstance(request_event.call, _low.Call)
self.assertIs(server_request_tag, request_event.tag)
- self.assertEquals(1, len(request_event.results))
+ self.assertEqual(1, len(request_event.results))
received_initial_metadata = dict(request_event.results[0].initial_metadata)
# Check that our metadata were transmitted
- self.assertEquals(
+ self.assertEqual(
dict(client_initial_metadata),
- dict((x, received_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+ dict((x, received_initial_metadata[x])
+ for x in zip(*client_initial_metadata)[0]))
# Check that Python's user agent string is a part of the full user agent
# string
self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__),
received_initial_metadata['user-agent'])
- self.assertEquals(METHOD, request_event.call_details.method)
- self.assertEquals(HOST, request_event.call_details.host)
- self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)
+ self.assertEqual(method, request_event.call_details.method)
+ self.assertEqual(host, request_event.call_details.host)
+ self.assertLess(abs(deadline - request_event.call_details.deadline),
+ deadline_tolerance)
# Check that the channel is connected, and that both it and the call have
# the proper target and peer; do this after the first flurry of messages to
@@ -155,33 +166,43 @@ class InsecureServerInsecureClient(unittest.TestCase):
server_call_tag = object()
server_call = request_event.call
- server_initial_metadata = [(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE)]
- server_trailing_metadata = [(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE)]
+ server_initial_metadata = [
+ (server_initial_metadata_key, server_initial_metadata_value)
+ ]
+ server_trailing_metadata = [
+ (server_trailing_metadata_key, server_trailing_metadata_value)
+ ]
server_start_batch_result = server_call.start_batch([
_types.OpArgs.send_initial_metadata(server_initial_metadata),
_types.OpArgs.recv_message(),
- _types.OpArgs.send_message(RESPONSE, 0),
+ _types.OpArgs.send_message(response, 0),
_types.OpArgs.recv_close_on_server(),
- _types.OpArgs.send_status_from_server(server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
+ _types.OpArgs.send_status_from_server(
+ server_trailing_metadata, server_status_code, server_status_details)
], server_call_tag)
- self.assertEquals(_types.CallError.OK, server_start_batch_result)
+ self.assertEqual(_types.CallError.OK, server_start_batch_result)
- client_event, server_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
+ client_event, server_event, = wait_for_events(
+ [self.client_completion_queue, self.server_completion_queue],
+ time.time() + event_time_tolerance)
- self.assertEquals(6, len(client_event.results))
+ self.assertEqual(6, len(client_event.results))
found_client_op_types = set()
for client_result in client_event.results:
- self.assertNotIn(client_result.type, found_client_op_types) # we expect each op type to be unique
+ # we expect each op type to be unique
+ self.assertNotIn(client_result.type, found_client_op_types)
found_client_op_types.add(client_result.type)
if client_result.type == _types.OpType.RECV_INITIAL_METADATA:
- self.assertEquals(dict(server_initial_metadata), dict(client_result.initial_metadata))
+ self.assertEqual(dict(server_initial_metadata),
+ dict(client_result.initial_metadata))
elif client_result.type == _types.OpType.RECV_MESSAGE:
- self.assertEquals(RESPONSE, client_result.message)
+ self.assertEqual(response, client_result.message)
elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT:
- self.assertEquals(dict(server_trailing_metadata), dict(client_result.trailing_metadata))
- self.assertEquals(SERVER_STATUS_DETAILS, client_result.status.details)
- self.assertEquals(SERVER_STATUS_CODE, client_result.status.code)
- self.assertEquals(set([
+ self.assertEqual(dict(server_trailing_metadata),
+ dict(client_result.trailing_metadata))
+ self.assertEqual(server_status_details, client_result.status.details)
+ self.assertEqual(server_status_code, client_result.status.code)
+ self.assertEqual(set([
_types.OpType.SEND_INITIAL_METADATA,
_types.OpType.SEND_MESSAGE,
_types.OpType.SEND_CLOSE_FROM_CLIENT,
@@ -190,16 +211,16 @@ class InsecureServerInsecureClient(unittest.TestCase):
_types.OpType.RECV_STATUS_ON_CLIENT
]), found_client_op_types)
- self.assertEquals(5, len(server_event.results))
+ self.assertEqual(5, len(server_event.results))
found_server_op_types = set()
for server_result in server_event.results:
self.assertNotIn(client_result.type, found_server_op_types)
found_server_op_types.add(server_result.type)
if server_result.type == _types.OpType.RECV_MESSAGE:
- self.assertEquals(REQUEST, server_result.message)
+ self.assertEqual(request, server_result.message)
elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER:
self.assertFalse(server_result.cancelled)
- self.assertEquals(set([
+ self.assertEqual(set([
_types.OpType.SEND_INITIAL_METADATA,
_types.OpType.RECV_MESSAGE,
_types.OpType.SEND_MESSAGE,
@@ -211,5 +232,81 @@ class InsecureServerInsecureClient(unittest.TestCase):
del server_call
+class HangingServerShutdown(unittest.TestCase):
+
+ def setUp(self):
+ self.server_completion_queue = _low.CompletionQueue()
+ self.server = _low.Server(self.server_completion_queue, [])
+ self.port = self.server.add_http2_port('[::]:0')
+ self.client_completion_queue = _low.CompletionQueue()
+ self.client_channel = _low.Channel('localhost:%d'%self.port, [])
+
+ self.server.start()
+
+ def tearDown(self):
+ self.server.shutdown()
+ del self.client_channel
+
+ self.client_completion_queue.shutdown()
+ self.server_completion_queue.shutdown()
+ while True:
+ client_event, server_event = wait_for_events(
+ [self.client_completion_queue, self.server_completion_queue],
+ float("+inf"))
+ if (client_event.type == _types.EventType.QUEUE_SHUTDOWN and
+ server_event.type == _types.EventType.QUEUE_SHUTDOWN):
+ break
+
+ del self.client_completion_queue
+ del self.server_completion_queue
+ del self.server
+
+ def testHangingServerCall(self):
+ deadline = time.time() + 5
+ deadline_tolerance = 0.25
+ event_time_tolerance = 2
+ cancel_all_calls_time_tolerance = 0.5
+ request = 'blarghaflargh'
+ method = 'twinkies'
+ host = 'hostess'
+ server_request_tag = object()
+ request_call_result = self.server.request_call(self.server_completion_queue,
+ server_request_tag)
+
+ client_call_tag = object()
+ client_call = self.client_channel.create_call(self.client_completion_queue,
+ method, host, deadline)
+ client_start_batch_result = client_call.start_batch([
+ _types.OpArgs.send_initial_metadata([]),
+ _types.OpArgs.send_message(request, 0),
+ _types.OpArgs.send_close_from_client(),
+ _types.OpArgs.recv_initial_metadata(),
+ _types.OpArgs.recv_message(),
+ _types.OpArgs.recv_status_on_client()
+ ], client_call_tag)
+
+ client_no_event, request_event, = wait_for_events(
+ [self.client_completion_queue, self.server_completion_queue],
+ time.time() + event_time_tolerance)
+
+ # Now try to shutdown the server and expect that we see server shutdown
+ # almost immediately after calling cancel_all_calls.
+ with self.assertRaises(RuntimeError):
+ self.server.cancel_all_calls()
+ shutdown_tag = object()
+ self.server.shutdown(shutdown_tag)
+ pre_cancel_timestamp = time.time()
+ self.server.cancel_all_calls()
+ finish_shutdown_timestamp = None
+ client_call_event, server_shutdown_event = wait_for_events(
+ [self.client_completion_queue, self.server_completion_queue],
+ time.time() + event_time_tolerance)
+ self.assertIs(shutdown_tag, server_shutdown_event.tag)
+ self.assertGreater(pre_cancel_timestamp + cancel_all_calls_time_tolerance,
+ time.time())
+
+ del client_call
+
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
new file mode 100644
index 0000000000..72b1ae5642
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
@@ -0,0 +1,165 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests the RPC Framework Core's implementation of the Base interface."""
+
+import collections
+import logging
+import random
+import time
+import unittest
+
+from grpc._adapter import _intermediary_low
+from grpc._links import invocation
+from grpc._links import service
+from grpc.framework.core import implementations
+from grpc.framework.interfaces.base import utilities
+from grpc_test import test_common as grpc_test_common
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import test_cases
+from grpc_test.framework.interfaces.base import test_interfaces
+
+_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),)
+_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),)
+_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),)
+_CODE = _intermediary_low.Code.OK
+_MESSAGE = b'test message'
+
+
+class _SerializationBehaviors(
+ collections.namedtuple(
+ '_SerializationBehaviors',
+ ('request_serializers', 'request_deserializers', 'response_serializers',
+ 'response_deserializers',))):
+ pass
+
+
+class _Links(
+ collections.namedtuple(
+ '_Links',
+ ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link',
+ 'service_end_link'))):
+ pass
+
+
+def _serialization_behaviors_from_serializations(serializations):
+ request_serializers = {}
+ request_deserializers = {}
+ response_serializers = {}
+ response_deserializers = {}
+ for (group, method), serialization in serializations.iteritems():
+ request_serializers[group, method] = serialization.serialize_request
+ request_deserializers[group, method] = serialization.deserialize_request
+ response_serializers[group, method] = serialization.serialize_response
+ response_deserializers[group, method] = serialization.deserialize_response
+ return _SerializationBehaviors(
+ request_serializers, request_deserializers, response_serializers,
+ response_deserializers)
+
+
+class _Implementation(test_interfaces.Implementation):
+
+ def instantiate(self, serializations, servicer):
+ serialization_behaviors = _serialization_behaviors_from_serializations(
+ serializations)
+ invocation_end_link = implementations.invocation_end_link()
+ service_end_link = implementations.service_end_link(
+ servicer, test_constants.DEFAULT_TIMEOUT,
+ test_constants.MAXIMUM_TIMEOUT)
+ service_grpc_link = service.service_link(
+ serialization_behaviors.request_deserializers,
+ serialization_behaviors.response_serializers)
+ port = service_grpc_link.add_port(0, None)
+ channel = _intermediary_low.Channel('localhost:%d' % port, None)
+ invocation_grpc_link = invocation.invocation_link(
+ channel, b'localhost',
+ serialization_behaviors.request_serializers,
+ serialization_behaviors.response_deserializers)
+
+ invocation_end_link.join_link(invocation_grpc_link)
+ invocation_grpc_link.join_link(invocation_end_link)
+ service_end_link.join_link(service_grpc_link)
+ service_grpc_link.join_link(service_end_link)
+ invocation_grpc_link.start()
+ service_grpc_link.start()
+ return invocation_end_link, service_end_link, (
+ invocation_grpc_link, service_grpc_link)
+
+ def destantiate(self, memo):
+ invocation_grpc_link, service_grpc_link = memo
+ invocation_grpc_link.stop()
+ service_grpc_link.stop_gracefully()
+
+ def invocation_initial_metadata(self):
+ return _INVOCATION_INITIAL_METADATA
+
+ def service_initial_metadata(self):
+ return _SERVICE_INITIAL_METADATA
+
+ def invocation_completion(self):
+ return utilities.completion(None, None, None)
+
+ def service_completion(self):
+ return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE)
+
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ return original_metadata is None or grpc_test_common.metadata_transmitted(
+ original_metadata, transmitted_metadata)
+
+ def completion_transmitted(self, original_completion, transmitted_completion):
+ if (original_completion.terminal_metadata is not None and
+ not grpc_test_common.metadata_transmitted(
+ original_completion.terminal_metadata,
+ transmitted_completion.terminal_metadata)):
+ return False
+ elif original_completion.code is not transmitted_completion.code:
+ return False
+ elif original_completion.message != transmitted_completion.message:
+ return False
+ else:
+ return True
+
+
+def setUpModule():
+ logging.warn('setUpModule!')
+
+
+def tearDownModule():
+ logging.warn('tearDownModule!')
+
+
+def load_tests(loader, tests, pattern):
+ return unittest.TestSuite(
+ tests=tuple(
+ loader.loadTestsFromTestCase(test_case_class)
+ for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py b/src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py
index abe240e07a..373a2b2a1f 100644
--- a/src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py
+++ b/src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py
@@ -66,7 +66,7 @@ class LonelyInvocationLinkTest(unittest.TestCase):
ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.SHORT_TIMEOUT, 1, None,
- None, None, None, None, termination)
+ None, None, None, None, termination, None)
invocation_link.accept_ticket(ticket)
invocation_link_mate.block_until_tickets_satisfy(test_cases.terminated)
diff --git a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py
index 9cdc9620f0..02ddd512c2 100644
--- a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py
+++ b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py
@@ -128,14 +128,14 @@ class RoundTripTest(unittest.TestCase):
invocation_ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
- None, None, None, None, links.Ticket.Termination.COMPLETION)
+ None, None, None, None, links.Ticket.Termination.COMPLETION, None)
invocation_link.accept_ticket(invocation_ticket)
service_mate.block_until_tickets_satisfy(test_cases.terminated)
service_ticket = links.Ticket(
service_mate.tickets()[-1].operation_id, 0, None, None, None, None,
None, None, None, None, test_code, test_message,
- links.Ticket.Termination.COMPLETION)
+ links.Ticket.Termination.COMPLETION, None)
service_link.accept_ticket(service_ticket)
invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
@@ -174,33 +174,34 @@ class RoundTripTest(unittest.TestCase):
invocation_ticket = links.Ticket(
test_operation_id, 0, test_group, test_method,
links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
- None, None, None, None, None)
+ None, None, None, None, None, None)
invocation_link.accept_ticket(invocation_ticket)
requests = scenario.requests()
for request_index, request in enumerate(requests):
request_ticket = links.Ticket(
test_operation_id, 1 + request_index, None, None, None, None, 1, None,
- request, None, None, None, None)
+ request, None, None, None, None, None)
invocation_link.accept_ticket(request_ticket)
service_mate.block_until_tickets_satisfy(
test_cases.at_least_n_payloads_received_predicate(1 + request_index))
response_ticket = links.Ticket(
service_mate.tickets()[0].operation_id, request_index, None, None,
None, None, 1, None, scenario.response_for_request(request), None,
- None, None, None)
+ None, None, None, None)
service_link.accept_ticket(response_ticket)
invocation_mate.block_until_tickets_satisfy(
test_cases.at_least_n_payloads_received_predicate(1 + request_index))
request_count = len(requests)
invocation_completion_ticket = links.Ticket(
test_operation_id, request_count + 1, None, None, None, None, None,
- None, None, None, None, None, links.Ticket.Termination.COMPLETION)
+ None, None, None, None, None, links.Ticket.Termination.COMPLETION,
+ None)
invocation_link.accept_ticket(invocation_completion_ticket)
service_mate.block_until_tickets_satisfy(test_cases.terminated)
service_completion_ticket = links.Ticket(
service_mate.tickets()[0].operation_id, request_count, None, None, None,
None, None, None, None, None, test_code, test_message,
- links.Ticket.Termination.COMPLETION)
+ links.Ticket.Termination.COMPLETION, None)
service_link.accept_ticket(service_completion_ticket)
invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
diff --git a/src/python/grpcio_test/grpc_test/framework/common/test_constants.py b/src/python/grpcio_test/grpc_test/framework/common/test_constants.py
index 3126d0d82c..e1d3c2709d 100644
--- a/src/python/grpcio_test/grpc_test/framework/common/test_constants.py
+++ b/src/python/grpcio_test/grpc_test/framework/common/test_constants.py
@@ -29,15 +29,25 @@
"""Constants shared among tests throughout RPC Framework."""
+# Value for maximum duration in seconds that a test is allowed for its actual
+# behavioral logic, excluding all time spent deliberately waiting in the test.
+TIME_ALLOWANCE = 10
# Value for maximum duration in seconds of RPCs that may time out as part of a
# test.
SHORT_TIMEOUT = 4
# Absurdly large value for maximum duration in seconds for should-not-time-out
# RPCs made during tests.
LONG_TIMEOUT = 3000
+# Values to supply on construction of an object that will service RPCs; these
+# should not be used as the actual timeout values of any RPCs made during tests.
+DEFAULT_TIMEOUT = 300
+MAXIMUM_TIMEOUT = 3600
# The number of payloads to transmit in streaming tests.
STREAM_LENGTH = 200
+# The size of payloads to transmit in tests.
+PAYLOAD_SIZE = 256 * 1024 + 17
+
# The size of thread pools to use in tests.
POOL_SIZE = 10
diff --git a/src/python/grpcio_test/grpc_test/framework/common/test_control.py b/src/python/grpcio_test/grpc_test/framework/common/test_control.py
index 3960c4e649..8d6eba5c2c 100644
--- a/src/python/grpcio_test/grpc_test/framework/common/test_control.py
+++ b/src/python/grpcio_test/grpc_test/framework/common/test_control.py
@@ -34,6 +34,14 @@ import contextlib
import threading
+class Defect(Exception):
+ """Simulates a programming defect raised into in a system under test.
+
+ Use of a standard exception type is too easily misconstrued as an actual
+ defect in either the test infrastructure or the system under test.
+ """
+
+
class Control(object):
"""An object that accepts program control from a system under test.
@@ -62,7 +70,7 @@ class PauseFailControl(Control):
def control(self):
with self._condition:
if self._fail:
- raise ValueError()
+ raise Defect()
while self._paused:
self._condition.wait()
diff --git a/src/python/grpcio_test/grpc_test/framework/core/__init__.py b/src/python/grpcio_test/grpc_test/framework/core/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/core/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
new file mode 100644
index 0000000000..8d72f131d5
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py
@@ -0,0 +1,96 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests the RPC Framework Core's implementation of the Base interface."""
+
+import logging
+import random
+import time
+import unittest
+
+from grpc.framework.core import implementations
+from grpc.framework.interfaces.base import utilities
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import test_cases
+from grpc_test.framework.interfaces.base import test_interfaces
+
+
+class _Implementation(test_interfaces.Implementation):
+
+ def __init__(self):
+ self._invocation_initial_metadata = object()
+ self._service_initial_metadata = object()
+ self._invocation_terminal_metadata = object()
+ self._service_terminal_metadata = object()
+
+ def instantiate(self, serializations, servicer):
+ invocation = implementations.invocation_end_link()
+ service = implementations.service_end_link(
+ servicer, test_constants.DEFAULT_TIMEOUT,
+ test_constants.MAXIMUM_TIMEOUT)
+ invocation.join_link(service)
+ service.join_link(invocation)
+ return invocation, service, None
+
+ def destantiate(self, memo):
+ pass
+
+ def invocation_initial_metadata(self):
+ return self._invocation_initial_metadata
+
+ def service_initial_metadata(self):
+ return self._service_initial_metadata
+
+ def invocation_completion(self):
+ return utilities.completion(self._invocation_terminal_metadata, None, None)
+
+ def service_completion(self):
+ return utilities.completion(self._service_terminal_metadata, None, None)
+
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ return transmitted_metadata is original_metadata
+
+ def completion_transmitted(self, original_completion, transmitted_completion):
+ return (
+ (original_completion.terminal_metadata is
+ transmitted_completion.terminal_metadata) and
+ original_completion.code is transmitted_completion.code and
+ original_completion.message is transmitted_completion.message
+ )
+
+
+def load_tests(loader, tests, pattern):
+ return unittest.TestSuite(
+ tests=tuple(
+ loader.loadTestsFromTestCase(test_case_class)
+ for test_case_class in test_cases.test_cases(_Implementation())))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
new file mode 100644
index 0000000000..e4d2a7a0d7
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py
@@ -0,0 +1,568 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Part of the tests of the base interface of RPC Framework."""
+
+import abc
+import collections
+import enum
+import random # pylint: disable=unused-import
+import threading
+import time
+
+from grpc.framework.interfaces.base import base
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import _sequence
+from grpc_test.framework.interfaces.base import _state
+from grpc_test.framework.interfaces.base import test_interfaces # pylint: disable=unused-import
+
+_GROUP = 'base test cases test group'
+_METHOD = 'base test cases test method'
+
+_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE = test_constants.PAYLOAD_SIZE / 20
+_MINIMUM_PAYLOAD_SIZE = test_constants.PAYLOAD_SIZE / 600
+
+
+def _create_payload(randomness):
+ length = randomness.randint(
+ _MINIMUM_PAYLOAD_SIZE, test_constants.PAYLOAD_SIZE)
+ random_section_length = randomness.randint(
+ 0, min(_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE, length))
+ random_section = bytes(
+ bytearray(
+ randomness.getrandbits(8) for _ in range(random_section_length)))
+ sevens_section = '\x07' * (length - random_section_length)
+ return b''.join(randomness.sample((random_section, sevens_section), 2))
+
+
+def _anything_in_flight(state):
+ return (
+ state.invocation_initial_metadata_in_flight is not None or
+ state.invocation_payloads_in_flight or
+ state.invocation_completion_in_flight is not None or
+ state.service_initial_metadata_in_flight is not None or
+ state.service_payloads_in_flight or
+ state.service_completion_in_flight is not None or
+ 0 < state.invocation_allowance_in_flight or
+ 0 < state.service_allowance_in_flight
+ )
+
+
+def _verify_service_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, state, implementation):
+ if initial_metadata is not None:
+ if state.invocation_initial_metadata_received:
+ return 'Later invocation initial metadata received: %s' % (
+ initial_metadata,)
+ if state.invocation_payloads_received:
+ return 'Invocation initial metadata received after payloads: %s' % (
+ state.invocation_payloads_received)
+ if state.invocation_completion_received:
+ return 'Invocation initial metadata received after invocation completion!'
+ if not implementation.metadata_transmitted(
+ state.invocation_initial_metadata_in_flight, initial_metadata):
+ return 'Invocation initial metadata maltransmitted: %s, %s' % (
+ state.invocation_initial_metadata_in_flight, initial_metadata)
+ else:
+ state.invocation_initial_metadata_in_flight = None
+ state.invocation_initial_metadata_received = True
+
+ if payload is not None:
+ if state.invocation_completion_received:
+ return 'Invocation payload received after invocation completion!'
+ elif not state.invocation_payloads_in_flight:
+ return 'Invocation payload "%s" received but not in flight!' % (payload,)
+ elif state.invocation_payloads_in_flight[0] != payload:
+ return 'Invocation payload mismatch: %s, %s' % (
+ state.invocation_payloads_in_flight[0], payload)
+ elif state.service_side_invocation_allowance < 1:
+ return 'Disallowed invocation payload!'
+ else:
+ state.invocation_payloads_in_flight.pop(0)
+ state.invocation_payloads_received += 1
+ state.service_side_invocation_allowance -= 1
+
+ if completion is not None:
+ if state.invocation_completion_received:
+ return 'Later invocation completion received: %s' % (completion,)
+ elif not implementation.completion_transmitted(
+ state.invocation_completion_in_flight, completion):
+ return 'Invocation completion maltransmitted: %s, %s' % (
+ state.invocation_completion_in_flight, completion)
+ else:
+ state.invocation_completion_in_flight = None
+ state.invocation_completion_received = True
+
+ if allowance is not None:
+ if allowance <= 0:
+ return 'Illegal allowance value: %s' % (allowance,)
+ else:
+ state.service_allowance_in_flight -= allowance
+ state.service_side_service_allowance += allowance
+
+
+def _verify_invocation_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, state, implementation):
+ if initial_metadata is not None:
+ if state.service_initial_metadata_received:
+ return 'Later service initial metadata received: %s' % (initial_metadata,)
+ if state.service_payloads_received:
+ return 'Service initial metadata received after service payloads: %s' % (
+ state.service_payloads_received)
+ if state.service_completion_received:
+ return 'Service initial metadata received after service completion!'
+ if not implementation.metadata_transmitted(
+ state.service_initial_metadata_in_flight, initial_metadata):
+ return 'Service initial metadata maltransmitted: %s, %s' % (
+ state.service_initial_metadata_in_flight, initial_metadata)
+ else:
+ state.service_initial_metadata_in_flight = None
+ state.service_initial_metadata_received = True
+
+ if payload is not None:
+ if state.service_completion_received:
+ return 'Service payload received after service completion!'
+ elif not state.service_payloads_in_flight:
+ return 'Service payload "%s" received but not in flight!' % (payload,)
+ elif state.service_payloads_in_flight[0] != payload:
+ return 'Service payload mismatch: %s, %s' % (
+ state.invocation_payloads_in_flight[0], payload)
+ elif state.invocation_side_service_allowance < 1:
+ return 'Disallowed service payload!'
+ else:
+ state.service_payloads_in_flight.pop(0)
+ state.service_payloads_received += 1
+ state.invocation_side_service_allowance -= 1
+
+ if completion is not None:
+ if state.service_completion_received:
+ return 'Later service completion received: %s' % (completion,)
+ elif not implementation.completion_transmitted(
+ state.service_completion_in_flight, completion):
+ return 'Service completion maltransmitted: %s, %s' % (
+ state.service_completion_in_flight, completion)
+ else:
+ state.service_completion_in_flight = None
+ state.service_completion_received = True
+
+ if allowance is not None:
+ if allowance <= 0:
+ return 'Illegal allowance value: %s' % (allowance,)
+ else:
+ state.invocation_allowance_in_flight -= allowance
+ state.invocation_side_service_allowance += allowance
+
+
+class Invocation(
+ collections.namedtuple(
+ 'Invocation',
+ ('group', 'method', 'subscription_kind', 'timeout', 'initial_metadata',
+ 'payload', 'completion',))):
+ """A description of operation invocation.
+
+ Attributes:
+ group: The group identifier for the operation.
+ method: The method identifier for the operation.
+ subscription_kind: A base.Subscription.Kind value describing the kind of
+ subscription to use for the operation.
+ timeout: A duration in seconds to pass as the timeout value for the
+ operation.
+ initial_metadata: An object to pass as the initial metadata for the
+ operation or None.
+ payload: An object to pass as a payload value for the operation or None.
+ completion: An object to pass as a completion value for the operation or
+ None.
+ """
+
+
+class OnAdvance(
+ collections.namedtuple(
+ 'OnAdvance',
+ ('kind', 'initial_metadata', 'payload', 'completion', 'allowance'))):
+ """Describes action to be taken in a test in response to an advance call.
+
+ Attributes:
+ kind: A Kind value describing the overall kind of response.
+ initial_metadata: An initial metadata value to pass to a call of the advance
+ method of the operator under test. Only valid if kind is Kind.ADVANCE and
+ may be None.
+ payload: A payload value to pass to a call of the advance method of the
+ operator under test. Only valid if kind is Kind.ADVANCE and may be None.
+ completion: A base.Completion value to pass to a call of the advance method
+ of the operator under test. Only valid if kind is Kind.ADVANCE and may be
+ None.
+ allowance: An allowance value to pass to a call of the advance method of the
+ operator under test. Only valid if kind is Kind.ADVANCE and may be None.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ ADVANCE = 'advance'
+ DEFECT = 'defect'
+ IDLE = 'idle'
+
+
+_DEFECT_ON_ADVANCE = OnAdvance(OnAdvance.Kind.DEFECT, None, None, None, None)
+_IDLE_ON_ADVANCE = OnAdvance(OnAdvance.Kind.IDLE, None, None, None, None)
+
+
+class Instruction(
+ collections.namedtuple(
+ 'Instruction',
+ ('kind', 'advance_args', 'advance_kwargs', 'conclude_success',
+ 'conclude_message', 'conclude_invocation_outcome',
+ 'conclude_service_outcome',))):
+ """"""
+
+ @enum.unique
+ class Kind(enum.Enum):
+ ADVANCE = 'ADVANCE'
+ CANCEL = 'CANCEL'
+ CONCLUDE = 'CONCLUDE'
+
+
+class Controller(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def failed(self, message):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_request(self, request):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_request(self, serialized_request):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_response(self, response):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_response(self, serialized_response):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation(self):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def poll(self):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def on_service_advance(
+ self, initial_metadata, payload, completion, allowance):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def on_invocation_advance(
+ self, initial_metadata, payload, completion, allowance):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def service_on_termination(self, outcome):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_on_termination(self, outcome):
+ """"""
+ raise NotImplementedError()
+
+
+class ControllerCreator(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def name(self):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def controller(self, implementation, randomness):
+ """"""
+ raise NotImplementedError()
+
+
+class _Remainder(
+ collections.namedtuple(
+ '_Remainder',
+ ('invocation_payloads', 'service_payloads', 'invocation_completion',
+ 'service_completion',))):
+ """Describes work remaining to be done in a portion of a test.
+
+ Attributes:
+ invocation_payloads: The number of payloads to be sent from the invocation
+ side of the operation to the service side of the operation.
+ service_payloads: The number of payloads to be sent from the service side of
+ the operation to the invocation side of the operation.
+ invocation_completion: Whether or not completion from the invocation side of
+ the operation should be indicated and has yet to be indicated.
+ service_completion: Whether or not completion from the service side of the
+ operation should be indicated and has yet to be indicated.
+ """
+
+
+class _SequenceController(Controller):
+
+ def __init__(self, sequence, implementation, randomness):
+ """Constructor.
+
+ Args:
+ sequence: A _sequence.Sequence describing the steps to be taken in the
+ test at a relatively high level.
+ implementation: A test_interfaces.Implementation encapsulating the
+ base interface implementation that is the system under test.
+ randomness: A random.Random instance for use in the test.
+ """
+ self._condition = threading.Condition()
+ self._sequence = sequence
+ self._implementation = implementation
+ self._randomness = randomness
+
+ self._until = None
+ self._remaining_elements = None
+ self._poll_next = None
+ self._message = None
+
+ self._state = _state.OperationState()
+ self._todo = None
+
+ # called with self._condition
+ def _failed(self, message):
+ self._message = message
+ self._condition.notify_all()
+
+ def _passed(self, invocation_outcome, service_outcome):
+ self._poll_next = Instruction(
+ Instruction.Kind.CONCLUDE, None, None, True, None, invocation_outcome,
+ service_outcome)
+ self._condition.notify_all()
+
+ def failed(self, message):
+ with self._condition:
+ self._failed(message)
+
+ def serialize_request(self, request):
+ return request + request
+
+ def deserialize_request(self, serialized_request):
+ return serialized_request[:len(serialized_request) / 2]
+
+ def serialize_response(self, response):
+ return response * 3
+
+ def deserialize_response(self, serialized_response):
+ return serialized_response[2 * len(serialized_response) / 3:]
+
+ def invocation(self):
+ with self._condition:
+ self._until = time.time() + self._sequence.maximum_duration
+ self._remaining_elements = list(self._sequence.elements)
+ if self._sequence.invocation.initial_metadata:
+ initial_metadata = self._implementation.invocation_initial_metadata()
+ self._state.invocation_initial_metadata_in_flight = initial_metadata
+ else:
+ initial_metadata = None
+ if self._sequence.invocation.payload:
+ payload = _create_payload(self._randomness)
+ self._state.invocation_payloads_in_flight.append(payload)
+ else:
+ payload = None
+ if self._sequence.invocation.complete:
+ completion = self._implementation.invocation_completion()
+ self._state.invocation_completion_in_flight = completion
+ else:
+ completion = None
+ return Invocation(
+ _GROUP, _METHOD, base.Subscription.Kind.FULL,
+ self._sequence.invocation.timeout, initial_metadata, payload,
+ completion)
+
+ def poll(self):
+ with self._condition:
+ while True:
+ if self._message is not None:
+ return Instruction(
+ Instruction.Kind.CONCLUDE, None, None, False, self._message, None,
+ None)
+ elif self._poll_next:
+ poll_next = self._poll_next
+ self._poll_next = None
+ return poll_next
+ elif self._until < time.time():
+ return Instruction(
+ Instruction.Kind.CONCLUDE, None, None, False,
+ 'overran allotted time!', None, None)
+ else:
+ self._condition.wait(timeout=self._until-time.time())
+
+ def on_service_advance(
+ self, initial_metadata, payload, completion, allowance):
+ with self._condition:
+ message = _verify_service_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, self._state,
+ self._implementation)
+ if message is not None:
+ self._failed(message)
+ if self._todo is not None:
+ raise ValueError('TODO!!!')
+ elif _anything_in_flight(self._state):
+ return _IDLE_ON_ADVANCE
+ elif self._remaining_elements:
+ element = self._remaining_elements.pop(0)
+ if element.kind is _sequence.Element.Kind.SERVICE_TRANSMISSION:
+ if element.transmission.initial_metadata:
+ initial_metadata = self._implementation.service_initial_metadata()
+ self._state.service_initial_metadata_in_flight = initial_metadata
+ else:
+ initial_metadata = None
+ if element.transmission.payload:
+ payload = _create_payload(self._randomness)
+ self._state.service_payloads_in_flight.append(payload)
+ self._state.service_side_service_allowance -= 1
+ else:
+ payload = None
+ if element.transmission.complete:
+ completion = self._implementation.service_completion()
+ self._state.service_completion_in_flight = completion
+ else:
+ completion = None
+ if (not self._state.invocation_completion_received and
+ 0 <= self._state.service_side_invocation_allowance):
+ allowance = 1
+ self._state.service_side_invocation_allowance += 1
+ self._state.invocation_allowance_in_flight += 1
+ else:
+ allowance = None
+ return OnAdvance(
+ OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion,
+ allowance)
+ else:
+ raise ValueError('TODO!!!')
+ else:
+ return _IDLE_ON_ADVANCE
+
+ def on_invocation_advance(
+ self, initial_metadata, payload, completion, allowance):
+ with self._condition:
+ message = _verify_invocation_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, self._state,
+ self._implementation)
+ if message is not None:
+ self._failed(message)
+ if self._todo is not None:
+ raise ValueError('TODO!!!')
+ elif _anything_in_flight(self._state):
+ return _IDLE_ON_ADVANCE
+ elif self._remaining_elements:
+ element = self._remaining_elements.pop(0)
+ if element.kind is _sequence.Element.Kind.INVOCATION_TRANSMISSION:
+ if element.transmission.initial_metadata:
+ initial_metadata = self._implementation.invocation_initial_metadata()
+ self._state.invocation_initial_metadata_in_fight = initial_metadata
+ else:
+ initial_metadata = None
+ if element.transmission.payload:
+ payload = _create_payload(self._randomness)
+ self._state.invocation_payloads_in_flight.append(payload)
+ self._state.invocation_side_invocation_allowance -= 1
+ else:
+ payload = None
+ if element.transmission.complete:
+ completion = self._implementation.invocation_completion()
+ self._state.invocation_completion_in_flight = completion
+ else:
+ completion = None
+ if (not self._state.service_completion_received and
+ 0 <= self._state.invocation_side_service_allowance):
+ allowance = 1
+ self._state.invocation_side_service_allowance += 1
+ self._state.service_allowance_in_flight += 1
+ else:
+ allowance = None
+ return OnAdvance(
+ OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion,
+ allowance)
+ else:
+ raise ValueError('TODO!!!')
+ else:
+ return _IDLE_ON_ADVANCE
+
+ def service_on_termination(self, outcome):
+ with self._condition:
+ self._state.service_side_outcome = outcome
+ if self._todo is not None or self._remaining_elements:
+ self._failed('Premature service-side outcome %s!' % (outcome,))
+ elif outcome is not self._sequence.outcome.service:
+ self._failed(
+ 'Incorrect service-side outcome: %s should have been %s' % (
+ outcome, self._sequence.outcome.service))
+ elif self._state.invocation_side_outcome is not None:
+ self._passed(self._state.invocation_side_outcome, outcome)
+
+ def invocation_on_termination(self, outcome):
+ with self._condition:
+ self._state.invocation_side_outcome = outcome
+ if self._todo is not None or self._remaining_elements:
+ self._failed('Premature invocation-side outcome %s!' % (outcome,))
+ elif outcome is not self._sequence.outcome.invocation:
+ self._failed(
+ 'Incorrect invocation-side outcome: %s should have been %s' % (
+ outcome, self._sequence.outcome.invocation))
+ elif self._state.service_side_outcome is not None:
+ self._passed(outcome, self._state.service_side_outcome)
+
+
+class _SequenceControllerCreator(ControllerCreator):
+
+ def __init__(self, sequence):
+ self._sequence = sequence
+
+ def name(self):
+ return self._sequence.name
+
+ def controller(self, implementation, randomness):
+ return _SequenceController(self._sequence, implementation, randomness)
+
+
+CONTROLLER_CREATORS = tuple(
+ _SequenceControllerCreator(sequence) for sequence in _sequence.SEQUENCES)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
new file mode 100644
index 0000000000..1d77aaebe6
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py
@@ -0,0 +1,168 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Part of the tests of the base interface of RPC Framework."""
+
+import collections
+import enum
+
+from grpc.framework.interfaces.base import base
+from grpc_test.framework.common import test_constants
+
+
+class Invocation(
+ collections.namedtuple(
+ 'Invocation', ('timeout', 'initial_metadata', 'payload', 'complete',))):
+ """A recipe for operation invocation.
+
+ Attributes:
+ timeout: A duration in seconds to pass to the system under test as the
+ operation's timeout value.
+ initial_metadata: A boolean indicating whether or not to pass initial
+ metadata when invoking the operation.
+ payload: A boolean indicating whether or not to pass a payload when
+ invoking the operation.
+ complete: A boolean indicating whether or not to indicate completion of
+ transmissions from the invoking side of the operation when invoking the
+ operation.
+ """
+
+
+class Transmission(
+ collections.namedtuple(
+ 'Transmission', ('initial_metadata', 'payload', 'complete',))):
+ """A recipe for a single transmission in an operation.
+
+ Attributes:
+ initial_metadata: A boolean indicating whether or not to pass initial
+ metadata as part of the transmission.
+ payload: A boolean indicating whether or not to pass a payload as part of
+ the transmission.
+ complete: A boolean indicating whether or not to indicate completion of
+ transmission from the transmitting side of the operation as part of the
+ transmission.
+ """
+
+
+class Intertransmission(
+ collections.namedtuple('Intertransmission', ('invocation', 'service',))):
+ """A recipe for multiple transmissions in an operation.
+
+ Attributes:
+ invocation: An integer describing the number of payloads to send from the
+ invocation side of the operation to the service side.
+ service: An integer describing the number of payloads to send from the
+ service side of the operation to the invocation side.
+ """
+
+
+class Element(collections.namedtuple('Element', ('kind', 'transmission',))):
+ """A sum type for steps to perform when testing an operation.
+
+ Attributes:
+ kind: A Kind value describing the kind of step to perform in the test.
+ transmission: Only valid for kinds Kind.INVOCATION_TRANSMISSION and
+ Kind.SERVICE_TRANSMISSION, a Transmission value describing the details of
+ the transmission to be made.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ INVOCATION_TRANSMISSION = 'invocation transmission'
+ SERVICE_TRANSMISSION = 'service transmission'
+ INTERTRANSMISSION = 'intertransmission'
+ INVOCATION_CANCEL = 'invocation cancel'
+ SERVICE_CANCEL = 'service cancel'
+ INVOCATION_FAILURE = 'invocation failure'
+ SERVICE_FAILURE = 'service failure'
+
+
+class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))):
+ """A description of the expected outcome of an operation test.
+
+ Attributes:
+ invocation: The base.Outcome value expected on the invocation side of the
+ operation.
+ service: The base.Outcome value expected on the service side of the
+ operation.
+ """
+
+
+class Sequence(
+ collections.namedtuple(
+ 'Sequence',
+ ('name', 'maximum_duration', 'invocation', 'elements', 'outcome',))):
+ """Describes at a high level steps to perform in a test.
+
+ Attributes:
+ name: The string name of the sequence.
+ maximum_duration: A length of time in seconds to allow for the test before
+ declaring it to have failed.
+ invocation: An Invocation value describing how to invoke the operation
+ under test.
+ elements: A sequence of Element values describing at coarse granularity
+ actions to take during the operation under test.
+ outcome: An Outcome value describing the expected outcome of the test.
+ """
+
+_EASY = Sequence(
+ 'Easy',
+ test_constants.TIME_ALLOWANCE,
+ Invocation(test_constants.LONG_TIMEOUT, True, True, True),
+ (
+ Element(
+ Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, True)),
+ ),
+ Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED))
+
+_PEASY = Sequence(
+ 'Peasy',
+ test_constants.TIME_ALLOWANCE,
+ Invocation(test_constants.LONG_TIMEOUT, True, True, False),
+ (
+ Element(
+ Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, False)),
+ Element(
+ Element.Kind.INVOCATION_TRANSMISSION,
+ Transmission(False, True, True)),
+ Element(
+ Element.Kind.SERVICE_TRANSMISSION, Transmission(False, True, True)),
+ ),
+ Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED))
+
+
+# TODO(issue 2959): Finish this test suite. This tuple of sequences should
+# contain at least the values in the Cartesian product of (half-duplex,
+# full-duplex) * (zero payloads, one payload, test_constants.STREAM_LENGTH
+# payloads) * (completion, cancellation, expiration, programming defect in
+# servicer code).
+SEQUENCES = (
+ _EASY,
+ _PEASY,
+)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py
new file mode 100644
index 0000000000..21cf33aeb6
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_state.py
@@ -0,0 +1,55 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Part of the tests of the base interface of RPC Framework."""
+
+
+class OperationState(object):
+
+ def __init__(self):
+ self.invocation_initial_metadata_in_flight = None
+ self.invocation_initial_metadata_received = False
+ self.invocation_payloads_in_flight = []
+ self.invocation_payloads_received = 0
+ self.invocation_completion_in_flight = None
+ self.invocation_completion_received = False
+ self.service_initial_metadata_in_flight = None
+ self.service_initial_metadata_received = False
+ self.service_payloads_in_flight = []
+ self.service_payloads_received = 0
+ self.service_completion_in_flight = None
+ self.service_completion_received = False
+ self.invocation_side_invocation_allowance = 1
+ self.invocation_side_service_allowance = 1
+ self.service_side_invocation_allowance = 1
+ self.service_side_service_allowance = 1
+ self.invocation_allowance_in_flight = 0
+ self.service_allowance_in_flight = 0
+ self.invocation_side_outcome = None
+ self.service_side_outcome = None
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
new file mode 100644
index 0000000000..5c8b176da4
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
@@ -0,0 +1,262 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of the base interface of RPC Framework."""
+
+import logging
+import random
+import threading
+import time
+import unittest
+
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.base import utilities
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.base import _control
+from grpc_test.framework.interfaces.base import test_interfaces
+
+_SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True))
+
+_EMPTY_OUTCOME_DICT = {outcome: 0 for outcome in base.Outcome}
+
+
+class _Serialization(test_interfaces.Serialization):
+
+ def serialize_request(self, request):
+ return request + request
+
+ def deserialize_request(self, serialized_request):
+ return serialized_request[:len(serialized_request) / 2]
+
+ def serialize_response(self, response):
+ return response * 3
+
+ def deserialize_response(self, serialized_response):
+ return serialized_response[2 * len(serialized_response) / 3:]
+
+
+def _advance(quadruples, operator, controller):
+ try:
+ for quadruple in quadruples:
+ operator.advance(
+ initial_metadata=quadruple[0], payload=quadruple[1],
+ completion=quadruple[2], allowance=quadruple[3])
+ except Exception as e: # pylint: disable=broad-except
+ controller.failed('Exception on advance: %e' % e)
+
+
+class _Operator(base.Operator):
+
+ def __init__(self, controller, on_advance, pool, operator_under_test):
+ self._condition = threading.Condition()
+ self._controller = controller
+ self._on_advance = on_advance
+ self._pool = pool
+ self._operator_under_test = operator_under_test
+ self._pending_advances = []
+
+ def set_operator_under_test(self, operator_under_test):
+ with self._condition:
+ self._operator_under_test = operator_under_test
+ pent_advances = self._pending_advances
+ self._pending_advances = []
+ pool = self._pool
+ controller = self._controller
+
+ if pool is None:
+ _advance(pent_advances, operator_under_test, controller)
+ else:
+ pool.submit(_advance, pent_advances, operator_under_test, controller)
+
+ def advance(
+ self, initial_metadata=None, payload=None, completion=None,
+ allowance=None):
+ on_advance = self._on_advance(
+ initial_metadata, payload, completion, allowance)
+ if on_advance.kind is _control.OnAdvance.Kind.ADVANCE:
+ with self._condition:
+ pool = self._pool
+ operator_under_test = self._operator_under_test
+ controller = self._controller
+
+ quadruple = (
+ on_advance.initial_metadata, on_advance.payload,
+ on_advance.completion, on_advance.allowance)
+ if pool is None:
+ _advance((quadruple,), operator_under_test, controller)
+ else:
+ pool.submit(_advance, (quadruple,), operator_under_test, controller)
+ elif on_advance.kind is _control.OnAdvance.Kind.DEFECT:
+ raise ValueError(
+ 'Deliberately raised exception from Operator.advance (in a test)!')
+
+
+class _Servicer(base.Servicer):
+ """An base.Servicer with instrumented for testing."""
+
+ def __init__(self, group, method, controllers, pool):
+ self._condition = threading.Condition()
+ self._group = group
+ self._method = method
+ self._pool = pool
+ self._controllers = list(controllers)
+
+ def service(self, group, method, context, output_operator):
+ with self._condition:
+ controller = self._controllers.pop(0)
+ if group != self._group or method != self._method:
+ controller.fail(
+ '%s != %s or %s != %s' % (group, self._group, method, self._method))
+ raise base.NoSuchMethodError()
+ else:
+ operator = _Operator(
+ controller, controller.on_service_advance, self._pool,
+ output_operator)
+ outcome = context.add_termination_callback(
+ controller.service_on_termination)
+ if outcome is not None:
+ controller.service_on_termination(outcome)
+ return utilities.full_subscription(operator)
+
+
+class _OperationTest(unittest.TestCase):
+
+ def setUp(self):
+ if self._synchronicity_variation:
+ self._pool = logging_pool.pool(test_constants.POOL_SIZE)
+ else:
+ self._pool = None
+ self._controller = self._controller_creator.controller(
+ self._implementation, self._randomness)
+
+ def tearDown(self):
+ if self._synchronicity_variation:
+ self._pool.shutdown(wait=True)
+ else:
+ self._pool = None
+
+ def test_operation(self):
+ invocation = self._controller.invocation()
+ if invocation.subscription_kind is base.Subscription.Kind.FULL:
+ test_operator = _Operator(
+ self._controller, self._controller.on_invocation_advance,
+ self._pool, None)
+ subscription = utilities.full_subscription(test_operator)
+ else:
+ # TODO(nathaniel): support and test other subscription kinds.
+ self.fail('Non-full subscriptions not yet supported!')
+
+ servicer = _Servicer(
+ invocation.group, invocation.method, (self._controller,), self._pool)
+
+ invocation_end, service_end, memo = self._implementation.instantiate(
+ {(invocation.group, invocation.method): _Serialization()}, servicer)
+
+ try:
+ invocation_end.start()
+ service_end.start()
+ operation_context, operator_under_test = invocation_end.operate(
+ invocation.group, invocation.method, subscription, invocation.timeout,
+ initial_metadata=invocation.initial_metadata, payload=invocation.payload,
+ completion=invocation.completion)
+ test_operator.set_operator_under_test(operator_under_test)
+ outcome = operation_context.add_termination_callback(
+ self._controller.invocation_on_termination)
+ if outcome is not None:
+ self._controller.invocation_on_termination(outcome)
+ except Exception as e: # pylint: disable=broad-except
+ self._controller.failed('Exception on invocation: %s' % e)
+ self.fail(e)
+
+ while True:
+ instruction = self._controller.poll()
+ if instruction.kind is _control.Instruction.Kind.ADVANCE:
+ try:
+ test_operator.advance(
+ *instruction.advance_args, **instruction.advance_kwargs)
+ except Exception as e: # pylint: disable=broad-except
+ self._controller.failed('Exception on instructed advance: %s' % e)
+ elif instruction.kind is _control.Instruction.Kind.CANCEL:
+ try:
+ operation_context.cancel()
+ except Exception as e: # pylint: disable=broad-except
+ self._controller.failed('Exception on cancel: %s' % e)
+ elif instruction.kind is _control.Instruction.Kind.CONCLUDE:
+ break
+
+ invocation_stop_event = invocation_end.stop(0)
+ service_stop_event = service_end.stop(0)
+ invocation_stop_event.wait()
+ service_stop_event.wait()
+ invocation_stats = invocation_end.operation_stats()
+ service_stats = service_end.operation_stats()
+
+ self._implementation.destantiate(memo)
+
+ self.assertTrue(
+ instruction.conclude_success, msg=instruction.conclude_message)
+
+ expected_invocation_stats = dict(_EMPTY_OUTCOME_DICT)
+ expected_invocation_stats[instruction.conclude_invocation_outcome] += 1
+ self.assertDictEqual(expected_invocation_stats, invocation_stats)
+ expected_service_stats = dict(_EMPTY_OUTCOME_DICT)
+ expected_service_stats[instruction.conclude_service_outcome] += 1
+ self.assertDictEqual(expected_service_stats, service_stats)
+
+
+def test_cases(implementation):
+ """Creates unittest.TestCase classes for a given Base implementation.
+
+ Args:
+ implementation: A test_interfaces.Implementation specifying creation and
+ destruction of the Base implementation under test.
+
+ Returns:
+ A sequence of subclasses of unittest.TestCase defining tests of the
+ specified Base layer implementation.
+ """
+ random_seed = hash(time.time())
+ logging.warning('Random seed for this execution: %s', random_seed)
+ randomness = random.Random(x=random_seed)
+
+ test_case_classes = []
+ for synchronicity_variation in _SYNCHRONICITY_VARIATION:
+ for controller_creator in _control.CONTROLLER_CREATORS:
+ name = ''.join(
+ (synchronicity_variation[0], controller_creator.name(), 'Test',))
+ test_case_classes.append(
+ type(name, (_OperationTest,),
+ {'_implementation': implementation,
+ '_randomness': randomness,
+ '_synchronicity_variation': synchronicity_variation[1],
+ '_controller_creator': controller_creator,
+ }))
+
+ return test_case_classes
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py
new file mode 100644
index 0000000000..02426ab846
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_interfaces.py
@@ -0,0 +1,186 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces used in tests of implementations of the Base layer."""
+
+import abc
+
+from grpc.framework.interfaces.base import base # pylint: disable=unused-import
+
+
+class Serialization(object):
+ """Specifies serialization and deserialization of test payloads."""
+ __metaclass__ = abc.ABCMeta
+
+ def serialize_request(self, request):
+ """Serializes a request value used in a test.
+
+ Args:
+ request: A request value created by a test.
+
+ Returns:
+ A bytestring that is the serialization of the given request.
+ """
+ raise NotImplementedError()
+
+ def deserialize_request(self, serialized_request):
+ """Deserializes a request value used in a test.
+
+ Args:
+ serialized_request: A bytestring that is the serialization of some request
+ used in a test.
+
+ Returns:
+ The request value encoded by the given bytestring.
+ """
+ raise NotImplementedError()
+
+ def serialize_response(self, response):
+ """Serializes a response value used in a test.
+
+ Args:
+ response: A response value created by a test.
+
+ Returns:
+ A bytestring that is the serialization of the given response.
+ """
+ raise NotImplementedError()
+
+ def deserialize_response(self, serialized_response):
+ """Deserializes a response value used in a test.
+
+ Args:
+ serialized_response: A bytestring that is the serialization of some
+ response used in a test.
+
+ Returns:
+ The response value encoded by the given bytestring.
+ """
+ raise NotImplementedError()
+
+
+class Implementation(object):
+ """Specifies an implementation of the Base layer."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def instantiate(self, serializations, servicer):
+ """Instantiates the Base layer implementation to be used in a test.
+
+ Args:
+ serializations: A dict from group-method pair to Serialization object
+ specifying how to serialize and deserialize payload values used in the
+ test.
+ servicer: A base.Servicer object to be called to service RPCs made during
+ the test.
+
+ Returns:
+ A sequence of length three the first element of which is a
+ base.End to be used to invoke RPCs, the second element of which is a
+ base.End to be used to service invoked RPCs, and the third element of
+ which is an arbitrary memo object to be kept and passed to destantiate
+ at the conclusion of the test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def destantiate(self, memo):
+ """Destroys the Base layer implementation under test.
+
+ Args:
+ memo: The object from the third position of the return value of a call to
+ instantiate.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_initial_metadata(self):
+ """Provides an operation's invocation-side initial metadata.
+
+ Returns:
+ A value to use for an operation's invocation-side initial metadata, or
+ None.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def service_initial_metadata(self):
+ """Provices an operation's service-side initial metadata.
+
+ Returns:
+ A value to use for an operation's service-side initial metadata, or
+ None.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_completion(self):
+ """Provides an operation's invocation-side completion.
+
+ Returns:
+ A base.Completion to use for an operation's invocation-side completion.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def service_completion(self):
+ """Provides an operation's service-side completion.
+
+ Returns:
+ A base.Completion to use for an operation's service-side completion.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ """Identifies whether or not metadata was properly transmitted.
+
+ Args:
+ original_metadata: A metadata value passed to the system under test.
+ transmitted_metadata: The same metadata value after having been
+ transmitted through the system under test.
+
+ Returns:
+ Whether or not the metadata was properly transmitted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def completion_transmitted(self, original_completion, transmitted_completion):
+ """Identifies whether or not a base.Completion was properly transmitted.
+
+ Args:
+ original_completion: A base.Completion passed to the system under test.
+ transmitted_completion: The same completion value after having been
+ transmitted through the system under test.
+
+ Returns:
+ Whether or not the completion was properly transmitted.
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/__init__.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
new file mode 100644
index 0000000000..857ad5cf3e
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -0,0 +1,250 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test code for the Face layer of RPC Framework."""
+
+import abc
+import unittest
+
+# test_interfaces is referenced from specification in this module.
+from grpc.framework.interfaces.face import face
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.common import test_control
+from grpc_test.framework.common import test_coverage
+from grpc_test.framework.interfaces.face import _digest
+from grpc_test.framework.interfaces.face import _stock_service
+from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
+
+
+class TestCase(test_coverage.Coverage, unittest.TestCase):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must have an "implementation" attribute of type
+ test_interfaces.Implementation and an "invoker_constructor" attribute of type
+ _invocation.InvokerConstructor.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ NAME = 'BlockingInvocationInlineServiceTest'
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self._control = test_control.PauseFailControl()
+ self._digest = _digest.digest(
+ _stock_service.STOCK_TEST_SERVICE, self._control, None)
+
+ generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
+ self._digest.methods, self._digest.inline_method_implementations, None)
+ self._invoker = self.invoker_constructor.construct_invoker(
+ generic_stub, dynamic_stubs, self._digest.methods)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.implementation.destantiate(self._memo)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response = self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_iterator = self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ response = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ response_iterator = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response = self._invoker.blocking(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+
+ test_messages.verify(first_request, first_response, self)
+
+ second_response = self._invoker.blocking(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+
+ test_messages.verify(second_request, second_response, self)
+
+ @unittest.skip('Parallel invocations impossible with blocking control flow!')
+ def testParallelInvocations(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Parallel invocations impossible with blocking control flow!')
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledStreamRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ self._invoker.blocking(group, method)(
+ request, test_constants.SHORT_TIMEOUT)
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ response_iterator = self._invoker.blocking(group, method)(
+ request, test_constants.SHORT_TIMEOUT)
+ list(response_iterator)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ self._invoker.blocking(group, method)(
+ iter(requests), test_constants.SHORT_TIMEOUT)
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ response_iterator = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.SHORT_TIMEOUT)
+ list(response_iterator)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ response_iterator = self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ list(response_iterator)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+
+ def testFailedStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ response_iterator = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ list(response_iterator)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_digest.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_digest.py
new file mode 100644
index 0000000000..da56ed7b27
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_digest.py
@@ -0,0 +1,444 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Code for making a service.TestService more amenable to use in tests."""
+
+import collections
+import threading
+
+# test_control, _service, and test_interfaces are referenced from specification
+# in this module.
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.foundation import stream
+from grpc.framework.foundation import stream_util
+from grpc.framework.interfaces.face import face
+from grpc_test.framework.common import test_control # pylint: disable=unused-import
+from grpc_test.framework.interfaces.face import _service # pylint: disable=unused-import
+from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
+
+_IDENTITY = lambda x: x
+
+
+class TestServiceDigest(
+ collections.namedtuple(
+ 'TestServiceDigest',
+ ('methods',
+ 'inline_method_implementations',
+ 'event_method_implementations',
+ 'multi_method_implementation',
+ 'unary_unary_messages_sequences',
+ 'unary_stream_messages_sequences',
+ 'stream_unary_messages_sequences',
+ 'stream_stream_messages_sequences',))):
+ """A transformation of a service.TestService.
+
+ Attributes:
+ methods: A dict from method group-name pair to test_interfaces.Method object
+ describing the RPC methods that may be called during the test.
+ inline_method_implementations: A dict from method group-name pair to
+ face.MethodImplementation object to be used in tests of in-line calls to
+ behaviors under test.
+ event_method_implementations: A dict from method group-name pair to
+ face.MethodImplementation object to be used in tests of event-driven calls
+ to behaviors under test.
+ multi_method_implementation: A face.MultiMethodImplementation to be used in
+ tests of generic calls to behaviors under test.
+ unary_unary_messages_sequences: A dict from method group-name pair to
+ sequence of service.UnaryUnaryTestMessages objects to be used to test the
+ identified method.
+ unary_stream_messages_sequences: A dict from method group-name pair to
+ sequence of service.UnaryStreamTestMessages objects to be used to test the
+ identified method.
+ stream_unary_messages_sequences: A dict from method group-name pair to
+ sequence of service.StreamUnaryTestMessages objects to be used to test the
+ identified method.
+ stream_stream_messages_sequences: A dict from method group-name pair to
+ sequence of service.StreamStreamTestMessages objects to be used to test
+ the identified method.
+ """
+
+
+class _BufferingConsumer(stream.Consumer):
+ """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
+
+ def __init__(self):
+ self.consumed = []
+ self.terminated = False
+
+ def consume(self, value):
+ self.consumed.append(value)
+
+ def terminate(self):
+ self.terminated = True
+
+ def consume_and_terminate(self, value):
+ self.consumed.append(value)
+ self.terminated = True
+
+
+class _InlineUnaryUnaryMethod(face.MethodImplementation):
+
+ def __init__(self, unary_unary_test_method, control):
+ self._test_method = unary_unary_test_method
+ self._control = control
+
+ self.cardinality = cardinality.Cardinality.UNARY_UNARY
+ self.style = style.Service.INLINE
+
+ def unary_unary_inline(self, request, context):
+ response_list = []
+ self._test_method.service(
+ request, response_list.append, context, self._control)
+ return response_list.pop(0)
+
+
+class _EventUnaryUnaryMethod(face.MethodImplementation):
+
+ def __init__(self, unary_unary_test_method, control, pool):
+ self._test_method = unary_unary_test_method
+ self._control = control
+ self._pool = pool
+
+ self.cardinality = cardinality.Cardinality.UNARY_UNARY
+ self.style = style.Service.EVENT
+
+ def unary_unary_event(self, request, response_callback, context):
+ if self._pool is None:
+ self._test_method.service(
+ request, response_callback, context, self._control)
+ else:
+ self._pool.submit(
+ self._test_method.service, request, response_callback, context,
+ self._control)
+
+
+class _InlineUnaryStreamMethod(face.MethodImplementation):
+
+ def __init__(self, unary_stream_test_method, control):
+ self._test_method = unary_stream_test_method
+ self._control = control
+
+ self.cardinality = cardinality.Cardinality.UNARY_STREAM
+ self.style = style.Service.INLINE
+
+ def unary_stream_inline(self, request, context):
+ response_consumer = _BufferingConsumer()
+ self._test_method.service(
+ request, response_consumer, context, self._control)
+ for response in response_consumer.consumed:
+ yield response
+
+
+class _EventUnaryStreamMethod(face.MethodImplementation):
+
+ def __init__(self, unary_stream_test_method, control, pool):
+ self._test_method = unary_stream_test_method
+ self._control = control
+ self._pool = pool
+
+ self.cardinality = cardinality.Cardinality.UNARY_STREAM
+ self.style = style.Service.EVENT
+
+ def unary_stream_event(self, request, response_consumer, context):
+ if self._pool is None:
+ self._test_method.service(
+ request, response_consumer, context, self._control)
+ else:
+ self._pool.submit(
+ self._test_method.service, request, response_consumer, context,
+ self._control)
+
+
+class _InlineStreamUnaryMethod(face.MethodImplementation):
+
+ def __init__(self, stream_unary_test_method, control):
+ self._test_method = stream_unary_test_method
+ self._control = control
+
+ self.cardinality = cardinality.Cardinality.STREAM_UNARY
+ self.style = style.Service.INLINE
+
+ def stream_unary_inline(self, request_iterator, context):
+ response_list = []
+ request_consumer = self._test_method.service(
+ response_list.append, context, self._control)
+ for request in request_iterator:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ return response_list.pop(0)
+
+
+class _EventStreamUnaryMethod(face.MethodImplementation):
+
+ def __init__(self, stream_unary_test_method, control, pool):
+ self._test_method = stream_unary_test_method
+ self._control = control
+ self._pool = pool
+
+ self.cardinality = cardinality.Cardinality.STREAM_UNARY
+ self.style = style.Service.EVENT
+
+ def stream_unary_event(self, response_callback, context):
+ request_consumer = self._test_method.service(
+ response_callback, context, self._control)
+ if self._pool is None:
+ return request_consumer
+ else:
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _InlineStreamStreamMethod(face.MethodImplementation):
+
+ def __init__(self, stream_stream_test_method, control):
+ self._test_method = stream_stream_test_method
+ self._control = control
+
+ self.cardinality = cardinality.Cardinality.STREAM_STREAM
+ self.style = style.Service.INLINE
+
+ def stream_stream_inline(self, request_iterator, context):
+ response_consumer = _BufferingConsumer()
+ request_consumer = self._test_method.service(
+ response_consumer, context, self._control)
+
+ for request in request_iterator:
+ request_consumer.consume(request)
+ while response_consumer.consumed:
+ yield response_consumer.consumed.pop(0)
+ response_consumer.terminate()
+
+
+class _EventStreamStreamMethod(face.MethodImplementation):
+
+ def __init__(self, stream_stream_test_method, control, pool):
+ self._test_method = stream_stream_test_method
+ self._control = control
+ self._pool = pool
+
+ self.cardinality = cardinality.Cardinality.STREAM_STREAM
+ self.style = style.Service.EVENT
+
+ def stream_stream_event(self, response_consumer, context):
+ request_consumer = self._test_method.service(
+ response_consumer, context, self._control)
+ if self._pool is None:
+ return request_consumer
+ else:
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _UnaryConsumer(stream.Consumer):
+ """A Consumer that only allows consumption of exactly one value."""
+
+ def __init__(self, action):
+ self._lock = threading.Lock()
+ self._action = action
+ self._consumed = False
+ self._terminated = False
+
+ def consume(self, value):
+ with self._lock:
+ if self._consumed:
+ raise ValueError('Unary consumer already consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._consumed = True
+
+ self._action(value)
+
+ def terminate(self):
+ with self._lock:
+ if not self._consumed:
+ raise ValueError('Unary consumer hasn\'t yet consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._terminated = True
+
+ def consume_and_terminate(self, value):
+ with self._lock:
+ if self._consumed:
+ raise ValueError('Unary consumer already consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._consumed = True
+ self._terminated = True
+
+ self._action(value)
+
+
+class _UnaryUnaryAdaptation(object):
+
+ def __init__(self, unary_unary_test_method):
+ self._method = unary_unary_test_method
+
+ def service(self, response_consumer, context, control):
+ def action(request):
+ self._method.service(
+ request, response_consumer.consume_and_terminate, context, control)
+ return _UnaryConsumer(action)
+
+
+class _UnaryStreamAdaptation(object):
+
+ def __init__(self, unary_stream_test_method):
+ self._method = unary_stream_test_method
+
+ def service(self, response_consumer, context, control):
+ def action(request):
+ self._method.service(request, response_consumer, context, control)
+ return _UnaryConsumer(action)
+
+
+class _StreamUnaryAdaptation(object):
+
+ def __init__(self, stream_unary_test_method):
+ self._method = stream_unary_test_method
+
+ def service(self, response_consumer, context, control):
+ return self._method.service(
+ response_consumer.consume_and_terminate, context, control)
+
+
+class _MultiMethodImplementation(face.MultiMethodImplementation):
+
+ def __init__(self, methods, control, pool):
+ self._methods = methods
+ self._control = control
+ self._pool = pool
+
+ def service(self, group, name, response_consumer, context):
+ method = self._methods.get(group, name, None)
+ if method is None:
+ raise face.NoSuchMethodError(group, name)
+ elif self._pool is None:
+ return method(response_consumer, context, self._control)
+ else:
+ request_consumer = method(response_consumer, context, self._control)
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _Assembly(
+ collections.namedtuple(
+ '_Assembly',
+ ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
+ """An intermediate structure created when creating a TestServiceDigest."""
+
+
+def _assemble(
+ scenarios, identifiers, inline_method_constructor, event_method_constructor,
+ adapter, control, pool):
+ """Creates an _Assembly from the given scenarios."""
+ methods = {}
+ inlines = {}
+ events = {}
+ adaptations = {}
+ messages = {}
+ for identifier, scenario in scenarios.iteritems():
+ if identifier in identifiers:
+ raise ValueError('Repeated identifier "(%s, %s)"!' % identifier)
+
+ test_method = scenario[0]
+ inline_method = inline_method_constructor(test_method, control)
+ event_method = event_method_constructor(test_method, control, pool)
+ adaptation = adapter(test_method)
+
+ methods[identifier] = test_method
+ inlines[identifier] = inline_method
+ events[identifier] = event_method
+ adaptations[identifier] = adaptation
+ messages[identifier] = scenario[1]
+
+ return _Assembly(methods, inlines, events, adaptations, messages)
+
+
+def digest(service, control, pool):
+ """Creates a TestServiceDigest from a TestService.
+
+ Args:
+ service: A _service.TestService.
+ control: A test_control.Control.
+ pool: If RPC methods should be serviced in a separate thread, a thread pool.
+ None if RPC methods should be serviced in the thread belonging to the
+ run-time that calls for their service.
+
+ Returns:
+ A TestServiceDigest synthesized from the given service.TestService.
+ """
+ identifiers = set()
+
+ unary_unary = _assemble(
+ service.unary_unary_scenarios(), identifiers, _InlineUnaryUnaryMethod,
+ _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool)
+ identifiers.update(unary_unary.inlines)
+
+ unary_stream = _assemble(
+ service.unary_stream_scenarios(), identifiers, _InlineUnaryStreamMethod,
+ _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool)
+ identifiers.update(unary_stream.inlines)
+
+ stream_unary = _assemble(
+ service.stream_unary_scenarios(), identifiers, _InlineStreamUnaryMethod,
+ _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool)
+ identifiers.update(stream_unary.inlines)
+
+ stream_stream = _assemble(
+ service.stream_stream_scenarios(), identifiers, _InlineStreamStreamMethod,
+ _EventStreamStreamMethod, _IDENTITY, control, pool)
+ identifiers.update(stream_stream.inlines)
+
+ methods = dict(unary_unary.methods)
+ methods.update(unary_stream.methods)
+ methods.update(stream_unary.methods)
+ methods.update(stream_stream.methods)
+ adaptations = dict(unary_unary.adaptations)
+ adaptations.update(unary_stream.adaptations)
+ adaptations.update(stream_unary.adaptations)
+ adaptations.update(stream_stream.adaptations)
+ inlines = dict(unary_unary.inlines)
+ inlines.update(unary_stream.inlines)
+ inlines.update(stream_unary.inlines)
+ inlines.update(stream_stream.inlines)
+ events = dict(unary_unary.events)
+ events.update(unary_stream.events)
+ events.update(stream_unary.events)
+ events.update(stream_stream.events)
+
+ return TestServiceDigest(
+ methods,
+ inlines,
+ events,
+ _MultiMethodImplementation(adaptations, control, pool),
+ unary_unary.messages,
+ unary_stream.messages,
+ stream_unary.messages,
+ stream_stream.messages)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py
new file mode 100644
index 0000000000..ea5cdeaea3
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py
@@ -0,0 +1,377 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test code for the Face layer of RPC Framework."""
+
+import abc
+import unittest
+
+# test_interfaces is referenced from specification in this module.
+from grpc.framework.interfaces.face import face
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.common import test_control
+from grpc_test.framework.common import test_coverage
+from grpc_test.framework.interfaces.face import _digest
+from grpc_test.framework.interfaces.face import _receiver
+from grpc_test.framework.interfaces.face import _stock_service
+from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
+
+
+class TestCase(test_coverage.Coverage, unittest.TestCase):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must have an "implementation" attribute of type
+ test_interfaces.Implementation and an "invoker_constructor" attribute of type
+ _invocation.InvokerConstructor.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ NAME = 'EventInvocationSynchronousEventServiceTest'
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self._control = test_control.PauseFailControl()
+ self._digest = _digest.digest(
+ _stock_service.STOCK_TEST_SERVICE, self._control, None)
+
+ generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
+ self._digest.methods, self._digest.event_method_implementations, None)
+ self._invoker = self.invoker_constructor.construct_invoker(
+ generic_stub, dynamic_stubs, self._digest.methods)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.implementation.destantiate(self._memo)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ receiver = _receiver.Receiver()
+
+ self._invoker.event(group, method)(
+ request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ receiver.block_until_terminated()
+ response = receiver.unary_response()
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ receiver = _receiver.Receiver()
+
+ self._invoker.event(group, method)(
+ request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ receiver.block_until_terminated()
+ responses = receiver.stream_responses()
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ receiver = _receiver.Receiver()
+
+ call_consumer = self._invoker.event(group, method)(
+ receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ for request in requests:
+ call_consumer.consume(request)
+ call_consumer.terminate()
+ receiver.block_until_terminated()
+ response = receiver.unary_response()
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ receiver = _receiver.Receiver()
+
+ call_consumer = self._invoker.event(group, method)(
+ receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ for request in requests:
+ call_consumer.consume(request)
+ call_consumer.terminate()
+ receiver.block_until_terminated()
+ responses = receiver.stream_responses()
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ # pylint: disable=cell-var-from-loop
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+ second_receiver = _receiver.Receiver()
+
+ def make_second_invocation():
+ self._invoker.event(group, method)(
+ second_request, second_receiver, second_receiver.abort,
+ test_constants.LONG_TIMEOUT)
+
+ class FirstReceiver(_receiver.Receiver):
+
+ def complete(self, terminal_metadata, code, details):
+ super(FirstReceiver, self).complete(
+ terminal_metadata, code, details)
+ make_second_invocation()
+
+ first_receiver = FirstReceiver()
+
+ self._invoker.event(group, method)(
+ first_request, first_receiver, first_receiver.abort,
+ test_constants.LONG_TIMEOUT)
+ second_receiver.block_until_terminated()
+
+ first_response = first_receiver.unary_response()
+ second_response = second_receiver.unary_response()
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ def testParallelInvocations(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ first_receiver = _receiver.Receiver()
+ second_request = test_messages.request()
+ second_receiver = _receiver.Receiver()
+
+ self._invoker.event(group, method)(
+ first_request, first_receiver, first_receiver.abort,
+ test_constants.LONG_TIMEOUT)
+ self._invoker.event(group, method)(
+ second_request, second_receiver, second_receiver.abort,
+ test_constants.LONG_TIMEOUT)
+ first_receiver.block_until_terminated()
+ second_receiver.block_until_terminated()
+
+ first_response = first_receiver.unary_response()
+ second_response = second_receiver.unary_response()
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ @unittest.skip('TODO(nathaniel): implement.')
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ raise NotImplementedError()
+
+ def testCancelledUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ receiver = _receiver.Receiver()
+
+ with self._control.pause():
+ call = self._invoker.event(group, method)(
+ request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ call.cancel()
+ receiver.block_until_terminated()
+
+ self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
+
+ def testCancelledUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ receiver = _receiver.Receiver()
+
+ call = self._invoker.event(group, method)(
+ request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ call.cancel()
+ receiver.block_until_terminated()
+
+ self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
+
+ def testCancelledStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ receiver = _receiver.Receiver()
+
+ call_consumer = self._invoker.event(group, method)(
+ receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ for request in requests:
+ call_consumer.consume(request)
+ call_consumer.cancel()
+ receiver.block_until_terminated()
+
+ self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
+
+ def testCancelledStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for unused_test_messages in test_messages_sequence:
+ receiver = _receiver.Receiver()
+
+ call_consumer = self._invoker.event(group, method)(
+ receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ call_consumer.cancel()
+ receiver.block_until_terminated()
+
+ self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ receiver = _receiver.Receiver()
+
+ with self._control.pause():
+ self._invoker.event(group, method)(
+ request, receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+ receiver.block_until_terminated()
+
+ self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ receiver = _receiver.Receiver()
+
+ with self._control.pause():
+ self._invoker.event(group, method)(
+ request, receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+ receiver.block_until_terminated()
+
+ self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for unused_test_messages in test_messages_sequence:
+ receiver = _receiver.Receiver()
+
+ self._invoker.event(group, method)(
+ receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+ receiver.block_until_terminated()
+
+ self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ receiver = _receiver.Receiver()
+
+ call_consumer = self._invoker.event(group, method)(
+ receiver, receiver.abort, test_constants.SHORT_TIMEOUT)
+ for request in requests:
+ call_consumer.consume(request)
+ receiver.block_until_terminated()
+
+ self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ receiver = _receiver.Receiver()
+
+ with self._control.fail():
+ self._invoker.event(group, method)(
+ request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ receiver.block_until_terminated()
+
+ self.assertIs(
+ face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ receiver = _receiver.Receiver()
+
+ with self._control.fail():
+ self._invoker.event(group, method)(
+ request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ receiver.block_until_terminated()
+
+ self.assertIs(
+ face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ receiver = _receiver.Receiver()
+
+ with self._control.fail():
+ call_consumer = self._invoker.event(group, method)(
+ receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ for request in requests:
+ call_consumer.consume(request)
+ call_consumer.terminate()
+ receiver.block_until_terminated()
+
+ self.assertIs(
+ face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
+
+ def testFailedStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ receiver = _receiver.Receiver()
+
+ with self._control.fail():
+ call_consumer = self._invoker.event(group, method)(
+ receiver, receiver.abort, test_constants.LONG_TIMEOUT)
+ for request in requests:
+ call_consumer.consume(request)
+ call_consumer.terminate()
+ receiver.block_until_terminated()
+
+ self.assertIs(
+ face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
new file mode 100644
index 0000000000..a649362cef
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -0,0 +1,378 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test code for the Face layer of RPC Framework."""
+
+import abc
+import contextlib
+import threading
+import unittest
+
+# test_interfaces is referenced from specification in this module.
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.face import face
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.common import test_control
+from grpc_test.framework.common import test_coverage
+from grpc_test.framework.interfaces.face import _digest
+from grpc_test.framework.interfaces.face import _stock_service
+from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
+
+
+class _PauseableIterator(object):
+
+ def __init__(self, upstream):
+ self._upstream = upstream
+ self._condition = threading.Condition()
+ self._paused = False
+
+ @contextlib.contextmanager
+ def pause(self):
+ with self._condition:
+ self._paused = True
+ yield
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ with self._condition:
+ while self._paused:
+ self._condition.wait()
+ return next(self._upstream)
+
+
+class TestCase(test_coverage.Coverage, unittest.TestCase):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must have an "implementation" attribute of type
+ test_interfaces.Implementation and an "invoker_constructor" attribute of type
+ _invocation.InvokerConstructor.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ NAME = 'FutureInvocationAsynchronousEventServiceTest'
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self._control = test_control.PauseFailControl()
+ self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE)
+ self._digest = _digest.digest(
+ _stock_service.STOCK_TEST_SERVICE, self._control, self._digest_pool)
+
+ generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
+ self._digest.methods, self._digest.event_method_implementations, None)
+ self._invoker = self.invoker_constructor.construct_invoker(
+ generic_stub, dynamic_stubs, self._digest.methods)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.implementation.destantiate(self._memo)
+ self._digest_pool.shutdown(wait=True)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ response = response_future.result()
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_iterator = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ request_iterator = _PauseableIterator(iter(requests))
+
+ # Use of a paused iterator of requests allows us to test that control is
+ # returned to calling code before the iterator yields any requests.
+ with request_iterator.pause():
+ response_future = self._invoker.future(group, method)(
+ request_iterator, test_constants.LONG_TIMEOUT)
+ response = response_future.result()
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ request_iterator = _PauseableIterator(iter(requests))
+
+ # Use of a paused iterator of requests allows us to test that control is
+ # returned to calling code before the iterator yields any requests.
+ with request_iterator.pause():
+ response_iterator = self._invoker.future(group, method)(
+ request_iterator, test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self._invoker.future(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+ first_response = first_response_future.result()
+
+ test_messages.verify(first_request, first_response, self)
+
+ second_response_future = self._invoker.future(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+ second_response = second_response_future.result()
+
+ test_messages.verify(second_request, second_response, self)
+
+ def testParallelInvocations(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self._invoker.future(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+ second_response_future = self._invoker.future(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+ first_response = first_response_future.result()
+ second_response = second_response_future.result()
+
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ @unittest.skip('TODO(nathaniel): implement.')
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ raise NotImplementedError()
+
+ def testCancelledUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause():
+ response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ cancel_method_return_value = response_future.cancel()
+
+ self.assertFalse(cancel_method_return_value)
+ self.assertTrue(response_future.cancelled())
+
+ def testCancelledUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ response_iterator.cancel()
+
+ with self.assertRaises(face.CancellationError):
+ next(response_iterator)
+
+ def testCancelledStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause():
+ response_future = self._invoker.future(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ cancel_method_return_value = response_future.cancel()
+
+ self.assertFalse(cancel_method_return_value)
+ self.assertTrue(response_future.cancelled())
+
+ def testCancelledStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ response_iterator.cancel()
+
+ with self.assertRaises(face.CancellationError):
+ next(response_iterator)
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause():
+ response_future = self._invoker.future(
+ group, method)(request, test_constants.SHORT_TIMEOUT)
+ self.assertIsInstance(
+ response_future.exception(), face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ request, test_constants.SHORT_TIMEOUT)
+ with self.assertRaises(face.ExpirationError):
+ list(response_iterator)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause():
+ response_future = self._invoker.future(group, method)(
+ iter(requests), test_constants.SHORT_TIMEOUT)
+ self.assertIsInstance(
+ response_future.exception(), face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ iter(requests), test_constants.SHORT_TIMEOUT)
+ with self.assertRaises(face.ExpirationError):
+ list(response_iterator)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.fail():
+ response_future = self._invoker.future(group, method)(
+ request, test_constants.SHORT_TIMEOUT)
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is
+ # indistinguishable from simply not having called its
+ # response_callback before the expiration of the RPC.
+ self.assertIsInstance(
+ response_future.exception(), face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_consumer before the
+ # expiration of the RPC.
+ with self._control.fail(), self.assertRaises(face.ExpirationError):
+ response_iterator = self._invoker.future(group, method)(
+ request, test_constants.SHORT_TIMEOUT)
+ list(response_iterator)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.fail():
+ response_future = self._invoker.future(group, method)(
+ iter(requests), test_constants.SHORT_TIMEOUT)
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is
+ # indistinguishable from simply not having called its
+ # response_callback before the expiration of the RPC.
+ self.assertIsInstance(
+ response_future.exception(), face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+
+ def testFailedStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_consumer before the
+ # expiration of the RPC.
+ with self._control.fail(), self.assertRaises(face.ExpirationError):
+ response_iterator = self._invoker.future(group, method)(
+ iter(requests), test_constants.SHORT_TIMEOUT)
+ list(response_iterator)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_invocation.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_invocation.py
new file mode 100644
index 0000000000..448e845a08
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_invocation.py
@@ -0,0 +1,213 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Coverage across the Face layer's generic-to-dynamic range for invocation."""
+
+import abc
+
+from grpc.framework.common import cardinality
+
+_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR = {
+ cardinality.Cardinality.UNARY_UNARY: 'blocking_unary_unary',
+ cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream',
+ cardinality.Cardinality.STREAM_UNARY: 'blocking_stream_unary',
+ cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream',
+}
+
+_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR = {
+ cardinality.Cardinality.UNARY_UNARY: 'future_unary_unary',
+ cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream',
+ cardinality.Cardinality.STREAM_UNARY: 'future_stream_unary',
+ cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream',
+}
+
+_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR = {
+ cardinality.Cardinality.UNARY_UNARY: 'event_unary_unary',
+ cardinality.Cardinality.UNARY_STREAM: 'event_unary_stream',
+ cardinality.Cardinality.STREAM_UNARY: 'event_stream_unary',
+ cardinality.Cardinality.STREAM_STREAM: 'event_stream_stream',
+}
+
+_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = {
+ cardinality.Cardinality.UNARY_UNARY: 'unary_unary',
+ cardinality.Cardinality.UNARY_STREAM: 'unary_stream',
+ cardinality.Cardinality.STREAM_UNARY: 'stream_unary',
+ cardinality.Cardinality.STREAM_STREAM: 'stream_stream',
+}
+
+
+class Invoker(object):
+ """A type used to invoke test RPCs."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def blocking(self, group, name):
+ """Invokes an RPC with blocking control flow."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def future(self, group, name):
+ """Invokes an RPC with future control flow."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event(self, group, name):
+ """Invokes an RPC with event control flow."""
+ raise NotImplementedError()
+
+
+class InvokerConstructor(object):
+ """A type used to create Invokers."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def name(self):
+ """Specifies the name of the Invoker constructed by this object."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def construct_invoker(self, generic_stub, dynamic_stubs, methods):
+ """Constructs an Invoker for the given stubs and methods."""
+ raise NotImplementedError()
+
+
+class _GenericInvoker(Invoker):
+
+ def __init__(self, generic_stub, methods):
+ self._stub = generic_stub
+ self._methods = methods
+
+ def _behavior(self, group, name, cardinality_to_generic_method):
+ method_cardinality = self._methods[group, name].cardinality()
+ behavior = getattr(
+ self._stub, cardinality_to_generic_method[method_cardinality])
+ return lambda *args, **kwargs: behavior(group, name, *args, **kwargs)
+
+ def blocking(self, group, name):
+ return self._behavior(
+ group, name, _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR)
+
+ def future(self, group, name):
+ return self._behavior(group, name, _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR)
+
+ def event(self, group, name):
+ return self._behavior(group, name, _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR)
+
+
+class _GenericInvokerConstructor(InvokerConstructor):
+
+ def name(self):
+ return 'GenericInvoker'
+
+ def construct_invoker(self, generic_stub, dynamic_stub, methods):
+ return _GenericInvoker(generic_stub, methods)
+
+
+class _MultiCallableInvoker(Invoker):
+
+ def __init__(self, generic_stub, methods):
+ self._stub = generic_stub
+ self._methods = methods
+
+ def _multi_callable(self, group, name):
+ method_cardinality = self._methods[group, name].cardinality()
+ behavior = getattr(
+ self._stub,
+ _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
+ return behavior(group, name)
+
+ def blocking(self, group, name):
+ return self._multi_callable(group, name)
+
+ def future(self, group, name):
+ method_cardinality = self._methods[group, name].cardinality()
+ behavior = getattr(
+ self._stub,
+ _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
+ if method_cardinality in (
+ cardinality.Cardinality.UNARY_UNARY,
+ cardinality.Cardinality.STREAM_UNARY):
+ return behavior(group, name).future
+ else:
+ return behavior(group, name)
+
+ def event(self, group, name):
+ return self._multi_callable(group, name).event
+
+
+class _MultiCallableInvokerConstructor(InvokerConstructor):
+
+ def name(self):
+ return 'MultiCallableInvoker'
+
+ def construct_invoker(self, generic_stub, dynamic_stub, methods):
+ return _MultiCallableInvoker(generic_stub, methods)
+
+
+class _DynamicInvoker(Invoker):
+
+ def __init__(self, dynamic_stubs, methods):
+ self._stubs = dynamic_stubs
+ self._methods = methods
+
+ def blocking(self, group, name):
+ return getattr(self._stubs[group], name)
+
+ def future(self, group, name):
+ if self._methods[group, name].cardinality() in (
+ cardinality.Cardinality.UNARY_UNARY,
+ cardinality.Cardinality.STREAM_UNARY):
+ return getattr(self._stubs[group], name).future
+ else:
+ return getattr(self._stubs[group], name)
+
+ def event(self, group, name):
+ return getattr(self._stubs[group], name).event
+
+
+class _DynamicInvokerConstructor(InvokerConstructor):
+
+ def name(self):
+ return 'DynamicInvoker'
+
+ def construct_invoker(self, generic_stub, dynamic_stubs, methods):
+ return _DynamicInvoker(dynamic_stubs, methods)
+
+
+def invoker_constructors():
+ """Creates a sequence of InvokerConstructors to use in tests of RPCs.
+
+ Returns:
+ A sequence of InvokerConstructors.
+ """
+ return (
+ _GenericInvokerConstructor(),
+ _MultiCallableInvokerConstructor(),
+ _DynamicInvokerConstructor(),
+ )
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_receiver.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_receiver.py
new file mode 100644
index 0000000000..2e444ff09d
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_receiver.py
@@ -0,0 +1,95 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A utility useful in tests of asynchronous, event-driven interfaces."""
+
+import threading
+
+from grpc.framework.interfaces.face import face
+
+
+class Receiver(face.ResponseReceiver):
+ """A utility object useful in tests of asynchronous code."""
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._initial_metadata = None
+ self._responses = []
+ self._terminal_metadata = None
+ self._code = None
+ self._details = None
+ self._completed = False
+ self._abortion = None
+
+ def abort(self, abortion):
+ with self._condition:
+ self._abortion = abortion
+ self._condition.notify_all()
+
+ def initial_metadata(self, initial_metadata):
+ with self._condition:
+ self._initial_metadata = initial_metadata
+
+ def response(self, response):
+ with self._condition:
+ self._responses.append(response)
+
+ def complete(self, terminal_metadata, code, details):
+ with self._condition:
+ self._terminal_metadata = terminal_metadata
+ self._code = code
+ self._details = details
+ self._completed = True
+ self._condition.notify_all()
+
+ def block_until_terminated(self):
+ with self._condition:
+ while self._abortion is None and not self._completed:
+ self._condition.wait()
+
+ def unary_response(self):
+ with self._condition:
+ if self._abortion is not None:
+ raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+ elif len(self._responses) != 1:
+ raise AssertionError(
+ '%d responses received, not exactly one!', len(self._responses))
+ else:
+ return self._responses[0]
+
+ def stream_responses(self):
+ with self._condition:
+ if self._abortion is None:
+ return list(self._responses)
+ else:
+ raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+
+ def abortion(self):
+ with self._condition:
+ return self._abortion
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_service.py
new file mode 100644
index 0000000000..e25b8a038c
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_service.py
@@ -0,0 +1,332 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Private interfaces implemented by data sets used in Face-layer tests."""
+
+import abc
+
+# face is referenced from specification in this module.
+from grpc.framework.interfaces.face import face # pylint: disable=unused-import
+from grpc_test.framework.interfaces.face import test_interfaces
+
+
+class UnaryUnaryTestMethodImplementation(test_interfaces.Method):
+ """A controllable implementation of a unary-unary method."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request, response_callback, context, control):
+ """Services an RPC that accepts one message and produces one message.
+
+ Args:
+ request: The single request message for the RPC.
+ response_callback: A callback to be called to accept the response message
+ of the RPC.
+ context: An face.ServicerContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class UnaryUnaryTestMessages(object):
+ """A type for unary-request-unary-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def request(self):
+ """Affords a request message.
+
+ Implementations of this method should return a different message with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A request message.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, request, response, test_case):
+ """Verifies that the computed response matches the given request.
+
+ Args:
+ request: A request message.
+ response: A response message.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the request and response do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamTestMethodImplementation(test_interfaces.Method):
+ """A controllable implementation of a unary-stream method."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request, response_consumer, context, control):
+ """Services an RPC that takes one message and produces a stream of messages.
+
+ Args:
+ request: The single request message for the RPC.
+ response_consumer: A stream.Consumer to be called to accept the response
+ messages of the RPC.
+ context: A face.ServicerContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamTestMessages(object):
+ """A type for unary-request-stream-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def request(self):
+ """Affords a request message.
+
+ Implementations of this method should return a different message with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A request message.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, request, responses, test_case):
+ """Verifies that the computed responses match the given request.
+
+ Args:
+ request: A request message.
+ responses: A sequence of response messages.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the request and responses do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryTestMethodImplementation(test_interfaces.Method):
+ """A controllable implementation of a stream-unary method."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, response_callback, context, control):
+ """Services an RPC that takes a stream of messages and produces one message.
+
+ Args:
+ response_callback: A callback to be called to accept the response message
+ of the RPC.
+ context: A face.ServicerContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Returns:
+ A stream.Consumer with which to accept the request messages of the RPC.
+ The consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing messages to this object. Implementations must not assume that
+ this object will be called to completion of the request stream or even
+ called at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryTestMessages(object):
+ """A type for stream-request-unary-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def requests(self):
+ """Affords a sequence of request messages.
+
+ Implementations of this method should return a different sequences with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A sequence of request messages.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, requests, response, test_case):
+ """Verifies that the computed response matches the given requests.
+
+ Args:
+ requests: A sequence of request messages.
+ response: A response message.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the requests and response do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamTestMethodImplementation(test_interfaces.Method):
+ """A controllable implementation of a stream-stream method."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, response_consumer, context, control):
+ """Services an RPC that accepts and produces streams of messages.
+
+ Args:
+ response_consumer: A stream.Consumer to be called to accept the response
+ messages of the RPC.
+ context: A face.ServicerContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Returns:
+ A stream.Consumer with which to accept the request messages of the RPC.
+ The consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing messages to this object. Implementations must not assume that
+ this object will be called to completion of the request stream or even
+ called at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamTestMessages(object):
+ """A type for stream-request-stream-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def requests(self):
+ """Affords a sequence of request messages.
+
+ Implementations of this method should return a different sequences with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A sequence of request messages.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, requests, responses, test_case):
+ """Verifies that the computed response matches the given requests.
+
+ Args:
+ requests: A sequence of request messages.
+ responses: A sequence of response messages.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the requests and responses do not match, indicating
+ that there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class TestService(object):
+ """A specification of implemented methods to use in tests."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def unary_unary_scenarios(self):
+ """Affords unary-request-unary-response test methods and their messages.
+
+ Returns:
+ A dict from method group-name pair to implementation/messages pair. The
+ first element of the pair is a UnaryUnaryTestMethodImplementation object
+ and the second element is a sequence of UnaryUnaryTestMethodMessages
+ objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_stream_scenarios(self):
+ """Affords unary-request-stream-response test methods and their messages.
+
+ Returns:
+ A dict from method group-name pair to implementation/messages pair. The
+ first element of the pair is a UnaryStreamTestMethodImplementation
+ object and the second element is a sequence of
+ UnaryStreamTestMethodMessages objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_unary_scenarios(self):
+ """Affords stream-request-unary-response test methods and their messages.
+
+ Returns:
+ A dict from method group-name pair to implementation/messages pair. The
+ first element of the pair is a StreamUnaryTestMethodImplementation
+ object and the second element is a sequence of
+ StreamUnaryTestMethodMessages objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_stream_scenarios(self):
+ """Affords stream-request-stream-response test methods and their messages.
+
+ Returns:
+ A dict from method group-name pair to implementation/messages pair. The
+ first element of the pair is a StreamStreamTestMethodImplementation
+ object and the second element is a sequence of
+ StreamStreamTestMethodMessages objects.
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py
new file mode 100644
index 0000000000..1dd2ec3633
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py
@@ -0,0 +1,396 @@
+B# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Examples of Python implementations of the stock.proto Stock service."""
+
+from grpc.framework.common import cardinality
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import stream
+from grpc_test.framework.common import test_constants
+from grpc_test.framework.interfaces.face import _service
+from grpc_test._junkdrawer import stock_pb2
+
+_STOCK_GROUP_NAME = 'Stock'
+_SYMBOL_FORMAT = 'test symbol:%03d'
+
+# A test-appropriate security-pricing function. :-P
+_price = lambda symbol_name: float(hash(symbol_name) % 4096)
+
+
+def _get_last_trade_price(stock_request, stock_reply_callback, control, active):
+ """A unary-request, unary-response test method."""
+ control.control()
+ if active():
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=_price(stock_request.symbol)))
+ else:
+ raise abandonment.Abandoned()
+
+
+def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
+ """A stream-request, stream-response test method."""
+ def stock_reply_for_stock_request(stock_request):
+ control.control()
+ if active():
+ return stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=_price(stock_request.symbol))
+ else:
+ raise abandonment.Abandoned()
+
+ class StockRequestConsumer(stream.Consumer):
+
+ def consume(self, stock_request):
+ stock_reply_consumer.consume(stock_reply_for_stock_request(stock_request))
+
+ def terminate(self):
+ control.control()
+ stock_reply_consumer.terminate()
+
+ def consume_and_terminate(self, stock_request):
+ stock_reply_consumer.consume_and_terminate(
+ stock_reply_for_stock_request(stock_request))
+
+ return StockRequestConsumer()
+
+
+def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
+ """A unary-request, stream-response test method."""
+ base_price = _price(stock_request.symbol)
+ for index in range(stock_request.num_trades_to_watch):
+ control.control()
+ if active():
+ stock_reply_consumer.consume(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=base_price + index))
+ else:
+ raise abandonment.Abandoned()
+ stock_reply_consumer.terminate()
+
+
+def _get_highest_trade_price(stock_reply_callback, control, active):
+ """A stream-request, unary-response test method."""
+
+ class StockRequestConsumer(stream.Consumer):
+ """Keeps an ongoing record of the most valuable symbol yet consumed."""
+
+ def __init__(self):
+ self._symbol = None
+ self._price = None
+
+ def consume(self, stock_request):
+ control.control()
+ if active():
+ if self._price is None:
+ self._symbol = stock_request.symbol
+ self._price = _price(stock_request.symbol)
+ else:
+ candidate_price = _price(stock_request.symbol)
+ if self._price < candidate_price:
+ self._symbol = stock_request.symbol
+ self._price = candidate_price
+
+ def terminate(self):
+ control.control()
+ if active():
+ if self._symbol is None:
+ raise ValueError()
+ else:
+ stock_reply_callback(
+ stock_pb2.StockReply(symbol=self._symbol, price=self._price))
+ self._symbol = None
+ self._price = None
+
+ def consume_and_terminate(self, stock_request):
+ control.control()
+ if active():
+ if self._price is None:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol,
+ price=_price(stock_request.symbol)))
+ else:
+ candidate_price = _price(stock_request.symbol)
+ if self._price < candidate_price:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=candidate_price))
+ else:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=self._symbol, price=self._price))
+
+ self._symbol = None
+ self._price = None
+
+ return StockRequestConsumer()
+
+
+class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation):
+ """GetLastTradePrice for use in tests."""
+
+ def group(self):
+ return _STOCK_GROUP_NAME
+
+ def name(self):
+ return 'GetLastTradePrice'
+
+ def cardinality(self):
+ return cardinality.Cardinality.UNARY_UNARY
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, request, response_callback, context, control):
+ _get_last_trade_price(
+ request, response_callback, control, context.is_active)
+
+
+class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages):
+
+ def __init__(self):
+ self._index = 0
+
+ def request(self):
+ symbol = _SYMBOL_FORMAT % self._index
+ self._index += 1
+ return stock_pb2.StockRequest(symbol=symbol)
+
+ def verify(self, request, response, test_case):
+ test_case.assertEqual(request.symbol, response.symbol)
+ test_case.assertEqual(_price(request.symbol), response.price)
+
+
+class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation):
+ """GetLastTradePriceMultiple for use in tests."""
+
+ def group(self):
+ return _STOCK_GROUP_NAME
+
+ def name(self):
+ return 'GetLastTradePriceMultiple'
+
+ def cardinality(self):
+ return cardinality.Cardinality.STREAM_STREAM
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, response_consumer, context, control):
+ return _get_last_trade_price_multiple(
+ response_consumer, control, context.is_active)
+
+
+class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages):
+ """Pairs of message streams for use with GetLastTradePriceMultiple."""
+
+ def __init__(self):
+ self._index = 0
+
+ def requests(self):
+ base_index = self._index
+ self._index += 1
+ return [
+ stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index))
+ for index in range(test_constants.STREAM_LENGTH)]
+
+ def verify(self, requests, responses, test_case):
+ test_case.assertEqual(len(requests), len(responses))
+ for stock_request, stock_reply in zip(requests, responses):
+ test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
+ test_case.assertEqual(_price(stock_request.symbol), stock_reply.price)
+
+
+class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation):
+ """WatchFutureTrades for use in tests."""
+
+ def group(self):
+ return _STOCK_GROUP_NAME
+
+ def name(self):
+ return 'WatchFutureTrades'
+
+ def cardinality(self):
+ return cardinality.Cardinality.UNARY_STREAM
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, request, response_consumer, context, control):
+ _watch_future_trades(request, response_consumer, control, context.is_active)
+
+
+class WatchFutureTradesMessages(_service.UnaryStreamTestMessages):
+ """Pairs of a single request message and a sequence of response messages."""
+
+ def __init__(self):
+ self._index = 0
+
+ def request(self):
+ symbol = _SYMBOL_FORMAT % self._index
+ self._index += 1
+ return stock_pb2.StockRequest(
+ symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH)
+
+ def verify(self, request, responses, test_case):
+ test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses))
+ base_price = _price(request.symbol)
+ for index, response in enumerate(responses):
+ test_case.assertEqual(base_price + index, response.price)
+
+
+class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation):
+ """GetHighestTradePrice for use in tests."""
+
+ def group(self):
+ return _STOCK_GROUP_NAME
+
+ def name(self):
+ return 'GetHighestTradePrice'
+
+ def cardinality(self):
+ return cardinality.Cardinality.STREAM_UNARY
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, response_callback, context, control):
+ return _get_highest_trade_price(
+ response_callback, control, context.is_active)
+
+
+class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages):
+
+ def requests(self):
+ return [
+ stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index)
+ for index in range(test_constants.STREAM_LENGTH)]
+
+ def verify(self, requests, response, test_case):
+ price = None
+ symbol = None
+ for stock_request in requests:
+ current_symbol = stock_request.symbol
+ current_price = _price(current_symbol)
+ if price is None or price < current_price:
+ price = current_price
+ symbol = current_symbol
+ test_case.assertEqual(price, response.price)
+ test_case.assertEqual(symbol, response.symbol)
+
+
+class StockTestService(_service.TestService):
+ """A corpus of test data with one method of each RPC cardinality."""
+
+ def unary_unary_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'GetLastTradePrice'): (
+ GetLastTradePrice(), [GetLastTradePriceMessages()]),
+ }
+
+ def unary_stream_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'WatchFutureTrades'): (
+ WatchFutureTrades(), [WatchFutureTradesMessages()]),
+ }
+
+ def stream_unary_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): (
+ GetHighestTradePrice(), [GetHighestTradePriceMessages()])
+ }
+
+ def stream_stream_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): (
+ GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]),
+ }
+
+
+STOCK_TEST_SERVICE = StockTestService()
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_cases.py
new file mode 100644
index 0000000000..ca623662f7
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_cases.py
@@ -0,0 +1,67 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tools for creating tests of implementations of the Face layer."""
+
+# unittest is referenced from specification in this module.
+import unittest # pylint: disable=unused-import
+
+# test_interfaces is referenced from specification in this module.
+from grpc_test.framework.interfaces.face import _blocking_invocation_inline_service
+from grpc_test.framework.interfaces.face import _event_invocation_synchronous_event_service
+from grpc_test.framework.interfaces.face import _future_invocation_asynchronous_event_service
+from grpc_test.framework.interfaces.face import _invocation
+from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
+
+_TEST_CASE_SUPERCLASSES = (
+ _blocking_invocation_inline_service.TestCase,
+ _event_invocation_synchronous_event_service.TestCase,
+ _future_invocation_asynchronous_event_service.TestCase,
+)
+
+
+def test_cases(implementation):
+ """Creates unittest.TestCase classes for a given Face layer implementation.
+
+ Args:
+ implementation: A test_interfaces.Implementation specifying creation and
+ destruction of a given Face layer implementation.
+
+ Returns:
+ A sequence of subclasses of unittest.TestCase defining tests of the
+ specified Face layer implementation.
+ """
+ test_case_classes = []
+ for invoker_constructor in _invocation.invoker_constructors():
+ for super_class in _TEST_CASE_SUPERCLASSES:
+ test_case_classes.append(
+ type(invoker_constructor.name() + super_class.NAME, (super_class,),
+ {'implementation': implementation,
+ 'invoker_constructor': invoker_constructor}))
+ return test_case_classes
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_interfaces.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_interfaces.py
new file mode 100644
index 0000000000..b2b5c10fa6
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_interfaces.py
@@ -0,0 +1,229 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces used in tests of implementations of the Face layer."""
+
+import abc
+
+from grpc.framework.common import cardinality # pylint: disable=unused-import
+from grpc.framework.interfaces.face import face # pylint: disable=unused-import
+
+
+class Method(object):
+ """Specifies a method to be used in tests."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def group(self):
+ """Identify the group of the method.
+
+ Returns:
+ The group of the method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def name(self):
+ """Identify the name of the method.
+
+ Returns:
+ The name of the method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cardinality(self):
+ """Identify the cardinality of the method.
+
+ Returns:
+ A cardinality.Cardinality value describing the streaming semantics of the
+ method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def request_class(self):
+ """Identify the class used for the method's request objects.
+
+ Returns:
+ The class object of the class to which the method's request objects
+ belong.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def response_class(self):
+ """Identify the class used for the method's response objects.
+
+ Returns:
+ The class object of the class to which the method's response objects
+ belong.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_request(self, request):
+ """Serialize the given request object.
+
+ Args:
+ request: A request object appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_request(self, serialized_request):
+ """Synthesize a request object from a given bytestring.
+
+ Args:
+ serialized_request: A bytestring deserializable into a request object
+ appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_response(self, response):
+ """Serialize the given response object.
+
+ Args:
+ response: A response object appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_response(self, serialized_response):
+ """Synthesize a response object from a given bytestring.
+
+ Args:
+ serialized_response: A bytestring deserializable into a response object
+ appropriate for this method.
+ """
+ raise NotImplementedError()
+
+
+class Implementation(object):
+ """Specifies an implementation of the Face layer."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def instantiate(
+ self, methods, method_implementations,
+ multi_method_implementation):
+ """Instantiates the Face layer implementation to be used in a test.
+
+ Args:
+ methods: A sequence of Method objects describing the methods available to
+ be called during the test.
+ method_implementations: A dictionary from group-name pair to
+ face.MethodImplementation object specifying implementation of a method.
+ multi_method_implementation: A face.MultiMethodImplementation or None.
+
+ Returns:
+ A sequence of length three the first element of which is a
+ face.GenericStub, the second element of which is dictionary from groups
+ to face.DynamicStubs affording invocation of the group's methods, and
+ the third element of which is an arbitrary memo object to be kept and
+ passed to destantiate at the conclusion of the test. The returned stubs
+ must be backed by the provided implementations.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def destantiate(self, memo):
+ """Destroys the Face layer implementation under test.
+
+ Args:
+ memo: The object from the third position of the return value of a call to
+ instantiate.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_metadata(self):
+ """Provides the metadata to be used when invoking a test RPC.
+
+ Returns:
+ An object to use as the supplied-at-invocation-time metadata in a test
+ RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Provides the metadata for use as a test RPC's first servicer metadata.
+
+ Returns:
+ An object to use as the from-the-servicer-before-responses metadata in a
+ test RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminal_metadata(self):
+ """Provides the metadata for use as a test RPC's second servicer metadata.
+
+ Returns:
+ An object to use as the from-the-servicer-after-all-responses metadata in
+ a test RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def code(self):
+ """Provides the value for use as a test RPC's code.
+
+ Returns:
+ An object to use as the from-the-servicer code in a test RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def details(self):
+ """Provides the value for use as a test RPC's details.
+
+ Returns:
+ An object to use as the from-the-servicer details in a test RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ """Identifies whether or not metadata was properly transmitted.
+
+ Args:
+ original_metadata: A metadata value passed to the Face interface
+ implementation under test.
+ transmitted_metadata: The same metadata value after having been
+ transmitted via an RPC performed by the Face interface implementation
+ under test.
+
+ Returns:
+ Whether or not the metadata was properly transmitted by the Face interface
+ implementation under test.
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
index 1e575d1a9e..ecf49d9cdb 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
@@ -300,7 +300,7 @@ class TransmissionTest(object):
invocation_operation_id, 0, _TRANSMISSION_GROUP, _TRANSMISSION_METHOD,
links.Ticket.Subscription.FULL, timeout, 0, invocation_initial_metadata,
invocation_payload, invocation_terminal_metadata, invocation_code,
- invocation_message, links.Ticket.Termination.COMPLETION)
+ invocation_message, links.Ticket.Termination.COMPLETION, None)
self._invocation_link.accept_ticket(original_invocation_ticket)
self._service_mate.block_until_tickets_satisfy(
@@ -317,7 +317,7 @@ class TransmissionTest(object):
service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
timeout, 0, service_initial_metadata, service_payload,
service_terminal_metadata, service_code, service_message,
- links.Ticket.Termination.COMPLETION)
+ links.Ticket.Termination.COMPLETION, None)
self._service_link.accept_ticket(original_service_ticket)
self._invocation_mate.block_until_tickets_satisfy(terminated)
self._assert_is_valid_service_sequence(
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
index 6c2e3346aa..39c7f2fc63 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py
@@ -29,9 +29,42 @@
"""State and behavior appropriate for use in tests."""
+import logging
import threading
+import time
from grpc.framework.interfaces.links import links
+from grpc.framework.interfaces.links import utilities
+
+# A more-or-less arbitrary limit on the length of raw data values to be logged.
+_UNCOMFORTABLY_LONG = 48
+
+
+def _safe_for_log_ticket(ticket):
+ """Creates a safe-for-printing-to-the-log ticket for a given ticket.
+
+ Args:
+ ticket: Any links.Ticket.
+
+ Returns:
+ A links.Ticket that is as much as can be equal to the given ticket but
+ possibly features values like the string "<payload of length 972321>" in
+ place of the actual values of the given ticket.
+ """
+ if isinstance(ticket.payload, (basestring,)):
+ payload_length = len(ticket.payload)
+ else:
+ payload_length = -1
+ if payload_length < _UNCOMFORTABLY_LONG:
+ return ticket
+ else:
+ return links.Ticket(
+ ticket.operation_id, ticket.sequence_number,
+ ticket.group, ticket.method, ticket.subscription, ticket.timeout,
+ ticket.allowance, ticket.initial_metadata,
+ '<payload of length {}>'.format(payload_length),
+ ticket.terminal_metadata, ticket.code, ticket.message,
+ ticket.termination, None)
class RecordingLink(links.Link):
@@ -64,3 +97,71 @@ class RecordingLink(links.Link):
"""Returns a copy of the list of all tickets received by this Link."""
with self._condition:
return tuple(self._tickets)
+
+
+class _Pipe(object):
+ """A conduit that logs all tickets passed through it."""
+
+ def __init__(self, name):
+ self._lock = threading.Lock()
+ self._name = name
+ self._left_mate = utilities.NULL_LINK
+ self._right_mate = utilities.NULL_LINK
+
+ def accept_left_to_right_ticket(self, ticket):
+ with self._lock:
+ logging.warning(
+ '%s: moving left to right through %s: %s', time.time(), self._name,
+ _safe_for_log_ticket(ticket))
+ try:
+ self._right_mate.accept_ticket(ticket)
+ except Exception as e: # pylint: disable=broad-except
+ logging.exception(e)
+
+ def accept_right_to_left_ticket(self, ticket):
+ with self._lock:
+ logging.warning(
+ '%s: moving right to left through %s: %s', time.time(), self._name,
+ _safe_for_log_ticket(ticket))
+ try:
+ self._left_mate.accept_ticket(ticket)
+ except Exception as e: # pylint: disable=broad-except
+ logging.exception(e)
+
+ def join_left_mate(self, left_mate):
+ with self._lock:
+ self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate
+
+ def join_right_mate(self, right_mate):
+ with self._lock:
+ self._right_mate = (
+ utilities.NULL_LINK if right_mate is None else right_mate)
+
+
+class _Facade(links.Link):
+
+ def __init__(self, accept, join):
+ self._accept = accept
+ self._join = join
+
+ def accept_ticket(self, ticket):
+ self._accept(ticket)
+
+ def join_link(self, link):
+ self._join(link)
+
+
+def logging_links(name):
+ """Creates a conduit that logs all tickets passed through it.
+
+ Args:
+ name: A name to use for the conduit to identify itself in logging output.
+
+ Returns:
+ Two links.Links, the first of which is the "left" side of the conduit
+ and the second of which is the "right" side of the conduit.
+ """
+ pipe = _Pipe(name)
+ left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate)
+ right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate)
+ return left_facade, right_facade