aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/unit/_interceptor_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/_interceptor_test.py')
-rw-r--r--src/python/grpcio_tests/tests/unit/_interceptor_test.py158
1 files changed, 93 insertions, 65 deletions
diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py
index 2aee298df2..3d547b71cd 100644
--- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py
+++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py
@@ -65,7 +65,10 @@ class _Handler(object):
def handle_unary_unary(self, request, servicer_context):
self._control.control()
if servicer_context is not None:
- servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
+ servicer_context.set_trailing_metadata(((
+ 'testkey',
+ 'testvalue',
+ ),))
return request
def handle_unary_stream(self, request, servicer_context):
@@ -74,7 +77,10 @@ class _Handler(object):
yield request
self._control.control()
if servicer_context is not None:
- servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
+ servicer_context.set_trailing_metadata(((
+ 'testkey',
+ 'testvalue',
+ ),))
def handle_stream_unary(self, request_iterator, servicer_context):
if servicer_context is not None:
@@ -86,13 +92,19 @@ class _Handler(object):
response_elements.append(request)
self._control.control()
if servicer_context is not None:
- servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
+ servicer_context.set_trailing_metadata(((
+ 'testkey',
+ 'testvalue',
+ ),))
return b''.join(response_elements)
def handle_stream_stream(self, request_iterator, servicer_context):
self._control.control()
if servicer_context is not None:
- servicer_context.set_trailing_metadata((('testkey', 'testvalue',),))
+ servicer_context.set_trailing_metadata(((
+ 'testkey',
+ 'testvalue',
+ ),))
for request in request_iterator:
self._control.control()
yield request
@@ -162,9 +174,10 @@ def _stream_stream_multi_callable(channel):
class _ClientCallDetails(
- collections.namedtuple('_ClientCallDetails',
- ('method', 'timeout', 'metadata',
- 'credentials')), grpc.ClientCallDetails):
+ collections.namedtuple(
+ '_ClientCallDetails',
+ ('method', 'timeout', 'metadata', 'credentials')),
+ grpc.ClientCallDetails):
pass
@@ -262,7 +275,10 @@ def _append_request_header_interceptor(header, value):
metadata = []
if client_call_details.metadata:
metadata = list(client_call_details.metadata)
- metadata.append((header, value,))
+ metadata.append((
+ header,
+ value,
+ ))
client_call_details = _ClientCallDetails(
client_call_details.method, client_call_details.timeout, metadata,
client_call_details.credentials)
@@ -306,9 +322,11 @@ class InterceptorTest(unittest.TestCase):
self._server = grpc.server(
self._server_pool,
options=(('grpc.so_reuseport', 0),),
- interceptors=(_LoggingInterceptor('s1', self._record),
- conditional_interceptor,
- _LoggingInterceptor('s2', self._record),))
+ interceptors=(
+ _LoggingInterceptor('s1', self._record),
+ conditional_interceptor,
+ _LoggingInterceptor('s2', self._record),
+ ))
port = self._server.add_insecure_port('[::]:0')
self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
self._server.start()
@@ -333,8 +351,8 @@ class InterceptorTest(unittest.TestCase):
interceptor = _wrap_request_iterator_stream_interceptor(triple)
channel = grpc.intercept_channel(self._channel, interceptor)
- requests = tuple(b'\x07\x08'
- for _ in range(test_constants.STREAM_LENGTH))
+ requests = tuple(
+ b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
multi_callable = _stream_stream_multi_callable(channel)
response_iterator = multi_callable(
@@ -365,8 +383,8 @@ class InterceptorTest(unittest.TestCase):
multi_callable = _unary_unary_multi_callable(defective_channel)
call_future = multi_callable.future(
request,
- metadata=(
- ('test', 'InterceptedUnaryRequestBlockingUnaryResponse'),))
+ metadata=(('test',
+ 'InterceptedUnaryRequestBlockingUnaryResponse'),))
self.assertIsNotNone(call_future.exception())
self.assertEqual(call_future.code(), grpc.StatusCode.INTERNAL)
@@ -374,12 +392,14 @@ class InterceptorTest(unittest.TestCase):
def testInterceptedHeaderManipulationWithServerSideVerification(self):
request = b'\x07\x08'
- channel = grpc.intercept_channel(
- self._channel, _append_request_header_interceptor('secret', '42'))
- channel = grpc.intercept_channel(
- channel,
- _LoggingInterceptor('c1', self._record),
- _LoggingInterceptor('c2', self._record))
+ channel = grpc.intercept_channel(self._channel,
+ _append_request_header_interceptor(
+ 'secret', '42'))
+ channel = grpc.intercept_channel(channel,
+ _LoggingInterceptor(
+ 'c1', self._record),
+ _LoggingInterceptor(
+ 'c2', self._record))
self._record[:] = []
@@ -401,16 +421,17 @@ class InterceptorTest(unittest.TestCase):
self._record[:] = []
- channel = grpc.intercept_channel(
- self._channel,
- _LoggingInterceptor('c1', self._record),
- _LoggingInterceptor('c2', self._record))
+ channel = grpc.intercept_channel(self._channel,
+ _LoggingInterceptor(
+ 'c1', self._record),
+ _LoggingInterceptor(
+ 'c2', self._record))
multi_callable = _unary_unary_multi_callable(channel)
multi_callable(
request,
- metadata=(
- ('test', 'InterceptedUnaryRequestBlockingUnaryResponse'),))
+ metadata=(('test',
+ 'InterceptedUnaryRequestBlockingUnaryResponse'),))
self.assertSequenceEqual(self._record, [
'c1:intercept_unary_unary', 'c2:intercept_unary_unary',
@@ -420,10 +441,11 @@ class InterceptorTest(unittest.TestCase):
def testInterceptedUnaryRequestBlockingUnaryResponseWithCall(self):
request = b'\x07\x08'
- channel = grpc.intercept_channel(
- self._channel,
- _LoggingInterceptor('c1', self._record),
- _LoggingInterceptor('c2', self._record))
+ channel = grpc.intercept_channel(self._channel,
+ _LoggingInterceptor(
+ 'c1', self._record),
+ _LoggingInterceptor(
+ 'c2', self._record))
self._record[:] = []
@@ -443,10 +465,11 @@ class InterceptorTest(unittest.TestCase):
request = b'\x07\x08'
self._record[:] = []
- channel = grpc.intercept_channel(
- self._channel,
- _LoggingInterceptor('c1', self._record),
- _LoggingInterceptor('c2', self._record))
+ channel = grpc.intercept_channel(self._channel,
+ _LoggingInterceptor(
+ 'c1', self._record),
+ _LoggingInterceptor(
+ 'c2', self._record))
multi_callable = _unary_unary_multi_callable(channel)
response_future = multi_callable.future(
@@ -463,10 +486,11 @@ class InterceptorTest(unittest.TestCase):
request = b'\x37\x58'
self._record[:] = []
- channel = grpc.intercept_channel(
- self._channel,
- _LoggingInterceptor('c1', self._record),
- _LoggingInterceptor('c2', self._record))
+ channel = grpc.intercept_channel(self._channel,
+ _LoggingInterceptor(
+ 'c1', self._record),
+ _LoggingInterceptor(
+ 'c2', self._record))
multi_callable = _unary_stream_multi_callable(channel)
response_iterator = multi_callable(
@@ -480,21 +504,22 @@ class InterceptorTest(unittest.TestCase):
])
def testInterceptedStreamRequestBlockingUnaryResponse(self):
- requests = tuple(b'\x07\x08'
- for _ in range(test_constants.STREAM_LENGTH))
+ requests = tuple(
+ b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
request_iterator = iter(requests)
self._record[:] = []
- channel = grpc.intercept_channel(
- self._channel,
- _LoggingInterceptor('c1', self._record),
- _LoggingInterceptor('c2', self._record))
+ channel = grpc.intercept_channel(self._channel,
+ _LoggingInterceptor(
+ 'c1', self._record),
+ _LoggingInterceptor(
+ 'c2', self._record))
multi_callable = _stream_unary_multi_callable(channel)
multi_callable(
request_iterator,
- metadata=(
- ('test', 'InterceptedStreamRequestBlockingUnaryResponse'),))
+ metadata=(('test',
+ 'InterceptedStreamRequestBlockingUnaryResponse'),))
self.assertSequenceEqual(self._record, [
'c1:intercept_stream_unary', 'c2:intercept_stream_unary',
@@ -502,15 +527,16 @@ class InterceptorTest(unittest.TestCase):
])
def testInterceptedStreamRequestBlockingUnaryResponseWithCall(self):
- requests = tuple(b'\x07\x08'
- for _ in range(test_constants.STREAM_LENGTH))
+ requests = tuple(
+ b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
request_iterator = iter(requests)
self._record[:] = []
- channel = grpc.intercept_channel(
- self._channel,
- _LoggingInterceptor('c1', self._record),
- _LoggingInterceptor('c2', self._record))
+ channel = grpc.intercept_channel(self._channel,
+ _LoggingInterceptor(
+ 'c1', self._record),
+ _LoggingInterceptor(
+ 'c2', self._record))
multi_callable = _stream_unary_multi_callable(channel)
multi_callable.with_call(
@@ -525,15 +551,16 @@ class InterceptorTest(unittest.TestCase):
])
def testInterceptedStreamRequestFutureUnaryResponse(self):
- requests = tuple(b'\x07\x08'
- for _ in range(test_constants.STREAM_LENGTH))
+ requests = tuple(
+ b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
request_iterator = iter(requests)
self._record[:] = []
- channel = grpc.intercept_channel(
- self._channel,
- _LoggingInterceptor('c1', self._record),
- _LoggingInterceptor('c2', self._record))
+ channel = grpc.intercept_channel(self._channel,
+ _LoggingInterceptor(
+ 'c1', self._record),
+ _LoggingInterceptor(
+ 'c2', self._record))
multi_callable = _stream_unary_multi_callable(channel)
response_future = multi_callable.future(
@@ -547,15 +574,16 @@ class InterceptorTest(unittest.TestCase):
])
def testInterceptedStreamRequestStreamResponse(self):
- requests = tuple(b'\x77\x58'
- for _ in range(test_constants.STREAM_LENGTH))
+ requests = tuple(
+ b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH))
request_iterator = iter(requests)
self._record[:] = []
- channel = grpc.intercept_channel(
- self._channel,
- _LoggingInterceptor('c1', self._record),
- _LoggingInterceptor('c2', self._record))
+ channel = grpc.intercept_channel(self._channel,
+ _LoggingInterceptor(
+ 'c1', self._record),
+ _LoggingInterceptor(
+ 'c2', self._record))
multi_callable = _stream_stream_multi_callable(channel)
response_iterator = multi_callable(