aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2015-06-17 17:36:58 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2015-06-17 17:36:58 +0000
commitc6636c01a55d6a9d59ccd45223a140172343c38e (patch)
tree59c8f97ba568b77c90f64b5d301cac2f964567b9 /src/python
parent4c32de585a0f95139d0c2487bc4f4dc954ed2be7 (diff)
Adapt _intermediary_low_test to no backup poller
Pull request #1577 removes the backup poller internal to completion queues so this test needs to use extra threads to drive the work done inside the completion queues by continuously calling each completion queue's get method.
Diffstat (limited to 'src/python')
-rw-r--r--src/python/src/grpc/_adapter/_intermediary_low_test.py95
1 files changed, 56 insertions, 39 deletions
diff --git a/src/python/src/grpc/_adapter/_intermediary_low_test.py b/src/python/src/grpc/_adapter/_intermediary_low_test.py
index 478346341b..1a9b0c69f3 100644
--- a/src/python/src/grpc/_adapter/_intermediary_low_test.py
+++ b/src/python/src/grpc/_adapter/_intermediary_low_test.py
@@ -29,6 +29,8 @@
"""Tests for the old '_low'."""
+import Queue
+import threading
import time
import unittest
@@ -43,6 +45,7 @@ _BYTE_SEQUENCE_SEQUENCE = tuple(
bytes(bytearray((row + column) % 256 for column in range(row)))
for row in range(_STREAM_LENGTH))
+
class LonelyClientTest(unittest.TestCase):
def testLonelyClient(self):
@@ -79,6 +82,14 @@ class LonelyClientTest(unittest.TestCase):
del completion_queue
+def _drive_completion_queue(completion_queue, event_queue):
+ while True:
+ event = completion_queue.get(_FUTURE)
+ if event.kind is _low.Event.Kind.STOP:
+ break
+ event_queue.put(event)
+
+
class EchoTest(unittest.TestCase):
def setUp(self):
@@ -88,24 +99,26 @@ class EchoTest(unittest.TestCase):
self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0')
self.server.start()
+ self.server_events = Queue.Queue()
+ self.server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.server_completion_queue, self.server_events))
+ self.server_completion_queue_thread.start()
self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None)
+ self.client_events = Queue.Queue()
+ self.client_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.client_completion_queue, self.client_events))
+ self.client_completion_queue_thread.start()
def tearDown(self):
self.server.stop()
self.server_completion_queue.stop()
self.client_completion_queue.stop()
- while True:
- event = self.server_completion_queue.get(_FUTURE)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
- while True:
- event = self.client_completion_queue.get(_FUTURE)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
- self.server_completion_queue = None
- self.client_completion_queue = None
+ self.server_completion_queue_thread.join()
+ self.client_completion_queue_thread.join()
del self.server
def _perform_echo_test(self, test_data):
@@ -144,7 +157,7 @@ class EchoTest(unittest.TestCase):
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag)
- service_accepted = self.server_completion_queue.get(_FUTURE)
+ service_accepted = self.server_events.get()
self.assertIsNotNone(service_accepted)
self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
self.assertIs(service_accepted.tag, service_tag)
@@ -165,7 +178,7 @@ class EchoTest(unittest.TestCase):
server_leading_binary_metadata_value)
server_call.premetadata()
- metadata_accepted = self.client_completion_queue.get(_FUTURE)
+ metadata_accepted = self.client_events.get()
self.assertIsNotNone(metadata_accepted)
self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
self.assertEqual(metadata_tag, metadata_accepted.tag)
@@ -179,14 +192,14 @@ class EchoTest(unittest.TestCase):
for datum in test_data:
client_call.write(datum, write_tag)
- write_accepted = self.client_completion_queue.get(_FUTURE)
+ write_accepted = self.client_events.get()
self.assertIsNotNone(write_accepted)
self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
self.assertIs(write_accepted.tag, write_tag)
self.assertIs(write_accepted.write_accepted, True)
server_call.read(read_tag)
- read_accepted = self.server_completion_queue.get(_FUTURE)
+ read_accepted = self.server_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
@@ -194,14 +207,14 @@ class EchoTest(unittest.TestCase):
server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag)
- write_accepted = self.server_completion_queue.get(_FUTURE)
+ write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
self.assertEqual(write_tag, write_accepted.tag)
self.assertTrue(write_accepted.write_accepted)
client_call.read(read_tag)
- read_accepted = self.client_completion_queue.get(_FUTURE)
+ read_accepted = self.client_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
@@ -209,14 +222,14 @@ class EchoTest(unittest.TestCase):
client_data.append(read_accepted.bytes)
client_call.complete(complete_tag)
- complete_accepted = self.client_completion_queue.get(_FUTURE)
+ complete_accepted = self.client_events.get()
self.assertIsNotNone(complete_accepted)
self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
self.assertIs(complete_accepted.tag, complete_tag)
self.assertIs(complete_accepted.complete_accepted, True)
server_call.read(read_tag)
- read_accepted = self.server_completion_queue.get(_FUTURE)
+ read_accepted = self.server_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
@@ -228,8 +241,8 @@ class EchoTest(unittest.TestCase):
server_trailing_binary_metadata_value)
server_call.status(_low.Status(_low.Code.OK, details), status_tag)
- server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
- server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
+ server_terminal_event_one = self.server_events.get()
+ server_terminal_event_two = self.server_events.get()
if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
status_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two
@@ -246,8 +259,8 @@ class EchoTest(unittest.TestCase):
self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
client_call.read(read_tag)
- client_terminal_event_one = self.client_completion_queue.get(_FUTURE)
- client_terminal_event_two = self.client_completion_queue.get(_FUTURE)
+ client_terminal_event_one = self.client_events.get()
+ client_terminal_event_two = self.client_events.get()
if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = client_terminal_event_one
finish_accepted = client_terminal_event_two
@@ -303,22 +316,26 @@ class CancellationTest(unittest.TestCase):
self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0')
self.server.start()
+ self.server_events = Queue.Queue()
+ self.server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.server_completion_queue, self.server_events))
+ self.server_completion_queue_thread.start()
self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None)
+ self.client_events = Queue.Queue()
+ self.client_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.client_completion_queue, self.client_events))
+ self.client_completion_queue_thread.start()
def tearDown(self):
self.server.stop()
self.server_completion_queue.stop()
self.client_completion_queue.stop()
- while True:
- event = self.server_completion_queue.get(0)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
- while True:
- event = self.client_completion_queue.get(0)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
+ self.server_completion_queue_thread.join()
+ self.client_completion_queue_thread.join()
del self.server
def testCancellation(self):
@@ -340,29 +357,29 @@ class CancellationTest(unittest.TestCase):
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag)
- service_accepted = self.server_completion_queue.get(_FUTURE)
+ service_accepted = self.server_events.get()
server_call = service_accepted.service_acceptance.call
server_call.accept(self.server_completion_queue, finish_tag)
server_call.premetadata()
- metadata_accepted = self.client_completion_queue.get(_FUTURE)
+ metadata_accepted = self.client_events.get()
self.assertIsNotNone(metadata_accepted)
for datum in test_data:
client_call.write(datum, write_tag)
- write_accepted = self.client_completion_queue.get(_FUTURE)
+ write_accepted = self.client_events.get()
server_call.read(read_tag)
- read_accepted = self.server_completion_queue.get(_FUTURE)
+ read_accepted = self.server_events.get()
server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag)
- write_accepted = self.server_completion_queue.get(_FUTURE)
+ write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
client_call.read(read_tag)
- read_accepted = self.client_completion_queue.get(_FUTURE)
+ read_accepted = self.client_events.get()
client_data.append(read_accepted.bytes)
client_call.cancel()
@@ -373,8 +390,8 @@ class CancellationTest(unittest.TestCase):
server_call.read(read_tag)
- server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
- server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
+ server_terminal_event_one = self.server_events.get()
+ server_terminal_event_two = self.server_events.get()
if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two
@@ -388,7 +405,7 @@ class CancellationTest(unittest.TestCase):
self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
- finish_event = self.client_completion_queue.get(_FUTURE)
+ finish_event = self.client_events.get()
self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'),
finish_event.status)