aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/core/src
diff options
context:
space:
mode:
authorGravatar Konstantin Varlamov <var-const@users.noreply.github.com>2018-05-07 12:22:57 -0400
committerGravatar GitHub <noreply@github.com>2018-05-07 12:22:57 -0400
commit38824e721ebaa3f50e443e1ea4b47d34030e4703 (patch)
treeb4a7f806b52f40d85fb92e571c216acaeba2fdc3 /Firestore/core/src
parent04a28fce81c737b6505e6c542a14d8529c9f891d (diff)
C++ migration: add AsyncQueue, the C++ version of FSTDispatchQueue (#1176)
AsyncQueue is a queue that executes given operations asynchronously, enforcing that only a single operation is executing at any given time, and that in-progress operations don't spawn more operations. The actual execution is delegated to a platform-specific executor. Executor is an interface for a FIFO queue that executes given operations serially. Two implementations of Executor, one using libdispatch and the other using C++11 standard library, are provided. AsyncQueue is not used anywhere in the code base at this point.
Diffstat (limited to 'Firestore/core/src')
-rw-r--r--Firestore/core/src/firebase/firestore/util/CMakeLists.txt58
-rw-r--r--Firestore/core/src/firebase/firestore/util/async_queue.cc140
-rw-r--r--Firestore/core/src/firebase/firestore/util/async_queue.h164
-rw-r--r--Firestore/core/src/firebase/firestore/util/executor.h129
-rw-r--r--Firestore/core/src/firebase/firestore/util/executor_libdispatch.cc296
-rw-r--r--Firestore/core/src/firebase/firestore/util/executor_libdispatch.h92
-rw-r--r--Firestore/core/src/firebase/firestore/util/executor_std.cc155
-rw-r--r--Firestore/core/src/firebase/firestore/util/executor_std.h281
8 files changed, 1315 insertions, 0 deletions
diff --git a/Firestore/core/src/firebase/firestore/util/CMakeLists.txt b/Firestore/core/src/firebase/firestore/util/CMakeLists.txt
index 95cd72f..29d91c7 100644
--- a/Firestore/core/src/firebase/firestore/util/CMakeLists.txt
+++ b/Firestore/core/src/firebase/firestore/util/CMakeLists.txt
@@ -109,6 +109,63 @@ else()
endif()
+## async queue
+
+check_symbol_exists(dispatch_async_f dispatch/dispatch.h HAVE_LIBDISPATCH)
+
+cc_library(
+ firebase_firestore_util_executor_std
+ SOURCES
+ executor_std.cc
+ executor_std.h
+ executor.h
+ DEPENDS
+ absl_bad_optional_access
+ absl_optional
+ ${FIREBASE_FIRESTORE_UTIL_LOG}
+ EXCLUDE_FROM_ALL
+)
+
+if(HAVE_LIBDISPATCH)
+cc_library(
+ firebase_firestore_util_executor_libdispatch
+ SOURCES
+ executor_libdispatch.cc
+ executor_libdispatch.h
+ executor.h
+ DEPENDS
+ absl_bad_optional_access
+ absl_optional
+ absl_strings
+ ${FIREBASE_FIRESTORE_UTIL_LOG}
+ EXCLUDE_FROM_ALL
+)
+endif()
+
+if(HAVE_LIBDISPATCH)
+ set(
+ FIREBASE_FIRESTORE_UTIL_EXECUTOR
+ firebase_firestore_util_executor_libdispatch
+ )
+
+else()
+ set(
+ FIREBASE_FIRESTORE_UTIL_EXECUTOR
+ firebase_firestore_util_executor_std
+ )
+
+endif()
+
+cc_library(
+ firebase_firestore_util_async_queue
+ SOURCES
+ async_queue.cc
+ async_queue.h
+ DEPENDS
+ ${FIREBASE_FIRESTORE_UTIL_EXECUTOR}
+ ${FIREBASE_FIRESTORE_UTIL_LOG}
+ EXCLUDE_FROM_ALL
+)
## main library
@@ -144,6 +201,7 @@ cc_library(
DEPENDS
absl_base
firebase_firestore_util_base
+ firebase_firestore_util_async_queue
${FIREBASE_FIRESTORE_UTIL_LOG}
${FIREBASE_FIRESTORE_UTIL_RANDOM}
)
diff --git a/Firestore/core/src/firebase/firestore/util/async_queue.cc b/Firestore/core/src/firebase/firestore/util/async_queue.cc
new file mode 100644
index 0000000..71f5cc5
--- /dev/null
+++ b/Firestore/core/src/firebase/firestore/util/async_queue.cc
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2018 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Firestore/core/src/firebase/firestore/util/async_queue.h"
+
+#include <utility>
+
+#include "Firestore/core/src/firebase/firestore/util/firebase_assert.h"
+#include "absl/memory/memory.h"
+
+namespace firebase {
+namespace firestore {
+namespace util {
+
+using internal::Executor;
+
+AsyncQueue::AsyncQueue(std::unique_ptr<Executor> executor)
+ : executor_{std::move(executor)} {
+ is_operation_in_progress_ = false;
+}
+
+void AsyncQueue::VerifyIsCurrentExecutor() const {
+ FIREBASE_ASSERT_MESSAGE(
+ executor_->IsCurrentExecutor(),
+ "Expected to be called by the executor associated with this queue "
+ "(expected executor: '%s', actual executor: '%s')",
+ executor_->Name().c_str(), executor_->CurrentExecutorName().c_str());
+}
+
+void AsyncQueue::VerifyIsCurrentQueue() const {
+ VerifyIsCurrentExecutor();
+ FIREBASE_ASSERT_MESSAGE(
+ is_operation_in_progress_,
+ "VerifyIsCurrentQueue called when no operation is executing "
+ "(expected executor: '%s', actual executor: '%s')",
+ executor_->Name().c_str(), executor_->CurrentExecutorName().c_str());
+}
+
+void AsyncQueue::ExecuteBlocking(const Operation& operation) {
+ VerifyIsCurrentExecutor();
+ FIREBASE_ASSERT_MESSAGE(!is_operation_in_progress_,
+ "ExecuteBlocking may not be called "
+ "before the previous operation finishes executing");
+
+ is_operation_in_progress_ = true;
+ operation();
+ is_operation_in_progress_ = false;
+}
+
+void AsyncQueue::Enqueue(const Operation& operation) {
+ VerifySequentialOrder();
+ EnqueueRelaxed(operation);
+}
+
+void AsyncQueue::EnqueueRelaxed(const Operation& operation) {
+ executor_->Execute(Wrap(operation));
+}
+
+DelayedOperation AsyncQueue::EnqueueAfterDelay(const Milliseconds delay,
+ const TimerId timer_id,
+ const Operation& operation) {
+ VerifyIsCurrentExecutor();
+
+ // While not necessarily harmful, we currently don't expect to have multiple
+ // callbacks with the same timer_id in the queue, so defensively reject
+ // them.
+ FIREBASE_ASSERT_MESSAGE(
+ !IsScheduled(timer_id),
+ "Attempted to schedule multiple operations with id %d", timer_id);
+
+ Executor::TaggedOperation tagged{static_cast<int>(timer_id), Wrap(operation)};
+ return executor_->Schedule(delay, std::move(tagged));
+}
+
+AsyncQueue::Operation AsyncQueue::Wrap(const Operation& operation) {
+ // Decorator pattern: wrap `operation` into a call to `ExecuteBlocking` to
+ // ensure that it doesn't spawn any nested operations.
+
+ // Note: can't move `operation` into lambda until C++14.
+ return [this, operation] { ExecuteBlocking(operation); };
+}
+
+void AsyncQueue::VerifySequentialOrder() const {
+ // This is the inverse of `VerifyIsCurrentQueue`.
+ FIREBASE_ASSERT_MESSAGE(
+ !is_operation_in_progress_ || !executor_->IsCurrentExecutor(),
+ "Enforcing sequential order failed: currently executing operations "
+ "cannot enqueue more operations "
+ "(this queue's executor: '%s', current executor: '%s')",
+ executor_->Name().c_str(), executor_->CurrentExecutorName().c_str());
+}
+
+// Test-only functions
+
+void AsyncQueue::EnqueueBlocking(const Operation& operation) {
+ VerifySequentialOrder();
+ executor_->ExecuteBlocking(Wrap(operation));
+}
+
+bool AsyncQueue::IsScheduled(const TimerId timer_id) const {
+ return executor_->IsScheduled(static_cast<int>(timer_id));
+}
+
+void AsyncQueue::RunScheduledOperationsUntil(const TimerId last_timer_id) {
+ FIREBASE_ASSERT_MESSAGE(
+ !executor_->IsCurrentExecutor(),
+ "RunScheduledOperationsUntil must not be called on the queue");
+
+ executor_->ExecuteBlocking([this, last_timer_id] {
+ FIREBASE_ASSERT_MESSAGE(
+ last_timer_id == TimerId::All || IsScheduled(last_timer_id),
+ "Attempted to run scheduled operations until missing timer id: %d",
+ last_timer_id);
+
+ for (auto next = executor_->PopFromSchedule(); next.has_value();
+ next = executor_->PopFromSchedule()) {
+ next->operation();
+ if (next->tag == static_cast<int>(last_timer_id)) {
+ break;
+ }
+ }
+ });
+}
+
+} // namespace util
+} // namespace firestore
+} // namespace firebase
diff --git a/Firestore/core/src/firebase/firestore/util/async_queue.h b/Firestore/core/src/firebase/firestore/util/async_queue.h
new file mode 100644
index 0000000..e2df387
--- /dev/null
+++ b/Firestore/core/src/firebase/firestore/util/async_queue.h
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_
+#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_
+
+#include <atomic>
+#include <chrono> // NOLINT(build/c++11)
+#include <functional>
+#include <memory>
+
+#include "Firestore/core/src/firebase/firestore/util/executor.h"
+
+namespace firebase {
+namespace firestore {
+namespace util {
+
+/**
+ * Well-known "timer" ids used when scheduling delayed operations on the
+ * AsyncQueue. These ids can then be used from tests to check for the
+ * presence of delayed operations or to run them early.
+ */
+enum class TimerId {
+ /** All can be used with `RunDelayedOperationsUntil` to run all timers. */
+ All,
+
+ /**
+ * The following 4 timers are used in `Stream` for the listen and write
+ * streams. The "Idle" timer is used to close the stream due to inactivity.
+ * The "ConnectionBackoff" timer is used to restart a stream once the
+ * appropriate backoff delay has elapsed.
+ */
+ ListenStreamIdle,
+ ListenStreamConnectionBackoff,
+ WriteStreamIdle,
+ WriteStreamConnectionBackoff,
+
+ /**
+ * A timer used in `OnlineStateTracker` to transition from
+ * `OnlineStateUnknown` to `Offline` after a set timeout, rather than waiting
+ * indefinitely for success or failure.
+ */
+ OnlineStateTimeout,
+};
+
+// A serial queue that executes given operations asynchronously, one at a time.
+// Operations may be scheduled to be executed as soon as possible or in the
+// future. Operations scheduled for the same time are FIFO-ordered.
+//
+// `AsyncQueue` wraps a platform-specific executor, adding checks that enforce
+// sequential ordering of operations: an enqueued operation, while being run,
+// normally cannot enqueue other operations for immediate execution (but see
+// `EnqueueRelaxed`).
+//
+// `AsyncQueue` methods have particular expectations about whether they must be
+// invoked on the queue or not; check "preconditions" section in comments on
+// each method.
+//
+// A significant portion of `AsyncQueue` interface only exists for test purposes
+// and must *not* be used in regular code.
+class AsyncQueue {
+ public:
+ using Operation = internal::Executor::Operation;
+ using Milliseconds = internal::Executor::Milliseconds;
+
+ explicit AsyncQueue(std::unique_ptr<internal::Executor> executor);
+
+ // Asserts for the caller that it is being invoked as part of an operation on
+ // the `AsyncQueue`.
+ void VerifyIsCurrentQueue() const;
+
+ // Enqueue methods
+
+ // Puts the `operation` on the queue to be executed as soon as possible, while
+ // maintaining FIFO order.
+ //
+ // Precondition: `Enqueue` calls cannot be nested; that is, `Enqueue` may not
+ // be called by a previously enqueued operation when it is run (as a special
+ // case, destructors invoked when an enqueued operation has run and is being
+ // destroyed may invoke `Enqueue`).
+ void Enqueue(const Operation& operation);
+
+ // Like `Enqueue`, but without applying any prerequisite checks.
+ void EnqueueRelaxed(const Operation& operation);
+
+ // Puts the `operation` on the queue to be executed `delay` milliseconds from
+ // now, and returns a handle that allows to cancel the operation (provided it
+ // hasn't run already).
+ //
+ // `operation` is tagged by a `timer_id` which allows to identify the caller.
+ // Only one operation tagged with any given `timer_id` may be on the queue at
+ // any time; an attempt to put another such operation will result in an
+ // assertion failure. In tests, these tags also allow to check for presence of
+ // certain operations and to run certain operations in advance.
+ //
+ // Precondition: `EnqueueAfterDelay` is being invoked asynchronously on the
+ // queue.
+ DelayedOperation EnqueueAfterDelay(Milliseconds delay,
+ TimerId timer_id,
+ const Operation& operation);
+
+ // Direct execution
+
+ // Immediately executes the `operation` on the queue.
+ //
+ // This is largely a workaround to allow other classes (GRPC) to directly
+ // access the underlying dispatch queue without getting `AsyncQueue` into an
+ // inconsistent state.
+ //
+ // Precondition: no other operation is being executed on the queue at the
+ // moment of the call (i.e., `ExecuteBlocking` cannot call `ExecuteBlocking`).
+ //
+ // Precondition: `ExecuteBlocking` is being invoked asynchronously on the
+ // queue.
+ void ExecuteBlocking(const Operation& operation);
+
+ // Test-only interface follows
+ // TODO(varconst): move the test-only interface into a helper object that is
+ // a friend of AsyncQueue and delegates its public methods to private methods
+ // on AsyncQueue.
+
+ // Like `Enqueue`, but blocks until the `operation` is complete.
+ void EnqueueBlocking(const Operation& operation);
+
+ // Checks whether an operation tagged with `timer_id` is currently scheduled
+ // for execution in the future.
+ bool IsScheduled(TimerId timer_id) const;
+
+ // Force runs operations scheduled for future execution, in scheduled order,
+ // up to *and including* the operation tagged with `last_timer_id`.
+ //
+ // Precondition: `RunScheduledOperationsUntil` is *not* being invoked on the
+ // queue.
+ void RunScheduledOperationsUntil(TimerId last_timer_id);
+
+ private:
+ Operation Wrap(const Operation& operation);
+
+ // Asserts that the current invocation happens asynchronously on the queue.
+ void VerifyIsCurrentExecutor() const;
+ void VerifySequentialOrder() const;
+
+ std::atomic<bool> is_operation_in_progress_;
+ std::unique_ptr<internal::Executor> executor_;
+};
+
+} // namespace util
+} // namespace firestore
+} // namespace firebase
+
+#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_
diff --git a/Firestore/core/src/firebase/firestore/util/executor.h b/Firestore/core/src/firebase/firestore/util/executor.h
new file mode 100644
index 0000000..df8b0b5
--- /dev/null
+++ b/Firestore/core/src/firebase/firestore/util/executor.h
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2018 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_H_
+#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_H_
+
+#include <chrono> // NOLINT(build/c++11)
+#include <functional>
+#include <string>
+#include <utility>
+
+#include "absl/types/optional.h"
+
+namespace firebase {
+namespace firestore {
+namespace util {
+
+// A handle to an operation scheduled for future execution. The handle may
+// outlive the operation, but it *cannot* outlive the executor that created it.
+class DelayedOperation {
+ public:
+ DelayedOperation() {
+ }
+
+ // If the operation has not been run yet, cancels the operation. Otherwise,
+ // this function is a no-op.
+ void Cancel() {
+ cancel_func_();
+ }
+
+ // Internal use only.
+ explicit DelayedOperation(std::function<void()>&& cancel_func)
+ : cancel_func_{std::move(cancel_func)} {
+ }
+
+ private:
+ std::function<void()> cancel_func_;
+};
+
+namespace internal {
+
+// An interface to a platform-specific executor of asynchronous operations
+// (called tasks on other platforms).
+//
+// Operations may be scheduled for immediate or delayed execution. Operations
+// delayed until the exact same time are scheduled in FIFO order.
+//
+// The operations are executed sequentially; only a single operation is executed
+// at any given time.
+//
+// Delayed operations may be canceled if they have not already been run.
+class Executor {
+ public:
+ using Tag = int;
+ using Operation = std::function<void()>;
+ using Milliseconds = std::chrono::milliseconds;
+
+ // Operations scheduled for future execution have an opaque tag. The value of
+ // the tag is ignored by the executor but can be used to find operations with
+ // a given tag after they are scheduled.
+ struct TaggedOperation {
+ TaggedOperation() {
+ }
+ TaggedOperation(const Tag tag, Operation&& operation)
+ : tag{tag}, operation{std::move(operation)} {
+ }
+ Tag tag = 0;
+ Operation operation;
+ };
+
+ virtual ~Executor() {
+ }
+
+ // Schedules the `operation` to be asynchronously executed as soon as
+ // possible, in FIFO order.
+ virtual void Execute(Operation&& operation) = 0;
+ // Like `Execute`, but blocks until the `operation` finishes, consequently
+ // draining immediate operations from the executor.
+ virtual void ExecuteBlocking(Operation&& operation) = 0;
+ // Scheduled the given `operation` to be executed after `delay` milliseconds
+ // from now, and returns a handle that allows to cancel the operation
+ // (provided it hasn't been run already). The operation is tagged to allow
+ // retrieving it later.
+ //
+ // `delay` must be non-negative; use `Execute` to schedule operations for
+ // immediate execution.
+ virtual DelayedOperation Schedule(Milliseconds delay,
+ TaggedOperation&& operation) = 0;
+
+ // Checks for the caller whether it is being invoked by this executor.
+ virtual bool IsCurrentExecutor() const = 0;
+ // Returns some sort of an identifier for the current execution context. The
+ // only guarantee is that it will return different values depending on whether
+ // this function is invoked by this executor or not.
+ virtual std::string CurrentExecutorName() const = 0;
+ // Like `CurrentExecutorName`, but returns an identifier for this executor,
+ // whether the caller code currently runs on this executor or not.
+ virtual std::string Name() const = 0;
+
+ // Checks whether an operation tagged with the given `tag` is currently
+ // scheduled for future execution.
+ virtual bool IsScheduled(Tag tag) const = 0;
+ // Removes the nearest due scheduled operation from the schedule and returns
+ // it to the caller. This function may be used to reschedule operations.
+ // Immediate operations don't count; only operations scheduled for delayed
+ // execution may be removed. If no such operations are currently scheduled, an
+ // empty `optional` is returned.
+ virtual absl::optional<TaggedOperation> PopFromSchedule() = 0;
+};
+
+} // namespace internal
+} // namespace util
+} // namespace firestore
+} // namespace firebase
+
+#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_H_
diff --git a/Firestore/core/src/firebase/firestore/util/executor_libdispatch.cc b/Firestore/core/src/firebase/firestore/util/executor_libdispatch.cc
new file mode 100644
index 0000000..b40f0dd
--- /dev/null
+++ b/Firestore/core/src/firebase/firestore/util/executor_libdispatch.cc
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2018 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Firestore/core/src/firebase/firestore/util/executor_libdispatch.h"
+
+namespace firebase {
+namespace firestore {
+namespace util {
+namespace internal {
+
+namespace {
+
+absl::string_view StringViewFromDispatchLabel(const char* const label) {
+ // Make sure string_view's data is not null, because it's used for logging.
+ return label ? absl::string_view{label} : absl::string_view{""};
+}
+
+void RunSynchronized(const ExecutorLibdispatch* const executor,
+ std::function<void()>&& work) {
+ if (executor->IsCurrentExecutor()) {
+ work();
+ } else {
+ DispatchSync(executor->dispatch_queue(), std::move(work));
+ }
+}
+
+} // namespace
+
+void DispatchAsync(const dispatch_queue_t queue, std::function<void()>&& work) {
+ // Dynamically allocate the function to make sure the object is valid by the
+ // time libdispatch gets to it.
+ const auto wrap = new std::function<void()>{std::move(work)};
+
+ dispatch_async_f(queue, wrap, [](void* const raw_work) {
+ const auto unwrap = static_cast<std::function<void()>*>(raw_work);
+ (*unwrap)();
+ delete unwrap;
+ });
+}
+
+void DispatchSync(const dispatch_queue_t queue, Executor::Operation work) {
+ // Unlike dispatch_async_f, dispatch_sync_f blocks until the work passed to it
+ // is done, so passing a reference to a local variable is okay.
+ dispatch_sync_f(queue, &work, [](void* const raw_work) {
+ const auto unwrap = static_cast<const Executor::Operation*>(raw_work);
+ (*unwrap)();
+ });
+}
+
+// Represents a "busy" time slot on the schedule.
+//
+// Since libdispatch doesn't provide a way to cancel a scheduled operation, once
+// a slot is created, it will always stay in the schedule until the time is
+// past. Consequently, it is more useful to think of a time slot than
+// a particular scheduled operation -- by the time the slot comes, operation may
+// or may not be there (imagine getting to a meeting and finding out it's been
+// canceled).
+//
+// Precondition: all member functions, including the constructor, are *only*
+// invoked on the Firestore queue.
+//
+// Ownership:
+//
+// - `TimeSlot` is exclusively owned by libdispatch;
+// - `ExecutorLibdispatch` contains non-owning pointers to `TimeSlot`s;
+// - invariant: if the executor contains a pointer to a `TimeSlot`, it is
+// a valid object. It is achieved because when libdispatch invokes
+// a `TimeSlot`, it always removes it from the executor before deleting it.
+// The reverse is not true: a canceled time slot is removed from the executor,
+// but won't be destroyed until its original due time is past.
+
+class TimeSlot {
+ public:
+ TimeSlot(ExecutorLibdispatch* executor,
+ Executor::Milliseconds delay,
+ Executor::TaggedOperation&& operation);
+
+ // Returns the operation that was scheduled for this time slot and turns the
+ // slot into a no-op.
+ Executor::TaggedOperation Unschedule();
+
+ bool operator<(const TimeSlot& rhs) const {
+ return target_time_ < rhs.target_time_;
+ }
+ bool operator==(const Executor::Tag tag) const {
+ return tagged_.tag == tag;
+ }
+
+ void MarkDone() {
+ done_ = true;
+ }
+
+ static void InvokedByLibdispatch(void* const raw_self);
+
+ private:
+ void Execute();
+ void RemoveFromSchedule();
+
+ using TimePoint = std::chrono::time_point<std::chrono::system_clock,
+ Executor::Milliseconds>;
+
+ ExecutorLibdispatch* const executor_;
+ const TimePoint target_time_; // Used for sorting
+ Executor::TaggedOperation tagged_;
+
+ // True if the operation has either been run or canceled.
+ //
+ // Note on thread-safety: because the precondition is that all member
+ // functions of this class are executed on the dispatch queue, no
+ // synchronization is required for `done_`.
+ bool done_ = false;
+};
+
+TimeSlot::TimeSlot(ExecutorLibdispatch* const executor,
+ const Executor::Milliseconds delay,
+ Executor::TaggedOperation&& operation)
+ : executor_{executor},
+ target_time_{std::chrono::time_point_cast<Executor::Milliseconds>(
+ std::chrono::system_clock::now()) +
+ delay},
+ tagged_{std::move(operation)} {
+}
+
+Executor::TaggedOperation TimeSlot::Unschedule() {
+ if (!done_) {
+ RemoveFromSchedule();
+ }
+ return std::move(tagged_);
+}
+
+void TimeSlot::InvokedByLibdispatch(void* const raw_self) {
+ auto const self = static_cast<TimeSlot*>(raw_self);
+ self->Execute();
+ delete self;
+}
+
+void TimeSlot::Execute() {
+ if (done_) {
+ // `done_` might mean that the executor is already destroyed, so don't call
+ // `RemoveFromSchedule`.
+ return;
+ }
+
+ RemoveFromSchedule();
+
+ FIREBASE_ASSERT_MESSAGE(tagged_.operation,
+ "TimeSlot contains an invalid function object");
+ tagged_.operation();
+}
+
+void TimeSlot::RemoveFromSchedule() {
+ executor_->RemoveFromSchedule(this);
+}
+
+// ExecutorLibdispatch
+
+ExecutorLibdispatch::ExecutorLibdispatch(const dispatch_queue_t dispatch_queue)
+ : dispatch_queue_{dispatch_queue} {
+}
+ExecutorLibdispatch::ExecutorLibdispatch()
+ : ExecutorLibdispatch{dispatch_queue_create("com.google.firebase.firestore",
+ DISPATCH_QUEUE_SERIAL)} {
+}
+
+ExecutorLibdispatch::~ExecutorLibdispatch() {
+ // Turn any operations that might still be in the queue into no-ops, lest
+ // they try to access `ExecutorLibdispatch` after it gets destroyed. Because
+ // the queue is serial, by the time libdispatch gets to the newly-enqueued
+ // work, the pending operations that might have been in progress would have
+ // already finished.
+ RunSynchronized(this, [this] {
+ for (auto slot : schedule_) {
+ slot->MarkDone();
+ }
+ });
+}
+
+bool ExecutorLibdispatch::IsCurrentExecutor() const {
+ return GetCurrentQueueLabel().data() == GetTargetQueueLabel().data();
+}
+std::string ExecutorLibdispatch::CurrentExecutorName() const {
+ return GetCurrentQueueLabel().data();
+}
+std::string ExecutorLibdispatch::Name() const {
+ return GetTargetQueueLabel().data();
+}
+
+void ExecutorLibdispatch::Execute(Operation&& operation) {
+ DispatchAsync(dispatch_queue(), std::move(operation));
+}
+void ExecutorLibdispatch::ExecuteBlocking(Operation&& operation) {
+ DispatchSync(dispatch_queue(), std::move(operation));
+}
+
+DelayedOperation ExecutorLibdispatch::Schedule(const Milliseconds delay,
+ TaggedOperation&& operation) {
+ namespace chr = std::chrono;
+ const dispatch_time_t delay_ns = dispatch_time(
+ DISPATCH_TIME_NOW, chr::duration_cast<chr::nanoseconds>(delay).count());
+
+ // Ownership is fully transferred to libdispatch -- because it's impossible
+ // to truly cancel work after it's been dispatched, libdispatch is
+ // guaranteed to outlive the executor, and it's possible for work to be
+ // invoked by libdispatch after the executor is destroyed. Executor only
+ // stores an observer pointer to the operation.
+
+ auto const time_slot = new TimeSlot{this, delay, std::move(operation)};
+ dispatch_after_f(delay_ns, dispatch_queue(), time_slot,
+ TimeSlot::InvokedByLibdispatch);
+ RunSynchronized(this, [this, time_slot] { schedule_.push_back(time_slot); });
+ return DelayedOperation{[this, time_slot] {
+ // `time_slot` might be destroyed by the time cancellation function runs.
+ // Therefore, don't access any methods on `time_slot`, only use it as
+ // a handle to remove from `schedule_`.
+ RemoveFromSchedule(time_slot);
+ }};
+}
+
+void ExecutorLibdispatch::RemoveFromSchedule(const TimeSlot* const to_remove) {
+ RunSynchronized(this, [this, to_remove] {
+ const auto found = std::find_if(
+ schedule_.begin(), schedule_.end(),
+ [to_remove](const TimeSlot* op) { return op == to_remove; });
+ // It's possible for the operation to be missing if libdispatch gets to run
+ // it after it was force-run, for example.
+ if (found != schedule_.end()) {
+ (*found)->MarkDone();
+ schedule_.erase(found);
+ }
+ });
+}
+
+// GetLabel functions are guaranteed to never return a "null" string_view
+// (i.e. data() != nullptr).
+absl::string_view ExecutorLibdispatch::GetCurrentQueueLabel() const {
+ // Note: dispatch_queue_get_label may return nullptr if the queue wasn't
+ // initialized with a label.
+ return StringViewFromDispatchLabel(
+ dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL));
+}
+
+absl::string_view ExecutorLibdispatch::GetTargetQueueLabel() const {
+ return StringViewFromDispatchLabel(
+ dispatch_queue_get_label(dispatch_queue()));
+}
+
+// Test-only methods
+
+bool ExecutorLibdispatch::IsScheduled(const Tag tag) const {
+ bool result = false;
+ RunSynchronized(this, [this, tag, &result] {
+ result = std::find_if(schedule_.begin(), schedule_.end(),
+ [&tag](const TimeSlot* const operation) {
+ return *operation == tag;
+ }) != schedule_.end();
+ });
+ return result;
+}
+
+absl::optional<Executor::TaggedOperation>
+ExecutorLibdispatch::PopFromSchedule() {
+ absl::optional<Executor::TaggedOperation> result;
+
+ RunSynchronized(this, [this, &result] {
+ if (schedule_.empty()) {
+ return;
+ }
+ // Sorting upon each call to `PopFromSchedule` is inefficient, which is
+ // consciously ignored because this function is only ever called from tests.
+ std::sort(
+ schedule_.begin(), schedule_.end(),
+ [](const TimeSlot* lhs, const TimeSlot* rhs) { return *lhs < *rhs; });
+ const auto nearest = schedule_.begin();
+ result = (*nearest)->Unschedule();
+ });
+
+ return result;
+}
+
+} // namespace internal
+} // namespace util
+} // namespace firestore
+} // namespace firebase
diff --git a/Firestore/core/src/firebase/firestore/util/executor_libdispatch.h b/Firestore/core/src/firebase/firestore/util/executor_libdispatch.h
new file mode 100644
index 0000000..b32dbff
--- /dev/null
+++ b/Firestore/core/src/firebase/firestore/util/executor_libdispatch.h
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2018 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_LIBDISPATCH_H_
+#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_LIBDISPATCH_H_
+
+#include <atomic>
+#include <chrono> // NOLINT(build/c++11)
+#include <functional>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+#include "dispatch/dispatch.h"
+
+#include "Firestore/core/src/firebase/firestore/util/executor.h"
+#include "Firestore/core/src/firebase/firestore/util/firebase_assert.h"
+#include "absl/strings/string_view.h"
+
+namespace firebase {
+namespace firestore {
+namespace util {
+
+namespace internal {
+
+// Generic wrapper over `dispatch_async_f`, providing `dispatch_async`-like
+// interface: accepts an arbitrary invocable object in place of an Objective-C
+// block.
+void DispatchAsync(const dispatch_queue_t queue, std::function<void()>&& work);
+
+// Similar to `DispatchAsync` but wraps `dispatch_sync_f`.
+void DispatchSync(const dispatch_queue_t queue, std::function<void()> work);
+
+class TimeSlot;
+
+// A serial queue built on top of libdispatch. The operations are run on
+// a dedicated serial dispatch queue.
+class ExecutorLibdispatch : public Executor {
+ public:
+ ExecutorLibdispatch();
+ explicit ExecutorLibdispatch(dispatch_queue_t dispatch_queue);
+ ~ExecutorLibdispatch();
+
+ bool IsCurrentExecutor() const override;
+ std::string CurrentExecutorName() const override;
+ std::string Name() const override;
+
+ void Execute(Operation&& operation) override;
+ void ExecuteBlocking(Operation&& operation) override;
+ DelayedOperation Schedule(Milliseconds delay,
+ TaggedOperation&& operation) override;
+
+ void RemoveFromSchedule(const TimeSlot* to_remove);
+
+ bool IsScheduled(Tag tag) const override;
+ absl::optional<TaggedOperation> PopFromSchedule() override;
+
+ dispatch_queue_t dispatch_queue() const {
+ return dispatch_queue_;
+ }
+
+ private:
+ // GetLabel functions are guaranteed to never return a "null" string_view
+ // (i.e. data() != nullptr).
+ absl::string_view GetCurrentQueueLabel() const;
+ absl::string_view GetTargetQueueLabel() const;
+
+ std::atomic<dispatch_queue_t> dispatch_queue_;
+ // Stores non-owned pointers to `TimeSlot`s.
+ // Invariant: if a `TimeSlot` is in `schedule_`, it's a valid pointer.
+ std::vector<TimeSlot*> schedule_;
+};
+
+} // namespace internal
+} // namespace util
+} // namespace firestore
+} // namespace firebase
+
+#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_LIBDISPATCH_H_
diff --git a/Firestore/core/src/firebase/firestore/util/executor_std.cc b/Firestore/core/src/firebase/firestore/util/executor_std.cc
new file mode 100644
index 0000000..59197e1
--- /dev/null
+++ b/Firestore/core/src/firebase/firestore/util/executor_std.cc
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2018 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Firestore/core/src/firebase/firestore/util/executor_std.h"
+
+#include <future> // NOLINT(build/c++11)
+#include <sstream>
+
+namespace firebase {
+namespace firestore {
+namespace util {
+namespace internal {
+
+namespace {
+
+// The only guarantee is that different `thread_id`s will produce different
+// values.
+std::string ThreadIdToString(const std::thread::id thread_id) {
+ std::ostringstream stream;
+ stream << thread_id;
+ return stream.str();
+}
+
+} // namespace
+
+ExecutorStd::ExecutorStd() {
+ // Somewhat counter-intuitively, constructor of `std::atomic` assigns the
+ // value non-atomically, so the atomic initialization must be provided here,
+ // before the worker thread is started.
+ // See [this thread](https://stackoverflow.com/questions/25609858) for context
+ // on the constructor.
+ current_id_ = 0;
+ shutting_down_ = false;
+ worker_thread_ = std::thread{&ExecutorStd::PollingThread, this};
+}
+
+ExecutorStd::~ExecutorStd() {
+ shutting_down_ = true;
+ // Make sure the worker thread is not blocked, so that the call to `join`
+ // doesn't hang.
+ UnblockQueue();
+ worker_thread_.join();
+}
+
+void ExecutorStd::Execute(Operation&& operation) {
+ PushOnSchedule(std::move(operation), Immediate());
+}
+
+DelayedOperation ExecutorStd::Schedule(const Milliseconds delay,
+ TaggedOperation&& tagged) {
+ // While negative delay can be interpreted as a request for immediate
+ // execution, supporting it would provide a hacky way to modify FIFO ordering
+ // of immediate operations.
+ FIREBASE_ASSERT_MESSAGE(delay.count() >= 0,
+ "Schedule: delay cannot be negative");
+
+ namespace chr = std::chrono;
+ const auto now = chr::time_point_cast<Milliseconds>(chr::system_clock::now());
+ const auto id =
+ PushOnSchedule(std::move(tagged.operation), now + delay, tagged.tag);
+
+ return DelayedOperation{[this, id] { TryCancel(id); }};
+}
+
+void ExecutorStd::TryCancel(const Id operation_id) {
+ schedule_.RemoveIf(
+ [operation_id](const Entry& e) { return e.id == operation_id; });
+}
+
+ExecutorStd::Id ExecutorStd::PushOnSchedule(Operation&& operation,
+ const TimePoint when,
+ const Tag tag) {
+ // Note: operations scheduled for immediate execution don't actually need an
+ // id. This could be tweaked to reuse the same id for all such operations.
+ const auto id = NextId();
+ schedule_.Push(Entry{std::move(operation), id, tag}, when);
+ return id;
+}
+
+void ExecutorStd::PollingThread() {
+ while (!shutting_down_) {
+ Entry entry = schedule_.PopBlocking();
+ if (entry.tagged.operation) {
+ entry.tagged.operation();
+ }
+ }
+}
+
+void ExecutorStd::UnblockQueue() {
+ // Put a no-op for immediate execution on the queue to ensure that
+ // `schedule_.PopBlocking` returns, and worker thread can notice that shutdown
+ // is in progress.
+ schedule_.Push(Entry{[] {}, /*id=*/0}, Immediate());
+}
+
+ExecutorStd::Id ExecutorStd::NextId() {
+ // The wrap around after ~4 billion operations is explicitly ignored. Even if
+ // an instance of `ExecutorStd` runs long enough to get `current_id_` to
+ // overflow, it's extremely unlikely that any object still holds a reference
+ // that is old enough to cause a conflict.
+ return current_id_++;
+}
+
+bool ExecutorStd::IsCurrentExecutor() const {
+ return std::this_thread::get_id() == worker_thread_.get_id();
+}
+
+std::string ExecutorStd::CurrentExecutorName() const {
+ return ThreadIdToString(std::this_thread::get_id());
+}
+
+std::string ExecutorStd::Name() const {
+ return ThreadIdToString(worker_thread_.get_id());
+}
+
+void ExecutorStd::ExecuteBlocking(Operation&& operation) {
+ std::promise<void> signal_finished;
+ Execute([&] {
+ operation();
+ signal_finished.set_value();
+ });
+ signal_finished.get_future().wait();
+}
+
+bool ExecutorStd::IsScheduled(const Tag tag) const {
+ return schedule_.Contains(
+ [&tag](const Entry& e) { return e.tagged.tag == tag; });
+}
+
+absl::optional<Executor::TaggedOperation> ExecutorStd::PopFromSchedule() {
+ auto removed =
+ schedule_.RemoveIf([](const Entry& e) { return !e.IsImmediate(); });
+ if (!removed.has_value()) {
+ return {};
+ }
+ return {std::move(removed.value().tagged)};
+}
+
+} // namespace internal
+} // namespace util
+} // namespace firestore
+} // namespace firebase
diff --git a/Firestore/core/src/firebase/firestore/util/executor_std.h b/Firestore/core/src/firebase/firestore/util/executor_std.h
new file mode 100644
index 0000000..4ac62e1
--- /dev/null
+++ b/Firestore/core/src/firebase/firestore/util/executor_std.h
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2018 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_STD_H_
+#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_STD_H_
+
+#include <algorithm>
+#include <atomic>
+#include <condition_variable> // NOLINT(build/c++11)
+#include <deque>
+#include <mutex> // NOLINT(build/c++11)
+#include <string>
+#include <thread> // NOLINT(build/c++11)
+#include <utility>
+
+#include "Firestore/core/src/firebase/firestore/util/executor.h"
+#include "Firestore/core/src/firebase/firestore/util/firebase_assert.h"
+#include "absl/types/optional.h"
+
+namespace firebase {
+namespace firestore {
+namespace util {
+
+namespace async {
+
+// A thread-safe class similar to a priority queue where the entries are
+// prioritized by the time for which they're scheduled. Entries scheduled for
+// the exact same time are prioritized in FIFO order.
+//
+// The main function of `Schedule` is `PopBlocking`, which sleeps until an entry
+// becomes available. It correctly handles entries being asynchonously added or
+// removed from the schedule.
+//
+// The details of time management are completely concealed within the class.
+// Once an entry is scheduled, there is no way to reschedule or even retrieve
+// the time.
+template <typename T>
+class Schedule {
+ // Internal invariants:
+ // - entries are always in sorted order, leftmost entry is always the most
+ // due;
+ // - each operation modifying the queue notifies the condition variable `cv_`.
+ public:
+ using Duration = std::chrono::milliseconds;
+ using Clock = std::chrono::system_clock;
+ // Entries are scheduled using absolute time.
+ using TimePoint = std::chrono::time_point<Clock, Duration>;
+
+ // Schedules an entry for the specified time due. `due` may be in the past.
+ void Push(const T& value, const TimePoint due) {
+ InsertPreservingOrder(Entry{value, due});
+ }
+ void Push(T&& value, const TimePoint due) {
+ InsertPreservingOrder(Entry{std::move(value), due});
+ }
+
+ // If the queue contains at least one entry for which the scheduled time is
+ // due now (according to the system clock), removes the entry which is the
+ // most overdue from the queue and returns it. If no entry is due, returns an
+ // empty `optional`.
+ absl::optional<T> PopIfDue() {
+ std::lock_guard<std::mutex> lock{mutex_};
+
+ if (HasDueLocked()) {
+ return ExtractLocked(scheduled_.begin());
+ }
+ return {};
+ }
+
+ // Blocks until at least one entry is available for which the scheduled time
+ // is due now (according to the system clock), removes the entry which is the
+ // most overdue from the queue and returns it. The function will
+ // attempt to minimize both the waiting time and busy waiting.
+ T PopBlocking() {
+ std::unique_lock<std::mutex> lock{mutex_};
+
+ while (true) {
+ cv_.wait(lock, [this] { return !scheduled_.empty(); });
+
+ // To minimize busy waiting, sleep until either the nearest entry in the
+ // future either changes, or else becomes due.
+ const auto until = scheduled_.front().due;
+ cv_.wait_until(lock, until,
+ [this, until] { return scheduled_.front().due != until; });
+ // There are 3 possibilities why `wait_until` has returned:
+ // - `wait_until` has timed out, in which case the current time is at
+ // least `until`, so there must be an overdue entry;
+ // - a new entry has been added which comes before `until`. It must be
+ // either overdue (in which case `HasDueLocked` will break the cycle),
+ // or else `until` must be reevaluated (on the next iteration of the
+ // loop);
+ // - `until` entry has been removed. This means `until` has to be
+ // reevaluated, similar to #2.
+
+ if (HasDueLocked()) {
+ return ExtractLocked(scheduled_.begin());
+ }
+ }
+ }
+
+ bool empty() const {
+ std::lock_guard<std::mutex> lock{mutex_};
+ return scheduled_.empty();
+ }
+
+ size_t size() const {
+ std::lock_guard<std::mutex> lock{mutex_};
+ return scheduled_.size();
+ }
+
+ // Removes the first entry satisfying predicate from the queue and returns it.
+ // If no such entry exists, returns an empty `optional`. Predicate is applied
+ // to entries in order according to their scheduled time.
+ //
+ // Note that this function doesn't take into account whether the removed entry
+ // is past its due time.
+ template <typename Pred>
+ absl::optional<T> RemoveIf(const Pred pred) {
+ std::lock_guard<std::mutex> lock{mutex_};
+
+ for (auto iter = scheduled_.begin(), end = scheduled_.end(); iter != end;
+ ++iter) {
+ if (pred(iter->value)) {
+ return ExtractLocked(iter);
+ }
+ }
+ return {};
+ }
+
+ // Checks whether the queue contains an entry satisfying the given predicate.
+ template <typename Pred>
+ bool Contains(const Pred pred) const {
+ std::lock_guard<std::mutex> lock{mutex_};
+ return std::any_of(scheduled_.begin(), scheduled_.end(),
+ [&pred](const Entry& s) { return pred(s.value); });
+ }
+
+ private:
+ struct Entry {
+ bool operator<(const Entry& rhs) const {
+ return due < rhs.due;
+ }
+
+ T value;
+ TimePoint due;
+ };
+ // All removals are on the front, but most insertions are expected to be on
+ // the back.
+ using Container = std::deque<Entry>;
+ using Iterator = typename Container::iterator;
+
+ void InsertPreservingOrder(Entry&& new_entry) {
+ std::lock_guard<std::mutex> lock{mutex_};
+
+ const auto insertion_point =
+ std::upper_bound(scheduled_.begin(), scheduled_.end(), new_entry);
+ scheduled_.insert(insertion_point, std::move(new_entry));
+
+ cv_.notify_one();
+ }
+
+ // This function expects the mutex to be already locked.
+ bool HasDueLocked() const {
+ namespace chr = std::chrono;
+ const auto now = chr::time_point_cast<Duration>(Clock::now());
+ return !scheduled_.empty() && now >= scheduled_.front().due;
+ }
+
+ // This function expects the mutex to be already locked.
+ T ExtractLocked(const Iterator where) {
+ FIREBASE_ASSERT_MESSAGE(!scheduled_.empty(),
+ "Trying to pop an entry from an empty queue.");
+
+ T result = std::move(where->value);
+ scheduled_.erase(where);
+ cv_.notify_one();
+
+ return result;
+ }
+
+ mutable std::mutex mutex_;
+ std::condition_variable cv_;
+ Container scheduled_;
+};
+
+} // namespace async
+
+namespace internal {
+
+// A serial queue that executes provided operations on a dedicated background
+// thread, using C++11 standard library functionality.
+class ExecutorStd : public Executor {
+ public:
+ ExecutorStd();
+ ~ExecutorStd();
+
+ void Execute(Operation&& operation) override;
+ void ExecuteBlocking(Operation&& operation) override;
+
+ DelayedOperation Schedule(Milliseconds delay,
+ TaggedOperation&& tagged) override;
+
+ bool IsCurrentExecutor() const override;
+ std::string CurrentExecutorName() const override;
+ std::string Name() const override;
+
+ bool IsScheduled(Tag tag) const override;
+ absl::optional<TaggedOperation> PopFromSchedule() override;
+
+ private:
+ using TimePoint = async::Schedule<Operation>::TimePoint;
+ // To allow canceling operations, each scheduled operation is assigned
+ // a monotonically increasing identifier.
+ using Id = unsigned int;
+
+ // If the operation hasn't yet been run, it will be removed from the queue.
+ // Otherwise, this function is a no-op.
+ void TryCancel(Id operation_id);
+
+ Id PushOnSchedule(Operation&& operation, TimePoint when, Tag tag = -1);
+
+ void PollingThread();
+ void UnblockQueue();
+ Id NextId();
+
+ // As a convention, assign the epoch time to all operations scheduled for
+ // immediate execution. Note that it means that an immediate operation is
+ // always scheduled before any delayed operation, even in the corner case when
+ // the immediate operation was scheduled after a delayed operation was due
+ // (but hasn't yet run).
+ static TimePoint Immediate() {
+ return TimePoint{};
+ }
+
+ struct Entry {
+ Entry() {
+ }
+ Entry(Operation&& operation,
+ const ExecutorStd::Id id,
+ const ExecutorStd::Tag tag = kNoTag)
+ : tagged{tag, std::move(operation)}, id{id} {
+ }
+
+ bool IsImmediate() const {
+ return tagged.tag == kNoTag;
+ }
+
+ static constexpr Tag kNoTag = -1;
+ TaggedOperation tagged;
+ Id id = 0;
+ };
+ // Operations scheduled for immediate execution are also put on the schedule
+ // (with due time set to `Immediate`).
+ async::Schedule<Entry> schedule_;
+
+ std::thread worker_thread_;
+ // Used to stop the worker thread.
+ std::atomic<bool> shutting_down_{false};
+
+ std::atomic<Id> current_id_{0};
+};
+
+} // namespace internal
+} // namespace util
+} // namespace firestore
+} // namespace firebase
+
+#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_STD_H_