aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2015-01-23 21:32:47 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2015-01-23 21:32:47 +0000
commitb68f3d1746ccd70f041887d74acc166cb7c3c5fd (patch)
tree4d43639f7f5fe0498b51d9720143a5147c991d7f
parentfcd6c0c9227dac16065c245477cfd949d1ecf621 (diff)
Fill out the foundation package.
-rw-r--r--src/python/_framework/foundation/_later_test.py145
-rw-r--r--src/python/_framework/foundation/_timer_future.py156
-rw-r--r--src/python/_framework/foundation/abandonment.py38
-rw-r--r--src/python/_framework/foundation/callable_util.py78
-rw-r--r--src/python/_framework/foundation/future.py172
-rw-r--r--src/python/_framework/foundation/later.py51
-rw-r--r--src/python/_framework/foundation/stream.py60
-rw-r--r--src/python/_framework/foundation/stream_testing.py73
-rw-r--r--src/python/_framework/foundation/stream_util.py160
9 files changed, 933 insertions, 0 deletions
diff --git a/src/python/_framework/foundation/_later_test.py b/src/python/_framework/foundation/_later_test.py
new file mode 100644
index 0000000000..fbd17a4ad9
--- /dev/null
+++ b/src/python/_framework/foundation/_later_test.py
@@ -0,0 +1,145 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of the later module."""
+
+import threading
+import time
+import unittest
+
+from _framework.foundation import future
+from _framework.foundation import later
+
+TICK = 0.1
+
+
+class LaterTest(unittest.TestCase):
+
+ def test_simple_delay(self):
+ lock = threading.Lock()
+ cell = [0]
+ def increment_cell():
+ with lock:
+ cell[0] += 1
+ computation_future = later.later(TICK * 2, increment_cell)
+ self.assertFalse(computation_future.done())
+ self.assertFalse(computation_future.cancelled())
+ time.sleep(TICK)
+ self.assertFalse(computation_future.done())
+ self.assertFalse(computation_future.cancelled())
+ with lock:
+ self.assertEqual(0, cell[0])
+ time.sleep(TICK * 2)
+ self.assertTrue(computation_future.done())
+ self.assertFalse(computation_future.cancelled())
+ with lock:
+ self.assertEqual(1, cell[0])
+ outcome = computation_future.outcome()
+ self.assertEqual(future.RETURNED, outcome.category)
+
+ def test_callback(self):
+ lock = threading.Lock()
+ cell = [0]
+ callback_called = [False]
+ outcome_passed_to_callback = [None]
+ def increment_cell():
+ with lock:
+ cell[0] += 1
+ computation_future = later.later(TICK * 2, increment_cell)
+ def callback(outcome):
+ with lock:
+ callback_called[0] = True
+ outcome_passed_to_callback[0] = outcome
+ computation_future.add_done_callback(callback)
+ time.sleep(TICK)
+ with lock:
+ self.assertFalse(callback_called[0])
+ time.sleep(TICK * 2)
+ with lock:
+ self.assertTrue(callback_called[0])
+ self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category)
+
+ callback_called[0] = False
+ outcome_passed_to_callback[0] = None
+
+ computation_future.add_done_callback(callback)
+ with lock:
+ self.assertTrue(callback_called[0])
+ self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category)
+
+ def test_cancel(self):
+ lock = threading.Lock()
+ cell = [0]
+ callback_called = [False]
+ outcome_passed_to_callback = [None]
+ def increment_cell():
+ with lock:
+ cell[0] += 1
+ computation_future = later.later(TICK * 2, increment_cell)
+ def callback(outcome):
+ with lock:
+ callback_called[0] = True
+ outcome_passed_to_callback[0] = outcome
+ computation_future.add_done_callback(callback)
+ time.sleep(TICK)
+ with lock:
+ self.assertFalse(callback_called[0])
+ computation_future.cancel()
+ self.assertTrue(computation_future.cancelled())
+ self.assertFalse(computation_future.done())
+ self.assertEqual(future.ABORTED, computation_future.outcome().category)
+ with lock:
+ self.assertTrue(callback_called[0])
+ self.assertEqual(future.ABORTED, outcome_passed_to_callback[0].category)
+
+ def test_outcome(self):
+ lock = threading.Lock()
+ cell = [0]
+ callback_called = [False]
+ outcome_passed_to_callback = [None]
+ def increment_cell():
+ with lock:
+ cell[0] += 1
+ computation_future = later.later(TICK * 2, increment_cell)
+ def callback(outcome):
+ with lock:
+ callback_called[0] = True
+ outcome_passed_to_callback[0] = outcome
+ computation_future.add_done_callback(callback)
+ returned_outcome = computation_future.outcome()
+ self.assertEqual(future.RETURNED, returned_outcome.category)
+
+ # The callback may not yet have been called! Sleep a tick.
+ time.sleep(TICK)
+ with lock:
+ self.assertTrue(callback_called[0])
+ self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/python/_framework/foundation/_timer_future.py b/src/python/_framework/foundation/_timer_future.py
new file mode 100644
index 0000000000..86bc073d56
--- /dev/null
+++ b/src/python/_framework/foundation/_timer_future.py
@@ -0,0 +1,156 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Affords a Future implementation based on Python's threading.Timer."""
+
+import threading
+import time
+
+from _framework.foundation import future
+
+
+class TimerFuture(future.Future):
+ """A Future implementation based around Timer objects."""
+
+ def __init__(self, compute_time, computation):
+ """Constructor.
+
+ Args:
+ compute_time: The time after which to begin this future's computation.
+ computation: The computation to be performed within this Future.
+ """
+ self._lock = threading.Lock()
+ self._compute_time = compute_time
+ self._computation = computation
+ self._timer = None
+ self._computing = False
+ self._computed = False
+ self._cancelled = False
+ self._outcome = None
+ self._waiting = []
+
+ def _compute(self):
+ """Performs the computation embedded in this Future.
+
+ Or doesn't, if the time to perform it has not yet arrived.
+ """
+ with self._lock:
+ time_remaining = self._compute_time - time.time()
+ if 0 < time_remaining:
+ self._timer = threading.Timer(time_remaining, self._compute)
+ self._timer.start()
+ return
+ else:
+ self._computing = True
+
+ try:
+ returned_value = self._computation()
+ outcome = future.returned(returned_value)
+ except Exception as e: # pylint: disable=broad-except
+ outcome = future.raised(e)
+
+ with self._lock:
+ self._computing = False
+ self._computed = True
+ self._outcome = outcome
+ waiting = self._waiting
+
+ for callback in waiting:
+ callback(outcome)
+
+ def start(self):
+ """Starts this Future.
+
+ This must be called exactly once, immediately after construction.
+ """
+ with self._lock:
+ self._timer = threading.Timer(
+ self._compute_time - time.time(), self._compute)
+ self._timer.start()
+
+ def cancel(self):
+ """See future.Future.cancel for specification."""
+ with self._lock:
+ if self._computing or self._computed:
+ return False
+ elif self._cancelled:
+ return True
+ else:
+ self._timer.cancel()
+ self._cancelled = True
+ self._outcome = future.aborted()
+ outcome = self._outcome
+ waiting = self._waiting
+
+ for callback in waiting:
+ try:
+ callback(outcome)
+ except Exception: # pylint: disable=broad-except
+ pass
+
+ return True
+
+ def cancelled(self):
+ """See future.Future.cancelled for specification."""
+ with self._lock:
+ return self._cancelled
+
+ def done(self):
+ """See future.Future.done for specification."""
+ with self._lock:
+ return self._computed
+
+ def outcome(self):
+ """See future.Future.outcome for specification."""
+ with self._lock:
+ if self._computed or self._cancelled:
+ return self._outcome
+
+ condition = threading.Condition()
+ def notify_condition(unused_outcome):
+ with condition:
+ condition.notify()
+ self._waiting.append(notify_condition)
+
+ with condition:
+ condition.wait()
+
+ with self._lock:
+ return self._outcome
+
+ def add_done_callback(self, callback):
+ """See future.Future.add_done_callback for specification."""
+ with self._lock:
+ if not self._computed and not self._cancelled:
+ self._waiting.append(callback)
+ return
+ else:
+ outcome = self._outcome
+
+ callback(outcome)
diff --git a/src/python/_framework/foundation/abandonment.py b/src/python/_framework/foundation/abandonment.py
new file mode 100644
index 0000000000..960b4d06b4
--- /dev/null
+++ b/src/python/_framework/foundation/abandonment.py
@@ -0,0 +1,38 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for indicating abandonment of computation."""
+
+
+class Abandoned(Exception):
+ """Indicates that some computation is being abandoned.
+
+ Abandoning a computation is different than returning a value or raising
+ an exception indicating some operational or programming defect.
+ """
diff --git a/src/python/_framework/foundation/callable_util.py b/src/python/_framework/foundation/callable_util.py
new file mode 100644
index 0000000000..1f7546cb76
--- /dev/null
+++ b/src/python/_framework/foundation/callable_util.py
@@ -0,0 +1,78 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for working with callables."""
+
+import functools
+import logging
+
+from _framework.foundation import future
+
+
+def _call_logging_exceptions(behavior, message, *args, **kwargs):
+ try:
+ return future.returned(behavior(*args, **kwargs))
+ except Exception as e: # pylint: disable=broad-except
+ logging.exception(message)
+ return future.raised(e)
+
+
+def with_exceptions_logged(behavior, message):
+ """Wraps a callable in a try-except that logs any exceptions it raises.
+
+ Args:
+ behavior: Any callable.
+ message: A string to log if the behavior raises an exception.
+
+ Returns:
+ A callable that when executed invokes the given behavior. The returned
+ callable takes the same arguments as the given behavior but returns a
+ future.Outcome describing whether the given behavior returned a value or
+ raised an exception.
+ """
+ @functools.wraps(behavior)
+ def wrapped_behavior(*args, **kwargs):
+ return _call_logging_exceptions(behavior, message, *args, **kwargs)
+ return wrapped_behavior
+
+
+def call_logging_exceptions(behavior, message, *args, **kwargs):
+ """Calls a behavior in a try-except that logs any exceptions it raises.
+
+ Args:
+ behavior: Any callable.
+ message: A string to log if the behavior raises an exception.
+ *args: Positional arguments to pass to the given behavior.
+ **kwargs: Keyword arguments to pass to the given behavior.
+
+ Returns:
+ A future.Outcome describing whether the given behavior returned a value or
+ raised an exception.
+ """
+ return _call_logging_exceptions(behavior, message, *args, **kwargs)
diff --git a/src/python/_framework/foundation/future.py b/src/python/_framework/foundation/future.py
new file mode 100644
index 0000000000..f00c503257
--- /dev/null
+++ b/src/python/_framework/foundation/future.py
@@ -0,0 +1,172 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""The Future interface missing from Python's standard library.
+
+Python's concurrent.futures library defines a Future class very much like the
+Future defined here, but since that class is concrete and without construction
+semantics it is only available within the concurrent.futures library itself.
+The Future class defined here is an entirely abstract interface that anyone may
+implement and use.
+"""
+
+import abc
+import collections
+
+RETURNED = object()
+RAISED = object()
+ABORTED = object()
+
+
+class Outcome(object):
+ """A sum type describing the outcome of some computation.
+
+ Attributes:
+ category: One of RETURNED, RAISED, or ABORTED, respectively indicating
+ that the computation returned a value, raised an exception, or was
+ aborted.
+ return_value: The value returned by the computation. Must be present if
+ category is RETURNED.
+ exception: The exception raised by the computation. Must be present if
+ category is RAISED.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class _EasyOutcome(
+ collections.namedtuple('_EasyOutcome',
+ ['category', 'return_value', 'exception']),
+ Outcome):
+ """A trivial implementation of Outcome."""
+
+# All Outcomes describing abortion are indistinguishable so there might as well
+# be only one.
+_ABORTED_OUTCOME = _EasyOutcome(ABORTED, None, None)
+
+
+def aborted():
+ """Returns an Outcome indicating that a computation was aborted.
+
+ Returns:
+ An Outcome indicating that a computation was aborted.
+ """
+ return _ABORTED_OUTCOME
+
+
+def raised(exception):
+ """Returns an Outcome indicating that a computation raised an exception.
+
+ Args:
+ exception: The exception raised by the computation.
+
+ Returns:
+ An Outcome indicating that a computation raised the given exception.
+ """
+ return _EasyOutcome(RAISED, None, exception)
+
+
+def returned(value):
+ """Returns an Outcome indicating that a computation returned a value.
+
+ Args:
+ value: The value returned by the computation.
+
+ Returns:
+ An Outcome indicating that a computation returned the given value.
+ """
+ return _EasyOutcome(RETURNED, value, None)
+
+
+class Future(object):
+ """A representation of a computation happening in another control flow.
+
+ Computations represented by a Future may have already completed, may be
+ ongoing, or may be yet to be begun.
+
+ Computations represented by a Future are considered uninterruptable; once
+ started they will be allowed to terminate either by returning or raising
+ an exception.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Attempts to cancel the computation.
+
+ Returns:
+ True if the computation will not be allowed to take place or False if
+ the computation has already taken place or is currently taking place.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancelled(self):
+ """Describes whether the computation was cancelled.
+
+ Returns:
+ True if the computation was cancelled and did not take place or False
+ if the computation took place, is taking place, or is scheduled to
+ take place in the future.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def done(self):
+ """Describes whether the computation has taken place.
+
+ Returns:
+ True if the computation took place; False otherwise.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def outcome(self):
+ """Accesses the outcome of the computation.
+
+ If the computation has not yet completed, this method blocks until it has.
+
+ Returns:
+ An Outcome describing the outcome of the computation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_done_callback(self, callback):
+ """Adds a function to be called at completion of the computation.
+
+ The callback will be passed an Outcome object describing the outcome of
+ the computation.
+
+ If the computation has already completed, the callback will be called
+ immediately.
+
+ Args:
+ callback: A callable taking an Outcome as its single parameter.
+ """
+ raise NotImplementedError()
diff --git a/src/python/_framework/foundation/later.py b/src/python/_framework/foundation/later.py
new file mode 100644
index 0000000000..fc2cf578d0
--- /dev/null
+++ b/src/python/_framework/foundation/later.py
@@ -0,0 +1,51 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Enables scheduling execution at a later time."""
+
+import time
+
+from _framework.foundation import _timer_future
+
+
+def later(delay, computation):
+ """Schedules later execution of a callable.
+
+ Args:
+ delay: Any numeric value. Represents the minimum length of time in seconds
+ to allow to pass before beginning the computation. No guarantees are made
+ about the maximum length of time that will pass.
+ computation: A callable that accepts no arguments.
+
+ Returns:
+ A Future representing the scheduled computation.
+ """
+ timer_future = _timer_future.TimerFuture(time.time() + delay, computation)
+ timer_future.start()
+ return timer_future
diff --git a/src/python/_framework/foundation/stream.py b/src/python/_framework/foundation/stream.py
new file mode 100644
index 0000000000..75c0cf145b
--- /dev/null
+++ b/src/python/_framework/foundation/stream.py
@@ -0,0 +1,60 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces related to streams of values or objects."""
+
+import abc
+
+
+class Consumer(object):
+ """Interface for consumers of finite streams of values or objects."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def consume(self, value):
+ """Accepts a value.
+
+ Args:
+ value: Any value accepted by this Consumer.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self):
+ """Indicates to this Consumer that no more values will be supplied."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def consume_and_terminate(self, value):
+ """Supplies a value and signals that no more values will be supplied.
+
+ Args:
+ value: Any value accepted by this Consumer.
+ """
+ raise NotImplementedError()
diff --git a/src/python/_framework/foundation/stream_testing.py b/src/python/_framework/foundation/stream_testing.py
new file mode 100644
index 0000000000..c1acedc5c6
--- /dev/null
+++ b/src/python/_framework/foundation/stream_testing.py
@@ -0,0 +1,73 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for testing stream-related code."""
+
+from _framework.foundation import stream
+
+
+class TestConsumer(stream.Consumer):
+ """A stream.Consumer instrumented for testing.
+
+ Attributes:
+ calls: A sequence of value-termination pairs describing the history of calls
+ made on this object.
+ """
+
+ def __init__(self):
+ self.calls = []
+
+ def consume(self, value):
+ """See stream.Consumer.consume for specification."""
+ self.calls.append((value, False))
+
+ def terminate(self):
+ """See stream.Consumer.terminate for specification."""
+ self.calls.append((None, True))
+
+ def consume_and_terminate(self, value):
+ """See stream.Consumer.consume_and_terminate for specification."""
+ self.calls.append((value, True))
+
+ def is_legal(self):
+ """Reports whether or not a legal sequence of calls has been made."""
+ terminated = False
+ for value, terminal in self.calls:
+ if terminated:
+ return False
+ elif terminal:
+ terminated = True
+ elif value is None:
+ return False
+ else: # pylint: disable=useless-else-on-loop
+ return True
+
+ def values(self):
+ """Returns the sequence of values that have been passed to this Consumer."""
+ return [value for value, _ in self.calls if value]
diff --git a/src/python/_framework/foundation/stream_util.py b/src/python/_framework/foundation/stream_util.py
new file mode 100644
index 0000000000..3a9c043316
--- /dev/null
+++ b/src/python/_framework/foundation/stream_util.py
@@ -0,0 +1,160 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Helpful utilities related to the stream module."""
+
+import logging
+import threading
+
+from _framework.foundation import stream
+
+_NO_VALUE = object()
+
+
+class TransformingConsumer(stream.Consumer):
+ """A stream.Consumer that passes a transformation of its input to another."""
+
+ def __init__(self, transformation, downstream):
+ self._transformation = transformation
+ self._downstream = downstream
+
+ def consume(self, value):
+ self._downstream.consume(self._transformation(value))
+
+ def terminate(self):
+ self._downstream.terminate()
+
+ def consume_and_terminate(self, value):
+ self._downstream.consume_and_terminate(self._transformation(value))
+
+
+class IterableConsumer(stream.Consumer):
+ """A Consumer that when iterated over emits the values it has consumed."""
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._values = []
+ self._active = True
+
+ def consume(self, stock_reply):
+ with self._condition:
+ if self._active:
+ self._values.append(stock_reply)
+ self._condition.notify()
+
+ def terminate(self):
+ with self._condition:
+ self._active = False
+ self._condition.notify()
+
+ def consume_and_terminate(self, stock_reply):
+ with self._condition:
+ if self._active:
+ self._values.append(stock_reply)
+ self._active = False
+ self._condition.notify()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ with self._condition:
+ while self._active and not self._values:
+ self._condition.wait()
+ if self._values:
+ return self._values.pop(0)
+ else:
+ raise StopIteration()
+
+
+class ThreadSwitchingConsumer(stream.Consumer):
+ """A Consumer decorator that affords serialization and asynchrony."""
+
+ def __init__(self, sink, pool):
+ self._lock = threading.Lock()
+ self._sink = sink
+ self._pool = pool
+ # True if self._spin has been submitted to the pool to be called once and
+ # that call has not yet returned, False otherwise.
+ self._spinning = False
+ self._values = []
+ self._active = True
+
+ def _spin(self, sink, value, terminate):
+ while True:
+ try:
+ if value is _NO_VALUE:
+ sink.terminate()
+ elif terminate:
+ sink.consume_and_terminate(value)
+ else:
+ sink.consume(value)
+ except Exception as e: # pylint:disable=broad-except
+ logging.exception(e)
+
+ with self._lock:
+ if terminate:
+ self._spinning = False
+ return
+ elif self._values:
+ value = self._values.pop(0)
+ terminate = not self._values and not self._active
+ elif not self._active:
+ value = _NO_VALUE
+ terminate = True
+ else:
+ self._spinning = False
+ return
+
+ def consume(self, value):
+ with self._lock:
+ if self._active:
+ if self._spinning:
+ self._values.append(value)
+ else:
+ self._pool.submit(self._spin, self._sink, value, False)
+ self._spinning = True
+
+ def terminate(self):
+ with self._lock:
+ if self._active:
+ self._active = False
+ if not self._spinning:
+ self._pool.submit(self._spin, self._sink, _NO_VALUE, True)
+ self._spinning = True
+
+ def consume_and_terminate(self, value):
+ with self._lock:
+ if self._active:
+ self._active = False
+ if self._spinning:
+ self._values.append(value)
+ else:
+ self._pool.submit(self._spin, self._sink, value, True)
+ self._spinning = True