# Copyright 2015, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """Tests for the old '_low'.""" import threading import time import unittest import six from six.moves import queue from grpc._adapter import _intermediary_low as _low _STREAM_LENGTH = 300 _TIMEOUT = 5 _AFTER_DELAY = 2 _FUTURE = time.time() + 60 * 60 * 24 _BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200 _BYTE_SEQUENCE_SEQUENCE = tuple( bytes(bytearray((row + column) % 256 for column in range(row))) for row in range(_STREAM_LENGTH)) class LonelyClientTest(unittest.TestCase): def testLonelyClient(self): host = 'nosuchhostexists' port = 54321 method = 'test method' deadline = time.time() + _TIMEOUT after_deadline = deadline + _AFTER_DELAY metadata_tag = object() finish_tag = object() completion_queue = _low.CompletionQueue() channel = _low.Channel('%s:%d' % (host, port), None) client_call = _low.Call(channel, completion_queue, method, host, deadline) client_call.invoke(completion_queue, metadata_tag, finish_tag) first_event = completion_queue.get(after_deadline) self.assertIsNotNone(first_event) second_event = completion_queue.get(after_deadline) self.assertIsNotNone(second_event) kinds = [event.kind for event in (first_event, second_event)] six.assertCountEqual(self, (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH), kinds) self.assertIsNone(completion_queue.get(after_deadline)) completion_queue.stop() stop_event = completion_queue.get(_FUTURE) self.assertEqual(_low.Event.Kind.STOP, stop_event.kind) del client_call del channel del completion_queue def _drive_completion_queue(completion_queue, event_queue): while True: event = completion_queue.get(_FUTURE) if event.kind is _low.Event.Kind.STOP: break event_queue.put(event) class EchoTest(unittest.TestCase): def setUp(self): self.host = 'localhost' self.server_completion_queue = _low.CompletionQueue() self.server = _low.Server(self.server_completion_queue) port = self.server.add_http2_addr('[::]:0') self.server.start() self.server_events = queue.Queue() self.server_completion_queue_thread = threading.Thread( target=_drive_completion_queue, args=(self.server_completion_queue, self.server_events)) self.server_completion_queue_thread.start() self.client_completion_queue = _low.CompletionQueue() self.channel = _low.Channel('%s:%d' % (self.host, port), None) self.client_events = queue.Queue() self.client_completion_queue_thread = threading.Thread( target=_drive_completion_queue, args=(self.client_completion_queue, self.client_events)) self.client_completion_queue_thread.start() def tearDown(self): self.server.stop() self.server.cancel_all_calls() self.server_completion_queue.stop() self.client_completion_queue.stop() self.server_completion_queue_thread.join() self.client_completion_queue_thread.join() del self.server def _perform_echo_test(self, test_data): method = 'test method' details = 'test details' server_leading_metadata_key = 'my_server_leading_key' server_leading_metadata_value = 'my_server_leading_value' server_trailing_metadata_key = 'my_server_trailing_key' server_trailing_metadata_value = 'my_server_trailing_value' client_metadata_key = 'my_client_key' client_metadata_value = 'my_client_value' server_leading_binary_metadata_key = 'my_server_leading_key-bin' server_leading_binary_metadata_value = b'\0'*2047 server_trailing_binary_metadata_key = 'my_server_trailing_key-bin' server_trailing_binary_metadata_value = b'\0'*2047 client_binary_metadata_key = 'my_client_key-bin' client_binary_metadata_value = b'\0'*2047 deadline = _FUTURE metadata_tag = object() finish_tag = object() write_tag = object() complete_tag = object() service_tag = object() read_tag = object() status_tag = object() server_data = [] client_data = [] client_call = _low.Call(self.channel, self.client_completion_queue, method, self.host, deadline) client_call.add_metadata(client_metadata_key, client_metadata_value) client_call.add_metadata(client_binary_metadata_key, client_binary_metadata_value) client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) self.server.service(service_tag) service_accepted = self.server_events.get() self.assertIsNotNone(service_accepted) self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED) self.assertIs(service_accepted.tag, service_tag) self.assertEqual(method, service_accepted.service_acceptance.method) self.assertEqual(self.host, service_accepted.service_acceptance.host) self.assertIsNotNone(service_accepted.service_acceptance.call) metadata = dict(service_accepted.metadata) self.assertIn(client_metadata_key, metadata) self.assertEqual(client_metadata_value, metadata[client_metadata_key]) self.assertIn(client_binary_metadata_key, metadata) self.assertEqual(client_binary_metadata_value, metadata[client_binary_metadata_key]) server_call = service_accepted.service_acceptance.call server_call.accept(self.server_completion_queue, finish_tag) server_call.add_metadata(server_leading_metadata_key, server_leading_metadata_value) server_call.add_metadata(server_leading_binary_metadata_key, server_leading_binary_metadata_value) server_call.premetadata() metadata_accepted = self.client_events.get() self.assertIsNotNone(metadata_accepted) self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind) self.assertEqual(metadata_tag, metadata_accepted.tag) metadata = dict(metadata_accepted.metadata) self.assertIn(server_leading_metadata_key, metadata) self.assertEqual(server_leading_metadata_value, metadata[server_leading_metadata_key]) self.assertIn(server_leading_binary_metadata_key, metadata) self.assertEqual(server_leading_binary_metadata_value, metadata[server_leading_binary_metadata_key]) for datum in test_data: client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS) write_accepted = self.client_events.get() self.assertIsNotNone(write_accepted) self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED) self.assertIs(write_accepted.tag, write_tag) self.assertIs(write_accepted.write_accepted, True) server_call.read(read_tag) read_accepted = self.server_events.get() self.assertIsNotNone(read_accepted) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertEqual(read_tag, read_accepted.tag) self.assertIsNotNone(read_accepted.bytes) server_data.append(read_accepted.bytes) server_call.write(read_accepted.bytes, write_tag, 0) write_accepted = self.server_events.get() self.assertIsNotNone(write_accepted) self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind) self.assertEqual(write_tag, write_accepted.tag) self.assertTrue(write_accepted.write_accepted) client_call.read(read_tag) read_accepted = self.client_events.get() self.assertIsNotNone(read_accepted) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertEqual(read_tag, read_accepted.tag) self.assertIsNotNone(read_accepted.bytes) client_data.append(read_accepted.bytes) client_call.complete(complete_tag) complete_accepted = self.client_events.get() self.assertIsNotNone(complete_accepted) self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED) self.assertIs(complete_accepted.tag, complete_tag) self.assertIs(complete_accepted.complete_accepted, True) server_call.read(read_tag) read_accepted = self.server_events.get() self.assertIsNotNone(read_accepted) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertEqual(read_tag, read_accepted.tag) self.assertIsNone(read_accepted.bytes) server_call.add_metadata(server_trailing_metadata_key, server_trailing_metadata_value) server_call.add_metadata(server_trailing_binary_metadata_key, server_trailing_binary_metadata_value) server_call.status(_low.Status(_low.Code.OK, details), status_tag) server_terminal_event_one = self.server_events.get() server_terminal_event_two = self.server_events.get() if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED: status_accepted = server_terminal_event_one rpc_accepted = server_terminal_event_two else: status_accepted = server_terminal_event_two rpc_accepted = server_terminal_event_one self.assertIsNotNone(status_accepted) self.assertIsNotNone(rpc_accepted) self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind) self.assertEqual(status_tag, status_accepted.tag) self.assertTrue(status_accepted.complete_accepted) self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) self.assertEqual(finish_tag, rpc_accepted.tag) self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status) client_call.read(read_tag) client_terminal_event_one = self.client_events.get() client_terminal_event_two = self.client_events.get() if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: read_accepted = client_terminal_event_one finish_accepted = client_terminal_event_two else: read_accepted = client_terminal_event_two finish_accepted = client_terminal_event_one self.assertIsNotNone(read_accepted) self.assertIsNotNone(finish_accepted) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertEqual(read_tag, read_accepted.tag) self.assertIsNone(read_accepted.bytes) self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind) self.assertEqual(finish_tag, finish_accepted.tag) self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status) metadata = dict(finish_accepted.metadata) self.assertIn(server_trailing_metadata_key, metadata) self.assertEqual(server_trailing_metadata_value, metadata[server_trailing_metadata_key]) self.assertIn(server_trailing_binary_metadata_key, metadata) self.assertEqual(server_trailing_binary_metadata_value, metadata[server_trailing_binary_metadata_key]) self.assertSetEqual(set(key for key, _ in finish_accepted.metadata), set((server_trailing_metadata_key, server_trailing_binary_metadata_key,))) self.assertSequenceEqual(test_data, server_data) self.assertSequenceEqual(test_data, client_data) def testNoEcho(self): self._perform_echo_test(()) def testOneByteEcho(self): self._perform_echo_test([b'\x07']) def testOneManyByteEcho(self): self._perform_echo_test([_BYTE_SEQUENCE]) def testManyOneByteEchoes(self): self._perform_echo_test(_BYTE_SEQUENCE) def testManyManyByteEchoes(self): self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE) class CancellationTest(unittest.TestCase): def setUp(self): self.host = 'localhost' self.server_completion_queue = _low.CompletionQueue() self.server = _low.Server(self.server_completion_queue) port = self.server.add_http2_addr('[::]:0') self.server.start() self.server_events = queue.Queue() self.server_completion_queue_thread = threading.Thread( target=_drive_completion_queue, args=(self.server_completion_queue, self.server_events)) self.server_completion_queue_thread.start() self.client_completion_queue = _low.CompletionQueue() self.channel = _low.Channel('%s:%d' % (self.host, port), None) self.client_events = queue.Queue() self.client_completion_queue_thread = threading.Thread( target=_drive_completion_queue, args=(self.client_completion_queue, self.client_events)) self.client_completion_queue_thread.start() def tearDown(self): self.server.stop() self.server.cancel_all_calls() self.server_completion_queue.stop() self.client_completion_queue.stop() self.server_completion_queue_thread.join() self.client_completion_queue_thread.join() del self.server def testCancellation(self): method = 'test method' deadline = _FUTURE metadata_tag = object() finish_tag = object() write_tag = object() service_tag = object() read_tag = object() test_data = _BYTE_SEQUENCE_SEQUENCE server_data = [] client_data = [] client_call = _low.Call(self.channel, self.client_completion_queue, method, self.host, deadline) client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) self.server.service(service_tag) service_accepted = self.server_events.get() server_call = service_accepted.service_acceptance.call server_call.accept(self.server_completion_queue, finish_tag) server_call.premetadata() metadata_accepted = self.client_events.get() self.assertIsNotNone(metadata_accepted) for datum in test_data: client_call.write(datum, write_tag, 0) write_accepted = self.client_events.get() server_call.read(read_tag) read_accepted = self.server_events.get() server_data.append(read_accepted.bytes) server_call.write(read_accepted.bytes, write_tag, 0) write_accepted = self.server_events.get() self.assertIsNotNone(write_accepted) client_call.read(read_tag) read_accepted = self.client_events.get() client_data.append(read_accepted.bytes) client_call.cancel() # cancel() is idempotent. client_call.cancel() client_call.cancel() client_call.cancel() server_call.read(read_tag) server_terminal_event_one = self.server_events.get() server_terminal_event_two = self.server_events.get() if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: read_accepted = server_terminal_event_one rpc_accepted = server_terminal_event_two else: read_accepted = server_terminal_event_two rpc_accepted = server_terminal_event_one self.assertIsNotNone(read_accepted) self.assertIsNotNone(rpc_accepted) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertIsNone(read_accepted.bytes) self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status) finish_event = self.client_events.get() self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind) self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'), finish_event.status) self.assertSequenceEqual(test_data, server_data) self.assertSequenceEqual(test_data, client_data) class ExpirationTest(unittest.TestCase): @unittest.skip('TODO(nathaniel): Expiration test!') def testExpiration(self): pass if __name__ == '__main__': unittest.main(verbosity=2)