aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_test/grpc_test/_adapter
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-08-01 16:21:18 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-08-01 16:21:18 -0700
commit42ce225d50534f2bd6e24c47963896f096e1cb5f (patch)
tree62e1611cd5f6d50e7b2924f0bd88ece373870e1a /src/python/grpcio_test/grpc_test/_adapter
parent85c2a08ca6e008589f660113de5d8d3b7df74b85 (diff)
parent7716c53a217292f1982d986e4681c1e2b25f4367 (diff)
Merge github.com:grpc/grpc into plucking-hell
Conflicts: Makefile
Diffstat (limited to 'src/python/grpcio_test/grpc_test/_adapter')
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/.gitignore5
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_blocking_invocation_inline_service_test.py46
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_c_test.py55
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_event_invocation_synchronous_event_service_test.py46
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_face_test_case.py106
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_future_invocation_asynchronous_event_service_test.py46
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py434
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_links_test.py277
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_lonely_rear_link_test.py100
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_low_test.py199
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_proto_scenarios.py261
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_test_links.py80
13 files changed, 1685 insertions, 0 deletions
diff --git a/src/python/grpcio_test/grpc_test/_adapter/.gitignore b/src/python/grpcio_test/grpc_test/_adapter/.gitignore
new file mode 100644
index 0000000000..a6f96cd6db
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/.gitignore
@@ -0,0 +1,5 @@
+*.a
+*.so
+*.dll
+*.pyc
+*.pyd
diff --git a/src/python/grpcio_test/grpc_test/_adapter/__init__.py b/src/python/grpcio_test/grpc_test/_adapter/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_blocking_invocation_inline_service_test.py b/src/python/grpcio_test/grpc_test/_adapter/_blocking_invocation_inline_service_test.py
new file mode 100644
index 0000000000..a1f776211c
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_blocking_invocation_inline_service_test.py
@@ -0,0 +1,46 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""One of the tests of the Face layer of RPC Framework."""
+
+import unittest
+
+from grpc_test._adapter import _face_test_case
+from grpc_test.framework.face.testing import blocking_invocation_inline_service_test_case as test_case
+
+
+class BlockingInvocationInlineServiceTest(
+ _face_test_case.FaceTestCase,
+ test_case.BlockingInvocationInlineServiceTestCase,
+ unittest.TestCase):
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_c_test.py b/src/python/grpcio_test/grpc_test/_adapter/_c_test.py
new file mode 100644
index 0000000000..fe020e2a9c
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_c_test.py
@@ -0,0 +1,55 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import time
+import unittest
+
+from grpc._adapter import _c
+from grpc._adapter import _types
+
+
+class CTypeSmokeTest(unittest.TestCase):
+
+ def testCompletionQueueUpDown(self):
+ completion_queue = _c.CompletionQueue()
+ del completion_queue
+
+ def testServerUpDown(self):
+ completion_queue = _c.CompletionQueue()
+ serv = _c.Server(completion_queue, [])
+ del serv
+ del completion_queue
+
+ def testChannelUpDown(self):
+ channel = _c.Channel('[::]:0', [])
+ del channel
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_event_invocation_synchronous_event_service_test.py b/src/python/grpcio_test/grpc_test/_adapter/_event_invocation_synchronous_event_service_test.py
new file mode 100644
index 0000000000..0d01ebc8dc
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_event_invocation_synchronous_event_service_test.py
@@ -0,0 +1,46 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""One of the tests of the Face layer of RPC Framework."""
+
+import unittest
+
+from grpc_test._adapter import _face_test_case
+from grpc_test.framework.face.testing import event_invocation_synchronous_event_service_test_case as test_case
+
+
+class EventInvocationSynchronousEventServiceTest(
+ _face_test_case.FaceTestCase,
+ test_case.EventInvocationSynchronousEventServiceTestCase,
+ unittest.TestCase):
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_face_test_case.py b/src/python/grpcio_test/grpc_test/_adapter/_face_test_case.py
new file mode 100644
index 0000000000..dfbd0b60af
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_face_test_case.py
@@ -0,0 +1,106 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Common construction and destruction for GRPC-backed Face-layer tests."""
+
+import unittest
+
+from grpc._adapter import fore
+from grpc._adapter import rear
+from grpc.framework.base import util
+from grpc.framework.base import implementations as base_implementations
+from grpc.framework.face import implementations as face_implementations
+from grpc.framework.foundation import logging_pool
+from grpc_test.framework.face.testing import coverage
+from grpc_test.framework.face.testing import serial
+from grpc_test.framework.face.testing import test_case
+
+_TIMEOUT = 3
+_MAXIMUM_TIMEOUT = 90
+_MAXIMUM_POOL_SIZE = 4
+
+
+class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
+ """Provides abstract Face-layer tests a GRPC-backed implementation."""
+
+ def set_up_implementation(
+ self, name, methods, method_implementations,
+ multi_method_implementation):
+ pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
+
+ servicer = face_implementations.servicer(
+ pool, method_implementations, multi_method_implementation)
+
+ serialization = serial.serialization(methods)
+
+ fore_link = fore.ForeLink(
+ pool, serialization.request_deserializers,
+ serialization.response_serializers, None, ())
+ fore_link.start()
+ port = fore_link.port()
+ rear_link = rear.RearLink(
+ 'localhost', port, pool,
+ serialization.request_serializers,
+ serialization.response_deserializers, False, None, None, None)
+ rear_link.start()
+ front = base_implementations.front_link(pool, pool, pool)
+ back = base_implementations.back_link(
+ servicer, pool, pool, pool, _TIMEOUT, _MAXIMUM_TIMEOUT)
+ fore_link.join_rear_link(back)
+ back.join_fore_link(fore_link)
+ rear_link.join_fore_link(front)
+ front.join_rear_link(rear_link)
+
+ stub = face_implementations.generic_stub(front, pool)
+ return stub, (rear_link, fore_link, front, back)
+
+ def tear_down_implementation(self, memo):
+ rear_link, fore_link, front, back = memo
+ # TODO(nathaniel): Waiting for the front and back to idle possibly should
+ # not be necessary - investigate as part of graceful shutdown work.
+ util.wait_for_idle(front)
+ util.wait_for_idle(back)
+ rear_link.stop()
+ fore_link.stop()
+
+ @unittest.skip('Service-side failure not transmitted by GRPC.')
+ def testFailedUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Service-side failure not transmitted by GRPC.')
+ def testFailedUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Service-side failure not transmitted by GRPC.')
+ def testFailedStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Service-side failure not transmitted by GRPC.')
+ def testFailedStreamRequestStreamResponse(self):
+ raise NotImplementedError()
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_future_invocation_asynchronous_event_service_test.py b/src/python/grpcio_test/grpc_test/_adapter/_future_invocation_asynchronous_event_service_test.py
new file mode 100644
index 0000000000..ea4a6a0bae
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_future_invocation_asynchronous_event_service_test.py
@@ -0,0 +1,46 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""One of the tests of the Face layer of RPC Framework."""
+
+import unittest
+
+from grpc_test._adapter import _face_test_case
+from grpc_test.framework.face.testing import future_invocation_asynchronous_event_service_test_case as test_case
+
+
+class FutureInvocationAsynchronousEventServiceTest(
+ _face_test_case.FaceTestCase,
+ test_case.FutureInvocationAsynchronousEventServiceTestCase,
+ unittest.TestCase):
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py
new file mode 100644
index 0000000000..27a5b82e9c
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py
@@ -0,0 +1,434 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests for the old '_low'."""
+
+import Queue
+import threading
+import time
+import unittest
+
+from grpc._adapter import _intermediary_low as _low
+
+_STREAM_LENGTH = 300
+_TIMEOUT = 5
+_AFTER_DELAY = 2
+_FUTURE = time.time() + 60 * 60 * 24
+_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200
+_BYTE_SEQUENCE_SEQUENCE = tuple(
+ bytes(bytearray((row + column) % 256 for column in range(row)))
+ for row in range(_STREAM_LENGTH))
+
+
+class LonelyClientTest(unittest.TestCase):
+
+ def testLonelyClient(self):
+ host = 'nosuchhostexists'
+ port = 54321
+ method = 'test method'
+ deadline = time.time() + _TIMEOUT
+ after_deadline = deadline + _AFTER_DELAY
+ metadata_tag = object()
+ finish_tag = object()
+
+ completion_queue = _low.CompletionQueue()
+ channel = _low.Channel('%s:%d' % (host, port), None)
+ client_call = _low.Call(channel, completion_queue, method, host, deadline)
+
+ client_call.invoke(completion_queue, metadata_tag, finish_tag)
+ first_event = completion_queue.get(after_deadline)
+ self.assertIsNotNone(first_event)
+ second_event = completion_queue.get(after_deadline)
+ self.assertIsNotNone(second_event)
+ kinds = [event.kind for event in (first_event, second_event)]
+ self.assertItemsEqual(
+ (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH),
+ kinds)
+
+ self.assertIsNone(completion_queue.get(after_deadline))
+
+ completion_queue.stop()
+ stop_event = completion_queue.get(_FUTURE)
+ self.assertEqual(_low.Event.Kind.STOP, stop_event.kind)
+
+ del client_call
+ del channel
+ del completion_queue
+
+
+def _drive_completion_queue(completion_queue, event_queue):
+ while True:
+ event = completion_queue.get(_FUTURE)
+ if event.kind is _low.Event.Kind.STOP:
+ break
+ event_queue.put(event)
+
+
+class EchoTest(unittest.TestCase):
+
+ def setUp(self):
+ self.host = 'localhost'
+
+ self.server_completion_queue = _low.CompletionQueue()
+ self.server = _low.Server(self.server_completion_queue)
+ port = self.server.add_http2_addr('[::]:0')
+ self.server.start()
+ self.server_events = Queue.Queue()
+ self.server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.server_completion_queue, self.server_events))
+ self.server_completion_queue_thread.start()
+
+ self.client_completion_queue = _low.CompletionQueue()
+ self.channel = _low.Channel('%s:%d' % (self.host, port), None)
+ self.client_events = Queue.Queue()
+ self.client_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.client_completion_queue, self.client_events))
+ self.client_completion_queue_thread.start()
+
+ def tearDown(self):
+ self.server.stop()
+ self.server_completion_queue.stop()
+ self.client_completion_queue.stop()
+ self.server_completion_queue_thread.join()
+ self.client_completion_queue_thread.join()
+ del self.server
+
+ def _perform_echo_test(self, test_data):
+ method = 'test method'
+ details = 'test details'
+ server_leading_metadata_key = 'my_server_leading_key'
+ server_leading_metadata_value = 'my_server_leading_value'
+ server_trailing_metadata_key = 'my_server_trailing_key'
+ server_trailing_metadata_value = 'my_server_trailing_value'
+ client_metadata_key = 'my_client_key'
+ client_metadata_value = 'my_client_value'
+ server_leading_binary_metadata_key = 'my_server_leading_key-bin'
+ server_leading_binary_metadata_value = b'\0'*2047
+ server_trailing_binary_metadata_key = 'my_server_trailing_key-bin'
+ server_trailing_binary_metadata_value = b'\0'*2047
+ client_binary_metadata_key = 'my_client_key-bin'
+ client_binary_metadata_value = b'\0'*2047
+ deadline = _FUTURE
+ metadata_tag = object()
+ finish_tag = object()
+ write_tag = object()
+ complete_tag = object()
+ service_tag = object()
+ read_tag = object()
+ status_tag = object()
+
+ server_data = []
+ client_data = []
+
+ client_call = _low.Call(self.channel, self.client_completion_queue,
+ method, self.host, deadline)
+ client_call.add_metadata(client_metadata_key, client_metadata_value)
+ client_call.add_metadata(client_binary_metadata_key,
+ client_binary_metadata_value)
+
+ client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
+
+ self.server.service(service_tag)
+ service_accepted = self.server_events.get()
+ self.assertIsNotNone(service_accepted)
+ self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
+ self.assertIs(service_accepted.tag, service_tag)
+ self.assertEqual(method, service_accepted.service_acceptance.method)
+ self.assertEqual(self.host, service_accepted.service_acceptance.host)
+ self.assertIsNotNone(service_accepted.service_acceptance.call)
+ metadata = dict(service_accepted.metadata)
+ self.assertIn(client_metadata_key, metadata)
+ self.assertEqual(client_metadata_value, metadata[client_metadata_key])
+ self.assertIn(client_binary_metadata_key, metadata)
+ self.assertEqual(client_binary_metadata_value,
+ metadata[client_binary_metadata_key])
+ server_call = service_accepted.service_acceptance.call
+ server_call.accept(self.server_completion_queue, finish_tag)
+ server_call.add_metadata(server_leading_metadata_key,
+ server_leading_metadata_value)
+ server_call.add_metadata(server_leading_binary_metadata_key,
+ server_leading_binary_metadata_value)
+ server_call.premetadata()
+
+ metadata_accepted = self.client_events.get()
+ self.assertIsNotNone(metadata_accepted)
+ self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
+ self.assertEqual(metadata_tag, metadata_accepted.tag)
+ metadata = dict(metadata_accepted.metadata)
+ self.assertIn(server_leading_metadata_key, metadata)
+ self.assertEqual(server_leading_metadata_value,
+ metadata[server_leading_metadata_key])
+ self.assertIn(server_leading_binary_metadata_key, metadata)
+ self.assertEqual(server_leading_binary_metadata_value,
+ metadata[server_leading_binary_metadata_key])
+
+ for datum in test_data:
+ client_call.write(datum, write_tag)
+ write_accepted = self.client_events.get()
+ self.assertIsNotNone(write_accepted)
+ self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
+ self.assertIs(write_accepted.tag, write_tag)
+ self.assertIs(write_accepted.write_accepted, True)
+
+ server_call.read(read_tag)
+ read_accepted = self.server_events.get()
+ self.assertIsNotNone(read_accepted)
+ self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
+ self.assertEqual(read_tag, read_accepted.tag)
+ self.assertIsNotNone(read_accepted.bytes)
+ server_data.append(read_accepted.bytes)
+
+ server_call.write(read_accepted.bytes, write_tag)
+ write_accepted = self.server_events.get()
+ self.assertIsNotNone(write_accepted)
+ self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
+ self.assertEqual(write_tag, write_accepted.tag)
+ self.assertTrue(write_accepted.write_accepted)
+
+ client_call.read(read_tag)
+ read_accepted = self.client_events.get()
+ self.assertIsNotNone(read_accepted)
+ self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
+ self.assertEqual(read_tag, read_accepted.tag)
+ self.assertIsNotNone(read_accepted.bytes)
+ client_data.append(read_accepted.bytes)
+
+ client_call.complete(complete_tag)
+ complete_accepted = self.client_events.get()
+ self.assertIsNotNone(complete_accepted)
+ self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
+ self.assertIs(complete_accepted.tag, complete_tag)
+ self.assertIs(complete_accepted.complete_accepted, True)
+
+ server_call.read(read_tag)
+ read_accepted = self.server_events.get()
+ self.assertIsNotNone(read_accepted)
+ self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
+ self.assertEqual(read_tag, read_accepted.tag)
+ self.assertIsNone(read_accepted.bytes)
+
+ server_call.add_metadata(server_trailing_metadata_key,
+ server_trailing_metadata_value)
+ server_call.add_metadata(server_trailing_binary_metadata_key,
+ server_trailing_binary_metadata_value)
+
+ server_call.status(_low.Status(_low.Code.OK, details), status_tag)
+ server_terminal_event_one = self.server_events.get()
+ server_terminal_event_two = self.server_events.get()
+ if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
+ status_accepted = server_terminal_event_one
+ rpc_accepted = server_terminal_event_two
+ else:
+ status_accepted = server_terminal_event_two
+ rpc_accepted = server_terminal_event_one
+ self.assertIsNotNone(status_accepted)
+ self.assertIsNotNone(rpc_accepted)
+ self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind)
+ self.assertEqual(status_tag, status_accepted.tag)
+ self.assertTrue(status_accepted.complete_accepted)
+ self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
+ self.assertEqual(finish_tag, rpc_accepted.tag)
+ self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
+
+ client_call.read(read_tag)
+ client_terminal_event_one = self.client_events.get()
+ client_terminal_event_two = self.client_events.get()
+ if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
+ read_accepted = client_terminal_event_one
+ finish_accepted = client_terminal_event_two
+ else:
+ read_accepted = client_terminal_event_two
+ finish_accepted = client_terminal_event_one
+ self.assertIsNotNone(read_accepted)
+ self.assertIsNotNone(finish_accepted)
+ self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
+ self.assertEqual(read_tag, read_accepted.tag)
+ self.assertIsNone(read_accepted.bytes)
+ self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind)
+ self.assertEqual(finish_tag, finish_accepted.tag)
+ self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status)
+ metadata = dict(finish_accepted.metadata)
+ self.assertIn(server_trailing_metadata_key, metadata)
+ self.assertEqual(server_trailing_metadata_value,
+ metadata[server_trailing_metadata_key])
+ self.assertIn(server_trailing_binary_metadata_key, metadata)
+ self.assertEqual(server_trailing_binary_metadata_value,
+ metadata[server_trailing_binary_metadata_key])
+ self.assertSetEqual(set(key for key, _ in finish_accepted.metadata),
+ set((server_trailing_metadata_key,
+ server_trailing_binary_metadata_key,)))
+
+ server_timeout_none_event = self.server_completion_queue.get(0)
+ self.assertIsNone(server_timeout_none_event)
+ client_timeout_none_event = self.client_completion_queue.get(0)
+ self.assertIsNone(client_timeout_none_event)
+
+ self.assertSequenceEqual(test_data, server_data)
+ self.assertSequenceEqual(test_data, client_data)
+
+ def testNoEcho(self):
+ self._perform_echo_test(())
+
+ def testOneByteEcho(self):
+ self._perform_echo_test([b'\x07'])
+
+ def testOneManyByteEcho(self):
+ self._perform_echo_test([_BYTE_SEQUENCE])
+
+ def testManyOneByteEchoes(self):
+ self._perform_echo_test(_BYTE_SEQUENCE)
+
+ def testManyManyByteEchoes(self):
+ self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE)
+
+
+class CancellationTest(unittest.TestCase):
+
+ def setUp(self):
+ self.host = 'localhost'
+
+ self.server_completion_queue = _low.CompletionQueue()
+ self.server = _low.Server(self.server_completion_queue)
+ port = self.server.add_http2_addr('[::]:0')
+ self.server.start()
+ self.server_events = Queue.Queue()
+ self.server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.server_completion_queue, self.server_events))
+ self.server_completion_queue_thread.start()
+
+ self.client_completion_queue = _low.CompletionQueue()
+ self.channel = _low.Channel('%s:%d' % (self.host, port), None)
+ self.client_events = Queue.Queue()
+ self.client_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.client_completion_queue, self.client_events))
+ self.client_completion_queue_thread.start()
+
+ def tearDown(self):
+ self.server.stop()
+ self.server_completion_queue.stop()
+ self.client_completion_queue.stop()
+ self.server_completion_queue_thread.join()
+ self.client_completion_queue_thread.join()
+ del self.server
+
+ def testCancellation(self):
+ method = 'test method'
+ deadline = _FUTURE
+ metadata_tag = object()
+ finish_tag = object()
+ write_tag = object()
+ service_tag = object()
+ read_tag = object()
+ test_data = _BYTE_SEQUENCE_SEQUENCE
+
+ server_data = []
+ client_data = []
+
+ client_call = _low.Call(self.channel, self.client_completion_queue,
+ method, self.host, deadline)
+
+ client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
+
+ self.server.service(service_tag)
+ service_accepted = self.server_events.get()
+ server_call = service_accepted.service_acceptance.call
+
+ server_call.accept(self.server_completion_queue, finish_tag)
+ server_call.premetadata()
+
+ metadata_accepted = self.client_events.get()
+ self.assertIsNotNone(metadata_accepted)
+
+ for datum in test_data:
+ client_call.write(datum, write_tag)
+ write_accepted = self.client_events.get()
+
+ server_call.read(read_tag)
+ read_accepted = self.server_events.get()
+ server_data.append(read_accepted.bytes)
+
+ server_call.write(read_accepted.bytes, write_tag)
+ write_accepted = self.server_events.get()
+ self.assertIsNotNone(write_accepted)
+
+ client_call.read(read_tag)
+ read_accepted = self.client_events.get()
+ client_data.append(read_accepted.bytes)
+
+ client_call.cancel()
+ # cancel() is idempotent.
+ client_call.cancel()
+ client_call.cancel()
+ client_call.cancel()
+
+ server_call.read(read_tag)
+
+ server_terminal_event_one = self.server_events.get()
+ server_terminal_event_two = self.server_events.get()
+ if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
+ read_accepted = server_terminal_event_one
+ rpc_accepted = server_terminal_event_two
+ else:
+ read_accepted = server_terminal_event_two
+ rpc_accepted = server_terminal_event_one
+ self.assertIsNotNone(read_accepted)
+ self.assertIsNotNone(rpc_accepted)
+ self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
+ self.assertIsNone(read_accepted.bytes)
+ self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
+ self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
+
+ finish_event = self.client_events.get()
+ self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
+ self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'),
+ finish_event.status)
+
+ server_timeout_none_event = self.server_completion_queue.get(0)
+ self.assertIsNone(server_timeout_none_event)
+ client_timeout_none_event = self.client_completion_queue.get(0)
+ self.assertIsNone(client_timeout_none_event)
+
+ self.assertSequenceEqual(test_data, server_data)
+ self.assertSequenceEqual(test_data, client_data)
+
+
+class ExpirationTest(unittest.TestCase):
+
+ @unittest.skip('TODO(nathaniel): Expiration test!')
+ def testExpiration(self):
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
+
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_links_test.py b/src/python/grpcio_test/grpc_test/_adapter/_links_test.py
new file mode 100644
index 0000000000..c4686b327a
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_links_test.py
@@ -0,0 +1,277 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test of the GRPC-backed ForeLink and RearLink."""
+
+import threading
+import unittest
+
+from grpc._adapter import fore
+from grpc._adapter import rear
+from grpc.framework.base import interfaces
+from grpc.framework.foundation import logging_pool
+from grpc_test._adapter import _proto_scenarios
+from grpc_test._adapter import _test_links
+
+_IDENTITY = lambda x: x
+_TIMEOUT = 32
+
+
+# TODO(nathaniel): End-to-end metadata testing.
+def _transform_metadata(unused_metadata):
+ return (
+ ('one unused key', 'one unused value'),
+ ('another unused key', 'another unused value'),
+)
+
+
+class RoundTripTest(unittest.TestCase):
+
+ def setUp(self):
+ self.fore_link_pool = logging_pool.pool(8)
+ self.rear_link_pool = logging_pool.pool(8)
+
+ def tearDown(self):
+ self.rear_link_pool.shutdown(wait=True)
+ self.fore_link_pool.shutdown(wait=True)
+
+ def testZeroMessageRoundTrip(self):
+ test_operation_id = object()
+ test_method = 'test method'
+ test_fore_link = _test_links.ForeLink(None, None)
+ def rear_action(front_to_back_ticket, fore_link):
+ if front_to_back_ticket.kind in (
+ interfaces.FrontToBackTicket.Kind.COMPLETION,
+ interfaces.FrontToBackTicket.Kind.ENTIRE):
+ back_to_front_ticket = interfaces.BackToFrontTicket(
+ front_to_back_ticket.operation_id, 0,
+ interfaces.BackToFrontTicket.Kind.COMPLETION, None)
+ fore_link.accept_back_to_front_ticket(back_to_front_ticket)
+ test_rear_link = _test_links.RearLink(rear_action, None)
+
+ fore_link = fore.ForeLink(
+ self.fore_link_pool, {test_method: None}, {test_method: None}, None, ())
+ fore_link.join_rear_link(test_rear_link)
+ test_rear_link.join_fore_link(fore_link)
+ fore_link.start()
+ port = fore_link.port()
+
+ rear_link = rear.RearLink(
+ 'localhost', port, self.rear_link_pool, {test_method: None},
+ {test_method: None}, False, None, None, None,
+ metadata_transformer=_transform_metadata)
+ rear_link.join_fore_link(test_fore_link)
+ test_fore_link.join_rear_link(rear_link)
+ rear_link.start()
+
+ front_to_back_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
+ test_method, interfaces.ServicedSubscription.Kind.FULL, None, None,
+ _TIMEOUT)
+ rear_link.accept_front_to_back_ticket(front_to_back_ticket)
+
+ with test_fore_link.condition:
+ while (not test_fore_link.tickets or
+ test_fore_link.tickets[-1].kind is
+ interfaces.BackToFrontTicket.Kind.CONTINUATION):
+ test_fore_link.condition.wait()
+
+ rear_link.stop()
+ fore_link.stop()
+
+ with test_fore_link.condition:
+ self.assertIs(
+ test_fore_link.tickets[-1].kind,
+ interfaces.BackToFrontTicket.Kind.COMPLETION)
+
+ def testEntireRoundTrip(self):
+ test_operation_id = object()
+ test_method = 'test method'
+ test_front_to_back_datum = b'\x07'
+ test_back_to_front_datum = b'\x08'
+ test_fore_link = _test_links.ForeLink(None, None)
+ rear_sequence_number = [0]
+ def rear_action(front_to_back_ticket, fore_link):
+ if front_to_back_ticket.payload is None:
+ payload = None
+ else:
+ payload = test_back_to_front_datum
+ terminal = front_to_back_ticket.kind in (
+ interfaces.FrontToBackTicket.Kind.COMPLETION,
+ interfaces.FrontToBackTicket.Kind.ENTIRE)
+ if payload is not None or terminal:
+ if terminal:
+ kind = interfaces.BackToFrontTicket.Kind.COMPLETION
+ else:
+ kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
+ back_to_front_ticket = interfaces.BackToFrontTicket(
+ front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
+ payload)
+ rear_sequence_number[0] += 1
+ fore_link.accept_back_to_front_ticket(back_to_front_ticket)
+ test_rear_link = _test_links.RearLink(rear_action, None)
+
+ fore_link = fore.ForeLink(
+ self.fore_link_pool, {test_method: _IDENTITY},
+ {test_method: _IDENTITY}, None, ())
+ fore_link.join_rear_link(test_rear_link)
+ test_rear_link.join_fore_link(fore_link)
+ fore_link.start()
+ port = fore_link.port()
+
+ rear_link = rear.RearLink(
+ 'localhost', port, self.rear_link_pool, {test_method: _IDENTITY},
+ {test_method: _IDENTITY}, False, None, None, None)
+ rear_link.join_fore_link(test_fore_link)
+ test_fore_link.join_rear_link(rear_link)
+ rear_link.start()
+
+ front_to_back_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
+ test_method, interfaces.ServicedSubscription.Kind.FULL, None,
+ test_front_to_back_datum, _TIMEOUT)
+ rear_link.accept_front_to_back_ticket(front_to_back_ticket)
+
+ with test_fore_link.condition:
+ while (not test_fore_link.tickets or
+ test_fore_link.tickets[-1].kind is not
+ interfaces.BackToFrontTicket.Kind.COMPLETION):
+ test_fore_link.condition.wait()
+
+ rear_link.stop()
+ fore_link.stop()
+
+ with test_rear_link.condition:
+ front_to_back_payloads = tuple(
+ ticket.payload for ticket in test_rear_link.tickets
+ if ticket.payload is not None)
+ with test_fore_link.condition:
+ back_to_front_payloads = tuple(
+ ticket.payload for ticket in test_fore_link.tickets
+ if ticket.payload is not None)
+ self.assertTupleEqual((test_front_to_back_datum,), front_to_back_payloads)
+ self.assertTupleEqual((test_back_to_front_datum,), back_to_front_payloads)
+
+ def _perform_scenario_test(self, scenario):
+ test_operation_id = object()
+ test_method = scenario.method()
+ test_fore_link = _test_links.ForeLink(None, None)
+ rear_lock = threading.Lock()
+ rear_sequence_number = [0]
+ def rear_action(front_to_back_ticket, fore_link):
+ with rear_lock:
+ if front_to_back_ticket.payload is not None:
+ response = scenario.response_for_request(front_to_back_ticket.payload)
+ else:
+ response = None
+ terminal = front_to_back_ticket.kind in (
+ interfaces.FrontToBackTicket.Kind.COMPLETION,
+ interfaces.FrontToBackTicket.Kind.ENTIRE)
+ if response is not None or terminal:
+ if terminal:
+ kind = interfaces.BackToFrontTicket.Kind.COMPLETION
+ else:
+ kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
+ back_to_front_ticket = interfaces.BackToFrontTicket(
+ front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
+ response)
+ rear_sequence_number[0] += 1
+ fore_link.accept_back_to_front_ticket(back_to_front_ticket)
+ test_rear_link = _test_links.RearLink(rear_action, None)
+
+ fore_link = fore.ForeLink(
+ self.fore_link_pool, {test_method: scenario.deserialize_request},
+ {test_method: scenario.serialize_response}, None, ())
+ fore_link.join_rear_link(test_rear_link)
+ test_rear_link.join_fore_link(fore_link)
+ fore_link.start()
+ port = fore_link.port()
+
+ rear_link = rear.RearLink(
+ 'localhost', port, self.rear_link_pool,
+ {test_method: scenario.serialize_request},
+ {test_method: scenario.deserialize_response}, False, None, None, None)
+ rear_link.join_fore_link(test_fore_link)
+ test_fore_link.join_rear_link(rear_link)
+ rear_link.start()
+
+ commencement_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, 0,
+ interfaces.FrontToBackTicket.Kind.COMMENCEMENT, test_method,
+ interfaces.ServicedSubscription.Kind.FULL, None, None,
+ _TIMEOUT)
+ fore_sequence_number = 1
+ rear_link.accept_front_to_back_ticket(commencement_ticket)
+ for request in scenario.requests():
+ continuation_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, fore_sequence_number,
+ interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None, None,
+ request, None)
+ fore_sequence_number += 1
+ rear_link.accept_front_to_back_ticket(continuation_ticket)
+ completion_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, fore_sequence_number,
+ interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None, None,
+ None)
+ fore_sequence_number += 1
+ rear_link.accept_front_to_back_ticket(completion_ticket)
+
+ with test_fore_link.condition:
+ while (not test_fore_link.tickets or
+ test_fore_link.tickets[-1].kind is not
+ interfaces.BackToFrontTicket.Kind.COMPLETION):
+ test_fore_link.condition.wait()
+
+ rear_link.stop()
+ fore_link.stop()
+
+ with test_rear_link.condition:
+ requests = tuple(
+ ticket.payload for ticket in test_rear_link.tickets
+ if ticket.payload is not None)
+ with test_fore_link.condition:
+ responses = tuple(
+ ticket.payload for ticket in test_fore_link.tickets
+ if ticket.payload is not None)
+ self.assertTrue(scenario.verify_requests(requests))
+ self.assertTrue(scenario.verify_responses(responses))
+
+ def testEmptyScenario(self):
+ self._perform_scenario_test(_proto_scenarios.EmptyScenario())
+
+ def testBidirectionallyUnaryScenario(self):
+ self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario())
+
+ def testBidirectionallyStreamingScenario(self):
+ self._perform_scenario_test(
+ _proto_scenarios.BidirectionallyStreamingScenario())
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_lonely_rear_link_test.py b/src/python/grpcio_test/grpc_test/_adapter/_lonely_rear_link_test.py
new file mode 100644
index 0000000000..9b5758f60f
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_lonely_rear_link_test.py
@@ -0,0 +1,100 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A test of invocation-side code unconnected to an RPC server."""
+
+import unittest
+
+from grpc._adapter import rear
+from grpc.framework.base import interfaces
+from grpc.framework.foundation import logging_pool
+from grpc_test._adapter import _test_links
+
+_IDENTITY = lambda x: x
+_TIMEOUT = 2
+
+
+class LonelyRearLinkTest(unittest.TestCase):
+
+ def setUp(self):
+ self.pool = logging_pool.pool(8)
+
+ def tearDown(self):
+ self.pool.shutdown(wait=True)
+
+ def testUpAndDown(self):
+ rear_link = rear.RearLink(
+ 'nonexistent', 54321, self.pool, {}, {}, False, None, None, None)
+
+ rear_link.start()
+ rear_link.stop()
+
+ def _perform_lonely_client_test_with_ticket_kind(
+ self, front_to_back_ticket_kind):
+ test_operation_id = object()
+ test_method = 'test method'
+ fore_link = _test_links.ForeLink(None, None)
+
+ rear_link = rear.RearLink(
+ 'nonexistent', 54321, self.pool, {test_method: None},
+ {test_method: None}, False, None, None, None)
+ rear_link.join_fore_link(fore_link)
+ rear_link.start()
+
+ front_to_back_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, 0, front_to_back_ticket_kind, test_method,
+ interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
+ rear_link.accept_front_to_back_ticket(front_to_back_ticket)
+
+ with fore_link.condition:
+ while True:
+ if (fore_link.tickets and
+ fore_link.tickets[-1].kind is not
+ interfaces.BackToFrontTicket.Kind.CONTINUATION):
+ break
+ fore_link.condition.wait()
+
+ rear_link.stop()
+
+ with fore_link.condition:
+ self.assertIsNot(
+ fore_link.tickets[-1].kind,
+ interfaces.BackToFrontTicket.Kind.COMPLETION)
+
+ def testLonelyClientCommencementTicket(self):
+ self._perform_lonely_client_test_with_ticket_kind(
+ interfaces.FrontToBackTicket.Kind.COMMENCEMENT)
+
+ def testLonelyClientEntireTicket(self):
+ self._perform_lonely_client_test_with_ticket_kind(
+ interfaces.FrontToBackTicket.Kind.ENTIRE)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
new file mode 100644
index 0000000000..9a8edfad0c
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
@@ -0,0 +1,199 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import threading
+import time
+import unittest
+
+from grpc._adapter import _types
+from grpc._adapter import _low
+
+
+def WaitForEvents(completion_queues, deadline):
+ """
+ Args:
+ completion_queues: list of completion queues to wait for events on
+ deadline: absolute deadline to wait until
+
+ Returns:
+ a sequence of events of length len(completion_queues).
+ """
+
+ results = [None] * len(completion_queues)
+ lock = threading.Lock()
+ threads = []
+ def set_ith_result(i, completion_queue):
+ result = completion_queue.next(deadline)
+ with lock:
+ print i, completion_queue, result, time.time() - deadline
+ results[i] = result
+ for i, completion_queue in enumerate(completion_queues):
+ thread = threading.Thread(target=set_ith_result,
+ args=[i, completion_queue])
+ thread.start()
+ threads.append(thread)
+ for thread in threads:
+ thread.join()
+ return results
+
+class InsecureServerInsecureClient(unittest.TestCase):
+
+ def setUp(self):
+ self.server_completion_queue = _low.CompletionQueue()
+ self.server = _low.Server(self.server_completion_queue, [])
+ self.port = self.server.add_http2_port('[::]:0')
+ self.client_completion_queue = _low.CompletionQueue()
+ self.client_channel = _low.Channel('localhost:%d'%self.port, [])
+
+ self.server.start()
+
+ def tearDown(self):
+ self.server.shutdown()
+ del self.client_channel
+
+ self.client_completion_queue.shutdown()
+ while self.client_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN:
+ pass
+ self.server_completion_queue.shutdown()
+ while self.server_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN:
+ pass
+
+ del self.client_completion_queue
+ del self.server_completion_queue
+ del self.server
+
+ def testEcho(self):
+ DEADLINE = time.time()+5
+ DEADLINE_TOLERANCE = 0.25
+ CLIENT_METADATA_ASCII_KEY = 'key'
+ CLIENT_METADATA_ASCII_VALUE = 'val'
+ CLIENT_METADATA_BIN_KEY = 'key-bin'
+ CLIENT_METADATA_BIN_VALUE = b'\0'*1000
+ SERVER_INITIAL_METADATA_KEY = 'init_me_me_me'
+ SERVER_INITIAL_METADATA_VALUE = 'whodawha?'
+ SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought'
+ SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
+ SERVER_STATUS_CODE = _types.StatusCode.OK
+ SERVER_STATUS_DETAILS = 'our work is never over'
+ REQUEST = 'in death a member of project mayhem has a name'
+ RESPONSE = 'his name is robert paulson'
+ METHOD = 'twinkies'
+ HOST = 'hostess'
+ server_request_tag = object()
+ request_call_result = self.server.request_call(self.server_completion_queue, server_request_tag)
+
+ self.assertEquals(_types.CallError.OK, request_call_result)
+
+ client_call_tag = object()
+ client_call = self.client_channel.create_call(self.client_completion_queue, METHOD, HOST, DEADLINE)
+ client_initial_metadata = [(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]
+ client_start_batch_result = client_call.start_batch([
+ _types.OpArgs.send_initial_metadata(client_initial_metadata),
+ _types.OpArgs.send_message(REQUEST),
+ _types.OpArgs.send_close_from_client(),
+ _types.OpArgs.recv_initial_metadata(),
+ _types.OpArgs.recv_message(),
+ _types.OpArgs.recv_status_on_client()
+ ], client_call_tag)
+ self.assertEquals(_types.CallError.OK, client_start_batch_result)
+
+ client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
+ self.assertEquals(client_no_event, None)
+ self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type)
+ self.assertIsInstance(request_event.call, _low.Call)
+ self.assertIs(server_request_tag, request_event.tag)
+ self.assertEquals(1, len(request_event.results))
+ got_initial_metadata = dict(request_event.results[0].initial_metadata)
+ self.assertEquals(
+ dict(client_initial_metadata),
+ dict((x, got_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+ self.assertEquals(METHOD, request_event.call_details.method)
+ self.assertEquals(HOST, request_event.call_details.host)
+ self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)
+
+ server_call_tag = object()
+ server_call = request_event.call
+ server_initial_metadata = [(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE)]
+ server_trailing_metadata = [(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE)]
+ server_start_batch_result = server_call.start_batch([
+ _types.OpArgs.send_initial_metadata(server_initial_metadata),
+ _types.OpArgs.recv_message(),
+ _types.OpArgs.send_message(RESPONSE),
+ _types.OpArgs.recv_close_on_server(),
+ _types.OpArgs.send_status_from_server(server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
+ ], server_call_tag)
+ self.assertEquals(_types.CallError.OK, server_start_batch_result)
+
+ client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
+
+ self.assertEquals(6, len(client_event.results))
+ found_client_op_types = set()
+ for client_result in client_event.results:
+ self.assertNotIn(client_result.type, found_client_op_types) # we expect each op type to be unique
+ found_client_op_types.add(client_result.type)
+ if client_result.type == _types.OpType.RECV_INITIAL_METADATA:
+ self.assertEquals(dict(server_initial_metadata), dict(client_result.initial_metadata))
+ elif client_result.type == _types.OpType.RECV_MESSAGE:
+ self.assertEquals(RESPONSE, client_result.message)
+ elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT:
+ self.assertEquals(dict(server_trailing_metadata), dict(client_result.trailing_metadata))
+ self.assertEquals(SERVER_STATUS_DETAILS, client_result.status.details)
+ self.assertEquals(SERVER_STATUS_CODE, client_result.status.code)
+ self.assertEquals(set([
+ _types.OpType.SEND_INITIAL_METADATA,
+ _types.OpType.SEND_MESSAGE,
+ _types.OpType.SEND_CLOSE_FROM_CLIENT,
+ _types.OpType.RECV_INITIAL_METADATA,
+ _types.OpType.RECV_MESSAGE,
+ _types.OpType.RECV_STATUS_ON_CLIENT
+ ]), found_client_op_types)
+
+ self.assertEquals(5, len(server_event.results))
+ found_server_op_types = set()
+ for server_result in server_event.results:
+ self.assertNotIn(client_result.type, found_server_op_types)
+ found_server_op_types.add(server_result.type)
+ if server_result.type == _types.OpType.RECV_MESSAGE:
+ self.assertEquals(REQUEST, server_result.message)
+ elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER:
+ self.assertFalse(server_result.cancelled)
+ self.assertEquals(set([
+ _types.OpType.SEND_INITIAL_METADATA,
+ _types.OpType.RECV_MESSAGE,
+ _types.OpType.SEND_MESSAGE,
+ _types.OpType.RECV_CLOSE_ON_SERVER,
+ _types.OpType.SEND_STATUS_FROM_SERVER
+ ]), found_server_op_types)
+
+ del client_call
+ del server_call
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_proto_scenarios.py b/src/python/grpcio_test/grpc_test/_adapter/_proto_scenarios.py
new file mode 100644
index 0000000000..b3d6ec8607
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_proto_scenarios.py
@@ -0,0 +1,261 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test scenarios using protocol buffers."""
+
+import abc
+import threading
+
+from grpc_test._junkdrawer import math_pb2
+
+
+class ProtoScenario(object):
+ """An RPC test scenario using protocol buffers."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def method(self):
+ """Access the test method name.
+
+ Returns:
+ The test method name.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_request(self, request):
+ """Serialize a request protocol buffer.
+
+ Args:
+ request: A request protocol buffer.
+
+ Returns:
+ The bytestring serialization of the given request protocol buffer.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_request(self, request_bytestring):
+ """Deserialize a request protocol buffer.
+
+ Args:
+ request_bytestring: The bytestring serialization of a request protocol
+ buffer.
+
+ Returns:
+ The request protocol buffer deserialized from the given byte string.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_response(self, response):
+ """Serialize a response protocol buffer.
+
+ Args:
+ response: A response protocol buffer.
+
+ Returns:
+ The bytestring serialization of the given response protocol buffer.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_response(self, response_bytestring):
+ """Deserialize a response protocol buffer.
+
+ Args:
+ response_bytestring: The bytestring serialization of a response protocol
+ buffer.
+
+ Returns:
+ The response protocol buffer deserialized from the given byte string.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests(self):
+ """Access the sequence of requests for this scenario.
+
+ Returns:
+ A sequence of request protocol buffers.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def response_for_request(self, request):
+ """Access the response for a particular request.
+
+ Args:
+ request: A request protocol buffer.
+
+ Returns:
+ The response protocol buffer appropriate for the given request.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify_requests(self, experimental_requests):
+ """Verify the requests transmitted through the system under test.
+
+ Args:
+ experimental_requests: The request protocol buffers transmitted through
+ the system under test.
+
+ Returns:
+ True if the requests satisfy this test scenario; False otherwise.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify_responses(self, experimental_responses):
+ """Verify the responses transmitted through the system under test.
+
+ Args:
+ experimental_responses: The response protocol buffers transmitted through
+ the system under test.
+
+ Returns:
+ True if the responses satisfy this test scenario; False otherwise.
+ """
+ raise NotImplementedError()
+
+
+class EmptyScenario(ProtoScenario):
+ """A scenario that transmits no protocol buffers in either direction."""
+
+ def method(self):
+ return 'DivMany'
+
+ def serialize_request(self, request):
+ raise ValueError('This should not be necessary to call!')
+
+ def deserialize_request(self, request_bytestring):
+ raise ValueError('This should not be necessary to call!')
+
+ def serialize_response(self, response):
+ raise ValueError('This should not be necessary to call!')
+
+ def deserialize_response(self, response_bytestring):
+ raise ValueError('This should not be necessary to call!')
+
+ def requests(self):
+ return ()
+
+ def response_for_request(self, request):
+ raise ValueError('This should not be necessary to call!')
+
+ def verify_requests(self, experimental_requests):
+ return not experimental_requests
+
+ def verify_responses(self, experimental_responses):
+ return not experimental_responses
+
+
+class BidirectionallyUnaryScenario(ProtoScenario):
+ """A scenario that transmits no protocol buffers in either direction."""
+
+ _DIVIDEND = 59
+ _DIVISOR = 7
+ _QUOTIENT = 8
+ _REMAINDER = 3
+
+ _REQUEST = math_pb2.DivArgs(dividend=_DIVIDEND, divisor=_DIVISOR)
+ _RESPONSE = math_pb2.DivReply(quotient=_QUOTIENT, remainder=_REMAINDER)
+
+ def method(self):
+ return 'Div'
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, request_bytestring):
+ return math_pb2.DivArgs.FromString(request_bytestring)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, response_bytestring):
+ return math_pb2.DivReply.FromString(response_bytestring)
+
+ def requests(self):
+ return [self._REQUEST]
+
+ def response_for_request(self, request):
+ return self._RESPONSE
+
+ def verify_requests(self, experimental_requests):
+ return tuple(experimental_requests) == (self._REQUEST,)
+
+ def verify_responses(self, experimental_responses):
+ return tuple(experimental_responses) == (self._RESPONSE,)
+
+
+class BidirectionallyStreamingScenario(ProtoScenario):
+ """A scenario that transmits no protocol buffers in either direction."""
+
+ _STREAM_LENGTH = 200
+ _REQUESTS = tuple(
+ math_pb2.DivArgs(dividend=59 + index, divisor=7 + index)
+ for index in range(_STREAM_LENGTH))
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._responses = []
+
+ def method(self):
+ return 'DivMany'
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, request_bytestring):
+ return math_pb2.DivArgs.FromString(request_bytestring)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, response_bytestring):
+ return math_pb2.DivReply.FromString(response_bytestring)
+
+ def requests(self):
+ return self._REQUESTS
+
+ def response_for_request(self, request):
+ quotient, remainder = divmod(request.dividend, request.divisor)
+ response = math_pb2.DivReply(quotient=quotient, remainder=remainder)
+ with self._lock:
+ self._responses.append(response)
+ return response
+
+ def verify_requests(self, experimental_requests):
+ return tuple(experimental_requests) == self._REQUESTS
+
+ def verify_responses(self, experimental_responses):
+ with self._lock:
+ return tuple(experimental_responses) == tuple(self._responses)
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_test_links.py b/src/python/grpcio_test/grpc_test/_adapter/_test_links.py
new file mode 100644
index 0000000000..86c7e61b17
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/_adapter/_test_links.py
@@ -0,0 +1,80 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Links suitable for use in tests."""
+
+import threading
+
+from grpc.framework.base import interfaces
+
+
+class ForeLink(interfaces.ForeLink):
+ """A ForeLink suitable for use in tests of RearLinks."""
+
+ def __init__(self, action, rear_link):
+ self.condition = threading.Condition()
+ self.tickets = []
+ self.action = action
+ self.rear_link = rear_link
+
+ def accept_back_to_front_ticket(self, ticket):
+ with self.condition:
+ self.tickets.append(ticket)
+ self.condition.notify_all()
+ action, rear_link = self.action, self.rear_link
+
+ if action is not None:
+ action(ticket, rear_link)
+
+ def join_rear_link(self, rear_link):
+ with self.condition:
+ self.rear_link = rear_link
+
+
+class RearLink(interfaces.RearLink):
+ """A RearLink suitable for use in tests of ForeLinks."""
+
+ def __init__(self, action, fore_link):
+ self.condition = threading.Condition()
+ self.tickets = []
+ self.action = action
+ self.fore_link = fore_link
+
+ def accept_front_to_back_ticket(self, ticket):
+ with self.condition:
+ self.tickets.append(ticket)
+ self.condition.notify_all()
+ action, fore_link = self.action, self.fore_link
+
+ if action is not None:
+ action(ticket, fore_link)
+
+ def join_fore_link(self, fore_link):
+ with self.condition:
+ self.fore_link = fore_link