diff options
Diffstat (limited to 'src/python/grpcio_test/grpc_test/_adapter/_links_test.py')
-rw-r--r-- | src/python/grpcio_test/grpc_test/_adapter/_links_test.py | 277 |
1 files changed, 277 insertions, 0 deletions
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_links_test.py b/src/python/grpcio_test/grpc_test/_adapter/_links_test.py new file mode 100644 index 0000000000..c4686b327a --- /dev/null +++ b/src/python/grpcio_test/grpc_test/_adapter/_links_test.py @@ -0,0 +1,277 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test of the GRPC-backed ForeLink and RearLink.""" + +import threading +import unittest + +from grpc._adapter import fore +from grpc._adapter import rear +from grpc.framework.base import interfaces +from grpc.framework.foundation import logging_pool +from grpc_test._adapter import _proto_scenarios +from grpc_test._adapter import _test_links + +_IDENTITY = lambda x: x +_TIMEOUT = 32 + + +# TODO(nathaniel): End-to-end metadata testing. +def _transform_metadata(unused_metadata): + return ( + ('one unused key', 'one unused value'), + ('another unused key', 'another unused value'), +) + + +class RoundTripTest(unittest.TestCase): + + def setUp(self): + self.fore_link_pool = logging_pool.pool(8) + self.rear_link_pool = logging_pool.pool(8) + + def tearDown(self): + self.rear_link_pool.shutdown(wait=True) + self.fore_link_pool.shutdown(wait=True) + + def testZeroMessageRoundTrip(self): + test_operation_id = object() + test_method = 'test method' + test_fore_link = _test_links.ForeLink(None, None) + def rear_action(front_to_back_ticket, fore_link): + if front_to_back_ticket.kind in ( + interfaces.FrontToBackTicket.Kind.COMPLETION, + interfaces.FrontToBackTicket.Kind.ENTIRE): + back_to_front_ticket = interfaces.BackToFrontTicket( + front_to_back_ticket.operation_id, 0, + interfaces.BackToFrontTicket.Kind.COMPLETION, None) + fore_link.accept_back_to_front_ticket(back_to_front_ticket) + test_rear_link = _test_links.RearLink(rear_action, None) + + fore_link = fore.ForeLink( + self.fore_link_pool, {test_method: None}, {test_method: None}, None, ()) + fore_link.join_rear_link(test_rear_link) + test_rear_link.join_fore_link(fore_link) + fore_link.start() + port = fore_link.port() + + rear_link = rear.RearLink( + 'localhost', port, self.rear_link_pool, {test_method: None}, + {test_method: None}, False, None, None, None, + metadata_transformer=_transform_metadata) + rear_link.join_fore_link(test_fore_link) + test_fore_link.join_rear_link(rear_link) + rear_link.start() + + front_to_back_ticket = interfaces.FrontToBackTicket( + test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE, + test_method, interfaces.ServicedSubscription.Kind.FULL, None, None, + _TIMEOUT) + rear_link.accept_front_to_back_ticket(front_to_back_ticket) + + with test_fore_link.condition: + while (not test_fore_link.tickets or + test_fore_link.tickets[-1].kind is + interfaces.BackToFrontTicket.Kind.CONTINUATION): + test_fore_link.condition.wait() + + rear_link.stop() + fore_link.stop() + + with test_fore_link.condition: + self.assertIs( + test_fore_link.tickets[-1].kind, + interfaces.BackToFrontTicket.Kind.COMPLETION) + + def testEntireRoundTrip(self): + test_operation_id = object() + test_method = 'test method' + test_front_to_back_datum = b'\x07' + test_back_to_front_datum = b'\x08' + test_fore_link = _test_links.ForeLink(None, None) + rear_sequence_number = [0] + def rear_action(front_to_back_ticket, fore_link): + if front_to_back_ticket.payload is None: + payload = None + else: + payload = test_back_to_front_datum + terminal = front_to_back_ticket.kind in ( + interfaces.FrontToBackTicket.Kind.COMPLETION, + interfaces.FrontToBackTicket.Kind.ENTIRE) + if payload is not None or terminal: + if terminal: + kind = interfaces.BackToFrontTicket.Kind.COMPLETION + else: + kind = interfaces.BackToFrontTicket.Kind.CONTINUATION + back_to_front_ticket = interfaces.BackToFrontTicket( + front_to_back_ticket.operation_id, rear_sequence_number[0], kind, + payload) + rear_sequence_number[0] += 1 + fore_link.accept_back_to_front_ticket(back_to_front_ticket) + test_rear_link = _test_links.RearLink(rear_action, None) + + fore_link = fore.ForeLink( + self.fore_link_pool, {test_method: _IDENTITY}, + {test_method: _IDENTITY}, None, ()) + fore_link.join_rear_link(test_rear_link) + test_rear_link.join_fore_link(fore_link) + fore_link.start() + port = fore_link.port() + + rear_link = rear.RearLink( + 'localhost', port, self.rear_link_pool, {test_method: _IDENTITY}, + {test_method: _IDENTITY}, False, None, None, None) + rear_link.join_fore_link(test_fore_link) + test_fore_link.join_rear_link(rear_link) + rear_link.start() + + front_to_back_ticket = interfaces.FrontToBackTicket( + test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE, + test_method, interfaces.ServicedSubscription.Kind.FULL, None, + test_front_to_back_datum, _TIMEOUT) + rear_link.accept_front_to_back_ticket(front_to_back_ticket) + + with test_fore_link.condition: + while (not test_fore_link.tickets or + test_fore_link.tickets[-1].kind is not + interfaces.BackToFrontTicket.Kind.COMPLETION): + test_fore_link.condition.wait() + + rear_link.stop() + fore_link.stop() + + with test_rear_link.condition: + front_to_back_payloads = tuple( + ticket.payload for ticket in test_rear_link.tickets + if ticket.payload is not None) + with test_fore_link.condition: + back_to_front_payloads = tuple( + ticket.payload for ticket in test_fore_link.tickets + if ticket.payload is not None) + self.assertTupleEqual((test_front_to_back_datum,), front_to_back_payloads) + self.assertTupleEqual((test_back_to_front_datum,), back_to_front_payloads) + + def _perform_scenario_test(self, scenario): + test_operation_id = object() + test_method = scenario.method() + test_fore_link = _test_links.ForeLink(None, None) + rear_lock = threading.Lock() + rear_sequence_number = [0] + def rear_action(front_to_back_ticket, fore_link): + with rear_lock: + if front_to_back_ticket.payload is not None: + response = scenario.response_for_request(front_to_back_ticket.payload) + else: + response = None + terminal = front_to_back_ticket.kind in ( + interfaces.FrontToBackTicket.Kind.COMPLETION, + interfaces.FrontToBackTicket.Kind.ENTIRE) + if response is not None or terminal: + if terminal: + kind = interfaces.BackToFrontTicket.Kind.COMPLETION + else: + kind = interfaces.BackToFrontTicket.Kind.CONTINUATION + back_to_front_ticket = interfaces.BackToFrontTicket( + front_to_back_ticket.operation_id, rear_sequence_number[0], kind, + response) + rear_sequence_number[0] += 1 + fore_link.accept_back_to_front_ticket(back_to_front_ticket) + test_rear_link = _test_links.RearLink(rear_action, None) + + fore_link = fore.ForeLink( + self.fore_link_pool, {test_method: scenario.deserialize_request}, + {test_method: scenario.serialize_response}, None, ()) + fore_link.join_rear_link(test_rear_link) + test_rear_link.join_fore_link(fore_link) + fore_link.start() + port = fore_link.port() + + rear_link = rear.RearLink( + 'localhost', port, self.rear_link_pool, + {test_method: scenario.serialize_request}, + {test_method: scenario.deserialize_response}, False, None, None, None) + rear_link.join_fore_link(test_fore_link) + test_fore_link.join_rear_link(rear_link) + rear_link.start() + + commencement_ticket = interfaces.FrontToBackTicket( + test_operation_id, 0, + interfaces.FrontToBackTicket.Kind.COMMENCEMENT, test_method, + interfaces.ServicedSubscription.Kind.FULL, None, None, + _TIMEOUT) + fore_sequence_number = 1 + rear_link.accept_front_to_back_ticket(commencement_ticket) + for request in scenario.requests(): + continuation_ticket = interfaces.FrontToBackTicket( + test_operation_id, fore_sequence_number, + interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None, None, + request, None) + fore_sequence_number += 1 + rear_link.accept_front_to_back_ticket(continuation_ticket) + completion_ticket = interfaces.FrontToBackTicket( + test_operation_id, fore_sequence_number, + interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None, None, + None) + fore_sequence_number += 1 + rear_link.accept_front_to_back_ticket(completion_ticket) + + with test_fore_link.condition: + while (not test_fore_link.tickets or + test_fore_link.tickets[-1].kind is not + interfaces.BackToFrontTicket.Kind.COMPLETION): + test_fore_link.condition.wait() + + rear_link.stop() + fore_link.stop() + + with test_rear_link.condition: + requests = tuple( + ticket.payload for ticket in test_rear_link.tickets + if ticket.payload is not None) + with test_fore_link.condition: + responses = tuple( + ticket.payload for ticket in test_fore_link.tickets + if ticket.payload is not None) + self.assertTrue(scenario.verify_requests(requests)) + self.assertTrue(scenario.verify_responses(responses)) + + def testEmptyScenario(self): + self._perform_scenario_test(_proto_scenarios.EmptyScenario()) + + def testBidirectionallyUnaryScenario(self): + self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario()) + + def testBidirectionallyStreamingScenario(self): + self._perform_scenario_test( + _proto_scenarios.BidirectionallyStreamingScenario()) + + +if __name__ == '__main__': + unittest.main(verbosity=2) |