aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
Diffstat (limited to 'src/python')
-rw-r--r--src/python/grpcio_tests/tests/tests.json2
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_common.py118
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py131
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py126
4 files changed, 377 insertions, 0 deletions
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index d61297b918..4c078e6c22 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -26,6 +26,8 @@
"unit._credentials_test.CredentialsTest",
"unit._cython._cancel_many_calls_test.CancelManyCallsTest",
"unit._cython._channel_test.ChannelTest",
+ "unit._cython._no_messages_server_completion_queue_per_call_test.Test",
+ "unit._cython._no_messages_single_server_completion_queue_test.Test",
"unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
"unit._cython.cygrpc_test.InsecureServerInsecureClient",
"unit._cython.cygrpc_test.SecureServerSecureClient",
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py
new file mode 100644
index 0000000000..ac66d1db3d
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py
@@ -0,0 +1,118 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Common utilities for tests of the Cython layer of gRPC Python."""
+
+import collections
+import threading
+
+from grpc._cython import cygrpc
+
+RPC_COUNT = 4000
+
+INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
+EMPTY_FLAGS = 0
+
+INVOCATION_METADATA = cygrpc.Metadata(
+ (cygrpc.Metadatum(b'client-md-key', b'client-md-key'),
+ cygrpc.Metadatum(b'client-md-key-bin', b'\x00\x01' * 3000),))
+
+INITIAL_METADATA = cygrpc.Metadata(
+ (cygrpc.Metadatum(b'server-initial-md-key', b'server-initial-md-value'),
+ cygrpc.Metadatum(b'server-initial-md-key-bin', b'\x00\x02' * 3000),))
+
+TRAILING_METADATA = cygrpc.Metadata(
+ (cygrpc.Metadatum(b'server-trailing-md-key', b'server-trailing-md-value'),
+ cygrpc.Metadatum(b'server-trailing-md-key-bin', b'\x00\x03' * 3000),))
+
+
+class QueueDriver(object):
+
+ def __init__(self, condition, completion_queue):
+ self._condition = condition
+ self._completion_queue = completion_queue
+ self._due = collections.defaultdict(int)
+ self._events = collections.defaultdict(list)
+
+ def add_due(self, tags):
+ if not self._due:
+
+ def in_thread():
+ while True:
+ event = self._completion_queue.poll()
+ with self._condition:
+ self._events[event.tag].append(event)
+ self._due[event.tag] -= 1
+ self._condition.notify_all()
+ if self._due[event.tag] <= 0:
+ self._due.pop(event.tag)
+ if not self._due:
+ return
+
+ thread = threading.Thread(target=in_thread)
+ thread.start()
+ for tag in tags:
+ self._due[tag] += 1
+
+ def event_with_tag(self, tag):
+ with self._condition:
+ while True:
+ if self._events[tag]:
+ return self._events[tag].pop(0)
+ else:
+ self._condition.wait()
+
+
+def execute_many_times(behavior):
+ return tuple(behavior() for _ in range(RPC_COUNT))
+
+
+class OperationResult(
+ collections.namedtuple('OperationResult', (
+ 'start_batch_result', 'completion_type', 'success',))):
+ pass
+
+
+SUCCESSFUL_OPERATION_RESULT = OperationResult(
+ cygrpc.CallError.ok, cygrpc.CompletionType.operation_complete, True)
+
+
+class RpcTest(object):
+
+ def setUp(self):
+ self.server_completion_queue = cygrpc.CompletionQueue()
+ self.server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ self.server.register_completion_queue(self.server_completion_queue)
+ port = self.server.add_http2_port(b'[::]:0')
+ self.server.start()
+ self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(),
+ cygrpc.ChannelArgs([]))
+
+ self._server_shutdown_tag = 'server_shutdown_tag'
+ self.server_condition = threading.Condition()
+ self.server_driver = QueueDriver(self.server_condition,
+ self.server_completion_queue)
+ with self.server_condition:
+ self.server_driver.add_due({
+ self._server_shutdown_tag,
+ })
+
+ self.client_condition = threading.Condition()
+ self.client_completion_queue = cygrpc.CompletionQueue()
+ self.client_driver = QueueDriver(self.client_condition,
+ self.client_completion_queue)
+
+ def tearDown(self):
+ self.server.shutdown(self.server_completion_queue,
+ self._server_shutdown_tag)
+ self.server.cancel_all_calls()
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
new file mode 100644
index 0000000000..14cc66675c
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
@@ -0,0 +1,131 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test a corner-case at the level of the Cython API."""
+
+import threading
+import unittest
+
+from grpc._cython import cygrpc
+
+from tests.unit._cython import _common
+
+
+class Test(_common.RpcTest, unittest.TestCase):
+
+ def _do_rpcs(self):
+ server_call_condition = threading.Condition()
+ server_call_completion_queue = cygrpc.CompletionQueue()
+ server_call_driver = _common.QueueDriver(server_call_condition,
+ server_call_completion_queue)
+
+ server_request_call_tag = 'server_request_call_tag'
+ server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
+ server_complete_rpc_tag = 'server_complete_rpc_tag'
+
+ with self.server_condition:
+ server_request_call_start_batch_result = self.server.request_call(
+ server_call_completion_queue, self.server_completion_queue,
+ server_request_call_tag)
+ self.server_driver.add_due({
+ server_request_call_tag,
+ })
+
+ client_call = self.channel.create_call(
+ None, _common.EMPTY_FLAGS, self.client_completion_queue,
+ b'/twinkies', None, _common.INFINITE_FUTURE)
+ client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
+ client_complete_rpc_tag = 'client_complete_rpc_tag'
+ with self.client_condition:
+ client_receive_initial_metadata_start_batch_result = (
+ client_call.start_client_batch(
+ cygrpc.Operations([
+ cygrpc.operation_receive_initial_metadata(
+ _common.EMPTY_FLAGS),
+ ]), client_receive_initial_metadata_tag))
+ client_complete_rpc_start_batch_result = client_call.start_client_batch(
+ cygrpc.Operations([
+ cygrpc.operation_send_initial_metadata(
+ _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(
+ _common.EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(
+ _common.EMPTY_FLAGS),
+ ]), client_complete_rpc_tag)
+ self.client_driver.add_due({
+ client_receive_initial_metadata_tag,
+ client_complete_rpc_tag,
+ })
+
+ server_request_call_event = self.server_driver.event_with_tag(
+ server_request_call_tag)
+
+ with server_call_condition:
+ server_send_initial_metadata_start_batch_result = (
+ server_request_call_event.operation_call.start_server_batch([
+ cygrpc.operation_send_initial_metadata(
+ _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
+ ], server_send_initial_metadata_tag))
+ server_call_driver.add_due({
+ server_send_initial_metadata_tag,
+ })
+ server_send_initial_metadata_event = server_call_driver.event_with_tag(
+ server_send_initial_metadata_tag)
+
+ with server_call_condition:
+ server_complete_rpc_start_batch_result = (
+ server_request_call_event.operation_call.start_server_batch([
+ cygrpc.operation_receive_close_on_server(
+ _common.EMPTY_FLAGS),
+ cygrpc.operation_send_status_from_server(
+ _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
+ b'test details', _common.EMPTY_FLAGS),
+ ], server_complete_rpc_tag))
+ server_call_driver.add_due({
+ server_complete_rpc_tag,
+ })
+ server_complete_rpc_event = server_call_driver.event_with_tag(
+ server_complete_rpc_tag)
+
+ client_receive_initial_metadata_event = self.client_driver.event_with_tag(
+ client_receive_initial_metadata_tag)
+ client_complete_rpc_event = self.client_driver.event_with_tag(
+ client_complete_rpc_tag)
+
+ return (_common.OperationResult(server_request_call_start_batch_result,
+ server_request_call_event.type,
+ server_request_call_event.success),
+ _common.OperationResult(
+ client_receive_initial_metadata_start_batch_result,
+ client_receive_initial_metadata_event.type,
+ client_receive_initial_metadata_event.success),
+ _common.OperationResult(client_complete_rpc_start_batch_result,
+ client_complete_rpc_event.type,
+ client_complete_rpc_event.success),
+ _common.OperationResult(
+ server_send_initial_metadata_start_batch_result,
+ server_send_initial_metadata_event.type,
+ server_send_initial_metadata_event.success),
+ _common.OperationResult(server_complete_rpc_start_batch_result,
+ server_complete_rpc_event.type,
+ server_complete_rpc_event.success),)
+
+ def test_rpcs(self):
+ expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
+ 5] * _common.RPC_COUNT
+ actuallys = _common.execute_many_times(self._do_rpcs)
+ self.assertSequenceEqual(expecteds, actuallys)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
new file mode 100644
index 0000000000..1e44bcc4dc
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
@@ -0,0 +1,126 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test a corner-case at the level of the Cython API."""
+
+import threading
+import unittest
+
+from grpc._cython import cygrpc
+
+from tests.unit._cython import _common
+
+
+class Test(_common.RpcTest, unittest.TestCase):
+
+ def _do_rpcs(self):
+ server_request_call_tag = 'server_request_call_tag'
+ server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
+ server_complete_rpc_tag = 'server_complete_rpc_tag'
+
+ with self.server_condition:
+ server_request_call_start_batch_result = self.server.request_call(
+ self.server_completion_queue, self.server_completion_queue,
+ server_request_call_tag)
+ self.server_driver.add_due({
+ server_request_call_tag,
+ })
+
+ client_call = self.channel.create_call(
+ None, _common.EMPTY_FLAGS, self.client_completion_queue,
+ b'/twinkies', None, _common.INFINITE_FUTURE)
+ client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
+ client_complete_rpc_tag = 'client_complete_rpc_tag'
+ with self.client_condition:
+ client_receive_initial_metadata_start_batch_result = (
+ client_call.start_client_batch(
+ cygrpc.Operations([
+ cygrpc.operation_receive_initial_metadata(
+ _common.EMPTY_FLAGS),
+ ]), client_receive_initial_metadata_tag))
+ client_complete_rpc_start_batch_result = client_call.start_client_batch(
+ cygrpc.Operations([
+ cygrpc.operation_send_initial_metadata(
+ _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(
+ _common.EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(
+ _common.EMPTY_FLAGS),
+ ]), client_complete_rpc_tag)
+ self.client_driver.add_due({
+ client_receive_initial_metadata_tag,
+ client_complete_rpc_tag,
+ })
+
+ server_request_call_event = self.server_driver.event_with_tag(
+ server_request_call_tag)
+
+ with self.server_condition:
+ server_send_initial_metadata_start_batch_result = (
+ server_request_call_event.operation_call.start_server_batch([
+ cygrpc.operation_send_initial_metadata(
+ _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
+ ], server_send_initial_metadata_tag))
+ self.server_driver.add_due({
+ server_send_initial_metadata_tag,
+ })
+ server_send_initial_metadata_event = self.server_driver.event_with_tag(
+ server_send_initial_metadata_tag)
+
+ with self.server_condition:
+ server_complete_rpc_start_batch_result = (
+ server_request_call_event.operation_call.start_server_batch([
+ cygrpc.operation_receive_close_on_server(
+ _common.EMPTY_FLAGS),
+ cygrpc.operation_send_status_from_server(
+ _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
+ b'test details', _common.EMPTY_FLAGS),
+ ], server_complete_rpc_tag))
+ self.server_driver.add_due({
+ server_complete_rpc_tag,
+ })
+ server_complete_rpc_event = self.server_driver.event_with_tag(
+ server_complete_rpc_tag)
+
+ client_receive_initial_metadata_event = self.client_driver.event_with_tag(
+ client_receive_initial_metadata_tag)
+ client_complete_rpc_event = self.client_driver.event_with_tag(
+ client_complete_rpc_tag)
+
+ return (_common.OperationResult(server_request_call_start_batch_result,
+ server_request_call_event.type,
+ server_request_call_event.success),
+ _common.OperationResult(
+ client_receive_initial_metadata_start_batch_result,
+ client_receive_initial_metadata_event.type,
+ client_receive_initial_metadata_event.success),
+ _common.OperationResult(client_complete_rpc_start_batch_result,
+ client_complete_rpc_event.type,
+ client_complete_rpc_event.success),
+ _common.OperationResult(
+ server_send_initial_metadata_start_batch_result,
+ server_send_initial_metadata_event.type,
+ server_send_initial_metadata_event.success),
+ _common.OperationResult(server_complete_rpc_start_batch_result,
+ server_complete_rpc_event.type,
+ server_complete_rpc_event.success),)
+
+ def test_rpcs(self):
+ expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
+ 5] * _common.RPC_COUNT
+ actuallys = _common.execute_many_times(self._do_rpcs)
+ self.assertSequenceEqual(expecteds, actuallys)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)