diff options
Diffstat (limited to 'src/python/grpcio/grpc/beta/utilities.py')
-rw-r--r-- | src/python/grpcio/grpc/beta/utilities.py | 202 |
1 files changed, 100 insertions, 102 deletions
diff --git a/src/python/grpcio/grpc/beta/utilities.py b/src/python/grpcio/grpc/beta/utilities.py index fb07a76579..60525350a7 100644 --- a/src/python/grpcio/grpc/beta/utilities.py +++ b/src/python/grpcio/grpc/beta/utilities.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Utilities for the gRPC Python Beta API.""" import threading @@ -44,107 +43,107 @@ _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = ( class _ChannelReadyFuture(future.Future): - def __init__(self, channel): - self._condition = threading.Condition() - self._channel = channel - - self._matured = False - self._cancelled = False - self._done_callbacks = [] - - def _block(self, timeout): - until = None if timeout is None else time.time() + timeout - with self._condition: - while True: - if self._cancelled: - raise future.CancelledError() - elif self._matured: - return - else: - if until is None: - self._condition.wait() - else: - remaining = until - time.time() - if remaining < 0: - raise future.TimeoutError() + def __init__(self, channel): + self._condition = threading.Condition() + self._channel = channel + + self._matured = False + self._cancelled = False + self._done_callbacks = [] + + def _block(self, timeout): + until = None if timeout is None else time.time() + timeout + with self._condition: + while True: + if self._cancelled: + raise future.CancelledError() + elif self._matured: + return + else: + if until is None: + self._condition.wait() + else: + remaining = until - time.time() + if remaining < 0: + raise future.TimeoutError() + else: + self._condition.wait(timeout=remaining) + + def _update(self, connectivity): + with self._condition: + if (not self._cancelled and + connectivity is interfaces.ChannelConnectivity.READY): + self._matured = True + self._channel.unsubscribe(self._update) + self._condition.notify_all() + done_callbacks = tuple(self._done_callbacks) + self._done_callbacks = None + else: + return + + for done_callback in done_callbacks: + callable_util.call_logging_exceptions( + done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) + + def cancel(self): + with self._condition: + if not self._matured: + self._cancelled = True + self._channel.unsubscribe(self._update) + self._condition.notify_all() + done_callbacks = tuple(self._done_callbacks) + self._done_callbacks = None else: - self._condition.wait(timeout=remaining) - - def _update(self, connectivity): - with self._condition: - if (not self._cancelled and - connectivity is interfaces.ChannelConnectivity.READY): - self._matured = True - self._channel.unsubscribe(self._update) - self._condition.notify_all() - done_callbacks = tuple(self._done_callbacks) - self._done_callbacks = None - else: - return - - for done_callback in done_callbacks: - callable_util.call_logging_exceptions( - done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) - - def cancel(self): - with self._condition: - if not self._matured: - self._cancelled = True - self._channel.unsubscribe(self._update) - self._condition.notify_all() - done_callbacks = tuple(self._done_callbacks) - self._done_callbacks = None - else: - return False - - for done_callback in done_callbacks: - callable_util.call_logging_exceptions( - done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) - - def cancelled(self): - with self._condition: - return self._cancelled - - def running(self): - with self._condition: - return not self._cancelled and not self._matured - - def done(self): - with self._condition: - return self._cancelled or self._matured - - def result(self, timeout=None): - self._block(timeout) - return None - - def exception(self, timeout=None): - self._block(timeout) - return None - - def traceback(self, timeout=None): - self._block(timeout) - return None - - def add_done_callback(self, fn): - with self._condition: - if not self._cancelled and not self._matured: - self._done_callbacks.append(fn) - return - - fn(self) - - def start(self): - with self._condition: - self._channel.subscribe(self._update, try_to_connect=True) - - def __del__(self): - with self._condition: - if not self._cancelled and not self._matured: - self._channel.unsubscribe(self._update) + return False + + for done_callback in done_callbacks: + callable_util.call_logging_exceptions( + done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) + + def cancelled(self): + with self._condition: + return self._cancelled + + def running(self): + with self._condition: + return not self._cancelled and not self._matured + + def done(self): + with self._condition: + return self._cancelled or self._matured + + def result(self, timeout=None): + self._block(timeout) + return None + + def exception(self, timeout=None): + self._block(timeout) + return None + + def traceback(self, timeout=None): + self._block(timeout) + return None + + def add_done_callback(self, fn): + with self._condition: + if not self._cancelled and not self._matured: + self._done_callbacks.append(fn) + return + + fn(self) + + def start(self): + with self._condition: + self._channel.subscribe(self._update, try_to_connect=True) + + def __del__(self): + with self._condition: + if not self._cancelled and not self._matured: + self._channel.unsubscribe(self._update) def channel_ready_future(channel): - """Creates a future.Future tracking when an implementations.Channel is ready. + """Creates a future.Future tracking when an implementations.Channel is ready. Cancelling the returned future.Future does not tell the given implementations.Channel to abandon attempts it may have been making to @@ -158,7 +157,6 @@ def channel_ready_future(channel): A future.Future that matures when the given Channel has connectivity interfaces.ChannelConnectivity.READY. """ - ready_future = _ChannelReadyFuture(channel) - ready_future.start() - return ready_future - + ready_future = _ChannelReadyFuture(channel) + ready_future.start() + return ready_future |