aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/core/src/firebase/firestore/util/async_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'Firestore/core/src/firebase/firestore/util/async_queue.cc')
-rw-r--r--Firestore/core/src/firebase/firestore/util/async_queue.cc140
1 files changed, 140 insertions, 0 deletions
diff --git a/Firestore/core/src/firebase/firestore/util/async_queue.cc b/Firestore/core/src/firebase/firestore/util/async_queue.cc
new file mode 100644
index 0000000..71f5cc5
--- /dev/null
+++ b/Firestore/core/src/firebase/firestore/util/async_queue.cc
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2018 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Firestore/core/src/firebase/firestore/util/async_queue.h"
+
+#include <utility>
+
+#include "Firestore/core/src/firebase/firestore/util/firebase_assert.h"
+#include "absl/memory/memory.h"
+
+namespace firebase {
+namespace firestore {
+namespace util {
+
+using internal::Executor;
+
+AsyncQueue::AsyncQueue(std::unique_ptr<Executor> executor)
+ : executor_{std::move(executor)} {
+ is_operation_in_progress_ = false;
+}
+
+void AsyncQueue::VerifyIsCurrentExecutor() const {
+ FIREBASE_ASSERT_MESSAGE(
+ executor_->IsCurrentExecutor(),
+ "Expected to be called by the executor associated with this queue "
+ "(expected executor: '%s', actual executor: '%s')",
+ executor_->Name().c_str(), executor_->CurrentExecutorName().c_str());
+}
+
+void AsyncQueue::VerifyIsCurrentQueue() const {
+ VerifyIsCurrentExecutor();
+ FIREBASE_ASSERT_MESSAGE(
+ is_operation_in_progress_,
+ "VerifyIsCurrentQueue called when no operation is executing "
+ "(expected executor: '%s', actual executor: '%s')",
+ executor_->Name().c_str(), executor_->CurrentExecutorName().c_str());
+}
+
+void AsyncQueue::ExecuteBlocking(const Operation& operation) {
+ VerifyIsCurrentExecutor();
+ FIREBASE_ASSERT_MESSAGE(!is_operation_in_progress_,
+ "ExecuteBlocking may not be called "
+ "before the previous operation finishes executing");
+
+ is_operation_in_progress_ = true;
+ operation();
+ is_operation_in_progress_ = false;
+}
+
+void AsyncQueue::Enqueue(const Operation& operation) {
+ VerifySequentialOrder();
+ EnqueueRelaxed(operation);
+}
+
+void AsyncQueue::EnqueueRelaxed(const Operation& operation) {
+ executor_->Execute(Wrap(operation));
+}
+
+DelayedOperation AsyncQueue::EnqueueAfterDelay(const Milliseconds delay,
+ const TimerId timer_id,
+ const Operation& operation) {
+ VerifyIsCurrentExecutor();
+
+ // While not necessarily harmful, we currently don't expect to have multiple
+ // callbacks with the same timer_id in the queue, so defensively reject
+ // them.
+ FIREBASE_ASSERT_MESSAGE(
+ !IsScheduled(timer_id),
+ "Attempted to schedule multiple operations with id %d", timer_id);
+
+ Executor::TaggedOperation tagged{static_cast<int>(timer_id), Wrap(operation)};
+ return executor_->Schedule(delay, std::move(tagged));
+}
+
+AsyncQueue::Operation AsyncQueue::Wrap(const Operation& operation) {
+ // Decorator pattern: wrap `operation` into a call to `ExecuteBlocking` to
+ // ensure that it doesn't spawn any nested operations.
+
+ // Note: can't move `operation` into lambda until C++14.
+ return [this, operation] { ExecuteBlocking(operation); };
+}
+
+void AsyncQueue::VerifySequentialOrder() const {
+ // This is the inverse of `VerifyIsCurrentQueue`.
+ FIREBASE_ASSERT_MESSAGE(
+ !is_operation_in_progress_ || !executor_->IsCurrentExecutor(),
+ "Enforcing sequential order failed: currently executing operations "
+ "cannot enqueue more operations "
+ "(this queue's executor: '%s', current executor: '%s')",
+ executor_->Name().c_str(), executor_->CurrentExecutorName().c_str());
+}
+
+// Test-only functions
+
+void AsyncQueue::EnqueueBlocking(const Operation& operation) {
+ VerifySequentialOrder();
+ executor_->ExecuteBlocking(Wrap(operation));
+}
+
+bool AsyncQueue::IsScheduled(const TimerId timer_id) const {
+ return executor_->IsScheduled(static_cast<int>(timer_id));
+}
+
+void AsyncQueue::RunScheduledOperationsUntil(const TimerId last_timer_id) {
+ FIREBASE_ASSERT_MESSAGE(
+ !executor_->IsCurrentExecutor(),
+ "RunScheduledOperationsUntil must not be called on the queue");
+
+ executor_->ExecuteBlocking([this, last_timer_id] {
+ FIREBASE_ASSERT_MESSAGE(
+ last_timer_id == TimerId::All || IsScheduled(last_timer_id),
+ "Attempted to run scheduled operations until missing timer id: %d",
+ last_timer_id);
+
+ for (auto next = executor_->PopFromSchedule(); next.has_value();
+ next = executor_->PopFromSchedule()) {
+ next->operation();
+ if (next->tag == static_cast<int>(last_timer_id)) {
+ break;
+ }
+ }
+ });
+}
+
+} // namespace util
+} // namespace firestore
+} // namespace firebase