aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore
diff options
context:
space:
mode:
authorGravatar Sebastian Schmidt <mrschmidt@google.com>2017-10-31 12:32:32 -0700
committerGravatar GitHub <noreply@github.com>2017-10-31 12:32:32 -0700
commitf9b2f91da109d2b74f7076e8b6b6d8ced73a5e27 (patch)
tree56236f42e2d878a936de901533e683031e588285 /Firestore
parent02ff6bbee95150eacff9563af4dd7a6e1aeaebdd (diff)
Moving the Stream class to their own file (#418)
Diffstat (limited to 'Firestore')
-rw-r--r--Firestore/Example/Tests/Integration/FSTStreamTests.m1
-rw-r--r--Firestore/Example/Tests/SpecTests/FSTMockDatastore.m1
-rw-r--r--Firestore/Source/Remote/FSTDatastore.h290
-rw-r--r--Firestore/Source/Remote/FSTDatastore.m731
-rw-r--r--Firestore/Source/Remote/FSTRemoteStore.m1
-rw-r--r--Firestore/Source/Remote/FSTStream.h312
-rw-r--r--Firestore/Source/Remote/FSTStream.m764
7 files changed, 1095 insertions, 1005 deletions
diff --git a/Firestore/Example/Tests/Integration/FSTStreamTests.m b/Firestore/Example/Tests/Integration/FSTStreamTests.m
index dccaa70..a9ee4d1 100644
--- a/Firestore/Example/Tests/Integration/FSTStreamTests.m
+++ b/Firestore/Example/Tests/Integration/FSTStreamTests.m
@@ -25,6 +25,7 @@
#import "FSTTestDispatchQueue.h"
#import "Model/FSTDatabaseID.h"
#import "Remote/FSTDatastore.h"
+#import "Remote/FSTStream.h"
#import "Util/FSTAssert.h"
/** Exposes otherwise private methods for testing. */
diff --git a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m
index 6af2053..10bfc5b 100644
--- a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m
+++ b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m
@@ -23,6 +23,7 @@
#import "Model/FSTDatabaseID.h"
#import "Model/FSTMutation.h"
#import "Remote/FSTSerializerBeta.h"
+#import "Remote/FSTStream.h"
#import "Util/FSTAssert.h"
#import "Util/FSTLogger.h"
diff --git a/Firestore/Source/Remote/FSTDatastore.h b/Firestore/Source/Remote/FSTDatastore.h
index b8e53f5..08aa570 100644
--- a/Firestore/Source/Remote/FSTDatastore.h
+++ b/Firestore/Source/Remote/FSTDatastore.h
@@ -33,8 +33,7 @@
@class GRXWriter;
@protocol FSTCredentialsProvider;
-@protocol FSTWatchStreamDelegate;
-@protocol FSTWriteStreamDelegate;
+@class FSTDatabaseID;
NS_ASSUME_NONNULL_BEGIN
@@ -64,14 +63,26 @@ NS_ASSUME_NONNULL_BEGIN
credentials:(id<FSTCredentialsProvider>)credentials
NS_DESIGNATED_INITIALIZER;
+/**
+ * Takes a dictionary of (HTTP) response headers and returns the set of whitelisted headers
+ * (for logging purposes).
+ */
++ (NSDictionary<NSString *, NSString *> *)extractWhiteListedHeaders:
+ (NSDictionary<NSString *, NSString *> *)header;
+
/** Converts the error to a FIRFirestoreErrorDomain error. */
+ (NSError *)firestoreErrorForError:(NSError *)error;
+/** Returns YES if the given error is a GRPC ABORTED error. **/
++ (BOOL)isAbortedError:(NSError *)error;
+
/** Returns YES if the given error indicates the RPC associated with it may not be retried. */
+ (BOOL)isPermanentWriteError:(NSError *)error;
-/** Returns YES if the given error is a GRPC ABORTED error. **/
-+ (BOOL)isAbortedError:(NSError *)error;
+/** Adds headers to the RPC including any OAuth access token if provided .*/
++ (void)prepareHeadersForRPC:(GRPCCall *)rpc
+ databaseID:(FSTDatabaseID *)databaseID
+ token:(nullable NSString *)token;
/** Looks up a list of documents in datastore. */
- (void)lookupDocuments:(NSArray<FSTDocumentKey *> *)keys
@@ -92,275 +103,4 @@ NS_ASSUME_NONNULL_BEGIN
@end
-/**
- * An FSTStream is an abstract base class that represents a restartable streaming RPC to the
- * Firestore backend. It's built on top of GRPC's own support for streaming RPCs, and adds several
- * critical features for our clients:
- *
- * - Restarting a stream is allowed (after failure)
- * - Exponential backoff on failure (independent of the underlying channel)
- * - Authentication via FSTCredentialsProvider
- * - Dispatching all callbacks into the shared worker queue
- *
- * Subclasses of FSTStream implement serialization of models to and from bytes (via protocol
- * buffers) for a specific streaming RPC and emit events specific to the stream.
- *
- * ## Starting and Stopping
- *
- * Streaming RPCs are stateful and need to be started before messages can be sent and received.
- * The FSTStream will call its delegate's specific streamDidOpen method once the stream is ready
- * to accept requests.
- *
- * Should a `start` fail, FSTStream will call its delegate's specific streamDidClose method with an
- * NSError indicating what went wrong. The delegate is free to call start again.
- *
- * An FSTStream can also be explicitly stopped which indicates that the caller has discarded the
- * stream and no further events should be emitted. Once explicitly stopped, a stream cannot be
- * restarted.
- *
- * ## Subclassing Notes
- *
- * An implementation of FSTStream needs to implement the following methods:
- * - `createRPCWithRequestsWriter`, should create the specific RPC (a GRPCCall object).
- * - `handleStreamMessage`, receives protocol buffer responses from GRPC and must deserialize and
- * delegate to some stream specific response method.
- * - `notifyStreamOpen`, should call through to the stream-specific streamDidOpen method.
- * - `notifyStreamInterrupted`, calls through to the stream-specific streamWasInterrupted method.
- *
- * Additionally, beyond these required methods, subclasses will want to implement methods that
- * take request models, serialize them, and write them to using writeRequest:.
- *
- * ## RPC Message Type
- *
- * FSTStream intentionally uses the GRPCCall interface to GRPC directly, bypassing both GRPCProtoRPC
- * and GRXBufferedPipe for sending data. This has been done to avoid race conditions that come out
- * of a loosely specified locking contract on GRXWriter. There's essentially no way to safely use
- * any of the wrapper objects for GRXWriter (that perform buffering or conversion to/from protos).
- *
- * See https://github.com/grpc/grpc/issues/10957 for the kinds of things we're trying to avoid.
- */
-@interface FSTStream <__covariant FSTStreamDelegate> : NSObject
-
-- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
- workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
- credentials:(id<FSTCredentialsProvider>)credentials
- responseMessageClass:(Class)responseMessageClass NS_DESIGNATED_INITIALIZER;
-
-- (instancetype)init NS_UNAVAILABLE;
-
-/**
- * An abstract method used by `start` to create a streaming RPC specific to this type of stream.
- * The RPC should be created such that requests are taken from `self`.
- *
- * Note that the returned GRPCCall must not be a GRPCProtoRPC, since the rest of the streaming
- * mechanism assumes it is dealing in bytes-level requests and responses.
- */
-- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter;
-
-/**
- * Returns YES if `start` has been called and no error has occurred. YES indicates the stream is
- * open or in the process of opening (which encompasses respecting backoff, getting auth tokens,
- * and starting the actual RPC). Use `isOpen` to determine if the stream is open and ready for
- * outbound requests.
- */
-- (BOOL)isStarted;
-
-/** Returns YES if the underlying RPC is open and the stream is ready for outbound requests. */
-- (BOOL)isOpen;
-
-/**
- * Starts the RPC. Only allowed if isStarted returns NO. The stream is not immediately ready for
- * use: the delegate's watchStreamDidOpen method will be invoked when the RPC is ready for outbound
- * requests, at which point `isOpen` will return YES.
- *
- * When start returns, -isStarted will return YES.
- */
-- (void)startWithDelegate:(id)delegate;
-
-/**
- * Stops the RPC. This call is idempotent and allowed regardless of the current isStarted state.
- *
- * Unlike a transient stream close, stopping a stream is permanent. This is guaranteed NOT to emit
- * any further events on the stream-specific delegate, including the streamDidClose method.
- *
- * NOTE: This no-events contract may seem counter-intuitive but allows the caller to
- * straightforwardly sequence stream tear-down without having to worry about when the delegate's
- * streamDidClose methods will get called. For example if the stream must be exchanged for another
- * during a user change this allows `stop` to be called eagerly without worrying about the
- * streamDidClose method accidentally restarting the stream before the new one is ready.
- *
- * When stop returns, -isStarted and -isOpen will both return NO.
- */
-- (void)stop;
-
-/**
- * Initializes the idle timer. If no write takes place within one minute, the GRPC stream will be
- * closed.
- */
-- (void)markIdle;
-
-/**
- * After an error the stream will usually back off on the next attempt to start it. If the error
- * warrants an immediate restart of the stream, the sender can use this to indicate that the
- * receiver should not back off.
- *
- * Each error will call the stream-specific streamDidClose method. That method can decide to
- * inhibit backoff if required.
- */
-- (void)inhibitBackoff;
-
-@end
-
-#pragma mark - FSTWatchStream
-
-/** A protocol defining the events that can be emitted by the FSTWatchStream. */
-@protocol FSTWatchStreamDelegate <NSObject>
-
-/** Called by the FSTWatchStream when it is ready to accept outbound request messages. */
-- (void)watchStreamDidOpen;
-
-/**
- * Called by the FSTWatchStream with changes and the snapshot versions included in in the
- * WatchChange responses sent back by the server.
- */
-- (void)watchStreamDidChange:(FSTWatchChange *)change
- snapshotVersion:(FSTSnapshotVersion *)snapshotVersion;
-
-/**
- * Called by the FSTWatchStream when the underlying streaming RPC is interrupted for whatever
- * reason, usually because of an error, but possibly due to an idle timeout. The error passed to
- * this method may be nil, in which case the stream was closed without attributable fault.
- *
- * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping"
- * on FSTStream for details.
- */
-- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error;
-
-@end
-
-/**
- * An FSTStream that implements the StreamingWatch RPC.
- *
- * Once the FSTWatchStream has called the streamDidOpen method, any number of watchQuery and
- * unwatchTargetId calls can be sent to control what changes will be sent from the server for
- * WatchChanges.
- */
-@interface FSTWatchStream : FSTStream
-
-/**
- * Initializes the watch stream with its dependencies.
- */
-- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
- workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
- credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
-
-- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
- workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
- credentials:(id<FSTCredentialsProvider>)credentials
- responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
-
-- (instancetype)init NS_UNAVAILABLE;
-
-/**
- * Registers interest in the results of the given query. If the query includes a resumeToken it
- * will be included in the request. Results that affect the query will be streamed back as
- * WatchChange messages that reference the targetID included in |query|.
- */
-- (void)watchQuery:(FSTQueryData *)query;
-
-/** Unregisters interest in the results of the query associated with the given target ID. */
-- (void)unwatchTargetID:(FSTTargetID)targetID;
-
-@end
-
-#pragma mark - FSTWriteStream
-
-@protocol FSTWriteStreamDelegate <NSObject>
-
-/** Called by the FSTWriteStream when it is ready to accept outbound request messages. */
-- (void)writeStreamDidOpen;
-
-/**
- * Called by the FSTWriteStream upon a successful handshake response from the server, which is the
- * receiver's cue to send any pending writes.
- */
-- (void)writeStreamDidCompleteHandshake;
-
-/**
- * Called by the FSTWriteStream upon receiving a StreamingWriteResponse from the server that
- * contains mutation results.
- */
-- (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion
- mutationResults:(NSArray<FSTMutationResult *> *)results;
-
-/**
- * Called when the FSTWriteStream's underlying RPC is interrupted for whatever reason, usually
- * because of an error, but possibly due to an idle timeout. The error passed to this method may be
- * nil, in which case the stream was closed without attributable fault.
- *
- * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping"
- * on FSTStream for details.
- */
-- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error;
-
-@end
-
-/**
- * An FSTStream that implements the StreamingWrite RPC.
- *
- * The StreamingWrite RPC requires the caller to maintain special `streamToken` state in between
- * calls, to help the server understand which responses the client has processed by the time the
- * next request is made. Every response may contain a `streamToken`; this value must be passed to
- * the next request.
- *
- * After calling `start` on this stream, the next request must be a handshake, containing whatever
- * streamToken is on hand. Once a response to this request is received, all pending mutations may
- * be submitted. When submitting multiple batches of mutations at the same time, it's okay to use
- * the same streamToken for the calls to `writeMutations:`.
- */
-@interface FSTWriteStream : FSTStream
-
-/**
- * Initializes the write stream with its dependencies.
- */
-- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
- workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
- credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer;
-
-- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
- workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
- credentials:(id<FSTCredentialsProvider>)credentials
- responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
-
-- (instancetype)init NS_UNAVAILABLE;
-
-/**
- * Sends an initial streamToken to the server, performing the handshake required to make the
- * StreamingWrite RPC work. Subsequent `writeMutations:` calls should wait until a response has
- * been delivered to the delegate's writeStreamDidCompleteHandshake method.
- */
-- (void)writeHandshake;
-
-/** Sends a group of mutations to the Firestore backend to apply. */
-- (void)writeMutations:(NSArray<FSTMutation *> *)mutations;
-
-/**
- * Tracks whether or not a handshake has been successfully exchanged and the stream is ready to
- * accept mutations.
- */
-@property(nonatomic, assign, readwrite, getter=isHandshakeComplete) BOOL handshakeComplete;
-
-/**
- * The last received stream token from the server, used to acknowledge which responses the client
- * has processed. Stream tokens are opaque checkpoint markers whose only real value is their
- * inclusion in the next request.
- *
- * FSTWriteStream manages propagating this value from responses to the next request.
- */
-@property(nonatomic, strong, nullable) NSData *lastStreamToken;
-
-@end
-
NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTDatastore.m b/Firestore/Source/Remote/FSTDatastore.m
index 1cabc36..a26a5a2 100644
--- a/Firestore/Source/Remote/FSTDatastore.m
+++ b/Firestore/Source/Remote/FSTDatastore.m
@@ -17,27 +17,23 @@
#import "FSTDatastore.h"
#import <GRPCClient/GRPCCall+OAuth2.h>
-#import <GRPCClient/GRPCCall.h>
#import <ProtoRPC/ProtoRPC.h>
#import "FIRFirestore+Internal.h"
#import "FIRFirestoreErrors.h"
#import "FIRFirestoreVersion.h"
#import "FSTAssert.h"
-#import "FSTBufferedWriter.h"
-#import "FSTClasses.h"
#import "FSTCredentialsProvider.h"
#import "FSTDatabaseID.h"
#import "FSTDatabaseInfo.h"
#import "FSTDispatchQueue.h"
#import "FSTDocument.h"
#import "FSTDocumentKey.h"
-#import "FSTExponentialBackoff.h"
#import "FSTLocalStore.h"
#import "FSTLogger.h"
#import "FSTMutation.h"
-#import "FSTQueryData.h"
#import "FSTSerializerBeta.h"
+#import "FSTStream.h"
#import "Firestore.pbrpc.h"
@@ -51,114 +47,12 @@ NS_ASSUME_NONNULL_BEGIN
@property(nonatomic, getter=isSecure) BOOL secure;
@end
-/**
- * Initial backoff time in seconds after an error.
- * Set to 1s according to https://cloud.google.com/apis/design/errors.
- */
-static const NSTimeInterval kBackoffInitialDelay = 1;
-static const NSTimeInterval kBackoffMaxDelay = 60.0;
-static const double kBackoffFactor = 1.5;
static NSString *const kXGoogAPIClientHeader = @"x-goog-api-client";
static NSString *const kGoogleCloudResourcePrefix = @"google-cloud-resource-prefix";
/** Function typedef used to create RPCs. */
typedef GRPCProtoCall * (^RPCFactory)(void);
-#pragma mark - FSTStream
-
-/** The state of a stream. */
-typedef NS_ENUM(NSInteger, FSTStreamState) {
- /**
- * The streaming RPC is not running and there's no error condition. Calling `start` will
- * start the stream immediately without backoff. While in this state -isStarted will return NO.
- */
- FSTStreamStateInitial = 0,
-
- /**
- * The stream is starting, and is waiting for an auth token to attach to the initial request.
- * While in this state, isStarted will return YES but isOpen will return NO.
- */
- FSTStreamStateAuth,
-
- /**
- * The streaming RPC is up and running. Requests and responses can flow freely. Both
- * isStarted and isOpen will return YES.
- */
- FSTStreamStateOpen,
-
- /**
- * The stream encountered an error. The next start attempt will back off. While in this state
- * -isStarted will return NO.
- */
- FSTStreamStateError,
-
- /**
- * An in-between state after an error where the stream is waiting before re-starting. After
- * waiting is complete, the stream will try to open. While in this state -isStarted will
- * return YES but isOpen will return NO.
- */
- FSTStreamStateBackoff,
-
- /**
- * The stream has been explicitly stopped; no further events will be emitted.
- */
- FSTStreamStateStopped,
-};
-
-// We need to declare these classes first so that Datastore can alloc them.
-
-@interface FSTWatchStream ()
-
-/**
- * Initializes the watch stream with its dependencies.
- */
-- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
- workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
- credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
-
-- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
- workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
- credentials:(id<FSTCredentialsProvider>)credentials
- responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
-
-@end
-
-@interface FSTStream ()
-
-@property(nonatomic, getter=isIdle) BOOL idle;
-@property(nonatomic, weak, readwrite, nullable) id delegate;
-
-@end
-
-@interface FSTStream () <GRXWriteable>
-
-@property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo;
-@property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
-@property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials;
-@property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass;
-@property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff;
-
-/** A flag tracking whether the stream received a message from the backend. */
-@property(nonatomic, assign) BOOL messageReceived;
-
-/**
- * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the
- * state of the stream.
- */
-@property(nonatomic, assign) FSTStreamState state;
-
-/** The RPC handle. Used for cancellation. */
-@property(nonatomic, strong, nullable) GRPCCall *rpc;
-
-/**
- * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has
- * started.
- */
-@property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter;
-
-@end
-
#pragma mark - FSTDatastore
@interface FSTDatastore ()
@@ -439,627 +333,4 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
@end
-#pragma mark - FSTCallbackFilter
-
-/** Filter class that allows disabling of GRPC callbacks. */
-@interface FSTCallbackFilter : NSObject <GRXWriteable>
-
-- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
-- (instancetype)init NS_UNAVAILABLE;
-
-@property(atomic, readwrite) BOOL callbacksEnabled;
-@property(nonatomic, strong, readonly) FSTStream *stream;
-
-@end
-
-@implementation FSTCallbackFilter
-
-- (instancetype)initWithStream:(FSTStream *)stream {
- if (self = [super init]) {
- _callbacksEnabled = YES;
- _stream = stream;
- }
- return self;
-}
-
-- (void)suppressCallbacks {
- _callbacksEnabled = NO;
-}
-
-- (void)writeValue:(id)value {
- if (_callbacksEnabled) {
- [self.stream writeValue:value];
- }
-}
-
-- (void)writesFinishedWithError:(NSError *)errorOrNil {
- if (_callbacksEnabled) {
- [self.stream writesFinishedWithError:errorOrNil];
- }
-}
-
-@end
-
-#pragma mark - FSTStream
-
-@interface FSTStream ()
-
-@property(nonatomic, strong, readwrite) FSTCallbackFilter *callbackFilter;
-
-@end
-
-@implementation FSTStream
-
-/** The time a stream stays open after it is marked idle. */
-static const NSTimeInterval kIdleTimeout = 60.0;
-
-- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
- workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
- credentials:(id<FSTCredentialsProvider>)credentials
- responseMessageClass:(Class)responseMessageClass {
- if (self = [super init]) {
- _databaseInfo = database;
- _workerDispatchQueue = workerDispatchQueue;
- _credentials = credentials;
- _responseMessageClass = responseMessageClass;
-
- _backoff = [FSTExponentialBackoff exponentialBackoffWithDispatchQueue:workerDispatchQueue
- initialDelay:kBackoffInitialDelay
- backoffFactor:kBackoffFactor
- maxDelay:kBackoffMaxDelay];
- _state = FSTStreamStateInitial;
- }
- return self;
-}
-
-- (BOOL)isStarted {
- [self.workerDispatchQueue verifyIsCurrentQueue];
- FSTStreamState state = self.state;
- return state == FSTStreamStateBackoff || state == FSTStreamStateAuth ||
- state == FSTStreamStateOpen;
-}
-
-- (BOOL)isOpen {
- [self.workerDispatchQueue verifyIsCurrentQueue];
- return self.state == FSTStreamStateOpen;
-}
-
-- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
- @throw FSTAbstractMethodException(); // NOLINT
-}
-
-- (void)startWithDelegate:(id)delegate {
- [self.workerDispatchQueue verifyIsCurrentQueue];
-
- if (self.state == FSTStreamStateError) {
- [self performBackoffWithDelegate:delegate];
- return;
- }
-
- FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self);
- FSTAssert(self.state == FSTStreamStateInitial, @"Already started");
-
- self.state = FSTStreamStateAuth;
- FSTAssert(_delegate == nil, @"Delegate must be nil");
- _delegate = delegate;
-
- [self.credentials
- getTokenForcingRefresh:NO
- completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) {
- error = [FSTDatastore firestoreErrorForError:error];
- [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
- [self resumeStartWithToken:result error:error];
- }];
- }];
-}
-
-/** Add an access token to our RPC, after obtaining one from the credentials provider. */
-- (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error {
- if (self.state == FSTStreamStateStopped) {
- // Streams can be stopped while waiting for authorization.
- return;
- }
-
- [self.workerDispatchQueue verifyIsCurrentQueue];
- FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
- (long)self.state);
-
- // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
- // but I'm not sure how to detect that right now. http://b/32762461
- if (error) {
- // RPC has not been started yet, so just invoke higher-level close handler.
- [self handleStreamClose:error];
- return;
- }
-
- self.requestsWriter = [[FSTBufferedWriter alloc] init];
- _rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
- [FSTDatastore prepareHeadersForRPC:_rpc
- databaseID:self.databaseInfo.databaseID
- token:token.token];
- FSTAssert(_callbackFilter == nil, @"GRX Filter must be nil");
- _callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self];
- [_rpc startWithWriteable:_callbackFilter];
-
- self.state = FSTStreamStateOpen;
- [self notifyStreamOpen];
-}
-
-/** Backs off after an error. */
-- (void)performBackoffWithDelegate:(id)delegate {
- FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self);
- [self.workerDispatchQueue verifyIsCurrentQueue];
-
- FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case");
- self.state = FSTStreamStateBackoff;
-
- FSTWeakify(self);
- [self.backoff backoffAndRunBlock:^{
- FSTStrongify(self);
- [self resumeStartFromBackoffWithDelegate:delegate];
- }];
-}
-
-/** Resumes stream start after backing off. */
-- (void)resumeStartFromBackoffWithDelegate:(id)delegate {
- if (self.state == FSTStreamStateStopped) {
- // Streams can be stopped while waiting for backoff to complete.
- return;
- }
-
- // In order to have performed a backoff the stream must have been in an error state just prior
- // to entering the backoff state. If we weren't stopped we must be in the backoff state.
- FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)",
- (long)self.state);
-
- // Momentarily set state to FSTStreamStateInitial as `start` expects it.
- self.state = FSTStreamStateInitial;
- [self startWithDelegate:delegate];
- FSTAssert([self isStarted], @"Stream should have started.");
-}
-
-/**
- * Closes the stream and cleans up as necessary:
- *
- * * closes the underlying GRPC stream;
- * * calls the onClose handler with the given 'error';
- * * sets internal stream state to 'finalState';
- * * adjusts the backoff timer based on the error
- *
- * A new stream can be opened by calling `start` unless `finalState` is set to
- * `FSTStreamStateStopped`.
- *
- * @param finalState the intended state of the stream after closing.
- * @param error the NSError the connection was closed with.
- */
-- (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error {
- FSTAssert(finalState == FSTStreamStateError || error == nil,
- @"Can't provide an error when not in an error state.");
-
- [self.workerDispatchQueue verifyIsCurrentQueue];
- [self cancelIdleCheck];
-
- if (finalState != FSTStreamStateError) {
- // If this is an intentional close ensure we don't delay our next connection attempt.
- [self.backoff reset];
- } else if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) {
- FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class],
- (__bridge void *)self);
- [self.backoff resetToMax];
- }
-
- // This state must be assigned before calling `notifyStreamInterrupted` to allow the callback to
- // inhibit backoff or otherwise manipulate the state in its non-started state.
- self.state = finalState;
-
- if (self.requestsWriter) {
- // Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
- // call half-close to avoid secondary failures.
- if (finalState != FSTStreamStateError) {
- FSTLog(@"%@ %p Closing stream client-side", [self class], (__bridge void *)self);
- @synchronized(self.requestsWriter) {
- [self.requestsWriter finishWithError:nil];
- }
- }
- _requestsWriter = nil;
- }
-
- [self.callbackFilter suppressCallbacks];
- _callbackFilter = nil;
-
- // Clean up remaining state.
- _messageReceived = NO;
- _rpc = nil;
-
- // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it
- // could trigger undesirable recovery logic, etc.).
- if (finalState != FSTStreamStateStopped) {
- [self notifyStreamInterruptedWithError:error];
- }
-
- // Clear the delegates to avoid any possible bleed through of events from GRPC.
- FSTAssert(_delegate,
- @"closeWithFinalState should only be called for a started stream that has an active "
- @"delegate.");
- _delegate = nil;
-}
-
-- (void)stop {
- FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
- if ([self isStarted]) {
- [self closeWithFinalState:FSTStreamStateStopped error:nil];
- }
-}
-
-- (void)inhibitBackoff {
- FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)",
- (long)self.state);
- [self.workerDispatchQueue verifyIsCurrentQueue];
-
- // Clear the error condition.
- self.state = FSTStreamStateInitial;
- [self.backoff reset];
-}
-
-/** Called by the idle timer when the stream should close due to inactivity. */
-- (void)handleIdleCloseTimer {
- [self.workerDispatchQueue verifyIsCurrentQueue];
- if (self.state == FSTStreamStateOpen && [self isIdle]) {
- // When timing out an idle stream there's no reason to force the stream into backoff when
- // it restarts so set the stream state to Initial instead of Error.
- [self closeWithFinalState:FSTStreamStateInitial error:nil];
- }
-}
-
-- (void)markIdle {
- [self.workerDispatchQueue verifyIsCurrentQueue];
- if (self.state == FSTStreamStateOpen) {
- self.idle = YES;
- [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout
- block:^() {
- [self handleIdleCloseTimer];
- }];
- }
-}
-
-- (void)cancelIdleCheck {
- [self.workerDispatchQueue verifyIsCurrentQueue];
- self.idle = NO;
-}
-
-/**
- * Parses a protocol buffer response from the server. If the message fails to parse, generates
- * an error and closes the stream.
- *
- * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:.
- * @param data The bytes in the response as returned from GRPC.
- * @return An instance of the protocol buffer message, parsed from the data if parsing was
- * successful, or nil otherwise.
- */
-- (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error {
- NSError *parseError;
- id parsed = [protoClass parseFromData:data error:&parseError];
- if (parsed) {
- *error = nil;
- return parsed;
- } else {
- NSDictionary *info = @{
- NSLocalizedDescriptionKey : @"Unable to parse response from the server",
- NSUnderlyingErrorKey : parseError,
- @"Expected class" : protoClass,
- @"Received value" : data,
- };
- *error = [NSError errorWithDomain:FIRFirestoreErrorDomain
- code:FIRFirestoreErrorCodeInternal
- userInfo:info];
- return nil;
- }
-}
-
-/**
- * Writes a request proto into the stream.
- */
-- (void)writeRequest:(GPBMessage *)request {
- NSData *data = [request data];
-
- [self cancelIdleCheck];
-
- FSTBufferedWriter *requestsWriter = self.requestsWriter;
- @synchronized(requestsWriter) {
- [requestsWriter writeValue:data];
- }
-}
-
-#pragma mark Template methods for subclasses
-
-/**
- * Called by the stream after the stream has opened.
- *
- * Subclasses should relay to their stream-specific delegate. Calling [super notifyStreamOpen] is
- * not required.
- */
-- (void)notifyStreamOpen {
-}
-
-/**
- * Called by the stream after the stream has been unexpectedly interrupted, either due to an error
- * or due to idleness.
- *
- * Subclasses should relay to their stream-specific delegate. Calling [super
- * notifyStreamInterrupted] is not required.
- */
-- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
-}
-
-/**
- * Called by the stream for each incoming protocol message coming from the server.
- *
- * Subclasses should implement this to deserialize the value and relay to their stream-specific
- * delegate, if appropriate. Calling [super handleStreamMessage] is not required.
- */
-- (void)handleStreamMessage:(id)value {
-}
-
-/**
- * Called by the stream when the underlying RPC has been closed for whatever reason.
- */
-- (void)handleStreamClose:(nullable NSError *)error {
- FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
- FSTAssert([self isStarted], @"Can't handle server close in non-started state.");
-
- // In theory the stream could close cleanly, however, in our current model we never expect this
- // to happen because if we stop a stream ourselves, this callback will never be called. To
- // prevent cases where we retry without a backoff accidentally, we set the stream to error
- // in all cases.
- [self closeWithFinalState:FSTStreamStateError error:error];
-}
-
-#pragma mark GRXWriteable implementation
-// The GRXWriteable implementation defines the receive side of the RPC stream.
-
-/**
- * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
- * redispatch back onto our own worker queue.
- */
-- (void)writeValue:(id)value __used {
- // TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
- // Once released we can set the responseDispatchQueue property on the GRPCCall and then this
- // method can call handleStreamMessage directly.
- FSTWeakify(self);
- [self.workerDispatchQueue dispatchAsync:^{
- FSTStrongify(self);
- if (!self || self.state == FSTStreamStateStopped) {
- return;
- }
- if (!self.messageReceived) {
- self.messageReceived = YES;
- if ([FIRFirestore isLoggingEnabled]) {
- FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
- (__bridge void *)self,
- [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
- }
- }
- NSError *error;
- id proto = [self parseProto:self.responseMessageClass data:value error:&error];
- if (proto) {
- [self handleStreamMessage:proto];
- } else {
- [_rpc finishWithError:error];
- }
- }];
-}
-
-/**
- * Called by GRPC when it closed the stream with an error representing the final state of the
- * stream.
- *
- * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
- * directly inform stream-specific logic, or call stop to tear down the stream.
- */
-- (void)writesFinishedWithError:(nullable NSError *)error __used {
- error = [FSTDatastore firestoreErrorForError:error];
- FSTWeakify(self);
- [self.workerDispatchQueue dispatchAsync:^{
- FSTStrongify(self);
- if (!self || self.state == FSTStreamStateStopped) {
- return;
- }
- [self handleStreamClose:error];
- }];
-}
-
-@end
-
-#pragma mark - FSTWatchStream
-
-@interface FSTWatchStream ()
-
-@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
-
-@end
-
-@implementation FSTWatchStream
-
-- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
- workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
- credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer {
- self = [super initWithDatabase:database
- workerDispatchQueue:workerDispatchQueue
- credentials:credentials
- responseMessageClass:[GCFSListenResponse class]];
- if (self) {
- _serializer = serializer;
- }
- return self;
-}
-
-- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
- return [[GRPCCall alloc] initWithHost:self.databaseInfo.host
- path:@"/google.firestore.v1beta1.Firestore/Listen"
- requestsWriter:requestsWriter];
-}
-
-- (void)notifyStreamOpen {
- [self.delegate watchStreamDidOpen];
-}
-
-- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
- [self.delegate watchStreamWasInterruptedWithError:error];
-}
-
-- (void)watchQuery:(FSTQueryData *)query {
- FSTAssert([self isOpen], @"Not yet open");
- [self.workerDispatchQueue verifyIsCurrentQueue];
-
- GCFSListenRequest *request = [GCFSListenRequest message];
- request.database = [_serializer encodedDatabaseID];
- request.addTarget = [_serializer encodedTarget:query];
- request.labels = [_serializer encodedListenRequestLabelsForQueryData:query];
-
- FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, request);
- [self writeRequest:request];
-}
-
-- (void)unwatchTargetID:(FSTTargetID)targetID {
- FSTAssert([self isOpen], @"Not yet open");
- [self.workerDispatchQueue verifyIsCurrentQueue];
-
- GCFSListenRequest *request = [GCFSListenRequest message];
- request.database = [_serializer encodedDatabaseID];
- request.removeTarget = targetID;
-
- FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, request);
- [self writeRequest:request];
-}
-
-/**
- * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's
- * watchStreamDidChange:snapshotVersion: callback.
- */
-- (void)handleStreamMessage:(GCFSListenResponse *)proto {
- FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto);
- [self.workerDispatchQueue verifyIsCurrentQueue];
-
- // A successful response means the stream is healthy.
- [self.backoff reset];
-
- FSTWatchChange *change = [_serializer decodedWatchChange:proto];
- FSTSnapshotVersion *snap = [_serializer versionFromListenResponse:proto];
- [self.delegate watchStreamDidChange:change snapshotVersion:snap];
-}
-
-@end
-
-#pragma mark - FSTWriteStream
-
-@interface FSTWriteStream ()
-
-@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
-
-@end
-
-@implementation FSTWriteStream
-
-- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
- workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
- credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer {
- self = [super initWithDatabase:database
- workerDispatchQueue:workerDispatchQueue
- credentials:credentials
- responseMessageClass:[GCFSWriteResponse class]];
- if (self) {
- _serializer = serializer;
- }
- return self;
-}
-
-- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
- return [[GRPCCall alloc] initWithHost:self.databaseInfo.host
- path:@"/google.firestore.v1beta1.Firestore/Write"
- requestsWriter:requestsWriter];
-}
-
-- (void)startWithDelegate:(id)delegate {
- self.handshakeComplete = NO;
- [super startWithDelegate:delegate];
-}
-
-- (void)notifyStreamOpen {
- [self.delegate writeStreamDidOpen];
-}
-
-- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
- [self.delegate writeStreamWasInterruptedWithError:error];
-}
-
-- (void)writeHandshake {
- // The initial request cannot contain mutations, but must contain a projectID.
- FSTAssert([self isOpen], @"Not yet open");
- FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn");
- [self.workerDispatchQueue verifyIsCurrentQueue];
-
- GCFSWriteRequest *request = [GCFSWriteRequest message];
- request.database = [_serializer encodedDatabaseID];
- // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the
- // handshake, ignoring any stream token we might have.
-
- FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, request);
- [self writeRequest:request];
-}
-
-- (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
- FSTAssert([self isOpen], @"Not yet open");
- FSTAssert(self.handshakeComplete, @"Mutations sent out of turn");
- [self.workerDispatchQueue verifyIsCurrentQueue];
-
- NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count];
- for (FSTMutation *mutation in mutations) {
- [protos addObject:[_serializer encodedMutation:mutation]];
- };
-
- GCFSWriteRequest *request = [GCFSWriteRequest message];
- request.writesArray = protos;
- request.streamToken = self.lastStreamToken;
-
- FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, request);
- [self writeRequest:request];
-}
-
-/**
- * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass
- * that on to the mutationResultsHandler.
- */
-- (void)handleStreamMessage:(GCFSWriteResponse *)response {
- FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response);
- [self.workerDispatchQueue verifyIsCurrentQueue];
-
- // Always capture the last stream token.
- self.lastStreamToken = response.streamToken;
-
- if (!self.isHandshakeComplete) {
- // The first response is the handshake response
- self.handshakeComplete = YES;
-
- [self.delegate writeStreamDidCompleteHandshake];
- } else {
- // A successful first write response means the stream is healthy.
- // Note that we could consider a successful handshake healthy, however, the write itself
- // might be causing an error we want to back off from.
- [self.backoff reset];
-
- FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime];
- NSMutableArray<GCFSWriteResult *> *protos = response.writeResultsArray;
- NSMutableArray<FSTMutationResult *> *results = [NSMutableArray arrayWithCapacity:protos.count];
- for (GCFSWriteResult *proto in protos) {
- [results addObject:[_serializer decodedMutationResult:proto]];
- };
-
- [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results];
- }
-}
-
-@end
-
NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTRemoteStore.m b/Firestore/Source/Remote/FSTRemoteStore.m
index 0bb37cc..45a8aca 100644
--- a/Firestore/Source/Remote/FSTRemoteStore.m
+++ b/Firestore/Source/Remote/FSTRemoteStore.m
@@ -29,6 +29,7 @@
#import "FSTQueryData.h"
#import "FSTRemoteEvent.h"
#import "FSTSnapshotVersion.h"
+#import "FSTStream.h"
#import "FSTTransaction.h"
#import "FSTWatchChange.h"
diff --git a/Firestore/Source/Remote/FSTStream.h b/Firestore/Source/Remote/FSTStream.h
new file mode 100644
index 0000000..c3f1c98
--- /dev/null
+++ b/Firestore/Source/Remote/FSTStream.h
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import <Foundation/Foundation.h>
+
+#import "FSTTypes.h"
+
+@class FSTDatabaseInfo;
+@class FSTDocumentKey;
+@class FSTDispatchQueue;
+@class FSTMutation;
+@class FSTMutationResult;
+@class FSTQueryData;
+@class FSTSerializerBeta;
+@class FSTSnapshotVersion;
+@class FSTWatchChange;
+@class FSTWatchStream;
+@class FSTWriteStream;
+@class GRPCCall;
+@class GRXWriter;
+
+@protocol FSTCredentialsProvider;
+@protocol FSTWatchStreamDelegate;
+@protocol FSTWriteStreamDelegate;
+
+NS_ASSUME_NONNULL_BEGIN
+
+/**
+ * An FSTStream is an abstract base class that represents a restartable streaming RPC to the
+ * Firestore backend. It's built on top of GRPC's own support for streaming RPCs, and adds several
+ * critical features for our clients:
+ *
+ * - Restarting a stream is allowed (after failure)
+ * - Exponential backoff on failure (independent of the underlying channel)
+ * - Authentication via FSTCredentialsProvider
+ * - Dispatching all callbacks into the shared worker queue
+ *
+ * Subclasses of FSTStream implement serialization of models to and from bytes (via protocol
+ * buffers) for a specific streaming RPC and emit events specific to the stream.
+ *
+ * ## Starting and Stopping
+ *
+ * Streaming RPCs are stateful and need to be started before messages can be sent and received.
+ * The FSTStream will call its delegate's specific streamDidOpen method once the stream is ready
+ * to accept requests.
+ *
+ * Should a `start` fail, FSTStream will call its delegate's specific streamDidClose method with an
+ * NSError indicating what went wrong. The delegate is free to call start again.
+ *
+ * An FSTStream can also be explicitly stopped which indicates that the caller has discarded the
+ * stream and no further events should be emitted. Once explicitly stopped, a stream cannot be
+ * restarted.
+ *
+ * ## Subclassing Notes
+ *
+ * An implementation of FSTStream needs to implement the following methods:
+ * - `createRPCWithRequestsWriter`, should create the specific RPC (a GRPCCall object).
+ * - `handleStreamMessage`, receives protocol buffer responses from GRPC and must deserialize and
+ * delegate to some stream specific response method.
+ * - `notifyStreamOpen`, should call through to the stream-specific streamDidOpen method.
+ * - `notifyStreamInterrupted`, calls through to the stream-specific streamWasInterrupted method.
+ *
+ * Additionally, beyond these required methods, subclasses will want to implement methods that
+ * take request models, serialize them, and write them to using writeRequest:.
+ *
+ * ## RPC Message Type
+ *
+ * FSTStream intentionally uses the GRPCCall interface to GRPC directly, bypassing both GRPCProtoRPC
+ * and GRXBufferedPipe for sending data. This has been done to avoid race conditions that come out
+ * of a loosely specified locking contract on GRXWriter. There's essentially no way to safely use
+ * any of the wrapper objects for GRXWriter (that perform buffering or conversion to/from protos).
+ *
+ * See https://github.com/grpc/grpc/issues/10957 for the kinds of things we're trying to avoid.
+ */
+@interface FSTStream <__covariant FSTStreamDelegate> : NSObject
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)init NS_UNAVAILABLE;
+
+/**
+ * An abstract method used by `start` to create a streaming RPC specific to this type of stream.
+ * The RPC should be created such that requests are taken from `self`.
+ *
+ * Note that the returned GRPCCall must not be a GRPCProtoRPC, since the rest of the streaming
+ * mechanism assumes it is dealing in bytes-level requests and responses.
+ */
+- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter;
+
+/**
+ * Returns YES if `start` has been called and no error has occurred. YES indicates the stream is
+ * open or in the process of opening (which encompasses respecting backoff, getting auth tokens,
+ * and starting the actual RPC). Use `isOpen` to determine if the stream is open and ready for
+ * outbound requests.
+ */
+- (BOOL)isStarted;
+
+/** Returns YES if the underlying RPC is open and the stream is ready for outbound requests. */
+- (BOOL)isOpen;
+
+/**
+ * Starts the RPC. Only allowed if isStarted returns NO. The stream is not immediately ready for
+ * use: the delegate's watchStreamDidOpen method will be invoked when the RPC is ready for outbound
+ * requests, at which point `isOpen` will return YES.
+ *
+ * When start returns, -isStarted will return YES.
+ */
+- (void)startWithDelegate:(id)delegate;
+
+/**
+ * Stops the RPC. This call is idempotent and allowed regardless of the current isStarted state.
+ *
+ * Unlike a transient stream close, stopping a stream is permanent. This is guaranteed NOT to emit
+ * any further events on the stream-specific delegate, including the streamDidClose method.
+ *
+ * NOTE: This no-events contract may seem counter-intuitive but allows the caller to
+ * straightforwardly sequence stream tear-down without having to worry about when the delegate's
+ * streamDidClose methods will get called. For example if the stream must be exchanged for another
+ * during a user change this allows `stop` to be called eagerly without worrying about the
+ * streamDidClose method accidentally restarting the stream before the new one is ready.
+ *
+ * When stop returns, -isStarted and -isOpen will both return NO.
+ */
+- (void)stop;
+
+/**
+ * Initializes the idle timer. If no write takes place within one minute, the GRPC stream will be
+ * closed.
+ */
+- (void)markIdle;
+
+/**
+ * After an error the stream will usually back off on the next attempt to start it. If the error
+ * warrants an immediate restart of the stream, the sender can use this to indicate that the
+ * receiver should not back off.
+ *
+ * Each error will call the stream-specific streamDidClose method. That method can decide to
+ * inhibit backoff if required.
+ */
+- (void)inhibitBackoff;
+
+@end
+
+#pragma mark - FSTWatchStream
+
+/** A protocol defining the events that can be emitted by the FSTWatchStream. */
+@protocol FSTWatchStreamDelegate <NSObject>
+
+/** Called by the FSTWatchStream when it is ready to accept outbound request messages. */
+- (void)watchStreamDidOpen;
+
+/**
+ * Called by the FSTWatchStream with changes and the snapshot versions included in in the
+ * WatchChange responses sent back by the server.
+ */
+- (void)watchStreamDidChange:(FSTWatchChange *)change
+ snapshotVersion:(FSTSnapshotVersion *)snapshotVersion;
+
+/**
+ * Called by the FSTWatchStream when the underlying streaming RPC is interrupted for whatever
+ * reason, usually because of an error, but possibly due to an idle timeout. The error passed to
+ * this method may be nil, in which case the stream was closed without attributable fault.
+ *
+ * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping"
+ * on FSTStream for details.
+ */
+- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error;
+
+@end
+
+/**
+ * An FSTStream that implements the StreamingWatch RPC.
+ *
+ * Once the FSTWatchStream has called the streamDidOpen method, any number of watchQuery and
+ * unwatchTargetId calls can be sent to control what changes will be sent from the server for
+ * WatchChanges.
+ */
+@interface FSTWatchStream : FSTStream
+
+/**
+ * Initializes the watch stream with its dependencies.
+ */
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
+
+- (instancetype)init NS_UNAVAILABLE;
+
+/**
+ * Registers interest in the results of the given query. If the query includes a resumeToken it
+ * will be included in the request. Results that affect the query will be streamed back as
+ * WatchChange messages that reference the targetID included in |query|.
+ */
+- (void)watchQuery:(FSTQueryData *)query;
+
+/** Unregisters interest in the results of the query associated with the given target ID. */
+- (void)unwatchTargetID:(FSTTargetID)targetID;
+
+@end
+
+#pragma mark - FSTWriteStream
+
+@protocol FSTWriteStreamDelegate <NSObject>
+
+/** Called by the FSTWriteStream when it is ready to accept outbound request messages. */
+- (void)writeStreamDidOpen;
+
+/**
+ * Called by the FSTWriteStream upon a successful handshake response from the server, which is the
+ * receiver's cue to send any pending writes.
+ */
+- (void)writeStreamDidCompleteHandshake;
+
+/**
+ * Called by the FSTWriteStream upon receiving a StreamingWriteResponse from the server that
+ * contains mutation results.
+ */
+- (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion
+ mutationResults:(NSArray<FSTMutationResult *> *)results;
+
+/**
+ * Called when the FSTWriteStream's underlying RPC is interrupted for whatever reason, usually
+ * because of an error, but possibly due to an idle timeout. The error passed to this method may be
+ * nil, in which case the stream was closed without attributable fault.
+ *
+ * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping"
+ * on FSTStream for details.
+ */
+- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error;
+
+@end
+
+/**
+ * An FSTStream that implements the StreamingWrite RPC.
+ *
+ * The StreamingWrite RPC requires the caller to maintain special `streamToken` state in between
+ * calls, to help the server understand which responses the client has processed by the time the
+ * next request is made. Every response may contain a `streamToken`; this value must be passed to
+ * the next request.
+ *
+ * After calling `start` on this stream, the next request must be a handshake, containing whatever
+ * streamToken is on hand. Once a response to this request is received, all pending mutations may
+ * be submitted. When submitting multiple batches of mutations at the same time, it's okay to use
+ * the same streamToken for the calls to `writeMutations:`.
+ */
+@interface FSTWriteStream : FSTStream
+
+/**
+ * Initializes the write stream with its dependencies.
+ */
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ serializer:(FSTSerializerBeta *)serializer;
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
+
+- (instancetype)init NS_UNAVAILABLE;
+
+/**
+ * Sends an initial streamToken to the server, performing the handshake required to make the
+ * StreamingWrite RPC work. Subsequent `writeMutations:` calls should wait until a response has
+ * been delivered to the delegate's writeStreamDidCompleteHandshake method.
+ */
+- (void)writeHandshake;
+
+/** Sends a group of mutations to the Firestore backend to apply. */
+- (void)writeMutations:(NSArray<FSTMutation *> *)mutations;
+
+/**
+ * Tracks whether or not a handshake has been successfully exchanged and the stream is ready to
+ * accept mutations.
+ */
+@property(nonatomic, assign, readwrite, getter=isHandshakeComplete) BOOL handshakeComplete;
+
+/**
+ * The last received stream token from the server, used to acknowledge which responses the client
+ * has processed. Stream tokens are opaque checkpoint markers whose only real value is their
+ * inclusion in the next request.
+ *
+ * FSTWriteStream manages propagating this value from responses to the next request.
+ */
+@property(nonatomic, strong, nullable) NSData *lastStreamToken;
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTStream.m b/Firestore/Source/Remote/FSTStream.m
new file mode 100644
index 0000000..5a6f982
--- /dev/null
+++ b/Firestore/Source/Remote/FSTStream.m
@@ -0,0 +1,764 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FSTDatastore.h"
+
+#import <GRPCClient/GRPCCall+OAuth2.h>
+#import <GRPCClient/GRPCCall.h>
+
+#import "FIRFirestore+Internal.h"
+#import "FIRFirestoreErrors.h"
+#import "FSTAssert.h"
+#import "FSTBufferedWriter.h"
+#import "FSTClasses.h"
+#import "FSTCredentialsProvider.h"
+#import "FSTDatabaseID.h"
+#import "FSTDatabaseInfo.h"
+#import "FSTDispatchQueue.h"
+#import "FSTExponentialBackoff.h"
+#import "FSTLogger.h"
+#import "FSTMutation.h"
+#import "FSTQueryData.h"
+#import "FSTSerializerBeta.h"
+#import "FSTStream.h"
+
+#import "Firestore.pbrpc.h"
+
+/**
+ * Initial backoff time in seconds after an error.
+ * Set to 1s according to https://cloud.google.com/apis/design/errors.
+ */
+static const NSTimeInterval kBackoffInitialDelay = 1;
+static const NSTimeInterval kBackoffMaxDelay = 60.0;
+static const double kBackoffFactor = 1.5;
+
+#pragma mark - FSTStream
+
+/** The state of a stream. */
+typedef NS_ENUM(NSInteger, FSTStreamState) {
+ /**
+ * The streaming RPC is not running and there's no error condition. Calling `start` will
+ * start the stream immediately without backoff. While in this state -isStarted will return NO.
+ */
+ FSTStreamStateInitial = 0,
+
+ /**
+ * The stream is starting, and is waiting for an auth token to attach to the initial request.
+ * While in this state, isStarted will return YES but isOpen will return NO.
+ */
+ FSTStreamStateAuth,
+
+ /**
+ * The streaming RPC is up and running. Requests and responses can flow freely. Both
+ * isStarted and isOpen will return YES.
+ */
+ FSTStreamStateOpen,
+
+ /**
+ * The stream encountered an error. The next start attempt will back off. While in this state
+ * -isStarted will return NO.
+ */
+ FSTStreamStateError,
+
+ /**
+ * An in-between state after an error where the stream is waiting before re-starting. After
+ * waiting is complete, the stream will try to open. While in this state -isStarted will
+ * return YES but isOpen will return NO.
+ */
+ FSTStreamStateBackoff,
+
+ /**
+ * The stream has been explicitly stopped; no further events will be emitted.
+ */
+ FSTStreamStateStopped,
+};
+
+// We need to declare these classes first so that Datastore can alloc them.
+
+@interface FSTWatchStream ()
+
+/**
+ * Initializes the watch stream with its dependencies.
+ */
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
+
+@end
+
+@interface FSTStream ()
+
+@property(nonatomic, getter=isIdle) BOOL idle;
+@property(nonatomic, weak, readwrite, nullable) id delegate;
+
+@end
+
+@interface FSTStream () <GRXWriteable>
+
+@property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo;
+@property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
+@property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials;
+@property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass;
+@property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff;
+
+/** A flag tracking whether the stream received a message from the backend. */
+@property(nonatomic, assign) BOOL messageReceived;
+
+/**
+ * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the
+ * state of the stream.
+ */
+@property(nonatomic, assign) FSTStreamState state;
+
+/** The RPC handle. Used for cancellation. */
+@property(nonatomic, strong, nullable) GRPCCall *rpc;
+
+/**
+ * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has
+ * started.
+ */
+@property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter;
+
+@end
+
+#pragma mark - FSTCallbackFilter
+
+/** Filter class that allows disabling of GRPC callbacks. */
+@interface FSTCallbackFilter : NSObject <GRXWriteable>
+
+- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
+- (instancetype)init NS_UNAVAILABLE;
+
+@property(atomic, readwrite) BOOL callbacksEnabled;
+@property(nonatomic, strong, readonly) FSTStream *stream;
+
+@end
+
+@implementation FSTCallbackFilter
+
+- (instancetype)initWithStream:(FSTStream *)stream {
+ if (self = [super init]) {
+ _callbacksEnabled = YES;
+ _stream = stream;
+ }
+ return self;
+}
+
+- (void)suppressCallbacks {
+ _callbacksEnabled = NO;
+}
+
+- (void)writeValue:(id)value {
+ if (_callbacksEnabled) {
+ [self.stream writeValue:value];
+ }
+}
+
+- (void)writesFinishedWithError:(NSError *)errorOrNil {
+ if (_callbacksEnabled) {
+ [self.stream writesFinishedWithError:errorOrNil];
+ }
+}
+
+@end
+
+#pragma mark - FSTStream
+
+@interface FSTStream ()
+
+@property(nonatomic, strong, readwrite) FSTCallbackFilter *callbackFilter;
+
+@end
+
+@implementation FSTStream
+
+/** The time a stream stays open after it is marked idle. */
+static const NSTimeInterval kIdleTimeout = 60.0;
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass {
+ if (self = [super init]) {
+ _databaseInfo = database;
+ _workerDispatchQueue = workerDispatchQueue;
+ _credentials = credentials;
+ _responseMessageClass = responseMessageClass;
+
+ _backoff = [FSTExponentialBackoff exponentialBackoffWithDispatchQueue:workerDispatchQueue
+ initialDelay:kBackoffInitialDelay
+ backoffFactor:kBackoffFactor
+ maxDelay:kBackoffMaxDelay];
+ _state = FSTStreamStateInitial;
+ }
+ return self;
+}
+
+- (BOOL)isStarted {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ FSTStreamState state = self.state;
+ return state == FSTStreamStateBackoff || state == FSTStreamStateAuth ||
+ state == FSTStreamStateOpen;
+}
+
+- (BOOL)isOpen {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ return self.state == FSTStreamStateOpen;
+}
+
+- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
+ @throw FSTAbstractMethodException(); // NOLINT
+}
+
+- (void)startWithDelegate:(id)delegate {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ if (self.state == FSTStreamStateError) {
+ [self performBackoffWithDelegate:delegate];
+ return;
+ }
+
+ FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self);
+ FSTAssert(self.state == FSTStreamStateInitial, @"Already started");
+
+ self.state = FSTStreamStateAuth;
+ FSTAssert(_delegate == nil, @"Delegate must be nil");
+ _delegate = delegate;
+
+ [self.credentials
+ getTokenForcingRefresh:NO
+ completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) {
+ error = [FSTDatastore firestoreErrorForError:error];
+ [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
+ [self resumeStartWithToken:result error:error];
+ }];
+ }];
+}
+
+/** Add an access token to our RPC, after obtaining one from the credentials provider. */
+- (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error {
+ if (self.state == FSTStreamStateStopped) {
+ // Streams can be stopped while waiting for authorization.
+ return;
+ }
+
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
+ (long)self.state);
+
+ // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
+ // but I'm not sure how to detect that right now. http://b/32762461
+ if (error) {
+ // RPC has not been started yet, so just invoke higher-level close handler.
+ [self handleStreamClose:error];
+ return;
+ }
+
+ self.requestsWriter = [[FSTBufferedWriter alloc] init];
+ _rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
+ [FSTDatastore prepareHeadersForRPC:_rpc
+ databaseID:self.databaseInfo.databaseID
+ token:token.token];
+ FSTAssert(_callbackFilter == nil, @"GRX Filter must be nil");
+ _callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self];
+ [_rpc startWithWriteable:_callbackFilter];
+
+ self.state = FSTStreamStateOpen;
+ [self notifyStreamOpen];
+}
+
+/** Backs off after an error. */
+- (void)performBackoffWithDelegate:(id)delegate {
+ FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self);
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case");
+ self.state = FSTStreamStateBackoff;
+
+ FSTWeakify(self);
+ [self.backoff backoffAndRunBlock:^{
+ FSTStrongify(self);
+ [self resumeStartFromBackoffWithDelegate:delegate];
+ }];
+}
+
+/** Resumes stream start after backing off. */
+- (void)resumeStartFromBackoffWithDelegate:(id)delegate {
+ if (self.state == FSTStreamStateStopped) {
+ // Streams can be stopped while waiting for backoff to complete.
+ return;
+ }
+
+ // In order to have performed a backoff the stream must have been in an error state just prior
+ // to entering the backoff state. If we weren't stopped we must be in the backoff state.
+ FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)",
+ (long)self.state);
+
+ // Momentarily set state to FSTStreamStateInitial as `start` expects it.
+ self.state = FSTStreamStateInitial;
+ [self startWithDelegate:delegate];
+ FSTAssert([self isStarted], @"Stream should have started.");
+}
+
+/**
+ * Closes the stream and cleans up as necessary:
+ *
+ * * closes the underlying GRPC stream;
+ * * calls the onClose handler with the given 'error';
+ * * sets internal stream state to 'finalState';
+ * * adjusts the backoff timer based on the error
+ *
+ * A new stream can be opened by calling `start` unless `finalState` is set to
+ * `FSTStreamStateStopped`.
+ *
+ * @param finalState the intended state of the stream after closing.
+ * @param error the NSError the connection was closed with.
+ */
+- (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error {
+ FSTAssert(finalState == FSTStreamStateError || error == nil,
+ @"Can't provide an error when not in an error state.");
+
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ [self cancelIdleCheck];
+
+ if (finalState != FSTStreamStateError) {
+ // If this is an intentional close ensure we don't delay our next connection attempt.
+ [self.backoff reset];
+ } else if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) {
+ FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class],
+ (__bridge void *)self);
+ [self.backoff resetToMax];
+ }
+
+ // This state must be assigned before calling `notifyStreamInterrupted` to allow the callback to
+ // inhibit backoff or otherwise manipulate the state in its non-started state.
+ self.state = finalState;
+
+ if (self.requestsWriter) {
+ // Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
+ // call half-close to avoid secondary failures.
+ if (finalState != FSTStreamStateError) {
+ FSTLog(@"%@ %p Closing stream client-side", [self class], (__bridge void *)self);
+ @synchronized(self.requestsWriter) {
+ [self.requestsWriter finishWithError:nil];
+ }
+ }
+ _requestsWriter = nil;
+ }
+
+ [self.callbackFilter suppressCallbacks];
+ _callbackFilter = nil;
+
+ // Clean up remaining state.
+ _messageReceived = NO;
+ _rpc = nil;
+
+ // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it
+ // could trigger undesirable recovery logic, etc.).
+ if (finalState != FSTStreamStateStopped) {
+ [self notifyStreamInterruptedWithError:error];
+ }
+
+ // Clear the delegates to avoid any possible bleed through of events from GRPC.
+ FSTAssert(_delegate,
+ @"closeWithFinalState should only be called for a started stream that has an active "
+ @"delegate.");
+ _delegate = nil;
+}
+
+- (void)stop {
+ FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
+ if ([self isStarted]) {
+ [self closeWithFinalState:FSTStreamStateStopped error:nil];
+ }
+}
+
+- (void)inhibitBackoff {
+ FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)",
+ (long)self.state);
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ // Clear the error condition.
+ self.state = FSTStreamStateInitial;
+ [self.backoff reset];
+}
+
+/** Called by the idle timer when the stream should close due to inactivity. */
+- (void)handleIdleCloseTimer {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ if (self.state == FSTStreamStateOpen && [self isIdle]) {
+ // When timing out an idle stream there's no reason to force the stream into backoff when
+ // it restarts so set the stream state to Initial instead of Error.
+ [self closeWithFinalState:FSTStreamStateInitial error:nil];
+ }
+}
+
+- (void)markIdle {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ if (self.state == FSTStreamStateOpen) {
+ self.idle = YES;
+ [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout
+ block:^() {
+ [self handleIdleCloseTimer];
+ }];
+ }
+}
+
+- (void)cancelIdleCheck {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ self.idle = NO;
+}
+
+/**
+ * Parses a protocol buffer response from the server. If the message fails to parse, generates
+ * an error and closes the stream.
+ *
+ * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:.
+ * @param data The bytes in the response as returned from GRPC.
+ * @return An instance of the protocol buffer message, parsed from the data if parsing was
+ * successful, or nil otherwise.
+ */
+- (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error {
+ NSError *parseError;
+ id parsed = [protoClass parseFromData:data error:&parseError];
+ if (parsed) {
+ *error = nil;
+ return parsed;
+ } else {
+ NSDictionary *info = @{
+ NSLocalizedDescriptionKey : @"Unable to parse response from the server",
+ NSUnderlyingErrorKey : parseError,
+ @"Expected class" : protoClass,
+ @"Received value" : data,
+ };
+ *error = [NSError errorWithDomain:FIRFirestoreErrorDomain
+ code:FIRFirestoreErrorCodeInternal
+ userInfo:info];
+ return nil;
+ }
+}
+
+/**
+ * Writes a request proto into the stream.
+ */
+- (void)writeRequest:(GPBMessage *)request {
+ NSData *data = [request data];
+
+ [self cancelIdleCheck];
+
+ FSTBufferedWriter *requestsWriter = self.requestsWriter;
+ @synchronized(requestsWriter) {
+ [requestsWriter writeValue:data];
+ }
+}
+
+#pragma mark Template methods for subclasses
+
+/**
+ * Called by the stream after the stream has opened.
+ *
+ * Subclasses should relay to their stream-specific delegate. Calling [super notifyStreamOpen] is
+ * not required.
+ */
+- (void)notifyStreamOpen {
+}
+
+/**
+ * Called by the stream after the stream has been unexpectedly interrupted, either due to an error
+ * or due to idleness.
+ *
+ * Subclasses should relay to their stream-specific delegate. Calling [super
+ * notifyStreamInterrupted] is not required.
+ */
+- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
+}
+
+/**
+ * Called by the stream for each incoming protocol message coming from the server.
+ *
+ * Subclasses should implement this to deserialize the value and relay to their stream-specific
+ * delegate, if appropriate. Calling [super handleStreamMessage] is not required.
+ */
+- (void)handleStreamMessage:(id)value {
+}
+
+/**
+ * Called by the stream when the underlying RPC has been closed for whatever reason.
+ */
+- (void)handleStreamClose:(nullable NSError *)error {
+ FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
+ FSTAssert([self isStarted], @"Can't handle server close in non-started state.");
+
+ // In theory the stream could close cleanly, however, in our current model we never expect this
+ // to happen because if we stop a stream ourselves, this callback will never be called. To
+ // prevent cases where we retry without a backoff accidentally, we set the stream to error
+ // in all cases.
+ [self closeWithFinalState:FSTStreamStateError error:error];
+}
+
+#pragma mark GRXWriteable implementation
+// The GRXWriteable implementation defines the receive side of the RPC stream.
+
+/**
+ * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
+ * redispatch back onto our own worker queue.
+ */
+- (void)writeValue:(id)value __used {
+ // TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
+ // Once released we can set the responseDispatchQueue property on the GRPCCall and then this
+ // method can call handleStreamMessage directly.
+ FSTWeakify(self);
+ [self.workerDispatchQueue dispatchAsync:^{
+ FSTStrongify(self);
+ if (!self || self.state == FSTStreamStateStopped) {
+ return;
+ }
+ if (!self.messageReceived) {
+ self.messageReceived = YES;
+ if ([FIRFirestore isLoggingEnabled]) {
+ FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
+ (__bridge void *)self,
+ [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
+ }
+ }
+ NSError *error;
+ id proto = [self parseProto:self.responseMessageClass data:value error:&error];
+ if (proto) {
+ [self handleStreamMessage:proto];
+ } else {
+ [_rpc finishWithError:error];
+ }
+ }];
+}
+
+/**
+ * Called by GRPC when it closed the stream with an error representing the final state of the
+ * stream.
+ *
+ * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
+ * directly inform stream-specific logic, or call stop to tear down the stream.
+ */
+- (void)writesFinishedWithError:(nullable NSError *)error __used {
+ error = [FSTDatastore firestoreErrorForError:error];
+ FSTWeakify(self);
+ [self.workerDispatchQueue dispatchAsync:^{
+ FSTStrongify(self);
+ if (!self || self.state == FSTStreamStateStopped) {
+ return;
+ }
+ [self handleStreamClose:error];
+ }];
+}
+
+@end
+
+#pragma mark - FSTWatchStream
+
+@interface FSTWatchStream ()
+
+@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
+
+@end
+
+@implementation FSTWatchStream
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ serializer:(FSTSerializerBeta *)serializer {
+ self = [super initWithDatabase:database
+ workerDispatchQueue:workerDispatchQueue
+ credentials:credentials
+ responseMessageClass:[GCFSListenResponse class]];
+ if (self) {
+ _serializer = serializer;
+ }
+ return self;
+}
+
+- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
+ return [[GRPCCall alloc] initWithHost:self.databaseInfo.host
+ path:@"/google.firestore.v1beta1.Firestore/Listen"
+ requestsWriter:requestsWriter];
+}
+
+- (void)notifyStreamOpen {
+ [self.delegate watchStreamDidOpen];
+}
+
+- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
+ [self.delegate watchStreamWasInterruptedWithError:error];
+}
+
+- (void)watchQuery:(FSTQueryData *)query {
+ FSTAssert([self isOpen], @"Not yet open");
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ GCFSListenRequest *request = [GCFSListenRequest message];
+ request.database = [_serializer encodedDatabaseID];
+ request.addTarget = [_serializer encodedTarget:query];
+ request.labels = [_serializer encodedListenRequestLabelsForQueryData:query];
+
+ FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, request);
+ [self writeRequest:request];
+}
+
+- (void)unwatchTargetID:(FSTTargetID)targetID {
+ FSTAssert([self isOpen], @"Not yet open");
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ GCFSListenRequest *request = [GCFSListenRequest message];
+ request.database = [_serializer encodedDatabaseID];
+ request.removeTarget = targetID;
+
+ FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, request);
+ [self writeRequest:request];
+}
+
+/**
+ * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's
+ * watchStreamDidChange:snapshotVersion: callback.
+ */
+- (void)handleStreamMessage:(GCFSListenResponse *)proto {
+ FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto);
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ // A successful response means the stream is healthy.
+ [self.backoff reset];
+
+ FSTWatchChange *change = [_serializer decodedWatchChange:proto];
+ FSTSnapshotVersion *snap = [_serializer versionFromListenResponse:proto];
+ [self.delegate watchStreamDidChange:change snapshotVersion:snap];
+}
+
+@end
+
+#pragma mark - FSTWriteStream
+
+@interface FSTWriteStream ()
+
+@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
+
+@end
+
+@implementation FSTWriteStream
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ serializer:(FSTSerializerBeta *)serializer {
+ self = [super initWithDatabase:database
+ workerDispatchQueue:workerDispatchQueue
+ credentials:credentials
+ responseMessageClass:[GCFSWriteResponse class]];
+ if (self) {
+ _serializer = serializer;
+ }
+ return self;
+}
+
+- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
+ return [[GRPCCall alloc] initWithHost:self.databaseInfo.host
+ path:@"/google.firestore.v1beta1.Firestore/Write"
+ requestsWriter:requestsWriter];
+}
+
+- (void)startWithDelegate:(id)delegate {
+ self.handshakeComplete = NO;
+ [super startWithDelegate:delegate];
+}
+
+- (void)notifyStreamOpen {
+ [self.delegate writeStreamDidOpen];
+}
+
+- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
+ [self.delegate writeStreamWasInterruptedWithError:error];
+}
+
+- (void)writeHandshake {
+ // The initial request cannot contain mutations, but must contain a projectID.
+ FSTAssert([self isOpen], @"Not yet open");
+ FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn");
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ GCFSWriteRequest *request = [GCFSWriteRequest message];
+ request.database = [_serializer encodedDatabaseID];
+ // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the
+ // handshake, ignoring any stream token we might have.
+
+ FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, request);
+ [self writeRequest:request];
+}
+
+- (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
+ FSTAssert([self isOpen], @"Not yet open");
+ FSTAssert(self.handshakeComplete, @"Mutations sent out of turn");
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count];
+ for (FSTMutation *mutation in mutations) {
+ [protos addObject:[_serializer encodedMutation:mutation]];
+ };
+
+ GCFSWriteRequest *request = [GCFSWriteRequest message];
+ request.writesArray = protos;
+ request.streamToken = self.lastStreamToken;
+
+ FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, request);
+ [self writeRequest:request];
+}
+
+/**
+ * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass
+ * that on to the mutationResultsHandler.
+ */
+- (void)handleStreamMessage:(GCFSWriteResponse *)response {
+ FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response);
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ // Always capture the last stream token.
+ self.lastStreamToken = response.streamToken;
+
+ if (!self.isHandshakeComplete) {
+ // The first response is the handshake response
+ self.handshakeComplete = YES;
+
+ [self.delegate writeStreamDidCompleteHandshake];
+ } else {
+ // A successful first write response means the stream is healthy.
+ // Note that we could consider a successful handshake healthy, however, the write itself
+ // might be causing an error we want to back off from.
+ [self.backoff reset];
+
+ FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime];
+ NSMutableArray<GCFSWriteResult *> *protos = response.writeResultsArray;
+ NSMutableArray<FSTMutationResult *> *results = [NSMutableArray arrayWithCapacity:protos.count];
+ for (GCFSWriteResult *proto in protos) {
+ [results addObject:[_serializer decodedMutationResult:proto]];
+ };
+
+ [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results];
+ }
+}
+
+@end