aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_test/grpc_test
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_test/grpc_test')
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py8
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_beta_features_test.py232
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py59
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_face_interface_test.py19
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_not_found_test.py8
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_utilities_test.py6
-rw-r--r--src/python/grpcio_test/grpc_test/beta/test_utilities.py12
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py16
8 files changed, 309 insertions, 51 deletions
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py
index 27a5b82e9c..90ad0b9bcb 100644
--- a/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py
+++ b/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py
@@ -191,7 +191,7 @@ class EchoTest(unittest.TestCase):
metadata[server_leading_binary_metadata_key])
for datum in test_data:
- client_call.write(datum, write_tag)
+ client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS)
write_accepted = self.client_events.get()
self.assertIsNotNone(write_accepted)
self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
@@ -206,7 +206,7 @@ class EchoTest(unittest.TestCase):
self.assertIsNotNone(read_accepted.bytes)
server_data.append(read_accepted.bytes)
- server_call.write(read_accepted.bytes, write_tag)
+ server_call.write(read_accepted.bytes, write_tag, 0)
write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
@@ -370,14 +370,14 @@ class CancellationTest(unittest.TestCase):
self.assertIsNotNone(metadata_accepted)
for datum in test_data:
- client_call.write(datum, write_tag)
+ client_call.write(datum, write_tag, 0)
write_accepted = self.client_events.get()
server_call.read(read_tag)
read_accepted = self.server_events.get()
server_data.append(read_accepted.bytes)
- server_call.write(read_accepted.bytes, write_tag)
+ server_call.write(read_accepted.bytes, write_tag, 0)
write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
diff --git a/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py
new file mode 100644
index 0000000000..fad57da9d0
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py
@@ -0,0 +1,232 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests Face interface compliance of the gRPC Python Beta API."""
+
+import threading
+import unittest
+
+from grpc.beta import implementations
+from grpc.beta import interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.interfaces.face import utilities
+from grpc_test import resources
+from grpc_test.beta import test_utilities
+from grpc_test.framework.common import test_constants
+
+_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
+
+_GROUP = 'group'
+_UNARY_UNARY = 'unary-unary'
+_UNARY_STREAM = 'unary-stream'
+_STREAM_UNARY = 'stream-unary'
+_STREAM_STREAM = 'stream-stream'
+
+_REQUEST = b'abc'
+_RESPONSE = b'123'
+
+
+class _Servicer(object):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._peer = None
+ self._serviced = False
+
+ def unary_unary(self, request, context):
+ with self._condition:
+ self._request = request
+ self._peer = context.protocol_context().peer()
+ context.protocol_context().disable_next_response_compression()
+ self._serviced = True
+ self._condition.notify_all()
+ return _RESPONSE
+
+ def unary_stream(self, request, context):
+ with self._condition:
+ self._request = request
+ self._peer = context.protocol_context().peer()
+ context.protocol_context().disable_next_response_compression()
+ self._serviced = True
+ self._condition.notify_all()
+ return
+ yield
+
+ def stream_unary(self, request_iterator, context):
+ for request in request_iterator:
+ self._request = request
+ with self._condition:
+ self._peer = context.protocol_context().peer()
+ context.protocol_context().disable_next_response_compression()
+ self._serviced = True
+ self._condition.notify_all()
+ return _RESPONSE
+
+ def stream_stream(self, request_iterator, context):
+ for request in request_iterator:
+ with self._condition:
+ self._peer = context.protocol_context().peer()
+ context.protocol_context().disable_next_response_compression()
+ yield _RESPONSE
+ with self._condition:
+ self._serviced = True
+ self._condition.notify_all()
+
+ def peer(self):
+ with self._condition:
+ return self._peer
+
+ def block_until_serviced(self):
+ with self._condition:
+ while not self._serviced:
+ self._condition.wait()
+
+
+class _BlockingIterator(object):
+
+ def __init__(self, upstream):
+ self._condition = threading.Condition()
+ self._upstream = upstream
+ self._allowed = []
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ with self._condition:
+ while True:
+ if self._allowed is None:
+ raise StopIteration()
+ elif self._allowed:
+ return self._allowed.pop(0)
+ else:
+ self._condition.wait()
+
+ def allow(self):
+ with self._condition:
+ try:
+ self._allowed.append(next(self._upstream))
+ except StopIteration:
+ self._allowed = None
+ self._condition.notify_all()
+
+
+class BetaFeaturesTest(unittest.TestCase):
+
+ def setUp(self):
+ self._servicer = _Servicer()
+ method_implementations = {
+ (_GROUP, _UNARY_UNARY):
+ utilities.unary_unary_inline(self._servicer.unary_unary),
+ (_GROUP, _UNARY_STREAM):
+ utilities.unary_stream_inline(self._servicer.unary_stream),
+ (_GROUP, _STREAM_UNARY):
+ utilities.stream_unary_inline(self._servicer.stream_unary),
+ (_GROUP, _STREAM_STREAM):
+ utilities.stream_stream_inline(self._servicer.stream_stream),
+ }
+
+ cardinalities = {
+ _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
+ _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
+ _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
+ _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
+ }
+
+ server_options = implementations.server_options(
+ thread_pool_size=test_constants.POOL_SIZE)
+ self._server = implementations.server(
+ method_implementations, options=server_options)
+ server_credentials = implementations.ssl_server_credentials(
+ [(resources.private_key(), resources.certificate_chain(),),])
+ port = self._server.add_secure_port('[::]:0', server_credentials)
+ self._server.start()
+ self._client_credentials = implementations.ssl_client_credentials(
+ resources.test_root_certificates(), None, None)
+ channel = test_utilities.not_really_secure_channel(
+ 'localhost', port, self._client_credentials, _SERVER_HOST_OVERRIDE)
+ stub_options = implementations.stub_options(
+ thread_pool_size=test_constants.POOL_SIZE)
+ self._dynamic_stub = implementations.dynamic_stub(
+ channel, _GROUP, cardinalities, options=stub_options)
+
+ def tearDown(self):
+ self._dynamic_stub = None
+ self._server.stop(test_constants.SHORT_TIMEOUT).wait()
+
+ def test_unary_unary(self):
+ call_options = interfaces.grpc_call_options(
+ disable_compression=True, credentials=self._client_credentials)
+ response = getattr(self._dynamic_stub, _UNARY_UNARY)(
+ _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options)
+ self.assertEqual(_RESPONSE, response)
+ self.assertIsNotNone(self._servicer.peer())
+
+ def test_unary_stream(self):
+ call_options = interfaces.grpc_call_options(
+ disable_compression=True, credentials=self._client_credentials)
+ response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)(
+ _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options)
+ self._servicer.block_until_serviced()
+ self.assertIsNotNone(self._servicer.peer())
+
+ def test_stream_unary(self):
+ call_options = interfaces.grpc_call_options(
+ credentials=self._client_credentials)
+ request_iterator = _BlockingIterator(iter((_REQUEST,)))
+ response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future(
+ request_iterator, test_constants.LONG_TIMEOUT,
+ protocol_options=call_options)
+ response_future.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ response_future.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ self._servicer.block_until_serviced()
+ self.assertIsNotNone(self._servicer.peer())
+ self.assertEqual(_RESPONSE, response_future.result())
+
+ def test_stream_stream(self):
+ call_options = interfaces.grpc_call_options(
+ credentials=self._client_credentials)
+ request_iterator = _BlockingIterator(iter((_REQUEST,)))
+ response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)(
+ request_iterator, test_constants.SHORT_TIMEOUT,
+ protocol_options=call_options)
+ response_iterator.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ response = next(response_iterator)
+ response_iterator.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ self._servicer.block_until_serviced()
+ self.assertIsNotNone(self._servicer.peer())
+ self.assertEqual(_RESPONSE, response)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
index 038464889d..b3c05bdb0c 100644
--- a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
@@ -36,14 +36,9 @@ import unittest
from grpc._adapter import _low
from grpc._adapter import _types
from grpc.beta import _connectivity_channel
+from grpc.beta import interfaces
from grpc_test.framework.common import test_constants
-_MAPPING_FUNCTION = lambda integer: integer * 200 + 17
-_MAPPING = {
- state: _MAPPING_FUNCTION(state) for state in _types.ConnectivityState}
-_IDLE, _CONNECTING, _READY, _TRANSIENT_FAILURE, _FATAL_FAILURE = map(
- _MAPPING_FUNCTION, _types.ConnectivityState)
-
def _drive_completion_queue(completion_queue):
while True:
@@ -84,7 +79,7 @@ class ChannelConnectivityTest(unittest.TestCase):
callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _MAPPING)
+ low_channel)
connectivity_channel.subscribe(callback.update, try_to_connect=False)
first_connectivities = callback.block_until_connectivities_satisfy(bool)
connectivity_channel.subscribe(callback.update, try_to_connect=True)
@@ -98,11 +93,16 @@ class ChannelConnectivityTest(unittest.TestCase):
connectivity_channel.unsubscribe(callback.update)
fifth_connectivities = callback.connectivities()
- self.assertSequenceEqual((_IDLE,), first_connectivities)
- self.assertNotIn(_READY, second_connectivities)
- self.assertNotIn(_READY, third_connectivities)
- self.assertNotIn(_READY, fourth_connectivities)
- self.assertNotIn(_READY, fifth_connectivities)
+ self.assertSequenceEqual(
+ (interfaces.ChannelConnectivity.IDLE,), first_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, second_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, third_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, fourth_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, fifth_connectivities)
def test_immediately_connectable_channel_connectivity(self):
server_completion_queue = _low.CompletionQueue()
@@ -117,7 +117,7 @@ class ChannelConnectivityTest(unittest.TestCase):
second_callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _MAPPING)
+ low_channel)
connectivity_channel.subscribe(first_callback.update, try_to_connect=False)
first_connectivities = first_callback.block_until_connectivities_satisfy(
bool)
@@ -132,9 +132,11 @@ class ChannelConnectivityTest(unittest.TestCase):
bool)
# Wait for a connection that will happen (or may already have happened).
first_callback.block_until_connectivities_satisfy(
- lambda connectivities: _READY in connectivities)
+ lambda connectivities:
+ interfaces.ChannelConnectivity.READY in connectivities)
second_callback.block_until_connectivities_satisfy(
- lambda connectivities: _READY in connectivities)
+ lambda connectivities:
+ interfaces.ChannelConnectivity.READY in connectivities)
connectivity_channel.unsubscribe(first_callback.update)
connectivity_channel.unsubscribe(second_callback.update)
@@ -142,12 +144,19 @@ class ChannelConnectivityTest(unittest.TestCase):
server_completion_queue.shutdown()
server_completion_queue_thread.join()
- self.assertSequenceEqual((_IDLE,), first_connectivities)
- self.assertSequenceEqual((_IDLE,), second_connectivities)
- self.assertNotIn(_TRANSIENT_FAILURE, third_connectivities)
- self.assertNotIn(_FATAL_FAILURE, third_connectivities)
- self.assertNotIn(_TRANSIENT_FAILURE, fourth_connectivities)
- self.assertNotIn(_FATAL_FAILURE, fourth_connectivities)
+ self.assertSequenceEqual(
+ (interfaces.ChannelConnectivity.IDLE,), first_connectivities)
+ self.assertSequenceEqual(
+ (interfaces.ChannelConnectivity.IDLE,), second_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.FATAL_FAILURE, third_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.TRANSIENT_FAILURE,
+ fourth_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.FATAL_FAILURE, fourth_connectivities)
def test_reachable_then_unreachable_channel_connectivity(self):
server_completion_queue = _low.CompletionQueue()
@@ -161,14 +170,16 @@ class ChannelConnectivityTest(unittest.TestCase):
callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _MAPPING)
+ low_channel)
connectivity_channel.subscribe(callback.update, try_to_connect=True)
callback.block_until_connectivities_satisfy(
- lambda connectivities: _READY in connectivities)
+ lambda connectivities:
+ interfaces.ChannelConnectivity.READY in connectivities)
# Now take down the server and confirm that channel readiness is repudiated.
server.shutdown()
callback.block_until_connectivities_satisfy(
- lambda connectivities: connectivities[-1] is not _READY)
+ lambda connectivities:
+ connectivities[-1] is not interfaces.ChannelConnectivity.READY)
connectivity_channel.unsubscribe(callback.update)
server.shutdown()
diff --git a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py
index e9087a7949..aa33e1e6f8 100644
--- a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py
@@ -32,7 +32,7 @@
import collections
import unittest
-from grpc.beta import beta
+from grpc.beta import implementations
from grpc.beta import interfaces
from grpc_test import resources
from grpc_test import test_common as grpc_test_common
@@ -81,25 +81,26 @@ class _Implementation(test_interfaces.Implementation):
method: method_object.cardinality()
for (group, method), method_object in methods.iteritems()}
- server_options = beta.server_options(
+ server_options = implementations.server_options(
request_deserializers=serialization_behaviors.request_deserializers,
response_serializers=serialization_behaviors.response_serializers,
thread_pool_size=test_constants.POOL_SIZE)
- server = beta.server(method_implementations, options=server_options)
- server_credentials = beta.ssl_server_credentials(
+ server = implementations.server(
+ method_implementations, options=server_options)
+ server_credentials = implementations.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain(),),])
port = server.add_secure_port('[::]:0', server_credentials)
server.start()
- client_credentials = beta.ssl_client_credentials(
+ client_credentials = implementations.ssl_client_credentials(
resources.test_root_certificates(), None, None)
- channel = test_utilities.create_not_really_secure_channel(
+ channel = test_utilities.not_really_secure_channel(
'localhost', port, client_credentials, _SERVER_HOST_OVERRIDE)
- stub_options = beta.stub_options(
+ stub_options = implementations.stub_options(
request_serializers=serialization_behaviors.request_serializers,
response_deserializers=serialization_behaviors.response_deserializers,
thread_pool_size=test_constants.POOL_SIZE)
- generic_stub = beta.generic_stub(channel, options=stub_options)
- dynamic_stub = beta.dynamic_stub(
+ generic_stub = implementations.generic_stub(channel, options=stub_options)
+ dynamic_stub = implementations.dynamic_stub(
channel, service, cardinalities, options=stub_options)
return generic_stub, {service: dynamic_stub}, server
diff --git a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py
index ecd10f2175..5feb997fef 100644
--- a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py
@@ -31,7 +31,7 @@
import unittest
-from grpc.beta import beta
+from grpc.beta import implementations
from grpc.beta import interfaces
from grpc.framework.interfaces.face import face
from grpc_test.framework.common import test_constants
@@ -40,10 +40,10 @@ from grpc_test.framework.common import test_constants
class NotFoundTest(unittest.TestCase):
def setUp(self):
- self._server = beta.server({})
+ self._server = implementations.server({})
port = self._server.add_insecure_port('[::]:0')
- channel = beta.create_insecure_channel('localhost', port)
- self._generic_stub = beta.generic_stub(channel)
+ channel = implementations.insecure_channel('localhost', port)
+ self._generic_stub = implementations.generic_stub(channel)
self._server.start()
def tearDown(self):
diff --git a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
index 998e74ccf4..996cea9118 100644
--- a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
@@ -35,7 +35,7 @@ import unittest
from grpc._adapter import _low
from grpc._adapter import _types
-from grpc.beta import beta
+from grpc.beta import implementations
from grpc.beta import utilities
from grpc.framework.foundation import future
from grpc_test.framework.common import test_constants
@@ -69,7 +69,7 @@ class _Callback(object):
class ChannelConnectivityTest(unittest.TestCase):
def test_lonely_channel_connectivity(self):
- channel = beta.create_insecure_channel('localhost', 12345)
+ channel = implementations.insecure_channel('localhost', 12345)
callback = _Callback()
ready_future = utilities.channel_ready_future(channel)
@@ -94,7 +94,7 @@ class ChannelConnectivityTest(unittest.TestCase):
server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue, args=(server_completion_queue,))
server_completion_queue_thread.start()
- channel = beta.create_insecure_channel('localhost', port)
+ channel = implementations.insecure_channel('localhost', port)
callback = _Callback()
try:
diff --git a/src/python/grpcio_test/grpc_test/beta/test_utilities.py b/src/python/grpcio_test/grpc_test/beta/test_utilities.py
index 338670478d..24a8600e12 100644
--- a/src/python/grpcio_test/grpc_test/beta/test_utilities.py
+++ b/src/python/grpcio_test/grpc_test/beta/test_utilities.py
@@ -30,25 +30,27 @@
"""Test-appropriate entry points into the gRPC Python Beta API."""
from grpc._adapter import _intermediary_low
-from grpc.beta import beta
+from grpc.beta import implementations
-def create_not_really_secure_channel(
+def not_really_secure_channel(
host, port, client_credentials, server_host_override):
"""Creates an insecure Channel to a remote host.
Args:
host: The name of the remote host to which to connect.
port: The port of the remote host to which to connect.
- client_credentials: The beta.ClientCredentials with which to connect.
+ client_credentials: The implementations.ClientCredentials with which to
+ connect.
server_host_override: The target name used for SSL host name checking.
Returns:
- A beta.Channel to the remote host through which RPCs may be conducted.
+ An implementations.Channel to the remote host through which RPCs may be
+ conducted.
"""
hostport = '%s:%d' % (host, port)
intermediary_low_channel = _intermediary_low.Channel(
hostport, client_credentials._intermediary_low_credentials,
server_host_override=server_host_override)
- return beta.Channel(
+ return implementations.Channel(
intermediary_low_channel._internal, intermediary_low_channel)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
index 5065a3f38a..ddda1018c3 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py
@@ -119,6 +119,17 @@ class _Operator(base.Operator):
'Deliberately raised exception from Operator.advance (in a test)!')
+class _ProtocolReceiver(base.ProtocolReceiver):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._contexts = []
+
+ def context(self, protocol_context):
+ with self._condition:
+ self._contexts.append(protocol_context)
+
+
class _Servicer(base.Servicer):
"""A base.Servicer with instrumented for testing."""
@@ -144,7 +155,7 @@ class _Servicer(base.Servicer):
controller.service_on_termination)
if outcome is not None:
controller.service_on_termination(outcome)
- return utilities.full_subscription(operator)
+ return utilities.full_subscription(operator, _ProtocolReceiver())
class _OperationTest(unittest.TestCase):
@@ -169,7 +180,8 @@ class _OperationTest(unittest.TestCase):
test_operator = _Operator(
self._controller, self._controller.on_invocation_advance,
self._pool, None)
- subscription = utilities.full_subscription(test_operator)
+ subscription = utilities.full_subscription(
+ test_operator, _ProtocolReceiver())
else:
# TODO(nathaniel): support and test other subscription kinds.
self.fail('Non-full subscriptions not yet supported!')