diff options
Diffstat (limited to 'src/python/grpcio_test/grpc_test')
8 files changed, 309 insertions, 51 deletions
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py index 27a5b82e9c..90ad0b9bcb 100644 --- a/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py +++ b/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py @@ -191,7 +191,7 @@ class EchoTest(unittest.TestCase): metadata[server_leading_binary_metadata_key]) for datum in test_data: - client_call.write(datum, write_tag) + client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS) write_accepted = self.client_events.get() self.assertIsNotNone(write_accepted) self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED) @@ -206,7 +206,7 @@ class EchoTest(unittest.TestCase): self.assertIsNotNone(read_accepted.bytes) server_data.append(read_accepted.bytes) - server_call.write(read_accepted.bytes, write_tag) + server_call.write(read_accepted.bytes, write_tag, 0) write_accepted = self.server_events.get() self.assertIsNotNone(write_accepted) self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind) @@ -370,14 +370,14 @@ class CancellationTest(unittest.TestCase): self.assertIsNotNone(metadata_accepted) for datum in test_data: - client_call.write(datum, write_tag) + client_call.write(datum, write_tag, 0) write_accepted = self.client_events.get() server_call.read(read_tag) read_accepted = self.server_events.get() server_data.append(read_accepted.bytes) - server_call.write(read_accepted.bytes, write_tag) + server_call.write(read_accepted.bytes, write_tag, 0) write_accepted = self.server_events.get() self.assertIsNotNone(write_accepted) diff --git a/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py new file mode 100644 index 0000000000..fad57da9d0 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py @@ -0,0 +1,232 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests Face interface compliance of the gRPC Python Beta API.""" + +import threading +import unittest + +from grpc.beta import implementations +from grpc.beta import interfaces +from grpc.framework.common import cardinality +from grpc.framework.interfaces.face import utilities +from grpc_test import resources +from grpc_test.beta import test_utilities +from grpc_test.framework.common import test_constants + +_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' + +_GROUP = 'group' +_UNARY_UNARY = 'unary-unary' +_UNARY_STREAM = 'unary-stream' +_STREAM_UNARY = 'stream-unary' +_STREAM_STREAM = 'stream-stream' + +_REQUEST = b'abc' +_RESPONSE = b'123' + + +class _Servicer(object): + + def __init__(self): + self._condition = threading.Condition() + self._peer = None + self._serviced = False + + def unary_unary(self, request, context): + with self._condition: + self._request = request + self._peer = context.protocol_context().peer() + context.protocol_context().disable_next_response_compression() + self._serviced = True + self._condition.notify_all() + return _RESPONSE + + def unary_stream(self, request, context): + with self._condition: + self._request = request + self._peer = context.protocol_context().peer() + context.protocol_context().disable_next_response_compression() + self._serviced = True + self._condition.notify_all() + return + yield + + def stream_unary(self, request_iterator, context): + for request in request_iterator: + self._request = request + with self._condition: + self._peer = context.protocol_context().peer() + context.protocol_context().disable_next_response_compression() + self._serviced = True + self._condition.notify_all() + return _RESPONSE + + def stream_stream(self, request_iterator, context): + for request in request_iterator: + with self._condition: + self._peer = context.protocol_context().peer() + context.protocol_context().disable_next_response_compression() + yield _RESPONSE + with self._condition: + self._serviced = True + self._condition.notify_all() + + def peer(self): + with self._condition: + return self._peer + + def block_until_serviced(self): + with self._condition: + while not self._serviced: + self._condition.wait() + + +class _BlockingIterator(object): + + def __init__(self, upstream): + self._condition = threading.Condition() + self._upstream = upstream + self._allowed = [] + + def __iter__(self): + return self + + def next(self): + with self._condition: + while True: + if self._allowed is None: + raise StopIteration() + elif self._allowed: + return self._allowed.pop(0) + else: + self._condition.wait() + + def allow(self): + with self._condition: + try: + self._allowed.append(next(self._upstream)) + except StopIteration: + self._allowed = None + self._condition.notify_all() + + +class BetaFeaturesTest(unittest.TestCase): + + def setUp(self): + self._servicer = _Servicer() + method_implementations = { + (_GROUP, _UNARY_UNARY): + utilities.unary_unary_inline(self._servicer.unary_unary), + (_GROUP, _UNARY_STREAM): + utilities.unary_stream_inline(self._servicer.unary_stream), + (_GROUP, _STREAM_UNARY): + utilities.stream_unary_inline(self._servicer.stream_unary), + (_GROUP, _STREAM_STREAM): + utilities.stream_stream_inline(self._servicer.stream_stream), + } + + cardinalities = { + _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY, + _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM, + _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY, + _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM, + } + + server_options = implementations.server_options( + thread_pool_size=test_constants.POOL_SIZE) + self._server = implementations.server( + method_implementations, options=server_options) + server_credentials = implementations.ssl_server_credentials( + [(resources.private_key(), resources.certificate_chain(),),]) + port = self._server.add_secure_port('[::]:0', server_credentials) + self._server.start() + self._client_credentials = implementations.ssl_client_credentials( + resources.test_root_certificates(), None, None) + channel = test_utilities.not_really_secure_channel( + 'localhost', port, self._client_credentials, _SERVER_HOST_OVERRIDE) + stub_options = implementations.stub_options( + thread_pool_size=test_constants.POOL_SIZE) + self._dynamic_stub = implementations.dynamic_stub( + channel, _GROUP, cardinalities, options=stub_options) + + def tearDown(self): + self._dynamic_stub = None + self._server.stop(test_constants.SHORT_TIMEOUT).wait() + + def test_unary_unary(self): + call_options = interfaces.grpc_call_options( + disable_compression=True, credentials=self._client_credentials) + response = getattr(self._dynamic_stub, _UNARY_UNARY)( + _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options) + self.assertEqual(_RESPONSE, response) + self.assertIsNotNone(self._servicer.peer()) + + def test_unary_stream(self): + call_options = interfaces.grpc_call_options( + disable_compression=True, credentials=self._client_credentials) + response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)( + _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options) + self._servicer.block_until_serviced() + self.assertIsNotNone(self._servicer.peer()) + + def test_stream_unary(self): + call_options = interfaces.grpc_call_options( + credentials=self._client_credentials) + request_iterator = _BlockingIterator(iter((_REQUEST,))) + response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future( + request_iterator, test_constants.LONG_TIMEOUT, + protocol_options=call_options) + response_future.protocol_context().disable_next_request_compression() + request_iterator.allow() + response_future.protocol_context().disable_next_request_compression() + request_iterator.allow() + self._servicer.block_until_serviced() + self.assertIsNotNone(self._servicer.peer()) + self.assertEqual(_RESPONSE, response_future.result()) + + def test_stream_stream(self): + call_options = interfaces.grpc_call_options( + credentials=self._client_credentials) + request_iterator = _BlockingIterator(iter((_REQUEST,))) + response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)( + request_iterator, test_constants.SHORT_TIMEOUT, + protocol_options=call_options) + response_iterator.protocol_context().disable_next_request_compression() + request_iterator.allow() + response = next(response_iterator) + response_iterator.protocol_context().disable_next_request_compression() + request_iterator.allow() + self._servicer.block_until_serviced() + self.assertIsNotNone(self._servicer.peer()) + self.assertEqual(_RESPONSE, response) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py index 038464889d..b3c05bdb0c 100644 --- a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py @@ -36,14 +36,9 @@ import unittest from grpc._adapter import _low from grpc._adapter import _types from grpc.beta import _connectivity_channel +from grpc.beta import interfaces from grpc_test.framework.common import test_constants -_MAPPING_FUNCTION = lambda integer: integer * 200 + 17 -_MAPPING = { - state: _MAPPING_FUNCTION(state) for state in _types.ConnectivityState} -_IDLE, _CONNECTING, _READY, _TRANSIENT_FAILURE, _FATAL_FAILURE = map( - _MAPPING_FUNCTION, _types.ConnectivityState) - def _drive_completion_queue(completion_queue): while True: @@ -84,7 +79,7 @@ class ChannelConnectivityTest(unittest.TestCase): callback = _Callback() connectivity_channel = _connectivity_channel.ConnectivityChannel( - low_channel, _MAPPING) + low_channel) connectivity_channel.subscribe(callback.update, try_to_connect=False) first_connectivities = callback.block_until_connectivities_satisfy(bool) connectivity_channel.subscribe(callback.update, try_to_connect=True) @@ -98,11 +93,16 @@ class ChannelConnectivityTest(unittest.TestCase): connectivity_channel.unsubscribe(callback.update) fifth_connectivities = callback.connectivities() - self.assertSequenceEqual((_IDLE,), first_connectivities) - self.assertNotIn(_READY, second_connectivities) - self.assertNotIn(_READY, third_connectivities) - self.assertNotIn(_READY, fourth_connectivities) - self.assertNotIn(_READY, fifth_connectivities) + self.assertSequenceEqual( + (interfaces.ChannelConnectivity.IDLE,), first_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.READY, second_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.READY, third_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.READY, fourth_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.READY, fifth_connectivities) def test_immediately_connectable_channel_connectivity(self): server_completion_queue = _low.CompletionQueue() @@ -117,7 +117,7 @@ class ChannelConnectivityTest(unittest.TestCase): second_callback = _Callback() connectivity_channel = _connectivity_channel.ConnectivityChannel( - low_channel, _MAPPING) + low_channel) connectivity_channel.subscribe(first_callback.update, try_to_connect=False) first_connectivities = first_callback.block_until_connectivities_satisfy( bool) @@ -132,9 +132,11 @@ class ChannelConnectivityTest(unittest.TestCase): bool) # Wait for a connection that will happen (or may already have happened). first_callback.block_until_connectivities_satisfy( - lambda connectivities: _READY in connectivities) + lambda connectivities: + interfaces.ChannelConnectivity.READY in connectivities) second_callback.block_until_connectivities_satisfy( - lambda connectivities: _READY in connectivities) + lambda connectivities: + interfaces.ChannelConnectivity.READY in connectivities) connectivity_channel.unsubscribe(first_callback.update) connectivity_channel.unsubscribe(second_callback.update) @@ -142,12 +144,19 @@ class ChannelConnectivityTest(unittest.TestCase): server_completion_queue.shutdown() server_completion_queue_thread.join() - self.assertSequenceEqual((_IDLE,), first_connectivities) - self.assertSequenceEqual((_IDLE,), second_connectivities) - self.assertNotIn(_TRANSIENT_FAILURE, third_connectivities) - self.assertNotIn(_FATAL_FAILURE, third_connectivities) - self.assertNotIn(_TRANSIENT_FAILURE, fourth_connectivities) - self.assertNotIn(_FATAL_FAILURE, fourth_connectivities) + self.assertSequenceEqual( + (interfaces.ChannelConnectivity.IDLE,), first_connectivities) + self.assertSequenceEqual( + (interfaces.ChannelConnectivity.IDLE,), second_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.FATAL_FAILURE, third_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.TRANSIENT_FAILURE, + fourth_connectivities) + self.assertNotIn( + interfaces.ChannelConnectivity.FATAL_FAILURE, fourth_connectivities) def test_reachable_then_unreachable_channel_connectivity(self): server_completion_queue = _low.CompletionQueue() @@ -161,14 +170,16 @@ class ChannelConnectivityTest(unittest.TestCase): callback = _Callback() connectivity_channel = _connectivity_channel.ConnectivityChannel( - low_channel, _MAPPING) + low_channel) connectivity_channel.subscribe(callback.update, try_to_connect=True) callback.block_until_connectivities_satisfy( - lambda connectivities: _READY in connectivities) + lambda connectivities: + interfaces.ChannelConnectivity.READY in connectivities) # Now take down the server and confirm that channel readiness is repudiated. server.shutdown() callback.block_until_connectivities_satisfy( - lambda connectivities: connectivities[-1] is not _READY) + lambda connectivities: + connectivities[-1] is not interfaces.ChannelConnectivity.READY) connectivity_channel.unsubscribe(callback.update) server.shutdown() diff --git a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py index e9087a7949..aa33e1e6f8 100644 --- a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py @@ -32,7 +32,7 @@ import collections import unittest -from grpc.beta import beta +from grpc.beta import implementations from grpc.beta import interfaces from grpc_test import resources from grpc_test import test_common as grpc_test_common @@ -81,25 +81,26 @@ class _Implementation(test_interfaces.Implementation): method: method_object.cardinality() for (group, method), method_object in methods.iteritems()} - server_options = beta.server_options( + server_options = implementations.server_options( request_deserializers=serialization_behaviors.request_deserializers, response_serializers=serialization_behaviors.response_serializers, thread_pool_size=test_constants.POOL_SIZE) - server = beta.server(method_implementations, options=server_options) - server_credentials = beta.ssl_server_credentials( + server = implementations.server( + method_implementations, options=server_options) + server_credentials = implementations.ssl_server_credentials( [(resources.private_key(), resources.certificate_chain(),),]) port = server.add_secure_port('[::]:0', server_credentials) server.start() - client_credentials = beta.ssl_client_credentials( + client_credentials = implementations.ssl_client_credentials( resources.test_root_certificates(), None, None) - channel = test_utilities.create_not_really_secure_channel( + channel = test_utilities.not_really_secure_channel( 'localhost', port, client_credentials, _SERVER_HOST_OVERRIDE) - stub_options = beta.stub_options( + stub_options = implementations.stub_options( request_serializers=serialization_behaviors.request_serializers, response_deserializers=serialization_behaviors.response_deserializers, thread_pool_size=test_constants.POOL_SIZE) - generic_stub = beta.generic_stub(channel, options=stub_options) - dynamic_stub = beta.dynamic_stub( + generic_stub = implementations.generic_stub(channel, options=stub_options) + dynamic_stub = implementations.dynamic_stub( channel, service, cardinalities, options=stub_options) return generic_stub, {service: dynamic_stub}, server diff --git a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py index ecd10f2175..5feb997fef 100644 --- a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py @@ -31,7 +31,7 @@ import unittest -from grpc.beta import beta +from grpc.beta import implementations from grpc.beta import interfaces from grpc.framework.interfaces.face import face from grpc_test.framework.common import test_constants @@ -40,10 +40,10 @@ from grpc_test.framework.common import test_constants class NotFoundTest(unittest.TestCase): def setUp(self): - self._server = beta.server({}) + self._server = implementations.server({}) port = self._server.add_insecure_port('[::]:0') - channel = beta.create_insecure_channel('localhost', port) - self._generic_stub = beta.generic_stub(channel) + channel = implementations.insecure_channel('localhost', port) + self._generic_stub = implementations.generic_stub(channel) self._server.start() def tearDown(self): diff --git a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py index 998e74ccf4..996cea9118 100644 --- a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py @@ -35,7 +35,7 @@ import unittest from grpc._adapter import _low from grpc._adapter import _types -from grpc.beta import beta +from grpc.beta import implementations from grpc.beta import utilities from grpc.framework.foundation import future from grpc_test.framework.common import test_constants @@ -69,7 +69,7 @@ class _Callback(object): class ChannelConnectivityTest(unittest.TestCase): def test_lonely_channel_connectivity(self): - channel = beta.create_insecure_channel('localhost', 12345) + channel = implementations.insecure_channel('localhost', 12345) callback = _Callback() ready_future = utilities.channel_ready_future(channel) @@ -94,7 +94,7 @@ class ChannelConnectivityTest(unittest.TestCase): server_completion_queue_thread = threading.Thread( target=_drive_completion_queue, args=(server_completion_queue,)) server_completion_queue_thread.start() - channel = beta.create_insecure_channel('localhost', port) + channel = implementations.insecure_channel('localhost', port) callback = _Callback() try: diff --git a/src/python/grpcio_test/grpc_test/beta/test_utilities.py b/src/python/grpcio_test/grpc_test/beta/test_utilities.py index 338670478d..24a8600e12 100644 --- a/src/python/grpcio_test/grpc_test/beta/test_utilities.py +++ b/src/python/grpcio_test/grpc_test/beta/test_utilities.py @@ -30,25 +30,27 @@ """Test-appropriate entry points into the gRPC Python Beta API.""" from grpc._adapter import _intermediary_low -from grpc.beta import beta +from grpc.beta import implementations -def create_not_really_secure_channel( +def not_really_secure_channel( host, port, client_credentials, server_host_override): """Creates an insecure Channel to a remote host. Args: host: The name of the remote host to which to connect. port: The port of the remote host to which to connect. - client_credentials: The beta.ClientCredentials with which to connect. + client_credentials: The implementations.ClientCredentials with which to + connect. server_host_override: The target name used for SSL host name checking. Returns: - A beta.Channel to the remote host through which RPCs may be conducted. + An implementations.Channel to the remote host through which RPCs may be + conducted. """ hostport = '%s:%d' % (host, port) intermediary_low_channel = _intermediary_low.Channel( hostport, client_credentials._intermediary_low_credentials, server_host_override=server_host_override) - return beta.Channel( + return implementations.Channel( intermediary_low_channel._internal, intermediary_low_channel) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py index 5065a3f38a..ddda1018c3 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py @@ -119,6 +119,17 @@ class _Operator(base.Operator): 'Deliberately raised exception from Operator.advance (in a test)!') +class _ProtocolReceiver(base.ProtocolReceiver): + + def __init__(self): + self._condition = threading.Condition() + self._contexts = [] + + def context(self, protocol_context): + with self._condition: + self._contexts.append(protocol_context) + + class _Servicer(base.Servicer): """A base.Servicer with instrumented for testing.""" @@ -144,7 +155,7 @@ class _Servicer(base.Servicer): controller.service_on_termination) if outcome is not None: controller.service_on_termination(outcome) - return utilities.full_subscription(operator) + return utilities.full_subscription(operator, _ProtocolReceiver()) class _OperationTest(unittest.TestCase): @@ -169,7 +180,8 @@ class _OperationTest(unittest.TestCase): test_operator = _Operator( self._controller, self._controller.on_invocation_advance, self._pool, None) - subscription = utilities.full_subscription(test_operator) + subscription = utilities.full_subscription( + test_operator, _ProtocolReceiver()) else: # TODO(nathaniel): support and test other subscription kinds. self.fail('Non-full subscriptions not yet supported!') |