aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/Source/Remote/FSTStream.h
blob: 546aa3d155d0141b6671b9345822f5dffb61814f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
/*
 * Copyright 2017 Google
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#import <Foundation/Foundation.h>

#import "Firestore/Source/Core/FSTTypes.h"

@class FSTDatabaseInfo;
@class FSTDocumentKey;
@class FSTDispatchQueue;
@class FSTMutation;
@class FSTMutationResult;
@class FSTQueryData;
@class FSTSerializerBeta;
@class FSTSnapshotVersion;
@class FSTWatchChange;
@class FSTWatchStream;
@class FSTWriteStream;
@class GRPCCall;
@class GRXWriter;

@protocol FSTCredentialsProvider;
@protocol FSTWatchStreamDelegate;
@protocol FSTWriteStreamDelegate;

NS_ASSUME_NONNULL_BEGIN

/**
 * An FSTStream is an abstract base class that represents a restartable streaming RPC to the
 * Firestore backend. It's built on top of GRPC's own support for streaming RPCs, and adds several
 * critical features for our clients:
 *
 *   - Restarting a stream is allowed (after failure)
 *   - Exponential backoff on failure (independent of the underlying channel)
 *   - Authentication via FSTCredentialsProvider
 *   - Dispatching all callbacks into the shared worker queue
 *
 * Subclasses of FSTStream implement serialization of models to and from bytes (via protocol
 * buffers) for a specific streaming RPC and emit events specific to the stream.
 *
 * ## Starting and Stopping
 *
 * Streaming RPCs are stateful and need to be started before messages can be sent and received.
 * The FSTStream will call its delegate's specific streamDidOpen method once the stream is ready
 * to accept requests.
 *
 * Should a `start` fail, FSTStream will call its delegate's specific streamDidClose method with an
 * NSError indicating what went wrong. The delegate is free to call start again.
 *
 * An FSTStream can also be explicitly stopped which indicates that the caller has discarded the
 * stream and no further events should be emitted. Once explicitly stopped, a stream cannot be
 * restarted.
 *
 * ## Subclassing Notes
 *
 * An implementation of FSTStream needs to implement the following methods:
 *   - `createRPCWithRequestsWriter`, should create the specific RPC (a GRPCCall object).
 *   - `handleStreamMessage`, receives protocol buffer responses from GRPC and must deserialize and
 *     delegate to some stream specific response method.
 *   - `notifyStreamOpen`, should call through to the stream-specific streamDidOpen method.
 *   - `notifyStreamInterrupted`, calls through to the stream-specific streamWasInterrupted method.
 *
 * Additionally, beyond these required methods, subclasses will want to implement methods that
 * take request models, serialize them, and write them using writeRequest:. Implementation-specific
 * cleanup logic can be added to tearDown:.
 *
 * ## RPC Message Type
 *
 * FSTStream intentionally uses the GRPCCall interface to GRPC directly, bypassing both GRPCProtoRPC
 * and GRXBufferedPipe for sending data. This has been done to avoid race conditions that come out
 * of a loosely specified locking contract on GRXWriter. There's essentially no way to safely use
 * any of the wrapper objects for GRXWriter (that perform buffering or conversion to/from protos).
 *
 * See https://github.com/grpc/grpc/issues/10957 for the kinds of things we're trying to avoid.
 */
@interface FSTStream <__covariant FSTStreamDelegate> : NSObject

/**
 * Designated initializer for the abstract stream. Concrete subclasses declared in this header
 * mark this initializer unavailable and expose their own serializer-based initializer instead.
 *
 * @param database The database the stream is associated with (presumably identifies the backend
 *     to connect to -- confirm against FSTDatabaseInfo).
 * @param workerDispatchQueue The worker queue for the stream (presumably the queue callbacks are
 *     dispatched onto -- confirm against the implementation).
 * @param credentials The credentials provider for the stream (presumably used to authenticate
 *     the RPC -- confirm against the implementation).
 * @param responseMessageClass The class of the response messages (presumably the protocol buffer
 *     class used to parse responses -- confirm against the implementation).
 */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id<FSTCredentialsProvider>)credentials
            responseMessageClass:(Class)responseMessageClass NS_DESIGNATED_INITIALIZER;

/** Unavailable: a stream must be created with its dependencies via the designated initializer. */
- (instancetype)init NS_UNAVAILABLE;

/**
 * An abstract method used by `start` to create a streaming RPC specific to this type of stream.
 * The RPC should be created such that requests are taken from `self`.
 *
 * Note that the returned GRPCCall must not be a GRPCProtoRPC, since the rest of the streaming
 * mechanism assumes it is dealing in bytes-level requests and responses.
 */
- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter;

/**
 * Returns YES if `start` has been called and no error has occurred. YES indicates the stream is
 * open or in the process of opening (which encompasses respecting backoff, getting auth tokens,
 * and starting the actual RPC). Use `isOpen` to determine if the stream is open and ready for
 * outbound requests.
 */
- (BOOL)isStarted;

/** Returns YES if the underlying RPC is open and the stream is ready for outbound requests. */
- (BOOL)isOpen;

/**
 * Starts the RPC. Only allowed if isStarted returns NO. The stream is not immediately ready for
 * use: the delegate's stream-specific streamDidOpen method (e.g. watchStreamDidOpen or
 * writeStreamDidOpen) will be invoked when the RPC is ready for outbound requests, at which point
 * `isOpen` will return YES.
 *
 * When start returns, -isStarted will return YES.
 *
 * @param delegate The receiver of this stream's events.
 *     NOTE(review): this is typed as plain `id` even though the class declares the
 *     `FSTStreamDelegate` type parameter -- confirm whether the parameter should use it.
 */
- (void)startWithDelegate:(id)delegate;

/**
 * Stops the RPC. This call is idempotent and allowed regardless of the current isStarted state.
 *
 * Unlike a transient stream close, stopping a stream is permanent. This is guaranteed NOT to emit
 * any further events on the stream-specific delegate, including the streamDidClose method.
 *
 * NOTE: This no-events contract may seem counter-intuitive but allows the caller to
 * straightforwardly sequence stream tear-down without having to worry about when the delegate's
 * streamDidClose methods will get called. For example if the stream must be exchanged for another
 * during a user change this allows `stop` to be called eagerly without worrying about the
 * streamDidClose method accidentally restarting the stream before the new one is ready.
 *
 * When stop returns, -isStarted and -isOpen will both return NO.
 */
- (void)stop;

/**
 * Initializes the idle timer. If no write takes place within one minute, the GRPC stream will be
 * closed.
 */
- (void)markIdle;

/**
 * After an error the stream will usually back off on the next attempt to start it. If the error
 * warrants an immediate restart of the stream, the sender can use this to indicate that the
 * receiver should not back off.
 *
 * Each error will call the stream-specific streamDidClose method. That method can decide to
 * inhibit backoff if required.
 */
- (void)inhibitBackoff;

@end

#pragma mark - FSTWatchStream

/** A protocol defining the events that can be emitted by the FSTWatchStream. */
@protocol FSTWatchStreamDelegate <NSObject>

/** Called by the FSTWatchStream when it is ready to accept outbound request messages. */
- (void)watchStreamDidOpen;

/**
 * Called by the FSTWatchStream with changes and the snapshot versions included in the
 * WatchChange responses sent back by the server.
 *
 * @param change The watch change delivered by the stream.
 * @param snapshotVersion The snapshot version that accompanied the change.
 */
- (void)watchStreamDidChange:(FSTWatchChange *)change
             snapshotVersion:(FSTSnapshotVersion *)snapshotVersion;

/**
 * Called by the FSTWatchStream when the underlying streaming RPC is interrupted for whatever
 * reason, usually because of an error, but possibly due to an idle timeout. The error passed to
 * this method may be nil, in which case the stream was closed without attributable fault.
 *
 * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping"
 * on FSTStream for details.
 *
 * @param error The cause of the interruption, or nil if there was no attributable fault.
 */
- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error;

@end

/**
 * An FSTStream that implements the StreamingWatch RPC.
 *
 * Once the FSTWatchStream has called its delegate's watchStreamDidOpen method, any number of
 * watchQuery: and unwatchTargetID: calls can be sent to control what changes will be sent from
 * the server for WatchChanges.
 */
@interface FSTWatchStream : FSTStream

/**
 * Initializes the watch stream with its dependencies. This is the designated initializer for
 * FSTWatchStream.
 *
 * @param database The database the stream is associated with.
 * @param workerDispatchQueue The worker queue for the stream.
 * @param credentials The credentials provider for the stream.
 * @param serializer The serializer for the stream (presumably converts between models and
 *     protocol buffer messages -- confirm against the implementation).
 */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id<FSTCredentialsProvider>)credentials
                      serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;

/**
 * Unavailable: the inherited FSTStream initializer must not be used on a watch stream. Use
 * initWithDatabase:workerDispatchQueue:credentials:serializer: instead.
 */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id<FSTCredentialsProvider>)credentials
            responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;

