From 38824e721ebaa3f50e443e1ea4b47d34030e4703 Mon Sep 17 00:00:00 2001 From: Konstantin Varlamov Date: Mon, 7 May 2018 12:22:57 -0400 Subject: C++ migration: add AsyncQueue, the C++ version of FSTDispatchQueue (#1176) AsyncQueue is a queue that executes given operations asynchronously, enforcing that only a single operation is executing at any given time, and that in-progress operations don't spawn more operations. The actual execution is delegated to a platform-specific executor. Executor is an interface for a FIFO queue that executes given operations serially. Two implementations of Executor, one using libdispatch and the other using C++11 standard library, are provided. AsyncQueue is not used anywhere in the code base at this point. --- .../src/firebase/firestore/util/CMakeLists.txt | 58 ++++ .../src/firebase/firestore/util/async_queue.cc | 140 ++++++++++ .../core/src/firebase/firestore/util/async_queue.h | 164 ++++++++++++ .../core/src/firebase/firestore/util/executor.h | 129 +++++++++ .../firestore/util/executor_libdispatch.cc | 296 +++++++++++++++++++++ .../firebase/firestore/util/executor_libdispatch.h | 92 +++++++ .../src/firebase/firestore/util/executor_std.cc | 155 +++++++++++ .../src/firebase/firestore/util/executor_std.h | 281 +++++++++++++++++++ 8 files changed, 1315 insertions(+) create mode 100644 Firestore/core/src/firebase/firestore/util/async_queue.cc create mode 100644 Firestore/core/src/firebase/firestore/util/async_queue.h create mode 100644 Firestore/core/src/firebase/firestore/util/executor.h create mode 100644 Firestore/core/src/firebase/firestore/util/executor_libdispatch.cc create mode 100644 Firestore/core/src/firebase/firestore/util/executor_libdispatch.h create mode 100644 Firestore/core/src/firebase/firestore/util/executor_std.cc create mode 100644 Firestore/core/src/firebase/firestore/util/executor_std.h (limited to 'Firestore/core/src') diff --git a/Firestore/core/src/firebase/firestore/util/CMakeLists.txt b/Firestore/core/src/firebase/firestore/util/CMakeLists.txt index 95cd72f..29d91c7 100644 --- a/Firestore/core/src/firebase/firestore/util/CMakeLists.txt +++ b/Firestore/core/src/firebase/firestore/util/CMakeLists.txt @@ -109,6 +109,63 @@ else() endif() +## async queue + +check_symbol_exists(dispatch_async_f dispatch/dispatch.h HAVE_LIBDISPATCH) + +cc_library( + firebase_firestore_util_executor_std + SOURCES + executor_std.cc + executor_std.h + executor.h + DEPENDS + absl_bad_optional_access + absl_optional + ${FIREBASE_FIRESTORE_UTIL_LOG} + EXCLUDE_FROM_ALL +) + +if(HAVE_LIBDISPATCH) +cc_library( + firebase_firestore_util_executor_libdispatch + SOURCES + executor_libdispatch.cc + executor_libdispatch.h + executor.h + DEPENDS + absl_bad_optional_access + absl_optional + absl_strings + ${FIREBASE_FIRESTORE_UTIL_LOG} + EXCLUDE_FROM_ALL +) +endif() + +if(HAVE_LIBDISPATCH) + set( + FIREBASE_FIRESTORE_UTIL_EXECUTOR + firebase_firestore_util_executor_libdispatch + ) + +else() + set( + FIREBASE_FIRESTORE_UTIL_EXECUTOR + firebase_firestore_util_executor_std + ) + +endif() + +cc_library( + firebase_firestore_util_async_queue + SOURCES + async_queue.cc + async_queue.h + DEPENDS + ${FIREBASE_FIRESTORE_UTIL_EXECUTOR} + ${FIREBASE_FIRESTORE_UTIL_LOG} + EXCLUDE_FROM_ALL +) ## main library @@ -144,6 +201,7 @@ cc_library( DEPENDS absl_base firebase_firestore_util_base + firebase_firestore_util_async_queue ${FIREBASE_FIRESTORE_UTIL_LOG} ${FIREBASE_FIRESTORE_UTIL_RANDOM} ) diff --git a/Firestore/core/src/firebase/firestore/util/async_queue.cc b/Firestore/core/src/firebase/firestore/util/async_queue.cc new file mode 100644 index 0000000..71f5cc5 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/util/async_queue.cc @@ -0,0 +1,140 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Firestore/core/src/firebase/firestore/util/async_queue.h" + +#include + +#include "Firestore/core/src/firebase/firestore/util/firebase_assert.h" +#include "absl/memory/memory.h" + +namespace firebase { +namespace firestore { +namespace util { + +using internal::Executor; + +AsyncQueue::AsyncQueue(std::unique_ptr executor) + : executor_{std::move(executor)} { + is_operation_in_progress_ = false; +} + +void AsyncQueue::VerifyIsCurrentExecutor() const { + FIREBASE_ASSERT_MESSAGE( + executor_->IsCurrentExecutor(), + "Expected to be called by the executor associated with this queue " + "(expected executor: '%s', actual executor: '%s')", + executor_->Name().c_str(), executor_->CurrentExecutorName().c_str()); +} + +void AsyncQueue::VerifyIsCurrentQueue() const { + VerifyIsCurrentExecutor(); + FIREBASE_ASSERT_MESSAGE( + is_operation_in_progress_, + "VerifyIsCurrentQueue called when no operation is executing " + "(expected executor: '%s', actual executor: '%s')", + executor_->Name().c_str(), executor_->CurrentExecutorName().c_str()); +} + +void AsyncQueue::ExecuteBlocking(const Operation& operation) { + VerifyIsCurrentExecutor(); + FIREBASE_ASSERT_MESSAGE(!is_operation_in_progress_, + "ExecuteBlocking may not be called " + "before the previous operation finishes executing"); + + is_operation_in_progress_ = true; + operation(); + is_operation_in_progress_ = false; +} + +void AsyncQueue::Enqueue(const Operation& operation) { + VerifySequentialOrder(); + EnqueueRelaxed(operation); +} + +void AsyncQueue::EnqueueRelaxed(const Operation& operation) { + executor_->Execute(Wrap(operation)); +} + +DelayedOperation AsyncQueue::EnqueueAfterDelay(const Milliseconds delay, + const TimerId timer_id, + const Operation& operation) { + VerifyIsCurrentExecutor(); + + // While not necessarily harmful, we currently don't expect to have multiple + // callbacks with the same timer_id in the queue, so defensively reject + // them. + FIREBASE_ASSERT_MESSAGE( + !IsScheduled(timer_id), + "Attempted to schedule multiple operations with id %d", timer_id); + + Executor::TaggedOperation tagged{static_cast(timer_id), Wrap(operation)}; + return executor_->Schedule(delay, std::move(tagged)); +} + +AsyncQueue::Operation AsyncQueue::Wrap(const Operation& operation) { + // Decorator pattern: wrap `operation` into a call to `ExecuteBlocking` to + // ensure that it doesn't spawn any nested operations. + + // Note: can't move `operation` into lambda until C++14. + return [this, operation] { ExecuteBlocking(operation); }; +} + +void AsyncQueue::VerifySequentialOrder() const { + // This is the inverse of `VerifyIsCurrentQueue`. + FIREBASE_ASSERT_MESSAGE( + !is_operation_in_progress_ || !executor_->IsCurrentExecutor(), + "Enforcing sequential order failed: currently executing operations " + "cannot enqueue more operations " + "(this queue's executor: '%s', current executor: '%s')", + executor_->Name().c_str(), executor_->CurrentExecutorName().c_str()); +} + +// Test-only functions + +void AsyncQueue::EnqueueBlocking(const Operation& operation) { + VerifySequentialOrder(); + executor_->ExecuteBlocking(Wrap(operation)); +} + +bool AsyncQueue::IsScheduled(const TimerId timer_id) const { + return executor_->IsScheduled(static_cast(timer_id)); +} + +void AsyncQueue::RunScheduledOperationsUntil(const TimerId last_timer_id) { + FIREBASE_ASSERT_MESSAGE( + !executor_->IsCurrentExecutor(), + "RunScheduledOperationsUntil must not be called on the queue"); + + executor_->ExecuteBlocking([this, last_timer_id] { + FIREBASE_ASSERT_MESSAGE( + last_timer_id == TimerId::All || IsScheduled(last_timer_id), + "Attempted to run scheduled operations until missing timer id: %d", + last_timer_id); + + for (auto next = executor_->PopFromSchedule(); next.has_value(); + next = executor_->PopFromSchedule()) { + next->operation(); + if (next->tag == static_cast(last_timer_id)) { + break; + } + } + }); +} + +} // namespace util +} // namespace firestore +} // namespace firebase diff --git a/Firestore/core/src/firebase/firestore/util/async_queue.h b/Firestore/core/src/firebase/firestore/util/async_queue.h new file mode 100644 index 0000000..e2df387 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/util/async_queue.h @@ -0,0 +1,164 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_ +#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_ + +#include +#include // NOLINT(build/c++11) +#include +#include + +#include "Firestore/core/src/firebase/firestore/util/executor.h" + +namespace firebase { +namespace firestore { +namespace util { + +/** + * Well-known "timer" ids used when scheduling delayed operations on the + * AsyncQueue. These ids can then be used from tests to check for the + * presence of delayed operations or to run them early. + */ +enum class TimerId { + /** All can be used with `RunDelayedOperationsUntil` to run all timers. */ + All, + + /** + * The following 4 timers are used in `Stream` for the listen and write + * streams. The "Idle" timer is used to close the stream due to inactivity. + * The "ConnectionBackoff" timer is used to restart a stream once the + * appropriate backoff delay has elapsed. + */ + ListenStreamIdle, + ListenStreamConnectionBackoff, + WriteStreamIdle, + WriteStreamConnectionBackoff, + + /** + * A timer used in `OnlineStateTracker` to transition from + * `OnlineStateUnknown` to `Offline` after a set timeout, rather than waiting + * indefinitely for success or failure. + */ + OnlineStateTimeout, +}; + +// A serial queue that executes given operations asynchronously, one at a time. +// Operations may be scheduled to be executed as soon as possible or in the +// future. Operations scheduled for the same time are FIFO-ordered. +// +// `AsyncQueue` wraps a platform-specific executor, adding checks that enforce +// sequential ordering of operations: an enqueued operation, while being run, +// normally cannot enqueue other operations for immediate execution (but see +// `EnqueueRelaxed`). +// +// `AsyncQueue` methods have particular expectations about whether they must be +// invoked on the queue or not; check "preconditions" section in comments on +// each method. +// +// A significant portion of `AsyncQueue` interface only exists for test purposes +// and must *not* be used in regular code. +class AsyncQueue { + public: + using Operation = internal::Executor::Operation; + using Milliseconds = internal::Executor::Milliseconds; + + explicit AsyncQueue(std::unique_ptr executor); + + // Asserts for the caller that it is being invoked as part of an operation on + // the `AsyncQueue`. + void VerifyIsCurrentQueue() const; + + // Enqueue methods + + // Puts the `operation` on the queue to be executed as soon as possible, while + // maintaining FIFO order. + // + // Precondition: `Enqueue` calls cannot be nested; that is, `Enqueue` may not + // be called by a previously enqueued operation when it is run (as a special + // case, destructors invoked when an enqueued operation has run and is being + // destroyed may invoke `Enqueue`). + void Enqueue(const Operation& operation); + + // Like `Enqueue`, but without applying any prerequisite checks. + void EnqueueRelaxed(const Operation& operation); + + // Puts the `operation` on the queue to be executed `delay` milliseconds from + // now, and returns a handle that allows to cancel the operation (provided it + // hasn't run already). + // + // `operation` is tagged by a `timer_id` which allows to identify the caller. + // Only one operation tagged with any given `timer_id` may be on the queue at + // any time; an attempt to put another such operation will result in an + // assertion failure. In tests, these tags also allow to check for presence of + // certain operations and to run certain operations in advance. + // + // Precondition: `EnqueueAfterDelay` is being invoked asynchronously on the + // queue. + DelayedOperation EnqueueAfterDelay(Milliseconds delay, + TimerId timer_id, + const Operation& operation); + + // Direct execution + + // Immediately executes the `operation` on the queue. + // + // This is largely a workaround to allow other classes (GRPC) to directly + // access the underlying dispatch queue without getting `AsyncQueue` into an + // inconsistent state. + // + // Precondition: no other operation is being executed on the queue at the + // moment of the call (i.e., `ExecuteBlocking` cannot call `ExecuteBlocking`). + // + // Precondition: `ExecuteBlocking` is being invoked asynchronously on the + // queue. + void ExecuteBlocking(const Operation& operation); + + // Test-only interface follows + // TODO(varconst): move the test-only interface into a helper object that is + // a friend of AsyncQueue and delegates its public methods to private methods + // on AsyncQueue. + + // Like `Enqueue`, but blocks until the `operation` is complete. + void EnqueueBlocking(const Operation& operation); + + // Checks whether an operation tagged with `timer_id` is currently scheduled + // for execution in the future. + bool IsScheduled(TimerId timer_id) const; + + // Force runs operations scheduled for future execution, in scheduled order, + // up to *and including* the operation tagged with `last_timer_id`. + // + // Precondition: `RunScheduledOperationsUntil` is *not* being invoked on the + // queue. + void RunScheduledOperationsUntil(TimerId last_timer_id); + + private: + Operation Wrap(const Operation& operation); + + // Asserts that the current invocation happens asynchronously on the queue. + void VerifyIsCurrentExecutor() const; + void VerifySequentialOrder() const; + + std::atomic is_operation_in_progress_; + std::unique_ptr executor_; +}; + +} // namespace util +} // namespace firestore +} // namespace firebase + +#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_ diff --git a/Firestore/core/src/firebase/firestore/util/executor.h b/Firestore/core/src/firebase/firestore/util/executor.h new file mode 100644 index 0000000..df8b0b5 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/util/executor.h @@ -0,0 +1,129 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_H_ +#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_H_ + +#include // NOLINT(build/c++11) +#include +#include +#include + +#include "absl/types/optional.h" + +namespace firebase { +namespace firestore { +namespace util { + +// A handle to an operation scheduled for future execution. The handle may +// outlive the operation, but it *cannot* outlive the executor that created it. +class DelayedOperation { + public: + DelayedOperation() { + } + + // If the operation has not been run yet, cancels the operation. Otherwise, + // this function is a no-op. + void Cancel() { + cancel_func_(); + } + + // Internal use only. + explicit DelayedOperation(std::function&& cancel_func) + : cancel_func_{std::move(cancel_func)} { + } + + private: + std::function cancel_func_; +}; + +namespace internal { + +// An interface to a platform-specific executor of asynchronous operations +// (called tasks on other platforms). +// +// Operations may be scheduled for immediate or delayed execution. Operations +// delayed until the exact same time are scheduled in FIFO order. +// +// The operations are executed sequentially; only a single operation is executed +// at any given time. +// +// Delayed operations may be canceled if they have not already been run. +class Executor { + public: + using Tag = int; + using Operation = std::function; + using Milliseconds = std::chrono::milliseconds; + + // Operations scheduled for future execution have an opaque tag. The value of + // the tag is ignored by the executor but can be used to find operations with + // a given tag after they are scheduled. + struct TaggedOperation { + TaggedOperation() { + } + TaggedOperation(const Tag tag, Operation&& operation) + : tag{tag}, operation{std::move(operation)} { + } + Tag tag = 0; + Operation operation; + }; + + virtual ~Executor() { + } + + // Schedules the `operation` to be asynchronously executed as soon as + // possible, in FIFO order. + virtual void Execute(Operation&& operation) = 0; + // Like `Execute`, but blocks until the `operation` finishes, consequently + // draining immediate operations from the executor. + virtual void ExecuteBlocking(Operation&& operation) = 0; + // Scheduled the given `operation` to be executed after `delay` milliseconds + // from now, and returns a handle that allows to cancel the operation + // (provided it hasn't been run already). The operation is tagged to allow + // retrieving it later. + // + // `delay` must be non-negative; use `Execute` to schedule operations for + // immediate execution. + virtual DelayedOperation Schedule(Milliseconds delay, + TaggedOperation&& operation) = 0; + + // Checks for the caller whether it is being invoked by this executor. + virtual bool IsCurrentExecutor() const = 0; + // Returns some sort of an identifier for the current execution context. The + // only guarantee is that it will return different values depending on whether + // this function is invoked by this executor or not. + virtual std::string CurrentExecutorName() const = 0; + // Like `CurrentExecutorName`, but returns an identifier for this executor, + // whether the caller code currently runs on this executor or not. + virtual std::string Name() const = 0; + + // Checks whether an operation tagged with the given `tag` is currently + // scheduled for future execution. + virtual bool IsScheduled(Tag tag) const = 0; + // Removes the nearest due scheduled operation from the schedule and returns + // it to the caller. This function may be used to reschedule operations. + // Immediate operations don't count; only operations scheduled for delayed + // execution may be removed. If no such operations are currently scheduled, an + // empty `optional` is returned. + virtual absl::optional PopFromSchedule() = 0; +}; + +} // namespace internal +} // namespace util +} // namespace firestore +} // namespace firebase + +#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_H_ diff --git a/Firestore/core/src/firebase/firestore/util/executor_libdispatch.cc b/Firestore/core/src/firebase/firestore/util/executor_libdispatch.cc new file mode 100644 index 0000000..b40f0dd --- /dev/null +++ b/Firestore/core/src/firebase/firestore/util/executor_libdispatch.cc @@ -0,0 +1,296 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Firestore/core/src/firebase/firestore/util/executor_libdispatch.h" + +namespace firebase { +namespace firestore { +namespace util { +namespace internal { + +namespace { + +absl::string_view StringViewFromDispatchLabel(const char* const label) { + // Make sure string_view's data is not null, because it's used for logging. + return label ? absl::string_view{label} : absl::string_view{""}; +} + +void RunSynchronized(const ExecutorLibdispatch* const executor, + std::function&& work) { + if (executor->IsCurrentExecutor()) { + work(); + } else { + DispatchSync(executor->dispatch_queue(), std::move(work)); + } +} + +} // namespace + +void DispatchAsync(const dispatch_queue_t queue, std::function&& work) { + // Dynamically allocate the function to make sure the object is valid by the + // time libdispatch gets to it. + const auto wrap = new std::function{std::move(work)}; + + dispatch_async_f(queue, wrap, [](void* const raw_work) { + const auto unwrap = static_cast*>(raw_work); + (*unwrap)(); + delete unwrap; + }); +} + +void DispatchSync(const dispatch_queue_t queue, Executor::Operation work) { + // Unlike dispatch_async_f, dispatch_sync_f blocks until the work passed to it + // is done, so passing a reference to a local variable is okay. + dispatch_sync_f(queue, &work, [](void* const raw_work) { + const auto unwrap = static_cast(raw_work); + (*unwrap)(); + }); +} + +// Represents a "busy" time slot on the schedule. +// +// Since libdispatch doesn't provide a way to cancel a scheduled operation, once +// a slot is created, it will always stay in the schedule until the time is +// past. Consequently, it is more useful to think of a time slot than +// a particular scheduled operation -- by the time the slot comes, operation may +// or may not be there (imagine getting to a meeting and finding out it's been +// canceled). +// +// Precondition: all member functions, including the constructor, are *only* +// invoked on the Firestore queue. +// +// Ownership: +// +// - `TimeSlot` is exclusively owned by libdispatch; +// - `ExecutorLibdispatch` contains non-owning pointers to `TimeSlot`s; +// - invariant: if the executor contains a pointer to a `TimeSlot`, it is +// a valid object. It is achieved because when libdispatch invokes +// a `TimeSlot`, it always removes it from the executor before deleting it. +// The reverse is not true: a canceled time slot is removed from the executor, +// but won't be destroyed until its original due time is past. + +class TimeSlot { + public: + TimeSlot(ExecutorLibdispatch* executor, + Executor::Milliseconds delay, + Executor::TaggedOperation&& operation); + + // Returns the operation that was scheduled for this time slot and turns the + // slot into a no-op. + Executor::TaggedOperation Unschedule(); + + bool operator<(const TimeSlot& rhs) const { + return target_time_ < rhs.target_time_; + } + bool operator==(const Executor::Tag tag) const { + return tagged_.tag == tag; + } + + void MarkDone() { + done_ = true; + } + + static void InvokedByLibdispatch(void* const raw_self); + + private: + void Execute(); + void RemoveFromSchedule(); + + using TimePoint = std::chrono::time_point; + + ExecutorLibdispatch* const executor_; + const TimePoint target_time_; // Used for sorting + Executor::TaggedOperation tagged_; + + // True if the operation has either been run or canceled. + // + // Note on thread-safety: because the precondition is that all member + // functions of this class are executed on the dispatch queue, no + // synchronization is required for `done_`. + bool done_ = false; +}; + +TimeSlot::TimeSlot(ExecutorLibdispatch* const executor, + const Executor::Milliseconds delay, + Executor::TaggedOperation&& operation) + : executor_{executor}, + target_time_{std::chrono::time_point_cast( + std::chrono::system_clock::now()) + + delay}, + tagged_{std::move(operation)} { +} + +Executor::TaggedOperation TimeSlot::Unschedule() { + if (!done_) { + RemoveFromSchedule(); + } + return std::move(tagged_); +} + +void TimeSlot::InvokedByLibdispatch(void* const raw_self) { + auto const self = static_cast(raw_self); + self->Execute(); + delete self; +} + +void TimeSlot::Execute() { + if (done_) { + // `done_` might mean that the executor is already destroyed, so don't call + // `RemoveFromSchedule`. + return; + } + + RemoveFromSchedule(); + + FIREBASE_ASSERT_MESSAGE(tagged_.operation, + "TimeSlot contains an invalid function object"); + tagged_.operation(); +} + +void TimeSlot::RemoveFromSchedule() { + executor_->RemoveFromSchedule(this); +} + +// ExecutorLibdispatch + +ExecutorLibdispatch::ExecutorLibdispatch(const dispatch_queue_t dispatch_queue) + : dispatch_queue_{dispatch_queue} { +} +ExecutorLibdispatch::ExecutorLibdispatch() + : ExecutorLibdispatch{dispatch_queue_create("com.google.firebase.firestore", + DISPATCH_QUEUE_SERIAL)} { +} + +ExecutorLibdispatch::~ExecutorLibdispatch() { + // Turn any operations that might still be in the queue into no-ops, lest + // they try to access `ExecutorLibdispatch` after it gets destroyed. Because + // the queue is serial, by the time libdispatch gets to the newly-enqueued + // work, the pending operations that might have been in progress would have + // already finished. + RunSynchronized(this, [this] { + for (auto slot : schedule_) { + slot->MarkDone(); + } + }); +} + +bool ExecutorLibdispatch::IsCurrentExecutor() const { + return GetCurrentQueueLabel().data() == GetTargetQueueLabel().data(); +} +std::string ExecutorLibdispatch::CurrentExecutorName() const { + return GetCurrentQueueLabel().data(); +} +std::string ExecutorLibdispatch::Name() const { + return GetTargetQueueLabel().data(); +} + +void ExecutorLibdispatch::Execute(Operation&& operation) { + DispatchAsync(dispatch_queue(), std::move(operation)); +} +void ExecutorLibdispatch::ExecuteBlocking(Operation&& operation) { + DispatchSync(dispatch_queue(), std::move(operation)); +} + +DelayedOperation ExecutorLibdispatch::Schedule(const Milliseconds delay, + TaggedOperation&& operation) { + namespace chr = std::chrono; + const dispatch_time_t delay_ns = dispatch_time( + DISPATCH_TIME_NOW, chr::duration_cast(delay).count()); + + // Ownership is fully transferred to libdispatch -- because it's impossible + // to truly cancel work after it's been dispatched, libdispatch is + // guaranteed to outlive the executor, and it's possible for work to be + // invoked by libdispatch after the executor is destroyed. Executor only + // stores an observer pointer to the operation. + + auto const time_slot = new TimeSlot{this, delay, std::move(operation)}; + dispatch_after_f(delay_ns, dispatch_queue(), time_slot, + TimeSlot::InvokedByLibdispatch); + RunSynchronized(this, [this, time_slot] { schedule_.push_back(time_slot); }); + return DelayedOperation{[this, time_slot] { + // `time_slot` might be destroyed by the time cancellation function runs. + // Therefore, don't access any methods on `time_slot`, only use it as + // a handle to remove from `schedule_`. + RemoveFromSchedule(time_slot); + }}; +} + +void ExecutorLibdispatch::RemoveFromSchedule(const TimeSlot* const to_remove) { + RunSynchronized(this, [this, to_remove] { + const auto found = std::find_if( + schedule_.begin(), schedule_.end(), + [to_remove](const TimeSlot* op) { return op == to_remove; }); + // It's possible for the operation to be missing if libdispatch gets to run + // it after it was force-run, for example. + if (found != schedule_.end()) { + (*found)->MarkDone(); + schedule_.erase(found); + } + }); +} + +// GetLabel functions are guaranteed to never return a "null" string_view +// (i.e. data() != nullptr). +absl::string_view ExecutorLibdispatch::GetCurrentQueueLabel() const { + // Note: dispatch_queue_get_label may return nullptr if the queue wasn't + // initialized with a label. + return StringViewFromDispatchLabel( + dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL)); +} + +absl::string_view ExecutorLibdispatch::GetTargetQueueLabel() const { + return StringViewFromDispatchLabel( + dispatch_queue_get_label(dispatch_queue())); +} + +// Test-only methods + +bool ExecutorLibdispatch::IsScheduled(const Tag tag) const { + bool result = false; + RunSynchronized(this, [this, tag, &result] { + result = std::find_if(schedule_.begin(), schedule_.end(), + [&tag](const TimeSlot* const operation) { + return *operation == tag; + }) != schedule_.end(); + }); + return result; +} + +absl::optional +ExecutorLibdispatch::PopFromSchedule() { + absl::optional result; + + RunSynchronized(this, [this, &result] { + if (schedule_.empty()) { + return; + } + // Sorting upon each call to `PopFromSchedule` is inefficient, which is + // consciously ignored because this function is only ever called from tests. + std::sort( + schedule_.begin(), schedule_.end(), + [](const TimeSlot* lhs, const TimeSlot* rhs) { return *lhs < *rhs; }); + const auto nearest = schedule_.begin(); + result = (*nearest)->Unschedule(); + }); + + return result; +} + +} // namespace internal +} // namespace util +} // namespace firestore +} // namespace firebase diff --git a/Firestore/core/src/firebase/firestore/util/executor_libdispatch.h b/Firestore/core/src/firebase/firestore/util/executor_libdispatch.h new file mode 100644 index 0000000..b32dbff --- /dev/null +++ b/Firestore/core/src/firebase/firestore/util/executor_libdispatch.h @@ -0,0 +1,92 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_LIBDISPATCH_H_ +#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_LIBDISPATCH_H_ + +#include +#include // NOLINT(build/c++11) +#include +#include +#include +#include +#include +#include "dispatch/dispatch.h" + +#include "Firestore/core/src/firebase/firestore/util/executor.h" +#include "Firestore/core/src/firebase/firestore/util/firebase_assert.h" +#include "absl/strings/string_view.h" + +namespace firebase { +namespace firestore { +namespace util { + +namespace internal { + +// Generic wrapper over `dispatch_async_f`, providing `dispatch_async`-like +// interface: accepts an arbitrary invocable object in place of an Objective-C +// block. +void DispatchAsync(const dispatch_queue_t queue, std::function&& work); + +// Similar to `DispatchAsync` but wraps `dispatch_sync_f`. +void DispatchSync(const dispatch_queue_t queue, std::function work); + +class TimeSlot; + +// A serial queue built on top of libdispatch. The operations are run on +// a dedicated serial dispatch queue. +class ExecutorLibdispatch : public Executor { + public: + ExecutorLibdispatch(); + explicit ExecutorLibdispatch(dispatch_queue_t dispatch_queue); + ~ExecutorLibdispatch(); + + bool IsCurrentExecutor() const override; + std::string CurrentExecutorName() const override; + std::string Name() const override; + + void Execute(Operation&& operation) override; + void ExecuteBlocking(Operation&& operation) override; + DelayedOperation Schedule(Milliseconds delay, + TaggedOperation&& operation) override; + + void RemoveFromSchedule(const TimeSlot* to_remove); + + bool IsScheduled(Tag tag) const override; + absl::optional PopFromSchedule() override; + + dispatch_queue_t dispatch_queue() const { + return dispatch_queue_; + } + + private: + // GetLabel functions are guaranteed to never return a "null" string_view + // (i.e. data() != nullptr). + absl::string_view GetCurrentQueueLabel() const; + absl::string_view GetTargetQueueLabel() const; + + std::atomic dispatch_queue_; + // Stores non-owned pointers to `TimeSlot`s. + // Invariant: if a `TimeSlot` is in `schedule_`, it's a valid pointer. + std::vector schedule_; +}; + +} // namespace internal +} // namespace util +} // namespace firestore +} // namespace firebase + +#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_LIBDISPATCH_H_ diff --git a/Firestore/core/src/firebase/firestore/util/executor_std.cc b/Firestore/core/src/firebase/firestore/util/executor_std.cc new file mode 100644 index 0000000..59197e1 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/util/executor_std.cc @@ -0,0 +1,155 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Firestore/core/src/firebase/firestore/util/executor_std.h" + +#include // NOLINT(build/c++11) +#include + +namespace firebase { +namespace firestore { +namespace util { +namespace internal { + +namespace { + +// The only guarantee is that different `thread_id`s will produce different +// values. +std::string ThreadIdToString(const std::thread::id thread_id) { + std::ostringstream stream; + stream << thread_id; + return stream.str(); +} + +} // namespace + +ExecutorStd::ExecutorStd() { + // Somewhat counter-intuitively, constructor of `std::atomic` assigns the + // value non-atomically, so the atomic initialization must be provided here, + // before the worker thread is started. + // See [this thread](https://stackoverflow.com/questions/25609858) for context + // on the constructor. + current_id_ = 0; + shutting_down_ = false; + worker_thread_ = std::thread{&ExecutorStd::PollingThread, this}; +} + +ExecutorStd::~ExecutorStd() { + shutting_down_ = true; + // Make sure the worker thread is not blocked, so that the call to `join` + // doesn't hang. + UnblockQueue(); + worker_thread_.join(); +} + +void ExecutorStd::Execute(Operation&& operation) { + PushOnSchedule(std::move(operation), Immediate()); +} + +DelayedOperation ExecutorStd::Schedule(const Milliseconds delay, + TaggedOperation&& tagged) { + // While negative delay can be interpreted as a request for immediate + // execution, supporting it would provide a hacky way to modify FIFO ordering + // of immediate operations. + FIREBASE_ASSERT_MESSAGE(delay.count() >= 0, + "Schedule: delay cannot be negative"); + + namespace chr = std::chrono; + const auto now = chr::time_point_cast(chr::system_clock::now()); + const auto id = + PushOnSchedule(std::move(tagged.operation), now + delay, tagged.tag); + + return DelayedOperation{[this, id] { TryCancel(id); }}; +} + +void ExecutorStd::TryCancel(const Id operation_id) { + schedule_.RemoveIf( + [operation_id](const Entry& e) { return e.id == operation_id; }); +} + +ExecutorStd::Id ExecutorStd::PushOnSchedule(Operation&& operation, + const TimePoint when, + const Tag tag) { + // Note: operations scheduled for immediate execution don't actually need an + // id. This could be tweaked to reuse the same id for all such operations. + const auto id = NextId(); + schedule_.Push(Entry{std::move(operation), id, tag}, when); + return id; +} + +void ExecutorStd::PollingThread() { + while (!shutting_down_) { + Entry entry = schedule_.PopBlocking(); + if (entry.tagged.operation) { + entry.tagged.operation(); + } + } +} + +void ExecutorStd::UnblockQueue() { + // Put a no-op for immediate execution on the queue to ensure that + // `schedule_.PopBlocking` returns, and worker thread can notice that shutdown + // is in progress. + schedule_.Push(Entry{[] {}, /*id=*/0}, Immediate()); +} + +ExecutorStd::Id ExecutorStd::NextId() { + // The wrap around after ~4 billion operations is explicitly ignored. Even if + // an instance of `ExecutorStd` runs long enough to get `current_id_` to + // overflow, it's extremely unlikely that any object still holds a reference + // that is old enough to cause a conflict. + return current_id_++; +} + +bool ExecutorStd::IsCurrentExecutor() const { + return std::this_thread::get_id() == worker_thread_.get_id(); +} + +std::string ExecutorStd::CurrentExecutorName() const { + return ThreadIdToString(std::this_thread::get_id()); +} + +std::string ExecutorStd::Name() const { + return ThreadIdToString(worker_thread_.get_id()); +} + +void ExecutorStd::ExecuteBlocking(Operation&& operation) { + std::promise signal_finished; + Execute([&] { + operation(); + signal_finished.set_value(); + }); + signal_finished.get_future().wait(); +} + +bool ExecutorStd::IsScheduled(const Tag tag) const { + return schedule_.Contains( + [&tag](const Entry& e) { return e.tagged.tag == tag; }); +} + +absl::optional ExecutorStd::PopFromSchedule() { + auto removed = + schedule_.RemoveIf([](const Entry& e) { return !e.IsImmediate(); }); + if (!removed.has_value()) { + return {}; + } + return {std::move(removed.value().tagged)}; +} + +} // namespace internal +} // namespace util +} // namespace firestore +} // namespace firebase diff --git a/Firestore/core/src/firebase/firestore/util/executor_std.h b/Firestore/core/src/firebase/firestore/util/executor_std.h new file mode 100644 index 0000000..4ac62e1 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/util/executor_std.h @@ -0,0 +1,281 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_STD_H_ +#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_STD_H_ + +#include +#include +#include // NOLINT(build/c++11) +#include +#include // NOLINT(build/c++11) +#include +#include // NOLINT(build/c++11) +#include + +#include "Firestore/core/src/firebase/firestore/util/executor.h" +#include "Firestore/core/src/firebase/firestore/util/firebase_assert.h" +#include "absl/types/optional.h" + +namespace firebase { +namespace firestore { +namespace util { + +namespace async { + +// A thread-safe class similar to a priority queue where the entries are +// prioritized by the time for which they're scheduled. Entries scheduled for +// the exact same time are prioritized in FIFO order. +// +// The main function of `Schedule` is `PopBlocking`, which sleeps until an entry +// becomes available. It correctly handles entries being asynchonously added or +// removed from the schedule. +// +// The details of time management are completely concealed within the class. +// Once an entry is scheduled, there is no way to reschedule or even retrieve +// the time. +template +class Schedule { + // Internal invariants: + // - entries are always in sorted order, leftmost entry is always the most + // due; + // - each operation modifying the queue notifies the condition variable `cv_`. + public: + using Duration = std::chrono::milliseconds; + using Clock = std::chrono::system_clock; + // Entries are scheduled using absolute time. + using TimePoint = std::chrono::time_point; + + // Schedules an entry for the specified time due. `due` may be in the past. + void Push(const T& value, const TimePoint due) { + InsertPreservingOrder(Entry{value, due}); + } + void Push(T&& value, const TimePoint due) { + InsertPreservingOrder(Entry{std::move(value), due}); + } + + // If the queue contains at least one entry for which the scheduled time is + // due now (according to the system clock), removes the entry which is the + // most overdue from the queue and returns it. If no entry is due, returns an + // empty `optional`. + absl::optional PopIfDue() { + std::lock_guard lock{mutex_}; + + if (HasDueLocked()) { + return ExtractLocked(scheduled_.begin()); + } + return {}; + } + + // Blocks until at least one entry is available for which the scheduled time + // is due now (according to the system clock), removes the entry which is the + // most overdue from the queue and returns it. The function will + // attempt to minimize both the waiting time and busy waiting. + T PopBlocking() { + std::unique_lock lock{mutex_}; + + while (true) { + cv_.wait(lock, [this] { return !scheduled_.empty(); }); + + // To minimize busy waiting, sleep until either the nearest entry in the + // future either changes, or else becomes due. + const auto until = scheduled_.front().due; + cv_.wait_until(lock, until, + [this, until] { return scheduled_.front().due != until; }); + // There are 3 possibilities why `wait_until` has returned: + // - `wait_until` has timed out, in which case the current time is at + // least `until`, so there must be an overdue entry; + // - a new entry has been added which comes before `until`. It must be + // either overdue (in which case `HasDueLocked` will break the cycle), + // or else `until` must be reevaluated (on the next iteration of the + // loop); + // - `until` entry has been removed. This means `until` has to be + // reevaluated, similar to #2. + + if (HasDueLocked()) { + return ExtractLocked(scheduled_.begin()); + } + } + } + + bool empty() const { + std::lock_guard lock{mutex_}; + return scheduled_.empty(); + } + + size_t size() const { + std::lock_guard lock{mutex_}; + return scheduled_.size(); + } + + // Removes the first entry satisfying predicate from the queue and returns it. + // If no such entry exists, returns an empty `optional`. Predicate is applied + // to entries in order according to their scheduled time. + // + // Note that this function doesn't take into account whether the removed entry + // is past its due time. + template + absl::optional RemoveIf(const Pred pred) { + std::lock_guard lock{mutex_}; + + for (auto iter = scheduled_.begin(), end = scheduled_.end(); iter != end; + ++iter) { + if (pred(iter->value)) { + return ExtractLocked(iter); + } + } + return {}; + } + + // Checks whether the queue contains an entry satisfying the given predicate. + template + bool Contains(const Pred pred) const { + std::lock_guard lock{mutex_}; + return std::any_of(scheduled_.begin(), scheduled_.end(), + [&pred](const Entry& s) { return pred(s.value); }); + } + + private: + struct Entry { + bool operator<(const Entry& rhs) const { + return due < rhs.due; + } + + T value; + TimePoint due; + }; + // All removals are on the front, but most insertions are expected to be on + // the back. + using Container = std::deque; + using Iterator = typename Container::iterator; + + void InsertPreservingOrder(Entry&& new_entry) { + std::lock_guard lock{mutex_}; + + const auto insertion_point = + std::upper_bound(scheduled_.begin(), scheduled_.end(), new_entry); + scheduled_.insert(insertion_point, std::move(new_entry)); + + cv_.notify_one(); + } + + // This function expects the mutex to be already locked. + bool HasDueLocked() const { + namespace chr = std::chrono; + const auto now = chr::time_point_cast(Clock::now()); + return !scheduled_.empty() && now >= scheduled_.front().due; + } + + // This function expects the mutex to be already locked. + T ExtractLocked(const Iterator where) { + FIREBASE_ASSERT_MESSAGE(!scheduled_.empty(), + "Trying to pop an entry from an empty queue."); + + T result = std::move(where->value); + scheduled_.erase(where); + cv_.notify_one(); + + return result; + } + + mutable std::mutex mutex_; + std::condition_variable cv_; + Container scheduled_; +}; + +} // namespace async + +namespace internal { + +// A serial queue that executes provided operations on a dedicated background +// thread, using C++11 standard library functionality. +class ExecutorStd : public Executor { + public: + ExecutorStd(); + ~ExecutorStd(); + + void Execute(Operation&& operation) override; + void ExecuteBlocking(Operation&& operation) override; + + DelayedOperation Schedule(Milliseconds delay, + TaggedOperation&& tagged) override; + + bool IsCurrentExecutor() const override; + std::string CurrentExecutorName() const override; + std::string Name() const override; + + bool IsScheduled(Tag tag) const override; + absl::optional PopFromSchedule() override; + + private: + using TimePoint = async::Schedule::TimePoint; + // To allow canceling operations, each scheduled operation is assigned + // a monotonically increasing identifier. + using Id = unsigned int; + + // If the operation hasn't yet been run, it will be removed from the queue. + // Otherwise, this function is a no-op. + void TryCancel(Id operation_id); + + Id PushOnSchedule(Operation&& operation, TimePoint when, Tag tag = -1); + + void PollingThread(); + void UnblockQueue(); + Id NextId(); + + // As a convention, assign the epoch time to all operations scheduled for + // immediate execution. Note that it means that an immediate operation is + // always scheduled before any delayed operation, even in the corner case when + // the immediate operation was scheduled after a delayed operation was due + // (but hasn't yet run). + static TimePoint Immediate() { + return TimePoint{}; + } + + struct Entry { + Entry() { + } + Entry(Operation&& operation, + const ExecutorStd::Id id, + const ExecutorStd::Tag tag = kNoTag) + : tagged{tag, std::move(operation)}, id{id} { + } + + bool IsImmediate() const { + return tagged.tag == kNoTag; + } + + static constexpr Tag kNoTag = -1; + TaggedOperation tagged; + Id id = 0; + }; + // Operations scheduled for immediate execution are also put on the schedule + // (with due time set to `Immediate`). + async::Schedule schedule_; + + std::thread worker_thread_; + // Used to stop the worker thread. + std::atomic shutting_down_{false}; + + std::atomic current_id_{0}; +}; + +} // namespace internal +} // namespace util +} // namespace firestore +} // namespace firebase + +#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_EXECUTOR_STD_H_ -- cgit v1.2.3