aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/unit/_cython
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2016-08-02 16:27:17 -0700
committerGravatar murgatroid99 <mlumish@google.com>2016-08-02 16:27:17 -0700
commit45475ac9a448e8680ec94c9e542eba263084ab65 (patch)
treea4b6a0b999e65e309bfd71bd815d8da313a8f2c5 /src/python/grpcio_tests/tests/unit/_cython
parentc4326ef25a5280b4254ab1c97c9f28d68f6ce852 (diff)
parent5e1fbd20619e30046aa6f5eba7a76ea5dd4db16a (diff)
Merge branch 'master' into node_generator_nested_messages
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/_cython')
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/.gitignore7
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/__init__.py28
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py222
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_channel_test.py82
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py251
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py433
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/test_utilities.py66
7 files changed, 1089 insertions, 0 deletions
diff --git a/src/python/grpcio_tests/tests/unit/_cython/.gitignore b/src/python/grpcio_tests/tests/unit/_cython/.gitignore
new file mode 100644
index 0000000000..c315029288
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/.gitignore
@@ -0,0 +1,7 @@
+*.h
+*.c
+*.a
+*.so
+*.dll
+*.pyc
+*.pyd
diff --git a/src/python/grpcio_tests/tests/unit/_cython/__init__.py b/src/python/grpcio_tests/tests/unit/_cython/__init__.py
new file mode 100644
index 0000000000..b89398809f
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/__init__.py
@@ -0,0 +1,28 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
new file mode 100644
index 0000000000..cf212c5653
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
@@ -0,0 +1,222 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test making many calls and immediately cancelling most of them."""
+
+import threading
+import unittest
+
+from grpc._cython import cygrpc
+from grpc.framework.foundation import logging_pool
+from tests.unit.framework.common import test_constants
+
+_INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
+_EMPTY_FLAGS = 0
+_EMPTY_METADATA = cygrpc.Metadata(())
+
+_SERVER_SHUTDOWN_TAG = 'server_shutdown'
+_REQUEST_CALL_TAG = 'request_call'
+_RECEIVE_CLOSE_ON_SERVER_TAG = 'receive_close_on_server'
+_RECEIVE_MESSAGE_TAG = 'receive_message'
+_SERVER_COMPLETE_CALL_TAG = 'server_complete_call'
+
+_SUCCESS_CALL_FRACTION = 1.0 / 8.0
+
+
+class _State(object):
+
+ def __init__(self):
+ self.condition = threading.Condition()
+ self.handlers_released = False
+ self.parked_handlers = 0
+ self.handled_rpcs = 0
+
+
+def _is_cancellation_event(event):
+ return (
+ event.tag is _RECEIVE_CLOSE_ON_SERVER_TAG and
+ event.batch_operations[0].received_cancelled)
+
+
+class _Handler(object):
+
+ def __init__(self, state, completion_queue, rpc_event):
+ self._state = state
+ self._lock = threading.Lock()
+ self._completion_queue = completion_queue
+ self._call = rpc_event.operation_call
+
+ def __call__(self):
+ with self._state.condition:
+ self._state.parked_handlers += 1
+ if self._state.parked_handlers == test_constants.THREAD_CONCURRENCY:
+ self._state.condition.notify_all()
+ while not self._state.handlers_released:
+ self._state.condition.wait()
+
+ with self._lock:
+ self._call.start_server_batch(
+ cygrpc.Operations(
+ (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),)),
+ _RECEIVE_CLOSE_ON_SERVER_TAG)
+ self._call.start_server_batch(
+ cygrpc.Operations((cygrpc.operation_receive_message(_EMPTY_FLAGS),)),
+ _RECEIVE_MESSAGE_TAG)
+ first_event = self._completion_queue.poll()
+ if _is_cancellation_event(first_event):
+ self._completion_queue.poll()
+ else:
+ with self._lock:
+ operations = (
+ cygrpc.operation_send_initial_metadata(
+ _EMPTY_METADATA, _EMPTY_FLAGS),
+ cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS),
+ cygrpc.operation_send_status_from_server(
+ _EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!',
+ _EMPTY_FLAGS),
+ )
+ self._call.start_server_batch(
+ cygrpc.Operations(operations), _SERVER_COMPLETE_CALL_TAG)
+ self._completion_queue.poll()
+ self._completion_queue.poll()
+
+
+def _serve(state, server, server_completion_queue, thread_pool):
+ for _ in range(test_constants.RPC_CONCURRENCY):
+ call_completion_queue = cygrpc.CompletionQueue()
+ server.request_call(
+ call_completion_queue, server_completion_queue, _REQUEST_CALL_TAG)
+ rpc_event = server_completion_queue.poll()
+ thread_pool.submit(_Handler(state, call_completion_queue, rpc_event))
+ with state.condition:
+ state.handled_rpcs += 1
+ if test_constants.RPC_CONCURRENCY <= state.handled_rpcs:
+ state.condition.notify_all()
+ server_completion_queue.poll()
+
+
+class _QueueDriver(object):
+
+ def __init__(self, condition, completion_queue, due):
+ self._condition = condition
+ self._completion_queue = completion_queue
+ self._due = due
+ self._events = []
+ self._returned = False
+
+ def start(self):
+ def in_thread():
+ while True:
+ event = self._completion_queue.poll()
+ with self._condition:
+ self._events.append(event)
+ self._due.remove(event.tag)
+ self._condition.notify_all()
+ if not self._due:
+ self._returned = True
+ return
+ thread = threading.Thread(target=in_thread)
+ thread.start()
+
+ def events(self, at_least):
+ with self._condition:
+ while len(self._events) < at_least:
+ self._condition.wait()
+ return tuple(self._events)
+
+
+class CancelManyCallsTest(unittest.TestCase):
+
+ def testCancelManyCalls(self):
+ server_thread_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+
+ server_completion_queue = cygrpc.CompletionQueue()
+ server = cygrpc.Server()
+ server.register_completion_queue(server_completion_queue)
+ port = server.add_http2_port(b'[::]:0')
+ server.start()
+ channel = cygrpc.Channel('localhost:{}'.format(port).encode())
+
+ state = _State()
+
+ server_thread_args = (
+ state, server, server_completion_queue, server_thread_pool,)
+ server_thread = threading.Thread(target=_serve, args=server_thread_args)
+ server_thread.start()
+
+ client_condition = threading.Condition()
+ client_due = set()
+ client_completion_queue = cygrpc.CompletionQueue()
+ client_driver = _QueueDriver(
+ client_condition, client_completion_queue, client_due)
+ client_driver.start()
+
+ with client_condition:
+ client_calls = []
+ for index in range(test_constants.RPC_CONCURRENCY):
+ client_call = channel.create_call(
+ None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None,
+ _INFINITE_FUTURE)
+ operations = (
+ cygrpc.operation_send_initial_metadata(
+ _EMPTY_METADATA, _EMPTY_FLAGS),
+ cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
+ cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
+ )
+ tag = 'client_complete_call_{0:04d}_tag'.format(index)
+ client_call.start_client_batch(cygrpc.Operations(operations), tag)
+ client_due.add(tag)
+ client_calls.append(client_call)
+
+ with state.condition:
+ while True:
+ if state.parked_handlers < test_constants.THREAD_CONCURRENCY:
+ state.condition.wait()
+ elif state.handled_rpcs < test_constants.RPC_CONCURRENCY:
+ state.condition.wait()
+ else:
+ state.handlers_released = True
+ state.condition.notify_all()
+ break
+
+ client_driver.events(
+ test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION)
+ with client_condition:
+ for client_call in client_calls:
+ client_call.cancel()
+
+ with state.condition:
+ server.shutdown(server_completion_queue, _SERVER_SHUTDOWN_TAG)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py
new file mode 100644
index 0000000000..f9c8a3ac62
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py
@@ -0,0 +1,82 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import time
+import threading
+import unittest
+
+from grpc._cython import cygrpc
+
+from tests.unit.framework.common import test_constants
+
+
+def _channel_and_completion_queue():
+ channel = cygrpc.Channel(b'localhost:54321', cygrpc.ChannelArgs(()))
+ completion_queue = cygrpc.CompletionQueue()
+ return channel, completion_queue
+
+
+def _connectivity_loop(channel, completion_queue):
+ for _ in range(100):
+ connectivity = channel.check_connectivity_state(True)
+ channel.watch_connectivity_state(
+ connectivity, cygrpc.Timespec(time.time() + 0.2), completion_queue,
+ None)
+ completion_queue.poll(deadline=cygrpc.Timespec(float('+inf')))
+
+
+def _create_loop_destroy():
+ channel, completion_queue = _channel_and_completion_queue()
+ _connectivity_loop(channel, completion_queue)
+ completion_queue.shutdown()
+
+
+def _in_parallel(behavior, arguments):
+ threads = tuple(
+ threading.Thread(target=behavior, args=arguments)
+ for _ in range(test_constants.THREAD_CONCURRENCY))
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join()
+
+
+class ChannelTest(unittest.TestCase):
+
+ def test_single_channel_lonely_connectivity(self):
+ channel, completion_queue = _channel_and_completion_queue()
+ _in_parallel(_connectivity_loop, (channel, completion_queue,))
+ completion_queue.shutdown()
+
+ def test_multiple_channels_lonely_connectivity(self):
+ _in_parallel(_create_loop_destroy, ())
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
new file mode 100644
index 0000000000..152d8edde3
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -0,0 +1,251 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test a corner-case at the level of the Cython API."""
+
+import threading
+import unittest
+
+from grpc._cython import cygrpc
+
+_INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
+_EMPTY_FLAGS = 0
+_EMPTY_METADATA = cygrpc.Metadata(())
+
+
+class _ServerDriver(object):
+
+ def __init__(self, completion_queue, shutdown_tag):
+ self._condition = threading.Condition()
+ self._completion_queue = completion_queue
+ self._shutdown_tag = shutdown_tag
+ self._events = []
+ self._saw_shutdown_tag = False
+
+ def start(self):
+ def in_thread():
+ while True:
+ event = self._completion_queue.poll()
+ with self._condition:
+ self._events.append(event)
+ self._condition.notify()
+ if event.tag is self._shutdown_tag:
+ self._saw_shutdown_tag = True
+ break
+ thread = threading.Thread(target=in_thread)
+ thread.start()
+
+ def done(self):
+ with self._condition:
+ return self._saw_shutdown_tag
+
+ def first_event(self):
+ with self._condition:
+ while not self._events:
+ self._condition.wait()
+ return self._events[0]
+
+ def events(self):
+ with self._condition:
+ while not self._saw_shutdown_tag:
+ self._condition.wait()
+ return tuple(self._events)
+
+
+class _QueueDriver(object):
+
+ def __init__(self, condition, completion_queue, due):
+ self._condition = condition
+ self._completion_queue = completion_queue
+ self._due = due
+ self._events = []
+ self._returned = False
+
+ def start(self):
+ def in_thread():
+ while True:
+ event = self._completion_queue.poll()
+ with self._condition:
+ self._events.append(event)
+ self._due.remove(event.tag)
+ self._condition.notify_all()
+ if not self._due:
+ self._returned = True
+ return
+ thread = threading.Thread(target=in_thread)
+ thread.start()
+
+ def done(self):
+ with self._condition:
+ return self._returned
+
+ def event_with_tag(self, tag):
+ with self._condition:
+ while True:
+ for event in self._events:
+ if event.tag is tag:
+ return event
+ self._condition.wait()
+
+ def events(self):
+ with self._condition:
+ while not self._returned:
+ self._condition.wait()
+ return tuple(self._events)
+
+
+class ReadSomeButNotAllResponsesTest(unittest.TestCase):
+
+ def testReadSomeButNotAllResponses(self):
+ server_completion_queue = cygrpc.CompletionQueue()
+ server = cygrpc.Server()
+ server.register_completion_queue(server_completion_queue)
+ port = server.add_http2_port(b'[::]:0')
+ server.start()
+ channel = cygrpc.Channel('localhost:{}'.format(port).encode())
+
+ server_shutdown_tag = 'server_shutdown_tag'
+ server_driver = _ServerDriver(server_completion_queue, server_shutdown_tag)
+ server_driver.start()
+
+ client_condition = threading.Condition()
+ client_due = set()
+ client_completion_queue = cygrpc.CompletionQueue()
+ client_driver = _QueueDriver(
+ client_condition, client_completion_queue, client_due)
+ client_driver.start()
+
+ server_call_condition = threading.Condition()
+ server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
+ server_send_first_message_tag = 'server_send_first_message_tag'
+ server_send_second_message_tag = 'server_send_second_message_tag'
+ server_complete_rpc_tag = 'server_complete_rpc_tag'
+ server_call_due = set((
+ server_send_initial_metadata_tag,
+ server_send_first_message_tag,
+ server_send_second_message_tag,
+ server_complete_rpc_tag,
+ ))
+ server_call_completion_queue = cygrpc.CompletionQueue()
+ server_call_driver = _QueueDriver(
+ server_call_condition, server_call_completion_queue, server_call_due)
+ server_call_driver.start()
+
+ server_rpc_tag = 'server_rpc_tag'
+ request_call_result = server.request_call(
+ server_call_completion_queue, server_completion_queue, server_rpc_tag)
+
+ client_call = channel.create_call(
+ None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None,
+ _INFINITE_FUTURE)
+ client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
+ client_complete_rpc_tag = 'client_complete_rpc_tag'
+ with client_condition:
+ client_receive_initial_metadata_start_batch_result = (
+ client_call.start_client_batch(cygrpc.Operations([
+ cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ ]), client_receive_initial_metadata_tag))
+ client_due.add(client_receive_initial_metadata_tag)
+ client_complete_rpc_start_batch_result = (
+ client_call.start_client_batch(cygrpc.Operations([
+ cygrpc.operation_send_initial_metadata(
+ _EMPTY_METADATA, _EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
+ ]), client_complete_rpc_tag))
+ client_due.add(client_complete_rpc_tag)
+
+ server_rpc_event = server_driver.first_event()
+
+ with server_call_condition:
+ server_send_initial_metadata_start_batch_result = (
+ server_rpc_event.operation_call.start_server_batch([
+ cygrpc.operation_send_initial_metadata(
+ _EMPTY_METADATA, _EMPTY_FLAGS),
+ ], server_send_initial_metadata_tag))
+ server_send_first_message_start_batch_result = (
+ server_rpc_event.operation_call.start_server_batch([
+ cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
+ ], server_send_first_message_tag))
+ server_send_initial_metadata_event = server_call_driver.event_with_tag(
+ server_send_initial_metadata_tag)
+ server_send_first_message_event = server_call_driver.event_with_tag(
+ server_send_first_message_tag)
+ with server_call_condition:
+ server_send_second_message_start_batch_result = (
+ server_rpc_event.operation_call.start_server_batch([
+ cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
+ ], server_send_second_message_tag))
+ server_complete_rpc_start_batch_result = (
+ server_rpc_event.operation_call.start_server_batch([
+ cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
+ cygrpc.operation_send_status_from_server(
+ cygrpc.Metadata(()), cygrpc.StatusCode.ok, b'test details',
+ _EMPTY_FLAGS),
+ ], server_complete_rpc_tag))
+ server_send_second_message_event = server_call_driver.event_with_tag(
+ server_send_second_message_tag)
+ server_complete_rpc_event = server_call_driver.event_with_tag(
+ server_complete_rpc_tag)
+ server_call_driver.events()
+
+ with client_condition:
+ client_receive_first_message_tag = 'client_receive_first_message_tag'
+ client_receive_first_message_start_batch_result = (
+ client_call.start_client_batch(cygrpc.Operations([
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ ]), client_receive_first_message_tag))
+ client_due.add(client_receive_first_message_tag)
+ client_receive_first_message_event = client_driver.event_with_tag(
+ client_receive_first_message_tag)
+
+ client_call_cancel_result = client_call.cancel()
+ client_driver.events()
+
+ server.shutdown(server_completion_queue, server_shutdown_tag)
+ server.cancel_all_calls()
+ server_driver.events()
+
+ self.assertEqual(cygrpc.CallError.ok, request_call_result)
+ self.assertEqual(
+ cygrpc.CallError.ok, server_send_initial_metadata_start_batch_result)
+ self.assertEqual(
+ cygrpc.CallError.ok, client_receive_initial_metadata_start_batch_result)
+ self.assertEqual(
+ cygrpc.CallError.ok, client_complete_rpc_start_batch_result)
+ self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result)
+ self.assertIs(server_rpc_tag, server_rpc_event.tag)
+ self.assertEqual(
+ cygrpc.CompletionType.operation_complete, server_rpc_event.type)
+ self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call)
+ self.assertEqual(0, len(server_rpc_event.batch_operations))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
new file mode 100644
index 0000000000..f9a8e2401b
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -0,0 +1,433 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import time
+import threading
+import unittest
+
+from grpc._cython import cygrpc
+from tests.unit._cython import test_utilities
+from tests.unit import test_common
+from tests.unit import resources
+
+
+_SSL_HOST_OVERRIDE = b'foo.test.google.fr'
+_CALL_CREDENTIALS_METADATA_KEY = 'call-creds-key'
+_CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value'
+_EMPTY_FLAGS = 0
+
+def _metadata_plugin_callback(context, callback):
+ callback(cygrpc.Metadata(
+ [cygrpc.Metadatum(_CALL_CREDENTIALS_METADATA_KEY,
+ _CALL_CREDENTIALS_METADATA_VALUE)]),
+ cygrpc.StatusCode.ok, b'')
+
+
+class TypeSmokeTest(unittest.TestCase):
+
+ def testStringsInUtilitiesUpDown(self):
+ self.assertEqual(0, cygrpc.StatusCode.ok)
+ metadatum = cygrpc.Metadatum(b'a', b'b')
+ self.assertEqual(b'a', metadatum.key)
+ self.assertEqual(b'b', metadatum.value)
+ metadata = cygrpc.Metadata([metadatum])
+ self.assertEqual(1, len(metadata))
+ self.assertEqual(metadatum.key, metadata[0].key)
+
+ def testMetadataIteration(self):
+ metadata = cygrpc.Metadata([
+ cygrpc.Metadatum(b'a', b'b'), cygrpc.Metadatum(b'c', b'd')])
+ iterator = iter(metadata)
+ metadatum = next(iterator)
+ self.assertIsInstance(metadatum, cygrpc.Metadatum)
+ self.assertEqual(metadatum.key, b'a')
+ self.assertEqual(metadatum.value, b'b')
+ metadatum = next(iterator)
+ self.assertIsInstance(metadatum, cygrpc.Metadatum)
+ self.assertEqual(metadatum.key, b'c')
+ self.assertEqual(metadatum.value, b'd')
+ with self.assertRaises(StopIteration):
+ next(iterator)
+
+ def testOperationsIteration(self):
+ operations = cygrpc.Operations([
+ cygrpc.operation_send_message(b'asdf', _EMPTY_FLAGS)])
+ iterator = iter(operations)
+ operation = next(iterator)
+ self.assertIsInstance(operation, cygrpc.Operation)
+ # `Operation`s are write-only structures; can't directly debug anything out
+ # of them. Just check that we stop iterating.
+ with self.assertRaises(StopIteration):
+ next(iterator)
+
+ def testOperationFlags(self):
+ operation = cygrpc.operation_send_message(b'asdf',
+ cygrpc.WriteFlag.no_compress)
+ self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags)
+
+ def testTimespec(self):
+ now = time.time()
+ timespec = cygrpc.Timespec(now)
+ self.assertAlmostEqual(now, float(timespec), places=8)
+
+ def testCompletionQueueUpDown(self):
+ completion_queue = cygrpc.CompletionQueue()
+ del completion_queue
+
+ def testServerUpDown(self):
+ server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ del server
+
+ def testChannelUpDown(self):
+ channel = cygrpc.Channel(b'[::]:0', cygrpc.ChannelArgs([]))
+ del channel
+
+ def testCredentialsMetadataPluginUpDown(self):
+ plugin = cygrpc.CredentialsMetadataPlugin(
+ lambda ignored_a, ignored_b: None, b'')
+ del plugin
+
+ def testCallCredentialsFromPluginUpDown(self):
+ plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback, b'')
+ call_credentials = cygrpc.call_credentials_metadata_plugin(plugin)
+ del plugin
+ del call_credentials
+
+ def testServerStartNoExplicitShutdown(self):
+ server = cygrpc.Server()
+ completion_queue = cygrpc.CompletionQueue()
+ server.register_completion_queue(completion_queue)
+ port = server.add_http2_port(b'[::]:0')
+ self.assertIsInstance(port, int)
+ server.start()
+ del server
+
+ def testServerStartShutdown(self):
+ completion_queue = cygrpc.CompletionQueue()
+ server = cygrpc.Server()
+ server.add_http2_port(b'[::]:0')
+ server.register_completion_queue(completion_queue)
+ server.start()
+ shutdown_tag = object()
+ server.shutdown(completion_queue, shutdown_tag)
+ event = completion_queue.poll()
+ self.assertEqual(cygrpc.CompletionType.operation_complete, event.type)
+ self.assertIs(shutdown_tag, event.tag)
+ del server
+ del completion_queue
+
+
+class ServerClientMixin(object):
+
+ def setUpMixin(self, server_credentials, client_credentials, host_override):
+ self.server_completion_queue = cygrpc.CompletionQueue()
+ self.server = cygrpc.Server()
+ self.server.register_completion_queue(self.server_completion_queue)
+ if server_credentials:
+ self.port = self.server.add_http2_port(b'[::]:0', server_credentials)
+ else:
+ self.port = self.server.add_http2_port(b'[::]:0')
+ self.server.start()
+ self.client_completion_queue = cygrpc.CompletionQueue()
+ if client_credentials:
+ client_channel_arguments = cygrpc.ChannelArgs([
+ cygrpc.ChannelArg(cygrpc.ChannelArgKey.ssl_target_name_override,
+ host_override)])
+ self.client_channel = cygrpc.Channel(
+ 'localhost:{}'.format(self.port).encode(), client_channel_arguments,
+ client_credentials)
+ else:
+ self.client_channel = cygrpc.Channel('localhost:{}'.format(self.port).encode())
+ if host_override:
+ self.host_argument = None # default host
+ self.expected_host = host_override
+ else:
+ # arbitrary host name necessitating no further identification
+ self.host_argument = b'hostess'
+ self.expected_host = self.host_argument
+
+ def tearDownMixin(self):
+ del self.server
+ del self.client_completion_queue
+ del self.server_completion_queue
+
+ def _perform_operations(self, operations, call, queue, deadline, description):
+ """Perform the list of operations with given call, queue, and deadline.
+
+ Invocation errors are reported with as an exception with `description` in
+ the message. Performs the operations asynchronously, returning a future.
+ """
+ def performer():
+ tag = object()
+ try:
+ call_result = call.start_client_batch(
+ cygrpc.Operations(operations), tag)
+ self.assertEqual(cygrpc.CallError.ok, call_result)
+ event = queue.poll(deadline)
+ self.assertEqual(cygrpc.CompletionType.operation_complete, event.type)
+ self.assertTrue(event.success)
+ self.assertIs(tag, event.tag)
+ except Exception as error:
+ raise Exception("Error in '{}': {}".format(description, error.message))
+ return event
+ return test_utilities.SimpleFuture(performer)
+
+ def testEcho(self):
+ DEADLINE = time.time()+5
+ DEADLINE_TOLERANCE = 0.25
+ CLIENT_METADATA_ASCII_KEY = b'key'
+ CLIENT_METADATA_ASCII_VALUE = b'val'
+ CLIENT_METADATA_BIN_KEY = b'key-bin'
+ CLIENT_METADATA_BIN_VALUE = b'\0'*1000
+ SERVER_INITIAL_METADATA_KEY = b'init_me_me_me'
+ SERVER_INITIAL_METADATA_VALUE = b'whodawha?'
+ SERVER_TRAILING_METADATA_KEY = b'california_is_in_a_drought'
+ SERVER_TRAILING_METADATA_VALUE = b'zomg it is'
+ SERVER_STATUS_CODE = cygrpc.StatusCode.ok
+ SERVER_STATUS_DETAILS = b'our work is never over'
+ REQUEST = b'in death a member of project mayhem has a name'
+ RESPONSE = b'his name is robert paulson'
+ METHOD = b'twinkies'
+
+ cygrpc_deadline = cygrpc.Timespec(DEADLINE)
+
+ server_request_tag = object()
+ request_call_result = self.server.request_call(
+ self.server_completion_queue, self.server_completion_queue,
+ server_request_tag)
+
+ self.assertEqual(cygrpc.CallError.ok, request_call_result)
+
+ client_call_tag = object()
+ client_call = self.client_channel.create_call(
+ None, 0, self.client_completion_queue, METHOD, self.host_argument,
+ cygrpc_deadline)
+ client_initial_metadata = cygrpc.Metadata([
+ cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY,
+ CLIENT_METADATA_ASCII_VALUE),
+ cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)])
+ client_start_batch_result = client_call.start_client_batch([
+ cygrpc.operation_send_initial_metadata(client_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
+ cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
+ ], client_call_tag)
+ self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
+ client_event_future = test_utilities.CompletionQueuePollFuture(
+ self.client_completion_queue, cygrpc_deadline)
+
+ request_event = self.server_completion_queue.poll(cygrpc_deadline)
+ self.assertEqual(cygrpc.CompletionType.operation_complete,
+ request_event.type)
+ self.assertIsInstance(request_event.operation_call, cygrpc.Call)
+ self.assertIs(server_request_tag, request_event.tag)
+ self.assertEqual(0, len(request_event.batch_operations))
+ self.assertTrue(
+ test_common.metadata_transmitted(client_initial_metadata,
+ request_event.request_metadata))
+ self.assertEqual(METHOD, request_event.request_call_details.method)
+ self.assertEqual(self.expected_host,
+ request_event.request_call_details.host)
+ self.assertLess(
+ abs(DEADLINE - float(request_event.request_call_details.deadline)),
+ DEADLINE_TOLERANCE)
+
+ server_call_tag = object()
+ server_call = request_event.operation_call
+ server_initial_metadata = cygrpc.Metadata([
+ cygrpc.Metadatum(SERVER_INITIAL_METADATA_KEY,
+ SERVER_INITIAL_METADATA_VALUE)])
+ server_trailing_metadata = cygrpc.Metadata([
+ cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY,
+ SERVER_TRAILING_METADATA_VALUE)])
+ server_start_batch_result = server_call.start_server_batch([
+ cygrpc.operation_send_initial_metadata(server_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
+ cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
+ cygrpc.operation_send_status_from_server(
+ server_trailing_metadata, SERVER_STATUS_CODE,
+ SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
+ ], server_call_tag)
+ self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
+
+ server_event = self.server_completion_queue.poll(cygrpc_deadline)
+ client_event = client_event_future.result()
+
+ self.assertEqual(6, len(client_event.batch_operations))
+ found_client_op_types = set()
+ for client_result in client_event.batch_operations:
+ # we expect each op type to be unique
+ self.assertNotIn(client_result.type, found_client_op_types)
+ found_client_op_types.add(client_result.type)
+ if client_result.type == cygrpc.OperationType.receive_initial_metadata:
+ self.assertTrue(
+ test_common.metadata_transmitted(server_initial_metadata,
+ client_result.received_metadata))
+ elif client_result.type == cygrpc.OperationType.receive_message:
+ self.assertEqual(RESPONSE, client_result.received_message.bytes())
+ elif client_result.type == cygrpc.OperationType.receive_status_on_client:
+ self.assertTrue(
+ test_common.metadata_transmitted(server_trailing_metadata,
+ client_result.received_metadata))
+ self.assertEqual(SERVER_STATUS_DETAILS,
+ client_result.received_status_details)
+ self.assertEqual(SERVER_STATUS_CODE, client_result.received_status_code)
+ self.assertEqual(set([
+ cygrpc.OperationType.send_initial_metadata,
+ cygrpc.OperationType.send_message,
+ cygrpc.OperationType.send_close_from_client,
+ cygrpc.OperationType.receive_initial_metadata,
+ cygrpc.OperationType.receive_message,
+ cygrpc.OperationType.receive_status_on_client
+ ]), found_client_op_types)
+
+ self.assertEqual(5, len(server_event.batch_operations))
+ found_server_op_types = set()
+ for server_result in server_event.batch_operations:
+ self.assertNotIn(client_result.type, found_server_op_types)
+ found_server_op_types.add(server_result.type)
+ if server_result.type == cygrpc.OperationType.receive_message:
+ self.assertEqual(REQUEST, server_result.received_message.bytes())
+ elif server_result.type == cygrpc.OperationType.receive_close_on_server:
+ self.assertFalse(server_result.received_cancelled)
+ self.assertEqual(set([
+ cygrpc.OperationType.send_initial_metadata,
+ cygrpc.OperationType.receive_message,
+ cygrpc.OperationType.send_message,
+ cygrpc.OperationType.receive_close_on_server,
+ cygrpc.OperationType.send_status_from_server
+ ]), found_server_op_types)
+
+ del client_call
+ del server_call
+
+ def test6522(self):
+ DEADLINE = time.time()+5
+ DEADLINE_TOLERANCE = 0.25
+ METHOD = b'twinkies'
+
+ cygrpc_deadline = cygrpc.Timespec(DEADLINE)
+ empty_metadata = cygrpc.Metadata([])
+
+ server_request_tag = object()
+ self.server.request_call(
+ self.server_completion_queue, self.server_completion_queue,
+ server_request_tag)
+ client_call = self.client_channel.create_call(
+ None, 0, self.client_completion_queue, METHOD, self.host_argument,
+ cygrpc_deadline)
+
+ # Prologue
+ def perform_client_operations(operations, description):
+ return self._perform_operations(
+ operations, client_call,
+ self.client_completion_queue, cygrpc_deadline, description)
+
+ client_event_future = perform_client_operations([
+ cygrpc.operation_send_initial_metadata(empty_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ ], "Client prologue")
+
+ request_event = self.server_completion_queue.poll(cygrpc_deadline)
+ server_call = request_event.operation_call
+
+ def perform_server_operations(operations, description):
+ return self._perform_operations(
+ operations, server_call,
+ self.server_completion_queue, cygrpc_deadline, description)
+
+ server_event_future = perform_server_operations([
+ cygrpc.operation_send_initial_metadata(empty_metadata,
+ _EMPTY_FLAGS),
+ ], "Server prologue")
+
+ client_event_future.result() # force completion
+ server_event_future.result()
+
+ # Messaging
+ for _ in range(10):
+ client_event_future = perform_client_operations([
+ cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ ], "Client message")
+ server_event_future = perform_server_operations([
+ cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ ], "Server receive")
+
+ client_event_future.result() # force completion
+ server_event_future.result()
+
+ # Epilogue
+ client_event_future = perform_client_operations([
+ cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
+ ], "Client epilogue")
+
+ server_event_future = perform_server_operations([
+ cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
+ cygrpc.operation_send_status_from_server(
+ empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS)
+ ], "Server epilogue")
+
+ client_event_future.result() # force completion
+ server_event_future.result()
+
+
+class InsecureServerInsecureClient(unittest.TestCase, ServerClientMixin):
+
+ def setUp(self):
+ self.setUpMixin(None, None, None)
+
+ def tearDown(self):
+ self.tearDownMixin()
+
+
+class SecureServerSecureClient(unittest.TestCase, ServerClientMixin):
+
+ def setUp(self):
+ server_credentials = cygrpc.server_credentials_ssl(
+ None, [cygrpc.SslPemKeyCertPair(resources.private_key(),
+ resources.certificate_chain())], False)
+ client_credentials = cygrpc.channel_credentials_ssl(
+ resources.test_root_certificates(), None)
+ self.setUpMixin(server_credentials, client_credentials, _SSL_HOST_OVERRIDE)
+
+ def tearDown(self):
+ self.tearDownMixin()
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
new file mode 100644
index 0000000000..6280ce74c4
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py
@@ -0,0 +1,66 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import threading
+
+from grpc._cython import cygrpc
+
+
+class SimpleFuture(object):
+ """A simple future mechanism."""
+
+ def __init__(self, function, *args, **kwargs):
+ def wrapped_function():
+ try:
+ self._result = function(*args, **kwargs)
+ except Exception as error:
+ self._error = error
+ self._result = None
+ self._error = None
+ self._thread = threading.Thread(target=wrapped_function)
+ self._thread.start()
+
+ def result(self):
+ """The resulting value of this future.
+
+ Re-raises any exceptions.
+ """
+ self._thread.join()
+ if self._error:
+ # TODO(atash): re-raise exceptions in a way that preserves tracebacks
+ raise self._error
+ return self._result
+
+
+class CompletionQueuePollFuture(SimpleFuture):
+
+ def __init__(self, completion_queue, deadline):
+ super(CompletionQueuePollFuture, self).__init__(
+ lambda: completion_queue.poll(deadline))
+