From 41abb052b8ed180ca14b9ff1427a215b8e4dcd60 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Sat, 5 Sep 2015 02:46:08 +0000 Subject: gRPC protocol objects --- .../grpcio/grpc/_adapter/_intermediary_low.py | 10 +- src/python/grpcio/grpc/_adapter/fore.py | 4 +- src/python/grpcio/grpc/_adapter/rear.py | 6 +- src/python/grpcio/grpc/_links/invocation.py | 57 ++++- src/python/grpcio/grpc/_links/service.py | 39 +++- src/python/grpcio/grpc/beta/interfaces.py | 58 ++++++ .../grpc_test/_adapter/_intermediary_low_test.py | 8 +- .../grpc_test/beta/_beta_features_test.py | 231 +++++++++++++++++++++ 8 files changed, 389 insertions(+), 24 deletions(-) create mode 100644 src/python/grpcio_test/grpc_test/beta/_beta_features_test.py diff --git a/src/python/grpcio/grpc/_adapter/_intermediary_low.py b/src/python/grpcio/grpc/_adapter/_intermediary_low.py index 735ad205a4..e2feec6ffb 100644 --- a/src/python/grpcio/grpc/_adapter/_intermediary_low.py +++ b/src/python/grpcio/grpc/_adapter/_intermediary_low.py @@ -59,6 +59,7 @@ from grpc._adapter import _types _IGNORE_ME_TAG = object() Code = _types.StatusCode +WriteFlags = _types.OpWriteFlags class Status(collections.namedtuple('Status', ['code', 'details'])): @@ -125,9 +126,9 @@ class Call(object): ], _TagAdapter(finish_tag, Event.Kind.FINISH)) return err0 if err0 != _types.CallError.OK else err1 if err1 != _types.CallError.OK else err2 if err2 != _types.CallError.OK else _types.CallError.OK - def write(self, message, tag): + def write(self, message, tag, flags): return self._internal.start_batch([ - _types.OpArgs.send_message(message, 0) + _types.OpArgs.send_message(message, flags) ], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED)) def complete(self, tag): @@ -163,8 +164,11 @@ class Call(object): def cancel(self): return self._internal.cancel() + def peer(self): + return self._internal.peer() + def set_credentials(self, creds): - return self._internal.set_credentials(creds) + return self._internal.set_credentials(creds._internal) class Channel(object): diff --git a/src/python/grpcio/grpc/_adapter/fore.py b/src/python/grpcio/grpc/_adapter/fore.py index daa41e8bde..acdd69c420 100644 --- a/src/python/grpcio/grpc/_adapter/fore.py +++ b/src/python/grpcio/grpc/_adapter/fore.py @@ -56,7 +56,7 @@ class _LowWrite(enum.Enum): def _write(call, rpc_state, payload): serialized_payload = rpc_state.serializer(payload) if rpc_state.write.low is _LowWrite.OPEN: - call.write(serialized_payload, call) + call.write(serialized_payload, call, 0) rpc_state.write.low = _LowWrite.ACTIVE else: rpc_state.write.pending.append(serialized_payload) @@ -164,7 +164,7 @@ class ForeLink(base_interfaces.ForeLink, activated.Activated): if rpc_state.write.pending: serialized_payload = rpc_state.write.pending.pop(0) - call.write(serialized_payload, call) + call.write(serialized_payload, call, 0) elif rpc_state.write.high is _common.HighWrite.CLOSED: _status(call, rpc_state) else: diff --git a/src/python/grpcio/grpc/_adapter/rear.py b/src/python/grpcio/grpc/_adapter/rear.py index fd6f45f7a7..17fa47f746 100644 --- a/src/python/grpcio/grpc/_adapter/rear.py +++ b/src/python/grpcio/grpc/_adapter/rear.py @@ -78,7 +78,7 @@ class _RPCState(object): def _write(operation_id, call, outstanding, write_state, serialized_payload): if write_state.low is _LowWrite.OPEN: - call.write(serialized_payload, operation_id) + call.write(serialized_payload, operation_id, 0) outstanding.add(_low.Event.Kind.WRITE_ACCEPTED) write_state.low = _LowWrite.ACTIVE elif write_state.low is _LowWrite.ACTIVE: @@ -144,7 +144,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated): if event.write_accepted: if rpc_state.common.write.pending: rpc_state.call.write( - rpc_state.common.write.pending.pop(0), operation_id) + rpc_state.common.write.pending.pop(0), operation_id, 0) rpc_state.outstanding.add(_low.Event.Kind.WRITE_ACCEPTED) elif rpc_state.common.write.high is _common.HighWrite.CLOSED: rpc_state.call.complete(operation_id) @@ -263,7 +263,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated): low_state = _LowWrite.OPEN else: serialized_payload = request_serializer(payload) - call.write(serialized_payload, operation_id) + call.write(serialized_payload, operation_id, 0) outstanding.add(_low.Event.Kind.WRITE_ACCEPTED) low_state = _LowWrite.ACTIVE diff --git a/src/python/grpcio/grpc/_links/invocation.py b/src/python/grpcio/grpc/_links/invocation.py index fecb550ae0..67ef86a176 100644 --- a/src/python/grpcio/grpc/_links/invocation.py +++ b/src/python/grpcio/grpc/_links/invocation.py @@ -37,6 +37,7 @@ import time from grpc._adapter import _intermediary_low from grpc._links import _constants +from grpc.beta import interfaces as beta_interfaces from grpc.framework.foundation import activated from grpc.framework.foundation import logging_pool from grpc.framework.foundation import relay @@ -73,11 +74,28 @@ class _LowWrite(enum.Enum): CLOSED = 'CLOSED' +class _Context(beta_interfaces.GRPCInvocationContext): + + def __init__(self): + self._lock = threading.Lock() + self._disable_next_compression = False + + def disable_next_request_compression(self): + with self._lock: + self._disable_next_compression = True + + def next_compression_disabled(self): + with self._lock: + disabled = self._disable_next_compression + self._disable_next_compression = False + return disabled + + class _RPCState(object): def __init__( self, call, request_serializer, response_deserializer, sequence_number, - read, allowance, high_write, low_write, due): + read, allowance, high_write, low_write, due, context): self.call = call self.request_serializer = request_serializer self.response_deserializer = response_deserializer @@ -87,6 +105,7 @@ class _RPCState(object): self.high_write = high_write self.low_write = low_write self.due = due + self.context = context def _no_longer_due(kind, rpc_state, key, rpc_states): @@ -209,7 +228,7 @@ class _Kernel(object): def _invoke( self, operation_id, group, method, initial_metadata, payload, termination, - timeout, allowance): + timeout, allowance, options): """Invoke an RPC. Args: @@ -224,6 +243,7 @@ class _Kernel(object): timeout: A duration of time in seconds to allow for the RPC. allowance: The number of payloads (beyond the free first one) that the local ticket exchange mate has granted permission to be read. + options: A beta_interfaces.GRPCCallOptions value or None. """ if termination is links.Ticket.Termination.COMPLETION: high_write = _HighWrite.CLOSED @@ -241,6 +261,8 @@ class _Kernel(object): call = _intermediary_low.Call( self._channel, self._completion_queue, '/%s/%s' % (group, method), self._host, time.time() + timeout) + if options is not None and options.credentials is not None: + call.set_credentials(options.credentials._intermediary_low_credentials) if transformed_initial_metadata is not None: for metadata_key, metadata_value in transformed_initial_metadata: call.add_metadata(metadata_key, metadata_value) @@ -254,17 +276,33 @@ class _Kernel(object): low_write = _LowWrite.OPEN due = set((_METADATA, _FINISH,)) else: - call.write(request_serializer(payload), operation_id) + if options is not None and options.disable_compression: + flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS + else: + flags = 0 + call.write(request_serializer(payload), operation_id, flags) low_write = _LowWrite.ACTIVE due = set((_WRITE, _METADATA, _FINISH,)) + context = _Context() self._rpc_states[operation_id] = _RPCState( - call, request_serializer, response_deserializer, 0, + call, request_serializer, response_deserializer, 1, _Read.AWAITING_METADATA, 1 if allowance is None else (1 + allowance), - high_write, low_write, due) + high_write, low_write, due, context) + protocol = links.Protocol(links.Protocol.Kind.INVOCATION_CONTEXT, context) + ticket = links.Ticket( + operation_id, 0, None, None, None, None, None, None, None, None, None, + None, None, protocol) + self._relay.add_value(ticket) def _advance(self, operation_id, rpc_state, payload, termination, allowance): if payload is not None: - rpc_state.call.write(rpc_state.request_serializer(payload), operation_id) + disable_compression = rpc_state.context.next_compression_disabled() + if disable_compression: + flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS + else: + flags = 0 + rpc_state.call.write( + rpc_state.request_serializer(payload), operation_id, flags) rpc_state.low_write = _LowWrite.ACTIVE rpc_state.due.add(_WRITE) @@ -292,10 +330,15 @@ class _Kernel(object): if self._completion_queue is None: logging.error('Received invocation ticket %s after stop!', ticket) else: + if (ticket.protocol is not None and + ticket.protocol.kind is links.Protocol.Kind.CALL_OPTION): + grpc_call_options = ticket.protocol.value + else: + grpc_call_options = None self._invoke( ticket.operation_id, ticket.group, ticket.method, ticket.initial_metadata, ticket.payload, ticket.termination, - ticket.timeout, ticket.allowance) + ticket.timeout, ticket.allowance, grpc_call_options) else: rpc_state = self._rpc_states.get(ticket.operation_id) if rpc_state is not None: diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index 07772c7de3..f56df84007 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -37,6 +37,7 @@ import time from grpc._adapter import _intermediary_low from grpc._links import _constants +from grpc.beta import interfaces as beta_interfaces from grpc.framework.foundation import logging_pool from grpc.framework.foundation import relay from grpc.framework.interfaces.links import links @@ -89,12 +90,34 @@ class _LowWrite(enum.Enum): CLOSED = 'CLOSED' +class _Context(beta_interfaces.GRPCServicerContext): + + def __init__(self, call): + self._lock = threading.Lock() + self._call = call + self._disable_next_compression = False + + def peer(self): + with self._lock: + return self._call.peer() + + def disable_next_response_compression(self): + with self._lock: + self._disable_next_compression = True + + def next_compression_disabled(self): + with self._lock: + disabled = self._disable_next_compression + self._disable_next_compression = False + return disabled + + class _RPCState(object): def __init__( self, request_deserializer, response_serializer, sequence_number, read, early_read, allowance, high_write, low_write, premetadataed, - terminal_metadata, code, message, due): + terminal_metadata, code, message, due, context): self.request_deserializer = request_deserializer self.response_serializer = response_serializer self.sequence_number = sequence_number @@ -110,6 +133,7 @@ class _RPCState(object): self.code = code self.message = message self.due = due + self.context = context def _no_longer_due(kind, rpc_state, key, rpc_states): @@ -163,12 +187,12 @@ class _Kernel(object): (group, method), _IDENTITY) call.read(call) + context = _Context(call) self._rpc_states[call] = _RPCState( request_deserializer, response_serializer, 1, _Read.READING, None, 1, _HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None, - set((_READ, _FINISH,))) - protocol = links.Protocol( - links.Protocol.Kind.SERVICER_CONTEXT, 'TODO: Service Context Object!') + set((_READ, _FINISH,)), context) + protocol = links.Protocol(links.Protocol.Kind.SERVICER_CONTEXT, context) ticket = links.Ticket( call, 0, group, method, links.Ticket.Subscription.FULL, service_acceptance.deadline - time.time(), None, event.metadata, None, @@ -313,7 +337,12 @@ class _Kernel(object): self._relay.add_value(early_read_ticket) if ticket.payload is not None: - call.write(rpc_state.response_serializer(ticket.payload), call) + disable_compression = rpc_state.context.next_compression_disabled() + if disable_compression: + flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS + else: + flags = 0 + call.write(rpc_state.response_serializer(ticket.payload), call, flags) rpc_state.due.add(_WRITE) rpc_state.low_write = _LowWrite.ACTIVE diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py index 25e6a9c66b..79f2620dd4 100644 --- a/src/python/grpcio/grpc/beta/interfaces.py +++ b/src/python/grpcio/grpc/beta/interfaces.py @@ -29,6 +29,7 @@ """Constants and interfaces of the Beta API of gRPC Python.""" +import abc import enum @@ -52,3 +53,60 @@ class StatusCode(enum.Enum): UNAVAILABLE = 14 DATA_LOSS = 15 UNAUTHENTICATED = 16 + + +class GRPCCallOptions(object): + """A value encapsulating gRPC-specific options passed on RPC invocation. + + This class and its instances have no supported interface - it exists to + define the type of its instances and its instances exist to be passed to + other functions. + """ + + def __init__(self, disable_compression, subcall_of, credentials): + self.disable_compression = disable_compression + self.subcall_of = subcall_of + self.credentials = credentials + + +def grpc_call_options(disable_compression=False, credentials=None): + """Creates a GRPCCallOptions value to be passed at RPC invocation. + + All parameters are optional and should always be passed by keyword. + + Args: + disable_compression: A boolean indicating whether or not compression should + be disabled for the request object of the RPC. Only valid for + request-unary RPCs. + credentials: A ClientCredentials object to use for the invoked RPC. + """ + return GRPCCallOptions(disable_compression, None, credentials) + + +class GRPCServicerContext(object): + """Exposes gRPC-specific options and behaviors to code servicing RPCs.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def peer(self): + """Identifies the peer that invoked the RPC being serviced. + + Returns: + A string identifying the peer that invoked the RPC being serviced. + """ + raise NotImplementedError() + + @abc.abstractmethod + def disable_next_response_compression(self): + """Disables compression of the next response passed by the application.""" + raise NotImplementedError() + + +class GRPCInvocationContext(object): + """Exposes gRPC-specific options and behaviors to code invoking RPCs.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def disable_next_request_compression(self): + """Disables compression of the next request passed by the application.""" + raise NotImplementedError() diff --git a/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py index 27a5b82e9c..90ad0b9bcb 100644 --- a/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py +++ b/src/python/grpcio_test/grpc_test/_adapter/_intermediary_low_test.py @@ -191,7 +191,7 @@ class EchoTest(unittest.TestCase): metadata[server_leading_binary_metadata_key]) for datum in test_data: - client_call.write(datum, write_tag) + client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS) write_accepted = self.client_events.get() self.assertIsNotNone(write_accepted) self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED) @@ -206,7 +206,7 @@ class EchoTest(unittest.TestCase): self.assertIsNotNone(read_accepted.bytes) server_data.append(read_accepted.bytes) - server_call.write(read_accepted.bytes, write_tag) + server_call.write(read_accepted.bytes, write_tag, 0) write_accepted = self.server_events.get() self.assertIsNotNone(write_accepted) self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind) @@ -370,14 +370,14 @@ class CancellationTest(unittest.TestCase): self.assertIsNotNone(metadata_accepted) for datum in test_data: - client_call.write(datum, write_tag) + client_call.write(datum, write_tag, 0) write_accepted = self.client_events.get() server_call.read(read_tag) read_accepted = self.server_events.get() server_data.append(read_accepted.bytes) - server_call.write(read_accepted.bytes, write_tag) + server_call.write(read_accepted.bytes, write_tag, 0) write_accepted = self.server_events.get() self.assertIsNotNone(write_accepted) diff --git a/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py new file mode 100644 index 0000000000..89fe4b2acf --- /dev/null +++ b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py @@ -0,0 +1,231 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests Face interface compliance of the gRPC Python Beta API.""" + +import threading +import unittest + +from grpc.beta import beta +from grpc.beta import interfaces +from grpc.framework.common import cardinality +from grpc.framework.interfaces.face import utilities +from grpc_test import resources +from grpc_test.beta import test_utilities +from grpc_test.framework.common import test_constants + +_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' + +_GROUP = 'group' +_UNARY_UNARY = 'unary-unary' +_UNARY_STREAM = 'unary-stream' +_STREAM_UNARY = 'stream-unary' +_STREAM_STREAM = 'stream-stream' + +_REQUEST = b'abc' +_RESPONSE = b'123' + + +class _Servicer(object): + + def __init__(self): + self._condition = threading.Condition() + self._peer = None + self._serviced = False + + def unary_unary(self, request, context): + with self._condition: + self._request = request + self._peer = context.protocol_context().peer() + context.protocol_context().disable_next_response_compression() + self._serviced = True + self._condition.notify_all() + return _RESPONSE + + def unary_stream(self, request, context): + with self._condition: + self._request = request + self._peer = context.protocol_context().peer() + context.protocol_context().disable_next_response_compression() + self._serviced = True + self._condition.notify_all() + return + yield + + def stream_unary(self, request_iterator, context): + for request in request_iterator: + self._request = request + with self._condition: + self._peer = context.protocol_context().peer() + context.protocol_context().disable_next_response_compression() + self._serviced = True + self._condition.notify_all() + return _RESPONSE + + def stream_stream(self, request_iterator, context): + for request in request_iterator: + with self._condition: + self._peer = context.protocol_context().peer() + context.protocol_context().disable_next_response_compression() + yield _RESPONSE + with self._condition: + self._serviced = True + self._condition.notify_all() + + def peer(self): + with self._condition: + return self._peer + + def block_until_serviced(self): + with self._condition: + while not self._serviced: + self._condition.wait() + + +class _BlockingIterator(object): + + def __init__(self, upstream): + self._condition = threading.Condition() + self._upstream = upstream + self._allowed = [] + + def __iter__(self): + return self + + def next(self): + with self._condition: + while True: + if self._allowed is None: + raise StopIteration() + elif self._allowed: + return self._allowed.pop(0) + else: + self._condition.wait() + + def allow(self): + with self._condition: + try: + self._allowed.append(next(self._upstream)) + except StopIteration: + self._allowed = None + self._condition.notify_all() + + +class BetaFeaturesTest(unittest.TestCase): + + def setUp(self): + self._servicer = _Servicer() + method_implementations = { + (_GROUP, _UNARY_UNARY): + utilities.unary_unary_inline(self._servicer.unary_unary), + (_GROUP, _UNARY_STREAM): + utilities.unary_stream_inline(self._servicer.unary_stream), + (_GROUP, _STREAM_UNARY): + utilities.stream_unary_inline(self._servicer.stream_unary), + (_GROUP, _STREAM_STREAM): + utilities.stream_stream_inline(self._servicer.stream_stream), + } + + cardinalities = { + _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY, + _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM, + _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY, + _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM, + } + + server_options = beta.server_options( + thread_pool_size=test_constants.POOL_SIZE) + self._server = beta.server(method_implementations, options=server_options) + server_credentials = beta.ssl_server_credentials( + [(resources.private_key(), resources.certificate_chain(),),]) + port = self._server.add_secure_port('[::]:0', server_credentials) + self._server.start() + self._client_credentials = beta.ssl_client_credentials( + resources.test_root_certificates(), None, None) + channel = test_utilities.create_not_really_secure_channel( + 'localhost', port, self._client_credentials, _SERVER_HOST_OVERRIDE) + stub_options = beta.stub_options( + thread_pool_size=test_constants.POOL_SIZE) + self._dynamic_stub = beta.dynamic_stub( + channel, _GROUP, cardinalities, options=stub_options) + + def tearDown(self): + self._dynamic_stub = None + self._server.stop(test_constants.SHORT_TIMEOUT).wait() + + def test_unary_unary(self): + call_options = interfaces.grpc_call_options( + disable_compression=True, credentials=self._client_credentials) + response = getattr(self._dynamic_stub, _UNARY_UNARY)( + _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options) + self.assertEqual(_RESPONSE, response) + self.assertIsNotNone(self._servicer.peer()) + + def test_unary_stream(self): + call_options = interfaces.grpc_call_options( + disable_compression=True, credentials=self._client_credentials) + response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)( + _REQUEST, test_constants.LONG_TIMEOUT, protocol_options=call_options) + self._servicer.block_until_serviced() + self.assertIsNotNone(self._servicer.peer()) + + def test_stream_unary(self): + call_options = interfaces.grpc_call_options( + credentials=self._client_credentials) + request_iterator = _BlockingIterator(iter((_REQUEST,))) + response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future( + request_iterator, test_constants.LONG_TIMEOUT, + protocol_options=call_options) + response_future.protocol_context().disable_next_request_compression() + request_iterator.allow() + response_future.protocol_context().disable_next_request_compression() + request_iterator.allow() + self._servicer.block_until_serviced() + self.assertIsNotNone(self._servicer.peer()) + self.assertEqual(_RESPONSE, response_future.result()) + + def test_stream_stream(self): + call_options = interfaces.grpc_call_options( + credentials=self._client_credentials) + request_iterator = _BlockingIterator(iter((_REQUEST,))) + response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)( + request_iterator, test_constants.SHORT_TIMEOUT, + protocol_options=call_options) + response_iterator.protocol_context().disable_next_request_compression() + request_iterator.allow() + response = next(response_iterator) + response_iterator.protocol_context().disable_next_request_compression() + request_iterator.allow() + self._servicer.block_until_serviced() + self.assertIsNotNone(self._servicer.peer()) + self.assertEqual(_RESPONSE, response) + + +if __name__ == '__main__': + unittest.main(verbosity=2) -- cgit v1.2.3