From 38824e721ebaa3f50e443e1ea4b47d34030e4703 Mon Sep 17 00:00:00 2001 From: Konstantin Varlamov Date: Mon, 7 May 2018 12:22:57 -0400 Subject: C++ migration: add AsyncQueue, the C++ version of FSTDispatchQueue (#1176) AsyncQueue is a queue that executes given operations asynchronously, enforcing that only a single operation is executing at any given time, and that in-progress operations don't spawn more operations. The actual execution is delegated to a platform-specific executor. Executor is an interface for a FIFO queue that executes given operations serially. Two implementations of Executor, one using libdispatch and the other using C++11 standard library, are provided. AsyncQueue is not used anywhere in the code base at this point. --- .../core/src/firebase/firestore/util/async_queue.h | 164 +++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 Firestore/core/src/firebase/firestore/util/async_queue.h (limited to 'Firestore/core/src/firebase/firestore/util/async_queue.h') diff --git a/Firestore/core/src/firebase/firestore/util/async_queue.h b/Firestore/core/src/firebase/firestore/util/async_queue.h new file mode 100644 index 0000000..e2df387 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/util/async_queue.h @@ -0,0 +1,164 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_ +#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_ + +#include +#include // NOLINT(build/c++11) +#include +#include + +#include "Firestore/core/src/firebase/firestore/util/executor.h" + +namespace firebase { +namespace firestore { +namespace util { + +/** + * Well-known "timer" ids used when scheduling delayed operations on the + * AsyncQueue. These ids can then be used from tests to check for the + * presence of delayed operations or to run them early. + */ +enum class TimerId { + /** All can be used with `RunDelayedOperationsUntil` to run all timers. */ + All, + + /** + * The following 4 timers are used in `Stream` for the listen and write + * streams. The "Idle" timer is used to close the stream due to inactivity. + * The "ConnectionBackoff" timer is used to restart a stream once the + * appropriate backoff delay has elapsed. + */ + ListenStreamIdle, + ListenStreamConnectionBackoff, + WriteStreamIdle, + WriteStreamConnectionBackoff, + + /** + * A timer used in `OnlineStateTracker` to transition from + * `OnlineStateUnknown` to `Offline` after a set timeout, rather than waiting + * indefinitely for success or failure. + */ + OnlineStateTimeout, +}; + +// A serial queue that executes given operations asynchronously, one at a time. +// Operations may be scheduled to be executed as soon as possible or in the +// future. Operations scheduled for the same time are FIFO-ordered. +// +// `AsyncQueue` wraps a platform-specific executor, adding checks that enforce +// sequential ordering of operations: an enqueued operation, while being run, +// normally cannot enqueue other operations for immediate execution (but see +// `EnqueueRelaxed`). +// +// `AsyncQueue` methods have particular expectations about whether they must be +// invoked on the queue or not; check "preconditions" section in comments on +// each method. +// +// A significant portion of `AsyncQueue` interface only exists for test purposes +// and must *not* be used in regular code. +class AsyncQueue { + public: + using Operation = internal::Executor::Operation; + using Milliseconds = internal::Executor::Milliseconds; + + explicit AsyncQueue(std::unique_ptr executor); + + // Asserts for the caller that it is being invoked as part of an operation on + // the `AsyncQueue`. + void VerifyIsCurrentQueue() const; + + // Enqueue methods + + // Puts the `operation` on the queue to be executed as soon as possible, while + // maintaining FIFO order. + // + // Precondition: `Enqueue` calls cannot be nested; that is, `Enqueue` may not + // be called by a previously enqueued operation when it is run (as a special + // case, destructors invoked when an enqueued operation has run and is being + // destroyed may invoke `Enqueue`). + void Enqueue(const Operation& operation); + + // Like `Enqueue`, but without applying any prerequisite checks. + void EnqueueRelaxed(const Operation& operation); + + // Puts the `operation` on the queue to be executed `delay` milliseconds from + // now, and returns a handle that allows to cancel the operation (provided it + // hasn't run already). + // + // `operation` is tagged by a `timer_id` which allows to identify the caller. + // Only one operation tagged with any given `timer_id` may be on the queue at + // any time; an attempt to put another such operation will result in an + // assertion failure. In tests, these tags also allow to check for presence of + // certain operations and to run certain operations in advance. + // + // Precondition: `EnqueueAfterDelay` is being invoked asynchronously on the + // queue. + DelayedOperation EnqueueAfterDelay(Milliseconds delay, + TimerId timer_id, + const Operation& operation); + + // Direct execution + + // Immediately executes the `operation` on the queue. + // + // This is largely a workaround to allow other classes (GRPC) to directly + // access the underlying dispatch queue without getting `AsyncQueue` into an + // inconsistent state. + // + // Precondition: no other operation is being executed on the queue at the + // moment of the call (i.e., `ExecuteBlocking` cannot call `ExecuteBlocking`). + // + // Precondition: `ExecuteBlocking` is being invoked asynchronously on the + // queue. + void ExecuteBlocking(const Operation& operation); + + // Test-only interface follows + // TODO(varconst): move the test-only interface into a helper object that is + // a friend of AsyncQueue and delegates its public methods to private methods + // on AsyncQueue. + + // Like `Enqueue`, but blocks until the `operation` is complete. + void EnqueueBlocking(const Operation& operation); + + // Checks whether an operation tagged with `timer_id` is currently scheduled + // for execution in the future. + bool IsScheduled(TimerId timer_id) const; + + // Force runs operations scheduled for future execution, in scheduled order, + // up to *and including* the operation tagged with `last_timer_id`. + // + // Precondition: `RunScheduledOperationsUntil` is *not* being invoked on the + // queue. + void RunScheduledOperationsUntil(TimerId last_timer_id); + + private: + Operation Wrap(const Operation& operation); + + // Asserts that the current invocation happens asynchronously on the queue. + void VerifyIsCurrentExecutor() const; + void VerifySequentialOrder() const; + + std::atomic is_operation_in_progress_; + std::unique_ptr executor_; +}; + +} // namespace util +} // namespace firestore +} // namespace firebase + +#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_ -- cgit v1.2.3