aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/core/src/firebase/firestore/util/async_queue.h
blob: e2df3872559a20ef136bc8de69c047720d81eaea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/*
 * Copyright 2018 Google
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_
#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_

#include <atomic>
#include <chrono>  // NOLINT(build/c++11)
#include <functional>
#include <memory>

#include "Firestore/core/src/firebase/firestore/util/executor.h"

namespace firebase {
namespace firestore {
namespace util {

/**
 * Well-known "timer" ids used when scheduling delayed operations on the
 * AsyncQueue. These ids can then be used from tests to check for the
 * presence of delayed operations or to run them early.
 *
 * Every operation passed to `AsyncQueue::EnqueueAfterDelay` is tagged with one
 * of these ids.
 */
enum class TimerId {
  /**
   * `All` can be used with `AsyncQueue::RunScheduledOperationsUntil` to run
   * all timers.
   */
  All,

  /**
   * The following 4 timers are used in `Stream` for the listen and write
   * streams. The "Idle" timer is used to close the stream due to inactivity.
   * The "ConnectionBackoff" timer is used to restart a stream once the
   * appropriate backoff delay has elapsed.
   */
  ListenStreamIdle,
  ListenStreamConnectionBackoff,
  WriteStreamIdle,
  WriteStreamConnectionBackoff,

  /**
   * A timer used in `OnlineStateTracker` to transition from
   * `OnlineStateUnknown` to `Offline` after a set timeout, rather than waiting
   * indefinitely for success or failure.
   */
  OnlineStateTimeout,
};

// A serial queue that executes given operations asynchronously, one at a time.
// Operations may be scheduled to be executed as soon as possible or in the
// future. Operations scheduled for the same time are FIFO-ordered.
//
// `AsyncQueue` wraps a platform-specific executor, adding checks that enforce
// sequential ordering of operations: an enqueued operation, while being run,
// normally cannot enqueue other operations for immediate execution (but see
// `EnqueueRelaxed`).
//
// `AsyncQueue` methods have particular expectations about whether they must be
// invoked on the queue or not; check the "Precondition" paragraphs in the
// comments on each method.
//
// A significant portion of the `AsyncQueue` interface only exists for test
// purposes and must *not* be used in regular code.
class AsyncQueue {
 public:
  using Operation = internal::Executor::Operation;
  using Milliseconds = internal::Executor::Milliseconds;

  // Creates a queue that takes ownership of the given `executor`.
  explicit AsyncQueue(std::unique_ptr<internal::Executor> executor);

  // Asserts for the caller that it is being invoked as part of an operation on
  // the `AsyncQueue`.
  void VerifyIsCurrentQueue() const;

  // Enqueue methods

  // Puts the `operation` on the queue to be executed as soon as possible, while
  // maintaining FIFO order.
  //
  // Precondition: `Enqueue` calls cannot be nested; that is, `Enqueue` may not
  // be called by a previously enqueued operation when it is run (as a special
  // case, destructors invoked when an enqueued operation has run and is being
  // destroyed may invoke `Enqueue`).
  void Enqueue(const Operation& operation);

  // Like `Enqueue`, but without applying any prerequisite checks.
  void EnqueueRelaxed(const Operation& operation);

  // Puts the `operation` on the queue to be executed `delay` milliseconds from
  // now, and returns a handle that allows the caller to cancel the operation
  // (provided it hasn't run already).
  //
  // `operation` is tagged by a `timer_id` which identifies the caller. Only one
  // operation tagged with any given `timer_id` may be on the queue at any
  // time; an attempt to put another such operation will result in an assertion
  // failure. In tests, these tags also make it possible to check for the
  // presence of certain operations and to run certain operations in advance.
  //
  // Precondition: `EnqueueAfterDelay` is being invoked asynchronously on the
  // queue.
  DelayedOperation EnqueueAfterDelay(Milliseconds delay,
                                     TimerId timer_id,
                                     const Operation& operation);

  // Direct execution

  // Immediately executes the `operation` on the queue.
  //
  // This is largely a workaround to allow other classes (GRPC) to directly
  // access the underlying dispatch queue without getting `AsyncQueue` into an
  // inconsistent state.
  //
  // Precondition: no other operation is being executed on the queue at the
  // moment of the call (i.e., `ExecuteBlocking` cannot call `ExecuteBlocking`).
  //
  // Precondition: `ExecuteBlocking` is being invoked asynchronously on the
  // queue.
  void ExecuteBlocking(const Operation& operation);

  // Test-only interface follows
  // TODO(varconst): move the test-only interface into a helper object that is
  // a friend of AsyncQueue and delegates its public methods to private methods
  // on AsyncQueue.

  // Like `Enqueue`, but blocks until the `operation` is complete.
  void EnqueueBlocking(const Operation& operation);

  // Checks whether an operation tagged with `timer_id` is currently scheduled
  // for execution in the future.
  bool IsScheduled(TimerId timer_id) const;

  // Force runs operations scheduled for future execution, in scheduled order,
  // up to *and including* the operation tagged with `last_timer_id`.
  //
  // Precondition: `RunScheduledOperationsUntil` is *not* being invoked on the
  // queue.
  void RunScheduledOperationsUntil(TimerId last_timer_id);

 private:
  // Returns an operation built from `operation`.
  // NOTE(review): presumably the returned operation maintains
  // `is_operation_in_progress_` around the call to `operation` -- confirm
  // against the definition, which is not visible from this header.
  Operation Wrap(const Operation& operation);

  // Asserts that the current invocation happens asynchronously on the queue.
  void VerifyIsCurrentExecutor() const;
  // NOTE(review): presumably asserts that no operation is currently in
  // progress, i.e. that enqueueing is not nested -- confirm against the
  // definition.
  void VerifySequentialOrder() const;

  // Initialized in-class: before C++20 a default-constructed
  // `std::atomic<bool>` holds an indeterminate value, and the constructor is
  // defined outside this header, so the declaration alone must guarantee a
  // well-defined starting state.
  std::atomic<bool> is_operation_in_progress_{false};
  std::unique_ptr<internal::Executor> executor_;
};

}  // namespace util
}  // namespace firestore
}  // namespace firebase

#endif  // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_UTIL_ASYNC_QUEUE_H_