diff options
Diffstat (limited to 'src/python/grpcio/tests/unit/_adapter')
-rw-r--r-- | src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py | 429 | ||||
-rw-r--r-- | src/python/grpcio/tests/unit/_adapter/_low_test.py | 319 |
2 files changed, 0 insertions, 748 deletions
diff --git a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py b/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py deleted file mode 100644 index 09ebdeff33..0000000000 --- a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py +++ /dev/null @@ -1,429 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests for the old '_low'.""" - -import threading -import time -import unittest - -import six -from six.moves import queue - -from grpc._adapter import _intermediary_low as _low - -_STREAM_LENGTH = 300 -_TIMEOUT = 5 -_AFTER_DELAY = 2 -_FUTURE = time.time() + 60 * 60 * 24 -_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200 -_BYTE_SEQUENCE_SEQUENCE = tuple( - bytes(bytearray((row + column) % 256 for column in range(row))) - for row in range(_STREAM_LENGTH)) - - -class LonelyClientTest(unittest.TestCase): - - def testLonelyClient(self): - host = 'nosuchhostexists' - port = 54321 - method = 'test method' - deadline = time.time() + _TIMEOUT - after_deadline = deadline + _AFTER_DELAY - metadata_tag = object() - finish_tag = object() - - completion_queue = _low.CompletionQueue() - channel = _low.Channel('%s:%d' % (host, port), None) - client_call = _low.Call(channel, completion_queue, method, host, deadline) - - client_call.invoke(completion_queue, metadata_tag, finish_tag) - first_event = completion_queue.get(after_deadline) - self.assertIsNotNone(first_event) - second_event = completion_queue.get(after_deadline) - self.assertIsNotNone(second_event) - kinds = [event.kind for event in (first_event, second_event)] - six.assertCountEqual(self, - (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH), - kinds) - - self.assertIsNone(completion_queue.get(after_deadline)) - - completion_queue.stop() - stop_event = completion_queue.get(_FUTURE) - self.assertEqual(_low.Event.Kind.STOP, stop_event.kind) - - del client_call - del channel - del completion_queue - - -def _drive_completion_queue(completion_queue, event_queue): - while True: - event = completion_queue.get(_FUTURE) - if event.kind is _low.Event.Kind.STOP: - break - event_queue.put(event) - - -class EchoTest(unittest.TestCase): - - def setUp(self): - self.host = 'localhost' - - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue) - port = self.server.add_http2_addr('[::]:0') - self.server.start() - self.server_events = queue.Queue() - self.server_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.server_completion_queue, self.server_events)) - self.server_completion_queue_thread.start() - - self.client_completion_queue = _low.CompletionQueue() - self.channel = _low.Channel('%s:%d' % (self.host, port), None) - self.client_events = queue.Queue() - self.client_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.client_completion_queue, self.client_events)) - self.client_completion_queue_thread.start() - - def tearDown(self): - self.server.stop() - self.server.cancel_all_calls() - self.server_completion_queue.stop() - self.client_completion_queue.stop() - self.server_completion_queue_thread.join() - self.client_completion_queue_thread.join() - del self.server - - def _perform_echo_test(self, test_data): - method = 'test method' - details = 'test details' - server_leading_metadata_key = 'my_server_leading_key' - server_leading_metadata_value = 'my_server_leading_value' - server_trailing_metadata_key = 'my_server_trailing_key' - server_trailing_metadata_value = 'my_server_trailing_value' - client_metadata_key = 'my_client_key' - client_metadata_value = 'my_client_value' - server_leading_binary_metadata_key = 'my_server_leading_key-bin' - server_leading_binary_metadata_value = b'\0'*2047 - server_trailing_binary_metadata_key = 'my_server_trailing_key-bin' - server_trailing_binary_metadata_value = b'\0'*2047 - client_binary_metadata_key = 'my_client_key-bin' - client_binary_metadata_value = b'\0'*2047 - deadline = _FUTURE - metadata_tag = object() - finish_tag = object() - write_tag = object() - complete_tag = object() - service_tag = object() - read_tag = object() - status_tag = object() - - server_data = [] - client_data = [] - - client_call = _low.Call(self.channel, self.client_completion_queue, - method, self.host, deadline) - client_call.add_metadata(client_metadata_key, client_metadata_value) - client_call.add_metadata(client_binary_metadata_key, - client_binary_metadata_value) - - client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) - - self.server.service(service_tag) - service_accepted = self.server_events.get() - self.assertIsNotNone(service_accepted) - self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED) - self.assertIs(service_accepted.tag, service_tag) - self.assertEqual(method.encode(), service_accepted.service_acceptance.method) - self.assertEqual(self.host.encode(), service_accepted.service_acceptance.host) - self.assertIsNotNone(service_accepted.service_acceptance.call) - metadata = dict(service_accepted.metadata) - self.assertIn(client_metadata_key.encode(), metadata) - self.assertEqual(client_metadata_value.encode(), metadata[client_metadata_key.encode()]) - self.assertIn(client_binary_metadata_key.encode(), metadata) - self.assertEqual(client_binary_metadata_value, - metadata[client_binary_metadata_key.encode()]) - server_call = service_accepted.service_acceptance.call - server_call.accept(self.server_completion_queue, finish_tag) - server_call.add_metadata(server_leading_metadata_key, - server_leading_metadata_value) - server_call.add_metadata(server_leading_binary_metadata_key, - server_leading_binary_metadata_value) - server_call.premetadata() - - metadata_accepted = self.client_events.get() - self.assertIsNotNone(metadata_accepted) - self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind) - self.assertEqual(metadata_tag, metadata_accepted.tag) - metadata = dict(metadata_accepted.metadata) - self.assertIn(server_leading_metadata_key.encode(), metadata) - self.assertEqual(server_leading_metadata_value.encode(), - metadata[server_leading_metadata_key.encode()]) - self.assertIn(server_leading_binary_metadata_key.encode(), metadata) - self.assertEqual(server_leading_binary_metadata_value, - metadata[server_leading_binary_metadata_key.encode()]) - - for datum in test_data: - client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS) - write_accepted = self.client_events.get() - self.assertIsNotNone(write_accepted) - self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED) - self.assertIs(write_accepted.tag, write_tag) - self.assertIs(write_accepted.write_accepted, True) - - server_call.read(read_tag) - read_accepted = self.server_events.get() - self.assertIsNotNone(read_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNotNone(read_accepted.bytes) - server_data.append(read_accepted.bytes) - - server_call.write(read_accepted.bytes, write_tag, 0) - write_accepted = self.server_events.get() - self.assertIsNotNone(write_accepted) - self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind) - self.assertEqual(write_tag, write_accepted.tag) - self.assertTrue(write_accepted.write_accepted) - - client_call.read(read_tag) - read_accepted = self.client_events.get() - self.assertIsNotNone(read_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNotNone(read_accepted.bytes) - client_data.append(read_accepted.bytes) - - client_call.complete(complete_tag) - complete_accepted = self.client_events.get() - self.assertIsNotNone(complete_accepted) - self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED) - self.assertIs(complete_accepted.tag, complete_tag) - self.assertIs(complete_accepted.complete_accepted, True) - - server_call.read(read_tag) - read_accepted = self.server_events.get() - self.assertIsNotNone(read_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNone(read_accepted.bytes) - - server_call.add_metadata(server_trailing_metadata_key, - server_trailing_metadata_value) - server_call.add_metadata(server_trailing_binary_metadata_key, - server_trailing_binary_metadata_value) - - server_call.status(_low.Status(_low.Code.OK, details), status_tag) - server_terminal_event_one = self.server_events.get() - server_terminal_event_two = self.server_events.get() - if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED: - status_accepted = server_terminal_event_one - rpc_accepted = server_terminal_event_two - else: - status_accepted = server_terminal_event_two - rpc_accepted = server_terminal_event_one - self.assertIsNotNone(status_accepted) - self.assertIsNotNone(rpc_accepted) - self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind) - self.assertEqual(status_tag, status_accepted.tag) - self.assertTrue(status_accepted.complete_accepted) - self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) - self.assertEqual(finish_tag, rpc_accepted.tag) - self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status) - - client_call.read(read_tag) - client_terminal_event_one = self.client_events.get() - client_terminal_event_two = self.client_events.get() - if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: - read_accepted = client_terminal_event_one - finish_accepted = client_terminal_event_two - else: - read_accepted = client_terminal_event_two - finish_accepted = client_terminal_event_one - self.assertIsNotNone(read_accepted) - self.assertIsNotNone(finish_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNone(read_accepted.bytes) - self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind) - self.assertEqual(finish_tag, finish_accepted.tag) - self.assertEqual(_low.Status(_low.Code.OK, details.encode()), finish_accepted.status) - metadata = dict(finish_accepted.metadata) - self.assertIn(server_trailing_metadata_key.encode(), metadata) - self.assertEqual(server_trailing_metadata_value.encode(), - metadata[server_trailing_metadata_key.encode()]) - self.assertIn(server_trailing_binary_metadata_key.encode(), metadata) - self.assertEqual(server_trailing_binary_metadata_value, - metadata[server_trailing_binary_metadata_key.encode()]) - self.assertSetEqual(set(key for key, _ in finish_accepted.metadata), - set((server_trailing_metadata_key.encode(), - server_trailing_binary_metadata_key.encode(),))) - - self.assertSequenceEqual(test_data, server_data) - self.assertSequenceEqual(test_data, client_data) - - def testNoEcho(self): - self._perform_echo_test(()) - - def testOneByteEcho(self): - self._perform_echo_test([b'\x07']) - - def testOneManyByteEcho(self): - self._perform_echo_test([_BYTE_SEQUENCE]) - - def testManyOneByteEchoes(self): - self._perform_echo_test( - [_BYTE_SEQUENCE[i:i+1] for i in range(len(_BYTE_SEQUENCE))]) - - def testManyManyByteEchoes(self): - self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE) - - -class CancellationTest(unittest.TestCase): - - def setUp(self): - self.host = 'localhost' - - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue) - port = self.server.add_http2_addr('[::]:0') - self.server.start() - self.server_events = queue.Queue() - self.server_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.server_completion_queue, self.server_events)) - self.server_completion_queue_thread.start() - - self.client_completion_queue = _low.CompletionQueue() - self.channel = _low.Channel('%s:%d' % (self.host, port), None) - self.client_events = queue.Queue() - self.client_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.client_completion_queue, self.client_events)) - self.client_completion_queue_thread.start() - - def tearDown(self): - self.server.stop() - self.server.cancel_all_calls() - self.server_completion_queue.stop() - self.client_completion_queue.stop() - self.server_completion_queue_thread.join() - self.client_completion_queue_thread.join() - del self.server - - def testCancellation(self): - method = 'test method' - deadline = _FUTURE - metadata_tag = object() - finish_tag = object() - write_tag = object() - service_tag = object() - read_tag = object() - test_data = _BYTE_SEQUENCE_SEQUENCE - - server_data = [] - client_data = [] - - client_call = _low.Call(self.channel, self.client_completion_queue, - method, self.host, deadline) - - client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) - - self.server.service(service_tag) - service_accepted = self.server_events.get() - server_call = service_accepted.service_acceptance.call - - server_call.accept(self.server_completion_queue, finish_tag) - server_call.premetadata() - - metadata_accepted = self.client_events.get() - self.assertIsNotNone(metadata_accepted) - - for datum in test_data: - client_call.write(datum, write_tag, 0) - write_accepted = self.client_events.get() - - server_call.read(read_tag) - read_accepted = self.server_events.get() - server_data.append(read_accepted.bytes) - - server_call.write(read_accepted.bytes, write_tag, 0) - write_accepted = self.server_events.get() - self.assertIsNotNone(write_accepted) - - client_call.read(read_tag) - read_accepted = self.client_events.get() - client_data.append(read_accepted.bytes) - - client_call.cancel() - # cancel() is idempotent. - client_call.cancel() - client_call.cancel() - client_call.cancel() - - server_call.read(read_tag) - - server_terminal_event_one = self.server_events.get() - server_terminal_event_two = self.server_events.get() - if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: - read_accepted = server_terminal_event_one - rpc_accepted = server_terminal_event_two - else: - read_accepted = server_terminal_event_two - rpc_accepted = server_terminal_event_one - self.assertIsNotNone(read_accepted) - self.assertIsNotNone(rpc_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertIsNone(read_accepted.bytes) - self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) - self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status) - - finish_event = self.client_events.get() - self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind) - self.assertEqual(_low.Status(_low.Code.CANCELLED, b'Cancelled'), - finish_event.status) - - self.assertSequenceEqual(test_data, server_data) - self.assertSequenceEqual(test_data, client_data) - - -class ExpirationTest(unittest.TestCase): - - @unittest.skip('TODO(nathaniel): Expiration test!') - def testExpiration(self): - pass - - -if __name__ == '__main__': - unittest.main(verbosity=2) - diff --git a/src/python/grpcio/tests/unit/_adapter/_low_test.py b/src/python/grpcio/tests/unit/_adapter/_low_test.py deleted file mode 100644 index e09a1f2564..0000000000 --- a/src/python/grpcio/tests/unit/_adapter/_low_test.py +++ /dev/null @@ -1,319 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import threading -import time -import unittest - -from grpc import _grpcio_metadata -from grpc._adapter import _types -from grpc._adapter import _low -from tests.unit import test_common - - -def wait_for_events(completion_queues, deadline): - """ - Args: - completion_queues: list of completion queues to wait for events on - deadline: absolute deadline to wait until - - Returns: - a sequence of events of length len(completion_queues). - """ - - results = [None] * len(completion_queues) - lock = threading.Lock() - threads = [] - def set_ith_result(i, completion_queue): - result = completion_queue.next(deadline) - with lock: - results[i] = result - for i, completion_queue in enumerate(completion_queues): - thread = threading.Thread(target=set_ith_result, - args=[i, completion_queue]) - thread.start() - threads.append(thread) - for thread in threads: - thread.join() - return results - - -class InsecureServerInsecureClient(unittest.TestCase): - - def setUp(self): - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue, []) - self.port = self.server.add_http2_port('[::]:0') - self.client_completion_queue = _low.CompletionQueue() - self.client_channel = _low.Channel('localhost:%d'%self.port, []) - - self.server.start() - - def tearDown(self): - self.server.shutdown() - del self.client_channel - - self.client_completion_queue.shutdown() - while (self.client_completion_queue.next(float('+inf')).type != - _types.EventType.QUEUE_SHUTDOWN): - pass - self.server_completion_queue.shutdown() - while (self.server_completion_queue.next(float('+inf')).type != - _types.EventType.QUEUE_SHUTDOWN): - pass - - del self.client_completion_queue - del self.server_completion_queue - del self.server - - def testEcho(self): - deadline = time.time() + 5 - event_time_tolerance = 2 - deadline_tolerance = 0.25 - client_metadata_ascii_key = 'key' - client_metadata_ascii_value = 'val' - client_metadata_bin_key = 'key-bin' - client_metadata_bin_value = b'\0'*1000 - server_initial_metadata_key = 'init_me_me_me' - server_initial_metadata_value = 'whodawha?' - server_trailing_metadata_key = 'california_is_in_a_drought' - server_trailing_metadata_value = 'zomg it is' - server_status_code = _types.StatusCode.OK - server_status_details = 'our work is never over' - request = 'blarghaflargh' - response = 'his name is robert paulson' - method = 'twinkies' - host = 'hostess' - server_request_tag = object() - request_call_result = self.server.request_call(self.server_completion_queue, - server_request_tag) - - self.assertEqual(_types.CallError.OK, request_call_result) - - client_call_tag = object() - client_call = self.client_channel.create_call( - self.client_completion_queue, method, host, deadline) - client_initial_metadata = [ - (client_metadata_ascii_key, client_metadata_ascii_value), - (client_metadata_bin_key, client_metadata_bin_value) - ] - client_start_batch_result = client_call.start_batch([ - _types.OpArgs.send_initial_metadata(client_initial_metadata), - _types.OpArgs.send_message(request, 0), - _types.OpArgs.send_close_from_client(), - _types.OpArgs.recv_initial_metadata(), - _types.OpArgs.recv_message(), - _types.OpArgs.recv_status_on_client() - ], client_call_tag) - self.assertEqual(_types.CallError.OK, client_start_batch_result) - - client_no_event, request_event, = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - self.assertEqual(client_no_event, None) - self.assertEqual(_types.EventType.OP_COMPLETE, request_event.type) - self.assertIsInstance(request_event.call, _low.Call) - self.assertIs(server_request_tag, request_event.tag) - self.assertEqual(1, len(request_event.results)) - received_initial_metadata = request_event.results[0].initial_metadata - # Check that our metadata were transmitted - self.assertTrue(test_common.metadata_transmitted(client_initial_metadata, - received_initial_metadata)) - # Check that Python's user agent string is a part of the full user agent - # string - received_initial_metadata_dict = dict(received_initial_metadata) - self.assertIn(b'user-agent', received_initial_metadata_dict) - self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__).encode(), - received_initial_metadata_dict[b'user-agent']) - self.assertEqual(method.encode(), request_event.call_details.method) - self.assertEqual(host.encode(), request_event.call_details.host) - self.assertLess(abs(deadline - request_event.call_details.deadline), - deadline_tolerance) - - # Check that the channel is connected, and that both it and the call have - # the proper target and peer; do this after the first flurry of messages to - # avoid the possibility that connection was delayed by the core until the - # first message was sent. - self.assertEqual(_types.ConnectivityState.READY, - self.client_channel.check_connectivity_state(False)) - self.assertIsNotNone(self.client_channel.target()) - self.assertIsNotNone(client_call.peer()) - - server_call_tag = object() - server_call = request_event.call - server_initial_metadata = [ - (server_initial_metadata_key, server_initial_metadata_value) - ] - server_trailing_metadata = [ - (server_trailing_metadata_key, server_trailing_metadata_value) - ] - server_start_batch_result = server_call.start_batch([ - _types.OpArgs.send_initial_metadata(server_initial_metadata), - _types.OpArgs.recv_message(), - _types.OpArgs.send_message(response, 0), - _types.OpArgs.recv_close_on_server(), - _types.OpArgs.send_status_from_server( - server_trailing_metadata, server_status_code, server_status_details) - ], server_call_tag) - self.assertEqual(_types.CallError.OK, server_start_batch_result) - - client_event, server_event, = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - - self.assertEqual(6, len(client_event.results)) - found_client_op_types = set() - for client_result in client_event.results: - # we expect each op type to be unique - self.assertNotIn(client_result.type, found_client_op_types) - found_client_op_types.add(client_result.type) - if client_result.type == _types.OpType.RECV_INITIAL_METADATA: - self.assertTrue( - test_common.metadata_transmitted(server_initial_metadata, - client_result.initial_metadata)) - elif client_result.type == _types.OpType.RECV_MESSAGE: - self.assertEqual(response.encode(), client_result.message) - elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT: - self.assertTrue( - test_common.metadata_transmitted(server_trailing_metadata, - client_result.trailing_metadata)) - self.assertEqual(server_status_details.encode(), client_result.status.details) - self.assertEqual(server_status_code, client_result.status.code) - self.assertEqual(set([ - _types.OpType.SEND_INITIAL_METADATA, - _types.OpType.SEND_MESSAGE, - _types.OpType.SEND_CLOSE_FROM_CLIENT, - _types.OpType.RECV_INITIAL_METADATA, - _types.OpType.RECV_MESSAGE, - _types.OpType.RECV_STATUS_ON_CLIENT - ]), found_client_op_types) - - self.assertEqual(5, len(server_event.results)) - found_server_op_types = set() - for server_result in server_event.results: - self.assertNotIn(client_result.type, found_server_op_types) - found_server_op_types.add(server_result.type) - if server_result.type == _types.OpType.RECV_MESSAGE: - self.assertEqual(request.encode(), server_result.message) - elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER: - self.assertFalse(server_result.cancelled) - self.assertEqual(set([ - _types.OpType.SEND_INITIAL_METADATA, - _types.OpType.RECV_MESSAGE, - _types.OpType.SEND_MESSAGE, - _types.OpType.RECV_CLOSE_ON_SERVER, - _types.OpType.SEND_STATUS_FROM_SERVER - ]), found_server_op_types) - - del client_call - del server_call - - -class HangingServerShutdown(unittest.TestCase): - - def setUp(self): - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue, []) - self.port = self.server.add_http2_port('[::]:0') - self.client_completion_queue = _low.CompletionQueue() - self.client_channel = _low.Channel('localhost:%d'%self.port, []) - - self.server.start() - - def tearDown(self): - self.server.shutdown() - del self.client_channel - - self.client_completion_queue.shutdown() - self.server_completion_queue.shutdown() - while True: - client_event, server_event = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - float("+inf")) - if (client_event.type == _types.EventType.QUEUE_SHUTDOWN and - server_event.type == _types.EventType.QUEUE_SHUTDOWN): - break - - del self.client_completion_queue - del self.server_completion_queue - del self.server - - def testHangingServerCall(self): - deadline = time.time() + 5 - deadline_tolerance = 0.25 - event_time_tolerance = 2 - cancel_all_calls_time_tolerance = 0.5 - request = 'blarghaflargh' - method = 'twinkies' - host = 'hostess' - server_request_tag = object() - request_call_result = self.server.request_call(self.server_completion_queue, - server_request_tag) - - client_call_tag = object() - client_call = self.client_channel.create_call(self.client_completion_queue, - method, host, deadline) - client_start_batch_result = client_call.start_batch([ - _types.OpArgs.send_initial_metadata([]), - _types.OpArgs.send_message(request, 0), - _types.OpArgs.send_close_from_client(), - _types.OpArgs.recv_initial_metadata(), - _types.OpArgs.recv_message(), - _types.OpArgs.recv_status_on_client() - ], client_call_tag) - - client_no_event, request_event, = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - - # Now try to shutdown the server and expect that we see server shutdown - # almost immediately after calling cancel_all_calls. - - # First attempt to cancel all calls before shutting down, and expect - # our state machine to catch the erroneous API use. - with self.assertRaises(RuntimeError): - self.server.cancel_all_calls() - - shutdown_tag = object() - self.server.shutdown(shutdown_tag) - pre_cancel_timestamp = time.time() - self.server.cancel_all_calls() - finish_shutdown_timestamp = None - client_call_event, server_shutdown_event = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - self.assertIs(shutdown_tag, server_shutdown_event.tag) - self.assertGreater(pre_cancel_timestamp + cancel_all_calls_time_tolerance, - time.time()) - - del client_call - - -if __name__ == '__main__': - unittest.main(verbosity=2) |