aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
diff options
context:
space:
mode:
authorGravatar Masood Malekghassemi <atash@google.com>2016-06-02 16:39:20 -0700
committerGravatar Masood Malekghassemi <atash@google.com>2016-07-01 10:18:45 -0700
commit1ff429da2a94bc79300ebce3f8aae7efb10e9a75 (patch)
tree2803b53422cabf78d7690a89e23a073dadad036b /src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
parentb926ef2fb7e10df1f45166296e9e3a8a44f7060f (diff)
Organize Python tests to use grpcio-tools directly
Moves all tests into a separate package. This does not change existing supported means of running tests (e.g. through run_tests.py).
Diffstat (limited to 'src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py')
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py480
1 files changed, 0 insertions, 480 deletions
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
deleted file mode 100644
index 791620307b..0000000000
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ /dev/null
@@ -1,480 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Test code for the Face layer of RPC Framework."""
-
-from __future__ import division
-
-import abc
-import contextlib
-import itertools
-import threading
-import unittest
-from concurrent import futures
-
-import six
-
-# test_interfaces is referenced from specification in this module.
-from grpc.framework.foundation import logging_pool
-from grpc.framework.interfaces.face import face
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.common import test_control
-from tests.unit.framework.common import test_coverage
-from tests.unit.framework.interfaces.face import _3069_test_constant
-from tests.unit.framework.interfaces.face import _digest
-from tests.unit.framework.interfaces.face import _stock_service
-from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
-
-
-class _PauseableIterator(object):
-
- def __init__(self, upstream):
- self._upstream = upstream
- self._condition = threading.Condition()
- self._paused = False
-
- @contextlib.contextmanager
- def pause(self):
- with self._condition:
- self._paused = True
- yield
- with self._condition:
- self._paused = False
- self._condition.notify_all()
-
- def __iter__(self):
- return self
-
- def __next__(self):
- return self.next()
-
- def next(self):
- with self._condition:
- while self._paused:
- self._condition.wait()
- return next(self._upstream)
-
-
-class _Callback(object):
-
- def __init__(self):
- self._condition = threading.Condition()
- self._called = False
- self._passed_future = None
- self._passed_other_stuff = None
-
- def __call__(self, *args, **kwargs):
- with self._condition:
- self._called = True
- if args:
- self._passed_future = args[0]
- if 1 < len(args) or kwargs:
- self._passed_other_stuff = tuple(args[1:]), dict(kwargs)
- self._condition.notify_all()
-
- def future(self):
- with self._condition:
- while True:
- if self._passed_other_stuff is not None:
- raise ValueError(
- 'Test callback passed unexpected values: %s',
- self._passed_other_stuff)
- elif self._called:
- return self._passed_future
- else:
- self._condition.wait()
-
-
-class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.TestCase)):
- """A test of the Face layer of RPC Framework.
-
- Concrete subclasses must have an "implementation" attribute of type
- test_interfaces.Implementation and an "invoker_constructor" attribute of type
- _invocation.InvokerConstructor.
- """
-
- NAME = 'FutureInvocationAsynchronousEventServiceTest'
-
- def setUp(self):
- """See unittest.TestCase.setUp for full specification.
-
- Overriding implementations must call this implementation.
- """
- self._control = test_control.PauseFailControl()
- self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE)
- self._digest = _digest.digest(
- _stock_service.STOCK_TEST_SERVICE, self._control, self._digest_pool)
-
- generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
- self._digest.methods, self._digest.event_method_implementations, None)
- self._invoker = self.invoker_constructor.construct_invoker(
- generic_stub, dynamic_stubs, self._digest.methods)
-
- def tearDown(self):
- """See unittest.TestCase.tearDown for full specification.
-
- Overriding implementations must call this implementation.
- """
- self._invoker = None
- self.implementation.destantiate(self._memo)
- self._digest_pool.shutdown(wait=True)
-
- def testSuccessfulUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
-
- response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- response = response_future.result()
-
- test_messages.verify(request, response, self)
- self.assertIs(callback.future(), response_future)
-
- def testSuccessfulUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response_iterator = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(request, responses, self)
-
- def testSuccessfulStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- request_iterator = _PauseableIterator(iter(requests))
- callback = _Callback()
-
- # Use of a paused iterator of requests allows us to test that control is
- # returned to calling code before the iterator yields any requests.
- with request_iterator.pause():
- response_future = self._invoker.future(group, method)(
- request_iterator, test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- future_passed_to_callback = callback.future()
- response = future_passed_to_callback.result()
-
- test_messages.verify(requests, response, self)
- self.assertIs(future_passed_to_callback, response_future)
-
- def testSuccessfulStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- request_iterator = _PauseableIterator(iter(requests))
-
- # Use of a paused iterator of requests allows us to test that control is
- # returned to calling code before the iterator yields any requests.
- with request_iterator.pause():
- response_iterator = self._invoker.future(group, method)(
- request_iterator, test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(requests, responses, self)
-
- def testSequentialInvocations(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response_future = self._invoker.future(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
- first_response = first_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
-
- second_response_future = self._invoker.future(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
- second_response = second_response_future.result()
-
- test_messages.verify(second_request, second_response, self)
-
- def testParallelInvocations(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response_future = self._invoker.future(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
- second_response_future = self._invoker.future(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
- first_response = first_response_future.result()
- second_response = second_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = []
- response_futures = []
- for _ in range(test_constants.THREAD_CONCURRENCY):
- request = test_messages.request()
- response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- requests.append(request)
- response_futures.append(response_future)
-
- responses = [
- response_future.result() for response_future in response_futures]
-
- for request, response in zip(requests, responses):
- test_messages.verify(request, response, self)
-
- def testWaitingForSomeButNotAllParallelInvocations(self):
- pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = []
- response_futures_to_indices = {}
- for index in range(test_constants.THREAD_CONCURRENCY):
- request = test_messages.request()
- inner_response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- outer_response_future = pool.submit(inner_response_future.result)
- requests.append(request)
- response_futures_to_indices[outer_response_future] = index
-
- some_completed_response_futures_iterator = itertools.islice(
- futures.as_completed(response_futures_to_indices),
- test_constants.THREAD_CONCURRENCY // 2)
- for response_future in some_completed_response_futures_iterator:
- index = response_futures_to_indices[response_future]
- test_messages.verify(requests[index], response_future.result(), self)
- pool.shutdown(wait=True)
-
- def testCancelledUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- cancel_method_return_value = response_future.cancel()
-
- self.assertIs(callback.future(), response_future)
- self.assertFalse(cancel_method_return_value)
- self.assertTrue(response_future.cancelled())
-
- def testCancelledUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- response_iterator.cancel()
-
- with self.assertRaises(face.CancellationError):
- next(response_iterator)
-
- def testCancelledStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- cancel_method_return_value = response_future.cancel()
-
- self.assertIs(callback.future(), response_future)
- self.assertFalse(cancel_method_return_value)
- self.assertTrue(response_future.cancelled())
-
- def testCancelledStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
- response_iterator.cancel()
-
- with self.assertRaises(face.CancellationError):
- next(response_iterator)
-
- def testExpiredUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(
- group, method)(request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
- self.assertIs(callback.future(), response_future)
- self.assertIsInstance(
- response_future.exception(), face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
-
- def testExpiredUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- with self.assertRaises(face.ExpirationError):
- list(response_iterator)
-
- def testExpiredStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(group, method)(
- iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
- self.assertIs(callback.future(), response_future)
- self.assertIsInstance(
- response_future.exception(), face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
-
- def testExpiredStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
- with self.assertRaises(face.ExpirationError):
- list(response_iterator)
-
- def testFailedUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
-
- with self._control.fail():
- response_future = self._invoker.future(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
-
- self.assertIs(callback.future(), response_future)
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is
- # indistinguishable from simply not having called its
- # response_callback before the expiration of the RPC.
- self.assertIsInstance(
- response_future.exception(), face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
-
- def testFailedUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is indistinguishable
- # from simply not having called its response_consumer before the
- # expiration of the RPC.
- with self._control.fail(), self.assertRaises(face.ExpirationError):
- response_iterator = self._invoker.future(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- list(response_iterator)
-
- def testFailedStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = _Callback()
-
- with self._control.fail():
- response_future = self._invoker.future(group, method)(
- iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
-
- self.assertIs(callback.future(), response_future)
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is
- # indistinguishable from simply not having called its
- # response_callback before the expiration of the RPC.
- self.assertIsInstance(
- response_future.exception(), face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
-
- def testFailedStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is indistinguishable
- # from simply not having called its response_consumer before the
- # expiration of the RPC.
- with self._control.fail(), self.assertRaises(face.ExpirationError):
- response_iterator = self._invoker.future(group, method)(
- iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
- list(response_iterator)