aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/tests/unit
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2016-06-18 02:37:23 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2016-06-18 02:37:23 +0000
commit76233f5c2fc67247414fb4ac85676706a647555e (patch)
tree3332bd60c1168c3a9898a11c47affe1529841df5 /src/python/grpcio/tests/unit
parentb6163dfb4af3bf11df6cf3f051015ec3ee4a0f68 (diff)
Delete tests of dead pre-GA code
Diffstat (limited to 'src/python/grpcio/tests/unit')
-rw-r--r--src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py429
-rw-r--r--src/python/grpcio/tests/unit/_adapter/_low_test.py319
-rw-r--r--src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py157
-rw-r--r--src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py163
-rw-r--r--src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py88
-rw-r--r--src/python/grpcio/tests/unit/_links/_transmission_test.py239
-rw-r--r--src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py113
-rw-r--r--src/python/grpcio/tests/unit/framework/core/_base_interface_test.py96
-rw-r--r--src/python/grpcio/tests/unit/framework/foundation/_later_test.py151
9 files changed, 0 insertions, 1755 deletions
diff --git a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py b/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py
deleted file mode 100644
index 09ebdeff33..0000000000
--- a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py
+++ /dev/null
@@ -1,429 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests for the old '_low'."""
-
-import threading
-import time
-import unittest
-
-import six
-from six.moves import queue
-
-from grpc._adapter import _intermediary_low as _low
-
-_STREAM_LENGTH = 300
-_TIMEOUT = 5
-_AFTER_DELAY = 2
-_FUTURE = time.time() + 60 * 60 * 24
-_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200
-_BYTE_SEQUENCE_SEQUENCE = tuple(
- bytes(bytearray((row + column) % 256 for column in range(row)))
- for row in range(_STREAM_LENGTH))
-
-
-class LonelyClientTest(unittest.TestCase):
-
- def testLonelyClient(self):
- host = 'nosuchhostexists'
- port = 54321
- method = 'test method'
- deadline = time.time() + _TIMEOUT
- after_deadline = deadline + _AFTER_DELAY
- metadata_tag = object()
- finish_tag = object()
-
- completion_queue = _low.CompletionQueue()
- channel = _low.Channel('%s:%d' % (host, port), None)
- client_call = _low.Call(channel, completion_queue, method, host, deadline)
-
- client_call.invoke(completion_queue, metadata_tag, finish_tag)
- first_event = completion_queue.get(after_deadline)
- self.assertIsNotNone(first_event)
- second_event = completion_queue.get(after_deadline)
- self.assertIsNotNone(second_event)
- kinds = [event.kind for event in (first_event, second_event)]
- six.assertCountEqual(self,
- (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH),
- kinds)
-
- self.assertIsNone(completion_queue.get(after_deadline))
-
- completion_queue.stop()
- stop_event = completion_queue.get(_FUTURE)
- self.assertEqual(_low.Event.Kind.STOP, stop_event.kind)
-
- del client_call
- del channel
- del completion_queue
-
-
-def _drive_completion_queue(completion_queue, event_queue):
- while True:
- event = completion_queue.get(_FUTURE)
- if event.kind is _low.Event.Kind.STOP:
- break
- event_queue.put(event)
-
-
-class EchoTest(unittest.TestCase):
-
- def setUp(self):
- self.host = 'localhost'
-
- self.server_completion_queue = _low.CompletionQueue()
- self.server = _low.Server(self.server_completion_queue)
- port = self.server.add_http2_addr('[::]:0')
- self.server.start()
- self.server_events = queue.Queue()
- self.server_completion_queue_thread = threading.Thread(
- target=_drive_completion_queue,
- args=(self.server_completion_queue, self.server_events))
- self.server_completion_queue_thread.start()
-
- self.client_completion_queue = _low.CompletionQueue()
- self.channel = _low.Channel('%s:%d' % (self.host, port), None)
- self.client_events = queue.Queue()
- self.client_completion_queue_thread = threading.Thread(
- target=_drive_completion_queue,
- args=(self.client_completion_queue, self.client_events))
- self.client_completion_queue_thread.start()
-
- def tearDown(self):
- self.server.stop()
- self.server.cancel_all_calls()
- self.server_completion_queue.stop()
- self.client_completion_queue.stop()
- self.server_completion_queue_thread.join()
- self.client_completion_queue_thread.join()
- del self.server
-
- def _perform_echo_test(self, test_data):
- method = 'test method'
- details = 'test details'
- server_leading_metadata_key = 'my_server_leading_key'
- server_leading_metadata_value = 'my_server_leading_value'
- server_trailing_metadata_key = 'my_server_trailing_key'
- server_trailing_metadata_value = 'my_server_trailing_value'
- client_metadata_key = 'my_client_key'
- client_metadata_value = 'my_client_value'
- server_leading_binary_metadata_key = 'my_server_leading_key-bin'
- server_leading_binary_metadata_value = b'\0'*2047
- server_trailing_binary_metadata_key = 'my_server_trailing_key-bin'
- server_trailing_binary_metadata_value = b'\0'*2047
- client_binary_metadata_key = 'my_client_key-bin'
- client_binary_metadata_value = b'\0'*2047
- deadline = _FUTURE
- metadata_tag = object()
- finish_tag = object()
- write_tag = object()
- complete_tag = object()
- service_tag = object()
- read_tag = object()
- status_tag = object()
-
- server_data = []
- client_data = []
-
- client_call = _low.Call(self.channel, self.client_completion_queue,
- method, self.host, deadline)
- client_call.add_metadata(client_metadata_key, client_metadata_value)
- client_call.add_metadata(client_binary_metadata_key,
- client_binary_metadata_value)
-
- client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
-
- self.server.service(service_tag)
- service_accepted = self.server_events.get()
- self.assertIsNotNone(service_accepted)
- self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
- self.assertIs(service_accepted.tag, service_tag)
- self.assertEqual(method.encode(), service_accepted.service_acceptance.method)
- self.assertEqual(self.host.encode(), service_accepted.service_acceptance.host)
- self.assertIsNotNone(service_accepted.service_acceptance.call)
- metadata = dict(service_accepted.metadata)
- self.assertIn(client_metadata_key.encode(), metadata)
- self.assertEqual(client_metadata_value.encode(), metadata[client_metadata_key.encode()])
- self.assertIn(client_binary_metadata_key.encode(), metadata)
- self.assertEqual(client_binary_metadata_value,
- metadata[client_binary_metadata_key.encode()])
- server_call = service_accepted.service_acceptance.call
- server_call.accept(self.server_completion_queue, finish_tag)
- server_call.add_metadata(server_leading_metadata_key,
- server_leading_metadata_value)
- server_call.add_metadata(server_leading_binary_metadata_key,
- server_leading_binary_metadata_value)
- server_call.premetadata()
-
- metadata_accepted = self.client_events.get()
- self.assertIsNotNone(metadata_accepted)
- self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
- self.assertEqual(metadata_tag, metadata_accepted.tag)
- metadata = dict(metadata_accepted.metadata)
- self.assertIn(server_leading_metadata_key.encode(), metadata)
- self.assertEqual(server_leading_metadata_value.encode(),
- metadata[server_leading_metadata_key.encode()])
- self.assertIn(server_leading_binary_metadata_key.encode(), metadata)
- self.assertEqual(server_leading_binary_metadata_value,
- metadata[server_leading_binary_metadata_key.encode()])
-
- for datum in test_data:
- client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS)
- write_accepted = self.client_events.get()
- self.assertIsNotNone(write_accepted)
- self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
- self.assertIs(write_accepted.tag, write_tag)
- self.assertIs(write_accepted.write_accepted, True)
-
- server_call.read(read_tag)
- read_accepted = self.server_events.get()
- self.assertIsNotNone(read_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNotNone(read_accepted.bytes)
- server_data.append(read_accepted.bytes)
-
- server_call.write(read_accepted.bytes, write_tag, 0)
- write_accepted = self.server_events.get()
- self.assertIsNotNone(write_accepted)
- self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
- self.assertEqual(write_tag, write_accepted.tag)
- self.assertTrue(write_accepted.write_accepted)
-
- client_call.read(read_tag)
- read_accepted = self.client_events.get()
- self.assertIsNotNone(read_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNotNone(read_accepted.bytes)
- client_data.append(read_accepted.bytes)
-
- client_call.complete(complete_tag)
- complete_accepted = self.client_events.get()
- self.assertIsNotNone(complete_accepted)
- self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
- self.assertIs(complete_accepted.tag, complete_tag)
- self.assertIs(complete_accepted.complete_accepted, True)
-
- server_call.read(read_tag)
- read_accepted = self.server_events.get()
- self.assertIsNotNone(read_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNone(read_accepted.bytes)
-
- server_call.add_metadata(server_trailing_metadata_key,
- server_trailing_metadata_value)
- server_call.add_metadata(server_trailing_binary_metadata_key,
- server_trailing_binary_metadata_value)
-
- server_call.status(_low.Status(_low.Code.OK, details), status_tag)
- server_terminal_event_one = self.server_events.get()
- server_terminal_event_two = self.server_events.get()
- if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
- status_accepted = server_terminal_event_one
- rpc_accepted = server_terminal_event_two
- else:
- status_accepted = server_terminal_event_two
- rpc_accepted = server_terminal_event_one
- self.assertIsNotNone(status_accepted)
- self.assertIsNotNone(rpc_accepted)
- self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind)
- self.assertEqual(status_tag, status_accepted.tag)
- self.assertTrue(status_accepted.complete_accepted)
- self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
- self.assertEqual(finish_tag, rpc_accepted.tag)
- self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
-
- client_call.read(read_tag)
- client_terminal_event_one = self.client_events.get()
- client_terminal_event_two = self.client_events.get()
- if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
- read_accepted = client_terminal_event_one
- finish_accepted = client_terminal_event_two
- else:
- read_accepted = client_terminal_event_two
- finish_accepted = client_terminal_event_one
- self.assertIsNotNone(read_accepted)
- self.assertIsNotNone(finish_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNone(read_accepted.bytes)
- self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind)
- self.assertEqual(finish_tag, finish_accepted.tag)
- self.assertEqual(_low.Status(_low.Code.OK, details.encode()), finish_accepted.status)
- metadata = dict(finish_accepted.metadata)
- self.assertIn(server_trailing_metadata_key.encode(), metadata)
- self.assertEqual(server_trailing_metadata_value.encode(),
- metadata[server_trailing_metadata_key.encode()])
- self.assertIn(server_trailing_binary_metadata_key.encode(), metadata)
- self.assertEqual(server_trailing_binary_metadata_value,
- metadata[server_trailing_binary_metadata_key.encode()])
- self.assertSetEqual(set(key for key, _ in finish_accepted.metadata),
- set((server_trailing_metadata_key.encode(),
- server_trailing_binary_metadata_key.encode(),)))
-
- self.assertSequenceEqual(test_data, server_data)
- self.assertSequenceEqual(test_data, client_data)
-
- def testNoEcho(self):
- self._perform_echo_test(())
-
- def testOneByteEcho(self):
- self._perform_echo_test([b'\x07'])
-
- def testOneManyByteEcho(self):
- self._perform_echo_test([_BYTE_SEQUENCE])
-
- def testManyOneByteEchoes(self):
- self._perform_echo_test(
- [_BYTE_SEQUENCE[i:i+1] for i in range(len(_BYTE_SEQUENCE))])
-
- def testManyManyByteEchoes(self):
- self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE)
-
-
-class CancellationTest(unittest.TestCase):
-
- def setUp(self):
- self.host = 'localhost'
-
- self.server_completion_queue = _low.CompletionQueue()
- self.server = _low.Server(self.server_completion_queue)
- port = self.server.add_http2_addr('[::]:0')
- self.server.start()
- self.server_events = queue.Queue()
- self.server_completion_queue_thread = threading.Thread(
- target=_drive_completion_queue,
- args=(self.server_completion_queue, self.server_events))
- self.server_completion_queue_thread.start()
-
- self.client_completion_queue = _low.CompletionQueue()
- self.channel = _low.Channel('%s:%d' % (self.host, port), None)
- self.client_events = queue.Queue()
- self.client_completion_queue_thread = threading.Thread(
- target=_drive_completion_queue,
- args=(self.client_completion_queue, self.client_events))
- self.client_completion_queue_thread.start()
-
- def tearDown(self):
- self.server.stop()
- self.server.cancel_all_calls()
- self.server_completion_queue.stop()
- self.client_completion_queue.stop()
- self.server_completion_queue_thread.join()
- self.client_completion_queue_thread.join()
- del self.server
-
- def testCancellation(self):
- method = 'test method'
- deadline = _FUTURE
- metadata_tag = object()
- finish_tag = object()
- write_tag = object()
- service_tag = object()
- read_tag = object()
- test_data = _BYTE_SEQUENCE_SEQUENCE
-
- server_data = []
- client_data = []
-
- client_call = _low.Call(self.channel, self.client_completion_queue,
- method, self.host, deadline)
-
- client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
-
- self.server.service(service_tag)
- service_accepted = self.server_events.get()
- server_call = service_accepted.service_acceptance.call
-
- server_call.accept(self.server_completion_queue, finish_tag)
- server_call.premetadata()
-
- metadata_accepted = self.client_events.get()
- self.assertIsNotNone(metadata_accepted)
-
- for datum in test_data:
- client_call.write(datum, write_tag, 0)
- write_accepted = self.client_events.get()
-
- server_call.read(read_tag)
- read_accepted = self.server_events.get()
- server_data.append(read_accepted.bytes)
-
- server_call.write(read_accepted.bytes, write_tag, 0)
- write_accepted = self.server_events.get()
- self.assertIsNotNone(write_accepted)
-
- client_call.read(read_tag)
- read_accepted = self.client_events.get()
- client_data.append(read_accepted.bytes)
-
- client_call.cancel()
- # cancel() is idempotent.
- client_call.cancel()
- client_call.cancel()
- client_call.cancel()
-
- server_call.read(read_tag)
-
- server_terminal_event_one = self.server_events.get()
- server_terminal_event_two = self.server_events.get()
- if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
- read_accepted = server_terminal_event_one
- rpc_accepted = server_terminal_event_two
- else:
- read_accepted = server_terminal_event_two
- rpc_accepted = server_terminal_event_one
- self.assertIsNotNone(read_accepted)
- self.assertIsNotNone(rpc_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertIsNone(read_accepted.bytes)
- self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
- self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
-
- finish_event = self.client_events.get()
- self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
- self.assertEqual(_low.Status(_low.Code.CANCELLED, b'Cancelled'),
- finish_event.status)
-
- self.assertSequenceEqual(test_data, server_data)
- self.assertSequenceEqual(test_data, client_data)
-
-
-class ExpirationTest(unittest.TestCase):
-
- @unittest.skip('TODO(nathaniel): Expiration test!')
- def testExpiration(self):
- pass
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
-
diff --git a/src/python/grpcio/tests/unit/_adapter/_low_test.py b/src/python/grpcio/tests/unit/_adapter/_low_test.py
deleted file mode 100644
index e09a1f2564..0000000000
--- a/src/python/grpcio/tests/unit/_adapter/_low_test.py
+++ /dev/null
@@ -1,319 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import threading
-import time
-import unittest
-
-from grpc import _grpcio_metadata
-from grpc._adapter import _types
-from grpc._adapter import _low
-from tests.unit import test_common
-
-
-def wait_for_events(completion_queues, deadline):
- """
- Args:
- completion_queues: list of completion queues to wait for events on
- deadline: absolute deadline to wait until
-
- Returns:
- a sequence of events of length len(completion_queues).
- """
-
- results = [None] * len(completion_queues)
- lock = threading.Lock()
- threads = []
- def set_ith_result(i, completion_queue):
- result = completion_queue.next(deadline)
- with lock:
- results[i] = result
- for i, completion_queue in enumerate(completion_queues):
- thread = threading.Thread(target=set_ith_result,
- args=[i, completion_queue])
- thread.start()
- threads.append(thread)
- for thread in threads:
- thread.join()
- return results
-
-
-class InsecureServerInsecureClient(unittest.TestCase):
-
- def setUp(self):
- self.server_completion_queue = _low.CompletionQueue()
- self.server = _low.Server(self.server_completion_queue, [])
- self.port = self.server.add_http2_port('[::]:0')
- self.client_completion_queue = _low.CompletionQueue()
- self.client_channel = _low.Channel('localhost:%d'%self.port, [])
-
- self.server.start()
-
- def tearDown(self):
- self.server.shutdown()
- del self.client_channel
-
- self.client_completion_queue.shutdown()
- while (self.client_completion_queue.next(float('+inf')).type !=
- _types.EventType.QUEUE_SHUTDOWN):
- pass
- self.server_completion_queue.shutdown()
- while (self.server_completion_queue.next(float('+inf')).type !=
- _types.EventType.QUEUE_SHUTDOWN):
- pass
-
- del self.client_completion_queue
- del self.server_completion_queue
- del self.server
-
- def testEcho(self):
- deadline = time.time() + 5
- event_time_tolerance = 2
- deadline_tolerance = 0.25
- client_metadata_ascii_key = 'key'
- client_metadata_ascii_value = 'val'
- client_metadata_bin_key = 'key-bin'
- client_metadata_bin_value = b'\0'*1000
- server_initial_metadata_key = 'init_me_me_me'
- server_initial_metadata_value = 'whodawha?'
- server_trailing_metadata_key = 'california_is_in_a_drought'
- server_trailing_metadata_value = 'zomg it is'
- server_status_code = _types.StatusCode.OK
- server_status_details = 'our work is never over'
- request = 'blarghaflargh'
- response = 'his name is robert paulson'
- method = 'twinkies'
- host = 'hostess'
- server_request_tag = object()
- request_call_result = self.server.request_call(self.server_completion_queue,
- server_request_tag)
-
- self.assertEqual(_types.CallError.OK, request_call_result)
-
- client_call_tag = object()
- client_call = self.client_channel.create_call(
- self.client_completion_queue, method, host, deadline)
- client_initial_metadata = [
- (client_metadata_ascii_key, client_metadata_ascii_value),
- (client_metadata_bin_key, client_metadata_bin_value)
- ]
- client_start_batch_result = client_call.start_batch([
- _types.OpArgs.send_initial_metadata(client_initial_metadata),
- _types.OpArgs.send_message(request, 0),
- _types.OpArgs.send_close_from_client(),
- _types.OpArgs.recv_initial_metadata(),
- _types.OpArgs.recv_message(),
- _types.OpArgs.recv_status_on_client()
- ], client_call_tag)
- self.assertEqual(_types.CallError.OK, client_start_batch_result)
-
- client_no_event, request_event, = wait_for_events(
- [self.client_completion_queue, self.server_completion_queue],
- time.time() + event_time_tolerance)
- self.assertEqual(client_no_event, None)
- self.assertEqual(_types.EventType.OP_COMPLETE, request_event.type)
- self.assertIsInstance(request_event.call, _low.Call)
- self.assertIs(server_request_tag, request_event.tag)
- self.assertEqual(1, len(request_event.results))
- received_initial_metadata = request_event.results[0].initial_metadata
- # Check that our metadata were transmitted
- self.assertTrue(test_common.metadata_transmitted(client_initial_metadata,
- received_initial_metadata))
- # Check that Python's user agent string is a part of the full user agent
- # string
- received_initial_metadata_dict = dict(received_initial_metadata)
- self.assertIn(b'user-agent', received_initial_metadata_dict)
- self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__).encode(),
- received_initial_metadata_dict[b'user-agent'])
- self.assertEqual(method.encode(), request_event.call_details.method)
- self.assertEqual(host.encode(), request_event.call_details.host)
- self.assertLess(abs(deadline - request_event.call_details.deadline),
- deadline_tolerance)
-
- # Check that the channel is connected, and that both it and the call have
- # the proper target and peer; do this after the first flurry of messages to
- # avoid the possibility that connection was delayed by the core until the
- # first message was sent.
- self.assertEqual(_types.ConnectivityState.READY,
- self.client_channel.check_connectivity_state(False))
- self.assertIsNotNone(self.client_channel.target())
- self.assertIsNotNone(client_call.peer())
-
- server_call_tag = object()
- server_call = request_event.call
- server_initial_metadata = [
- (server_initial_metadata_key, server_initial_metadata_value)
- ]
- server_trailing_metadata = [
- (server_trailing_metadata_key, server_trailing_metadata_value)
- ]
- server_start_batch_result = server_call.start_batch([
- _types.OpArgs.send_initial_metadata(server_initial_metadata),
- _types.OpArgs.recv_message(),
- _types.OpArgs.send_message(response, 0),
- _types.OpArgs.recv_close_on_server(),
- _types.OpArgs.send_status_from_server(
- server_trailing_metadata, server_status_code, server_status_details)
- ], server_call_tag)
- self.assertEqual(_types.CallError.OK, server_start_batch_result)
-
- client_event, server_event, = wait_for_events(
- [self.client_completion_queue, self.server_completion_queue],
- time.time() + event_time_tolerance)
-
- self.assertEqual(6, len(client_event.results))
- found_client_op_types = set()
- for client_result in client_event.results:
- # we expect each op type to be unique
- self.assertNotIn(client_result.type, found_client_op_types)
- found_client_op_types.add(client_result.type)
- if client_result.type == _types.OpType.RECV_INITIAL_METADATA:
- self.assertTrue(
- test_common.metadata_transmitted(server_initial_metadata,
- client_result.initial_metadata))
- elif client_result.type == _types.OpType.RECV_MESSAGE:
- self.assertEqual(response.encode(), client_result.message)
- elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT:
- self.assertTrue(
- test_common.metadata_transmitted(server_trailing_metadata,
- client_result.trailing_metadata))
- self.assertEqual(server_status_details.encode(), client_result.status.details)
- self.assertEqual(server_status_code, client_result.status.code)
- self.assertEqual(set([
- _types.OpType.SEND_INITIAL_METADATA,
- _types.OpType.SEND_MESSAGE,
- _types.OpType.SEND_CLOSE_FROM_CLIENT,
- _types.OpType.RECV_INITIAL_METADATA,
- _types.OpType.RECV_MESSAGE,
- _types.OpType.RECV_STATUS_ON_CLIENT
- ]), found_client_op_types)
-
- self.assertEqual(5, len(server_event.results))
- found_server_op_types = set()
- for server_result in server_event.results:
- self.assertNotIn(client_result.type, found_server_op_types)
- found_server_op_types.add(server_result.type)
- if server_result.type == _types.OpType.RECV_MESSAGE:
- self.assertEqual(request.encode(), server_result.message)
- elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER:
- self.assertFalse(server_result.cancelled)
- self.assertEqual(set([
- _types.OpType.SEND_INITIAL_METADATA,
- _types.OpType.RECV_MESSAGE,
- _types.OpType.SEND_MESSAGE,
- _types.OpType.RECV_CLOSE_ON_SERVER,
- _types.OpType.SEND_STATUS_FROM_SERVER
- ]), found_server_op_types)
-
- del client_call
- del server_call
-
-
-class HangingServerShutdown(unittest.TestCase):
-
- def setUp(self):
- self.server_completion_queue = _low.CompletionQueue()
- self.server = _low.Server(self.server_completion_queue, [])
- self.port = self.server.add_http2_port('[::]:0')
- self.client_completion_queue = _low.CompletionQueue()
- self.client_channel = _low.Channel('localhost:%d'%self.port, [])
-
- self.server.start()
-
- def tearDown(self):
- self.server.shutdown()
- del self.client_channel
-
- self.client_completion_queue.shutdown()
- self.server_completion_queue.shutdown()
- while True:
- client_event, server_event = wait_for_events(
- [self.client_completion_queue, self.server_completion_queue],
- float("+inf"))
- if (client_event.type == _types.EventType.QUEUE_SHUTDOWN and
- server_event.type == _types.EventType.QUEUE_SHUTDOWN):
- break
-
- del self.client_completion_queue
- del self.server_completion_queue
- del self.server
-
- def testHangingServerCall(self):
- deadline = time.time() + 5
- deadline_tolerance = 0.25
- event_time_tolerance = 2
- cancel_all_calls_time_tolerance = 0.5
- request = 'blarghaflargh'
- method = 'twinkies'
- host = 'hostess'
- server_request_tag = object()
- request_call_result = self.server.request_call(self.server_completion_queue,
- server_request_tag)
-
- client_call_tag = object()
- client_call = self.client_channel.create_call(self.client_completion_queue,
- method, host, deadline)
- client_start_batch_result = client_call.start_batch([
- _types.OpArgs.send_initial_metadata([]),
- _types.OpArgs.send_message(request, 0),
- _types.OpArgs.send_close_from_client(),
- _types.OpArgs.recv_initial_metadata(),
- _types.OpArgs.recv_message(),
- _types.OpArgs.recv_status_on_client()
- ], client_call_tag)
-
- client_no_event, request_event, = wait_for_events(
- [self.client_completion_queue, self.server_completion_queue],
- time.time() + event_time_tolerance)
-
- # Now try to shutdown the server and expect that we see server shutdown
- # almost immediately after calling cancel_all_calls.
-
- # First attempt to cancel all calls before shutting down, and expect
- # our state machine to catch the erroneous API use.
- with self.assertRaises(RuntimeError):
- self.server.cancel_all_calls()
-
- shutdown_tag = object()
- self.server.shutdown(shutdown_tag)
- pre_cancel_timestamp = time.time()
- self.server.cancel_all_calls()
- finish_shutdown_timestamp = None
- client_call_event, server_shutdown_event = wait_for_events(
- [self.client_completion_queue, self.server_completion_queue],
- time.time() + event_time_tolerance)
- self.assertIs(shutdown_tag, server_shutdown_event.tag)
- self.assertGreater(pre_cancel_timestamp + cancel_all_calls_time_tolerance,
- time.time())
-
- del client_call
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py b/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py
deleted file mode 100644
index 2b8981c752..0000000000
--- a/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py
+++ /dev/null
@@ -1,157 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests Base interface compliance of the core-over-gRPC-links stack."""
-
-import collections
-import logging
-import random
-import time
-import unittest
-
-import six
-
-from grpc._adapter import _intermediary_low
-from grpc._links import invocation
-from grpc._links import service
-from grpc.beta import interfaces as beta_interfaces
-from grpc.framework.core import implementations
-from grpc.framework.interfaces.base import utilities
-from tests.unit import test_common as grpc_test_common
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.base import test_cases
-from tests.unit.framework.interfaces.base import test_interfaces
-
-
-class _SerializationBehaviors(
- collections.namedtuple(
- '_SerializationBehaviors',
- ('request_serializers', 'request_deserializers', 'response_serializers',
- 'response_deserializers',))):
- pass
-
-
-class _Links(
- collections.namedtuple(
- '_Links',
- ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link',
- 'service_end_link'))):
- pass
-
-
-def _serialization_behaviors_from_serializations(serializations):
- request_serializers = {}
- request_deserializers = {}
- response_serializers = {}
- response_deserializers = {}
- for (group, method), serialization in six.iteritems(serializations):
- request_serializers[group, method] = serialization.serialize_request
- request_deserializers[group, method] = serialization.deserialize_request
- response_serializers[group, method] = serialization.serialize_response
- response_deserializers[group, method] = serialization.deserialize_response
- return _SerializationBehaviors(
- request_serializers, request_deserializers, response_serializers,
- response_deserializers)
-
-
-class _Implementation(test_interfaces.Implementation):
-
- def instantiate(self, serializations, servicer):
- serialization_behaviors = _serialization_behaviors_from_serializations(
- serializations)
- invocation_end_link = implementations.invocation_end_link()
- service_end_link = implementations.service_end_link(
- servicer, test_constants.DEFAULT_TIMEOUT,
- test_constants.MAXIMUM_TIMEOUT)
- service_grpc_link = service.service_link(
- serialization_behaviors.request_deserializers,
- serialization_behaviors.response_serializers)
- port = service_grpc_link.add_port('[::]:0', None)
- channel = _intermediary_low.Channel('localhost:%d' % port, None)
- invocation_grpc_link = invocation.invocation_link(
- channel, b'localhost', None,
- serialization_behaviors.request_serializers,
- serialization_behaviors.response_deserializers)
-
- invocation_end_link.join_link(invocation_grpc_link)
- invocation_grpc_link.join_link(invocation_end_link)
- service_end_link.join_link(service_grpc_link)
- service_grpc_link.join_link(service_end_link)
- invocation_grpc_link.start()
- service_grpc_link.start()
- return invocation_end_link, service_end_link, (
- invocation_grpc_link, service_grpc_link)
-
- def destantiate(self, memo):
- invocation_grpc_link, service_grpc_link = memo
- invocation_grpc_link.stop()
- service_grpc_link.begin_stop()
- service_grpc_link.end_stop()
-
- def invocation_initial_metadata(self):
- return grpc_test_common.INVOCATION_INITIAL_METADATA
-
- def service_initial_metadata(self):
- return grpc_test_common.SERVICE_INITIAL_METADATA
-
- def invocation_completion(self):
- return utilities.completion(None, None, None)
-
- def service_completion(self):
- return utilities.completion(
- grpc_test_common.SERVICE_TERMINAL_METADATA,
- beta_interfaces.StatusCode.OK, grpc_test_common.DETAILS)
-
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- return original_metadata is None or grpc_test_common.metadata_transmitted(
- original_metadata, transmitted_metadata)
-
- def completion_transmitted(self, original_completion, transmitted_completion):
- if (original_completion.terminal_metadata is not None and
- not grpc_test_common.metadata_transmitted(
- original_completion.terminal_metadata,
- transmitted_completion.terminal_metadata)):
- return False
- elif original_completion.code is not transmitted_completion.code:
- return False
- elif original_completion.message != transmitted_completion.message:
- return False
- else:
- return True
-
-
-def load_tests(loader, tests, pattern):
- return unittest.TestSuite(
- tests=tuple(
- loader.loadTestsFromTestCase(test_case_class)
- for test_case_class in test_cases.test_cases(_Implementation())))
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py b/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py
deleted file mode 100644
index 50b9a5a824..0000000000
--- a/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py
+++ /dev/null
@@ -1,163 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests Face compliance of the crust-over-core-over-gRPC-links stack."""
-
-import collections
-import unittest
-
-import six
-
-from grpc._adapter import _intermediary_low
-from grpc._links import invocation
-from grpc._links import service
-from grpc.beta import interfaces as beta_interfaces
-from grpc.framework.core import implementations as core_implementations
-from grpc.framework.crust import implementations as crust_implementations
-from grpc.framework.foundation import logging_pool
-from grpc.framework.interfaces.links import utilities
-from tests.unit import test_common as grpc_test_common
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.face import test_cases
-from tests.unit.framework.interfaces.face import test_interfaces
-
-
-class _SerializationBehaviors(
- collections.namedtuple(
- '_SerializationBehaviors',
- ('request_serializers', 'request_deserializers', 'response_serializers',
- 'response_deserializers',))):
- pass
-
-
-def _serialization_behaviors_from_test_methods(test_methods):
- request_serializers = {}
- request_deserializers = {}
- response_serializers = {}
- response_deserializers = {}
- for (group, method), test_method in six.iteritems(test_methods):
- request_serializers[group, method] = test_method.serialize_request
- request_deserializers[group, method] = test_method.deserialize_request
- response_serializers[group, method] = test_method.serialize_response
- response_deserializers[group, method] = test_method.deserialize_response
- return _SerializationBehaviors(
- request_serializers, request_deserializers, response_serializers,
- response_deserializers)
-
-
-class _Implementation(test_interfaces.Implementation):
-
- def instantiate(
- self, methods, method_implementations, multi_method_implementation):
- pool = logging_pool.pool(test_constants.POOL_SIZE)
- servicer = crust_implementations.servicer(
- method_implementations, multi_method_implementation, pool)
- serialization_behaviors = _serialization_behaviors_from_test_methods(
- methods)
- invocation_end_link = core_implementations.invocation_end_link()
- service_end_link = core_implementations.service_end_link(
- servicer, test_constants.DEFAULT_TIMEOUT,
- test_constants.MAXIMUM_TIMEOUT)
- service_grpc_link = service.service_link(
- serialization_behaviors.request_deserializers,
- serialization_behaviors.response_serializers)
- port = service_grpc_link.add_port('[::]:0', None)
- channel = _intermediary_low.Channel('localhost:%d' % port, None)
- invocation_grpc_link = invocation.invocation_link(
- channel, b'localhost', None,
- serialization_behaviors.request_serializers,
- serialization_behaviors.response_deserializers)
-
- invocation_end_link.join_link(invocation_grpc_link)
- invocation_grpc_link.join_link(invocation_end_link)
- service_grpc_link.join_link(service_end_link)
- service_end_link.join_link(service_grpc_link)
- service_end_link.start()
- invocation_end_link.start()
- invocation_grpc_link.start()
- service_grpc_link.start()
-
- generic_stub = crust_implementations.generic_stub(invocation_end_link, pool)
- # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
- group = next(iter(methods))[0]
- # TODO(nathaniel): Add a "cardinalities_by_group" attribute to
- # _digest.TestServiceDigest.
- cardinalities = {
- method: method_object.cardinality()
- for (group, method), method_object in six.iteritems(methods)}
- dynamic_stub = crust_implementations.dynamic_stub(
- invocation_end_link, group, cardinalities, pool)
-
- return generic_stub, {group: dynamic_stub}, (
- invocation_end_link, invocation_grpc_link, service_grpc_link,
- service_end_link, pool)
-
- def destantiate(self, memo):
- (invocation_end_link, invocation_grpc_link, service_grpc_link,
- service_end_link, pool) = memo
- invocation_end_link.stop(0).wait()
- invocation_grpc_link.stop()
- service_grpc_link.begin_stop()
- service_end_link.stop(0).wait()
- service_grpc_link.end_stop()
- invocation_end_link.join_link(utilities.NULL_LINK)
- invocation_grpc_link.join_link(utilities.NULL_LINK)
- service_grpc_link.join_link(utilities.NULL_LINK)
- service_end_link.join_link(utilities.NULL_LINK)
- pool.shutdown(wait=True)
-
- def invocation_metadata(self):
- return grpc_test_common.INVOCATION_INITIAL_METADATA
-
- def initial_metadata(self):
- return grpc_test_common.SERVICE_INITIAL_METADATA
-
- def terminal_metadata(self):
- return grpc_test_common.SERVICE_TERMINAL_METADATA
-
- def code(self):
- return beta_interfaces.StatusCode.OK
-
- def details(self):
- return grpc_test_common.DETAILS
-
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- return original_metadata is None or grpc_test_common.metadata_transmitted(
- original_metadata, transmitted_metadata)
-
-
-def load_tests(loader, tests, pattern):
- return unittest.TestSuite(
- tests=tuple(
- loader.loadTestsFromTestCase(test_case_class)
- for test_case_class in test_cases.test_cases(_Implementation())))
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py b/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py
deleted file mode 100644
index 890755f81c..0000000000
--- a/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py
+++ /dev/null
@@ -1,88 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""A test of invocation-side code unconnected to an RPC server."""
-
-import unittest
-
-from grpc._adapter import _intermediary_low
-from grpc._links import invocation
-from grpc.framework.interfaces.links import links
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.links import test_cases
-from tests.unit.framework.interfaces.links import test_utilities
-
-_NULL_BEHAVIOR = lambda unused_argument: None
-
-
-class LonelyInvocationLinkTest(unittest.TestCase):
-
- def testUpAndDown(self):
- channel = _intermediary_low.Channel('nonexistent:54321', None)
- invocation_link = invocation.invocation_link(
- channel, 'nonexistent', None, {}, {})
-
- invocation_link.start()
- invocation_link.stop()
-
- def _test_lonely_invocation_with_termination(self, termination):
- test_operation_id = object()
- test_group = 'test package.Test Service'
- test_method = 'test method'
- invocation_link_mate = test_utilities.RecordingLink()
-
- channel = _intermediary_low.Channel('nonexistent:54321', None)
- invocation_link = invocation.invocation_link(
- channel, 'nonexistent', None, {}, {})
- invocation_link.join_link(invocation_link_mate)
- invocation_link.start()
-
- ticket = links.Ticket(
- test_operation_id, 0, test_group, test_method,
- links.Ticket.Subscription.FULL, test_constants.SHORT_TIMEOUT, 1, None,
- None, None, None, None, termination, None)
- invocation_link.accept_ticket(ticket)
- invocation_link_mate.block_until_tickets_satisfy(test_cases.terminated)
-
- invocation_link.stop()
-
- self.assertIsNot(
- invocation_link_mate.tickets()[-1].termination,
- links.Ticket.Termination.COMPLETION)
-
- def testLonelyInvocationLinkWithCommencementTicket(self):
- self._test_lonely_invocation_with_termination(None)
-
- def testLonelyInvocationLinkWithEntireTicket(self):
- self._test_lonely_invocation_with_termination(
- links.Ticket.Termination.COMPLETION)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/src/python/grpcio/tests/unit/_links/_transmission_test.py b/src/python/grpcio/tests/unit/_links/_transmission_test.py
deleted file mode 100644
index 1f6edd18ca..0000000000
--- a/src/python/grpcio/tests/unit/_links/_transmission_test.py
+++ /dev/null
@@ -1,239 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests transmission of tickets across gRPC-on-the-wire."""
-
-import unittest
-
-from grpc._adapter import _intermediary_low
-from grpc._links import invocation
-from grpc._links import service
-from grpc.beta import interfaces as beta_interfaces
-from grpc.framework.interfaces.links import links
-from tests.unit import test_common
-from tests.unit._links import _proto_scenarios
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.links import test_cases
-from tests.unit.framework.interfaces.links import test_utilities
-
-_IDENTITY = lambda x: x
-
-
-class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
-
- def create_transmitting_links(self):
- service_link = service.service_link(
- {self.group_and_method(): self.deserialize_request},
- {self.group_and_method(): self.serialize_response})
- port = service_link.add_port('[::]:0', None)
- service_link.start()
- channel = _intermediary_low.Channel('localhost:%d' % port, None)
- invocation_link = invocation.invocation_link(
- channel, 'localhost', None,
- {self.group_and_method(): self.serialize_request},
- {self.group_and_method(): self.deserialize_response})
- invocation_link.start()
- return invocation_link, service_link
-
- def destroy_transmitting_links(self, invocation_side_link, service_side_link):
- invocation_side_link.stop()
- service_side_link.begin_stop()
- service_side_link.end_stop()
-
- def create_invocation_initial_metadata(self):
- return (
- ('first_invocation_initial_metadata_key', 'just a string value'),
- ('second_invocation_initial_metadata_key', '0123456789'),
- ('third_invocation_initial_metadata_key-bin', '\x00\x57' * 100),
- )
-
- def create_invocation_terminal_metadata(self):
- return None
-
- def create_service_initial_metadata(self):
- return (
- ('first_service_initial_metadata_key', 'just another string value'),
- ('second_service_initial_metadata_key', '9876543210'),
- ('third_service_initial_metadata_key-bin', '\x00\x59\x02' * 100),
- )
-
- def create_service_terminal_metadata(self):
- return (
- ('first_service_terminal_metadata_key', 'yet another string value'),
- ('second_service_terminal_metadata_key', 'abcdefghij'),
- ('third_service_terminal_metadata_key-bin', '\x00\x37' * 100),
- )
-
- def create_invocation_completion(self):
- return None, None
-
- def create_service_completion(self):
- return (
- beta_interfaces.StatusCode.OK, b'An exuberant test "details" message!')
-
- def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
- self.assertTrue(
- test_common.metadata_transmitted(
- original_metadata, transmitted_metadata),
- '%s erroneously transmitted as %s' % (
- original_metadata, transmitted_metadata))
-
-
-class RoundTripTest(unittest.TestCase):
-
- def testZeroMessageRoundTrip(self):
- test_operation_id = object()
- test_group = 'test package.Test Group'
- test_method = 'test method'
- identity_transformation = {(test_group, test_method): _IDENTITY}
- test_code = beta_interfaces.StatusCode.OK
- test_message = 'a test message'
-
- service_link = service.service_link(
- identity_transformation, identity_transformation)
- service_mate = test_utilities.RecordingLink()
- service_link.join_link(service_mate)
- port = service_link.add_port('[::]:0', None)
- service_link.start()
- channel = _intermediary_low.Channel('localhost:%d' % port, None)
- invocation_link = invocation.invocation_link(
- channel, None, None, identity_transformation, identity_transformation)
- invocation_mate = test_utilities.RecordingLink()
- invocation_link.join_link(invocation_mate)
- invocation_link.start()
-
- invocation_ticket = links.Ticket(
- test_operation_id, 0, test_group, test_method,
- links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
- None, None, None, None, links.Ticket.Termination.COMPLETION, None)
- invocation_link.accept_ticket(invocation_ticket)
- service_mate.block_until_tickets_satisfy(test_cases.terminated)
-
- service_ticket = links.Ticket(
- service_mate.tickets()[-1].operation_id, 0, None, None, None, None,
- None, None, None, None, test_code, test_message,
- links.Ticket.Termination.COMPLETION, None)
- service_link.accept_ticket(service_ticket)
- invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
-
- invocation_link.stop()
- service_link.begin_stop()
- service_link.end_stop()
-
- self.assertIs(
- service_mate.tickets()[-1].termination,
- links.Ticket.Termination.COMPLETION)
- self.assertIs(
- invocation_mate.tickets()[-1].termination,
- links.Ticket.Termination.COMPLETION)
- self.assertIs(invocation_mate.tickets()[-1].code, test_code)
- self.assertEqual(invocation_mate.tickets()[-1].message, test_message.encode())
-
- def _perform_scenario_test(self, scenario):
- test_operation_id = object()
- test_group, test_method = scenario.group_and_method()
- test_code = beta_interfaces.StatusCode.OK
- test_message = 'a scenario test message'
-
- service_link = service.service_link(
- {(test_group, test_method): scenario.deserialize_request},
- {(test_group, test_method): scenario.serialize_response})
- service_mate = test_utilities.RecordingLink()
- service_link.join_link(service_mate)
- port = service_link.add_port('[::]:0', None)
- service_link.start()
- channel = _intermediary_low.Channel('localhost:%d' % port, None)
- invocation_link = invocation.invocation_link(
- channel, 'localhost', None,
- {(test_group, test_method): scenario.serialize_request},
- {(test_group, test_method): scenario.deserialize_response})
- invocation_mate = test_utilities.RecordingLink()
- invocation_link.join_link(invocation_mate)
- invocation_link.start()
-
- invocation_ticket = links.Ticket(
- test_operation_id, 0, test_group, test_method,
- links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None,
- None, None, None, None, None, None)
- invocation_link.accept_ticket(invocation_ticket)
- requests = scenario.requests()
- for request_index, request in enumerate(requests):
- request_ticket = links.Ticket(
- test_operation_id, 1 + request_index, None, None, None, None, 1, None,
- request, None, None, None, None, None)
- invocation_link.accept_ticket(request_ticket)
- service_mate.block_until_tickets_satisfy(
- test_cases.at_least_n_payloads_received_predicate(1 + request_index))
- response_ticket = links.Ticket(
- service_mate.tickets()[0].operation_id, request_index, None, None,
- None, None, 1, None, scenario.response_for_request(request), None,
- None, None, None, None)
- service_link.accept_ticket(response_ticket)
- invocation_mate.block_until_tickets_satisfy(
- test_cases.at_least_n_payloads_received_predicate(1 + request_index))
- request_count = len(requests)
- invocation_completion_ticket = links.Ticket(
- test_operation_id, request_count + 1, None, None, None, None, None,
- None, None, None, None, None, links.Ticket.Termination.COMPLETION,
- None)
- invocation_link.accept_ticket(invocation_completion_ticket)
- service_mate.block_until_tickets_satisfy(test_cases.terminated)
- service_completion_ticket = links.Ticket(
- service_mate.tickets()[0].operation_id, request_count, None, None, None,
- None, None, None, None, None, test_code, test_message,
- links.Ticket.Termination.COMPLETION, None)
- service_link.accept_ticket(service_completion_ticket)
- invocation_mate.block_until_tickets_satisfy(test_cases.terminated)
-
- invocation_link.stop()
- service_link.begin_stop()
- service_link.end_stop()
-
- observed_requests = tuple(
- ticket.payload for ticket in service_mate.tickets()
- if ticket.payload is not None)
- observed_responses = tuple(
- ticket.payload for ticket in invocation_mate.tickets()
- if ticket.payload is not None)
- self.assertTrue(scenario.verify_requests(observed_requests))
- self.assertTrue(scenario.verify_responses(observed_responses))
-
- def testEmptyScenario(self):
- self._perform_scenario_test(_proto_scenarios.EmptyScenario())
-
- def testBidirectionallyUnaryScenario(self):
- self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario())
-
- def testBidirectionallyStreamingScenario(self):
- self._perform_scenario_test(
- _proto_scenarios.BidirectionallyStreamingScenario())
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py b/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py
deleted file mode 100644
index 43457be362..0000000000
--- a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests Face interface compliance of the crust-over-core stack."""
-
-import collections
-import unittest
-
-import six
-
-from grpc.framework.core import implementations as core_implementations
-from grpc.framework.crust import implementations as crust_implementations
-from grpc.framework.foundation import logging_pool
-from grpc.framework.interfaces.links import utilities
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.face import test_cases
-from tests.unit.framework.interfaces.face import test_interfaces
-from tests.unit.framework.interfaces.links import test_utilities
-
-
-class _Implementation(test_interfaces.Implementation):
-
- def instantiate(
- self, methods, method_implementations, multi_method_implementation):
- pool = logging_pool.pool(test_constants.POOL_SIZE)
- servicer = crust_implementations.servicer(
- method_implementations, multi_method_implementation, pool)
-
- service_end_link = core_implementations.service_end_link(
- servicer, test_constants.DEFAULT_TIMEOUT,
- test_constants.MAXIMUM_TIMEOUT)
- invocation_end_link = core_implementations.invocation_end_link()
- invocation_end_link.join_link(service_end_link)
- service_end_link.join_link(invocation_end_link)
- service_end_link.start()
- invocation_end_link.start()
-
- generic_stub = crust_implementations.generic_stub(invocation_end_link, pool)
- # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
- group = next(iter(methods))[0]
- # TODO(nathaniel): Add a "cardinalities_by_group" attribute to
- # _digest.TestServiceDigest.
- cardinalities = {
- method: method_object.cardinality()
- for (group, method), method_object in six.iteritems(methods)}
- dynamic_stub = crust_implementations.dynamic_stub(
- invocation_end_link, group, cardinalities, pool)
-
- return generic_stub, {group: dynamic_stub}, (
- invocation_end_link, service_end_link, pool)
-
- def destantiate(self, memo):
- invocation_end_link, service_end_link, pool = memo
- invocation_end_link.stop(0).wait()
- service_end_link.stop(0).wait()
- invocation_end_link.join_link(utilities.NULL_LINK)
- service_end_link.join_link(utilities.NULL_LINK)
- pool.shutdown(wait=True)
-
- def invocation_metadata(self):
- return object()
-
- def initial_metadata(self):
- return object()
-
- def terminal_metadata(self):
- return object()
-
- def code(self):
- return object()
-
- def details(self):
- return object()
-
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- return original_metadata is transmitted_metadata
-
-
-def load_tests(loader, tests, pattern):
- return unittest.TestSuite(
- tests=tuple(
- loader.loadTestsFromTestCase(test_case_class)
- for test_case_class in test_cases.test_cases(_Implementation())))
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py b/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py
deleted file mode 100644
index 1310292306..0000000000
--- a/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests the RPC Framework Core's implementation of the Base interface."""
-
-import logging
-import random
-import time
-import unittest
-
-from grpc.framework.core import implementations
-from grpc.framework.interfaces.base import utilities
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.interfaces.base import test_cases
-from tests.unit.framework.interfaces.base import test_interfaces
-
-
-class _Implementation(test_interfaces.Implementation):
-
- def __init__(self):
- self._invocation_initial_metadata = object()
- self._service_initial_metadata = object()
- self._invocation_terminal_metadata = object()
- self._service_terminal_metadata = object()
-
- def instantiate(self, serializations, servicer):
- invocation = implementations.invocation_end_link()
- service = implementations.service_end_link(
- servicer, test_constants.DEFAULT_TIMEOUT,
- test_constants.MAXIMUM_TIMEOUT)
- invocation.join_link(service)
- service.join_link(invocation)
- return invocation, service, None
-
- def destantiate(self, memo):
- pass
-
- def invocation_initial_metadata(self):
- return self._invocation_initial_metadata
-
- def service_initial_metadata(self):
- return self._service_initial_metadata
-
- def invocation_completion(self):
- return utilities.completion(self._invocation_terminal_metadata, None, None)
-
- def service_completion(self):
- return utilities.completion(self._service_terminal_metadata, None, None)
-
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- return transmitted_metadata is original_metadata
-
- def completion_transmitted(self, original_completion, transmitted_completion):
- return (
- (original_completion.terminal_metadata is
- transmitted_completion.terminal_metadata) and
- original_completion.code is transmitted_completion.code and
- original_completion.message is transmitted_completion.message
- )
-
-
-def load_tests(loader, tests, pattern):
- return unittest.TestSuite(
- tests=tuple(
- loader.loadTestsFromTestCase(test_case_class)
- for test_case_class in test_cases.test_cases(_Implementation())))
-
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/framework/foundation/_later_test.py b/src/python/grpcio/tests/unit/framework/foundation/_later_test.py
deleted file mode 100644
index 6c2459e185..0000000000
--- a/src/python/grpcio/tests/unit/framework/foundation/_later_test.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tests of the later module."""
-
-import threading
-import time
-import unittest
-
-from grpc.framework.foundation import later
-
-TICK = 0.1
-
-
-class LaterTest(unittest.TestCase):
-
- def test_simple_delay(self):
- lock = threading.Lock()
- cell = [0]
- return_value = object()
-
- def computation():
- with lock:
- cell[0] += 1
- return return_value
- computation_future = later.later(TICK * 2, computation)
-
- self.assertFalse(computation_future.done())
- self.assertFalse(computation_future.cancelled())
- time.sleep(TICK)
- self.assertFalse(computation_future.done())
- self.assertFalse(computation_future.cancelled())
- with lock:
- self.assertEqual(0, cell[0])
- time.sleep(TICK * 2)
- self.assertTrue(computation_future.done())
- self.assertFalse(computation_future.cancelled())
- with lock:
- self.assertEqual(1, cell[0])
- self.assertEqual(return_value, computation_future.result())
-
- def test_callback(self):
- lock = threading.Lock()
- cell = [0]
- callback_called = [False]
- future_passed_to_callback = [None]
- def computation():
- with lock:
- cell[0] += 1
- computation_future = later.later(TICK * 2, computation)
- def callback(outcome):
- with lock:
- callback_called[0] = True
- future_passed_to_callback[0] = outcome
- computation_future.add_done_callback(callback)
- time.sleep(TICK)
- with lock:
- self.assertFalse(callback_called[0])
- time.sleep(TICK * 2)
- with lock:
- self.assertTrue(callback_called[0])
- self.assertTrue(future_passed_to_callback[0].done())
-
- callback_called[0] = False
- future_passed_to_callback[0] = None
-
- computation_future.add_done_callback(callback)
- with lock:
- self.assertTrue(callback_called[0])
- self.assertTrue(future_passed_to_callback[0].done())
-
- def test_cancel(self):
- lock = threading.Lock()
- cell = [0]
- callback_called = [False]
- future_passed_to_callback = [None]
- def computation():
- with lock:
- cell[0] += 1
- computation_future = later.later(TICK * 2, computation)
- def callback(outcome):
- with lock:
- callback_called[0] = True
- future_passed_to_callback[0] = outcome
- computation_future.add_done_callback(callback)
- time.sleep(TICK)
- with lock:
- self.assertFalse(callback_called[0])
- computation_future.cancel()
- self.assertTrue(computation_future.cancelled())
- self.assertFalse(computation_future.running())
- self.assertTrue(computation_future.done())
- with lock:
- self.assertTrue(callback_called[0])
- self.assertTrue(future_passed_to_callback[0].cancelled())
-
- def test_result(self):
- lock = threading.Lock()
- cell = [0]
- callback_called = [False]
- future_passed_to_callback_cell = [None]
- return_value = object()
-
- def computation():
- with lock:
- cell[0] += 1
- return return_value
- computation_future = later.later(TICK * 2, computation)
-
- def callback(future_passed_to_callback):
- with lock:
- callback_called[0] = True
- future_passed_to_callback_cell[0] = future_passed_to_callback
- computation_future.add_done_callback(callback)
- returned_value = computation_future.result()
- self.assertEqual(return_value, returned_value)
-
- # The callback may not yet have been called! Sleep a tick.
- time.sleep(TICK)
- with lock:
- self.assertTrue(callback_called[0])
- self.assertEqual(return_value, future_passed_to_callback_cell[0].result())
-
-if __name__ == '__main__':
- unittest.main(verbosity=2)