aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_test/grpc_test/_adapter/_links_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_test/grpc_test/_adapter/_links_test.py')
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_links_test.py277
1 files changed, 277 insertions, 0 deletions
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_links_test.py b/src/python/grpcio_test/grpc_test/_adapter/_links_test.py
new file mode 100644
index 0000000000..c4686b327a
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_links_test.py
@@ -0,0 +1,277 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test of the GRPC-backed ForeLink and RearLink."""
+
+import threading
+import unittest
+
+from grpc._adapter import fore
+from grpc._adapter import rear
+from grpc.framework.base import interfaces
+from grpc.framework.foundation import logging_pool
+from grpc_test._adapter import _proto_scenarios
+from grpc_test._adapter import _test_links
+
+_IDENTITY = lambda x: x
+_TIMEOUT = 32
+
+
+# TODO(nathaniel): End-to-end metadata testing.
+def _transform_metadata(unused_metadata):
+ return (
+ ('one unused key', 'one unused value'),
+ ('another unused key', 'another unused value'),
+)
+
+
+class RoundTripTest(unittest.TestCase):
+
+ def setUp(self):
+ self.fore_link_pool = logging_pool.pool(8)
+ self.rear_link_pool = logging_pool.pool(8)
+
+ def tearDown(self):
+ self.rear_link_pool.shutdown(wait=True)
+ self.fore_link_pool.shutdown(wait=True)
+
+ def testZeroMessageRoundTrip(self):
+ test_operation_id = object()
+ test_method = 'test method'
+ test_fore_link = _test_links.ForeLink(None, None)
+ def rear_action(front_to_back_ticket, fore_link):
+ if front_to_back_ticket.kind in (
+ interfaces.FrontToBackTicket.Kind.COMPLETION,
+ interfaces.FrontToBackTicket.Kind.ENTIRE):
+ back_to_front_ticket = interfaces.BackToFrontTicket(
+ front_to_back_ticket.operation_id, 0,
+ interfaces.BackToFrontTicket.Kind.COMPLETION, None)
+ fore_link.accept_back_to_front_ticket(back_to_front_ticket)
+ test_rear_link = _test_links.RearLink(rear_action, None)
+
+ fore_link = fore.ForeLink(
+ self.fore_link_pool, {test_method: None}, {test_method: None}, None, ())
+ fore_link.join_rear_link(test_rear_link)
+ test_rear_link.join_fore_link(fore_link)
+ fore_link.start()
+ port = fore_link.port()
+
+ rear_link = rear.RearLink(
+ 'localhost', port, self.rear_link_pool, {test_method: None},
+ {test_method: None}, False, None, None, None,
+ metadata_transformer=_transform_metadata)
+ rear_link.join_fore_link(test_fore_link)
+ test_fore_link.join_rear_link(rear_link)
+ rear_link.start()
+
+ front_to_back_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
+ test_method, interfaces.ServicedSubscription.Kind.FULL, None, None,
+ _TIMEOUT)
+ rear_link.accept_front_to_back_ticket(front_to_back_ticket)
+
+ with test_fore_link.condition:
+ while (not test_fore_link.tickets or
+ test_fore_link.tickets[-1].kind is
+ interfaces.BackToFrontTicket.Kind.CONTINUATION):
+ test_fore_link.condition.wait()
+
+ rear_link.stop()
+ fore_link.stop()
+
+ with test_fore_link.condition:
+ self.assertIs(
+ test_fore_link.tickets[-1].kind,
+ interfaces.BackToFrontTicket.Kind.COMPLETION)
+
+ def testEntireRoundTrip(self):
+ test_operation_id = object()
+ test_method = 'test method'
+ test_front_to_back_datum = b'\x07'
+ test_back_to_front_datum = b'\x08'
+ test_fore_link = _test_links.ForeLink(None, None)
+ rear_sequence_number = [0]
+ def rear_action(front_to_back_ticket, fore_link):
+ if front_to_back_ticket.payload is None:
+ payload = None
+ else:
+ payload = test_back_to_front_datum
+ terminal = front_to_back_ticket.kind in (
+ interfaces.FrontToBackTicket.Kind.COMPLETION,
+ interfaces.FrontToBackTicket.Kind.ENTIRE)
+ if payload is not None or terminal:
+ if terminal:
+ kind = interfaces.BackToFrontTicket.Kind.COMPLETION
+ else:
+ kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
+ back_to_front_ticket = interfaces.BackToFrontTicket(
+ front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
+ payload)
+ rear_sequence_number[0] += 1
+ fore_link.accept_back_to_front_ticket(back_to_front_ticket)
+ test_rear_link = _test_links.RearLink(rear_action, None)
+
+ fore_link = fore.ForeLink(
+ self.fore_link_pool, {test_method: _IDENTITY},
+ {test_method: _IDENTITY}, None, ())
+ fore_link.join_rear_link(test_rear_link)
+ test_rear_link.join_fore_link(fore_link)
+ fore_link.start()
+ port = fore_link.port()
+
+ rear_link = rear.RearLink(
+ 'localhost', port, self.rear_link_pool, {test_method: _IDENTITY},
+ {test_method: _IDENTITY}, False, None, None, None)
+ rear_link.join_fore_link(test_fore_link)
+ test_fore_link.join_rear_link(rear_link)
+ rear_link.start()
+
+ front_to_back_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
+ test_method, interfaces.ServicedSubscription.Kind.FULL, None,
+ test_front_to_back_datum, _TIMEOUT)
+ rear_link.accept_front_to_back_ticket(front_to_back_ticket)
+
+ with test_fore_link.condition:
+ while (not test_fore_link.tickets or
+ test_fore_link.tickets[-1].kind is not
+ interfaces.BackToFrontTicket.Kind.COMPLETION):
+ test_fore_link.condition.wait()
+
+ rear_link.stop()
+ fore_link.stop()
+
+ with test_rear_link.condition:
+ front_to_back_payloads = tuple(
+ ticket.payload for ticket in test_rear_link.tickets
+ if ticket.payload is not None)
+ with test_fore_link.condition:
+ back_to_front_payloads = tuple(
+ ticket.payload for ticket in test_fore_link.tickets
+ if ticket.payload is not None)
+ self.assertTupleEqual((test_front_to_back_datum,), front_to_back_payloads)
+ self.assertTupleEqual((test_back_to_front_datum,), back_to_front_payloads)
+
+ def _perform_scenario_test(self, scenario):
+ test_operation_id = object()
+ test_method = scenario.method()
+ test_fore_link = _test_links.ForeLink(None, None)
+ rear_lock = threading.Lock()
+ rear_sequence_number = [0]
+ def rear_action(front_to_back_ticket, fore_link):
+ with rear_lock:
+ if front_to_back_ticket.payload is not None:
+ response = scenario.response_for_request(front_to_back_ticket.payload)
+ else:
+ response = None
+ terminal = front_to_back_ticket.kind in (
+ interfaces.FrontToBackTicket.Kind.COMPLETION,
+ interfaces.FrontToBackTicket.Kind.ENTIRE)
+ if response is not None or terminal:
+ if terminal:
+ kind = interfaces.BackToFrontTicket.Kind.COMPLETION
+ else:
+ kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
+ back_to_front_ticket = interfaces.BackToFrontTicket(
+ front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
+ response)
+ rear_sequence_number[0] += 1
+ fore_link.accept_back_to_front_ticket(back_to_front_ticket)
+ test_rear_link = _test_links.RearLink(rear_action, None)
+
+ fore_link = fore.ForeLink(
+ self.fore_link_pool, {test_method: scenario.deserialize_request},
+ {test_method: scenario.serialize_response}, None, ())
+ fore_link.join_rear_link(test_rear_link)
+ test_rear_link.join_fore_link(fore_link)
+ fore_link.start()
+ port = fore_link.port()
+
+ rear_link = rear.RearLink(
+ 'localhost', port, self.rear_link_pool,
+ {test_method: scenario.serialize_request},
+ {test_method: scenario.deserialize_response}, False, None, None, None)
+ rear_link.join_fore_link(test_fore_link)
+ test_fore_link.join_rear_link(rear_link)
+ rear_link.start()
+
+ commencement_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, 0,
+ interfaces.FrontToBackTicket.Kind.COMMENCEMENT, test_method,
+ interfaces.ServicedSubscription.Kind.FULL, None, None,
+ _TIMEOUT)
+ fore_sequence_number = 1
+ rear_link.accept_front_to_back_ticket(commencement_ticket)
+ for request in scenario.requests():
+ continuation_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, fore_sequence_number,
+ interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None, None,
+ request, None)
+ fore_sequence_number += 1
+ rear_link.accept_front_to_back_ticket(continuation_ticket)
+ completion_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, fore_sequence_number,
+ interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None, None,
+ None)
+ fore_sequence_number += 1
+ rear_link.accept_front_to_back_ticket(completion_ticket)
+
+ with test_fore_link.condition:
+ while (not test_fore_link.tickets or
+ test_fore_link.tickets[-1].kind is not
+ interfaces.BackToFrontTicket.Kind.COMPLETION):
+ test_fore_link.condition.wait()
+
+ rear_link.stop()
+ fore_link.stop()
+
+ with test_rear_link.condition:
+ requests = tuple(
+ ticket.payload for ticket in test_rear_link.tickets
+ if ticket.payload is not None)
+ with test_fore_link.condition:
+ responses = tuple(
+ ticket.payload for ticket in test_fore_link.tickets
+ if ticket.payload is not None)
+ self.assertTrue(scenario.verify_requests(requests))
+ self.assertTrue(scenario.verify_responses(responses))
+
+ def testEmptyScenario(self):
+ self._perform_scenario_test(_proto_scenarios.EmptyScenario())
+
+ def testBidirectionallyUnaryScenario(self):
+ self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario())
+
+ def testBidirectionallyStreamingScenario(self):
+ self._perform_scenario_test(
+ _proto_scenarios.BidirectionallyStreamingScenario())
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)