diff options
8 files changed, 273 insertions, 15 deletions
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 41e9163cd6..e8c6a99cb1 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -32,6 +32,7 @@ import sys import threading import time +import logging import grpc from grpc import _common @@ -197,7 +198,16 @@ def _consume_request_iterator( event_handler = _event_handler(state, call, None) def consume_request_iterator(): - for request in request_iterator: + while True: + try: + request = next(request_iterator) + except StopIteration: + break + except Exception as e: + logging.exception("Exception iterating requests!") + call.cancel() + _abort(state, grpc.StatusCode.UNKNOWN, "Exception iterating requests!") + return serialized_request = _common.serialize(request, request_serializer) with state.condition: if state.code is None and not state.cancelled: diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py index c7bfeaeb95..43d6c971b5 100644 --- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py +++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py @@ -75,7 +75,7 @@ class ReflectionServicerTest(unittest.TestCase): file_by_filename='i-donut-exist' ), ) - responses = tuple(self._stub.ServerReflectionInfo(requests)) + responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( valid_host='', @@ -93,7 +93,7 @@ class ReflectionServicerTest(unittest.TestCase): ) ), ) - self.assertEqual(expected_responses, responses) + self.assertSequenceEqual(expected_responses, responses) def testFileBySymbol(self): requests = ( @@ -104,7 +104,7 @@ class ReflectionServicerTest(unittest.TestCase): file_containing_symbol='i.donut.exist.co.uk.org.net.me.name.foo' ), ) - responses = tuple(self._stub.ServerReflectionInfo(requests)) + responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( valid_host='', @@ -122,7 +122,7 @@ class ReflectionServicerTest(unittest.TestCase): ) ), ) - self.assertEqual(expected_responses, responses) + self.assertSequenceEqual(expected_responses, responses) @unittest.skip('TODO(atash): implement file-containing-extension reflection ' '(see https://github.com/google/protobuf/issues/2248)') @@ -141,7 +141,7 @@ class ReflectionServicerTest(unittest.TestCase): ), ), ) - responses = tuple(self._stub.ServerReflectionInfo(requests)) + responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( valid_host='', @@ -159,7 +159,7 @@ class ReflectionServicerTest(unittest.TestCase): ) ), ) - self.assertEqual(expected_responses, responses) + self.assertSequenceEqual(expected_responses, responses) def testListServices(self): requests = ( @@ -167,7 +167,7 @@ class ReflectionServicerTest(unittest.TestCase): list_services='', ), ) - responses = tuple(self._stub.ServerReflectionInfo(requests)) + responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( valid_host='', @@ -179,7 +179,7 @@ class ReflectionServicerTest(unittest.TestCase): ) ), ) - self.assertEqual(expected_responses, responses) + self.assertSequenceEqual(expected_responses, responses) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 0109ee2173..70d965d3ca 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -28,6 +28,7 @@ "unit._empty_message_test.EmptyMessageTest", "unit._exit_test.ExitTest", "unit._invalid_metadata_test.InvalidMetadataTest", + "unit._invocation_defects_test.InvocationDefectsTest", "unit._metadata_code_details_test.MetadataCodeDetailsTest", "unit._metadata_test.MetadataTest", "unit._rpc_test.RPCTest", diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index 83b9109466..4d3f02e917 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -125,7 +125,7 @@ class CompressionTest(unittest.TestCase): compressed_channel = grpc.insecure_channel('localhost:%d' % self._port, options=[('grpc.default_compression_algorithm', 1)]) multi_callable = compressed_channel.stream_stream(_STREAM_STREAM) - call = multi_callable([request] * test_constants.STREAM_LENGTH) + call = multi_callable(iter([request] * test_constants.STREAM_LENGTH)) for response in call: self.assertEqual(request, response) diff --git a/src/python/grpcio_tests/tests/unit/_empty_message_test.py b/src/python/grpcio_tests/tests/unit/_empty_message_test.py index 131f6e9452..69f4689279 100644 --- a/src/python/grpcio_tests/tests/unit/_empty_message_test.py +++ b/src/python/grpcio_tests/tests/unit/_empty_message_test.py @@ -123,12 +123,12 @@ class EmptyMessageTest(unittest.TestCase): def testStreamUnary(self): response = self._channel.stream_unary(_STREAM_UNARY)( - [_REQUEST] * test_constants.STREAM_LENGTH) + iter([_REQUEST] * test_constants.STREAM_LENGTH)) self.assertEqual(_RESPONSE, response) def testStreamStream(self): response_iterator = self._channel.stream_stream(_STREAM_STREAM)( - [_REQUEST] * test_constants.STREAM_LENGTH) + iter([_REQUEST] * test_constants.STREAM_LENGTH)) self.assertSequenceEqual( [_RESPONSE] * test_constants.STREAM_LENGTH, list(response_iterator)) diff --git a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py index b33802bf57..777527137f 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py @@ -240,7 +240,7 @@ if __name__ == '__main__': multi_callable = channel.stream_unary(method) future = multi_callable.future(infinite_request_iterator()) result, call = multi_callable.with_call( - [REQUEST] * test_constants.STREAM_LENGTH) + iter([REQUEST] * test_constants.STREAM_LENGTH)) elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL): multi_callable = channel.stream_stream(method) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py new file mode 100644 index 0000000000..4312679bb9 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -0,0 +1,247 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import itertools +import threading +import unittest +from concurrent import futures + +import grpc +from grpc.framework.foundation import logging_pool + +from tests.unit.framework.common import test_constants +from tests.unit.framework.common import test_control + +_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2 +_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:] +_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3 +_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3] + +_UNARY_UNARY = '/test/UnaryUnary' +_UNARY_STREAM = '/test/UnaryStream' +_STREAM_UNARY = '/test/StreamUnary' +_STREAM_STREAM = '/test/StreamStream' + + +class _Callback(object): + def __init__(self): + self._condition = threading.Condition() + self._value = None + self._called = False + + def __call__(self, value): + with self._condition: + self._value = value + self._called = True + self._condition.notify_all() + + def value(self): + with self._condition: + while not self._called: + self._condition.wait() + return self._value + + +class _Handler(object): + def __init__(self, control): + self._control = control + + def handle_unary_unary(self, request, servicer_context): + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + return request + + def handle_unary_stream(self, request, servicer_context): + for _ in range(test_constants.STREAM_LENGTH): + self._control.control() + yield request + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + + def handle_stream_unary(self, request_iterator, servicer_context): + if servicer_context is not None: + servicer_context.invocation_metadata() + self._control.control() + response_elements = [] + for request in request_iterator: + self._control.control() + response_elements.append(request) + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + return b''.join(response_elements) + + def handle_stream_stream(self, request_iterator, servicer_context): + self._control.control() + if servicer_context is not None: + servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + for request in request_iterator: + self._control.control() + yield request + self._control.control() + + +class _MethodHandler(grpc.RpcMethodHandler): + def __init__( + self, request_streaming, response_streaming, request_deserializer, + response_serializer, unary_unary, unary_stream, stream_unary, + stream_stream): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = request_deserializer + self.response_serializer = response_serializer + self.unary_unary = unary_unary + self.unary_stream = unary_stream + self.stream_unary = stream_unary + self.stream_stream = stream_stream + + +class _GenericHandler(grpc.GenericRpcHandler): + def __init__(self, handler): + self._handler = handler + + def service(self, handler_call_details): + if handler_call_details.method == _UNARY_UNARY: + return _MethodHandler( + False, False, None, None, self._handler.handle_unary_unary, None, + None, None) + elif handler_call_details.method == _UNARY_STREAM: + return _MethodHandler( + False, True, _DESERIALIZE_REQUEST, _SERIALIZE_RESPONSE, None, + self._handler.handle_unary_stream, None, None) + elif handler_call_details.method == _STREAM_UNARY: + return _MethodHandler( + True, False, _DESERIALIZE_REQUEST, _SERIALIZE_RESPONSE, None, None, + self._handler.handle_stream_unary, None) + elif handler_call_details.method == _STREAM_STREAM: + return _MethodHandler( + True, True, None, None, None, None, None, + self._handler.handle_stream_stream) + else: + return None + + +class FailAfterFewIterationsCounter(object): + def __init__(self, high, bytestring): + self._current = 0 + self._high = high + self._bytestring = bytestring + + def __iter__(self): + return self + + def __next__(self): + if self._current >= self._high: + raise Exception("This is a deliberate failure in a unit test.") + else: + self._current += 1 + return self._bytestring + + +def _unary_unary_multi_callable(channel): + return channel.unary_unary(_UNARY_UNARY) + + +def _unary_stream_multi_callable(channel): + return channel.unary_stream( + _UNARY_STREAM, + request_serializer=_SERIALIZE_REQUEST, + response_deserializer=_DESERIALIZE_RESPONSE) + + +def _stream_unary_multi_callable(channel): + return channel.stream_unary( + _STREAM_UNARY, + request_serializer=_SERIALIZE_REQUEST, + response_deserializer=_DESERIALIZE_RESPONSE) + + +def _stream_stream_multi_callable(channel): + return channel.stream_stream(_STREAM_STREAM) + + +class InvocationDefectsTest(unittest.TestCase): + def setUp(self): + self._control = test_control.PauseFailControl() + self._handler = _Handler(self._control) + self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + + self._server = grpc.server(self._server_pool) + port = self._server.add_insecure_port('[::]:0') + self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) + self._server.start() + + self._channel = grpc.insecure_channel('localhost:%d' % port) + + def tearDown(self): + self._server.stop(0) + + def testIterableStreamRequestBlockingUnaryResponse(self): + requests = [b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)] + multi_callable = _stream_unary_multi_callable(self._channel) + + with self.assertRaises(grpc.RpcError): + response = multi_callable( + requests, + metadata=(('test', 'IterableStreamRequestBlockingUnaryResponse'),)) + + def testIterableStreamRequestFutureUnaryResponse(self): + requests = [b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)] + multi_callable = _stream_unary_multi_callable(self._channel) + response_future = multi_callable.future( + requests, + metadata=( + ('test', 'IterableStreamRequestFutureUnaryResponse'),)) + + with self.assertRaises(grpc.RpcError): + response = response_future.result() + + def testIterableStreamRequestStreamResponse(self): + requests = [b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH)] + multi_callable = _stream_stream_multi_callable(self._channel) + response_iterator = multi_callable( + requests, + metadata=(('test', 'IterableStreamRequestStreamResponse'),)) + + with self.assertRaises(grpc.RpcError): + next(response_iterator) + + def testIteratorStreamRequestStreamResponse(self): + requests_iterator = FailAfterFewIterationsCounter( + test_constants.STREAM_LENGTH // 2, b'\x07\x08') + multi_callable = _stream_stream_multi_callable(self._channel) + response_iterator = multi_callable( + requests_iterator, + metadata=(('test', 'IteratorStreamRequestStreamResponse'),)) + + with self.assertRaises(grpc.RpcError): + for _ in range(test_constants.STREAM_LENGTH // 2 + 1): + next(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py index da73476929..caba53ffcc 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py @@ -193,7 +193,7 @@ class MetadataTest(unittest.TestCase): def testStreamUnary(self): multi_callable = self._channel.stream_unary(_STREAM_UNARY) unused_response, call = multi_callable.with_call( - [_REQUEST] * test_constants.STREAM_LENGTH, + iter([_REQUEST] * test_constants.STREAM_LENGTH), metadata=_CLIENT_METADATA) self.assertTrue(test_common.metadata_transmitted( _SERVER_INITIAL_METADATA, call.initial_metadata())) @@ -202,7 +202,7 @@ class MetadataTest(unittest.TestCase): def testStreamStream(self): multi_callable = self._channel.stream_stream(_STREAM_STREAM) - call = multi_callable([_REQUEST] * test_constants.STREAM_LENGTH, + call = multi_callable(iter([_REQUEST] * test_constants.STREAM_LENGTH), metadata=_CLIENT_METADATA) self.assertTrue(test_common.metadata_transmitted( _SERVER_INITIAL_METADATA, call.initial_metadata())) |