diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py')
-rw-r--r-- | src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py | 550 |
1 files changed, 275 insertions, 275 deletions
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py index f0befb0b27..0411da0a66 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Code for making a service.TestService more amenable to use in tests.""" import collections @@ -49,17 +48,16 @@ _IDENTITY = lambda x: x class TestServiceDigest( - collections.namedtuple( - 'TestServiceDigest', - ('methods', - 'inline_method_implementations', - 'event_method_implementations', - 'multi_method_implementation', - 'unary_unary_messages_sequences', - 'unary_stream_messages_sequences', - 'stream_unary_messages_sequences', - 'stream_stream_messages_sequences',))): - """A transformation of a service.TestService. + collections.namedtuple('TestServiceDigest', ( + 'methods', + 'inline_method_implementations', + 'event_method_implementations', + 'multi_method_implementation', + 'unary_unary_messages_sequences', + 'unary_stream_messages_sequences', + 'stream_unary_messages_sequences', + 'stream_stream_messages_sequences',))): + """A transformation of a service.TestService. Attributes: methods: A dict from method group-name pair to test_interfaces.Method object @@ -88,303 +86,308 @@ class TestServiceDigest( class _BufferingConsumer(stream.Consumer): - """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" + """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" - def __init__(self): - self.consumed = [] - self.terminated = False + def __init__(self): + self.consumed = [] + self.terminated = False - def consume(self, value): - self.consumed.append(value) + def consume(self, value): + self.consumed.append(value) - def terminate(self): - self.terminated = True + def terminate(self): + self.terminated = True - def consume_and_terminate(self, value): - self.consumed.append(value) - self.terminated = True + def consume_and_terminate(self, value): + self.consumed.append(value) + self.terminated = True class _InlineUnaryUnaryMethod(face.MethodImplementation): - def __init__(self, unary_unary_test_method, control): - self._test_method = unary_unary_test_method - self._control = control + def __init__(self, unary_unary_test_method, control): + self._test_method = unary_unary_test_method + self._control = control - self.cardinality = cardinality.Cardinality.UNARY_UNARY - self.style = style.Service.INLINE + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.INLINE - def unary_unary_inline(self, request, context): - response_list = [] - self._test_method.service( - request, response_list.append, context, self._control) - return response_list.pop(0) + def unary_unary_inline(self, request, context): + response_list = [] + self._test_method.service(request, response_list.append, context, + self._control) + return response_list.pop(0) class _EventUnaryUnaryMethod(face.MethodImplementation): - def __init__(self, unary_unary_test_method, control, pool): - self._test_method = unary_unary_test_method - self._control = control - self._pool = pool + def __init__(self, unary_unary_test_method, control, pool): + self._test_method = unary_unary_test_method + self._control = control + self._pool = pool - self.cardinality = cardinality.Cardinality.UNARY_UNARY - self.style = style.Service.EVENT + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.EVENT - def unary_unary_event(self, request, response_callback, context): - if self._pool is None: - self._test_method.service( - request, response_callback, context, self._control) - else: - self._pool.submit( - self._test_method.service, request, response_callback, context, - self._control) + def unary_unary_event(self, request, response_callback, context): + if self._pool is None: + self._test_method.service(request, response_callback, context, + self._control) + else: + self._pool.submit(self._test_method.service, request, + response_callback, context, self._control) class _InlineUnaryStreamMethod(face.MethodImplementation): - def __init__(self, unary_stream_test_method, control): - self._test_method = unary_stream_test_method - self._control = control + def __init__(self, unary_stream_test_method, control): + self._test_method = unary_stream_test_method + self._control = control - self.cardinality = cardinality.Cardinality.UNARY_STREAM - self.style = style.Service.INLINE + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.INLINE - def unary_stream_inline(self, request, context): - response_consumer = _BufferingConsumer() - self._test_method.service( - request, response_consumer, context, self._control) - for response in response_consumer.consumed: - yield response + def unary_stream_inline(self, request, context): + response_consumer = _BufferingConsumer() + self._test_method.service(request, response_consumer, context, + self._control) + for response in response_consumer.consumed: + yield response class _EventUnaryStreamMethod(face.MethodImplementation): - def __init__(self, unary_stream_test_method, control, pool): - self._test_method = unary_stream_test_method - self._control = control - self._pool = pool + def __init__(self, unary_stream_test_method, control, pool): + self._test_method = unary_stream_test_method + self._control = control + self._pool = pool - self.cardinality = cardinality.Cardinality.UNARY_STREAM - self.style = style.Service.EVENT + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.EVENT - def unary_stream_event(self, request, response_consumer, context): - if self._pool is None: - self._test_method.service( - request, response_consumer, context, self._control) - else: - self._pool.submit( - self._test_method.service, request, response_consumer, context, - self._control) + def unary_stream_event(self, request, response_consumer, context): + if self._pool is None: + self._test_method.service(request, response_consumer, context, + self._control) + else: + self._pool.submit(self._test_method.service, request, + response_consumer, context, self._control) class _InlineStreamUnaryMethod(face.MethodImplementation): - def __init__(self, stream_unary_test_method, control): - self._test_method = stream_unary_test_method - self._control = control + def __init__(self, stream_unary_test_method, control): + self._test_method = stream_unary_test_method + self._control = control - self.cardinality = cardinality.Cardinality.STREAM_UNARY - self.style = style.Service.INLINE + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.INLINE - def stream_unary_inline(self, request_iterator, context): - response_list = [] - request_consumer = self._test_method.service( - response_list.append, context, self._control) - for request in request_iterator: - request_consumer.consume(request) - request_consumer.terminate() - return response_list.pop(0) + def stream_unary_inline(self, request_iterator, context): + response_list = [] + request_consumer = self._test_method.service(response_list.append, + context, self._control) + for request in request_iterator: + request_consumer.consume(request) + request_consumer.terminate() + return response_list.pop(0) class _EventStreamUnaryMethod(face.MethodImplementation): - def __init__(self, stream_unary_test_method, control, pool): - self._test_method = stream_unary_test_method - self._control = control - self._pool = pool + def __init__(self, stream_unary_test_method, control, pool): + self._test_method = stream_unary_test_method + self._control = control + self._pool = pool - self.cardinality = cardinality.Cardinality.STREAM_UNARY - self.style = style.Service.EVENT + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.EVENT - def stream_unary_event(self, response_callback, context): - request_consumer = self._test_method.service( - response_callback, context, self._control) - if self._pool is None: - return request_consumer - else: - return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + def stream_unary_event(self, response_callback, context): + request_consumer = self._test_method.service(response_callback, context, + self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, + self._pool) class _InlineStreamStreamMethod(face.MethodImplementation): - def __init__(self, stream_stream_test_method, control): - self._test_method = stream_stream_test_method - self._control = control + def __init__(self, stream_stream_test_method, control): + self._test_method = stream_stream_test_method + self._control = control - self.cardinality = cardinality.Cardinality.STREAM_STREAM - self.style = style.Service.INLINE + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.INLINE - def stream_stream_inline(self, request_iterator, context): - response_consumer = _BufferingConsumer() - request_consumer = self._test_method.service( - response_consumer, context, self._control) + def stream_stream_inline(self, request_iterator, context): + response_consumer = _BufferingConsumer() + request_consumer = self._test_method.service(response_consumer, context, + self._control) - for request in request_iterator: - request_consumer.consume(request) - while response_consumer.consumed: - yield response_consumer.consumed.pop(0) - response_consumer.terminate() + for request in request_iterator: + request_consumer.consume(request) + while response_consumer.consumed: + yield response_consumer.consumed.pop(0) + response_consumer.terminate() class _EventStreamStreamMethod(face.MethodImplementation): - def __init__(self, stream_stream_test_method, control, pool): - self._test_method = stream_stream_test_method - self._control = control - self._pool = pool + def __init__(self, stream_stream_test_method, control, pool): + self._test_method = stream_stream_test_method + self._control = control + self._pool = pool - self.cardinality = cardinality.Cardinality.STREAM_STREAM - self.style = style.Service.EVENT + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.EVENT - def stream_stream_event(self, response_consumer, context): - request_consumer = self._test_method.service( - response_consumer, context, self._control) - if self._pool is None: - return request_consumer - else: - return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + def stream_stream_event(self, response_consumer, context): + request_consumer = self._test_method.service(response_consumer, context, + self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, + self._pool) class _UnaryConsumer(stream.Consumer): - """A Consumer that only allows consumption of exactly one value.""" - - def __init__(self, action): - self._lock = threading.Lock() - self._action = action - self._consumed = False - self._terminated = False - - def consume(self, value): - with self._lock: - if self._consumed: - raise ValueError('Unary consumer already consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._consumed = True - - self._action(value) - - def terminate(self): - with self._lock: - if not self._consumed: - raise ValueError('Unary consumer hasn\'t yet consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._terminated = True - - def consume_and_terminate(self, value): - with self._lock: - if self._consumed: - raise ValueError('Unary consumer already consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._consumed = True - self._terminated = True - - self._action(value) + """A Consumer that only allows consumption of exactly one value.""" + + def __init__(self, action): + self._lock = threading.Lock() + self._action = action + self._consumed = False + self._terminated = False + + def consume(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + + self._action(value) + + def terminate(self): + with self._lock: + if not self._consumed: + raise ValueError('Unary consumer hasn\'t yet consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._terminated = True + + def consume_and_terminate(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + self._terminated = True + + self._action(value) class _UnaryUnaryAdaptation(object): - def __init__(self, unary_unary_test_method): - self._method = unary_unary_test_method + def __init__(self, unary_unary_test_method): + self._method = unary_unary_test_method + + def service(self, response_consumer, context, control): + + def action(request): + self._method.service(request, + response_consumer.consume_and_terminate, + context, control) - def service(self, response_consumer, context, control): - def action(request): - self._method.service( - request, response_consumer.consume_and_terminate, context, control) - return _UnaryConsumer(action) + return _UnaryConsumer(action) class _UnaryStreamAdaptation(object): - def __init__(self, unary_stream_test_method): - self._method = unary_stream_test_method + def __init__(self, unary_stream_test_method): + self._method = unary_stream_test_method + + def service(self, response_consumer, context, control): + + def action(request): + self._method.service(request, response_consumer, context, control) - def service(self, response_consumer, context, control): - def action(request): - self._method.service(request, response_consumer, context, control) - return _UnaryConsumer(action) + return _UnaryConsumer(action) class _StreamUnaryAdaptation(object): - def __init__(self, stream_unary_test_method): - self._method = stream_unary_test_method + def __init__(self, stream_unary_test_method): + self._method = stream_unary_test_method - def service(self, response_consumer, context, control): - return self._method.service( - response_consumer.consume_and_terminate, context, control) + def service(self, response_consumer, context, control): + return self._method.service(response_consumer.consume_and_terminate, + context, control) class _MultiMethodImplementation(face.MultiMethodImplementation): - def __init__(self, methods, control, pool): - self._methods = methods - self._control = control - self._pool = pool + def __init__(self, methods, control, pool): + self._methods = methods + self._control = control + self._pool = pool - def service(self, group, name, response_consumer, context): - method = self._methods.get(group, name, None) - if method is None: - raise face.NoSuchMethodError(group, name) - elif self._pool is None: - return method(response_consumer, context, self._control) - else: - request_consumer = method(response_consumer, context, self._control) - return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + def service(self, group, name, response_consumer, context): + method = self._methods.get(group, name, None) + if method is None: + raise face.NoSuchMethodError(group, name) + elif self._pool is None: + return method(response_consumer, context, self._control) + else: + request_consumer = method(response_consumer, context, self._control) + return stream_util.ThreadSwitchingConsumer(request_consumer, + self._pool) class _Assembly( - collections.namedtuple( - '_Assembly', - ['methods', 'inlines', 'events', 'adaptations', 'messages'])): - """An intermediate structure created when creating a TestServiceDigest.""" - - -def _assemble( - scenarios, identifiers, inline_method_constructor, event_method_constructor, - adapter, control, pool): - """Creates an _Assembly from the given scenarios.""" - methods = {} - inlines = {} - events = {} - adaptations = {} - messages = {} - for identifier, scenario in six.iteritems(scenarios): - if identifier in identifiers: - raise ValueError('Repeated identifier "(%s, %s)"!' % identifier) - - test_method = scenario[0] - inline_method = inline_method_constructor(test_method, control) - event_method = event_method_constructor(test_method, control, pool) - adaptation = adapter(test_method) - - methods[identifier] = test_method - inlines[identifier] = inline_method - events[identifier] = event_method - adaptations[identifier] = adaptation - messages[identifier] = scenario[1] - - return _Assembly(methods, inlines, events, adaptations, messages) + collections.namedtuple( + '_Assembly', + ['methods', 'inlines', 'events', 'adaptations', 'messages'])): + """An intermediate structure created when creating a TestServiceDigest.""" + + +def _assemble(scenarios, identifiers, inline_method_constructor, + event_method_constructor, adapter, control, pool): + """Creates an _Assembly from the given scenarios.""" + methods = {} + inlines = {} + events = {} + adaptations = {} + messages = {} + for identifier, scenario in six.iteritems(scenarios): + if identifier in identifiers: + raise ValueError('Repeated identifier "(%s, %s)"!' % identifier) + + test_method = scenario[0] + inline_method = inline_method_constructor(test_method, control) + event_method = event_method_constructor(test_method, control, pool) + adaptation = adapter(test_method) + + methods[identifier] = test_method + inlines[identifier] = inline_method + events[identifier] = event_method + adaptations[identifier] = adaptation + messages[identifier] = scenario[1] + + return _Assembly(methods, inlines, events, adaptations, messages) def digest(service, control, pool): - """Creates a TestServiceDigest from a TestService. + """Creates a TestServiceDigest from a TestService. Args: service: A _service.TestService. @@ -396,51 +399,48 @@ def digest(service, control, pool): Returns: A TestServiceDigest synthesized from the given service.TestService. """ - identifiers = set() - - unary_unary = _assemble( - service.unary_unary_scenarios(), identifiers, _InlineUnaryUnaryMethod, - _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool) - identifiers.update(unary_unary.inlines) - - unary_stream = _assemble( - service.unary_stream_scenarios(), identifiers, _InlineUnaryStreamMethod, - _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool) - identifiers.update(unary_stream.inlines) - - stream_unary = _assemble( - service.stream_unary_scenarios(), identifiers, _InlineStreamUnaryMethod, - _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool) - identifiers.update(stream_unary.inlines) - - stream_stream = _assemble( - service.stream_stream_scenarios(), identifiers, _InlineStreamStreamMethod, - _EventStreamStreamMethod, _IDENTITY, control, pool) - identifiers.update(stream_stream.inlines) - - methods = dict(unary_unary.methods) - methods.update(unary_stream.methods) - methods.update(stream_unary.methods) - methods.update(stream_stream.methods) - adaptations = dict(unary_unary.adaptations) - adaptations.update(unary_stream.adaptations) - adaptations.update(stream_unary.adaptations) - adaptations.update(stream_stream.adaptations) - inlines = dict(unary_unary.inlines) - inlines.update(unary_stream.inlines) - inlines.update(stream_unary.inlines) - inlines.update(stream_stream.inlines) - events = dict(unary_unary.events) - events.update(unary_stream.events) - events.update(stream_unary.events) - events.update(stream_stream.events) - - return TestServiceDigest( - methods, - inlines, - events, - _MultiMethodImplementation(adaptations, control, pool), - unary_unary.messages, - unary_stream.messages, - stream_unary.messages, - stream_stream.messages) + identifiers = set() + + unary_unary = _assemble(service.unary_unary_scenarios(), identifiers, + _InlineUnaryUnaryMethod, _EventUnaryUnaryMethod, + _UnaryUnaryAdaptation, control, pool) + identifiers.update(unary_unary.inlines) + + unary_stream = _assemble(service.unary_stream_scenarios(), identifiers, + _InlineUnaryStreamMethod, _EventUnaryStreamMethod, + _UnaryStreamAdaptation, control, pool) + identifiers.update(unary_stream.inlines) + + stream_unary = _assemble(service.stream_unary_scenarios(), identifiers, + _InlineStreamUnaryMethod, _EventStreamUnaryMethod, + _StreamUnaryAdaptation, control, pool) + identifiers.update(stream_unary.inlines) + + stream_stream = _assemble(service.stream_stream_scenarios(), identifiers, + _InlineStreamStreamMethod, + _EventStreamStreamMethod, _IDENTITY, control, + pool) + identifiers.update(stream_stream.inlines) + + methods = dict(unary_unary.methods) + methods.update(unary_stream.methods) + methods.update(stream_unary.methods) + methods.update(stream_stream.methods) + adaptations = dict(unary_unary.adaptations) + adaptations.update(unary_stream.adaptations) + adaptations.update(stream_unary.adaptations) + adaptations.update(stream_stream.adaptations) + inlines = dict(unary_unary.inlines) + inlines.update(unary_stream.inlines) + inlines.update(stream_unary.inlines) + inlines.update(stream_stream.inlines) + events = dict(unary_unary.events) + events.update(unary_stream.events) + events.update(stream_unary.events) + events.update(stream_stream.events) + + return TestServiceDigest( + methods, inlines, events, + _MultiMethodImplementation(adaptations, control, pool), + unary_unary.messages, unary_stream.messages, stream_unary.messages, + stream_stream.messages) |