diff options
Diffstat (limited to 'Firestore/core/src/firebase/firestore/util/async_queue.cc')
-rw-r--r-- | Firestore/core/src/firebase/firestore/util/async_queue.cc | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/Firestore/core/src/firebase/firestore/util/async_queue.cc b/Firestore/core/src/firebase/firestore/util/async_queue.cc new file mode 100644 index 0000000..71f5cc5 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/util/async_queue.cc @@ -0,0 +1,140 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Firestore/core/src/firebase/firestore/util/async_queue.h" + +#include <utility> + +#include "Firestore/core/src/firebase/firestore/util/firebase_assert.h" +#include "absl/memory/memory.h" + +namespace firebase { +namespace firestore { +namespace util { + +using internal::Executor; + +AsyncQueue::AsyncQueue(std::unique_ptr<Executor> executor) + : executor_{std::move(executor)} { + is_operation_in_progress_ = false; +} + +void AsyncQueue::VerifyIsCurrentExecutor() const { + FIREBASE_ASSERT_MESSAGE( + executor_->IsCurrentExecutor(), + "Expected to be called by the executor associated with this queue " + "(expected executor: '%s', actual executor: '%s')", + executor_->Name().c_str(), executor_->CurrentExecutorName().c_str()); +} + +void AsyncQueue::VerifyIsCurrentQueue() const { + VerifyIsCurrentExecutor(); + FIREBASE_ASSERT_MESSAGE( + is_operation_in_progress_, + "VerifyIsCurrentQueue called when no operation is executing " + "(expected executor: '%s', actual executor: '%s')", + executor_->Name().c_str(), executor_->CurrentExecutorName().c_str()); +} + +void AsyncQueue::ExecuteBlocking(const Operation& operation) { + VerifyIsCurrentExecutor(); + FIREBASE_ASSERT_MESSAGE(!is_operation_in_progress_, + "ExecuteBlocking may not be called " + "before the previous operation finishes executing"); + + is_operation_in_progress_ = true; + operation(); + is_operation_in_progress_ = false; +} + +void AsyncQueue::Enqueue(const Operation& operation) { + VerifySequentialOrder(); + EnqueueRelaxed(operation); +} + +void AsyncQueue::EnqueueRelaxed(const Operation& operation) { + executor_->Execute(Wrap(operation)); +} + +DelayedOperation AsyncQueue::EnqueueAfterDelay(const Milliseconds delay, + const TimerId timer_id, + const Operation& operation) { + VerifyIsCurrentExecutor(); + + // While not necessarily harmful, we currently don't expect to have multiple + // callbacks with the same timer_id in the queue, so defensively reject + // them. + FIREBASE_ASSERT_MESSAGE( + !IsScheduled(timer_id), + "Attempted to schedule multiple operations with id %d", timer_id); + + Executor::TaggedOperation tagged{static_cast<int>(timer_id), Wrap(operation)}; + return executor_->Schedule(delay, std::move(tagged)); +} + +AsyncQueue::Operation AsyncQueue::Wrap(const Operation& operation) { + // Decorator pattern: wrap `operation` into a call to `ExecuteBlocking` to + // ensure that it doesn't spawn any nested operations. + + // Note: can't move `operation` into lambda until C++14. + return [this, operation] { ExecuteBlocking(operation); }; +} + +void AsyncQueue::VerifySequentialOrder() const { + // This is the inverse of `VerifyIsCurrentQueue`. + FIREBASE_ASSERT_MESSAGE( + !is_operation_in_progress_ || !executor_->IsCurrentExecutor(), + "Enforcing sequential order failed: currently executing operations " + "cannot enqueue more operations " + "(this queue's executor: '%s', current executor: '%s')", + executor_->Name().c_str(), executor_->CurrentExecutorName().c_str()); +} + +// Test-only functions + +void AsyncQueue::EnqueueBlocking(const Operation& operation) { + VerifySequentialOrder(); + executor_->ExecuteBlocking(Wrap(operation)); +} + +bool AsyncQueue::IsScheduled(const TimerId timer_id) const { + return executor_->IsScheduled(static_cast<int>(timer_id)); +} + +void AsyncQueue::RunScheduledOperationsUntil(const TimerId last_timer_id) { + FIREBASE_ASSERT_MESSAGE( + !executor_->IsCurrentExecutor(), + "RunScheduledOperationsUntil must not be called on the queue"); + + executor_->ExecuteBlocking([this, last_timer_id] { + FIREBASE_ASSERT_MESSAGE( + last_timer_id == TimerId::All || IsScheduled(last_timer_id), + "Attempted to run scheduled operations until missing timer id: %d", + last_timer_id); + + for (auto next = executor_->PopFromSchedule(); next.has_value(); + next = executor_->PopFromSchedule()) { + next->operation(); + if (next->tag == static_cast<int>(last_timer_id)) { + break; + } + } + }); +} + +} // namespace util +} // namespace firestore +} // namespace firebase |