aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/tests/unit/framework/face/testing/digest.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/tests/unit/framework/face/testing/digest.py')
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/digest.py452
1 files changed, 0 insertions, 452 deletions
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/digest.py b/src/python/grpcio/tests/unit/framework/face/testing/digest.py
deleted file mode 100644
index 2b45aded20..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/digest.py
+++ /dev/null
@@ -1,452 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Code for making a service.TestService more amenable to use in tests."""
-
-import collections
-import threading
-
-import six
-
-# testing_control, interfaces, and testing_service are referenced from
-# specification in this module.
-from grpc.framework.common import cardinality
-from grpc.framework.common import style
-from grpc.framework.face import exceptions
-from grpc.framework.face import interfaces as face_interfaces
-from grpc.framework.foundation import stream
-from grpc.framework.foundation import stream_util
-from tests.unit.framework.face.testing import control as testing_control # pylint: disable=unused-import
-from tests.unit.framework.face.testing import interfaces # pylint: disable=unused-import
-from tests.unit.framework.face.testing import service as testing_service # pylint: disable=unused-import
-
-_IDENTITY = lambda x: x
-
-
-class TestServiceDigest(
- collections.namedtuple(
- 'TestServiceDigest',
- ['name',
- 'methods',
- 'inline_method_implementations',
- 'event_method_implementations',
- 'multi_method_implementation',
- 'unary_unary_messages_sequences',
- 'unary_stream_messages_sequences',
- 'stream_unary_messages_sequences',
- 'stream_stream_messages_sequences'])):
- """A transformation of a service.TestService.
-
- Attributes:
- name: The RPC service name to be used in the test.
- methods: A sequence of interfaces.Method objects describing the RPC
- methods that will be called during the test.
- inline_method_implementations: A dict from RPC method name to
- face_interfaces.MethodImplementation object to be used in tests of
- in-line calls to behaviors under test.
- event_method_implementations: A dict from RPC method name to
- face_interfaces.MethodImplementation object to be used in tests of
- event-driven calls to behaviors under test.
- multi_method_implementation: A face_interfaces.MultiMethodImplementation to
- be used in tests of generic calls to behaviors under test.
- unary_unary_messages_sequences: A dict from method name to sequence of
- service.UnaryUnaryTestMessages objects to be used to test the method
- with the given name.
- unary_stream_messages_sequences: A dict from method name to sequence of
- service.UnaryStreamTestMessages objects to be used to test the method
- with the given name.
- stream_unary_messages_sequences: A dict from method name to sequence of
- service.StreamUnaryTestMessages objects to be used to test the method
- with the given name.
- stream_stream_messages_sequences: A dict from method name to sequence of
- service.StreamStreamTestMessages objects to be used to test the
- method with the given name.
- serialization: A serial.Serialization object describing serialization
- behaviors for all the RPC methods.
- """
-
-
-class _BufferingConsumer(stream.Consumer):
- """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
-
- def __init__(self):
- self.consumed = []
- self.terminated = False
-
- def consume(self, value):
- self.consumed.append(value)
-
- def terminate(self):
- self.terminated = True
-
- def consume_and_terminate(self, value):
- self.consumed.append(value)
- self.terminated = True
-
-
-class _InlineUnaryUnaryMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, unary_unary_test_method, control):
- self._test_method = unary_unary_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.UNARY_UNARY
- self.style = style.Service.INLINE
-
- def unary_unary_inline(self, request, context):
- response_list = []
- self._test_method.service(
- request, response_list.append, context, self._control)
- return response_list.pop(0)
-
-
-class _EventUnaryUnaryMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, unary_unary_test_method, control, pool):
- self._test_method = unary_unary_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.UNARY_UNARY
- self.style = style.Service.EVENT
-
- def unary_unary_event(self, request, response_callback, context):
- if self._pool is None:
- self._test_method.service(
- request, response_callback, context, self._control)
- else:
- self._pool.submit(
- self._test_method.service, request, response_callback, context,
- self._control)
-
-
-class _InlineUnaryStreamMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, unary_stream_test_method, control):
- self._test_method = unary_stream_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.UNARY_STREAM
- self.style = style.Service.INLINE
-
- def unary_stream_inline(self, request, context):
- response_consumer = _BufferingConsumer()
- self._test_method.service(
- request, response_consumer, context, self._control)
- for response in response_consumer.consumed:
- yield response
-
-
-class _EventUnaryStreamMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, unary_stream_test_method, control, pool):
- self._test_method = unary_stream_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.UNARY_STREAM
- self.style = style.Service.EVENT
-
- def unary_stream_event(self, request, response_consumer, context):
- if self._pool is None:
- self._test_method.service(
- request, response_consumer, context, self._control)
- else:
- self._pool.submit(
- self._test_method.service, request, response_consumer, context,
- self._control)
-
-
-class _InlineStreamUnaryMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, stream_unary_test_method, control):
- self._test_method = stream_unary_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.STREAM_UNARY
- self.style = style.Service.INLINE
-
- def stream_unary_inline(self, request_iterator, context):
- response_list = []
- request_consumer = self._test_method.service(
- response_list.append, context, self._control)
- for request in request_iterator:
- request_consumer.consume(request)
- request_consumer.terminate()
- return response_list.pop(0)
-
-
-class _EventStreamUnaryMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, stream_unary_test_method, control, pool):
- self._test_method = stream_unary_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.STREAM_UNARY
- self.style = style.Service.EVENT
-
- def stream_unary_event(self, response_callback, context):
- request_consumer = self._test_method.service(
- response_callback, context, self._control)
- if self._pool is None:
- return request_consumer
- else:
- return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
-
-
-class _InlineStreamStreamMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, stream_stream_test_method, control):
- self._test_method = stream_stream_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.STREAM_STREAM
- self.style = style.Service.INLINE
-
- def stream_stream_inline(self, request_iterator, context):
- response_consumer = _BufferingConsumer()
- request_consumer = self._test_method.service(
- response_consumer, context, self._control)
-
- for request in request_iterator:
- request_consumer.consume(request)
- while response_consumer.consumed:
- yield response_consumer.consumed.pop(0)
- response_consumer.terminate()
-
-
-class _EventStreamStreamMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, stream_stream_test_method, control, pool):
- self._test_method = stream_stream_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.STREAM_STREAM
- self.style = style.Service.EVENT
-
- def stream_stream_event(self, response_consumer, context):
- request_consumer = self._test_method.service(
- response_consumer, context, self._control)
- if self._pool is None:
- return request_consumer
- else:
- return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
-
-
-class _UnaryConsumer(stream.Consumer):
- """A Consumer that only allows consumption of exactly one value."""
-
- def __init__(self, action):
- self._lock = threading.Lock()
- self._action = action
- self._consumed = False
- self._terminated = False
-
- def consume(self, value):
- with self._lock:
- if self._consumed:
- raise ValueError('Unary consumer already consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._consumed = True
-
- self._action(value)
-
- def terminate(self):
- with self._lock:
- if not self._consumed:
- raise ValueError('Unary consumer hasn\'t yet consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._terminated = True
-
- def consume_and_terminate(self, value):
- with self._lock:
- if self._consumed:
- raise ValueError('Unary consumer already consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._consumed = True
- self._terminated = True
-
- self._action(value)
-
-
-class _UnaryUnaryAdaptation(object):
-
- def __init__(self, unary_unary_test_method):
- self._method = unary_unary_test_method
-
- def service(self, response_consumer, context, control):
- def action(request):
- self._method.service(
- request, response_consumer.consume_and_terminate, context, control)
- return _UnaryConsumer(action)
-
-
-class _UnaryStreamAdaptation(object):
-
- def __init__(self, unary_stream_test_method):
- self._method = unary_stream_test_method
-
- def service(self, response_consumer, context, control):
- def action(request):
- self._method.service(request, response_consumer, context, control)
- return _UnaryConsumer(action)
-
-
-class _StreamUnaryAdaptation(object):
-
- def __init__(self, stream_unary_test_method):
- self._method = stream_unary_test_method
-
- def service(self, response_consumer, context, control):
- return self._method.service(
- response_consumer.consume_and_terminate, context, control)
-
-
-class _MultiMethodImplementation(face_interfaces.MultiMethodImplementation):
-
- def __init__(self, methods, control, pool):
- self._methods = methods
- self._control = control
- self._pool = pool
-
- def service(self, name, response_consumer, context):
- method = self._methods.get(name, None)
- if method is None:
- raise exceptions.NoSuchMethodError(name)
- elif self._pool is None:
- return method(response_consumer, context, self._control)
- else:
- request_consumer = method(response_consumer, context, self._control)
- return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
-
-
-class _Assembly(
- collections.namedtuple(
- '_Assembly',
- ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
- """An intermediate structure created when creating a TestServiceDigest."""
-
-
-def _assemble(
- scenarios, names, inline_method_constructor, event_method_constructor,
- adapter, control, pool):
- """Creates an _Assembly from the given scenarios."""
- methods = []
- inlines = {}
- events = {}
- adaptations = {}
- messages = {}
- for name, scenario in six.iteritems(scenarios):
- if name in names:
- raise ValueError('Repeated name "%s"!' % name)
-
- test_method = scenario[0]
- inline_method = inline_method_constructor(test_method, control)
- event_method = event_method_constructor(test_method, control, pool)
- adaptation = adapter(test_method)
-
- methods.append(test_method)
- inlines[name] = inline_method
- events[name] = event_method
- adaptations[name] = adaptation
- messages[name] = scenario[1]
-
- return _Assembly(methods, inlines, events, adaptations, messages)
-
-
-def digest(service, control, pool):
- """Creates a TestServiceDigest from a TestService.
-
- Args:
- service: A testing_service.TestService.
- control: A testing_control.Control.
- pool: If RPC methods should be serviced in a separate thread, a thread pool.
- None if RPC methods should be serviced in the thread belonging to the
- run-time that calls for their service.
-
- Returns:
- A TestServiceDigest synthesized from the given service.TestService.
- """
- names = set()
-
- unary_unary = _assemble(
- service.unary_unary_scenarios(), names, _InlineUnaryUnaryMethod,
- _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool)
- names.update(set(unary_unary.inlines))
-
- unary_stream = _assemble(
- service.unary_stream_scenarios(), names, _InlineUnaryStreamMethod,
- _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool)
- names.update(set(unary_stream.inlines))
-
- stream_unary = _assemble(
- service.stream_unary_scenarios(), names, _InlineStreamUnaryMethod,
- _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool)
- names.update(set(stream_unary.inlines))
-
- stream_stream = _assemble(
- service.stream_stream_scenarios(), names, _InlineStreamStreamMethod,
- _EventStreamStreamMethod, _IDENTITY, control, pool)
- names.update(set(stream_stream.inlines))
-
- methods = list(unary_unary.methods)
- methods.extend(unary_stream.methods)
- methods.extend(stream_unary.methods)
- methods.extend(stream_stream.methods)
- adaptations = dict(unary_unary.adaptations)
- adaptations.update(unary_stream.adaptations)
- adaptations.update(stream_unary.adaptations)
- adaptations.update(stream_stream.adaptations)
- inlines = dict(unary_unary.inlines)
- inlines.update(unary_stream.inlines)
- inlines.update(stream_unary.inlines)
- inlines.update(stream_stream.inlines)
- events = dict(unary_unary.events)
- events.update(unary_stream.events)
- events.update(stream_unary.events)
- events.update(stream_stream.events)
-
- return TestServiceDigest(
- service.name(),
- methods,
- inlines,
- events,
- _MultiMethodImplementation(adaptations, control, pool),
- unary_unary.messages,
- unary_stream.messages,
- stream_unary.messages,
- stream_stream.messages)