diff options
author | Masood Malekghassemi <atash@google.com> | 2017-01-13 19:20:10 -0800 |
---|---|---|
committer | Masood Malekghassemi <atash@google.com> | 2017-01-17 10:55:33 -0800 |
commit | cc793703bfba6f661f523b6fec82ff8a913e1759 (patch) | |
tree | f3cb0c7330565e9ed9947a07c6423f81e5c00f72 /src/python/grpcio_tests/tests/unit/beta | |
parent | 06dea573daa2175b244a430bb89b49bb5c8e8c5b (diff) |
Run Python formatting
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/beta')
8 files changed, 473 insertions, 464 deletions
diff --git a/src/python/grpcio_tests/tests/unit/beta/__init__.py b/src/python/grpcio_tests/tests/unit/beta/__init__.py index 7086519106..b89398809f 100644 --- a/src/python/grpcio_tests/tests/unit/beta/__init__.py +++ b/src/python/grpcio_tests/tests/unit/beta/__init__.py @@ -26,5 +26,3 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - diff --git a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py index 3a9701b8eb..b5fdac26c1 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Tests Face interface compliance of the gRPC Python Beta API.""" import threading @@ -57,290 +56,303 @@ _RESPONSE = b'123' class _Servicer(object): - def __init__(self): - self._condition = threading.Condition() - self._peer = None - self._serviced = False - - def unary_unary(self, request, context): - with self._condition: - self._request = request - self._peer = context.protocol_context().peer() - self._invocation_metadata = context.invocation_metadata() - context.protocol_context().disable_next_response_compression() - self._serviced = True - self._condition.notify_all() - return _RESPONSE - - def unary_stream(self, request, context): - with self._condition: - self._request = request - self._peer = context.protocol_context().peer() - self._invocation_metadata = context.invocation_metadata() - context.protocol_context().disable_next_response_compression() - self._serviced = True - self._condition.notify_all() - return - yield - - def stream_unary(self, request_iterator, context): - for request in request_iterator: - self._request = request - with self._condition: - self._peer = context.protocol_context().peer() - self._invocation_metadata = context.invocation_metadata() - context.protocol_context().disable_next_response_compression() - self._serviced = True - self._condition.notify_all() - return _RESPONSE - - def stream_stream(self, request_iterator, context): - for request in request_iterator: - with self._condition: - self._peer = context.protocol_context().peer() - context.protocol_context().disable_next_response_compression() - yield _RESPONSE - with self._condition: - self._invocation_metadata = context.invocation_metadata() - self._serviced = True - self._condition.notify_all() - - def peer(self): - with self._condition: - return self._peer - - def block_until_serviced(self): - with self._condition: - while not self._serviced: - self._condition.wait() + def __init__(self): + self._condition = threading.Condition() + self._peer = None + self._serviced = False + + def unary_unary(self, request, context): + with self._condition: + self._request = request + self._peer = context.protocol_context().peer() + self._invocation_metadata = context.invocation_metadata() + context.protocol_context().disable_next_response_compression() + self._serviced = True + self._condition.notify_all() + return _RESPONSE + + def unary_stream(self, request, context): + with self._condition: + self._request = request + self._peer = context.protocol_context().peer() + self._invocation_metadata = context.invocation_metadata() + context.protocol_context().disable_next_response_compression() + self._serviced = True + self._condition.notify_all() + return + yield + + def stream_unary(self, request_iterator, context): + for request in request_iterator: + self._request = request + with self._condition: + self._peer = context.protocol_context().peer() + self._invocation_metadata = context.invocation_metadata() + context.protocol_context().disable_next_response_compression() + self._serviced = True + self._condition.notify_all() + return _RESPONSE + + def stream_stream(self, request_iterator, context): + for request in request_iterator: + with self._condition: + self._peer = context.protocol_context().peer() + context.protocol_context().disable_next_response_compression() + yield _RESPONSE + with self._condition: + self._invocation_metadata = context.invocation_metadata() + self._serviced = True + self._condition.notify_all() + + def peer(self): + with self._condition: + return self._peer + + def block_until_serviced(self): + with self._condition: + while not self._serviced: + self._condition.wait() class _BlockingIterator(object): - def __init__(self, upstream): - self._condition = threading.Condition() - self._upstream = upstream - self._allowed = [] + def __init__(self, upstream): + self._condition = threading.Condition() + self._upstream = upstream + self._allowed = [] - def __iter__(self): - return self + def __iter__(self): + return self - def __next__(self): - return self.next() + def __next__(self): + return self.next() - def next(self): - with self._condition: - while True: - if self._allowed is None: - raise StopIteration() - elif self._allowed: - return self._allowed.pop(0) - else: - self._condition.wait() + def next(self): + with self._condition: + while True: + if self._allowed is None: + raise StopIteration() + elif self._allowed: + return self._allowed.pop(0) + else: + self._condition.wait() - def allow(self): - with self._condition: - try: - self._allowed.append(next(self._upstream)) - except StopIteration: - self._allowed = None - self._condition.notify_all() + def allow(self): + with self._condition: + try: + self._allowed.append(next(self._upstream)) + except StopIteration: + self._allowed = None + self._condition.notify_all() def _metadata_plugin(context, callback): - callback([(_PER_RPC_CREDENTIALS_METADATA_KEY, - _PER_RPC_CREDENTIALS_METADATA_VALUE)], None) + callback([(_PER_RPC_CREDENTIALS_METADATA_KEY, + _PER_RPC_CREDENTIALS_METADATA_VALUE)], None) class BetaFeaturesTest(unittest.TestCase): - def setUp(self): - self._servicer = _Servicer() - method_implementations = { - (_GROUP, _UNARY_UNARY): + def setUp(self): + self._servicer = _Servicer() + method_implementations = { + (_GROUP, _UNARY_UNARY): utilities.unary_unary_inline(self._servicer.unary_unary), - (_GROUP, _UNARY_STREAM): + (_GROUP, _UNARY_STREAM): utilities.unary_stream_inline(self._servicer.unary_stream), - (_GROUP, _STREAM_UNARY): + (_GROUP, _STREAM_UNARY): utilities.stream_unary_inline(self._servicer.stream_unary), - (_GROUP, _STREAM_STREAM): + (_GROUP, _STREAM_STREAM): utilities.stream_stream_inline(self._servicer.stream_stream), - } - - cardinalities = { - _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY, - _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM, - _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY, - _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM, - } - - server_options = implementations.server_options( - thread_pool_size=test_constants.POOL_SIZE) - self._server = implementations.server( - method_implementations, options=server_options) - server_credentials = implementations.ssl_server_credentials( - [(resources.private_key(), resources.certificate_chain(),),]) - port = self._server.add_secure_port('[::]:0', server_credentials) - self._server.start() - self._channel_credentials = implementations.ssl_channel_credentials( - resources.test_root_certificates()) - self._call_credentials = implementations.metadata_call_credentials( - _metadata_plugin) - channel = test_utilities.not_really_secure_channel( - 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE) - stub_options = implementations.stub_options( - thread_pool_size=test_constants.POOL_SIZE) - self._dynamic_stub = implementations.dynamic_stub( - channel, _GROUP, cardinalities, options=stub_options) - - def tearDown(self): - self._dynamic_stub = None - self._server.stop(test_constants.SHORT_TIMEOUT).wait() - - def test_unary_unary(self): - call_options = interfaces.grpc_call_options( - disable_compression=True, credentials=self._call_credentials) - response = getattr(self._dynamic_stub, _UNARY_UNARY)( - _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options) - self.assertEqual(_RESPONSE, response) - self.assertIsNotNone(self._servicer.peer()) - invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in - self._servicer._invocation_metadata] - self.assertIn( - (_PER_RPC_CREDENTIALS_METADATA_KEY, - _PER_RPC_CREDENTIALS_METADATA_VALUE), - invocation_metadata) - - def test_unary_stream(self): - call_options = interfaces.grpc_call_options( - disable_compression=True, credentials=self._call_credentials) - response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)( - _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options) - self._servicer.block_until_serviced() - self.assertIsNotNone(self._servicer.peer()) - invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in - self._servicer._invocation_metadata] - self.assertIn( - (_PER_RPC_CREDENTIALS_METADATA_KEY, - _PER_RPC_CREDENTIALS_METADATA_VALUE), - invocation_metadata) - - def test_stream_unary(self): - call_options = interfaces.grpc_call_options( - credentials=self._call_credentials) - request_iterator = _BlockingIterator(iter((_REQUEST,))) - response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future( - request_iterator, test_constants.LONG_TIMEOUT, - protocol_options=call_options) - response_future.protocol_context().disable_next_request_compression() - request_iterator.allow() - response_future.protocol_context().disable_next_request_compression() - request_iterator.allow() - self._servicer.block_until_serviced() - self.assertIsNotNone(self._servicer.peer()) - self.assertEqual(_RESPONSE, response_future.result()) - invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in - self._servicer._invocation_metadata] - self.assertIn( - (_PER_RPC_CREDENTIALS_METADATA_KEY, - _PER_RPC_CREDENTIALS_METADATA_VALUE), - invocation_metadata) - - def test_stream_stream(self): - call_options = interfaces.grpc_call_options( - credentials=self._call_credentials) - request_iterator = _BlockingIterator(iter((_REQUEST,))) - response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)( - request_iterator, test_constants.SHORT_TIMEOUT, - protocol_options=call_options) - response_iterator.protocol_context().disable_next_request_compression() - request_iterator.allow() - response = next(response_iterator) - response_iterator.protocol_context().disable_next_request_compression() - request_iterator.allow() - self._servicer.block_until_serviced() - self.assertIsNotNone(self._servicer.peer()) - self.assertEqual(_RESPONSE, response) - invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in - self._servicer._invocation_metadata] - self.assertIn( - (_PER_RPC_CREDENTIALS_METADATA_KEY, - _PER_RPC_CREDENTIALS_METADATA_VALUE), - invocation_metadata) + } + + cardinalities = { + _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY, + _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM, + _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY, + _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM, + } + + server_options = implementations.server_options( + thread_pool_size=test_constants.POOL_SIZE) + self._server = implementations.server( + method_implementations, options=server_options) + server_credentials = implementations.ssl_server_credentials([( + resources.private_key(), + resources.certificate_chain(),),]) + port = self._server.add_secure_port('[::]:0', server_credentials) + self._server.start() + self._channel_credentials = implementations.ssl_channel_credentials( + resources.test_root_certificates()) + self._call_credentials = implementations.metadata_call_credentials( + _metadata_plugin) + channel = test_utilities.not_really_secure_channel( + 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE) + stub_options = implementations.stub_options( + thread_pool_size=test_constants.POOL_SIZE) + self._dynamic_stub = implementations.dynamic_stub( + channel, _GROUP, cardinalities, options=stub_options) + + def tearDown(self): + self._dynamic_stub = None + self._server.stop(test_constants.SHORT_TIMEOUT).wait() + + def test_unary_unary(self): + call_options = interfaces.grpc_call_options( + disable_compression=True, credentials=self._call_credentials) + response = getattr(self._dynamic_stub, _UNARY_UNARY)( + _REQUEST, + test_constants.LONG_TIMEOUT, + protocol_options=call_options) + self.assertEqual(_RESPONSE, response) + self.assertIsNotNone(self._servicer.peer()) + invocation_metadata = [ + (metadatum.key, metadatum.value) + for metadatum in self._servicer._invocation_metadata + ] + self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY, + _PER_RPC_CREDENTIALS_METADATA_VALUE), + invocation_metadata) + + def test_unary_stream(self): + call_options = interfaces.grpc_call_options( + disable_compression=True, credentials=self._call_credentials) + response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)( + _REQUEST, + test_constants.LONG_TIMEOUT, + protocol_options=call_options) + self._servicer.block_until_serviced() + self.assertIsNotNone(self._servicer.peer()) + invocation_metadata = [ + (metadatum.key, metadatum.value) + for metadatum in self._servicer._invocation_metadata + ] + self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY, + _PER_RPC_CREDENTIALS_METADATA_VALUE), + invocation_metadata) + + def test_stream_unary(self): + call_options = interfaces.grpc_call_options( + credentials=self._call_credentials) + request_iterator = _BlockingIterator(iter((_REQUEST,))) + response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future( + request_iterator, + test_constants.LONG_TIMEOUT, + protocol_options=call_options) + response_future.protocol_context().disable_next_request_compression() + request_iterator.allow() + response_future.protocol_context().disable_next_request_compression() + request_iterator.allow() + self._servicer.block_until_serviced() + self.assertIsNotNone(self._servicer.peer()) + self.assertEqual(_RESPONSE, response_future.result()) + invocation_metadata = [ + (metadatum.key, metadatum.value) + for metadatum in self._servicer._invocation_metadata + ] + self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY, + _PER_RPC_CREDENTIALS_METADATA_VALUE), + invocation_metadata) + + def test_stream_stream(self): + call_options = interfaces.grpc_call_options( + credentials=self._call_credentials) + request_iterator = _BlockingIterator(iter((_REQUEST,))) + response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)( + request_iterator, + test_constants.SHORT_TIMEOUT, + protocol_options=call_options) + response_iterator.protocol_context().disable_next_request_compression() + request_iterator.allow() + response = next(response_iterator) + response_iterator.protocol_context().disable_next_request_compression() + request_iterator.allow() + self._servicer.block_until_serviced() + self.assertIsNotNone(self._servicer.peer()) + self.assertEqual(_RESPONSE, response) + invocation_metadata = [ + (metadatum.key, metadatum.value) + for metadatum in self._servicer._invocation_metadata + ] + self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY, + _PER_RPC_CREDENTIALS_METADATA_VALUE), + invocation_metadata) class ContextManagementAndLifecycleTest(unittest.TestCase): - def setUp(self): - self._servicer = _Servicer() - self._method_implementations = { - (_GROUP, _UNARY_UNARY): + def setUp(self): + self._servicer = _Servicer() + self._method_implementations = { + (_GROUP, _UNARY_UNARY): utilities.unary_unary_inline(self._servicer.unary_unary), - (_GROUP, _UNARY_STREAM): + (_GROUP, _UNARY_STREAM): utilities.unary_stream_inline(self._servicer.unary_stream), - (_GROUP, _STREAM_UNARY): + (_GROUP, _STREAM_UNARY): utilities.stream_unary_inline(self._servicer.stream_unary), - (_GROUP, _STREAM_STREAM): + (_GROUP, _STREAM_STREAM): utilities.stream_stream_inline(self._servicer.stream_stream), - } - - self._cardinalities = { - _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY, - _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM, - _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY, - _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM, - } - - self._server_options = implementations.server_options( - thread_pool_size=test_constants.POOL_SIZE) - self._server_credentials = implementations.ssl_server_credentials( - [(resources.private_key(), resources.certificate_chain(),),]) - self._channel_credentials = implementations.ssl_channel_credentials( - resources.test_root_certificates()) - self._stub_options = implementations.stub_options( - thread_pool_size=test_constants.POOL_SIZE) - - def test_stub_context(self): - server = implementations.server( - self._method_implementations, options=self._server_options) - port = server.add_secure_port('[::]:0', self._server_credentials) - server.start() - - channel = test_utilities.not_really_secure_channel( - 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE) - dynamic_stub = implementations.dynamic_stub( - channel, _GROUP, self._cardinalities, options=self._stub_options) - for _ in range(100): - with dynamic_stub: - pass - for _ in range(10): - with dynamic_stub: - call_options = interfaces.grpc_call_options( - disable_compression=True) - response = getattr(dynamic_stub, _UNARY_UNARY)( - _REQUEST, test_constants.LONG_TIMEOUT, - protocol_options=call_options) - self.assertEqual(_RESPONSE, response) - self.assertIsNotNone(self._servicer.peer()) - - server.stop(test_constants.SHORT_TIMEOUT).wait() - - def test_server_lifecycle(self): - for _ in range(100): - server = implementations.server( - self._method_implementations, options=self._server_options) - port = server.add_secure_port('[::]:0', self._server_credentials) - server.start() - server.stop(test_constants.SHORT_TIMEOUT).wait() - for _ in range(100): - server = implementations.server( - self._method_implementations, options=self._server_options) - server.add_secure_port('[::]:0', self._server_credentials) - server.add_insecure_port('[::]:0') - with server: - server.stop(test_constants.SHORT_TIMEOUT) - server.stop(test_constants.SHORT_TIMEOUT) + } + + self._cardinalities = { + _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY, + _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM, + _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY, + _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM, + } + + self._server_options = implementations.server_options( + thread_pool_size=test_constants.POOL_SIZE) + self._server_credentials = implementations.ssl_server_credentials([( + resources.private_key(), + resources.certificate_chain(),),]) + self._channel_credentials = implementations.ssl_channel_credentials( + resources.test_root_certificates()) + self._stub_options = implementations.stub_options( + thread_pool_size=test_constants.POOL_SIZE) + + def test_stub_context(self): + server = implementations.server( + self._method_implementations, options=self._server_options) + port = server.add_secure_port('[::]:0', self._server_credentials) + server.start() + + channel = test_utilities.not_really_secure_channel( + 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE) + dynamic_stub = implementations.dynamic_stub( + channel, _GROUP, self._cardinalities, options=self._stub_options) + for _ in range(100): + with dynamic_stub: + pass + for _ in range(10): + with dynamic_stub: + call_options = interfaces.grpc_call_options( + disable_compression=True) + response = getattr(dynamic_stub, _UNARY_UNARY)( + _REQUEST, + test_constants.LONG_TIMEOUT, + protocol_options=call_options) + self.assertEqual(_RESPONSE, response) + self.assertIsNotNone(self._servicer.peer()) + + server.stop(test_constants.SHORT_TIMEOUT).wait() + + def test_server_lifecycle(self): + for _ in range(100): + server = implementations.server( + self._method_implementations, options=self._server_options) + port = server.add_secure_port('[::]:0', self._server_credentials) + server.start() + server.stop(test_constants.SHORT_TIMEOUT).wait() + for _ in range(100): + server = implementations.server( + self._method_implementations, options=self._server_options) + server.add_secure_port('[::]:0', self._server_credentials) + server.add_insecure_port('[::]:0') + with server: + server.stop(test_constants.SHORT_TIMEOUT) + server.stop(test_constants.SHORT_TIMEOUT) if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/beta/_connectivity_channel_test.py b/src/python/grpcio_tests/tests/unit/beta/_connectivity_channel_test.py index 5d826a269d..49d683b8a6 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_connectivity_channel_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_connectivity_channel_test.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Tests of grpc.beta._connectivity_channel.""" import unittest @@ -36,13 +35,13 @@ from grpc.beta import interfaces class ConnectivityStatesTest(unittest.TestCase): - def testBetaConnectivityStates(self): - self.assertIsNotNone(interfaces.ChannelConnectivity.IDLE) - self.assertIsNotNone(interfaces.ChannelConnectivity.CONNECTING) - self.assertIsNotNone(interfaces.ChannelConnectivity.READY) - self.assertIsNotNone(interfaces.ChannelConnectivity.TRANSIENT_FAILURE) - self.assertIsNotNone(interfaces.ChannelConnectivity.FATAL_FAILURE) + def testBetaConnectivityStates(self): + self.assertIsNotNone(interfaces.ChannelConnectivity.IDLE) + self.assertIsNotNone(interfaces.ChannelConnectivity.CONNECTING) + self.assertIsNotNone(interfaces.ChannelConnectivity.READY) + self.assertIsNotNone(interfaces.ChannelConnectivity.TRANSIENT_FAILURE) + self.assertIsNotNone(interfaces.ChannelConnectivity.FATAL_FAILURE) if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py b/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py index 3a67516906..f421442624 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Tests Face interface compliance of the gRPC Python Beta API.""" import collections @@ -47,94 +46,97 @@ _SERVER_HOST_OVERRIDE = 'foo.test.google.fr' class _SerializationBehaviors( - collections.namedtuple( - '_SerializationBehaviors', - ('request_serializers', 'request_deserializers', 'response_serializers', - 'response_deserializers',))): - pass + collections.namedtuple('_SerializationBehaviors', ( + 'request_serializers', + 'request_deserializers', + 'response_serializers', + 'response_deserializers',))): + pass def _serialization_behaviors_from_test_methods(test_methods): - request_serializers = {} - request_deserializers = {} - response_serializers = {} - response_deserializers = {} - for (group, method), test_method in six.iteritems(test_methods): - request_serializers[group, method] = test_method.serialize_request - request_deserializers[group, method] = test_method.deserialize_request - response_serializers[group, method] = test_method.serialize_response - response_deserializers[group, method] = test_method.deserialize_response - return _SerializationBehaviors( - request_serializers, request_deserializers, response_serializers, - response_deserializers) + request_serializers = {} + request_deserializers = {} + response_serializers = {} + response_deserializers = {} + for (group, method), test_method in six.iteritems(test_methods): + request_serializers[group, method] = test_method.serialize_request + request_deserializers[group, method] = test_method.deserialize_request + response_serializers[group, method] = test_method.serialize_response + response_deserializers[group, method] = test_method.deserialize_response + return _SerializationBehaviors(request_serializers, request_deserializers, + response_serializers, response_deserializers) class _Implementation(test_interfaces.Implementation): - def instantiate( - self, methods, method_implementations, multi_method_implementation): - serialization_behaviors = _serialization_behaviors_from_test_methods( - methods) - # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. - service = next(iter(methods))[0] - # TODO(nathaniel): Add a "cardinalities_by_group" attribute to - # _digest.TestServiceDigest. - cardinalities = { - method: method_object.cardinality() - for (group, method), method_object in six.iteritems(methods)} - - server_options = implementations.server_options( - request_deserializers=serialization_behaviors.request_deserializers, - response_serializers=serialization_behaviors.response_serializers, - thread_pool_size=test_constants.POOL_SIZE) - server = implementations.server( - method_implementations, options=server_options) - server_credentials = implementations.ssl_server_credentials( - [(resources.private_key(), resources.certificate_chain(),),]) - port = server.add_secure_port('[::]:0', server_credentials) - server.start() - channel_credentials = implementations.ssl_channel_credentials( - resources.test_root_certificates()) - channel = test_utilities.not_really_secure_channel( - 'localhost', port, channel_credentials, _SERVER_HOST_OVERRIDE) - stub_options = implementations.stub_options( - request_serializers=serialization_behaviors.request_serializers, - response_deserializers=serialization_behaviors.response_deserializers, - thread_pool_size=test_constants.POOL_SIZE) - generic_stub = implementations.generic_stub(channel, options=stub_options) - dynamic_stub = implementations.dynamic_stub( - channel, service, cardinalities, options=stub_options) - return generic_stub, {service: dynamic_stub}, server - - def destantiate(self, memo): - memo.stop(test_constants.SHORT_TIMEOUT).wait() - - def invocation_metadata(self): - return grpc_test_common.INVOCATION_INITIAL_METADATA - - def initial_metadata(self): - return grpc_test_common.SERVICE_INITIAL_METADATA - - def terminal_metadata(self): - return grpc_test_common.SERVICE_TERMINAL_METADATA - - def code(self): - return interfaces.StatusCode.OK - - def details(self): - return grpc_test_common.DETAILS - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return original_metadata is None or grpc_test_common.metadata_transmitted( - original_metadata, transmitted_metadata) + def instantiate(self, methods, method_implementations, + multi_method_implementation): + serialization_behaviors = _serialization_behaviors_from_test_methods( + methods) + # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. + service = next(iter(methods))[0] + # TODO(nathaniel): Add a "cardinalities_by_group" attribute to + # _digest.TestServiceDigest. + cardinalities = { + method: method_object.cardinality() + for (group, method), method_object in six.iteritems(methods) + } + + server_options = implementations.server_options( + request_deserializers=serialization_behaviors.request_deserializers, + response_serializers=serialization_behaviors.response_serializers, + thread_pool_size=test_constants.POOL_SIZE) + server = implementations.server( + method_implementations, options=server_options) + server_credentials = implementations.ssl_server_credentials([( + resources.private_key(), + resources.certificate_chain(),),]) + port = server.add_secure_port('[::]:0', server_credentials) + server.start() + channel_credentials = implementations.ssl_channel_credentials( + resources.test_root_certificates()) + channel = test_utilities.not_really_secure_channel( + 'localhost', port, channel_credentials, _SERVER_HOST_OVERRIDE) + stub_options = implementations.stub_options( + request_serializers=serialization_behaviors.request_serializers, + response_deserializers=serialization_behaviors. + response_deserializers, + thread_pool_size=test_constants.POOL_SIZE) + generic_stub = implementations.generic_stub( + channel, options=stub_options) + dynamic_stub = implementations.dynamic_stub( + channel, service, cardinalities, options=stub_options) + return generic_stub, {service: dynamic_stub}, server + + def destantiate(self, memo): + memo.stop(test_constants.SHORT_TIMEOUT).wait() + + def invocation_metadata(self): + return grpc_test_common.INVOCATION_INITIAL_METADATA + + def initial_metadata(self): + return grpc_test_common.SERVICE_INITIAL_METADATA + + def terminal_metadata(self): + return grpc_test_common.SERVICE_TERMINAL_METADATA + + def code(self): + return interfaces.StatusCode.OK + + def details(self): + return grpc_test_common.DETAILS + + def metadata_transmitted(self, original_metadata, transmitted_metadata): + return original_metadata is None or grpc_test_common.metadata_transmitted( + original_metadata, transmitted_metadata) def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) + return unittest.TestSuite(tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py b/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py index 127f93e9bb..69bb5cc2a5 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Tests the implementations module of the gRPC Python Beta API.""" import datetime @@ -40,31 +39,32 @@ from tests.unit import resources class ChannelCredentialsTest(unittest.TestCase): - def test_runtime_provided_root_certificates(self): - channel_credentials = implementations.ssl_channel_credentials() - self.assertIsInstance( - channel_credentials, implementations.ChannelCredentials) - - def test_application_provided_root_certificates(self): - channel_credentials = implementations.ssl_channel_credentials( - resources.test_root_certificates()) - self.assertIsInstance( - channel_credentials, implementations.ChannelCredentials) + def test_runtime_provided_root_certificates(self): + channel_credentials = implementations.ssl_channel_credentials() + self.assertIsInstance(channel_credentials, + implementations.ChannelCredentials) + + def test_application_provided_root_certificates(self): + channel_credentials = implementations.ssl_channel_credentials( + resources.test_root_certificates()) + self.assertIsInstance(channel_credentials, + implementations.ChannelCredentials) class CallCredentialsTest(unittest.TestCase): - def test_google_call_credentials(self): - creds = oauth2client_client.GoogleCredentials( - 'token', 'client_id', 'secret', 'refresh_token', - datetime.datetime(2008, 6, 24), 'https://refresh.uri.com/', - 'user_agent') - call_creds = implementations.google_call_credentials(creds) - self.assertIsInstance(call_creds, implementations.CallCredentials) + def test_google_call_credentials(self): + creds = oauth2client_client.GoogleCredentials( + 'token', 'client_id', 'secret', 'refresh_token', + datetime.datetime(2008, 6, 24), 'https://refresh.uri.com/', + 'user_agent') + call_creds = implementations.google_call_credentials(creds) + self.assertIsInstance(call_creds, implementations.CallCredentials) + + def test_access_token_call_credentials(self): + call_creds = implementations.access_token_call_credentials('token') + self.assertIsInstance(call_creds, implementations.CallCredentials) - def test_access_token_call_credentials(self): - call_creds = implementations.access_token_call_credentials('token') - self.assertIsInstance(call_creds, implementations.CallCredentials) if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/beta/_not_found_test.py b/src/python/grpcio_tests/tests/unit/beta/_not_found_test.py index 37b8c49120..664e47c769 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_not_found_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_not_found_test.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Tests of RPC-method-not-found behavior.""" import unittest @@ -39,37 +38,38 @@ from tests.unit.framework.common import test_constants class NotFoundTest(unittest.TestCase): - def setUp(self): - self._server = implementations.server({}) - port = self._server.add_insecure_port('[::]:0') - channel = implementations.insecure_channel('localhost', port) - self._generic_stub = implementations.generic_stub(channel) - self._server.start() + def setUp(self): + self._server = implementations.server({}) + port = self._server.add_insecure_port('[::]:0') + channel = implementations.insecure_channel('localhost', port) + self._generic_stub = implementations.generic_stub(channel) + self._server.start() - def tearDown(self): - self._server.stop(0).wait() - self._generic_stub = None + def tearDown(self): + self._server.stop(0).wait() + self._generic_stub = None - def test_blocking_unary_unary_not_found(self): - with self.assertRaises(face.LocalError) as exception_assertion_context: - self._generic_stub.blocking_unary_unary( - 'groop', 'meffod', b'abc', test_constants.LONG_TIMEOUT, - with_call=True) - self.assertIs( - exception_assertion_context.exception.code, - interfaces.StatusCode.UNIMPLEMENTED) + def test_blocking_unary_unary_not_found(self): + with self.assertRaises(face.LocalError) as exception_assertion_context: + self._generic_stub.blocking_unary_unary( + 'groop', + 'meffod', + b'abc', + test_constants.LONG_TIMEOUT, + with_call=True) + self.assertIs(exception_assertion_context.exception.code, + interfaces.StatusCode.UNIMPLEMENTED) - def test_future_stream_unary_not_found(self): - rpc_future = self._generic_stub.future_stream_unary( - 'grupe', 'mevvod', [b'def'], test_constants.LONG_TIMEOUT) - with self.assertRaises(face.LocalError) as exception_assertion_context: - rpc_future.result() - self.assertIs( - exception_assertion_context.exception.code, - interfaces.StatusCode.UNIMPLEMENTED) - self.assertIs( - rpc_future.exception().code, interfaces.StatusCode.UNIMPLEMENTED) + def test_future_stream_unary_not_found(self): + rpc_future = self._generic_stub.future_stream_unary( + 'grupe', 'mevvod', [b'def'], test_constants.LONG_TIMEOUT) + with self.assertRaises(face.LocalError) as exception_assertion_context: + rpc_future.result() + self.assertIs(exception_assertion_context.exception.code, + interfaces.StatusCode.UNIMPLEMENTED) + self.assertIs(rpc_future.exception().code, + interfaces.StatusCode.UNIMPLEMENTED) if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/beta/_utilities_test.py b/src/python/grpcio_tests/tests/unit/beta/_utilities_test.py index 9cce96cc85..e8e62c322a 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_utilities_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_utilities_test.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Tests of grpc.beta.utilities.""" import threading @@ -41,68 +40,68 @@ from tests.unit.framework.common import test_constants class _Callback(object): - def __init__(self): - self._condition = threading.Condition() - self._value = None + def __init__(self): + self._condition = threading.Condition() + self._value = None - def accept_value(self, value): - with self._condition: - self._value = value - self._condition.notify_all() + def accept_value(self, value): + with self._condition: + self._value = value + self._condition.notify_all() - def block_until_called(self): - with self._condition: - while self._value is None: - self._condition.wait() - return self._value + def block_until_called(self): + with self._condition: + while self._value is None: + self._condition.wait() + return self._value class ChannelConnectivityTest(unittest.TestCase): - def test_lonely_channel_connectivity(self): - channel = implementations.insecure_channel('localhost', 12345) - callback = _Callback() - - ready_future = utilities.channel_ready_future(channel) - ready_future.add_done_callback(callback.accept_value) - with self.assertRaises(future.TimeoutError): - ready_future.result(timeout=test_constants.SHORT_TIMEOUT) - self.assertFalse(ready_future.cancelled()) - self.assertFalse(ready_future.done()) - self.assertTrue(ready_future.running()) - ready_future.cancel() - value_passed_to_callback = callback.block_until_called() - self.assertIs(ready_future, value_passed_to_callback) - self.assertTrue(ready_future.cancelled()) - self.assertTrue(ready_future.done()) - self.assertFalse(ready_future.running()) - - def test_immediately_connectable_channel_connectivity(self): - server = implementations.server({}) - port = server.add_insecure_port('[::]:0') - server.start() - channel = implementations.insecure_channel('localhost', port) - callback = _Callback() - - try: - ready_future = utilities.channel_ready_future(channel) - ready_future.add_done_callback(callback.accept_value) - self.assertIsNone( - ready_future.result(timeout=test_constants.LONG_TIMEOUT)) - value_passed_to_callback = callback.block_until_called() - self.assertIs(ready_future, value_passed_to_callback) - self.assertFalse(ready_future.cancelled()) - self.assertTrue(ready_future.done()) - self.assertFalse(ready_future.running()) - # Cancellation after maturity has no effect. - ready_future.cancel() - self.assertFalse(ready_future.cancelled()) - self.assertTrue(ready_future.done()) - self.assertFalse(ready_future.running()) - finally: - ready_future.cancel() - server.stop(0) + def test_lonely_channel_connectivity(self): + channel = implementations.insecure_channel('localhost', 12345) + callback = _Callback() + + ready_future = utilities.channel_ready_future(channel) + ready_future.add_done_callback(callback.accept_value) + with self.assertRaises(future.TimeoutError): + ready_future.result(timeout=test_constants.SHORT_TIMEOUT) + self.assertFalse(ready_future.cancelled()) + self.assertFalse(ready_future.done()) + self.assertTrue(ready_future.running()) + ready_future.cancel() + value_passed_to_callback = callback.block_until_called() + self.assertIs(ready_future, value_passed_to_callback) + self.assertTrue(ready_future.cancelled()) + self.assertTrue(ready_future.done()) + self.assertFalse(ready_future.running()) + + def test_immediately_connectable_channel_connectivity(self): + server = implementations.server({}) + port = server.add_insecure_port('[::]:0') + server.start() + channel = implementations.insecure_channel('localhost', port) + callback = _Callback() + + try: + ready_future = utilities.channel_ready_future(channel) + ready_future.add_done_callback(callback.accept_value) + self.assertIsNone( + ready_future.result(timeout=test_constants.LONG_TIMEOUT)) + value_passed_to_callback = callback.block_until_called() + self.assertIs(ready_future, value_passed_to_callback) + self.assertFalse(ready_future.cancelled()) + self.assertTrue(ready_future.done()) + self.assertFalse(ready_future.running()) + # Cancellation after maturity has no effect. + ready_future.cancel() + self.assertFalse(ready_future.cancelled()) + self.assertTrue(ready_future.done()) + self.assertFalse(ready_future.running()) + finally: + ready_future.cancel() + server.stop(0) if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/beta/test_utilities.py b/src/python/grpcio_tests/tests/unit/beta/test_utilities.py index 692da9c97d..f542420683 100644 --- a/src/python/grpcio_tests/tests/unit/beta/test_utilities.py +++ b/src/python/grpcio_tests/tests/unit/beta/test_utilities.py @@ -26,16 +26,15 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Test-appropriate entry points into the gRPC Python Beta API.""" import grpc from grpc.beta import implementations -def not_really_secure_channel( - host, port, channel_credentials, server_host_override): - """Creates an insecure Channel to a remote host. +def not_really_secure_channel(host, port, channel_credentials, + server_host_override): + """Creates an insecure Channel to a remote host. Args: host: The name of the remote host to which to connect. @@ -48,8 +47,8 @@ def not_really_secure_channel( An implementations.Channel to the remote host through which RPCs may be conducted. """ - target = '%s:%d' % (host, port) - channel = grpc.secure_channel( - target, channel_credentials, - (('grpc.ssl_target_name_override', server_host_override,),)) - return implementations.Channel(channel) + target = '%s:%d' % (host, port) + channel = grpc.secure_channel(target, channel_credentials, (( + 'grpc.ssl_target_name_override', + server_host_override,),)) + return implementations.Channel(channel) |