aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/unit/beta
diff options
context:
space:
mode:
authorGravatar Masood Malekghassemi <atash@google.com>2017-01-13 19:20:10 -0800
committerGravatar Masood Malekghassemi <atash@google.com>2017-01-17 10:55:33 -0800
commitcc793703bfba6f661f523b6fec82ff8a913e1759 (patch)
treef3cb0c7330565e9ed9947a07c6423f81e5c00f72 /src/python/grpcio_tests/tests/unit/beta
parent06dea573daa2175b244a430bb89b49bb5c8e8c5b (diff)
Run Python formatting
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/beta')
-rw-r--r--src/python/grpcio_tests/tests/unit/beta/__init__.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py528
-rw-r--r--src/python/grpcio_tests/tests/unit/beta/_connectivity_channel_test.py15
-rw-r--r--src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py160
-rw-r--r--src/python/grpcio_tests/tests/unit/beta/_implementations_test.py44
-rw-r--r--src/python/grpcio_tests/tests/unit/beta/_not_found_test.py58
-rw-r--r--src/python/grpcio_tests/tests/unit/beta/_utilities_test.py113
-rw-r--r--src/python/grpcio_tests/tests/unit/beta/test_utilities.py17
8 files changed, 473 insertions, 464 deletions
diff --git a/src/python/grpcio_tests/tests/unit/beta/__init__.py b/src/python/grpcio_tests/tests/unit/beta/__init__.py
index 7086519106..b89398809f 100644
--- a/src/python/grpcio_tests/tests/unit/beta/__init__.py
+++ b/src/python/grpcio_tests/tests/unit/beta/__init__.py
@@ -26,5 +26,3 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
index 3a9701b8eb..b5fdac26c1 100644
--- a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
+++ b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Tests Face interface compliance of the gRPC Python Beta API."""
import threading
@@ -57,290 +56,303 @@ _RESPONSE = b'123'
class _Servicer(object):
- def __init__(self):
- self._condition = threading.Condition()
- self._peer = None
- self._serviced = False
-
- def unary_unary(self, request, context):
- with self._condition:
- self._request = request
- self._peer = context.protocol_context().peer()
- self._invocation_metadata = context.invocation_metadata()
- context.protocol_context().disable_next_response_compression()
- self._serviced = True
- self._condition.notify_all()
- return _RESPONSE
-
- def unary_stream(self, request, context):
- with self._condition:
- self._request = request
- self._peer = context.protocol_context().peer()
- self._invocation_metadata = context.invocation_metadata()
- context.protocol_context().disable_next_response_compression()
- self._serviced = True
- self._condition.notify_all()
- return
- yield
-
- def stream_unary(self, request_iterator, context):
- for request in request_iterator:
- self._request = request
- with self._condition:
- self._peer = context.protocol_context().peer()
- self._invocation_metadata = context.invocation_metadata()
- context.protocol_context().disable_next_response_compression()
- self._serviced = True
- self._condition.notify_all()
- return _RESPONSE
-
- def stream_stream(self, request_iterator, context):
- for request in request_iterator:
- with self._condition:
- self._peer = context.protocol_context().peer()
- context.protocol_context().disable_next_response_compression()
- yield _RESPONSE
- with self._condition:
- self._invocation_metadata = context.invocation_metadata()
- self._serviced = True
- self._condition.notify_all()
-
- def peer(self):
- with self._condition:
- return self._peer
-
- def block_until_serviced(self):
- with self._condition:
- while not self._serviced:
- self._condition.wait()
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._peer = None
+ self._serviced = False
+
+ def unary_unary(self, request, context):
+ with self._condition:
+ self._request = request
+ self._peer = context.protocol_context().peer()
+ self._invocation_metadata = context.invocation_metadata()
+ context.protocol_context().disable_next_response_compression()
+ self._serviced = True
+ self._condition.notify_all()
+ return _RESPONSE
+
+ def unary_stream(self, request, context):
+ with self._condition:
+ self._request = request
+ self._peer = context.protocol_context().peer()
+ self._invocation_metadata = context.invocation_metadata()
+ context.protocol_context().disable_next_response_compression()
+ self._serviced = True
+ self._condition.notify_all()
+ return
+ yield
+
+ def stream_unary(self, request_iterator, context):
+ for request in request_iterator:
+ self._request = request
+ with self._condition:
+ self._peer = context.protocol_context().peer()
+ self._invocation_metadata = context.invocation_metadata()
+ context.protocol_context().disable_next_response_compression()
+ self._serviced = True
+ self._condition.notify_all()
+ return _RESPONSE
+
+ def stream_stream(self, request_iterator, context):
+ for request in request_iterator:
+ with self._condition:
+ self._peer = context.protocol_context().peer()
+ context.protocol_context().disable_next_response_compression()
+ yield _RESPONSE
+ with self._condition:
+ self._invocation_metadata = context.invocation_metadata()
+ self._serviced = True
+ self._condition.notify_all()
+
+ def peer(self):
+ with self._condition:
+ return self._peer
+
+ def block_until_serviced(self):
+ with self._condition:
+ while not self._serviced:
+ self._condition.wait()
class _BlockingIterator(object):
- def __init__(self, upstream):
- self._condition = threading.Condition()
- self._upstream = upstream
- self._allowed = []
+ def __init__(self, upstream):
+ self._condition = threading.Condition()
+ self._upstream = upstream
+ self._allowed = []
- def __iter__(self):
- return self
+ def __iter__(self):
+ return self
- def __next__(self):
- return self.next()
+ def __next__(self):
+ return self.next()
- def next(self):
- with self._condition:
- while True:
- if self._allowed is None:
- raise StopIteration()
- elif self._allowed:
- return self._allowed.pop(0)
- else:
- self._condition.wait()
+ def next(self):
+ with self._condition:
+ while True:
+ if self._allowed is None:
+ raise StopIteration()
+ elif self._allowed:
+ return self._allowed.pop(0)
+ else:
+ self._condition.wait()
- def allow(self):
- with self._condition:
- try:
- self._allowed.append(next(self._upstream))
- except StopIteration:
- self._allowed = None
- self._condition.notify_all()
+ def allow(self):
+ with self._condition:
+ try:
+ self._allowed.append(next(self._upstream))
+ except StopIteration:
+ self._allowed = None
+ self._condition.notify_all()
def _metadata_plugin(context, callback):
- callback([(_PER_RPC_CREDENTIALS_METADATA_KEY,
- _PER_RPC_CREDENTIALS_METADATA_VALUE)], None)
+ callback([(_PER_RPC_CREDENTIALS_METADATA_KEY,
+ _PER_RPC_CREDENTIALS_METADATA_VALUE)], None)
class BetaFeaturesTest(unittest.TestCase):
- def setUp(self):
- self._servicer = _Servicer()
- method_implementations = {
- (_GROUP, _UNARY_UNARY):
+ def setUp(self):
+ self._servicer = _Servicer()
+ method_implementations = {
+ (_GROUP, _UNARY_UNARY):
utilities.unary_unary_inline(self._servicer.unary_unary),
- (_GROUP, _UNARY_STREAM):
+ (_GROUP, _UNARY_STREAM):
utilities.unary_stream_inline(self._servicer.unary_stream),
- (_GROUP, _STREAM_UNARY):
+ (_GROUP, _STREAM_UNARY):
utilities.stream_unary_inline(self._servicer.stream_unary),
- (_GROUP, _STREAM_STREAM):
+ (_GROUP, _STREAM_STREAM):
utilities.stream_stream_inline(self._servicer.stream_stream),
- }
-
- cardinalities = {
- _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
- _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
- _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
- _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
- }
-
- server_options = implementations.server_options(
- thread_pool_size=test_constants.POOL_SIZE)
- self._server = implementations.server(
- method_implementations, options=server_options)
- server_credentials = implementations.ssl_server_credentials(
- [(resources.private_key(), resources.certificate_chain(),),])
- port = self._server.add_secure_port('[::]:0', server_credentials)
- self._server.start()
- self._channel_credentials = implementations.ssl_channel_credentials(
- resources.test_root_certificates())
- self._call_credentials = implementations.metadata_call_credentials(
- _metadata_plugin)
- channel = test_utilities.not_really_secure_channel(
- 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
- stub_options = implementations.stub_options(
- thread_pool_size=test_constants.POOL_SIZE)
- self._dynamic_stub = implementations.dynamic_stub(
- channel, _GROUP, cardinalities, options=stub_options)
-
- def tearDown(self):
- self._dynamic_stub = None
- self._server.stop(test_constants.SHORT_TIMEOUT).wait()
-
- def test_unary_unary(self):
- call_options = interfaces.grpc_call_options(
- disable_compression=True, credentials=self._call_credentials)
- response = getattr(self._dynamic_stub, _UNARY_UNARY)(
- _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options)
- self.assertEqual(_RESPONSE, response)
- self.assertIsNotNone(self._servicer.peer())
- invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in
- self._servicer._invocation_metadata]
- self.assertIn(
- (_PER_RPC_CREDENTIALS_METADATA_KEY,
- _PER_RPC_CREDENTIALS_METADATA_VALUE),
- invocation_metadata)
-
- def test_unary_stream(self):
- call_options = interfaces.grpc_call_options(
- disable_compression=True, credentials=self._call_credentials)
- response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)(
- _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options)
- self._servicer.block_until_serviced()
- self.assertIsNotNone(self._servicer.peer())
- invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in
- self._servicer._invocation_metadata]
- self.assertIn(
- (_PER_RPC_CREDENTIALS_METADATA_KEY,
- _PER_RPC_CREDENTIALS_METADATA_VALUE),
- invocation_metadata)
-
- def test_stream_unary(self):
- call_options = interfaces.grpc_call_options(
- credentials=self._call_credentials)
- request_iterator = _BlockingIterator(iter((_REQUEST,)))
- response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future(
- request_iterator, test_constants.LONG_TIMEOUT,
- protocol_options=call_options)
- response_future.protocol_context().disable_next_request_compression()
- request_iterator.allow()
- response_future.protocol_context().disable_next_request_compression()
- request_iterator.allow()
- self._servicer.block_until_serviced()
- self.assertIsNotNone(self._servicer.peer())
- self.assertEqual(_RESPONSE, response_future.result())
- invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in
- self._servicer._invocation_metadata]
- self.assertIn(
- (_PER_RPC_CREDENTIALS_METADATA_KEY,
- _PER_RPC_CREDENTIALS_METADATA_VALUE),
- invocation_metadata)
-
- def test_stream_stream(self):
- call_options = interfaces.grpc_call_options(
- credentials=self._call_credentials)
- request_iterator = _BlockingIterator(iter((_REQUEST,)))
- response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)(
- request_iterator, test_constants.SHORT_TIMEOUT,
- protocol_options=call_options)
- response_iterator.protocol_context().disable_next_request_compression()
- request_iterator.allow()
- response = next(response_iterator)
- response_iterator.protocol_context().disable_next_request_compression()
- request_iterator.allow()
- self._servicer.block_until_serviced()
- self.assertIsNotNone(self._servicer.peer())
- self.assertEqual(_RESPONSE, response)
- invocation_metadata = [(metadatum.key, metadatum.value) for metadatum in
- self._servicer._invocation_metadata]
- self.assertIn(
- (_PER_RPC_CREDENTIALS_METADATA_KEY,
- _PER_RPC_CREDENTIALS_METADATA_VALUE),
- invocation_metadata)
+ }
+
+ cardinalities = {
+ _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
+ _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
+ _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
+ _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
+ }
+
+ server_options = implementations.server_options(
+ thread_pool_size=test_constants.POOL_SIZE)
+ self._server = implementations.server(
+ method_implementations, options=server_options)
+ server_credentials = implementations.ssl_server_credentials([(
+ resources.private_key(),
+ resources.certificate_chain(),),])
+ port = self._server.add_secure_port('[::]:0', server_credentials)
+ self._server.start()
+ self._channel_credentials = implementations.ssl_channel_credentials(
+ resources.test_root_certificates())
+ self._call_credentials = implementations.metadata_call_credentials(
+ _metadata_plugin)
+ channel = test_utilities.not_really_secure_channel(
+ 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
+ stub_options = implementations.stub_options(
+ thread_pool_size=test_constants.POOL_SIZE)
+ self._dynamic_stub = implementations.dynamic_stub(
+ channel, _GROUP, cardinalities, options=stub_options)
+
+ def tearDown(self):
+ self._dynamic_stub = None
+ self._server.stop(test_constants.SHORT_TIMEOUT).wait()
+
+ def test_unary_unary(self):
+ call_options = interfaces.grpc_call_options(
+ disable_compression=True, credentials=self._call_credentials)
+ response = getattr(self._dynamic_stub, _UNARY_UNARY)(
+ _REQUEST,
+ test_constants.LONG_TIMEOUT,
+ protocol_options=call_options)
+ self.assertEqual(_RESPONSE, response)
+ self.assertIsNotNone(self._servicer.peer())
+ invocation_metadata = [
+ (metadatum.key, metadatum.value)
+ for metadatum in self._servicer._invocation_metadata
+ ]
+ self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
+ _PER_RPC_CREDENTIALS_METADATA_VALUE),
+ invocation_metadata)
+
+ def test_unary_stream(self):
+ call_options = interfaces.grpc_call_options(
+ disable_compression=True, credentials=self._call_credentials)
+ response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)(
+ _REQUEST,
+ test_constants.LONG_TIMEOUT,
+ protocol_options=call_options)
+ self._servicer.block_until_serviced()
+ self.assertIsNotNone(self._servicer.peer())
+ invocation_metadata = [
+ (metadatum.key, metadatum.value)
+ for metadatum in self._servicer._invocation_metadata
+ ]
+ self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
+ _PER_RPC_CREDENTIALS_METADATA_VALUE),
+ invocation_metadata)
+
+ def test_stream_unary(self):
+ call_options = interfaces.grpc_call_options(
+ credentials=self._call_credentials)
+ request_iterator = _BlockingIterator(iter((_REQUEST,)))
+ response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future(
+ request_iterator,
+ test_constants.LONG_TIMEOUT,
+ protocol_options=call_options)
+ response_future.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ response_future.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ self._servicer.block_until_serviced()
+ self.assertIsNotNone(self._servicer.peer())
+ self.assertEqual(_RESPONSE, response_future.result())
+ invocation_metadata = [
+ (metadatum.key, metadatum.value)
+ for metadatum in self._servicer._invocation_metadata
+ ]
+ self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
+ _PER_RPC_CREDENTIALS_METADATA_VALUE),
+ invocation_metadata)
+
+ def test_stream_stream(self):
+ call_options = interfaces.grpc_call_options(
+ credentials=self._call_credentials)
+ request_iterator = _BlockingIterator(iter((_REQUEST,)))
+ response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)(
+ request_iterator,
+ test_constants.SHORT_TIMEOUT,
+ protocol_options=call_options)
+ response_iterator.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ response = next(response_iterator)
+ response_iterator.protocol_context().disable_next_request_compression()
+ request_iterator.allow()
+ self._servicer.block_until_serviced()
+ self.assertIsNotNone(self._servicer.peer())
+ self.assertEqual(_RESPONSE, response)
+ invocation_metadata = [
+ (metadatum.key, metadatum.value)
+ for metadatum in self._servicer._invocation_metadata
+ ]
+ self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
+ _PER_RPC_CREDENTIALS_METADATA_VALUE),
+ invocation_metadata)
class ContextManagementAndLifecycleTest(unittest.TestCase):
- def setUp(self):
- self._servicer = _Servicer()
- self._method_implementations = {
- (_GROUP, _UNARY_UNARY):
+ def setUp(self):
+ self._servicer = _Servicer()
+ self._method_implementations = {
+ (_GROUP, _UNARY_UNARY):
utilities.unary_unary_inline(self._servicer.unary_unary),
- (_GROUP, _UNARY_STREAM):
+ (_GROUP, _UNARY_STREAM):
utilities.unary_stream_inline(self._servicer.unary_stream),
- (_GROUP, _STREAM_UNARY):
+ (_GROUP, _STREAM_UNARY):
utilities.stream_unary_inline(self._servicer.stream_unary),
- (_GROUP, _STREAM_STREAM):
+ (_GROUP, _STREAM_STREAM):
utilities.stream_stream_inline(self._servicer.stream_stream),
- }
-
- self._cardinalities = {
- _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
- _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
- _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
- _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
- }
-
- self._server_options = implementations.server_options(
- thread_pool_size=test_constants.POOL_SIZE)
- self._server_credentials = implementations.ssl_server_credentials(
- [(resources.private_key(), resources.certificate_chain(),),])
- self._channel_credentials = implementations.ssl_channel_credentials(
- resources.test_root_certificates())
- self._stub_options = implementations.stub_options(
- thread_pool_size=test_constants.POOL_SIZE)
-
- def test_stub_context(self):
- server = implementations.server(
- self._method_implementations, options=self._server_options)
- port = server.add_secure_port('[::]:0', self._server_credentials)
- server.start()
-
- channel = test_utilities.not_really_secure_channel(
- 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
- dynamic_stub = implementations.dynamic_stub(
- channel, _GROUP, self._cardinalities, options=self._stub_options)
- for _ in range(100):
- with dynamic_stub:
- pass
- for _ in range(10):
- with dynamic_stub:
- call_options = interfaces.grpc_call_options(
- disable_compression=True)
- response = getattr(dynamic_stub, _UNARY_UNARY)(
- _REQUEST, test_constants.LONG_TIMEOUT,
- protocol_options=call_options)
- self.assertEqual(_RESPONSE, response)
- self.assertIsNotNone(self._servicer.peer())
-
- server.stop(test_constants.SHORT_TIMEOUT).wait()
-
- def test_server_lifecycle(self):
- for _ in range(100):
- server = implementations.server(
- self._method_implementations, options=self._server_options)
- port = server.add_secure_port('[::]:0', self._server_credentials)
- server.start()
- server.stop(test_constants.SHORT_TIMEOUT).wait()
- for _ in range(100):
- server = implementations.server(
- self._method_implementations, options=self._server_options)
- server.add_secure_port('[::]:0', self._server_credentials)
- server.add_insecure_port('[::]:0')
- with server:
- server.stop(test_constants.SHORT_TIMEOUT)
- server.stop(test_constants.SHORT_TIMEOUT)
+ }
+
+ self._cardinalities = {
+ _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
+ _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
+ _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
+ _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
+ }
+
+ self._server_options = implementations.server_options(
+ thread_pool_size=test_constants.POOL_SIZE)
+ self._server_credentials = implementations.ssl_server_credentials([(
+ resources.private_key(),
+ resources.certificate_chain(),),])
+ self._channel_credentials = implementations.ssl_channel_credentials(
+ resources.test_root_certificates())
+ self._stub_options = implementations.stub_options(
+ thread_pool_size=test_constants.POOL_SIZE)
+
+ def test_stub_context(self):
+ server = implementations.server(
+ self._method_implementations, options=self._server_options)
+ port = server.add_secure_port('[::]:0', self._server_credentials)
+ server.start()
+
+ channel = test_utilities.not_really_secure_channel(
+ 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
+ dynamic_stub = implementations.dynamic_stub(
+ channel, _GROUP, self._cardinalities, options=self._stub_options)
+ for _ in range(100):
+ with dynamic_stub:
+ pass
+ for _ in range(10):
+ with dynamic_stub:
+ call_options = interfaces.grpc_call_options(
+ disable_compression=True)
+ response = getattr(dynamic_stub, _UNARY_UNARY)(
+ _REQUEST,
+ test_constants.LONG_TIMEOUT,
+ protocol_options=call_options)
+ self.assertEqual(_RESPONSE, response)
+ self.assertIsNotNone(self._servicer.peer())
+
+ server.stop(test_constants.SHORT_TIMEOUT).wait()
+
+ def test_server_lifecycle(self):
+ for _ in range(100):
+ server = implementations.server(
+ self._method_implementations, options=self._server_options)
+ port = server.add_secure_port('[::]:0', self._server_credentials)
+ server.start()
+ server.stop(test_constants.SHORT_TIMEOUT).wait()
+ for _ in range(100):
+ server = implementations.server(
+ self._method_implementations, options=self._server_options)
+ server.add_secure_port('[::]:0', self._server_credentials)
+ server.add_insecure_port('[::]:0')
+ with server:
+ server.stop(test_constants.SHORT_TIMEOUT)
+ server.stop(test_constants.SHORT_TIMEOUT)
if __name__ == '__main__':
- unittest.main(verbosity=2)
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/beta/_connectivity_channel_test.py b/src/python/grpcio_tests/tests/unit/beta/_connectivity_channel_test.py
index 5d826a269d..49d683b8a6 100644
--- a/src/python/grpcio_tests/tests/unit/beta/_connectivity_channel_test.py
+++ b/src/python/grpcio_tests/tests/unit/beta/_connectivity_channel_test.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Tests of grpc.beta._connectivity_channel."""
import unittest
@@ -36,13 +35,13 @@ from grpc.beta import interfaces
class ConnectivityStatesTest(unittest.TestCase):
- def testBetaConnectivityStates(self):
- self.assertIsNotNone(interfaces.ChannelConnectivity.IDLE)
- self.assertIsNotNone(interfaces.ChannelConnectivity.CONNECTING)
- self.assertIsNotNone(interfaces.ChannelConnectivity.READY)
- self.assertIsNotNone(interfaces.ChannelConnectivity.TRANSIENT_FAILURE)
- self.assertIsNotNone(interfaces.ChannelConnectivity.FATAL_FAILURE)
+ def testBetaConnectivityStates(self):
+ self.assertIsNotNone(interfaces.ChannelConnectivity.IDLE)
+ self.assertIsNotNone(interfaces.ChannelConnectivity.CONNECTING)
+ self.assertIsNotNone(interfaces.ChannelConnectivity.READY)
+ self.assertIsNotNone(interfaces.ChannelConnectivity.TRANSIENT_FAILURE)
+ self.assertIsNotNone(interfaces.ChannelConnectivity.FATAL_FAILURE)
if __name__ == '__main__':
- unittest.main(verbosity=2)
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py b/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py
index 3a67516906..f421442624 100644
--- a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py
+++ b/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Tests Face interface compliance of the gRPC Python Beta API."""
import collections
@@ -47,94 +46,97 @@ _SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
class _SerializationBehaviors(
- collections.namedtuple(
- '_SerializationBehaviors',
- ('request_serializers', 'request_deserializers', 'response_serializers',
- 'response_deserializers',))):
- pass
+ collections.namedtuple('_SerializationBehaviors', (
+ 'request_serializers',
+ 'request_deserializers',
+ 'response_serializers',
+ 'response_deserializers',))):
+ pass
def _serialization_behaviors_from_test_methods(test_methods):
- request_serializers = {}
- request_deserializers = {}
- response_serializers = {}
- response_deserializers = {}
- for (group, method), test_method in six.iteritems(test_methods):
- request_serializers[group, method] = test_method.serialize_request
- request_deserializers[group, method] = test_method.deserialize_request
- response_serializers[group, method] = test_method.serialize_response
- response_deserializers[group, method] = test_method.deserialize_response
- return _SerializationBehaviors(
- request_serializers, request_deserializers, response_serializers,
- response_deserializers)
+ request_serializers = {}
+ request_deserializers = {}
+ response_serializers = {}
+ response_deserializers = {}
+ for (group, method), test_method in six.iteritems(test_methods):
+ request_serializers[group, method] = test_method.serialize_request
+ request_deserializers[group, method] = test_method.deserialize_request
+ response_serializers[group, method] = test_method.serialize_response
+ response_deserializers[group, method] = test_method.deserialize_response
+ return _SerializationBehaviors(request_serializers, request_deserializers,
+ response_serializers, response_deserializers)
class _Implementation(test_interfaces.Implementation):
- def instantiate(
- self, methods, method_implementations, multi_method_implementation):
- serialization_behaviors = _serialization_behaviors_from_test_methods(
- methods)
- # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
- service = next(iter(methods))[0]
- # TODO(nathaniel): Add a "cardinalities_by_group" attribute to
- # _digest.TestServiceDigest.
- cardinalities = {
- method: method_object.cardinality()
- for (group, method), method_object in six.iteritems(methods)}
-
- server_options = implementations.server_options(
- request_deserializers=serialization_behaviors.request_deserializers,
- response_serializers=serialization_behaviors.response_serializers,
- thread_pool_size=test_constants.POOL_SIZE)
- server = implementations.server(
- method_implementations, options=server_options)
- server_credentials = implementations.ssl_server_credentials(
- [(resources.private_key(), resources.certificate_chain(),),])
- port = server.add_secure_port('[::]:0', server_credentials)
- server.start()
- channel_credentials = implementations.ssl_channel_credentials(
- resources.test_root_certificates())
- channel = test_utilities.not_really_secure_channel(
- 'localhost', port, channel_credentials, _SERVER_HOST_OVERRIDE)
- stub_options = implementations.stub_options(
- request_serializers=serialization_behaviors.request_serializers,
- response_deserializers=serialization_behaviors.response_deserializers,
- thread_pool_size=test_constants.POOL_SIZE)
- generic_stub = implementations.generic_stub(channel, options=stub_options)
- dynamic_stub = implementations.dynamic_stub(
- channel, service, cardinalities, options=stub_options)
- return generic_stub, {service: dynamic_stub}, server
-
- def destantiate(self, memo):
- memo.stop(test_constants.SHORT_TIMEOUT).wait()
-
- def invocation_metadata(self):
- return grpc_test_common.INVOCATION_INITIAL_METADATA
-
- def initial_metadata(self):
- return grpc_test_common.SERVICE_INITIAL_METADATA
-
- def terminal_metadata(self):
- return grpc_test_common.SERVICE_TERMINAL_METADATA
-
- def code(self):
- return interfaces.StatusCode.OK
-
- def details(self):
- return grpc_test_common.DETAILS
-
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- return original_metadata is None or grpc_test_common.metadata_transmitted(
- original_metadata, transmitted_metadata)
+ def instantiate(self, methods, method_implementations,
+ multi_method_implementation):
+ serialization_behaviors = _serialization_behaviors_from_test_methods(
+ methods)
+ # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest.
+ service = next(iter(methods))[0]
+ # TODO(nathaniel): Add a "cardinalities_by_group" attribute to
+ # _digest.TestServiceDigest.
+ cardinalities = {
+ method: method_object.cardinality()
+ for (group, method), method_object in six.iteritems(methods)
+ }
+
+ server_options = implementations.server_options(
+ request_deserializers=serialization_behaviors.request_deserializers,
+ response_serializers=serialization_behaviors.response_serializers,
+ thread_pool_size=test_constants.POOL_SIZE)
+ server = implementations.server(
+ method_implementations, options=server_options)
+ server_credentials = implementations.ssl_server_credentials([(
+ resources.private_key(),
+ resources.certificate_chain(),),])
+ port = server.add_secure_port('[::]:0', server_credentials)
+ server.start()
+ channel_credentials = implementations.ssl_channel_credentials(
+ resources.test_root_certificates())
+ channel = test_utilities.not_really_secure_channel(
+ 'localhost', port, channel_credentials, _SERVER_HOST_OVERRIDE)
+ stub_options = implementations.stub_options(
+ request_serializers=serialization_behaviors.request_serializers,
+ response_deserializers=serialization_behaviors.
+ response_deserializers,
+ thread_pool_size=test_constants.POOL_SIZE)
+ generic_stub = implementations.generic_stub(
+ channel, options=stub_options)
+ dynamic_stub = implementations.dynamic_stub(
+ channel, service, cardinalities, options=stub_options)
+ return generic_stub, {service: dynamic_stub}, server
+
+ def destantiate(self, memo):
+ memo.stop(test_constants.SHORT_TIMEOUT).wait()
+
+ def invocation_metadata(self):
+ return grpc_test_common.INVOCATION_INITIAL_METADATA
+
+ def initial_metadata(self):
+ return grpc_test_common.SERVICE_INITIAL_METADATA
+
+ def terminal_metadata(self):
+ return grpc_test_common.SERVICE_TERMINAL_METADATA
+
+ def code(self):
+ return interfaces.StatusCode.OK
+
+ def details(self):
+ return grpc_test_common.DETAILS
+
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ return original_metadata is None or grpc_test_common.metadata_transmitted(
+ original_metadata, transmitted_metadata)
def load_tests(loader, tests, pattern):
- return unittest.TestSuite(
- tests=tuple(
- loader.loadTestsFromTestCase(test_case_class)
- for test_case_class in test_cases.test_cases(_Implementation())))
+ return unittest.TestSuite(tests=tuple(
+ loader.loadTestsFromTestCase(test_case_class)
+ for test_case_class in test_cases.test_cases(_Implementation())))
if __name__ == '__main__':
- unittest.main(verbosity=2)
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py b/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py
index 127f93e9bb..69bb5cc2a5 100644
--- a/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py
+++ b/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Tests the implementations module of the gRPC Python Beta API."""
import datetime
@@ -40,31 +39,32 @@ from tests.unit import resources
class ChannelCredentialsTest(unittest.TestCase):
- def test_runtime_provided_root_certificates(self):
- channel_credentials = implementations.ssl_channel_credentials()
- self.assertIsInstance(
- channel_credentials, implementations.ChannelCredentials)
-
- def test_application_provided_root_certificates(self):
- channel_credentials = implementations.ssl_channel_credentials(
- resources.test_root_certificates())
- self.assertIsInstance(
- channel_credentials, implementations.ChannelCredentials)
+ def test_runtime_provided_root_certificates(self):
+ channel_credentials = implementations.ssl_channel_credentials()
+ self.assertIsInstance(channel_credentials,
+ implementations.ChannelCredentials)
+
+ def test_application_provided_root_certificates(self):
+ channel_credentials = implementations.ssl_channel_credentials(
+ resources.test_root_certificates())
+ self.assertIsInstance(channel_credentials,
+ implementations.ChannelCredentials)
class CallCredentialsTest(unittest.TestCase):
- def test_google_call_credentials(self):
- creds = oauth2client_client.GoogleCredentials(
- 'token', 'client_id', 'secret', 'refresh_token',
- datetime.datetime(2008, 6, 24), 'https://refresh.uri.com/',
- 'user_agent')
- call_creds = implementations.google_call_credentials(creds)
- self.assertIsInstance(call_creds, implementations.CallCredentials)
+ def test_google_call_credentials(self):
+ creds = oauth2client_client.GoogleCredentials(
+ 'token', 'client_id', 'secret', 'refresh_token',
+ datetime.datetime(2008, 6, 24), 'https://refresh.uri.com/',
+ 'user_agent')
+ call_creds = implementations.google_call_credentials(creds)
+ self.assertIsInstance(call_creds, implementations.CallCredentials)
+
+ def test_access_token_call_credentials(self):
+ call_creds = implementations.access_token_call_credentials('token')
+ self.assertIsInstance(call_creds, implementations.CallCredentials)
- def test_access_token_call_credentials(self):
- call_creds = implementations.access_token_call_credentials('token')
- self.assertIsInstance(call_creds, implementations.CallCredentials)
if __name__ == '__main__':
- unittest.main(verbosity=2)
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/beta/_not_found_test.py b/src/python/grpcio_tests/tests/unit/beta/_not_found_test.py
index 37b8c49120..664e47c769 100644
--- a/src/python/grpcio_tests/tests/unit/beta/_not_found_test.py
+++ b/src/python/grpcio_tests/tests/unit/beta/_not_found_test.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Tests of RPC-method-not-found behavior."""
import unittest
@@ -39,37 +38,38 @@ from tests.unit.framework.common import test_constants
class NotFoundTest(unittest.TestCase):
- def setUp(self):
- self._server = implementations.server({})
- port = self._server.add_insecure_port('[::]:0')
- channel = implementations.insecure_channel('localhost', port)
- self._generic_stub = implementations.generic_stub(channel)
- self._server.start()
+ def setUp(self):
+ self._server = implementations.server({})
+ port = self._server.add_insecure_port('[::]:0')
+ channel = implementations.insecure_channel('localhost', port)
+ self._generic_stub = implementations.generic_stub(channel)
+ self._server.start()
- def tearDown(self):
- self._server.stop(0).wait()
- self._generic_stub = None
+ def tearDown(self):
+ self._server.stop(0).wait()
+ self._generic_stub = None
- def test_blocking_unary_unary_not_found(self):
- with self.assertRaises(face.LocalError) as exception_assertion_context:
- self._generic_stub.blocking_unary_unary(
- 'groop', 'meffod', b'abc', test_constants.LONG_TIMEOUT,
- with_call=True)
- self.assertIs(
- exception_assertion_context.exception.code,
- interfaces.StatusCode.UNIMPLEMENTED)
+ def test_blocking_unary_unary_not_found(self):
+ with self.assertRaises(face.LocalError) as exception_assertion_context:
+ self._generic_stub.blocking_unary_unary(
+ 'groop',
+ 'meffod',
+ b'abc',
+ test_constants.LONG_TIMEOUT,
+ with_call=True)
+ self.assertIs(exception_assertion_context.exception.code,
+ interfaces.StatusCode.UNIMPLEMENTED)
- def test_future_stream_unary_not_found(self):
- rpc_future = self._generic_stub.future_stream_unary(
- 'grupe', 'mevvod', [b'def'], test_constants.LONG_TIMEOUT)
- with self.assertRaises(face.LocalError) as exception_assertion_context:
- rpc_future.result()
- self.assertIs(
- exception_assertion_context.exception.code,
- interfaces.StatusCode.UNIMPLEMENTED)
- self.assertIs(
- rpc_future.exception().code, interfaces.StatusCode.UNIMPLEMENTED)
+ def test_future_stream_unary_not_found(self):
+ rpc_future = self._generic_stub.future_stream_unary(
+ 'grupe', 'mevvod', [b'def'], test_constants.LONG_TIMEOUT)
+ with self.assertRaises(face.LocalError) as exception_assertion_context:
+ rpc_future.result()
+ self.assertIs(exception_assertion_context.exception.code,
+ interfaces.StatusCode.UNIMPLEMENTED)
+ self.assertIs(rpc_future.exception().code,
+ interfaces.StatusCode.UNIMPLEMENTED)
if __name__ == '__main__':
- unittest.main(verbosity=2)
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/beta/_utilities_test.py b/src/python/grpcio_tests/tests/unit/beta/_utilities_test.py
index 9cce96cc85..e8e62c322a 100644
--- a/src/python/grpcio_tests/tests/unit/beta/_utilities_test.py
+++ b/src/python/grpcio_tests/tests/unit/beta/_utilities_test.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Tests of grpc.beta.utilities."""
import threading
@@ -41,68 +40,68 @@ from tests.unit.framework.common import test_constants
class _Callback(object):
- def __init__(self):
- self._condition = threading.Condition()
- self._value = None
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._value = None
- def accept_value(self, value):
- with self._condition:
- self._value = value
- self._condition.notify_all()
+ def accept_value(self, value):
+ with self._condition:
+ self._value = value
+ self._condition.notify_all()
- def block_until_called(self):
- with self._condition:
- while self._value is None:
- self._condition.wait()
- return self._value
+ def block_until_called(self):
+ with self._condition:
+ while self._value is None:
+ self._condition.wait()
+ return self._value
class ChannelConnectivityTest(unittest.TestCase):
- def test_lonely_channel_connectivity(self):
- channel = implementations.insecure_channel('localhost', 12345)
- callback = _Callback()
-
- ready_future = utilities.channel_ready_future(channel)
- ready_future.add_done_callback(callback.accept_value)
- with self.assertRaises(future.TimeoutError):
- ready_future.result(timeout=test_constants.SHORT_TIMEOUT)
- self.assertFalse(ready_future.cancelled())
- self.assertFalse(ready_future.done())
- self.assertTrue(ready_future.running())
- ready_future.cancel()
- value_passed_to_callback = callback.block_until_called()
- self.assertIs(ready_future, value_passed_to_callback)
- self.assertTrue(ready_future.cancelled())
- self.assertTrue(ready_future.done())
- self.assertFalse(ready_future.running())
-
- def test_immediately_connectable_channel_connectivity(self):
- server = implementations.server({})
- port = server.add_insecure_port('[::]:0')
- server.start()
- channel = implementations.insecure_channel('localhost', port)
- callback = _Callback()
-
- try:
- ready_future = utilities.channel_ready_future(channel)
- ready_future.add_done_callback(callback.accept_value)
- self.assertIsNone(
- ready_future.result(timeout=test_constants.LONG_TIMEOUT))
- value_passed_to_callback = callback.block_until_called()
- self.assertIs(ready_future, value_passed_to_callback)
- self.assertFalse(ready_future.cancelled())
- self.assertTrue(ready_future.done())
- self.assertFalse(ready_future.running())
- # Cancellation after maturity has no effect.
- ready_future.cancel()
- self.assertFalse(ready_future.cancelled())
- self.assertTrue(ready_future.done())
- self.assertFalse(ready_future.running())
- finally:
- ready_future.cancel()
- server.stop(0)
+ def test_lonely_channel_connectivity(self):
+ channel = implementations.insecure_channel('localhost', 12345)
+ callback = _Callback()
+
+ ready_future = utilities.channel_ready_future(channel)
+ ready_future.add_done_callback(callback.accept_value)
+ with self.assertRaises(future.TimeoutError):
+ ready_future.result(timeout=test_constants.SHORT_TIMEOUT)
+ self.assertFalse(ready_future.cancelled())
+ self.assertFalse(ready_future.done())
+ self.assertTrue(ready_future.running())
+ ready_future.cancel()
+ value_passed_to_callback = callback.block_until_called()
+ self.assertIs(ready_future, value_passed_to_callback)
+ self.assertTrue(ready_future.cancelled())
+ self.assertTrue(ready_future.done())
+ self.assertFalse(ready_future.running())
+
+ def test_immediately_connectable_channel_connectivity(self):
+ server = implementations.server({})
+ port = server.add_insecure_port('[::]:0')
+ server.start()
+ channel = implementations.insecure_channel('localhost', port)
+ callback = _Callback()
+
+ try:
+ ready_future = utilities.channel_ready_future(channel)
+ ready_future.add_done_callback(callback.accept_value)
+ self.assertIsNone(
+ ready_future.result(timeout=test_constants.LONG_TIMEOUT))
+ value_passed_to_callback = callback.block_until_called()
+ self.assertIs(ready_future, value_passed_to_callback)
+ self.assertFalse(ready_future.cancelled())
+ self.assertTrue(ready_future.done())
+ self.assertFalse(ready_future.running())
+ # Cancellation after maturity has no effect.
+ ready_future.cancel()
+ self.assertFalse(ready_future.cancelled())
+ self.assertTrue(ready_future.done())
+ self.assertFalse(ready_future.running())
+ finally:
+ ready_future.cancel()
+ server.stop(0)
if __name__ == '__main__':
- unittest.main(verbosity=2)
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/beta/test_utilities.py b/src/python/grpcio_tests/tests/unit/beta/test_utilities.py
index 692da9c97d..f542420683 100644
--- a/src/python/grpcio_tests/tests/unit/beta/test_utilities.py
+++ b/src/python/grpcio_tests/tests/unit/beta/test_utilities.py
@@ -26,16 +26,15 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Test-appropriate entry points into the gRPC Python Beta API."""
import grpc
from grpc.beta import implementations
-def not_really_secure_channel(
- host, port, channel_credentials, server_host_override):
- """Creates an insecure Channel to a remote host.
+def not_really_secure_channel(host, port, channel_credentials,
+ server_host_override):
+ """Creates an insecure Channel to a remote host.
Args:
host: The name of the remote host to which to connect.
@@ -48,8 +47,8 @@ def not_really_secure_channel(
An implementations.Channel to the remote host through which RPCs may be
conducted.
"""
- target = '%s:%d' % (host, port)
- channel = grpc.secure_channel(
- target, channel_credentials,
- (('grpc.ssl_target_name_override', server_host_override,),))
- return implementations.Channel(channel)
+ target = '%s:%d' % (host, port)
+ channel = grpc.secure_channel(target, channel_credentials, ((
+ 'grpc.ssl_target_name_override',
+ server_host_override,),))
+ return implementations.Channel(channel)