/** Unavailable: use initWithDatabase:workerDispatchQueue:credentials:serializer: instead. */
- (instancetype)init NS_UNAVAILABLE;

/**
 * Registers interest in the results of the given query. If the query includes a resumeToken it
 * will be included in the request. Results that affect the query will be streamed back as
 * WatchChange messages that reference the targetID included in |query|.
 */
- (void)watchQuery:(FSTQueryData *)query;

/** Unregisters interest in the results of the query associated with the given target ID. */
- (void)unwatchTargetID:(FSTTargetID)targetID;

@end

#pragma mark - FSTWriteStream

/** A protocol defining the events that can be emitted by the FSTWriteStream. */
@protocol FSTWriteStreamDelegate <NSObject>

/** Called by the FSTWriteStream when it is ready to accept outbound request messages. */
- (void)writeStreamDidOpen;

/**
 * Called by the FSTWriteStream upon a successful handshake response from the server, which is the
 * receiver's cue to send any pending writes.
 */
- (void)writeStreamDidCompleteHandshake;

/**
 * Called by the FSTWriteStream upon receiving a StreamingWriteResponse from the server that
 * contains mutation results.
 *
 * @param commitVersion The snapshot version delivered with the response.
 * @param results The mutation results contained in the response.
 */
- (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion
                                 mutationResults:(NSArray<FSTMutationResult *> *)results;

/**
 * Called when the FSTWriteStream's underlying RPC is interrupted for whatever reason, usually
 * because of an error, but possibly due to an idle timeout. The error passed to this method may be
 * nil, in which case the stream was closed without attributable fault.
 *
 * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping"
 * on FSTStream for details.
 *
 * @param error The cause of the interruption, or nil if there was no attributable fault.
 */
- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error;

@end

/**
 * An FSTStream that implements the StreamingWrite RPC.
 *
 * The StreamingWrite RPC requires the caller to maintain special `streamToken` state in between
 * calls, to help the server understand which responses the client has processed by the time the
 * next request is made. Every response may contain a `streamToken`; this value must be passed to
 * the next request.
 *
 * After calling `start` on this stream, the next request must be a handshake, containing whatever
 * streamToken is on hand. Once a response to this request is received, all pending mutations may
 * be submitted. When submitting multiple batches of mutations at the same time, it's okay to use
 * the same streamToken for the calls to `writeMutations:`.
 */
@interface FSTWriteStream : FSTStream

/**
 * Initializes the write stream with its dependencies. This is the designated initializer for
 * FSTWriteStream, mirroring FSTWatchStream: the inherited responseMessageClass-based initializer
 * and `init` are both unavailable on this class.
 *
 * @param database The database the stream is associated with.
 * @param workerDispatchQueue The worker queue for the stream.
 * @param credentials The credentials provider for the stream.
 * @param serializer The serializer for the stream (presumably converts between models and
 *     protocol buffer messages -- confirm against the implementation).
 */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id<FSTCredentialsProvider>)credentials
                      serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;

/**
 * Unavailable: the inherited FSTStream initializer must not be used on a write stream. Use
 * initWithDatabase:workerDispatchQueue:credentials:serializer: instead.
 */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id<FSTCredentialsProvider>)credentials
            responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;

/** Unavailable: use initWithDatabase:workerDispatchQueue:credentials:serializer: instead. */
- (instancetype)init NS_UNAVAILABLE;

/**
 * Sends an initial streamToken to the server, performing the handshake required to make the
 * StreamingWrite RPC work. Subsequent `writeMutations:` calls should wait until a response has
 * been delivered to the delegate's writeStreamDidCompleteHandshake method.
 */
- (void)writeHandshake;

/** Sends a group of mutations to the Firestore backend to apply. */
- (void)writeMutations:(NSArray<FSTMutation *> *)mutations;

/**
 * Tracks whether or not a handshake has been successfully exchanged and the stream is ready to
 * accept mutations.
 */
@property(nonatomic, assign, readwrite, getter=isHandshakeComplete) BOOL handshakeComplete;

/**
 * The last received stream token from the server, used to acknowledge which responses the client
 * has processed. Stream tokens are opaque checkpoint markers whose only real value is their
 * inclusion in the next request.
 *
 * FSTWriteStream manages propagating this value from responses to the next request.
 *
 * Declared `copy` because NSData has a mutable subclass: a caller-supplied NSMutableData must not
 * be able to change the stored token after assignment.
 */
@property(nonatomic, copy, nullable) NSData *lastStreamToken;

@end

NS_ASSUME_NONNULL_END