diff options
author | Nathaniel Manista <nathaniel@google.com> | 2015-02-13 22:16:35 +0000 |
---|---|---|
committer | Nathaniel Manista <nathaniel@google.com> | 2015-02-13 22:16:35 +0000 |
commit | 83edf3e8d369d1848bda8c8ae7136c1c15c5969f (patch) | |
tree | 9a2ce13030c7ca64b253f008cc2de3e0c755acd7 /src/python | |
parent | f47491954dbdcb7c4f004d6f1dc41ff9dadc26fd (diff) |
Change the interface of RPC Framework's Future interface.
Now our Future interface is as-API-compatible-as-possible with
Python's own concurrent.futures.Future.
Diffstat (limited to 'src/python')
6 files changed, 509 insertions, 219 deletions
diff --git a/src/python/src/_framework/face/_calls.py b/src/python/src/_framework/face/_calls.py index 9128aef7c4..a7d8be5e43 100644 --- a/src/python/src/_framework/face/_calls.py +++ b/src/python/src/_framework/face/_calls.py @@ -29,6 +29,7 @@ """Utility functions for invoking RPCs.""" +import sys import threading from _framework.base import interfaces as base_interfaces @@ -79,20 +80,46 @@ def _stream_event_subscription(result_consumer, abortion_callback): _EventServicedIngestor(result_consumer, abortion_callback)) +# NOTE(nathaniel): This class has some extremely special semantics around +# cancellation that allow it to be used by both "blocking" APIs and "futures" +# APIs. +# +# Since futures.Future defines its own exception for cancellation, we want these +# objects, when returned by methods of a returning-Futures-from-other-methods +# object, to raise the same exception for cancellation. But that's weird in a +# blocking API - why should this object, also returned by methods of blocking +# APIs, raise exceptions from the "future" module? Should we do something like +# have this class be parameterized by the type of exception that it raises in +# cancellation circumstances? +# +# We don't have to take such a dramatic step: since blocking APIs define no +# cancellation semantics whatsoever, there is no supported way for +# blocking-API-users of these objects to cancel RPCs, and thus no supported way +# for them to see an exception the type of which would be weird to them. +# +# Bonus: in both blocking and futures APIs, this object still properly raises +# exceptions.CancellationError for any *server-side cancellation* of an RPC. class _OperationCancellableIterator(interfaces.CancellableIterator): """An interfaces.CancellableIterator for response-streaming operations.""" def __init__(self, rendezvous, operation): + self._lock = threading.Lock() self._rendezvous = rendezvous self._operation = operation + self._cancelled = False def __iter__(self): return self def next(self): + with self._lock: + if self._cancelled: + raise future.CancelledError() return next(self._rendezvous) def cancel(self): + with self._lock: + self._cancelled = True self._operation.cancel() self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED) @@ -105,46 +132,126 @@ class _OperationFuture(future.Future): self._rendezvous = rendezvous self._operation = operation - self._outcome = None + self._cancelled = False + self._computed = False + self._payload = None + self._exception = None + self._traceback = None self._callbacks = [] def cancel(self): """See future.Future.cancel for specification.""" with self._condition: - if self._outcome is None: + if not self._cancelled and not self._computed: self._operation.cancel() - self._outcome = future.aborted() + self._cancelled = True self._condition.notify_all() return False def cancelled(self): """See future.Future.cancelled for specification.""" - return False + with self._condition: + return self._cancelled + + def running(self): + """See future.Future.running for specification.""" + with self._condition: + return not self._cancelled and not self._computed def done(self): """See future.Future.done for specification.""" with self._condition: - return (self._outcome is not None and - self._outcome.category is not future.ABORTED) + return self._cancelled or self._computed + + def result(self, timeout=None): + """See future.Future.result for specification.""" + with self._condition: + if self._cancelled: + raise future.CancelledError() + if self._computed: + if self._payload is None: + raise self._exception # pylint: disable=raising-bad-type + else: + return self._payload + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._callbacks.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._condition: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + if self._payload is None: + raise self._exception # pylint: disable=raising-bad-type + else: + return self._payload + else: + raise future.TimeoutError() + + def exception(self, timeout=None): + """See future.Future.exception for specification.""" + with self._condition: + if self._cancelled: + raise future.CancelledError() + if self._computed: + return self._exception + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._callbacks.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) - def outcome(self): - """See future.Future.outcome for specification.""" with self._condition: - while self._outcome is None: - self._condition.wait() - return self._outcome + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._exception + else: + raise future.TimeoutError() - def add_done_callback(self, callback): + def traceback(self, timeout=None): + """See future.Future.traceback for specification.""" + with self._condition: + if self._cancelled: + raise future.CancelledError() + if self._computed: + return self._traceback + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._callbacks.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._condition: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._traceback + else: + raise future.TimeoutError() + + def add_done_callback(self, fn): """See future.Future.add_done_callback for specification.""" with self._condition: if self._callbacks is not None: - self._callbacks.add(callback) + self._callbacks.add(fn) return - outcome = self._outcome - - callable_util.call_logging_exceptions( - callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) + callable_util.call_logging_exceptions(fn, _DONE_CALLBACK_LOG_MESSAGE, self) def on_operation_termination(self, operation_outcome): """Indicates to this object that the operation has terminated. @@ -154,34 +261,42 @@ class _OperationFuture(future.Future): outcome of the operation. """ with self._condition: - if (self._outcome is None and - operation_outcome is not base_interfaces.Outcome.COMPLETED): - self._outcome = future.raised( - _control.abortion_outcome_to_exception(operation_outcome)) - self._condition.notify_all() - - outcome = self._outcome - rendezvous = self._rendezvous - callbacks = list(self._callbacks) - self._callbacks = None - - if outcome is None: - try: - return_value = next(rendezvous) - except Exception as e: # pylint: disable=broad-except - outcome = future.raised(e) + cancelled = self._cancelled + if cancelled: + callbacks = list(self._callbacks) + self._callbacks = None else: - outcome = future.returned(return_value) + rendezvous = self._rendezvous + + if not cancelled: + payload = None + exception = None + traceback = None + if operation_outcome == base_interfaces.Outcome.COMPLETED: + try: + payload = next(rendezvous) + except Exception as e: # pylint: disable=broad-except + exception = e + traceback = sys.exc_info()[2] + else: + try: + # We raise and then immediately catch in order to create a traceback. + raise _control.abortion_outcome_to_exception(operation_outcome) + except Exception as e: # pylint: disable=broad-except + exception = e + traceback = sys.exc_info()[2] with self._condition: - if self._outcome is None: - self._outcome = outcome - self._condition.notify_all() - else: - outcome = self._outcome + if not self._cancelled: + self._computed = True + self._payload = payload + self._exception = exception + self._traceback = traceback + callbacks = list(self._callbacks) + self._callbacks = None for callback in callbacks: callable_util.call_logging_exceptions( - callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) + callback, _DONE_CALLBACK_LOG_MESSAGE, self) class _Call(interfaces.Call): diff --git a/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py index cf8b2eeb95..939b238b66 100644 --- a/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py +++ b/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -116,7 +116,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - response = response_future.outcome().return_value + response = response_future.result() test_messages.verify(request, response, self) @@ -144,7 +144,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with request_iterator.pause(): response_future = self.stub.future_stream_in_value_out( name, request_iterator, _TIMEOUT) - response = response_future.outcome().return_value + response = response_future.result() test_messages.verify(requests, response, self) @@ -173,13 +173,13 @@ class FutureInvocationAsynchronousEventServiceTestCase( first_response_future = self.stub.future_value_in_value_out( name, first_request, _TIMEOUT) - first_response = first_response_future.outcome().return_value + first_response = first_response_future.result() test_messages.verify(first_request, first_response, self) second_response_future = self.stub.future_value_in_value_out( name, second_request, _TIMEOUT) - second_response = second_response_future.outcome().return_value + second_response = second_response_future.result() test_messages.verify(second_request, second_response, self) @@ -192,10 +192,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - outcome = response_future.outcome() - - self.assertIsInstance( - outcome.exception, exceptions.ExpirationError) + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -203,11 +203,11 @@ class FutureInvocationAsynchronousEventServiceTestCase( for test_messages in test_messages_sequence: request = test_messages.request() - with self.control.pause(), self.assertRaises( - exceptions.ExpirationError): + with self.control.pause(): response_iterator = self.stub.inline_value_in_stream_out( name, request, _TIMEOUT) - list(response_iterator) + with self.assertRaises(exceptions.ExpirationError): + list(response_iterator) def testExpiredStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -218,10 +218,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_stream_in_value_out( name, iter(requests), _TIMEOUT) - outcome = response_future.outcome() - - self.assertIsInstance( - outcome.exception, exceptions.ExpirationError) + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -229,11 +229,11 @@ class FutureInvocationAsynchronousEventServiceTestCase( for test_messages in test_messages_sequence: requests = test_messages.requests() - with self.control.pause(), self.assertRaises( - exceptions.ExpirationError): + with self.control.pause(): response_iterator = self.stub.inline_stream_in_stream_out( name, iter(requests), _TIMEOUT) - list(response_iterator) + with self.assertRaises(exceptions.ExpirationError): + list(response_iterator) def testFailedUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -244,13 +244,15 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.fail(): response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - outcome = response_future.outcome() - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_callback before the - # expiration of the RPC. - self.assertIsInstance(outcome.exception, exceptions.ExpirationError) + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -276,13 +278,15 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.fail(): response_future = self.stub.future_stream_in_value_out( name, iter(requests), _TIMEOUT) - outcome = response_future.outcome() - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_callback before the - # expiration of the RPC. - self.assertIsInstance(outcome.exception, exceptions.ExpirationError) + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -310,8 +314,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( name, first_request, _TIMEOUT) second_response_future = self.stub.future_value_in_value_out( name, second_request, _TIMEOUT) - first_response = first_response_future.outcome().return_value - second_response = second_response_future.outcome().return_value + first_response = first_response_future.result() + second_response = second_response_future.result() test_messages.verify(first_request, first_response, self) test_messages.verify(second_request, second_response, self) @@ -329,10 +333,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - cancelled = response_future.cancel() + cancel_method_return_value = response_future.cancel() - self.assertFalse(cancelled) - self.assertEqual(future.ABORTED, response_future.outcome().category) + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) def testCancelledUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -345,7 +349,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( name, request, _TIMEOUT) response_iterator.cancel() - with self.assertRaises(exceptions.CancellationError): + with self.assertRaises(future.CancelledError): next(response_iterator) def testCancelledStreamRequestUnaryResponse(self): @@ -357,10 +361,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_stream_in_value_out( name, iter(requests), _TIMEOUT) - cancelled = response_future.cancel() + cancel_method_return_value = response_future.cancel() - self.assertFalse(cancelled) - self.assertEqual(future.ABORTED, response_future.outcome().category) + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) def testCancelledStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -373,5 +377,5 @@ class FutureInvocationAsynchronousEventServiceTestCase( name, iter(requests), _TIMEOUT) response_iterator.cancel() - with self.assertRaises(exceptions.CancellationError): + with self.assertRaises(future.CancelledError): next(response_iterator) diff --git a/src/python/src/_framework/foundation/_later_test.py b/src/python/src/_framework/foundation/_later_test.py index fbd17a4ad9..50b67907db 100644 --- a/src/python/src/_framework/foundation/_later_test.py +++ b/src/python/src/_framework/foundation/_later_test.py @@ -33,7 +33,6 @@ import threading import time import unittest -from _framework.foundation import future from _framework.foundation import later TICK = 0.1 @@ -44,10 +43,14 @@ class LaterTest(unittest.TestCase): def test_simple_delay(self): lock = threading.Lock() cell = [0] - def increment_cell(): + return_value = object() + + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) + return return_value + computation_future = later.later(TICK * 2, computation) + self.assertFalse(computation_future.done()) self.assertFalse(computation_future.cancelled()) time.sleep(TICK) @@ -60,22 +63,21 @@ class LaterTest(unittest.TestCase): self.assertFalse(computation_future.cancelled()) with lock: self.assertEqual(1, cell[0]) - outcome = computation_future.outcome() - self.assertEqual(future.RETURNED, outcome.category) + self.assertEqual(return_value, computation_future.result()) def test_callback(self): lock = threading.Lock() cell = [0] callback_called = [False] - outcome_passed_to_callback = [None] - def increment_cell(): + future_passed_to_callback = [None] + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) + computation_future = later.later(TICK * 2, computation) def callback(outcome): with lock: callback_called[0] = True - outcome_passed_to_callback[0] = outcome + future_passed_to_callback[0] = outcome computation_future.add_done_callback(callback) time.sleep(TICK) with lock: @@ -83,63 +85,67 @@ class LaterTest(unittest.TestCase): time.sleep(TICK * 2) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + self.assertTrue(future_passed_to_callback[0].done()) callback_called[0] = False - outcome_passed_to_callback[0] = None + future_passed_to_callback[0] = None computation_future.add_done_callback(callback) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + self.assertTrue(future_passed_to_callback[0].done()) def test_cancel(self): lock = threading.Lock() cell = [0] callback_called = [False] - outcome_passed_to_callback = [None] - def increment_cell(): + future_passed_to_callback = [None] + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) + computation_future = later.later(TICK * 2, computation) def callback(outcome): with lock: callback_called[0] = True - outcome_passed_to_callback[0] = outcome + future_passed_to_callback[0] = outcome computation_future.add_done_callback(callback) time.sleep(TICK) with lock: self.assertFalse(callback_called[0]) computation_future.cancel() self.assertTrue(computation_future.cancelled()) - self.assertFalse(computation_future.done()) - self.assertEqual(future.ABORTED, computation_future.outcome().category) + self.assertFalse(computation_future.running()) + self.assertTrue(computation_future.done()) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.ABORTED, outcome_passed_to_callback[0].category) + self.assertTrue(future_passed_to_callback[0].cancelled()) - def test_outcome(self): + def test_result(self): lock = threading.Lock() cell = [0] callback_called = [False] - outcome_passed_to_callback = [None] - def increment_cell(): + future_passed_to_callback_cell = [None] + return_value = object() + + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) - def callback(outcome): + return return_value + computation_future = later.later(TICK * 2, computation) + + def callback(future_passed_to_callback): with lock: callback_called[0] = True - outcome_passed_to_callback[0] = outcome + future_passed_to_callback_cell[0] = future_passed_to_callback computation_future.add_done_callback(callback) - returned_outcome = computation_future.outcome() - self.assertEqual(future.RETURNED, returned_outcome.category) + returned_value = computation_future.result() + self.assertEqual(return_value, returned_value) # The callback may not yet have been called! Sleep a tick. time.sleep(TICK) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + self.assertEqual(return_value, future_passed_to_callback_cell[0].result()) if __name__ == '__main__': unittest.main() diff --git a/src/python/src/_framework/foundation/_timer_future.py b/src/python/src/_framework/foundation/_timer_future.py index 86bc073d56..4aa66991c5 100644 --- a/src/python/src/_framework/foundation/_timer_future.py +++ b/src/python/src/_framework/foundation/_timer_future.py @@ -29,6 +29,7 @@ """Affords a Future implementation based on Python's threading.Timer.""" +import sys import threading import time @@ -52,7 +53,9 @@ class TimerFuture(future.Future): self._computing = False self._computed = False self._cancelled = False - self._outcome = None + self._result = None + self._exception = None + self._traceback = None self._waiting = [] def _compute(self): @@ -70,19 +73,24 @@ class TimerFuture(future.Future): self._computing = True try: - returned_value = self._computation() - outcome = future.returned(returned_value) + return_value = self._computation() + exception = None + traceback = None except Exception as e: # pylint: disable=broad-except - outcome = future.raised(e) + return_value = None + exception = e + traceback = sys.exc_info()[2] with self._lock: self._computing = False self._computed = True - self._outcome = outcome + self._return_value = return_value + self._exception = exception + self._traceback = traceback waiting = self._waiting for callback in waiting: - callback(outcome) + callback(self) def start(self): """Starts this Future. @@ -104,13 +112,11 @@ class TimerFuture(future.Future): else: self._timer.cancel() self._cancelled = True - self._outcome = future.aborted() - outcome = self._outcome waiting = self._waiting for callback in waiting: try: - callback(outcome) + callback(self) except Exception: # pylint: disable=broad-except pass @@ -121,36 +127,102 @@ class TimerFuture(future.Future): with self._lock: return self._cancelled + def running(self): + """See future.Future.running for specification.""" + with self._lock: + return not self._computed and not self._cancelled + def done(self): """See future.Future.done for specification.""" with self._lock: - return self._computed + return self._computed or self._cancelled + + def result(self, timeout=None): + """See future.Future.result for specification.""" + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + if self._exception is None: + return self._return_value + else: + raise self._exception # pylint: disable=raising-bad-type + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._waiting.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + if self._exception is None: + return self._return_value + else: + raise self._exception # pylint: disable=raising-bad-type + else: + raise future.TimeoutError() + + def exception(self, timeout=None): + """See future.Future.exception for specification.""" + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._exception + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._waiting.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._exception + else: + raise future.TimeoutError() - def outcome(self): - """See future.Future.outcome for specification.""" + def traceback(self, timeout=None): + """See future.Future.traceback for specification.""" with self._lock: - if self._computed or self._cancelled: - return self._outcome + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._traceback condition = threading.Condition() - def notify_condition(unused_outcome): + def notify_condition(unused_future): with condition: condition.notify() self._waiting.append(notify_condition) with condition: - condition.wait() + condition.wait(timeout=timeout) with self._lock: - return self._outcome + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._traceback + else: + raise future.TimeoutError() - def add_done_callback(self, callback): + def add_done_callback(self, fn): """See future.Future.add_done_callback for specification.""" with self._lock: if not self._computed and not self._cancelled: - self._waiting.append(callback) + self._waiting.append(fn) return - else: - outcome = self._outcome - callback(outcome) + fn(self) diff --git a/src/python/src/_framework/foundation/callable_util.py b/src/python/src/_framework/foundation/callable_util.py index 1f7546cb76..32b0751a01 100644 --- a/src/python/src/_framework/foundation/callable_util.py +++ b/src/python/src/_framework/foundation/callable_util.py @@ -29,18 +29,47 @@ """Utilities for working with callables.""" +import abc +import collections +import enum import functools import logging -from _framework.foundation import future + +class Outcome(object): + """A sum type describing the outcome of some call. + + Attributes: + kind: One of Kind.RETURNED or Kind.RAISED respectively indicating that the + call returned a value or raised an exception. + return_value: The value returned by the call. Must be present if kind is + Kind.RETURNED. + exception: The exception raised by the call. Must be present if kind is + Kind.RAISED. + """ + __metaclass__ = abc.ABCMeta + + @enum.unique + class Kind(enum.Enum): + """Identifies the general kind of the outcome of some call.""" + + RETURNED = object() + RAISED = object() + + +class _EasyOutcome( + collections.namedtuple( + '_EasyOutcome', ['kind', 'return_value', 'exception']), + Outcome): + """A trivial implementation of Outcome.""" def _call_logging_exceptions(behavior, message, *args, **kwargs): try: - return future.returned(behavior(*args, **kwargs)) + return _EasyOutcome(Outcome.Kind.RETURNED, behavior(*args, **kwargs), None) except Exception as e: # pylint: disable=broad-except logging.exception(message) - return future.raised(e) + return _EasyOutcome(Outcome.Kind.RAISED, None, e) def with_exceptions_logged(behavior, message): @@ -72,7 +101,7 @@ def call_logging_exceptions(behavior, message, *args, **kwargs): **kwargs: Keyword arguments to pass to the given behavior. Returns: - A future.Outcome describing whether the given behavior returned a value or - raised an exception. + An Outcome describing whether the given behavior returned a value or raised + an exception. """ return _call_logging_exceptions(behavior, message, *args, **kwargs) diff --git a/src/python/src/_framework/foundation/future.py b/src/python/src/_framework/foundation/future.py index f00c503257..bfc16fc1ea 100644 --- a/src/python/src/_framework/foundation/future.py +++ b/src/python/src/_framework/foundation/future.py @@ -27,146 +27,210 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""The Future interface missing from Python's standard library. +"""A Future interface. -Python's concurrent.futures library defines a Future class very much like the -Future defined here, but since that class is concrete and without construction -semantics it is only available within the concurrent.futures library itself. -The Future class defined here is an entirely abstract interface that anyone may +Python doesn't have a Future interface in its standard library. In the absence +of such a standard, three separate, incompatible implementations +(concurrent.futures.Future, ndb.Future, and asyncio.Future) have appeared. This +interface attempts to be as compatible as possible with +concurrent.futures.Future. From ndb.Future it adopts a traceback-object accessor +method. + +Unlike the concrete and implemented Future classes listed above, the Future +class defined in this module is an entirely abstract interface that anyone may implement and use. + +The one known incompatibility between this interface and the interface of +concurrent.futures.Future is that this interface defines its own CancelledError +and TimeoutError exceptions rather than raising the implementation-private +concurrent.futures._base.CancelledError and the +built-in-but-only-in-3.3-and-later TimeoutError. """ import abc -import collections - -RETURNED = object() -RAISED = object() -ABORTED = object() - -class Outcome(object): - """A sum type describing the outcome of some computation. - - Attributes: - category: One of RETURNED, RAISED, or ABORTED, respectively indicating - that the computation returned a value, raised an exception, or was - aborted. - return_value: The value returned by the computation. Must be present if - category is RETURNED. - exception: The exception raised by the computation. Must be present if - category is RAISED. - """ - __metaclass__ = abc.ABCMeta +class TimeoutError(Exception): + """Indicates that a particular call timed out.""" -class _EasyOutcome( - collections.namedtuple('_EasyOutcome', - ['category', 'return_value', 'exception']), - Outcome): - """A trivial implementation of Outcome.""" -# All Outcomes describing abortion are indistinguishable so there might as well -# be only one. -_ABORTED_OUTCOME = _EasyOutcome(ABORTED, None, None) +class CancelledError(Exception): + """Indicates that the computation underlying a Future was cancelled.""" -def aborted(): - """Returns an Outcome indicating that a computation was aborted. +class Future(object): + """A representation of a computation in another control flow. - Returns: - An Outcome indicating that a computation was aborted. + Computations represented by a Future may be yet to be begun, may be ongoing, + or may have already completed. """ - return _ABORTED_OUTCOME - - -def raised(exception): - """Returns an Outcome indicating that a computation raised an exception. - - Args: - exception: The exception raised by the computation. + __metaclass__ = abc.ABCMeta - Returns: - An Outcome indicating that a computation raised the given exception. - """ - return _EasyOutcome(RAISED, None, exception) + # NOTE(nathaniel): This isn't the return type that I would want to have if it + # were up to me. Were this interface being written from scratch, the return + # type of this method would probably be a sum type like: + # + # NOT_COMMENCED + # COMMENCED_AND_NOT_COMPLETED + # PARTIAL_RESULT<Partial_Result_Type> + # COMPLETED<Result_Type> + # UNCANCELLABLE + # NOT_IMMEDIATELY_DETERMINABLE + @abc.abstractmethod + def cancel(self): + """Attempts to cancel the computation. + This method does not block. -def returned(value): - """Returns an Outcome indicating that a computation returned a value. + Returns: + True if the computation has not yet begun, will not be allowed to take + place, and determination of both was possible without blocking. False + under all other circumstances including but not limited to the + computation's already having begun, the computation's already having + finished, and the computation's having been scheduled for execution on a + remote system for which a determination of whether or not it commenced + before being cancelled cannot be made without blocking. + """ + raise NotImplementedError() - Args: - value: The value returned by the computation. + # NOTE(nathaniel): Here too this isn't the return type that I'd want this + # method to have if it were up to me. I think I'd go with another sum type + # like: + # + # NOT_CANCELLED (this object's cancel method hasn't been called) + # NOT_COMMENCED + # COMMENCED_AND_NOT_COMPLETED + # PARTIAL_RESULT<Partial_Result_Type> + # COMPLETED<Result_Type> + # UNCANCELLABLE + # NOT_IMMEDIATELY_DETERMINABLE + # + # Notice how giving the cancel method the right semantics obviates most + # reasons for this method to exist. + @abc.abstractmethod + def cancelled(self): + """Describes whether the computation was cancelled. - Returns: - An Outcome indicating that a computation returned the given value. - """ - return _EasyOutcome(RETURNED, value, None) + This method does not block. + Returns: + True if the computation was cancelled any time before its result became + immediately available. False under all other circumstances including but + not limited to this object's cancel method not having been called and + the computation's result having become immediately available. + """ + raise NotImplementedError() -class Future(object): - """A representation of a computation happening in another control flow. + @abc.abstractmethod + def running(self): + """Describes whether the computation is taking place. - Computations represented by a Future may have already completed, may be - ongoing, or may be yet to be begun. + This method does not block. - Computations represented by a Future are considered uninterruptable; once - started they will be allowed to terminate either by returning or raising - an exception. - """ - __metaclass__ = abc.ABCMeta + Returns: + True if the computation is scheduled to take place in the future or is + taking place now, or False if the computation took place in the past or + was cancelled. + """ + raise NotImplementedError() + # NOTE(nathaniel): These aren't quite the semantics I'd like here either. I + # would rather this only returned True in cases in which the underlying + # computation completed successfully. A computation's having been cancelled + # conflicts with considering that computation "done". @abc.abstractmethod - def cancel(self): - """Attempts to cancel the computation. + def done(self): + """Describes whether the computation has taken place. + + This method does not block. Returns: - True if the computation will not be allowed to take place or False if - the computation has already taken place or is currently taking place. + True if the computation is known to have either completed or have been + unscheduled or interrupted. False if the computation may possibly be + executing or scheduled to execute later. """ raise NotImplementedError() @abc.abstractmethod - def cancelled(self): - """Describes whether the computation was cancelled. + def result(self, timeout=None): + """Accesses the outcome of the computation or raises its exception. + + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + finish or be cancelled, or None if this method should block until the + computation has finished or is cancelled no matter how long that takes. Returns: - True if the computation was cancelled and did not take place or False - if the computation took place, is taking place, or is scheduled to - take place in the future. + The return value of the computation. + + Raises: + TimeoutError: If a timeout value is passed and the computation does not + terminate within the allotted time. + CancelledError: If the computation was cancelled. + Exception: If the computation raised an exception, this call will raise + the same exception. """ raise NotImplementedError() @abc.abstractmethod - def done(self): - """Describes whether the computation has taken place. + def exception(self, timeout=None): + """Return the exception raised by the computation. + + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. Returns: - True if the computation took place; False otherwise. + The exception raised by the computation, or None if the computation did + not raise an exception. + + Raises: + TimeoutError: If a timeout value is passed and the computation does not + terminate within the allotted time. + CancelledError: If the computation was cancelled. """ raise NotImplementedError() @abc.abstractmethod - def outcome(self): - """Accesses the outcome of the computation. + def traceback(self, timeout=None): + """Access the traceback of the exception raised by the computation. - If the computation has not yet completed, this method blocks until it has. + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. Returns: - An Outcome describing the outcome of the computation. + The traceback of the exception raised by the computation, or None if the + computation did not raise an exception. + + Raises: + TimeoutError: If a timeout value is passed and the computation does not + terminate within the allotted time. + CancelledError: If the computation was cancelled. """ raise NotImplementedError() @abc.abstractmethod - def add_done_callback(self, callback): + def add_done_callback(self, fn): """Adds a function to be called at completion of the computation. - The callback will be passed an Outcome object describing the outcome of + The callback will be passed this Future object describing the outcome of the computation. If the computation has already completed, the callback will be called immediately. Args: - callback: A callable taking an Outcome as its single parameter. + fn: A callable taking a this Future object as its single parameter. """ raise NotImplementedError() |