diff options
Diffstat (limited to 'src/python/grpcio/tests/unit')
27 files changed, 1589 insertions, 1957 deletions
diff --git a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py b/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py deleted file mode 100644 index 22d4b019c7..0000000000 --- a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py +++ /dev/null @@ -1,428 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests for the old '_low'.""" - -import threading -import time -import unittest - -import six -from six.moves import queue - -from grpc._adapter import _intermediary_low as _low - -_STREAM_LENGTH = 300 -_TIMEOUT = 5 -_AFTER_DELAY = 2 -_FUTURE = time.time() + 60 * 60 * 24 -_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200 -_BYTE_SEQUENCE_SEQUENCE = tuple( - bytes(bytearray((row + column) % 256 for column in range(row))) - for row in range(_STREAM_LENGTH)) - - -class LonelyClientTest(unittest.TestCase): - - def testLonelyClient(self): - host = 'nosuchhostexists' - port = 54321 - method = 'test method' - deadline = time.time() + _TIMEOUT - after_deadline = deadline + _AFTER_DELAY - metadata_tag = object() - finish_tag = object() - - completion_queue = _low.CompletionQueue() - channel = _low.Channel('%s:%d' % (host, port), None) - client_call = _low.Call(channel, completion_queue, method, host, deadline) - - client_call.invoke(completion_queue, metadata_tag, finish_tag) - first_event = completion_queue.get(after_deadline) - self.assertIsNotNone(first_event) - second_event = completion_queue.get(after_deadline) - self.assertIsNotNone(second_event) - kinds = [event.kind for event in (first_event, second_event)] - six.assertCountEqual(self, - (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH), - kinds) - - self.assertIsNone(completion_queue.get(after_deadline)) - - completion_queue.stop() - stop_event = completion_queue.get(_FUTURE) - self.assertEqual(_low.Event.Kind.STOP, stop_event.kind) - - del client_call - del channel - del completion_queue - - -def _drive_completion_queue(completion_queue, event_queue): - while True: - event = completion_queue.get(_FUTURE) - if event.kind is _low.Event.Kind.STOP: - break - event_queue.put(event) - - -class EchoTest(unittest.TestCase): - - def setUp(self): - self.host = 'localhost' - - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue) - port = self.server.add_http2_addr('[::]:0') - self.server.start() - self.server_events = queue.Queue() - self.server_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.server_completion_queue, self.server_events)) - self.server_completion_queue_thread.start() - - self.client_completion_queue = _low.CompletionQueue() - self.channel = _low.Channel('%s:%d' % (self.host, port), None) - self.client_events = queue.Queue() - self.client_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.client_completion_queue, self.client_events)) - self.client_completion_queue_thread.start() - - def tearDown(self): - self.server.stop() - self.server.cancel_all_calls() - self.server_completion_queue.stop() - self.client_completion_queue.stop() - self.server_completion_queue_thread.join() - self.client_completion_queue_thread.join() - del self.server - - def _perform_echo_test(self, test_data): - method = 'test method' - details = 'test details' - server_leading_metadata_key = 'my_server_leading_key' - server_leading_metadata_value = 'my_server_leading_value' - server_trailing_metadata_key = 'my_server_trailing_key' - server_trailing_metadata_value = 'my_server_trailing_value' - client_metadata_key = 'my_client_key' - client_metadata_value = 'my_client_value' - server_leading_binary_metadata_key = 'my_server_leading_key-bin' - server_leading_binary_metadata_value = b'\0'*2047 - server_trailing_binary_metadata_key = 'my_server_trailing_key-bin' - server_trailing_binary_metadata_value = b'\0'*2047 - client_binary_metadata_key = 'my_client_key-bin' - client_binary_metadata_value = b'\0'*2047 - deadline = _FUTURE - metadata_tag = object() - finish_tag = object() - write_tag = object() - complete_tag = object() - service_tag = object() - read_tag = object() - status_tag = object() - - server_data = [] - client_data = [] - - client_call = _low.Call(self.channel, self.client_completion_queue, - method, self.host, deadline) - client_call.add_metadata(client_metadata_key, client_metadata_value) - client_call.add_metadata(client_binary_metadata_key, - client_binary_metadata_value) - - client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) - - self.server.service(service_tag) - service_accepted = self.server_events.get() - self.assertIsNotNone(service_accepted) - self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED) - self.assertIs(service_accepted.tag, service_tag) - self.assertEqual(method, service_accepted.service_acceptance.method) - self.assertEqual(self.host, service_accepted.service_acceptance.host) - self.assertIsNotNone(service_accepted.service_acceptance.call) - metadata = dict(service_accepted.metadata) - self.assertIn(client_metadata_key, metadata) - self.assertEqual(client_metadata_value, metadata[client_metadata_key]) - self.assertIn(client_binary_metadata_key, metadata) - self.assertEqual(client_binary_metadata_value, - metadata[client_binary_metadata_key]) - server_call = service_accepted.service_acceptance.call - server_call.accept(self.server_completion_queue, finish_tag) - server_call.add_metadata(server_leading_metadata_key, - server_leading_metadata_value) - server_call.add_metadata(server_leading_binary_metadata_key, - server_leading_binary_metadata_value) - server_call.premetadata() - - metadata_accepted = self.client_events.get() - self.assertIsNotNone(metadata_accepted) - self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind) - self.assertEqual(metadata_tag, metadata_accepted.tag) - metadata = dict(metadata_accepted.metadata) - self.assertIn(server_leading_metadata_key, metadata) - self.assertEqual(server_leading_metadata_value, - metadata[server_leading_metadata_key]) - self.assertIn(server_leading_binary_metadata_key, metadata) - self.assertEqual(server_leading_binary_metadata_value, - metadata[server_leading_binary_metadata_key]) - - for datum in test_data: - client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS) - write_accepted = self.client_events.get() - self.assertIsNotNone(write_accepted) - self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED) - self.assertIs(write_accepted.tag, write_tag) - self.assertIs(write_accepted.write_accepted, True) - - server_call.read(read_tag) - read_accepted = self.server_events.get() - self.assertIsNotNone(read_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNotNone(read_accepted.bytes) - server_data.append(read_accepted.bytes) - - server_call.write(read_accepted.bytes, write_tag, 0) - write_accepted = self.server_events.get() - self.assertIsNotNone(write_accepted) - self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind) - self.assertEqual(write_tag, write_accepted.tag) - self.assertTrue(write_accepted.write_accepted) - - client_call.read(read_tag) - read_accepted = self.client_events.get() - self.assertIsNotNone(read_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNotNone(read_accepted.bytes) - client_data.append(read_accepted.bytes) - - client_call.complete(complete_tag) - complete_accepted = self.client_events.get() - self.assertIsNotNone(complete_accepted) - self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED) - self.assertIs(complete_accepted.tag, complete_tag) - self.assertIs(complete_accepted.complete_accepted, True) - - server_call.read(read_tag) - read_accepted = self.server_events.get() - self.assertIsNotNone(read_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNone(read_accepted.bytes) - - server_call.add_metadata(server_trailing_metadata_key, - server_trailing_metadata_value) - server_call.add_metadata(server_trailing_binary_metadata_key, - server_trailing_binary_metadata_value) - - server_call.status(_low.Status(_low.Code.OK, details), status_tag) - server_terminal_event_one = self.server_events.get() - server_terminal_event_two = self.server_events.get() - if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED: - status_accepted = server_terminal_event_one - rpc_accepted = server_terminal_event_two - else: - status_accepted = server_terminal_event_two - rpc_accepted = server_terminal_event_one - self.assertIsNotNone(status_accepted) - self.assertIsNotNone(rpc_accepted) - self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind) - self.assertEqual(status_tag, status_accepted.tag) - self.assertTrue(status_accepted.complete_accepted) - self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) - self.assertEqual(finish_tag, rpc_accepted.tag) - self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status) - - client_call.read(read_tag) - client_terminal_event_one = self.client_events.get() - client_terminal_event_two = self.client_events.get() - if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: - read_accepted = client_terminal_event_one - finish_accepted = client_terminal_event_two - else: - read_accepted = client_terminal_event_two - finish_accepted = client_terminal_event_one - self.assertIsNotNone(read_accepted) - self.assertIsNotNone(finish_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNone(read_accepted.bytes) - self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind) - self.assertEqual(finish_tag, finish_accepted.tag) - self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status) - metadata = dict(finish_accepted.metadata) - self.assertIn(server_trailing_metadata_key, metadata) - self.assertEqual(server_trailing_metadata_value, - metadata[server_trailing_metadata_key]) - self.assertIn(server_trailing_binary_metadata_key, metadata) - self.assertEqual(server_trailing_binary_metadata_value, - metadata[server_trailing_binary_metadata_key]) - self.assertSetEqual(set(key for key, _ in finish_accepted.metadata), - set((server_trailing_metadata_key, - server_trailing_binary_metadata_key,))) - - self.assertSequenceEqual(test_data, server_data) - self.assertSequenceEqual(test_data, client_data) - - def testNoEcho(self): - self._perform_echo_test(()) - - def testOneByteEcho(self): - self._perform_echo_test([b'\x07']) - - def testOneManyByteEcho(self): - self._perform_echo_test([_BYTE_SEQUENCE]) - - def testManyOneByteEchoes(self): - self._perform_echo_test(_BYTE_SEQUENCE) - - def testManyManyByteEchoes(self): - self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE) - - -class CancellationTest(unittest.TestCase): - - def setUp(self): - self.host = 'localhost' - - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue) - port = self.server.add_http2_addr('[::]:0') - self.server.start() - self.server_events = queue.Queue() - self.server_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.server_completion_queue, self.server_events)) - self.server_completion_queue_thread.start() - - self.client_completion_queue = _low.CompletionQueue() - self.channel = _low.Channel('%s:%d' % (self.host, port), None) - self.client_events = queue.Queue() - self.client_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.client_completion_queue, self.client_events)) - self.client_completion_queue_thread.start() - - def tearDown(self): - self.server.stop() - self.server.cancel_all_calls() - self.server_completion_queue.stop() - self.client_completion_queue.stop() - self.server_completion_queue_thread.join() - self.client_completion_queue_thread.join() - del self.server - - def testCancellation(self): - method = 'test method' - deadline = _FUTURE - metadata_tag = object() - finish_tag = object() - write_tag = object() - service_tag = object() - read_tag = object() - test_data = _BYTE_SEQUENCE_SEQUENCE - - server_data = [] - client_data = [] - - client_call = _low.Call(self.channel, self.client_completion_queue, - method, self.host, deadline) - - client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) - - self.server.service(service_tag) - service_accepted = self.server_events.get() - server_call = service_accepted.service_acceptance.call - - server_call.accept(self.server_completion_queue, finish_tag) - server_call.premetadata() - - metadata_accepted = self.client_events.get() - self.assertIsNotNone(metadata_accepted) - - for datum in test_data: - client_call.write(datum, write_tag, 0) - write_accepted = self.client_events.get() - - server_call.read(read_tag) - read_accepted = self.server_events.get() - server_data.append(read_accepted.bytes) - - server_call.write(read_accepted.bytes, write_tag, 0) - write_accepted = self.server_events.get() - self.assertIsNotNone(write_accepted) - - client_call.read(read_tag) - read_accepted = self.client_events.get() - client_data.append(read_accepted.bytes) - - client_call.cancel() - # cancel() is idempotent. - client_call.cancel() - client_call.cancel() - client_call.cancel() - - server_call.read(read_tag) - - server_terminal_event_one = self.server_events.get() - server_terminal_event_two = self.server_events.get() - if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: - read_accepted = server_terminal_event_one - rpc_accepted = server_terminal_event_two - else: - read_accepted = server_terminal_event_two - rpc_accepted = server_terminal_event_one - self.assertIsNotNone(read_accepted) - self.assertIsNotNone(rpc_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertIsNone(read_accepted.bytes) - self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) - self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status) - - finish_event = self.client_events.get() - self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind) - self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'), - finish_event.status) - - self.assertSequenceEqual(test_data, server_data) - self.assertSequenceEqual(test_data, client_data) - - -class ExpirationTest(unittest.TestCase): - - @unittest.skip('TODO(nathaniel): Expiration test!') - def testExpiration(self): - pass - - -if __name__ == '__main__': - unittest.main(verbosity=2) - diff --git a/src/python/grpcio/tests/unit/_adapter/_low_test.py b/src/python/grpcio/tests/unit/_adapter/_low_test.py deleted file mode 100644 index ec46617996..0000000000 --- a/src/python/grpcio/tests/unit/_adapter/_low_test.py +++ /dev/null @@ -1,319 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import threading -import time -import unittest - -from grpc import _grpcio_metadata -from grpc._adapter import _types -from grpc._adapter import _low -from tests.unit import test_common - - -def wait_for_events(completion_queues, deadline): - """ - Args: - completion_queues: list of completion queues to wait for events on - deadline: absolute deadline to wait until - - Returns: - a sequence of events of length len(completion_queues). - """ - - results = [None] * len(completion_queues) - lock = threading.Lock() - threads = [] - def set_ith_result(i, completion_queue): - result = completion_queue.next(deadline) - with lock: - results[i] = result - for i, completion_queue in enumerate(completion_queues): - thread = threading.Thread(target=set_ith_result, - args=[i, completion_queue]) - thread.start() - threads.append(thread) - for thread in threads: - thread.join() - return results - - -class InsecureServerInsecureClient(unittest.TestCase): - - def setUp(self): - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue, []) - self.port = self.server.add_http2_port('[::]:0') - self.client_completion_queue = _low.CompletionQueue() - self.client_channel = _low.Channel('localhost:%d'%self.port, []) - - self.server.start() - - def tearDown(self): - self.server.shutdown() - del self.client_channel - - self.client_completion_queue.shutdown() - while (self.client_completion_queue.next(float('+inf')).type != - _types.EventType.QUEUE_SHUTDOWN): - pass - self.server_completion_queue.shutdown() - while (self.server_completion_queue.next(float('+inf')).type != - _types.EventType.QUEUE_SHUTDOWN): - pass - - del self.client_completion_queue - del self.server_completion_queue - del self.server - - def testEcho(self): - deadline = time.time() + 5 - event_time_tolerance = 2 - deadline_tolerance = 0.25 - client_metadata_ascii_key = 'key' - client_metadata_ascii_value = 'val' - client_metadata_bin_key = 'key-bin' - client_metadata_bin_value = b'\0'*1000 - server_initial_metadata_key = 'init_me_me_me' - server_initial_metadata_value = 'whodawha?' - server_trailing_metadata_key = 'california_is_in_a_drought' - server_trailing_metadata_value = 'zomg it is' - server_status_code = _types.StatusCode.OK - server_status_details = 'our work is never over' - request = 'blarghaflargh' - response = 'his name is robert paulson' - method = 'twinkies' - host = 'hostess' - server_request_tag = object() - request_call_result = self.server.request_call(self.server_completion_queue, - server_request_tag) - - self.assertEqual(_types.CallError.OK, request_call_result) - - client_call_tag = object() - client_call = self.client_channel.create_call( - self.client_completion_queue, method, host, deadline) - client_initial_metadata = [ - (client_metadata_ascii_key, client_metadata_ascii_value), - (client_metadata_bin_key, client_metadata_bin_value) - ] - client_start_batch_result = client_call.start_batch([ - _types.OpArgs.send_initial_metadata(client_initial_metadata), - _types.OpArgs.send_message(request, 0), - _types.OpArgs.send_close_from_client(), - _types.OpArgs.recv_initial_metadata(), - _types.OpArgs.recv_message(), - _types.OpArgs.recv_status_on_client() - ], client_call_tag) - self.assertEqual(_types.CallError.OK, client_start_batch_result) - - client_no_event, request_event, = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - self.assertEqual(client_no_event, None) - self.assertEqual(_types.EventType.OP_COMPLETE, request_event.type) - self.assertIsInstance(request_event.call, _low.Call) - self.assertIs(server_request_tag, request_event.tag) - self.assertEqual(1, len(request_event.results)) - received_initial_metadata = request_event.results[0].initial_metadata - # Check that our metadata were transmitted - self.assertTrue(test_common.metadata_transmitted(client_initial_metadata, - received_initial_metadata)) - # Check that Python's user agent string is a part of the full user agent - # string - received_initial_metadata_dict = dict(received_initial_metadata) - self.assertIn('user-agent', received_initial_metadata_dict) - self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__), - received_initial_metadata_dict['user-agent']) - self.assertEqual(method, request_event.call_details.method) - self.assertEqual(host, request_event.call_details.host) - self.assertLess(abs(deadline - request_event.call_details.deadline), - deadline_tolerance) - - # Check that the channel is connected, and that both it and the call have - # the proper target and peer; do this after the first flurry of messages to - # avoid the possibility that connection was delayed by the core until the - # first message was sent. - self.assertEqual(_types.ConnectivityState.READY, - self.client_channel.check_connectivity_state(False)) - self.assertIsNotNone(self.client_channel.target()) - self.assertIsNotNone(client_call.peer()) - - server_call_tag = object() - server_call = request_event.call - server_initial_metadata = [ - (server_initial_metadata_key, server_initial_metadata_value) - ] - server_trailing_metadata = [ - (server_trailing_metadata_key, server_trailing_metadata_value) - ] - server_start_batch_result = server_call.start_batch([ - _types.OpArgs.send_initial_metadata(server_initial_metadata), - _types.OpArgs.recv_message(), - _types.OpArgs.send_message(response, 0), - _types.OpArgs.recv_close_on_server(), - _types.OpArgs.send_status_from_server( - server_trailing_metadata, server_status_code, server_status_details) - ], server_call_tag) - self.assertEqual(_types.CallError.OK, server_start_batch_result) - - client_event, server_event, = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - - self.assertEqual(6, len(client_event.results)) - found_client_op_types = set() - for client_result in client_event.results: - # we expect each op type to be unique - self.assertNotIn(client_result.type, found_client_op_types) - found_client_op_types.add(client_result.type) - if client_result.type == _types.OpType.RECV_INITIAL_METADATA: - self.assertTrue( - test_common.metadata_transmitted(server_initial_metadata, - client_result.initial_metadata)) - elif client_result.type == _types.OpType.RECV_MESSAGE: - self.assertEqual(response, client_result.message) - elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT: - self.assertTrue( - test_common.metadata_transmitted(server_trailing_metadata, - client_result.trailing_metadata)) - self.assertEqual(server_status_details, client_result.status.details) - self.assertEqual(server_status_code, client_result.status.code) - self.assertEqual(set([ - _types.OpType.SEND_INITIAL_METADATA, - _types.OpType.SEND_MESSAGE, - _types.OpType.SEND_CLOSE_FROM_CLIENT, - _types.OpType.RECV_INITIAL_METADATA, - _types.OpType.RECV_MESSAGE, - _types.OpType.RECV_STATUS_ON_CLIENT - ]), found_client_op_types) - - self.assertEqual(5, len(server_event.results)) - found_server_op_types = set() - for server_result in server_event.results: - self.assertNotIn(client_result.type, found_server_op_types) - found_server_op_types.add(server_result.type) - if server_result.type == _types.OpType.RECV_MESSAGE: - self.assertEqual(request, server_result.message) - elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER: - self.assertFalse(server_result.cancelled) - self.assertEqual(set([ - _types.OpType.SEND_INITIAL_METADATA, - _types.OpType.RECV_MESSAGE, - _types.OpType.SEND_MESSAGE, - _types.OpType.RECV_CLOSE_ON_SERVER, - _types.OpType.SEND_STATUS_FROM_SERVER - ]), found_server_op_types) - - del client_call - del server_call - - -class HangingServerShutdown(unittest.TestCase): - - def setUp(self): - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue, []) - self.port = self.server.add_http2_port('[::]:0') - self.client_completion_queue = _low.CompletionQueue() - self.client_channel = _low.Channel('localhost:%d'%self.port, []) - - self.server.start() - - def tearDown(self): - self.server.shutdown() - del self.client_channel - - self.client_completion_queue.shutdown() - self.server_completion_queue.shutdown() - while True: - client_event, server_event = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - float("+inf")) - if (client_event.type == _types.EventType.QUEUE_SHUTDOWN and - server_event.type == _types.EventType.QUEUE_SHUTDOWN): - break - - del self.client_completion_queue - del self.server_completion_queue - del self.server - - def testHangingServerCall(self): - deadline = time.time() + 5 - deadline_tolerance = 0.25 - event_time_tolerance = 2 - cancel_all_calls_time_tolerance = 0.5 - request = 'blarghaflargh' - method = 'twinkies' - host = 'hostess' - server_request_tag = object() - request_call_result = self.server.request_call(self.server_completion_queue, - server_request_tag) - - client_call_tag = object() - client_call = self.client_channel.create_call(self.client_completion_queue, - method, host, deadline) - client_start_batch_result = client_call.start_batch([ - _types.OpArgs.send_initial_metadata([]), - _types.OpArgs.send_message(request, 0), - _types.OpArgs.send_close_from_client(), - _types.OpArgs.recv_initial_metadata(), - _types.OpArgs.recv_message(), - _types.OpArgs.recv_status_on_client() - ], client_call_tag) - - client_no_event, request_event, = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - - # Now try to shutdown the server and expect that we see server shutdown - # almost immediately after calling cancel_all_calls. - - # First attempt to cancel all calls before shutting down, and expect - # our state machine to catch the erroneous API use. - with self.assertRaises(RuntimeError): - self.server.cancel_all_calls() - - shutdown_tag = object() - self.server.shutdown(shutdown_tag) - pre_cancel_timestamp = time.time() - self.server.cancel_all_calls() - finish_shutdown_timestamp = None - client_call_event, server_shutdown_event = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - self.assertIs(shutdown_tag, server_shutdown_event.tag) - self.assertGreater(pre_cancel_timestamp + cancel_all_calls_time_tolerance, - time.time()) - - del client_call - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_api_test.py b/src/python/grpcio/tests/unit/_api_test.py new file mode 100644 index 0000000000..2fe89499f5 --- /dev/null +++ b/src/python/grpcio/tests/unit/_api_test.py @@ -0,0 +1,111 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test of gRPC Python's application-layer API.""" + +import unittest + +import six + +import grpc + +from tests.unit import _from_grpc_import_star + + +class AllTest(unittest.TestCase): + + def testAll(self): + expected_grpc_code_elements = ( + 'FutureTimeoutError', + 'FutureCancelledError', + 'Future', + 'ChannelConnectivity', + 'StatusCode', + 'RpcError', + 'RpcContext', + 'Call', + 'ChannelCredentials', + 'CallCredentials', + 'AuthMetadataContext', + 'AuthMetadataPluginCallback', + 'AuthMetadataPlugin', + 'ServerCredentials', + 'UnaryUnaryMultiCallable', + 'UnaryStreamMultiCallable', + 'StreamUnaryMultiCallable', + 'StreamStreamMultiCallable', + 'Channel', + 'ServicerContext', + 'RpcMethodHandler', + 'HandlerCallDetails', + 'GenericRpcHandler', + 'Server', + 'unary_unary_rpc_method_handler', + 'unary_stream_rpc_method_handler', + 'stream_unary_rpc_method_handler', + 'stream_stream_rpc_method_handler', + 'method_handlers_generic_handler', + 'ssl_channel_credentials', + 'metadata_call_credentials', + 'access_token_call_credentials', + 'composite_call_credentials', + 'composite_channel_credentials', + 'ssl_server_credentials', + 'channel_ready_future', + 'insecure_channel', + 'secure_channel', + 'server', + ) + + six.assertCountEqual( + self, expected_grpc_code_elements, + _from_grpc_import_star.GRPC_ELEMENTS) + + +class ChannelConnectivityTest(unittest.TestCase): + + def testChannelConnectivity(self): + self.assertSequenceEqual( + (grpc.ChannelConnectivity.IDLE, + grpc.ChannelConnectivity.CONNECTING, + grpc.ChannelConnectivity.READY, + grpc.ChannelConnectivity.TRANSIENT_FAILURE, + grpc.ChannelConnectivity.SHUTDOWN,), + tuple(grpc.ChannelConnectivity)) + + +class ChannelTest(unittest.TestCase): + + def test_secure_channel(self): + channel_credentials = grpc.ssl_channel_credentials() + channel = grpc.secure_channel('google.com:443', channel_credentials) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/beta/_auth_test.py b/src/python/grpcio/tests/unit/_auth_test.py index 694928a91b..c31f7b06f7 100644 --- a/src/python/grpcio/tests/unit/beta/_auth_test.py +++ b/src/python/grpcio/tests/unit/_auth_test.py @@ -33,7 +33,7 @@ import collections import threading import unittest -from grpc.beta import _auth +from grpc import _auth class MockGoogleCreds(object): diff --git a/src/python/grpcio/tests/unit/_channel_connectivity_test.py b/src/python/grpcio/tests/unit/_channel_connectivity_test.py index a1575efada..ae8de523ec 100644 --- a/src/python/grpcio/tests/unit/_channel_connectivity_test.py +++ b/src/python/grpcio/tests/unit/_channel_connectivity_test.py @@ -135,12 +135,12 @@ class ChannelConnectivityTest(unittest.TestCase): self.assertNotIn( grpc.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities) self.assertNotIn( - grpc.ChannelConnectivity.FATAL_FAILURE, third_connectivities) + grpc.ChannelConnectivity.SHUTDOWN, third_connectivities) self.assertNotIn( grpc.ChannelConnectivity.TRANSIENT_FAILURE, fourth_connectivities) self.assertNotIn( - grpc.ChannelConnectivity.FATAL_FAILURE, fourth_connectivities) + grpc.ChannelConnectivity.SHUTDOWN, fourth_connectivities) def test_reachable_then_unreachable_channel_connectivity(self): server = _server.Server((), futures.ThreadPoolExecutor(max_workers=0)) diff --git a/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py b/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py deleted file mode 100644 index 2b8981c752..0000000000 --- a/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py +++ /dev/null @@ -1,157 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests Base interface compliance of the core-over-gRPC-links stack.""" - -import collections -import logging -import random -import time -import unittest - -import six - -from grpc._adapter import _intermediary_low -from grpc._links import invocation -from grpc._links import service -from grpc.beta import interfaces as beta_interfaces -from grpc.framework.core import implementations -from grpc.framework.interfaces.base import utilities -from tests.unit import test_common as grpc_test_common -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.base import test_cases -from tests.unit.framework.interfaces.base import test_interfaces - - -class _SerializationBehaviors( - collections.namedtuple( - '_SerializationBehaviors', - ('request_serializers', 'request_deserializers', 'response_serializers', - 'response_deserializers',))): - pass - - -class _Links( - collections.namedtuple( - '_Links', - ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link', - 'service_end_link'))): - pass - - -def _serialization_behaviors_from_serializations(serializations): - request_serializers = {} - request_deserializers = {} - response_serializers = {} - response_deserializers = {} - for (group, method), serialization in six.iteritems(serializations): - request_serializers[group, method] = serialization.serialize_request - request_deserializers[group, method] = serialization.deserialize_request - response_serializers[group, method] = serialization.serialize_response - response_deserializers[group, method] = serialization.deserialize_response - return _SerializationBehaviors( - request_serializers, request_deserializers, response_serializers, - response_deserializers) - - -class _Implementation(test_interfaces.Implementation): - - def instantiate(self, serializations, servicer): - serialization_behaviors = _serialization_behaviors_from_serializations( - serializations) - invocation_end_link = implementations.invocation_end_link() - service_end_link = implementations.service_end_link( - servicer, test_constants.DEFAULT_TIMEOUT, - test_constants.MAXIMUM_TIMEOUT) - service_grpc_link = service.service_link( - serialization_behaviors.request_deserializers, - serialization_behaviors.response_serializers) - port = service_grpc_link.add_port('[::]:0', None) - channel = _intermediary_low.Channel('localhost:%d' % port, None) - invocation_grpc_link = invocation.invocation_link( - channel, b'localhost', None, - serialization_behaviors.request_serializers, - serialization_behaviors.response_deserializers) - - invocation_end_link.join_link(invocation_grpc_link) - invocation_grpc_link.join_link(invocation_end_link) - service_end_link.join_link(service_grpc_link) - service_grpc_link.join_link(service_end_link) - invocation_grpc_link.start() - service_grpc_link.start() - return invocation_end_link, service_end_link, ( - invocation_grpc_link, service_grpc_link) - - def destantiate(self, memo): - invocation_grpc_link, service_grpc_link = memo - invocation_grpc_link.stop() - service_grpc_link.begin_stop() - service_grpc_link.end_stop() - - def invocation_initial_metadata(self): - return grpc_test_common.INVOCATION_INITIAL_METADATA - - def service_initial_metadata(self): - return grpc_test_common.SERVICE_INITIAL_METADATA - - def invocation_completion(self): - return utilities.completion(None, None, None) - - def service_completion(self): - return utilities.completion( - grpc_test_common.SERVICE_TERMINAL_METADATA, - beta_interfaces.StatusCode.OK, grpc_test_common.DETAILS) - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return original_metadata is None or grpc_test_common.metadata_transmitted( - original_metadata, transmitted_metadata) - - def completion_transmitted(self, original_completion, transmitted_completion): - if (original_completion.terminal_metadata is not None and - not grpc_test_common.metadata_transmitted( - original_completion.terminal_metadata, - transmitted_completion.terminal_metadata)): - return False - elif original_completion.code is not transmitted_completion.code: - return False - elif original_completion.message != transmitted_completion.message: - return False - else: - return True - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py b/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py deleted file mode 100644 index 50b9a5a824..0000000000 --- a/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py +++ /dev/null @@ -1,163 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests Face compliance of the crust-over-core-over-gRPC-links stack.""" - -import collections -import unittest - -import six - -from grpc._adapter import _intermediary_low -from grpc._links import invocation -from grpc._links import service -from grpc.beta import interfaces as beta_interfaces -from grpc.framework.core import implementations as core_implementations -from grpc.framework.crust import implementations as crust_implementations -from grpc.framework.foundation import logging_pool -from grpc.framework.interfaces.links import utilities -from tests.unit import test_common as grpc_test_common -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.face import test_cases -from tests.unit.framework.interfaces.face import test_interfaces - - -class _SerializationBehaviors( - collections.namedtuple( - '_SerializationBehaviors', - ('request_serializers', 'request_deserializers', 'response_serializers', - 'response_deserializers',))): - pass - - -def _serialization_behaviors_from_test_methods(test_methods): - request_serializers = {} - request_deserializers = {} - response_serializers = {} - response_deserializers = {} - for (group, method), test_method in six.iteritems(test_methods): - request_serializers[group, method] = test_method.serialize_request - request_deserializers[group, method] = test_method.deserialize_request - response_serializers[group, method] = test_method.serialize_response - response_deserializers[group, method] = test_method.deserialize_response - return _SerializationBehaviors( - request_serializers, request_deserializers, response_serializers, - response_deserializers) - - -class _Implementation(test_interfaces.Implementation): - - def instantiate( - self, methods, method_implementations, multi_method_implementation): - pool = logging_pool.pool(test_constants.POOL_SIZE) - servicer = crust_implementations.servicer( - method_implementations, multi_method_implementation, pool) - serialization_behaviors = _serialization_behaviors_from_test_methods( - methods) - invocation_end_link = core_implementations.invocation_end_link() - service_end_link = core_implementations.service_end_link( - servicer, test_constants.DEFAULT_TIMEOUT, - test_constants.MAXIMUM_TIMEOUT) - service_grpc_link = service.service_link( - serialization_behaviors.request_deserializers, - serialization_behaviors.response_serializers) - port = service_grpc_link.add_port('[::]:0', None) - channel = _intermediary_low.Channel('localhost:%d' % port, None) - invocation_grpc_link = invocation.invocation_link( - channel, b'localhost', None, - serialization_behaviors.request_serializers, - serialization_behaviors.response_deserializers) - - invocation_end_link.join_link(invocation_grpc_link) - invocation_grpc_link.join_link(invocation_end_link) - service_grpc_link.join_link(service_end_link) - service_end_link.join_link(service_grpc_link) - service_end_link.start() - invocation_end_link.start() - invocation_grpc_link.start() - service_grpc_link.start() - - generic_stub = crust_implementations.generic_stub(invocation_end_link, pool) - # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. - group = next(iter(methods))[0] - # TODO(nathaniel): Add a "cardinalities_by_group" attribute to - # _digest.TestServiceDigest. - cardinalities = { - method: method_object.cardinality() - for (group, method), method_object in six.iteritems(methods)} - dynamic_stub = crust_implementations.dynamic_stub( - invocation_end_link, group, cardinalities, pool) - - return generic_stub, {group: dynamic_stub}, ( - invocation_end_link, invocation_grpc_link, service_grpc_link, - service_end_link, pool) - - def destantiate(self, memo): - (invocation_end_link, invocation_grpc_link, service_grpc_link, - service_end_link, pool) = memo - invocation_end_link.stop(0).wait() - invocation_grpc_link.stop() - service_grpc_link.begin_stop() - service_end_link.stop(0).wait() - service_grpc_link.end_stop() - invocation_end_link.join_link(utilities.NULL_LINK) - invocation_grpc_link.join_link(utilities.NULL_LINK) - service_grpc_link.join_link(utilities.NULL_LINK) - service_end_link.join_link(utilities.NULL_LINK) - pool.shutdown(wait=True) - - def invocation_metadata(self): - return grpc_test_common.INVOCATION_INITIAL_METADATA - - def initial_metadata(self): - return grpc_test_common.SERVICE_INITIAL_METADATA - - def terminal_metadata(self): - return grpc_test_common.SERVICE_TERMINAL_METADATA - - def code(self): - return beta_interfaces.StatusCode.OK - - def details(self): - return grpc_test_common.DETAILS - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return original_metadata is None or grpc_test_common.metadata_transmitted( - original_metadata, transmitted_metadata) - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio/tests/unit/_cython/_read_some_but_not_all_responses_test.py new file mode 100644 index 0000000000..6ae7a90fbe --- /dev/null +++ b/src/python/grpcio/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -0,0 +1,251 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test a corner-case at the level of the Cython API.""" + +import threading +import unittest + +from grpc._cython import cygrpc + +_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) +_EMPTY_FLAGS = 0 +_EMPTY_METADATA = cygrpc.Metadata(()) + + +class _ServerDriver(object): + + def __init__(self, completion_queue, shutdown_tag): + self._condition = threading.Condition() + self._completion_queue = completion_queue + self._shutdown_tag = shutdown_tag + self._events = [] + self._saw_shutdown_tag = False + + def start(self): + def in_thread(): + while True: + event = self._completion_queue.poll() + with self._condition: + self._events.append(event) + self._condition.notify() + if event.tag is self._shutdown_tag: + self._saw_shutdown_tag = True + break + thread = threading.Thread(target=in_thread) + thread.start() + + def done(self): + with self._condition: + return self._saw_shutdown_tag + + def first_event(self): + with self._condition: + while not self._events: + self._condition.wait() + return self._events[0] + + def events(self): + with self._condition: + while not self._saw_shutdown_tag: + self._condition.wait() + return tuple(self._events) + + +class _QueueDriver(object): + + def __init__(self, condition, completion_queue, due): + self._condition = condition + self._completion_queue = completion_queue + self._due = due + self._events = [] + self._returned = False + + def start(self): + def in_thread(): + while True: + event = self._completion_queue.poll() + with self._condition: + self._events.append(event) + self._due.remove(event.tag) + self._condition.notify_all() + if not self._due: + self._returned = True + return + thread = threading.Thread(target=in_thread) + thread.start() + + def done(self): + with self._condition: + return self._returned + + def event_with_tag(self, tag): + with self._condition: + while True: + for event in self._events: + if event.tag is tag: + return event + self._condition.wait() + + def events(self): + with self._condition: + while not self._returned: + self._condition.wait() + return tuple(self._events) + + +class ReadSomeButNotAllResponsesTest(unittest.TestCase): + + def testReadSomeButNotAllResponses(self): + server_completion_queue = cygrpc.CompletionQueue() + server = cygrpc.Server() + server.register_completion_queue(server_completion_queue) + port = server.add_http2_port('[::]:0') + server.start() + channel = cygrpc.Channel('localhost:{}'.format(port)) + + server_shutdown_tag = 'server_shutdown_tag' + server_driver = _ServerDriver(server_completion_queue, server_shutdown_tag) + server_driver.start() + + client_condition = threading.Condition() + client_due = set() + client_completion_queue = cygrpc.CompletionQueue() + client_driver = _QueueDriver( + client_condition, client_completion_queue, client_due) + client_driver.start() + + server_call_condition = threading.Condition() + server_send_initial_metadata_tag = 'server_send_initial_metadata_tag' + server_send_first_message_tag = 'server_send_first_message_tag' + server_send_second_message_tag = 'server_send_second_message_tag' + server_complete_rpc_tag = 'server_complete_rpc_tag' + server_call_due = set(( + server_send_initial_metadata_tag, + server_send_first_message_tag, + server_send_second_message_tag, + server_complete_rpc_tag, + )) + server_call_completion_queue = cygrpc.CompletionQueue() + server_call_driver = _QueueDriver( + server_call_condition, server_call_completion_queue, server_call_due) + server_call_driver.start() + + server_rpc_tag = 'server_rpc_tag' + request_call_result = server.request_call( + server_call_completion_queue, server_completion_queue, server_rpc_tag) + + client_call = channel.create_call( + None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None, + _INFINITE_FUTURE) + client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' + client_complete_rpc_tag = 'client_complete_rpc_tag' + with client_condition: + client_receive_initial_metadata_start_batch_result = ( + client_call.start_batch(cygrpc.Operations([ + cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + ]), client_receive_initial_metadata_tag)) + client_due.add(client_receive_initial_metadata_tag) + client_complete_rpc_start_batch_result = ( + client_call.start_batch(cygrpc.Operations([ + cygrpc.operation_send_initial_metadata( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), + ]), client_complete_rpc_tag)) + client_due.add(client_complete_rpc_tag) + + server_rpc_event = server_driver.first_event() + + with server_call_condition: + server_send_initial_metadata_start_batch_result = ( + server_rpc_event.operation_call.start_batch(cygrpc.Operations([ + cygrpc.operation_send_initial_metadata( + _EMPTY_METADATA, _EMPTY_FLAGS), + ]), server_send_initial_metadata_tag)) + server_send_first_message_start_batch_result = ( + server_rpc_event.operation_call.start_batch(cygrpc.Operations([ + cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + ]), server_send_first_message_tag)) + server_send_initial_metadata_event = server_call_driver.event_with_tag( + server_send_initial_metadata_tag) + server_send_first_message_event = server_call_driver.event_with_tag( + server_send_first_message_tag) + with server_call_condition: + server_send_second_message_start_batch_result = ( + server_rpc_event.operation_call.start_batch(cygrpc.Operations([ + cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + ]), server_send_second_message_tag)) + server_complete_rpc_start_batch_result = ( + server_rpc_event.operation_call.start_batch(cygrpc.Operations([ + cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), + cygrpc.operation_send_status_from_server( + cygrpc.Metadata(()), cygrpc.StatusCode.ok, b'test details', + _EMPTY_FLAGS), + ]), server_complete_rpc_tag)) + server_send_second_message_event = server_call_driver.event_with_tag( + server_send_second_message_tag) + server_complete_rpc_event = server_call_driver.event_with_tag( + server_complete_rpc_tag) + server_call_driver.events() + + with client_condition: + client_receive_first_message_tag = 'client_receive_first_message_tag' + client_receive_first_message_start_batch_result = ( + client_call.start_batch(cygrpc.Operations([ + cygrpc.operation_receive_message(_EMPTY_FLAGS), + ]), client_receive_first_message_tag)) + client_due.add(client_receive_first_message_tag) + client_receive_first_message_event = client_driver.event_with_tag( + client_receive_first_message_tag) + + client_call_cancel_result = client_call.cancel() + client_driver.events() + + server.shutdown(server_completion_queue, server_shutdown_tag) + server.cancel_all_calls() + server_driver.events() + + self.assertEqual(cygrpc.CallError.ok, request_call_result) + self.assertEqual( + cygrpc.CallError.ok, server_send_initial_metadata_start_batch_result) + self.assertEqual( + cygrpc.CallError.ok, client_receive_initial_metadata_start_batch_result) + self.assertEqual( + cygrpc.CallError.ok, client_complete_rpc_start_batch_result) + self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result) + self.assertIs(server_rpc_tag, server_rpc_event.tag) + self.assertEqual( + cygrpc.CompletionType.operation_complete, server_rpc_event.type) + self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call) + self.assertEqual(0, len(server_rpc_event.batch_operations)) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py index 0a511101f0..a006a20ce3 100644 --- a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py @@ -37,7 +37,7 @@ from tests.unit import test_common from tests.unit import resources -_SSL_HOST_OVERRIDE = 'foo.test.google.fr' +_SSL_HOST_OVERRIDE = b'foo.test.google.fr' _CALL_CREDENTIALS_METADATA_KEY = 'call-creds-key' _CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value' _EMPTY_FLAGS = 0 @@ -143,22 +143,60 @@ class TypeSmokeTest(unittest.TestCase): del completion_queue -class InsecureServerInsecureClient(unittest.TestCase): +class ServerClientMixin(object): - def setUp(self): + def setUpMixin(self, server_credentials, client_credentials, host_override): self.server_completion_queue = cygrpc.CompletionQueue() self.server = cygrpc.Server() self.server.register_completion_queue(self.server_completion_queue) - self.port = self.server.add_http2_port('[::]:0') + if server_credentials: + self.port = self.server.add_http2_port('[::]:0', server_credentials) + else: + self.port = self.server.add_http2_port('[::]:0') self.server.start() self.client_completion_queue = cygrpc.CompletionQueue() - self.client_channel = cygrpc.Channel('localhost:{}'.format(self.port)) - - def tearDown(self): + if client_credentials: + client_channel_arguments = cygrpc.ChannelArgs([ + cygrpc.ChannelArg(cygrpc.ChannelArgKey.ssl_target_name_override, + host_override)]) + self.client_channel = cygrpc.Channel( + 'localhost:{}'.format(self.port), client_channel_arguments, + client_credentials) + else: + self.client_channel = cygrpc.Channel('localhost:{}'.format(self.port)) + if host_override: + self.host_argument = None # default host + self.expected_host = host_override + else: + # arbitrary host name necessitating no further identification + self.host_argument = b'hostess' + self.expected_host = self.host_argument + + def tearDownMixin(self): del self.server del self.client_completion_queue del self.server_completion_queue + def _perform_operations(self, operations, call, queue, deadline, description): + """Perform the list of operations with given call, queue, and deadline. + + Invocation errors are reported with as an exception with `description` in + the message. Performs the operations asynchronously, returning a future. + """ + def performer(): + tag = object() + try: + call_result = call.start_batch(cygrpc.Operations(operations), tag) + self.assertEqual(cygrpc.CallError.ok, call_result) + event = queue.poll(deadline) + self.assertEqual(cygrpc.CompletionType.operation_complete, event.type) + self.assertTrue(event.success) + self.assertIs(tag, event.tag) + except Exception as error: + raise Exception("Error in '{}': {}".format(description, error.message)) + return event + return test_utilities.SimpleFuture(performer) + def testEcho(self): DEADLINE = time.time()+5 DEADLINE_TOLERANCE = 0.25 @@ -175,7 +213,6 @@ class InsecureServerInsecureClient(unittest.TestCase): REQUEST = b'in death a member of project mayhem has a name' RESPONSE = b'his name is robert paulson' METHOD = b'twinkies' - HOST = b'hostess' cygrpc_deadline = cygrpc.Timespec(DEADLINE) @@ -188,7 +225,8 @@ class InsecureServerInsecureClient(unittest.TestCase): client_call_tag = object() client_call = self.client_channel.create_call( - None, 0, self.client_completion_queue, METHOD, HOST, cygrpc_deadline) + None, 0, self.client_completion_queue, METHOD, self.host_argument, + cygrpc_deadline) client_initial_metadata = cygrpc.Metadata([ cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), @@ -216,7 +254,8 @@ class InsecureServerInsecureClient(unittest.TestCase): test_common.metadata_transmitted(client_initial_metadata, request_event.request_metadata)) self.assertEqual(METHOD, request_event.request_call_details.method) - self.assertEqual(HOST, request_event.request_call_details.host) + self.assertEqual(self.expected_host, + request_event.request_call_details.host) self.assertLess( abs(DEADLINE - float(request_event.request_call_details.deadline)), DEADLINE_TOLERANCE) @@ -292,172 +331,101 @@ class InsecureServerInsecureClient(unittest.TestCase): del client_call del server_call - -class SecureServerSecureClient(unittest.TestCase): - - def setUp(self): - server_credentials = cygrpc.server_credentials_ssl( - None, [cygrpc.SslPemKeyCertPair(resources.private_key(), - resources.certificate_chain())], False) - channel_credentials = cygrpc.channel_credentials_ssl( - resources.test_root_certificates(), None) - self.server_completion_queue = cygrpc.CompletionQueue() - self.server = cygrpc.Server() - self.server.register_completion_queue(self.server_completion_queue) - self.port = self.server.add_http2_port('[::]:0', server_credentials) - self.server.start() - self.client_completion_queue = cygrpc.CompletionQueue() - client_channel_arguments = cygrpc.ChannelArgs([ - cygrpc.ChannelArg(cygrpc.ChannelArgKey.ssl_target_name_override, - _SSL_HOST_OVERRIDE)]) - self.client_channel = cygrpc.Channel( - 'localhost:{}'.format(self.port), client_channel_arguments, - channel_credentials) - - def tearDown(self): - del self.server - del self.client_completion_queue - del self.server_completion_queue - - def testEcho(self): + def test6522(self): DEADLINE = time.time()+5 DEADLINE_TOLERANCE = 0.25 - CLIENT_METADATA_ASCII_KEY = b'key' - CLIENT_METADATA_ASCII_VALUE = b'val' - CLIENT_METADATA_BIN_KEY = b'key-bin' - CLIENT_METADATA_BIN_VALUE = b'\0'*1000 - SERVER_INITIAL_METADATA_KEY = b'init_me_me_me' - SERVER_INITIAL_METADATA_VALUE = b'whodawha?' - SERVER_TRAILING_METADATA_KEY = b'california_is_in_a_drought' - SERVER_TRAILING_METADATA_VALUE = b'zomg it is' - SERVER_STATUS_CODE = cygrpc.StatusCode.ok - SERVER_STATUS_DETAILS = b'our work is never over' - REQUEST = b'in death a member of project mayhem has a name' - RESPONSE = b'his name is robert paulson' - METHOD = b'/twinkies' - HOST = None # Default host + METHOD = b'twinkies' cygrpc_deadline = cygrpc.Timespec(DEADLINE) + empty_metadata = cygrpc.Metadata([]) server_request_tag = object() - request_call_result = self.server.request_call( + self.server.request_call( self.server_completion_queue, self.server_completion_queue, server_request_tag) + client_call = self.client_channel.create_call( + None, 0, self.client_completion_queue, METHOD, self.host_argument, + cygrpc_deadline) - self.assertEqual(cygrpc.CallError.ok, request_call_result) - - plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback, '') - call_credentials = cygrpc.call_credentials_metadata_plugin(plugin) + # Prologue + def perform_client_operations(operations, description): + return self._perform_operations( + operations, client_call, + self.client_completion_queue, cygrpc_deadline, description) - client_call_tag = object() - client_call = self.client_channel.create_call( - None, 0, self.client_completion_queue, METHOD, HOST, cygrpc_deadline) - client_call.set_credentials(call_credentials) - client_initial_metadata = cygrpc.Metadata([ - cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY, - CLIENT_METADATA_ASCII_VALUE), - cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]) - client_start_batch_result = client_call.start_batch(cygrpc.Operations([ - cygrpc.operation_send_initial_metadata(client_initial_metadata, - _EMPTY_FLAGS), - cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) - ]), client_call_tag) - self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) - client_event_future = test_utilities.CompletionQueuePollFuture( - self.client_completion_queue, cygrpc_deadline) + client_event_future = perform_client_operations([ + cygrpc.operation_send_initial_metadata(empty_metadata, + _EMPTY_FLAGS), + cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + ], "Client prologue") request_event = self.server_completion_queue.poll(cygrpc_deadline) - self.assertEqual(cygrpc.CompletionType.operation_complete, - request_event.type) - self.assertIsInstance(request_event.operation_call, cygrpc.Call) - self.assertIs(server_request_tag, request_event.tag) - self.assertEqual(0, len(request_event.batch_operations)) - client_metadata_with_credentials = list(client_initial_metadata) + [ - (_CALL_CREDENTIALS_METADATA_KEY, _CALL_CREDENTIALS_METADATA_VALUE)] - self.assertTrue( - test_common.metadata_transmitted(client_metadata_with_credentials, - request_event.request_metadata)) - self.assertEqual(METHOD, request_event.request_call_details.method) - self.assertEqual(_SSL_HOST_OVERRIDE, - request_event.request_call_details.host) - self.assertLess( - abs(DEADLINE - float(request_event.request_call_details.deadline)), - DEADLINE_TOLERANCE) - - server_call_tag = object() server_call = request_event.operation_call - server_initial_metadata = cygrpc.Metadata([ - cygrpc.Metadatum(SERVER_INITIAL_METADATA_KEY, - SERVER_INITIAL_METADATA_VALUE)]) - server_trailing_metadata = cygrpc.Metadata([ - cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY, - SERVER_TRAILING_METADATA_VALUE)]) - server_start_batch_result = server_call.start_batch([ - cygrpc.operation_send_initial_metadata(server_initial_metadata, - _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( - server_trailing_metadata, SERVER_STATUS_CODE, - SERVER_STATUS_DETAILS, _EMPTY_FLAGS) - ], server_call_tag) - self.assertEqual(cygrpc.CallError.ok, server_start_batch_result) - client_event = client_event_future.result() - server_event = self.server_completion_queue.poll(cygrpc_deadline) + def perform_server_operations(operations, description): + return self._perform_operations( + operations, server_call, + self.server_completion_queue, cygrpc_deadline, description) - self.assertEqual(6, len(client_event.batch_operations)) - found_client_op_types = set() - for client_result in client_event.batch_operations: - # we expect each op type to be unique - self.assertNotIn(client_result.type, found_client_op_types) - found_client_op_types.add(client_result.type) - if client_result.type == cygrpc.OperationType.receive_initial_metadata: - self.assertTrue( - test_common.metadata_transmitted(server_initial_metadata, - client_result.received_metadata)) - elif client_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(RESPONSE, client_result.received_message.bytes()) - elif client_result.type == cygrpc.OperationType.receive_status_on_client: - self.assertTrue( - test_common.metadata_transmitted(server_trailing_metadata, - client_result.received_metadata)) - self.assertEqual(SERVER_STATUS_DETAILS, - client_result.received_status_details) - self.assertEqual(SERVER_STATUS_CODE, client_result.received_status_code) - self.assertEqual(set([ - cygrpc.OperationType.send_initial_metadata, - cygrpc.OperationType.send_message, - cygrpc.OperationType.send_close_from_client, - cygrpc.OperationType.receive_initial_metadata, - cygrpc.OperationType.receive_message, - cygrpc.OperationType.receive_status_on_client - ]), found_client_op_types) + server_event_future = perform_server_operations([ + cygrpc.operation_send_initial_metadata(empty_metadata, + _EMPTY_FLAGS), + ], "Server prologue") - self.assertEqual(5, len(server_event.batch_operations)) - found_server_op_types = set() - for server_result in server_event.batch_operations: - self.assertNotIn(client_result.type, found_server_op_types) - found_server_op_types.add(server_result.type) - if server_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(REQUEST, server_result.received_message.bytes()) - elif server_result.type == cygrpc.OperationType.receive_close_on_server: - self.assertFalse(server_result.received_cancelled) - self.assertEqual(set([ - cygrpc.OperationType.send_initial_metadata, - cygrpc.OperationType.receive_message, - cygrpc.OperationType.send_message, - cygrpc.OperationType.receive_close_on_server, - cygrpc.OperationType.send_status_from_server - ]), found_server_op_types) + client_event_future.result() # force completion + server_event_future.result() - del client_call - del server_call + # Messaging + for _ in range(10): + client_event_future = perform_client_operations([ + cygrpc.operation_send_message(b'', _EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + ], "Client message") + server_event_future = perform_server_operations([ + cygrpc.operation_send_message(b'', _EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + ], "Server receive") + + client_event_future.result() # force completion + server_event_future.result() + + # Epilogue + client_event_future = perform_client_operations([ + cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + ], "Client epilogue") + + server_event_future = perform_server_operations([ + cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), + cygrpc.operation_send_status_from_server( + empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS) + ], "Server epilogue") + + client_event_future.result() # force completion + server_event_future.result() + + +class InsecureServerInsecureClient(unittest.TestCase, ServerClientMixin): + + def setUp(self): + self.setUpMixin(None, None, None) + + def tearDown(self): + self.tearDownMixin() + + +class SecureServerSecureClient(unittest.TestCase, ServerClientMixin): + + def setUp(self): + server_credentials = cygrpc.server_credentials_ssl( + None, [cygrpc.SslPemKeyCertPair(resources.private_key(), + resources.certificate_chain())], False) + client_credentials = cygrpc.channel_credentials_ssl( + resources.test_root_certificates(), None) + self.setUpMixin(server_credentials, client_credentials, _SSL_HOST_OVERRIDE) + + def tearDown(self): + self.tearDownMixin() if __name__ == '__main__': diff --git a/src/python/grpcio/tests/unit/_cython/test_utilities.py b/src/python/grpcio/tests/unit/_cython/test_utilities.py index 6a09739643..6280ce74c4 100644 --- a/src/python/grpcio/tests/unit/_cython/test_utilities.py +++ b/src/python/grpcio/tests/unit/_cython/test_utilities.py @@ -32,15 +32,35 @@ import threading from grpc._cython import cygrpc -class CompletionQueuePollFuture: +class SimpleFuture(object): + """A simple future mechanism.""" - def __init__(self, completion_queue, deadline): - def poller_function(): - self._event_result = completion_queue.poll(deadline) - self._event_result = None - self._thread = threading.Thread(target=poller_function) + def __init__(self, function, *args, **kwargs): + def wrapped_function(): + try: + self._result = function(*args, **kwargs) + except Exception as error: + self._error = error + self._result = None + self._error = None + self._thread = threading.Thread(target=wrapped_function) self._thread.start() def result(self): + """The resulting value of this future. + + Re-raises any exceptions. + """ self._thread.join() - return self._event_result + if self._error: + # TODO(atash): re-raise exceptions in a way that preserves tracebacks + raise self._error + return self._result + + +class CompletionQueuePollFuture(SimpleFuture): + + def __init__(self, completion_queue, deadline): + super(CompletionQueuePollFuture, self).__init__( + lambda: completion_queue.poll(deadline)) + diff --git a/src/python/grpcio/tests/unit/_empty_message_test.py b/src/python/grpcio/tests/unit/_empty_message_test.py new file mode 100644 index 0000000000..f324f6216b --- /dev/null +++ b/src/python/grpcio/tests/unit/_empty_message_test.py @@ -0,0 +1,137 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import unittest + +import grpc +from grpc.framework.foundation import logging_pool + +from tests.unit.framework.common import test_constants + +_REQUEST = b'' +_RESPONSE = b'' + +_UNARY_UNARY = b'/test/UnaryUnary' +_UNARY_STREAM = b'/test/UnaryStream' +_STREAM_UNARY = b'/test/StreamUnary' +_STREAM_STREAM = b'/test/StreamStream' + + +def handle_unary_unary(request, servicer_context): + return _RESPONSE + + +def handle_unary_stream(request, servicer_context): + for _ in range(test_constants.STREAM_LENGTH): + yield _RESPONSE + + +def handle_stream_unary(request_iterator, servicer_context): + for request in request_iterator: + pass + return _RESPONSE + + +def handle_stream_stream(request_iterator, servicer_context): + for request in request_iterator: + yield _RESPONSE + + +class _MethodHandler(grpc.RpcMethodHandler): + + def __init__(self, request_streaming, response_streaming): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = None + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + if self.request_streaming and self.response_streaming: + self.stream_stream = handle_stream_stream + elif self.request_streaming: + self.stream_unary = handle_stream_unary + elif self.response_streaming: + self.unary_stream = handle_unary_stream + else: + self.unary_unary = handle_unary_unary + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == _UNARY_UNARY: + return _MethodHandler(False, False) + elif handler_call_details.method == _UNARY_STREAM: + return _MethodHandler(False, True) + elif handler_call_details.method == _STREAM_UNARY: + return _MethodHandler(True, False) + elif handler_call_details.method == _STREAM_STREAM: + return _MethodHandler(True, True) + else: + return None + + +class EmptyMessageTest(unittest.TestCase): + + def setUp(self): + self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + self._server = grpc.server((_GenericHandler(),), self._server_pool) + port = self._server.add_insecure_port('[::]:0') + self._server.start() + self._channel = grpc.insecure_channel('localhost:%d' % port) + + def tearDown(self): + self._server.stop(0) + + def testUnaryUnary(self): + response = self._channel.unary_unary(_UNARY_UNARY)(_REQUEST) + self.assertEqual(_RESPONSE, response) + + def testUnaryStream(self): + response_iterator = self._channel.unary_stream(_UNARY_STREAM)(_REQUEST) + self.assertSequenceEqual( + [_RESPONSE] * test_constants.STREAM_LENGTH, list(response_iterator)) + + def testStreamUnary(self): + response = self._channel.stream_unary(_STREAM_UNARY)( + [_REQUEST] * test_constants.STREAM_LENGTH) + self.assertEqual(_RESPONSE, response) + + def testStreamStream(self): + response_iterator = self._channel.stream_stream(_STREAM_STREAM)( + [_REQUEST] * test_constants.STREAM_LENGTH) + self.assertSequenceEqual( + [_RESPONSE] * test_constants.STREAM_LENGTH, list(response_iterator)) + + +if __name__ == '__main__': + unittest.main(verbosity=2) + diff --git a/src/python/grpcio/tests/unit/_from_grpc_import_star.py b/src/python/grpcio/tests/unit/_from_grpc_import_star.py new file mode 100644 index 0000000000..78d2fb7dc5 --- /dev/null +++ b/src/python/grpcio/tests/unit/_from_grpc_import_star.py @@ -0,0 +1,38 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +_BEFORE_IMPORT = tuple(globals()) + +from grpc import * + +_AFTER_IMPORT = tuple(globals()) + +GRPC_ELEMENTS = tuple( + element for element in _AFTER_IMPORT + if element not in _BEFORE_IMPORT and element != '_BEFORE_IMPORT') diff --git a/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py b/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py deleted file mode 100644 index 890755f81c..0000000000 --- a/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py +++ /dev/null @@ -1,88 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""A test of invocation-side code unconnected to an RPC server.""" - -import unittest - -from grpc._adapter import _intermediary_low -from grpc._links import invocation -from grpc.framework.interfaces.links import links -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.links import test_cases -from tests.unit.framework.interfaces.links import test_utilities - -_NULL_BEHAVIOR = lambda unused_argument: None - - -class LonelyInvocationLinkTest(unittest.TestCase): - - def testUpAndDown(self): - channel = _intermediary_low.Channel('nonexistent:54321', None) - invocation_link = invocation.invocation_link( - channel, 'nonexistent', None, {}, {}) - - invocation_link.start() - invocation_link.stop() - - def _test_lonely_invocation_with_termination(self, termination): - test_operation_id = object() - test_group = 'test package.Test Service' - test_method = 'test method' - invocation_link_mate = test_utilities.RecordingLink() - - channel = _intermediary_low.Channel('nonexistent:54321', None) - invocation_link = invocation.invocation_link( - channel, 'nonexistent', None, {}, {}) - invocation_link.join_link(invocation_link_mate) - invocation_link.start() - - ticket = links.Ticket( - test_operation_id, 0, test_group, test_method, - links.Ticket.Subscription.FULL, test_constants.SHORT_TIMEOUT, 1, None, - None, None, None, None, termination, None) - invocation_link.accept_ticket(ticket) - invocation_link_mate.block_until_tickets_satisfy(test_cases.terminated) - - invocation_link.stop() - - self.assertIsNot( - invocation_link_mate.tickets()[-1].termination, - links.Ticket.Termination.COMPLETION) - - def testLonelyInvocationLinkWithCommencementTicket(self): - self._test_lonely_invocation_with_termination(None) - - def testLonelyInvocationLinkWithEntireTicket(self): - self._test_lonely_invocation_with_termination( - links.Ticket.Termination.COMPLETION) - - -if __name__ == '__main__': - unittest.main() diff --git a/src/python/grpcio/tests/unit/_links/_transmission_test.py b/src/python/grpcio/tests/unit/_links/_transmission_test.py deleted file mode 100644 index 888684d197..0000000000 --- a/src/python/grpcio/tests/unit/_links/_transmission_test.py +++ /dev/null @@ -1,239 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests transmission of tickets across gRPC-on-the-wire.""" - -import unittest - -from grpc._adapter import _intermediary_low -from grpc._links import invocation -from grpc._links import service -from grpc.beta import interfaces as beta_interfaces -from grpc.framework.interfaces.links import links -from tests.unit import test_common -from tests.unit._links import _proto_scenarios -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.links import test_cases -from tests.unit.framework.interfaces.links import test_utilities - -_IDENTITY = lambda x: x - - -class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase): - - def create_transmitting_links(self): - service_link = service.service_link( - {self.group_and_method(): self.deserialize_request}, - {self.group_and_method(): self.serialize_response}) - port = service_link.add_port('[::]:0', None) - service_link.start() - channel = _intermediary_low.Channel('localhost:%d' % port, None) - invocation_link = invocation.invocation_link( - channel, 'localhost', None, - {self.group_and_method(): self.serialize_request}, - {self.group_and_method(): self.deserialize_response}) - invocation_link.start() - return invocation_link, service_link - - def destroy_transmitting_links(self, invocation_side_link, service_side_link): - invocation_side_link.stop() - service_side_link.begin_stop() - service_side_link.end_stop() - - def create_invocation_initial_metadata(self): - return ( - ('first_invocation_initial_metadata_key', 'just a string value'), - ('second_invocation_initial_metadata_key', '0123456789'), - ('third_invocation_initial_metadata_key-bin', '\x00\x57' * 100), - ) - - def create_invocation_terminal_metadata(self): - return None - - def create_service_initial_metadata(self): - return ( - ('first_service_initial_metadata_key', 'just another string value'), - ('second_service_initial_metadata_key', '9876543210'), - ('third_service_initial_metadata_key-bin', '\x00\x59\x02' * 100), - ) - - def create_service_terminal_metadata(self): - return ( - ('first_service_terminal_metadata_key', 'yet another string value'), - ('second_service_terminal_metadata_key', 'abcdefghij'), - ('third_service_terminal_metadata_key-bin', '\x00\x37' * 100), - ) - - def create_invocation_completion(self): - return None, None - - def create_service_completion(self): - return ( - beta_interfaces.StatusCode.OK, b'An exuberant test "details" message!') - - def assertMetadataTransmitted(self, original_metadata, transmitted_metadata): - self.assertTrue( - test_common.metadata_transmitted( - original_metadata, transmitted_metadata), - '%s erroneously transmitted as %s' % ( - original_metadata, transmitted_metadata)) - - -class RoundTripTest(unittest.TestCase): - - def testZeroMessageRoundTrip(self): - test_operation_id = object() - test_group = 'test package.Test Group' - test_method = 'test method' - identity_transformation = {(test_group, test_method): _IDENTITY} - test_code = beta_interfaces.StatusCode.OK - test_message = 'a test message' - - service_link = service.service_link( - identity_transformation, identity_transformation) - service_mate = test_utilities.RecordingLink() - service_link.join_link(service_mate) - port = service_link.add_port('[::]:0', None) - service_link.start() - channel = _intermediary_low.Channel('localhost:%d' % port, None) - invocation_link = invocation.invocation_link( - channel, None, None, identity_transformation, identity_transformation) - invocation_mate = test_utilities.RecordingLink() - invocation_link.join_link(invocation_mate) - invocation_link.start() - - invocation_ticket = links.Ticket( - test_operation_id, 0, test_group, test_method, - links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None, - None, None, None, None, links.Ticket.Termination.COMPLETION, None) - invocation_link.accept_ticket(invocation_ticket) - service_mate.block_until_tickets_satisfy(test_cases.terminated) - - service_ticket = links.Ticket( - service_mate.tickets()[-1].operation_id, 0, None, None, None, None, - None, None, None, None, test_code, test_message, - links.Ticket.Termination.COMPLETION, None) - service_link.accept_ticket(service_ticket) - invocation_mate.block_until_tickets_satisfy(test_cases.terminated) - - invocation_link.stop() - service_link.begin_stop() - service_link.end_stop() - - self.assertIs( - service_mate.tickets()[-1].termination, - links.Ticket.Termination.COMPLETION) - self.assertIs( - invocation_mate.tickets()[-1].termination, - links.Ticket.Termination.COMPLETION) - self.assertIs(invocation_mate.tickets()[-1].code, test_code) - self.assertEqual(invocation_mate.tickets()[-1].message, test_message) - - def _perform_scenario_test(self, scenario): - test_operation_id = object() - test_group, test_method = scenario.group_and_method() - test_code = beta_interfaces.StatusCode.OK - test_message = 'a scenario test message' - - service_link = service.service_link( - {(test_group, test_method): scenario.deserialize_request}, - {(test_group, test_method): scenario.serialize_response}) - service_mate = test_utilities.RecordingLink() - service_link.join_link(service_mate) - port = service_link.add_port('[::]:0', None) - service_link.start() - channel = _intermediary_low.Channel('localhost:%d' % port, None) - invocation_link = invocation.invocation_link( - channel, 'localhost', None, - {(test_group, test_method): scenario.serialize_request}, - {(test_group, test_method): scenario.deserialize_response}) - invocation_mate = test_utilities.RecordingLink() - invocation_link.join_link(invocation_mate) - invocation_link.start() - - invocation_ticket = links.Ticket( - test_operation_id, 0, test_group, test_method, - links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None, - None, None, None, None, None, None) - invocation_link.accept_ticket(invocation_ticket) - requests = scenario.requests() - for request_index, request in enumerate(requests): - request_ticket = links.Ticket( - test_operation_id, 1 + request_index, None, None, None, None, 1, None, - request, None, None, None, None, None) - invocation_link.accept_ticket(request_ticket) - service_mate.block_until_tickets_satisfy( - test_cases.at_least_n_payloads_received_predicate(1 + request_index)) - response_ticket = links.Ticket( - service_mate.tickets()[0].operation_id, request_index, None, None, - None, None, 1, None, scenario.response_for_request(request), None, - None, None, None, None) - service_link.accept_ticket(response_ticket) - invocation_mate.block_until_tickets_satisfy( - test_cases.at_least_n_payloads_received_predicate(1 + request_index)) - request_count = len(requests) - invocation_completion_ticket = links.Ticket( - test_operation_id, request_count + 1, None, None, None, None, None, - None, None, None, None, None, links.Ticket.Termination.COMPLETION, - None) - invocation_link.accept_ticket(invocation_completion_ticket) - service_mate.block_until_tickets_satisfy(test_cases.terminated) - service_completion_ticket = links.Ticket( - service_mate.tickets()[0].operation_id, request_count, None, None, None, - None, None, None, None, None, test_code, test_message, - links.Ticket.Termination.COMPLETION, None) - service_link.accept_ticket(service_completion_ticket) - invocation_mate.block_until_tickets_satisfy(test_cases.terminated) - - invocation_link.stop() - service_link.begin_stop() - service_link.end_stop() - - observed_requests = tuple( - ticket.payload for ticket in service_mate.tickets() - if ticket.payload is not None) - observed_responses = tuple( - ticket.payload for ticket in invocation_mate.tickets() - if ticket.payload is not None) - self.assertTrue(scenario.verify_requests(observed_requests)) - self.assertTrue(scenario.verify_responses(observed_responses)) - - def testEmptyScenario(self): - self._perform_scenario_test(_proto_scenarios.EmptyScenario()) - - def testBidirectionallyUnaryScenario(self): - self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario()) - - def testBidirectionallyStreamingScenario(self): - self._perform_scenario_test( - _proto_scenarios.BidirectionallyStreamingScenario()) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_metadata_code_details_test.py b/src/python/grpcio/tests/unit/_metadata_code_details_test.py new file mode 100644 index 0000000000..dd74268cbf --- /dev/null +++ b/src/python/grpcio/tests/unit/_metadata_code_details_test.py @@ -0,0 +1,523 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests application-provided metadata, status code, and details.""" + +import threading +import unittest + +import grpc +from grpc.framework.foundation import logging_pool + +from tests.unit import test_common +from tests.unit.framework.common import test_constants +from tests.unit.framework.common import test_control + +_SERIALIZED_REQUEST = b'\x46\x47\x48' +_SERIALIZED_RESPONSE = b'\x49\x50\x51' + +_REQUEST_SERIALIZER = lambda unused_request: _SERIALIZED_REQUEST +_REQUEST_DESERIALIZER = lambda unused_serialized_request: object() +_RESPONSE_SERIALIZER = lambda unused_response: _SERIALIZED_RESPONSE +_RESPONSE_DESERIALIZER = lambda unused_serialized_resopnse: object() + +_SERVICE = b'test.TestService' +_UNARY_UNARY = b'UnaryUnary' +_UNARY_STREAM = b'UnaryStream' +_STREAM_UNARY = b'StreamUnary' +_STREAM_STREAM = b'StreamStream' + +_CLIENT_METADATA = ( + (b'client-md-key', b'client-md-key'), + (b'client-md-key-bin', b'\x00\x01') +) + +_SERVER_INITIAL_METADATA = ( + (b'server-initial-md-key', b'server-initial-md-value'), + (b'server-initial-md-key-bin', b'\x00\x02') +) + +_SERVER_TRAILING_METADATA = ( + (b'server-trailing-md-key', b'server-trailing-md-value'), + (b'server-trailing-md-key-bin', b'\x00\x03') +) + +_NON_OK_CODE = grpc.StatusCode.NOT_FOUND +_DETAILS = b'Test details!' + + +class _Servicer(object): + + def __init__(self): + self._lock = threading.Lock() + self._code = None + self._details = None + self._exception = False + self._return_none = False + self._received_client_metadata = None + + def unary_unary(self, request, context): + with self._lock: + self._received_client_metadata = context.invocation_metadata() + context.send_initial_metadata(_SERVER_INITIAL_METADATA) + context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + if self._code is not None: + context.set_code(self._code) + if self._details is not None: + context.set_details(self._details) + if self._exception: + raise test_control.Defect() + else: + return None if self._return_none else object() + + def unary_stream(self, request, context): + with self._lock: + self._received_client_metadata = context.invocation_metadata() + context.send_initial_metadata(_SERVER_INITIAL_METADATA) + context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + if self._code is not None: + context.set_code(self._code) + if self._details is not None: + context.set_details(self._details) + for _ in range(test_constants.STREAM_LENGTH // 2): + yield _SERIALIZED_RESPONSE + if self._exception: + raise test_control.Defect() + + def stream_unary(self, request_iterator, context): + with self._lock: + self._received_client_metadata = context.invocation_metadata() + context.send_initial_metadata(_SERVER_INITIAL_METADATA) + context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + if self._code is not None: + context.set_code(self._code) + if self._details is not None: + context.set_details(self._details) + # TODO(https://github.com/grpc/grpc/issues/6891): just ignore the + # request iterator. + for ignored_request in request_iterator: + pass + if self._exception: + raise test_control.Defect() + else: + return None if self._return_none else _SERIALIZED_RESPONSE + + def stream_stream(self, request_iterator, context): + with self._lock: + self._received_client_metadata = context.invocation_metadata() + context.send_initial_metadata(_SERVER_INITIAL_METADATA) + context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + if self._code is not None: + context.set_code(self._code) + if self._details is not None: + context.set_details(self._details) + # TODO(https://github.com/grpc/grpc/issues/6891): just ignore the + # request iterator. + for ignored_request in request_iterator: + pass + for _ in range(test_constants.STREAM_LENGTH // 3): + yield object() + if self._exception: + raise test_control.Defect() + + def set_code(self, code): + with self._lock: + self._code = code + + def set_details(self, details): + with self._lock: + self._details = details + + def set_exception(self): + with self._lock: + self._exception = True + + def set_return_none(self): + with self._lock: + self._return_none = True + + def received_client_metadata(self): + with self._lock: + return self._received_client_metadata + + +def _generic_handler(servicer): + method_handlers = { + _UNARY_UNARY: grpc.unary_unary_rpc_method_handler( + servicer.unary_unary, request_deserializer=_REQUEST_DESERIALIZER, + response_serializer=_RESPONSE_SERIALIZER), + _UNARY_STREAM: grpc.unary_stream_rpc_method_handler( + servicer.unary_stream), + _STREAM_UNARY: grpc.stream_unary_rpc_method_handler( + servicer.stream_unary), + _STREAM_STREAM: grpc.stream_stream_rpc_method_handler( + servicer.stream_stream, request_deserializer=_REQUEST_DESERIALIZER, + response_serializer=_RESPONSE_SERIALIZER), + } + return grpc.method_handlers_generic_handler(_SERVICE, method_handlers) + + +class MetadataCodeDetailsTest(unittest.TestCase): + + def setUp(self): + self._servicer = _Servicer() + self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + self._server = grpc.server( + (_generic_handler(self._servicer),), self._server_pool) + port = self._server.add_insecure_port('[::]:0') + self._server.start() + + channel = grpc.insecure_channel('localhost:{}'.format(port)) + self._unary_unary = channel.unary_unary( + b'/'.join((b'', _SERVICE, _UNARY_UNARY,)), + request_serializer=_REQUEST_SERIALIZER, + response_deserializer=_RESPONSE_DESERIALIZER,) + self._unary_stream = channel.unary_stream( + b'/'.join((b'', _SERVICE, _UNARY_STREAM,)),) + self._stream_unary = channel.stream_unary( + b'/'.join((b'', _SERVICE, _STREAM_UNARY,)),) + self._stream_stream = channel.stream_stream( + b'/'.join((b'', _SERVICE, _STREAM_STREAM,)), + request_serializer=_REQUEST_SERIALIZER, + response_deserializer=_RESPONSE_DESERIALIZER,) + + + def testSuccessfulUnaryUnary(self): + self._servicer.set_details(_DETAILS) + + unused_response, call = self._unary_unary.with_call( + object(), metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(grpc.StatusCode.OK, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testSuccessfulUnaryStream(self): + self._servicer.set_details(_DETAILS) + + call = self._unary_stream(_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(grpc.StatusCode.OK, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testSuccessfulStreamUnary(self): + self._servicer.set_details(_DETAILS) + + unused_response, call = self._stream_unary.with_call( + iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(grpc.StatusCode.OK, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testSuccessfulStreamStream(self): + self._servicer.set_details(_DETAILS) + + call = self._stream_stream( + iter([object()] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(grpc.StatusCode.OK, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testCustomCodeUnaryUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + + with self.assertRaises(grpc.RpcError) as exception_context: + self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeUnaryStream(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + + call = self._unary_stream(_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + with self.assertRaises(grpc.RpcError): + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(_NON_OK_CODE, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testCustomCodeStreamUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + + with self.assertRaises(grpc.RpcError) as exception_context: + self._stream_unary.with_call( + iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeStreamStream(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + + call = self._stream_stream( + iter([object()] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + with self.assertRaises(grpc.RpcError) as exception_context: + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeExceptionUnaryUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_exception() + + with self.assertRaises(grpc.RpcError) as exception_context: + self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeExceptionUnaryStream(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_exception() + + call = self._unary_stream(_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + with self.assertRaises(grpc.RpcError): + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(_NON_OK_CODE, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testCustomCodeExceptionStreamUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_exception() + + with self.assertRaises(grpc.RpcError) as exception_context: + self._stream_unary.with_call( + iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeExceptionStreamStream(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_exception() + + call = self._stream_stream( + iter([object()] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + with self.assertRaises(grpc.RpcError): + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(_NON_OK_CODE, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testCustomCodeReturnNoneUnaryUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_return_none() + + with self.assertRaises(grpc.RpcError) as exception_context: + self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeReturnNoneStreamUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_return_none() + + with self.assertRaises(grpc.RpcError) as exception_context: + self._stream_unary.with_call( + iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_metadata_test.py b/src/python/grpcio/tests/unit/_metadata_test.py new file mode 100644 index 0000000000..2cb13f236b --- /dev/null +++ b/src/python/grpcio/tests/unit/_metadata_test.py @@ -0,0 +1,216 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +"""Tests server and client side metadata API.""" + +import unittest +import weakref + +import grpc +from grpc import _grpcio_metadata +from grpc.framework.foundation import logging_pool + +from tests.unit import test_common +from tests.unit.framework.common import test_constants + +_CHANNEL_ARGS = (('grpc.primary_user_agent', 'primary-agent'), + ('grpc.secondary_user_agent', 'secondary-agent')) + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x00\x00\x00' + +_UNARY_UNARY = b'/test/UnaryUnary' +_UNARY_STREAM = b'/test/UnaryStream' +_STREAM_UNARY = b'/test/StreamUnary' +_STREAM_STREAM = b'/test/StreamStream' + +_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__) + +_CLIENT_METADATA = ( + (b'client-md-key', b'client-md-key'), + (b'client-md-key-bin', b'\x00\x01') +) + +_SERVER_INITIAL_METADATA = ( + (b'server-initial-md-key', b'server-initial-md-value'), + (b'server-initial-md-key-bin', b'\x00\x02') +) + +_SERVER_TRAILING_METADATA = ( + (b'server-trailing-md-key', b'server-trailing-md-value'), + (b'server-trailing-md-key-bin', b'\x00\x03') +) + + +def user_agent(metadata): + for key, val in metadata: + if key == b'user-agent': + return val.decode('ascii') + raise KeyError('No user agent!') + + +def validate_client_metadata(test, servicer_context): + test.assertTrue(test_common.metadata_transmitted( + _CLIENT_METADATA, servicer_context.invocation_metadata())) + test.assertTrue(user_agent(servicer_context.invocation_metadata()) + .startswith('primary-agent ' + _USER_AGENT)) + test.assertTrue(user_agent(servicer_context.invocation_metadata()) + .endswith('secondary-agent')) + + +def handle_unary_unary(test, request, servicer_context): + validate_client_metadata(test, servicer_context) + servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA) + servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + return _RESPONSE + + +def handle_unary_stream(test, request, servicer_context): + validate_client_metadata(test, servicer_context) + servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA) + servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + for _ in range(test_constants.STREAM_LENGTH): + yield _RESPONSE + + +def handle_stream_unary(test, request_iterator, servicer_context): + validate_client_metadata(test, servicer_context) + servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA) + servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + # TODO(issue:#6891) We should be able to remove this loop + for request in request_iterator: + pass + return _RESPONSE + + +def handle_stream_stream(test, request_iterator, servicer_context): + validate_client_metadata(test, servicer_context) + servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA) + servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + # TODO(issue:#6891) We should be able to remove this loop, + # and replace with return; yield + for request in request_iterator: + yield _RESPONSE + + +class _MethodHandler(grpc.RpcMethodHandler): + + def __init__(self, test, request_streaming, response_streaming): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = None + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + if self.request_streaming and self.response_streaming: + self.stream_stream = lambda x, y: handle_stream_stream(test, x, y) + elif self.request_streaming: + self.stream_unary = lambda x, y: handle_stream_unary(test, x, y) + elif self.response_streaming: + self.unary_stream = lambda x, y: handle_unary_stream(test, x, y) + else: + self.unary_unary = lambda x, y: handle_unary_unary(test, x, y) + + +class _GenericHandler(grpc.GenericRpcHandler): + + def __init__(self, test): + self._test = test + + def service(self, handler_call_details): + if handler_call_details.method == _UNARY_UNARY: + return _MethodHandler(self._test, False, False) + elif handler_call_details.method == _UNARY_STREAM: + return _MethodHandler(self._test, False, True) + elif handler_call_details.method == _STREAM_UNARY: + return _MethodHandler(self._test, True, False) + elif handler_call_details.method == _STREAM_STREAM: + return _MethodHandler(self._test, True, True) + else: + return None + + +class MetadataTest(unittest.TestCase): + + def setUp(self): + self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + self._server = grpc.server((_GenericHandler(weakref.proxy(self)),), + self._server_pool) + port = self._server.add_insecure_port('[::]:0') + self._server.start() + self._channel = grpc.insecure_channel('localhost:%d' % port, + options=_CHANNEL_ARGS) + + def tearDown(self): + self._server.stop(0) + + def testUnaryUnary(self): + multi_callable = self._channel.unary_unary(_UNARY_UNARY) + unused_response, call = multi_callable.with_call( + _REQUEST, metadata=_CLIENT_METADATA) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + + def testUnaryStream(self): + multi_callable = self._channel.unary_stream(_UNARY_STREAM) + call = multi_callable(_REQUEST, metadata=_CLIENT_METADATA) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + for _ in call: + pass + self.assertTrue(test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + + def testStreamUnary(self): + multi_callable = self._channel.stream_unary(_STREAM_UNARY) + unused_response, call = multi_callable.with_call( + [_REQUEST] * test_constants.STREAM_LENGTH, + metadata=_CLIENT_METADATA) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + + def testStreamStream(self): + multi_callable = self._channel.stream_stream(_STREAM_STREAM) + call = multi_callable([_REQUEST] * test_constants.STREAM_LENGTH, + metadata=_CLIENT_METADATA) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + for _ in call: + pass + self.assertTrue(test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_rpc_test.py b/src/python/grpcio/tests/unit/_rpc_test.py index 1c7a14c5d0..9814504edf 100644 --- a/src/python/grpcio/tests/unit/_rpc_test.py +++ b/src/python/grpcio/tests/unit/_rpc_test.py @@ -27,7 +27,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Test of gRPC Python's application-layer API.""" +"""Test of RPCs made against gRPC Python's application-layer API.""" import itertools import threading @@ -41,9 +41,9 @@ from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control _SERIALIZE_REQUEST = lambda bytestring: bytestring * 2 -_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) / 2:] +_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:] _SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3 -_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) / 3] +_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3] _UNARY_UNARY = b'/test/UnaryUnary' _UNARY_STREAM = b'/test/UnaryStream' @@ -189,14 +189,7 @@ class RPCTest(unittest.TestCase): self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) self._server.start() - self._channel = grpc.insecure_channel(b'localhost:%d' % port) - - # TODO(nathaniel): Why is this necessary, and only in some development - # environments? - def tearDown(self): - del self._channel - del self._server - del self._server_pool + self._channel = grpc.insecure_channel('localhost:%d' % port) def testUnrecognizedMethod(self): request = b'abc' @@ -223,10 +216,9 @@ class RPCTest(unittest.TestCase): expected_response = self._handler.handle_unary_unary(request, None) multi_callable = _unary_unary_multi_callable(self._channel) - response, call = multi_callable( + response, call = multi_callable.with_call( request, metadata=( - (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),), - with_call=True) + (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),)) self.assertEqual(expected_response, response) self.assertIs(grpc.StatusCode.OK, call.code()) @@ -273,11 +265,11 @@ class RPCTest(unittest.TestCase): request_iterator = iter(requests) multi_callable = _stream_unary_multi_callable(self._channel) - response, call = multi_callable( + response, call = multi_callable.with_call( request_iterator, metadata=( (b'test', b'SuccessfulStreamRequestBlockingUnaryResponseWithCall'), - ), with_call=True) + )) self.assertEqual(expected_response, response) self.assertIs(grpc.StatusCode.OK, call.code()) @@ -532,10 +524,9 @@ class RPCTest(unittest.TestCase): multi_callable = _unary_unary_multi_callable(self._channel) with self._control.pause(): with self.assertRaises(grpc.RpcError) as exception_context: - multi_callable( + multi_callable.with_call( request, timeout=test_constants.SHORT_TIMEOUT, - metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),), - with_call=True) + metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),)) self.assertIsNotNone(exception_context.exception.initial_metadata()) self.assertIs( @@ -647,10 +638,9 @@ class RPCTest(unittest.TestCase): multi_callable = _unary_unary_multi_callable(self._channel) with self._control.fail(): with self.assertRaises(grpc.RpcError) as exception_context: - multi_callable( + multi_callable.with_call( request, - metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),), - with_call=True) + metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),)) self.assertIs(grpc.StatusCode.UNKNOWN, exception_context.exception.code()) diff --git a/src/python/grpcio/tests/unit/_thread_cleanup_test.py b/src/python/grpcio/tests/unit/_thread_cleanup_test.py new file mode 100644 index 0000000000..3e4f317edc --- /dev/null +++ b/src/python/grpcio/tests/unit/_thread_cleanup_test.py @@ -0,0 +1,117 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +"""Tests for CleanupThread.""" + +import threading +import time +import unittest + +from grpc import _common + +_SHORT_TIME = 0.5 +_LONG_TIME = 2.0 +_EPSILON = 0.1 + + +def cleanup(timeout): + if timeout is not None: + time.sleep(timeout) + else: + time.sleep(_LONG_TIME) + + +def slow_cleanup(timeout): + # Don't respect timeout + time.sleep(_LONG_TIME) + + +class CleanupThreadTest(unittest.TestCase): + + def testTargetInvocation(self): + event = threading.Event() + def target(arg1, arg2, arg3=None): + self.assertEqual('arg1', arg1) + self.assertEqual('arg2', arg2) + self.assertEqual('arg3', arg3) + event.set() + + cleanup_thread = _common.CleanupThread(behavior=lambda x: None, + target=target, name='test-name', + args=('arg1', 'arg2'), kwargs={'arg3': 'arg3'}) + cleanup_thread.start() + cleanup_thread.join() + self.assertEqual(cleanup_thread.name, 'test-name') + self.assertTrue(event.is_set()) + + def testJoinNoTimeout(self): + cleanup_thread = _common.CleanupThread(behavior=cleanup) + cleanup_thread.start() + start_time = time.time() + cleanup_thread.join() + end_time = time.time() + self.assertAlmostEqual(_LONG_TIME, end_time - start_time, delta=_EPSILON) + + def testJoinTimeout(self): + cleanup_thread = _common.CleanupThread(behavior=cleanup) + cleanup_thread.start() + start_time = time.time() + cleanup_thread.join(_SHORT_TIME) + end_time = time.time() + self.assertAlmostEqual(_SHORT_TIME, end_time - start_time, delta=_EPSILON) + + def testJoinTimeoutSlowBehavior(self): + cleanup_thread = _common.CleanupThread(behavior=slow_cleanup) + cleanup_thread.start() + start_time = time.time() + cleanup_thread.join(_SHORT_TIME) + end_time = time.time() + self.assertAlmostEqual(_LONG_TIME, end_time - start_time, delta=_EPSILON) + + def testJoinTimeoutSlowTarget(self): + event = threading.Event() + def target(): + event.wait(_LONG_TIME) + cleanup_thread = _common.CleanupThread(behavior=cleanup, target=target) + cleanup_thread.start() + start_time = time.time() + cleanup_thread.join(_SHORT_TIME) + end_time = time.time() + self.assertAlmostEqual(_SHORT_TIME, end_time - start_time, delta=_EPSILON) + event.set() + + def testJoinZeroTimeout(self): + cleanup_thread = _common.CleanupThread(behavior=cleanup) + cleanup_thread.start() + start_time = time.time() + cleanup_thread.join(0) + end_time = time.time() + self.assertAlmostEqual(0, end_time - start_time, delta=_EPSILON) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/beta/_beta_features_test.py b/src/python/grpcio/tests/unit/beta/_beta_features_test.py index bb2893a21b..3a9701b8eb 100644 --- a/src/python/grpcio/tests/unit/beta/_beta_features_test.py +++ b/src/python/grpcio/tests/unit/beta/_beta_features_test.py @@ -42,8 +42,8 @@ from tests.unit.framework.common import test_constants _SERVER_HOST_OVERRIDE = 'foo.test.google.fr' -_PER_RPC_CREDENTIALS_METADATA_KEY = 'my-call-credentials-metadata-key' -_PER_RPC_CREDENTIALS_METADATA_VALUE = 'my-call-credentials-metadata-value' +_PER_RPC_CREDENTIALS_METADATA_KEY = b'my-call-credentials-metadata-key' +_PER_RPC_CREDENTIALS_METADATA_VALUE = b'my-call-credentials-metadata-value' _GROUP = 'group' _UNARY_UNARY = 'unary-unary' diff --git a/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py b/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py index 5dc8720639..488f7d7141 100644 --- a/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py +++ b/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py @@ -187,5 +187,15 @@ class ChannelConnectivityTest(unittest.TestCase): server_completion_queue_thread.join() +class ConnectivityStatesTest(unittest.TestCase): + + def testBetaConnectivityStates(self): + self.assertIsNotNone(interfaces.ChannelConnectivity.IDLE) + self.assertIsNotNone(interfaces.ChannelConnectivity.CONNECTING) + self.assertIsNotNone(interfaces.ChannelConnectivity.READY) + self.assertIsNotNone(interfaces.ChannelConnectivity.TRANSIENT_FAILURE) + self.assertIsNotNone(interfaces.ChannelConnectivity.FATAL_FAILURE) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/beta/_not_found_test.py b/src/python/grpcio/tests/unit/beta/_not_found_test.py index 44fcd1e13c..37b8c49120 100644 --- a/src/python/grpcio/tests/unit/beta/_not_found_test.py +++ b/src/python/grpcio/tests/unit/beta/_not_found_test.py @@ -61,7 +61,7 @@ class NotFoundTest(unittest.TestCase): def test_future_stream_unary_not_found(self): rpc_future = self._generic_stub.future_stream_unary( - 'grupe', 'mevvod', b'def', test_constants.LONG_TIMEOUT) + 'grupe', 'mevvod', [b'def'], test_constants.LONG_TIMEOUT) with self.assertRaises(face.LocalError) as exception_assertion_context: rpc_future.result() self.assertIs( diff --git a/src/python/grpcio/tests/unit/beta/test_utilities.py b/src/python/grpcio/tests/unit/beta/test_utilities.py index 0313e06a93..8ccad04e05 100644 --- a/src/python/grpcio/tests/unit/beta/test_utilities.py +++ b/src/python/grpcio/tests/unit/beta/test_utilities.py @@ -29,7 +29,7 @@ """Test-appropriate entry points into the gRPC Python Beta API.""" -from grpc._adapter import _intermediary_low +import grpc from grpc.beta import implementations @@ -48,9 +48,8 @@ def not_really_secure_channel( An implementations.Channel to the remote host through which RPCs may be conducted. """ - hostport = '%s:%d' % (host, port) - intermediary_low_channel = _intermediary_low.Channel( - hostport, channel_credentials._low_credentials, - server_host_override=server_host_override) - return implementations.Channel( - intermediary_low_channel._internal, intermediary_low_channel) + target = '%s:%d' % (host, port) + channel = grpc.secure_channel( + target, channel_credentials, + ((b'grpc.ssl_target_name_override', server_host_override,),)) + return implementations.Channel(channel) diff --git a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py b/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py deleted file mode 100644 index 43457be362..0000000000 --- a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py +++ /dev/null @@ -1,113 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests Face interface compliance of the crust-over-core stack.""" - -import collections -import unittest - -import six - -from grpc.framework.core import implementations as core_implementations -from grpc.framework.crust import implementations as crust_implementations -from grpc.framework.foundation import logging_pool -from grpc.framework.interfaces.links import utilities -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.face import test_cases -from tests.unit.framework.interfaces.face import test_interfaces -from tests.unit.framework.interfaces.links import test_utilities - - -class _Implementation(test_interfaces.Implementation): - - def instantiate( - self, methods, method_implementations, multi_method_implementation): - pool = logging_pool.pool(test_constants.POOL_SIZE) - servicer = crust_implementations.servicer( - method_implementations, multi_method_implementation, pool) - - service_end_link = core_implementations.service_end_link( - servicer, test_constants.DEFAULT_TIMEOUT, - test_constants.MAXIMUM_TIMEOUT) - invocation_end_link = core_implementations.invocation_end_link() - invocation_end_link.join_link(service_end_link) - service_end_link.join_link(invocation_end_link) - service_end_link.start() - invocation_end_link.start() - - generic_stub = crust_implementations.generic_stub(invocation_end_link, pool) - # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. - group = next(iter(methods))[0] - # TODO(nathaniel): Add a "cardinalities_by_group" attribute to - # _digest.TestServiceDigest. - cardinalities = { - method: method_object.cardinality() - for (group, method), method_object in six.iteritems(methods)} - dynamic_stub = crust_implementations.dynamic_stub( - invocation_end_link, group, cardinalities, pool) - - return generic_stub, {group: dynamic_stub}, ( - invocation_end_link, service_end_link, pool) - - def destantiate(self, memo): - invocation_end_link, service_end_link, pool = memo - invocation_end_link.stop(0).wait() - service_end_link.stop(0).wait() - invocation_end_link.join_link(utilities.NULL_LINK) - service_end_link.join_link(utilities.NULL_LINK) - pool.shutdown(wait=True) - - def invocation_metadata(self): - return object() - - def initial_metadata(self): - return object() - - def terminal_metadata(self): - return object() - - def code(self): - return object() - - def details(self): - return object() - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return original_metadata is transmitted_metadata - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py b/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py deleted file mode 100644 index 1310292306..0000000000 --- a/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests the RPC Framework Core's implementation of the Base interface.""" - -import logging -import random -import time -import unittest - -from grpc.framework.core import implementations -from grpc.framework.interfaces.base import utilities -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.base import test_cases -from tests.unit.framework.interfaces.base import test_interfaces - - -class _Implementation(test_interfaces.Implementation): - - def __init__(self): - self._invocation_initial_metadata = object() - self._service_initial_metadata = object() - self._invocation_terminal_metadata = object() - self._service_terminal_metadata = object() - - def instantiate(self, serializations, servicer): - invocation = implementations.invocation_end_link() - service = implementations.service_end_link( - servicer, test_constants.DEFAULT_TIMEOUT, - test_constants.MAXIMUM_TIMEOUT) - invocation.join_link(service) - service.join_link(invocation) - return invocation, service, None - - def destantiate(self, memo): - pass - - def invocation_initial_metadata(self): - return self._invocation_initial_metadata - - def service_initial_metadata(self): - return self._service_initial_metadata - - def invocation_completion(self): - return utilities.completion(self._invocation_terminal_metadata, None, None) - - def service_completion(self): - return utilities.completion(self._service_terminal_metadata, None, None) - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return transmitted_metadata is original_metadata - - def completion_transmitted(self, original_completion, transmitted_completion): - return ( - (original_completion.terminal_metadata is - transmitted_completion.terminal_metadata) and - original_completion.code is transmitted_completion.code and - original_completion.message is transmitted_completion.message - ) - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/foundation/_later_test.py b/src/python/grpcio/tests/unit/framework/foundation/_later_test.py deleted file mode 100644 index 6c2459e185..0000000000 --- a/src/python/grpcio/tests/unit/framework/foundation/_later_test.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests of the later module.""" - -import threading -import time -import unittest - -from grpc.framework.foundation import later - -TICK = 0.1 - - -class LaterTest(unittest.TestCase): - - def test_simple_delay(self): - lock = threading.Lock() - cell = [0] - return_value = object() - - def computation(): - with lock: - cell[0] += 1 - return return_value - computation_future = later.later(TICK * 2, computation) - - self.assertFalse(computation_future.done()) - self.assertFalse(computation_future.cancelled()) - time.sleep(TICK) - self.assertFalse(computation_future.done()) - self.assertFalse(computation_future.cancelled()) - with lock: - self.assertEqual(0, cell[0]) - time.sleep(TICK * 2) - self.assertTrue(computation_future.done()) - self.assertFalse(computation_future.cancelled()) - with lock: - self.assertEqual(1, cell[0]) - self.assertEqual(return_value, computation_future.result()) - - def test_callback(self): - lock = threading.Lock() - cell = [0] - callback_called = [False] - future_passed_to_callback = [None] - def computation(): - with lock: - cell[0] += 1 - computation_future = later.later(TICK * 2, computation) - def callback(outcome): - with lock: - callback_called[0] = True - future_passed_to_callback[0] = outcome - computation_future.add_done_callback(callback) - time.sleep(TICK) - with lock: - self.assertFalse(callback_called[0]) - time.sleep(TICK * 2) - with lock: - self.assertTrue(callback_called[0]) - self.assertTrue(future_passed_to_callback[0].done()) - - callback_called[0] = False - future_passed_to_callback[0] = None - - computation_future.add_done_callback(callback) - with lock: - self.assertTrue(callback_called[0]) - self.assertTrue(future_passed_to_callback[0].done()) - - def test_cancel(self): - lock = threading.Lock() - cell = [0] - callback_called = [False] - future_passed_to_callback = [None] - def computation(): - with lock: - cell[0] += 1 - computation_future = later.later(TICK * 2, computation) - def callback(outcome): - with lock: - callback_called[0] = True - future_passed_to_callback[0] = outcome - computation_future.add_done_callback(callback) - time.sleep(TICK) - with lock: - self.assertFalse(callback_called[0]) - computation_future.cancel() - self.assertTrue(computation_future.cancelled()) - self.assertFalse(computation_future.running()) - self.assertTrue(computation_future.done()) - with lock: - self.assertTrue(callback_called[0]) - self.assertTrue(future_passed_to_callback[0].cancelled()) - - def test_result(self): - lock = threading.Lock() - cell = [0] - callback_called = [False] - future_passed_to_callback_cell = [None] - return_value = object() - - def computation(): - with lock: - cell[0] += 1 - return return_value - computation_future = later.later(TICK * 2, computation) - - def callback(future_passed_to_callback): - with lock: - callback_called[0] = True - future_passed_to_callback_cell[0] = future_passed_to_callback - computation_future.add_done_callback(callback) - returned_value = computation_future.result() - self.assertEqual(return_value, returned_value) - - # The callback may not yet have been called! Sleep a tick. - time.sleep(TICK) - with lock: - self.assertTrue(callback_called[0]) - self.assertEqual(return_value, future_passed_to_callback_cell[0].result()) - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py b/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py index 4f8e26c9a2..5d16bf98be 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py @@ -29,6 +29,8 @@ """Tests of the base interface of RPC Framework.""" +from __future__ import division + import logging import random import threading @@ -54,13 +56,13 @@ class _Serialization(test_interfaces.Serialization): return request + request def deserialize_request(self, serialized_request): - return serialized_request[:len(serialized_request) / 2] + return serialized_request[:len(serialized_request) // 2] def serialize_response(self, response): return response * 3 def deserialize_response(self, serialized_response): - return serialized_response[2 * len(serialized_response) / 3:] + return serialized_response[2 * len(serialized_response) // 3:] def _advance(quadruples, operator, controller): diff --git a/src/python/grpcio/tests/unit/test_common.py b/src/python/grpcio/tests/unit/test_common.py index 7b4d20dccb..b779f65e7e 100644 --- a/src/python/grpcio/tests/unit/test_common.py +++ b/src/python/grpcio/tests/unit/test_common.py @@ -61,6 +61,10 @@ def metadata_transmitted(original_metadata, transmitted_metadata): original = collections.defaultdict(list) for key_value_pair in original_metadata: key, value = tuple(key_value_pair) + if not isinstance(key, bytes): + key = key.encode() + if not isinstance(value, bytes): + value = value.encode() original[key].append(value) transmitted = collections.defaultdict(list) for key_value_pair in transmitted_metadata: |