diff options
author | Sebastian Schmidt <mrschmidt@google.com> | 2017-10-31 12:32:32 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-31 12:32:32 -0700 |
commit | f9b2f91da109d2b74f7076e8b6b6d8ced73a5e27 (patch) | |
tree | 56236f42e2d878a936de901533e683031e588285 /Firestore | |
parent | 02ff6bbee95150eacff9563af4dd7a6e1aeaebdd (diff) |
Moving the Stream class to their own file (#418)
Diffstat (limited to 'Firestore')
-rw-r--r-- | Firestore/Example/Tests/Integration/FSTStreamTests.m | 1 | ||||
-rw-r--r-- | Firestore/Example/Tests/SpecTests/FSTMockDatastore.m | 1 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTDatastore.h | 290 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTDatastore.m | 731 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTRemoteStore.m | 1 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTStream.h | 312 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTStream.m | 764 |
7 files changed, 1095 insertions, 1005 deletions
diff --git a/Firestore/Example/Tests/Integration/FSTStreamTests.m b/Firestore/Example/Tests/Integration/FSTStreamTests.m index dccaa70..a9ee4d1 100644 --- a/Firestore/Example/Tests/Integration/FSTStreamTests.m +++ b/Firestore/Example/Tests/Integration/FSTStreamTests.m @@ -25,6 +25,7 @@ #import "FSTTestDispatchQueue.h" #import "Model/FSTDatabaseID.h" #import "Remote/FSTDatastore.h" +#import "Remote/FSTStream.h" #import "Util/FSTAssert.h" /** Exposes otherwise private methods for testing. */ diff --git a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m index 6af2053..10bfc5b 100644 --- a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m +++ b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m @@ -23,6 +23,7 @@ #import "Model/FSTDatabaseID.h" #import "Model/FSTMutation.h" #import "Remote/FSTSerializerBeta.h" +#import "Remote/FSTStream.h" #import "Util/FSTAssert.h" #import "Util/FSTLogger.h" diff --git a/Firestore/Source/Remote/FSTDatastore.h b/Firestore/Source/Remote/FSTDatastore.h index b8e53f5..08aa570 100644 --- a/Firestore/Source/Remote/FSTDatastore.h +++ b/Firestore/Source/Remote/FSTDatastore.h @@ -33,8 +33,7 @@ @class GRXWriter; @protocol FSTCredentialsProvider; -@protocol FSTWatchStreamDelegate; -@protocol FSTWriteStreamDelegate; +@class FSTDatabaseID; NS_ASSUME_NONNULL_BEGIN @@ -64,14 +63,26 @@ NS_ASSUME_NONNULL_BEGIN credentials:(id<FSTCredentialsProvider>)credentials NS_DESIGNATED_INITIALIZER; +/** + * Takes a dictionary of (HTTP) response headers and returns the set of whitelisted headers + * (for logging purposes). + */ ++ (NSDictionary<NSString *, NSString *> *)extractWhiteListedHeaders: + (NSDictionary<NSString *, NSString *> *)header; + /** Converts the error to a FIRFirestoreErrorDomain error. */ + (NSError *)firestoreErrorForError:(NSError *)error; +/** Returns YES if the given error is a GRPC ABORTED error. **/ ++ (BOOL)isAbortedError:(NSError *)error; + /** Returns YES if the given error indicates the RPC associated with it may not be retried. */ + (BOOL)isPermanentWriteError:(NSError *)error; -/** Returns YES if the given error is a GRPC ABORTED error. **/ -+ (BOOL)isAbortedError:(NSError *)error; +/** Adds headers to the RPC including any OAuth access token if provided .*/ ++ (void)prepareHeadersForRPC:(GRPCCall *)rpc + databaseID:(FSTDatabaseID *)databaseID + token:(nullable NSString *)token; /** Looks up a list of documents in datastore. */ - (void)lookupDocuments:(NSArray<FSTDocumentKey *> *)keys @@ -92,275 +103,4 @@ NS_ASSUME_NONNULL_BEGIN @end -/** - * An FSTStream is an abstract base class that represents a restartable streaming RPC to the - * Firestore backend. It's built on top of GRPC's own support for streaming RPCs, and adds several - * critical features for our clients: - * - * - Restarting a stream is allowed (after failure) - * - Exponential backoff on failure (independent of the underlying channel) - * - Authentication via FSTCredentialsProvider - * - Dispatching all callbacks into the shared worker queue - * - * Subclasses of FSTStream implement serialization of models to and from bytes (via protocol - * buffers) for a specific streaming RPC and emit events specific to the stream. - * - * ## Starting and Stopping - * - * Streaming RPCs are stateful and need to be started before messages can be sent and received. - * The FSTStream will call its delegate's specific streamDidOpen method once the stream is ready - * to accept requests. - * - * Should a `start` fail, FSTStream will call its delegate's specific streamDidClose method with an - * NSError indicating what went wrong. The delegate is free to call start again. - * - * An FSTStream can also be explicitly stopped which indicates that the caller has discarded the - * stream and no further events should be emitted. Once explicitly stopped, a stream cannot be - * restarted. - * - * ## Subclassing Notes - * - * An implementation of FSTStream needs to implement the following methods: - * - `createRPCWithRequestsWriter`, should create the specific RPC (a GRPCCall object). - * - `handleStreamMessage`, receives protocol buffer responses from GRPC and must deserialize and - * delegate to some stream specific response method. - * - `notifyStreamOpen`, should call through to the stream-specific streamDidOpen method. - * - `notifyStreamInterrupted`, calls through to the stream-specific streamWasInterrupted method. - * - * Additionally, beyond these required methods, subclasses will want to implement methods that - * take request models, serialize them, and write them to using writeRequest:. - * - * ## RPC Message Type - * - * FSTStream intentionally uses the GRPCCall interface to GRPC directly, bypassing both GRPCProtoRPC - * and GRXBufferedPipe for sending data. This has been done to avoid race conditions that come out - * of a loosely specified locking contract on GRXWriter. There's essentially no way to safely use - * any of the wrapper objects for GRXWriter (that perform buffering or conversion to/from protos). - * - * See https://github.com/grpc/grpc/issues/10957 for the kinds of things we're trying to avoid. - */ -@interface FSTStream <__covariant FSTStreamDelegate> : NSObject - -- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - credentials:(id<FSTCredentialsProvider>)credentials - responseMessageClass:(Class)responseMessageClass NS_DESIGNATED_INITIALIZER; - -- (instancetype)init NS_UNAVAILABLE; - -/** - * An abstract method used by `start` to create a streaming RPC specific to this type of stream. - * The RPC should be created such that requests are taken from `self`. - * - * Note that the returned GRPCCall must not be a GRPCProtoRPC, since the rest of the streaming - * mechanism assumes it is dealing in bytes-level requests and responses. - */ -- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter; - -/** - * Returns YES if `start` has been called and no error has occurred. YES indicates the stream is - * open or in the process of opening (which encompasses respecting backoff, getting auth tokens, - * and starting the actual RPC). Use `isOpen` to determine if the stream is open and ready for - * outbound requests. - */ -- (BOOL)isStarted; - -/** Returns YES if the underlying RPC is open and the stream is ready for outbound requests. */ -- (BOOL)isOpen; - -/** - * Starts the RPC. Only allowed if isStarted returns NO. The stream is not immediately ready for - * use: the delegate's watchStreamDidOpen method will be invoked when the RPC is ready for outbound - * requests, at which point `isOpen` will return YES. - * - * When start returns, -isStarted will return YES. - */ -- (void)startWithDelegate:(id)delegate; - -/** - * Stops the RPC. This call is idempotent and allowed regardless of the current isStarted state. - * - * Unlike a transient stream close, stopping a stream is permanent. This is guaranteed NOT to emit - * any further events on the stream-specific delegate, including the streamDidClose method. - * - * NOTE: This no-events contract may seem counter-intuitive but allows the caller to - * straightforwardly sequence stream tear-down without having to worry about when the delegate's - * streamDidClose methods will get called. For example if the stream must be exchanged for another - * during a user change this allows `stop` to be called eagerly without worrying about the - * streamDidClose method accidentally restarting the stream before the new one is ready. - * - * When stop returns, -isStarted and -isOpen will both return NO. - */ -- (void)stop; - -/** - * Initializes the idle timer. If no write takes place within one minute, the GRPC stream will be - * closed. - */ -- (void)markIdle; - -/** - * After an error the stream will usually back off on the next attempt to start it. If the error - * warrants an immediate restart of the stream, the sender can use this to indicate that the - * receiver should not back off. - * - * Each error will call the stream-specific streamDidClose method. That method can decide to - * inhibit backoff if required. - */ -- (void)inhibitBackoff; - -@end - -#pragma mark - FSTWatchStream - -/** A protocol defining the events that can be emitted by the FSTWatchStream. */ -@protocol FSTWatchStreamDelegate <NSObject> - -/** Called by the FSTWatchStream when it is ready to accept outbound request messages. */ -- (void)watchStreamDidOpen; - -/** - * Called by the FSTWatchStream with changes and the snapshot versions included in in the - * WatchChange responses sent back by the server. - */ -- (void)watchStreamDidChange:(FSTWatchChange *)change - snapshotVersion:(FSTSnapshotVersion *)snapshotVersion; - -/** - * Called by the FSTWatchStream when the underlying streaming RPC is interrupted for whatever - * reason, usually because of an error, but possibly due to an idle timeout. The error passed to - * this method may be nil, in which case the stream was closed without attributable fault. - * - * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" - * on FSTStream for details. - */ -- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error; - -@end - -/** - * An FSTStream that implements the StreamingWatch RPC. - * - * Once the FSTWatchStream has called the streamDidOpen method, any number of watchQuery and - * unwatchTargetId calls can be sent to control what changes will be sent from the server for - * WatchChanges. - */ -@interface FSTWatchStream : FSTStream - -/** - * Initializes the watch stream with its dependencies. - */ -- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER; - -- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - credentials:(id<FSTCredentialsProvider>)credentials - responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; - -- (instancetype)init NS_UNAVAILABLE; - -/** - * Registers interest in the results of the given query. If the query includes a resumeToken it - * will be included in the request. Results that affect the query will be streamed back as - * WatchChange messages that reference the targetID included in |query|. - */ -- (void)watchQuery:(FSTQueryData *)query; - -/** Unregisters interest in the results of the query associated with the given target ID. */ -- (void)unwatchTargetID:(FSTTargetID)targetID; - -@end - -#pragma mark - FSTWriteStream - -@protocol FSTWriteStreamDelegate <NSObject> - -/** Called by the FSTWriteStream when it is ready to accept outbound request messages. */ -- (void)writeStreamDidOpen; - -/** - * Called by the FSTWriteStream upon a successful handshake response from the server, which is the - * receiver's cue to send any pending writes. - */ -- (void)writeStreamDidCompleteHandshake; - -/** - * Called by the FSTWriteStream upon receiving a StreamingWriteResponse from the server that - * contains mutation results. - */ -- (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion - mutationResults:(NSArray<FSTMutationResult *> *)results; - -/** - * Called when the FSTWriteStream's underlying RPC is interrupted for whatever reason, usually - * because of an error, but possibly due to an idle timeout. The error passed to this method may be - * nil, in which case the stream was closed without attributable fault. - * - * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" - * on FSTStream for details. - */ -- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error; - -@end - -/** - * An FSTStream that implements the StreamingWrite RPC. - * - * The StreamingWrite RPC requires the caller to maintain special `streamToken` state in between - * calls, to help the server understand which responses the client has processed by the time the - * next request is made. Every response may contain a `streamToken`; this value must be passed to - * the next request. - * - * After calling `start` on this stream, the next request must be a handshake, containing whatever - * streamToken is on hand. Once a response to this request is received, all pending mutations may - * be submitted. When submitting multiple batches of mutations at the same time, it's okay to use - * the same streamToken for the calls to `writeMutations:`. - */ -@interface FSTWriteStream : FSTStream - -/** - * Initializes the write stream with its dependencies. - */ -- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer; - -- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - credentials:(id<FSTCredentialsProvider>)credentials - responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; - -- (instancetype)init NS_UNAVAILABLE; - -/** - * Sends an initial streamToken to the server, performing the handshake required to make the - * StreamingWrite RPC work. Subsequent `writeMutations:` calls should wait until a response has - * been delivered to the delegate's writeStreamDidCompleteHandshake method. - */ -- (void)writeHandshake; - -/** Sends a group of mutations to the Firestore backend to apply. */ -- (void)writeMutations:(NSArray<FSTMutation *> *)mutations; - -/** - * Tracks whether or not a handshake has been successfully exchanged and the stream is ready to - * accept mutations. - */ -@property(nonatomic, assign, readwrite, getter=isHandshakeComplete) BOOL handshakeComplete; - -/** - * The last received stream token from the server, used to acknowledge which responses the client - * has processed. Stream tokens are opaque checkpoint markers whose only real value is their - * inclusion in the next request. - * - * FSTWriteStream manages propagating this value from responses to the next request. - */ -@property(nonatomic, strong, nullable) NSData *lastStreamToken; - -@end - NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTDatastore.m b/Firestore/Source/Remote/FSTDatastore.m index 1cabc36..a26a5a2 100644 --- a/Firestore/Source/Remote/FSTDatastore.m +++ b/Firestore/Source/Remote/FSTDatastore.m @@ -17,27 +17,23 @@ #import "FSTDatastore.h" #import <GRPCClient/GRPCCall+OAuth2.h> -#import <GRPCClient/GRPCCall.h> #import <ProtoRPC/ProtoRPC.h> #import "FIRFirestore+Internal.h" #import "FIRFirestoreErrors.h" #import "FIRFirestoreVersion.h" #import "FSTAssert.h" -#import "FSTBufferedWriter.h" -#import "FSTClasses.h" #import "FSTCredentialsProvider.h" #import "FSTDatabaseID.h" #import "FSTDatabaseInfo.h" #import "FSTDispatchQueue.h" #import "FSTDocument.h" #import "FSTDocumentKey.h" -#import "FSTExponentialBackoff.h" #import "FSTLocalStore.h" #import "FSTLogger.h" #import "FSTMutation.h" -#import "FSTQueryData.h" #import "FSTSerializerBeta.h" +#import "FSTStream.h" #import "Firestore.pbrpc.h" @@ -51,114 +47,12 @@ NS_ASSUME_NONNULL_BEGIN @property(nonatomic, getter=isSecure) BOOL secure; @end -/** - * Initial backoff time in seconds after an error. - * Set to 1s according to https://cloud.google.com/apis/design/errors. - */ -static const NSTimeInterval kBackoffInitialDelay = 1; -static const NSTimeInterval kBackoffMaxDelay = 60.0; -static const double kBackoffFactor = 1.5; static NSString *const kXGoogAPIClientHeader = @"x-goog-api-client"; static NSString *const kGoogleCloudResourcePrefix = @"google-cloud-resource-prefix"; /** Function typedef used to create RPCs. */ typedef GRPCProtoCall * (^RPCFactory)(void); -#pragma mark - FSTStream - -/** The state of a stream. */ -typedef NS_ENUM(NSInteger, FSTStreamState) { - /** - * The streaming RPC is not running and there's no error condition. Calling `start` will - * start the stream immediately without backoff. While in this state -isStarted will return NO. - */ - FSTStreamStateInitial = 0, - - /** - * The stream is starting, and is waiting for an auth token to attach to the initial request. - * While in this state, isStarted will return YES but isOpen will return NO. - */ - FSTStreamStateAuth, - - /** - * The streaming RPC is up and running. Requests and responses can flow freely. Both - * isStarted and isOpen will return YES. - */ - FSTStreamStateOpen, - - /** - * The stream encountered an error. The next start attempt will back off. While in this state - * -isStarted will return NO. - */ - FSTStreamStateError, - - /** - * An in-between state after an error where the stream is waiting before re-starting. After - * waiting is complete, the stream will try to open. While in this state -isStarted will - * return YES but isOpen will return NO. - */ - FSTStreamStateBackoff, - - /** - * The stream has been explicitly stopped; no further events will be emitted. - */ - FSTStreamStateStopped, -}; - -// We need to declare these classes first so that Datastore can alloc them. - -@interface FSTWatchStream () - -/** - * Initializes the watch stream with its dependencies. - */ -- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER; - -- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - credentials:(id<FSTCredentialsProvider>)credentials - responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; - -@end - -@interface FSTStream () - -@property(nonatomic, getter=isIdle) BOOL idle; -@property(nonatomic, weak, readwrite, nullable) id delegate; - -@end - -@interface FSTStream () <GRXWriteable> - -@property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo; -@property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue; -@property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials; -@property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass; -@property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff; - -/** A flag tracking whether the stream received a message from the backend. */ -@property(nonatomic, assign) BOOL messageReceived; - -/** - * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the - * state of the stream. - */ -@property(nonatomic, assign) FSTStreamState state; - -/** The RPC handle. Used for cancellation. */ -@property(nonatomic, strong, nullable) GRPCCall *rpc; - -/** - * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has - * started. - */ -@property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter; - -@end - #pragma mark - FSTDatastore @interface FSTDatastore () @@ -439,627 +333,4 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { @end -#pragma mark - FSTCallbackFilter - -/** Filter class that allows disabling of GRPC callbacks. */ -@interface FSTCallbackFilter : NSObject <GRXWriteable> - -- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER; -- (instancetype)init NS_UNAVAILABLE; - -@property(atomic, readwrite) BOOL callbacksEnabled; -@property(nonatomic, strong, readonly) FSTStream *stream; - -@end - -@implementation FSTCallbackFilter - -- (instancetype)initWithStream:(FSTStream *)stream { - if (self = [super init]) { - _callbacksEnabled = YES; - _stream = stream; - } - return self; -} - -- (void)suppressCallbacks { - _callbacksEnabled = NO; -} - -- (void)writeValue:(id)value { - if (_callbacksEnabled) { - [self.stream writeValue:value]; - } -} - -- (void)writesFinishedWithError:(NSError *)errorOrNil { - if (_callbacksEnabled) { - [self.stream writesFinishedWithError:errorOrNil]; - } -} - -@end - -#pragma mark - FSTStream - -@interface FSTStream () - -@property(nonatomic, strong, readwrite) FSTCallbackFilter *callbackFilter; - -@end - -@implementation FSTStream - -/** The time a stream stays open after it is marked idle. */ -static const NSTimeInterval kIdleTimeout = 60.0; - -- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - credentials:(id<FSTCredentialsProvider>)credentials - responseMessageClass:(Class)responseMessageClass { - if (self = [super init]) { - _databaseInfo = database; - _workerDispatchQueue = workerDispatchQueue; - _credentials = credentials; - _responseMessageClass = responseMessageClass; - - _backoff = [FSTExponentialBackoff exponentialBackoffWithDispatchQueue:workerDispatchQueue - initialDelay:kBackoffInitialDelay - backoffFactor:kBackoffFactor - maxDelay:kBackoffMaxDelay]; - _state = FSTStreamStateInitial; - } - return self; -} - -- (BOOL)isStarted { - [self.workerDispatchQueue verifyIsCurrentQueue]; - FSTStreamState state = self.state; - return state == FSTStreamStateBackoff || state == FSTStreamStateAuth || - state == FSTStreamStateOpen; -} - -- (BOOL)isOpen { - [self.workerDispatchQueue verifyIsCurrentQueue]; - return self.state == FSTStreamStateOpen; -} - -- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter { - @throw FSTAbstractMethodException(); // NOLINT -} - -- (void)startWithDelegate:(id)delegate { - [self.workerDispatchQueue verifyIsCurrentQueue]; - - if (self.state == FSTStreamStateError) { - [self performBackoffWithDelegate:delegate]; - return; - } - - FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self); - FSTAssert(self.state == FSTStreamStateInitial, @"Already started"); - - self.state = FSTStreamStateAuth; - FSTAssert(_delegate == nil, @"Delegate must be nil"); - _delegate = delegate; - - [self.credentials - getTokenForcingRefresh:NO - completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) { - error = [FSTDatastore firestoreErrorForError:error]; - [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{ - [self resumeStartWithToken:result error:error]; - }]; - }]; -} - -/** Add an access token to our RPC, after obtaining one from the credentials provider. */ -- (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error { - if (self.state == FSTStreamStateStopped) { - // Streams can be stopped while waiting for authorization. - return; - } - - [self.workerDispatchQueue verifyIsCurrentQueue]; - FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)", - (long)self.state); - - // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token, - // but I'm not sure how to detect that right now. http://b/32762461 - if (error) { - // RPC has not been started yet, so just invoke higher-level close handler. - [self handleStreamClose:error]; - return; - } - - self.requestsWriter = [[FSTBufferedWriter alloc] init]; - _rpc = [self createRPCWithRequestsWriter:self.requestsWriter]; - [FSTDatastore prepareHeadersForRPC:_rpc - databaseID:self.databaseInfo.databaseID - token:token.token]; - FSTAssert(_callbackFilter == nil, @"GRX Filter must be nil"); - _callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self]; - [_rpc startWithWriteable:_callbackFilter]; - - self.state = FSTStreamStateOpen; - [self notifyStreamOpen]; -} - -/** Backs off after an error. */ -- (void)performBackoffWithDelegate:(id)delegate { - FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self); - [self.workerDispatchQueue verifyIsCurrentQueue]; - - FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case"); - self.state = FSTStreamStateBackoff; - - FSTWeakify(self); - [self.backoff backoffAndRunBlock:^{ - FSTStrongify(self); - [self resumeStartFromBackoffWithDelegate:delegate]; - }]; -} - -/** Resumes stream start after backing off. */ -- (void)resumeStartFromBackoffWithDelegate:(id)delegate { - if (self.state == FSTStreamStateStopped) { - // Streams can be stopped while waiting for backoff to complete. - return; - } - - // In order to have performed a backoff the stream must have been in an error state just prior - // to entering the backoff state. If we weren't stopped we must be in the backoff state. - FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)", - (long)self.state); - - // Momentarily set state to FSTStreamStateInitial as `start` expects it. - self.state = FSTStreamStateInitial; - [self startWithDelegate:delegate]; - FSTAssert([self isStarted], @"Stream should have started."); -} - -/** - * Closes the stream and cleans up as necessary: - * - * * closes the underlying GRPC stream; - * * calls the onClose handler with the given 'error'; - * * sets internal stream state to 'finalState'; - * * adjusts the backoff timer based on the error - * - * A new stream can be opened by calling `start` unless `finalState` is set to - * `FSTStreamStateStopped`. - * - * @param finalState the intended state of the stream after closing. - * @param error the NSError the connection was closed with. - */ -- (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error { - FSTAssert(finalState == FSTStreamStateError || error == nil, - @"Can't provide an error when not in an error state."); - - [self.workerDispatchQueue verifyIsCurrentQueue]; - [self cancelIdleCheck]; - - if (finalState != FSTStreamStateError) { - // If this is an intentional close ensure we don't delay our next connection attempt. - [self.backoff reset]; - } else if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) { - FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class], - (__bridge void *)self); - [self.backoff resetToMax]; - } - - // This state must be assigned before calling `notifyStreamInterrupted` to allow the callback to - // inhibit backoff or otherwise manipulate the state in its non-started state. - self.state = finalState; - - if (self.requestsWriter) { - // Clean up the underlying RPC. If this close: is in response to an error, don't attempt to - // call half-close to avoid secondary failures. - if (finalState != FSTStreamStateError) { - FSTLog(@"%@ %p Closing stream client-side", [self class], (__bridge void *)self); - @synchronized(self.requestsWriter) { - [self.requestsWriter finishWithError:nil]; - } - } - _requestsWriter = nil; - } - - [self.callbackFilter suppressCallbacks]; - _callbackFilter = nil; - - // Clean up remaining state. - _messageReceived = NO; - _rpc = nil; - - // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it - // could trigger undesirable recovery logic, etc.). - if (finalState != FSTStreamStateStopped) { - [self notifyStreamInterruptedWithError:error]; - } - - // Clear the delegates to avoid any possible bleed through of events from GRPC. - FSTAssert(_delegate, - @"closeWithFinalState should only be called for a started stream that has an active " - @"delegate."); - _delegate = nil; -} - -- (void)stop { - FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self); - if ([self isStarted]) { - [self closeWithFinalState:FSTStreamStateStopped error:nil]; - } -} - -- (void)inhibitBackoff { - FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)", - (long)self.state); - [self.workerDispatchQueue verifyIsCurrentQueue]; - - // Clear the error condition. - self.state = FSTStreamStateInitial; - [self.backoff reset]; -} - -/** Called by the idle timer when the stream should close due to inactivity. */ -- (void)handleIdleCloseTimer { - [self.workerDispatchQueue verifyIsCurrentQueue]; - if (self.state == FSTStreamStateOpen && [self isIdle]) { - // When timing out an idle stream there's no reason to force the stream into backoff when - // it restarts so set the stream state to Initial instead of Error. - [self closeWithFinalState:FSTStreamStateInitial error:nil]; - } -} - -- (void)markIdle { - [self.workerDispatchQueue verifyIsCurrentQueue]; - if (self.state == FSTStreamStateOpen) { - self.idle = YES; - [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout - block:^() { - [self handleIdleCloseTimer]; - }]; - } -} - -- (void)cancelIdleCheck { - [self.workerDispatchQueue verifyIsCurrentQueue]; - self.idle = NO; -} - -/** - * Parses a protocol buffer response from the server. If the message fails to parse, generates - * an error and closes the stream. - * - * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:. - * @param data The bytes in the response as returned from GRPC. - * @return An instance of the protocol buffer message, parsed from the data if parsing was - * successful, or nil otherwise. - */ -- (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error { - NSError *parseError; - id parsed = [protoClass parseFromData:data error:&parseError]; - if (parsed) { - *error = nil; - return parsed; - } else { - NSDictionary *info = @{ - NSLocalizedDescriptionKey : @"Unable to parse response from the server", - NSUnderlyingErrorKey : parseError, - @"Expected class" : protoClass, - @"Received value" : data, - }; - *error = [NSError errorWithDomain:FIRFirestoreErrorDomain - code:FIRFirestoreErrorCodeInternal - userInfo:info]; - return nil; - } -} - -/** - * Writes a request proto into the stream. - */ -- (void)writeRequest:(GPBMessage *)request { - NSData *data = [request data]; - - [self cancelIdleCheck]; - - FSTBufferedWriter *requestsWriter = self.requestsWriter; - @synchronized(requestsWriter) { - [requestsWriter writeValue:data]; - } -} - -#pragma mark Template methods for subclasses - -/** - * Called by the stream after the stream has opened. - * - * Subclasses should relay to their stream-specific delegate. Calling [super notifyStreamOpen] is - * not required. - */ -- (void)notifyStreamOpen { -} - -/** - * Called by the stream after the stream has been unexpectedly interrupted, either due to an error - * or due to idleness. - * - * Subclasses should relay to their stream-specific delegate. Calling [super - * notifyStreamInterrupted] is not required. - */ -- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { -} - -/** - * Called by the stream for each incoming protocol message coming from the server. - * - * Subclasses should implement this to deserialize the value and relay to their stream-specific - * delegate, if appropriate. Calling [super handleStreamMessage] is not required. - */ -- (void)handleStreamMessage:(id)value { -} - -/** - * Called by the stream when the underlying RPC has been closed for whatever reason. - */ -- (void)handleStreamClose:(nullable NSError *)error { - FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error); - FSTAssert([self isStarted], @"Can't handle server close in non-started state."); - - // In theory the stream could close cleanly, however, in our current model we never expect this - // to happen because if we stop a stream ourselves, this callback will never be called. To - // prevent cases where we retry without a backoff accidentally, we set the stream to error - // in all cases. - [self closeWithFinalState:FSTStreamStateError error:error]; -} - -#pragma mark GRXWriteable implementation -// The GRXWriteable implementation defines the receive side of the RPC stream. - -/** - * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately - * redispatch back onto our own worker queue. - */ -- (void)writeValue:(id)value __used { - // TODO(mcg): remove the double-dispatch once GRPCCall at head is released. - // Once released we can set the responseDispatchQueue property on the GRPCCall and then this - // method can call handleStreamMessage directly. - FSTWeakify(self); - [self.workerDispatchQueue dispatchAsync:^{ - FSTStrongify(self); - if (!self || self.state == FSTStreamStateStopped) { - return; - } - if (!self.messageReceived) { - self.messageReceived = YES; - if ([FIRFirestore isLoggingEnabled]) { - FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]), - (__bridge void *)self, - [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]); - } - } - NSError *error; - id proto = [self parseProto:self.responseMessageClass data:value error:&error]; - if (proto) { - [self handleStreamMessage:proto]; - } else { - [_rpc finishWithError:error]; - } - }]; -} - -/** - * Called by GRPC when it closed the stream with an error representing the final state of the - * stream. - * - * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to - * directly inform stream-specific logic, or call stop to tear down the stream. - */ -- (void)writesFinishedWithError:(nullable NSError *)error __used { - error = [FSTDatastore firestoreErrorForError:error]; - FSTWeakify(self); - [self.workerDispatchQueue dispatchAsync:^{ - FSTStrongify(self); - if (!self || self.state == FSTStreamStateStopped) { - return; - } - [self handleStreamClose:error]; - }]; -} - -@end - -#pragma mark - FSTWatchStream - -@interface FSTWatchStream () - -@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer; - -@end - -@implementation FSTWatchStream - -- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer { - self = [super initWithDatabase:database - workerDispatchQueue:workerDispatchQueue - credentials:credentials - responseMessageClass:[GCFSListenResponse class]]; - if (self) { - _serializer = serializer; - } - return self; -} - -- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter { - return [[GRPCCall alloc] initWithHost:self.databaseInfo.host - path:@"/google.firestore.v1beta1.Firestore/Listen" - requestsWriter:requestsWriter]; -} - -- (void)notifyStreamOpen { - [self.delegate watchStreamDidOpen]; -} - -- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { - [self.delegate watchStreamWasInterruptedWithError:error]; -} - -- (void)watchQuery:(FSTQueryData *)query { - FSTAssert([self isOpen], @"Not yet open"); - [self.workerDispatchQueue verifyIsCurrentQueue]; - - GCFSListenRequest *request = [GCFSListenRequest message]; - request.database = [_serializer encodedDatabaseID]; - request.addTarget = [_serializer encodedTarget:query]; - request.labels = [_serializer encodedListenRequestLabelsForQueryData:query]; - - FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, request); - [self writeRequest:request]; -} - -- (void)unwatchTargetID:(FSTTargetID)targetID { - FSTAssert([self isOpen], @"Not yet open"); - [self.workerDispatchQueue verifyIsCurrentQueue]; - - GCFSListenRequest *request = [GCFSListenRequest message]; - request.database = [_serializer encodedDatabaseID]; - request.removeTarget = targetID; - - FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, request); - [self writeRequest:request]; -} - -/** - * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's - * watchStreamDidChange:snapshotVersion: callback. - */ -- (void)handleStreamMessage:(GCFSListenResponse *)proto { - FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto); - [self.workerDispatchQueue verifyIsCurrentQueue]; - - // A successful response means the stream is healthy. - [self.backoff reset]; - - FSTWatchChange *change = [_serializer decodedWatchChange:proto]; - FSTSnapshotVersion *snap = [_serializer versionFromListenResponse:proto]; - [self.delegate watchStreamDidChange:change snapshotVersion:snap]; -} - -@end - -#pragma mark - FSTWriteStream - -@interface FSTWriteStream () - -@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer; - -@end - -@implementation FSTWriteStream - -- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer { - self = [super initWithDatabase:database - workerDispatchQueue:workerDispatchQueue - credentials:credentials - responseMessageClass:[GCFSWriteResponse class]]; - if (self) { - _serializer = serializer; - } - return self; -} - -- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter { - return [[GRPCCall alloc] initWithHost:self.databaseInfo.host - path:@"/google.firestore.v1beta1.Firestore/Write" - requestsWriter:requestsWriter]; -} - -- (void)startWithDelegate:(id)delegate { - self.handshakeComplete = NO; - [super startWithDelegate:delegate]; -} - -- (void)notifyStreamOpen { - [self.delegate writeStreamDidOpen]; -} - -- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { - [self.delegate writeStreamWasInterruptedWithError:error]; -} - -- (void)writeHandshake { - // The initial request cannot contain mutations, but must contain a projectID. - FSTAssert([self isOpen], @"Not yet open"); - FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn"); - [self.workerDispatchQueue verifyIsCurrentQueue]; - - GCFSWriteRequest *request = [GCFSWriteRequest message]; - request.database = [_serializer encodedDatabaseID]; - // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the - // handshake, ignoring any stream token we might have. - - FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, request); - [self writeRequest:request]; -} - -- (void)writeMutations:(NSArray<FSTMutation *> *)mutations { - FSTAssert([self isOpen], @"Not yet open"); - FSTAssert(self.handshakeComplete, @"Mutations sent out of turn"); - [self.workerDispatchQueue verifyIsCurrentQueue]; - - NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count]; - for (FSTMutation *mutation in mutations) { - [protos addObject:[_serializer encodedMutation:mutation]]; - }; - - GCFSWriteRequest *request = [GCFSWriteRequest message]; - request.writesArray = protos; - request.streamToken = self.lastStreamToken; - - FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, request); - [self writeRequest:request]; -} - -/** - * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass - * that on to the mutationResultsHandler. - */ -- (void)handleStreamMessage:(GCFSWriteResponse *)response { - FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response); - [self.workerDispatchQueue verifyIsCurrentQueue]; - - // Always capture the last stream token. - self.lastStreamToken = response.streamToken; - - if (!self.isHandshakeComplete) { - // The first response is the handshake response - self.handshakeComplete = YES; - - [self.delegate writeStreamDidCompleteHandshake]; - } else { - // A successful first write response means the stream is healthy. - // Note that we could consider a successful handshake healthy, however, the write itself - // might be causing an error we want to back off from. - [self.backoff reset]; - - FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime]; - NSMutableArray<GCFSWriteResult *> *protos = response.writeResultsArray; - NSMutableArray<FSTMutationResult *> *results = [NSMutableArray arrayWithCapacity:protos.count]; - for (GCFSWriteResult *proto in protos) { - [results addObject:[_serializer decodedMutationResult:proto]]; - }; - - [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results]; - } -} - -@end - NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTRemoteStore.m b/Firestore/Source/Remote/FSTRemoteStore.m index 0bb37cc..45a8aca 100644 --- a/Firestore/Source/Remote/FSTRemoteStore.m +++ b/Firestore/Source/Remote/FSTRemoteStore.m @@ -29,6 +29,7 @@ #import "FSTQueryData.h" #import "FSTRemoteEvent.h" #import "FSTSnapshotVersion.h" +#import "FSTStream.h" #import "FSTTransaction.h" #import "FSTWatchChange.h" diff --git a/Firestore/Source/Remote/FSTStream.h b/Firestore/Source/Remote/FSTStream.h new file mode 100644 index 0000000..c3f1c98 --- /dev/null +++ b/Firestore/Source/Remote/FSTStream.h @@ -0,0 +1,312 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <Foundation/Foundation.h> + +#import "FSTTypes.h" + +@class FSTDatabaseInfo; +@class FSTDocumentKey; +@class FSTDispatchQueue; +@class FSTMutation; +@class FSTMutationResult; +@class FSTQueryData; +@class FSTSerializerBeta; +@class FSTSnapshotVersion; +@class FSTWatchChange; +@class FSTWatchStream; +@class FSTWriteStream; +@class GRPCCall; +@class GRXWriter; + +@protocol FSTCredentialsProvider; +@protocol FSTWatchStreamDelegate; +@protocol FSTWriteStreamDelegate; + +NS_ASSUME_NONNULL_BEGIN + +/** + * An FSTStream is an abstract base class that represents a restartable streaming RPC to the + * Firestore backend. It's built on top of GRPC's own support for streaming RPCs, and adds several + * critical features for our clients: + * + * - Restarting a stream is allowed (after failure) + * - Exponential backoff on failure (independent of the underlying channel) + * - Authentication via FSTCredentialsProvider + * - Dispatching all callbacks into the shared worker queue + * + * Subclasses of FSTStream implement serialization of models to and from bytes (via protocol + * buffers) for a specific streaming RPC and emit events specific to the stream. + * + * ## Starting and Stopping + * + * Streaming RPCs are stateful and need to be started before messages can be sent and received. + * The FSTStream will call its delegate's specific streamDidOpen method once the stream is ready + * to accept requests. + * + * Should a `start` fail, FSTStream will call its delegate's specific streamDidClose method with an + * NSError indicating what went wrong. The delegate is free to call start again. + * + * An FSTStream can also be explicitly stopped which indicates that the caller has discarded the + * stream and no further events should be emitted. Once explicitly stopped, a stream cannot be + * restarted. + * + * ## Subclassing Notes + * + * An implementation of FSTStream needs to implement the following methods: + * - `createRPCWithRequestsWriter`, should create the specific RPC (a GRPCCall object). + * - `handleStreamMessage`, receives protocol buffer responses from GRPC and must deserialize and + * delegate to some stream specific response method. + * - `notifyStreamOpen`, should call through to the stream-specific streamDidOpen method. + * - `notifyStreamInterrupted`, calls through to the stream-specific streamWasInterrupted method. + * + * Additionally, beyond these required methods, subclasses will want to implement methods that + * take request models, serialize them, and write them to using writeRequest:. + * + * ## RPC Message Type + * + * FSTStream intentionally uses the GRPCCall interface to GRPC directly, bypassing both GRPCProtoRPC + * and GRXBufferedPipe for sending data. This has been done to avoid race conditions that come out + * of a loosely specified locking contract on GRXWriter. There's essentially no way to safely use + * any of the wrapper objects for GRXWriter (that perform buffering or conversion to/from protos). + * + * See https://github.com/grpc/grpc/issues/10957 for the kinds of things we're trying to avoid. + */ +@interface FSTStream <__covariant FSTStreamDelegate> : NSObject + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass NS_DESIGNATED_INITIALIZER; + +- (instancetype)init NS_UNAVAILABLE; + +/** + * An abstract method used by `start` to create a streaming RPC specific to this type of stream. + * The RPC should be created such that requests are taken from `self`. + * + * Note that the returned GRPCCall must not be a GRPCProtoRPC, since the rest of the streaming + * mechanism assumes it is dealing in bytes-level requests and responses. + */ +- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter; + +/** + * Returns YES if `start` has been called and no error has occurred. YES indicates the stream is + * open or in the process of opening (which encompasses respecting backoff, getting auth tokens, + * and starting the actual RPC). Use `isOpen` to determine if the stream is open and ready for + * outbound requests. + */ +- (BOOL)isStarted; + +/** Returns YES if the underlying RPC is open and the stream is ready for outbound requests. */ +- (BOOL)isOpen; + +/** + * Starts the RPC. Only allowed if isStarted returns NO. The stream is not immediately ready for + * use: the delegate's watchStreamDidOpen method will be invoked when the RPC is ready for outbound + * requests, at which point `isOpen` will return YES. + * + * When start returns, -isStarted will return YES. + */ +- (void)startWithDelegate:(id)delegate; + +/** + * Stops the RPC. This call is idempotent and allowed regardless of the current isStarted state. + * + * Unlike a transient stream close, stopping a stream is permanent. This is guaranteed NOT to emit + * any further events on the stream-specific delegate, including the streamDidClose method. + * + * NOTE: This no-events contract may seem counter-intuitive but allows the caller to + * straightforwardly sequence stream tear-down without having to worry about when the delegate's + * streamDidClose methods will get called. For example if the stream must be exchanged for another + * during a user change this allows `stop` to be called eagerly without worrying about the + * streamDidClose method accidentally restarting the stream before the new one is ready. + * + * When stop returns, -isStarted and -isOpen will both return NO. + */ +- (void)stop; + +/** + * Initializes the idle timer. If no write takes place within one minute, the GRPC stream will be + * closed. + */ +- (void)markIdle; + +/** + * After an error the stream will usually back off on the next attempt to start it. If the error + * warrants an immediate restart of the stream, the sender can use this to indicate that the + * receiver should not back off. + * + * Each error will call the stream-specific streamDidClose method. That method can decide to + * inhibit backoff if required. + */ +- (void)inhibitBackoff; + +@end + +#pragma mark - FSTWatchStream + +/** A protocol defining the events that can be emitted by the FSTWatchStream. */ +@protocol FSTWatchStreamDelegate <NSObject> + +/** Called by the FSTWatchStream when it is ready to accept outbound request messages. */ +- (void)watchStreamDidOpen; + +/** + * Called by the FSTWatchStream with changes and the snapshot versions included in in the + * WatchChange responses sent back by the server. + */ +- (void)watchStreamDidChange:(FSTWatchChange *)change + snapshotVersion:(FSTSnapshotVersion *)snapshotVersion; + +/** + * Called by the FSTWatchStream when the underlying streaming RPC is interrupted for whatever + * reason, usually because of an error, but possibly due to an idle timeout. The error passed to + * this method may be nil, in which case the stream was closed without attributable fault. + * + * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" + * on FSTStream for details. + */ +- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error; + +@end + +/** + * An FSTStream that implements the StreamingWatch RPC. + * + * Once the FSTWatchStream has called the streamDidOpen method, any number of watchQuery and + * unwatchTargetId calls can be sent to control what changes will be sent from the server for + * WatchChanges. + */ +@interface FSTWatchStream : FSTStream + +/** + * Initializes the watch stream with its dependencies. + */ +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER; + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; + +- (instancetype)init NS_UNAVAILABLE; + +/** + * Registers interest in the results of the given query. If the query includes a resumeToken it + * will be included in the request. Results that affect the query will be streamed back as + * WatchChange messages that reference the targetID included in |query|. + */ +- (void)watchQuery:(FSTQueryData *)query; + +/** Unregisters interest in the results of the query associated with the given target ID. */ +- (void)unwatchTargetID:(FSTTargetID)targetID; + +@end + +#pragma mark - FSTWriteStream + +@protocol FSTWriteStreamDelegate <NSObject> + +/** Called by the FSTWriteStream when it is ready to accept outbound request messages. */ +- (void)writeStreamDidOpen; + +/** + * Called by the FSTWriteStream upon a successful handshake response from the server, which is the + * receiver's cue to send any pending writes. + */ +- (void)writeStreamDidCompleteHandshake; + +/** + * Called by the FSTWriteStream upon receiving a StreamingWriteResponse from the server that + * contains mutation results. + */ +- (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion + mutationResults:(NSArray<FSTMutationResult *> *)results; + +/** + * Called when the FSTWriteStream's underlying RPC is interrupted for whatever reason, usually + * because of an error, but possibly due to an idle timeout. The error passed to this method may be + * nil, in which case the stream was closed without attributable fault. + * + * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" + * on FSTStream for details. + */ +- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error; + +@end + +/** + * An FSTStream that implements the StreamingWrite RPC. + * + * The StreamingWrite RPC requires the caller to maintain special `streamToken` state in between + * calls, to help the server understand which responses the client has processed by the time the + * next request is made. Every response may contain a `streamToken`; this value must be passed to + * the next request. + * + * After calling `start` on this stream, the next request must be a handshake, containing whatever + * streamToken is on hand. Once a response to this request is received, all pending mutations may + * be submitted. When submitting multiple batches of mutations at the same time, it's okay to use + * the same streamToken for the calls to `writeMutations:`. + */ +@interface FSTWriteStream : FSTStream + +/** + * Initializes the write stream with its dependencies. + */ +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + serializer:(FSTSerializerBeta *)serializer; + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; + +- (instancetype)init NS_UNAVAILABLE; + +/** + * Sends an initial streamToken to the server, performing the handshake required to make the + * StreamingWrite RPC work. Subsequent `writeMutations:` calls should wait until a response has + * been delivered to the delegate's writeStreamDidCompleteHandshake method. + */ +- (void)writeHandshake; + +/** Sends a group of mutations to the Firestore backend to apply. */ +- (void)writeMutations:(NSArray<FSTMutation *> *)mutations; + +/** + * Tracks whether or not a handshake has been successfully exchanged and the stream is ready to + * accept mutations. + */ +@property(nonatomic, assign, readwrite, getter=isHandshakeComplete) BOOL handshakeComplete; + +/** + * The last received stream token from the server, used to acknowledge which responses the client + * has processed. Stream tokens are opaque checkpoint markers whose only real value is their + * inclusion in the next request. + * + * FSTWriteStream manages propagating this value from responses to the next request. + */ +@property(nonatomic, strong, nullable) NSData *lastStreamToken; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTStream.m b/Firestore/Source/Remote/FSTStream.m new file mode 100644 index 0000000..5a6f982 --- /dev/null +++ b/Firestore/Source/Remote/FSTStream.m @@ -0,0 +1,764 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "FSTDatastore.h" + +#import <GRPCClient/GRPCCall+OAuth2.h> +#import <GRPCClient/GRPCCall.h> + +#import "FIRFirestore+Internal.h" +#import "FIRFirestoreErrors.h" +#import "FSTAssert.h" +#import "FSTBufferedWriter.h" +#import "FSTClasses.h" +#import "FSTCredentialsProvider.h" +#import "FSTDatabaseID.h" +#import "FSTDatabaseInfo.h" +#import "FSTDispatchQueue.h" +#import "FSTExponentialBackoff.h" +#import "FSTLogger.h" +#import "FSTMutation.h" +#import "FSTQueryData.h" +#import "FSTSerializerBeta.h" +#import "FSTStream.h" + +#import "Firestore.pbrpc.h" + +/** + * Initial backoff time in seconds after an error. + * Set to 1s according to https://cloud.google.com/apis/design/errors. + */ +static const NSTimeInterval kBackoffInitialDelay = 1; +static const NSTimeInterval kBackoffMaxDelay = 60.0; +static const double kBackoffFactor = 1.5; + +#pragma mark - FSTStream + +/** The state of a stream. */ +typedef NS_ENUM(NSInteger, FSTStreamState) { + /** + * The streaming RPC is not running and there's no error condition. Calling `start` will + * start the stream immediately without backoff. While in this state -isStarted will return NO. + */ + FSTStreamStateInitial = 0, + + /** + * The stream is starting, and is waiting for an auth token to attach to the initial request. + * While in this state, isStarted will return YES but isOpen will return NO. + */ + FSTStreamStateAuth, + + /** + * The streaming RPC is up and running. Requests and responses can flow freely. Both + * isStarted and isOpen will return YES. + */ + FSTStreamStateOpen, + + /** + * The stream encountered an error. The next start attempt will back off. While in this state + * -isStarted will return NO. + */ + FSTStreamStateError, + + /** + * An in-between state after an error where the stream is waiting before re-starting. After + * waiting is complete, the stream will try to open. While in this state -isStarted will + * return YES but isOpen will return NO. + */ + FSTStreamStateBackoff, + + /** + * The stream has been explicitly stopped; no further events will be emitted. + */ + FSTStreamStateStopped, +}; + +// We need to declare these classes first so that Datastore can alloc them. + +@interface FSTWatchStream () + +/** + * Initializes the watch stream with its dependencies. + */ +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER; + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; + +@end + +@interface FSTStream () + +@property(nonatomic, getter=isIdle) BOOL idle; +@property(nonatomic, weak, readwrite, nullable) id delegate; + +@end + +@interface FSTStream () <GRXWriteable> + +@property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo; +@property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue; +@property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials; +@property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass; +@property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff; + +/** A flag tracking whether the stream received a message from the backend. */ +@property(nonatomic, assign) BOOL messageReceived; + +/** + * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the + * state of the stream. + */ +@property(nonatomic, assign) FSTStreamState state; + +/** The RPC handle. Used for cancellation. */ +@property(nonatomic, strong, nullable) GRPCCall *rpc; + +/** + * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has + * started. + */ +@property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter; + +@end + +#pragma mark - FSTCallbackFilter + +/** Filter class that allows disabling of GRPC callbacks. */ +@interface FSTCallbackFilter : NSObject <GRXWriteable> + +- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER; +- (instancetype)init NS_UNAVAILABLE; + +@property(atomic, readwrite) BOOL callbacksEnabled; +@property(nonatomic, strong, readonly) FSTStream *stream; + +@end + +@implementation FSTCallbackFilter + +- (instancetype)initWithStream:(FSTStream *)stream { + if (self = [super init]) { + _callbacksEnabled = YES; + _stream = stream; + } + return self; +} + +- (void)suppressCallbacks { + _callbacksEnabled = NO; +} + +- (void)writeValue:(id)value { + if (_callbacksEnabled) { + [self.stream writeValue:value]; + } +} + +- (void)writesFinishedWithError:(NSError *)errorOrNil { + if (_callbacksEnabled) { + [self.stream writesFinishedWithError:errorOrNil]; + } +} + +@end + +#pragma mark - FSTStream + +@interface FSTStream () + +@property(nonatomic, strong, readwrite) FSTCallbackFilter *callbackFilter; + +@end + +@implementation FSTStream + +/** The time a stream stays open after it is marked idle. */ +static const NSTimeInterval kIdleTimeout = 60.0; + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass { + if (self = [super init]) { + _databaseInfo = database; + _workerDispatchQueue = workerDispatchQueue; + _credentials = credentials; + _responseMessageClass = responseMessageClass; + + _backoff = [FSTExponentialBackoff exponentialBackoffWithDispatchQueue:workerDispatchQueue + initialDelay:kBackoffInitialDelay + backoffFactor:kBackoffFactor + maxDelay:kBackoffMaxDelay]; + _state = FSTStreamStateInitial; + } + return self; +} + +- (BOOL)isStarted { + [self.workerDispatchQueue verifyIsCurrentQueue]; + FSTStreamState state = self.state; + return state == FSTStreamStateBackoff || state == FSTStreamStateAuth || + state == FSTStreamStateOpen; +} + +- (BOOL)isOpen { + [self.workerDispatchQueue verifyIsCurrentQueue]; + return self.state == FSTStreamStateOpen; +} + +- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter { + @throw FSTAbstractMethodException(); // NOLINT +} + +- (void)startWithDelegate:(id)delegate { + [self.workerDispatchQueue verifyIsCurrentQueue]; + + if (self.state == FSTStreamStateError) { + [self performBackoffWithDelegate:delegate]; + return; + } + + FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self); + FSTAssert(self.state == FSTStreamStateInitial, @"Already started"); + + self.state = FSTStreamStateAuth; + FSTAssert(_delegate == nil, @"Delegate must be nil"); + _delegate = delegate; + + [self.credentials + getTokenForcingRefresh:NO + completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) { + error = [FSTDatastore firestoreErrorForError:error]; + [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{ + [self resumeStartWithToken:result error:error]; + }]; + }]; +} + +/** Add an access token to our RPC, after obtaining one from the credentials provider. */ +- (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error { + if (self.state == FSTStreamStateStopped) { + // Streams can be stopped while waiting for authorization. + return; + } + + [self.workerDispatchQueue verifyIsCurrentQueue]; + FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)", + (long)self.state); + + // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token, + // but I'm not sure how to detect that right now. http://b/32762461 + if (error) { + // RPC has not been started yet, so just invoke higher-level close handler. + [self handleStreamClose:error]; + return; + } + + self.requestsWriter = [[FSTBufferedWriter alloc] init]; + _rpc = [self createRPCWithRequestsWriter:self.requestsWriter]; + [FSTDatastore prepareHeadersForRPC:_rpc + databaseID:self.databaseInfo.databaseID + token:token.token]; + FSTAssert(_callbackFilter == nil, @"GRX Filter must be nil"); + _callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self]; + [_rpc startWithWriteable:_callbackFilter]; + + self.state = FSTStreamStateOpen; + [self notifyStreamOpen]; +} + +/** Backs off after an error. */ +- (void)performBackoffWithDelegate:(id)delegate { + FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case"); + self.state = FSTStreamStateBackoff; + + FSTWeakify(self); + [self.backoff backoffAndRunBlock:^{ + FSTStrongify(self); + [self resumeStartFromBackoffWithDelegate:delegate]; + }]; +} + +/** Resumes stream start after backing off. */ +- (void)resumeStartFromBackoffWithDelegate:(id)delegate { + if (self.state == FSTStreamStateStopped) { + // Streams can be stopped while waiting for backoff to complete. + return; + } + + // In order to have performed a backoff the stream must have been in an error state just prior + // to entering the backoff state. If we weren't stopped we must be in the backoff state. + FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)", + (long)self.state); + + // Momentarily set state to FSTStreamStateInitial as `start` expects it. + self.state = FSTStreamStateInitial; + [self startWithDelegate:delegate]; + FSTAssert([self isStarted], @"Stream should have started."); +} + +/** + * Closes the stream and cleans up as necessary: + * + * * closes the underlying GRPC stream; + * * calls the onClose handler with the given 'error'; + * * sets internal stream state to 'finalState'; + * * adjusts the backoff timer based on the error + * + * A new stream can be opened by calling `start` unless `finalState` is set to + * `FSTStreamStateStopped`. + * + * @param finalState the intended state of the stream after closing. + * @param error the NSError the connection was closed with. + */ +- (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error { + FSTAssert(finalState == FSTStreamStateError || error == nil, + @"Can't provide an error when not in an error state."); + + [self.workerDispatchQueue verifyIsCurrentQueue]; + [self cancelIdleCheck]; + + if (finalState != FSTStreamStateError) { + // If this is an intentional close ensure we don't delay our next connection attempt. + [self.backoff reset]; + } else if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) { + FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class], + (__bridge void *)self); + [self.backoff resetToMax]; + } + + // This state must be assigned before calling `notifyStreamInterrupted` to allow the callback to + // inhibit backoff or otherwise manipulate the state in its non-started state. + self.state = finalState; + + if (self.requestsWriter) { + // Clean up the underlying RPC. If this close: is in response to an error, don't attempt to + // call half-close to avoid secondary failures. + if (finalState != FSTStreamStateError) { + FSTLog(@"%@ %p Closing stream client-side", [self class], (__bridge void *)self); + @synchronized(self.requestsWriter) { + [self.requestsWriter finishWithError:nil]; + } + } + _requestsWriter = nil; + } + + [self.callbackFilter suppressCallbacks]; + _callbackFilter = nil; + + // Clean up remaining state. + _messageReceived = NO; + _rpc = nil; + + // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it + // could trigger undesirable recovery logic, etc.). + if (finalState != FSTStreamStateStopped) { + [self notifyStreamInterruptedWithError:error]; + } + + // Clear the delegates to avoid any possible bleed through of events from GRPC. + FSTAssert(_delegate, + @"closeWithFinalState should only be called for a started stream that has an active " + @"delegate."); + _delegate = nil; +} + +- (void)stop { + FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self); + if ([self isStarted]) { + [self closeWithFinalState:FSTStreamStateStopped error:nil]; + } +} + +- (void)inhibitBackoff { + FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)", + (long)self.state); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + // Clear the error condition. + self.state = FSTStreamStateInitial; + [self.backoff reset]; +} + +/** Called by the idle timer when the stream should close due to inactivity. */ +- (void)handleIdleCloseTimer { + [self.workerDispatchQueue verifyIsCurrentQueue]; + if (self.state == FSTStreamStateOpen && [self isIdle]) { + // When timing out an idle stream there's no reason to force the stream into backoff when + // it restarts so set the stream state to Initial instead of Error. + [self closeWithFinalState:FSTStreamStateInitial error:nil]; + } +} + +- (void)markIdle { + [self.workerDispatchQueue verifyIsCurrentQueue]; + if (self.state == FSTStreamStateOpen) { + self.idle = YES; + [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout + block:^() { + [self handleIdleCloseTimer]; + }]; + } +} + +- (void)cancelIdleCheck { + [self.workerDispatchQueue verifyIsCurrentQueue]; + self.idle = NO; +} + +/** + * Parses a protocol buffer response from the server. If the message fails to parse, generates + * an error and closes the stream. + * + * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:. + * @param data The bytes in the response as returned from GRPC. + * @return An instance of the protocol buffer message, parsed from the data if parsing was + * successful, or nil otherwise. + */ +- (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error { + NSError *parseError; + id parsed = [protoClass parseFromData:data error:&parseError]; + if (parsed) { + *error = nil; + return parsed; + } else { + NSDictionary *info = @{ + NSLocalizedDescriptionKey : @"Unable to parse response from the server", + NSUnderlyingErrorKey : parseError, + @"Expected class" : protoClass, + @"Received value" : data, + }; + *error = [NSError errorWithDomain:FIRFirestoreErrorDomain + code:FIRFirestoreErrorCodeInternal + userInfo:info]; + return nil; + } +} + +/** + * Writes a request proto into the stream. + */ +- (void)writeRequest:(GPBMessage *)request { + NSData *data = [request data]; + + [self cancelIdleCheck]; + + FSTBufferedWriter *requestsWriter = self.requestsWriter; + @synchronized(requestsWriter) { + [requestsWriter writeValue:data]; + } +} + +#pragma mark Template methods for subclasses + +/** + * Called by the stream after the stream has opened. + * + * Subclasses should relay to their stream-specific delegate. Calling [super notifyStreamOpen] is + * not required. + */ +- (void)notifyStreamOpen { +} + +/** + * Called by the stream after the stream has been unexpectedly interrupted, either due to an error + * or due to idleness. + * + * Subclasses should relay to their stream-specific delegate. Calling [super + * notifyStreamInterrupted] is not required. + */ +- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { +} + +/** + * Called by the stream for each incoming protocol message coming from the server. + * + * Subclasses should implement this to deserialize the value and relay to their stream-specific + * delegate, if appropriate. Calling [super handleStreamMessage] is not required. + */ +- (void)handleStreamMessage:(id)value { +} + +/** + * Called by the stream when the underlying RPC has been closed for whatever reason. + */ +- (void)handleStreamClose:(nullable NSError *)error { + FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error); + FSTAssert([self isStarted], @"Can't handle server close in non-started state."); + + // In theory the stream could close cleanly, however, in our current model we never expect this + // to happen because if we stop a stream ourselves, this callback will never be called. To + // prevent cases where we retry without a backoff accidentally, we set the stream to error + // in all cases. + [self closeWithFinalState:FSTStreamStateError error:error]; +} + +#pragma mark GRXWriteable implementation +// The GRXWriteable implementation defines the receive side of the RPC stream. + +/** + * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately + * redispatch back onto our own worker queue. + */ +- (void)writeValue:(id)value __used { + // TODO(mcg): remove the double-dispatch once GRPCCall at head is released. + // Once released we can set the responseDispatchQueue property on the GRPCCall and then this + // method can call handleStreamMessage directly. + FSTWeakify(self); + [self.workerDispatchQueue dispatchAsync:^{ + FSTStrongify(self); + if (!self || self.state == FSTStreamStateStopped) { + return; + } + if (!self.messageReceived) { + self.messageReceived = YES; + if ([FIRFirestore isLoggingEnabled]) { + FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]), + (__bridge void *)self, + [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]); + } + } + NSError *error; + id proto = [self parseProto:self.responseMessageClass data:value error:&error]; + if (proto) { + [self handleStreamMessage:proto]; + } else { + [_rpc finishWithError:error]; + } + }]; +} + +/** + * Called by GRPC when it closed the stream with an error representing the final state of the + * stream. + * + * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to + * directly inform stream-specific logic, or call stop to tear down the stream. + */ +- (void)writesFinishedWithError:(nullable NSError *)error __used { + error = [FSTDatastore firestoreErrorForError:error]; + FSTWeakify(self); + [self.workerDispatchQueue dispatchAsync:^{ + FSTStrongify(self); + if (!self || self.state == FSTStreamStateStopped) { + return; + } + [self handleStreamClose:error]; + }]; +} + +@end + +#pragma mark - FSTWatchStream + +@interface FSTWatchStream () + +@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer; + +@end + +@implementation FSTWatchStream + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + serializer:(FSTSerializerBeta *)serializer { + self = [super initWithDatabase:database + workerDispatchQueue:workerDispatchQueue + credentials:credentials + responseMessageClass:[GCFSListenResponse class]]; + if (self) { + _serializer = serializer; + } + return self; +} + +- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter { + return [[GRPCCall alloc] initWithHost:self.databaseInfo.host + path:@"/google.firestore.v1beta1.Firestore/Listen" + requestsWriter:requestsWriter]; +} + +- (void)notifyStreamOpen { + [self.delegate watchStreamDidOpen]; +} + +- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { + [self.delegate watchStreamWasInterruptedWithError:error]; +} + +- (void)watchQuery:(FSTQueryData *)query { + FSTAssert([self isOpen], @"Not yet open"); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + GCFSListenRequest *request = [GCFSListenRequest message]; + request.database = [_serializer encodedDatabaseID]; + request.addTarget = [_serializer encodedTarget:query]; + request.labels = [_serializer encodedListenRequestLabelsForQueryData:query]; + + FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, request); + [self writeRequest:request]; +} + +- (void)unwatchTargetID:(FSTTargetID)targetID { + FSTAssert([self isOpen], @"Not yet open"); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + GCFSListenRequest *request = [GCFSListenRequest message]; + request.database = [_serializer encodedDatabaseID]; + request.removeTarget = targetID; + + FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, request); + [self writeRequest:request]; +} + +/** + * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's + * watchStreamDidChange:snapshotVersion: callback. + */ +- (void)handleStreamMessage:(GCFSListenResponse *)proto { + FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + // A successful response means the stream is healthy. + [self.backoff reset]; + + FSTWatchChange *change = [_serializer decodedWatchChange:proto]; + FSTSnapshotVersion *snap = [_serializer versionFromListenResponse:proto]; + [self.delegate watchStreamDidChange:change snapshotVersion:snap]; +} + +@end + +#pragma mark - FSTWriteStream + +@interface FSTWriteStream () + +@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer; + +@end + +@implementation FSTWriteStream + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + serializer:(FSTSerializerBeta *)serializer { + self = [super initWithDatabase:database + workerDispatchQueue:workerDispatchQueue + credentials:credentials + responseMessageClass:[GCFSWriteResponse class]]; + if (self) { + _serializer = serializer; + } + return self; +} + +- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter { + return [[GRPCCall alloc] initWithHost:self.databaseInfo.host + path:@"/google.firestore.v1beta1.Firestore/Write" + requestsWriter:requestsWriter]; +} + +- (void)startWithDelegate:(id)delegate { + self.handshakeComplete = NO; + [super startWithDelegate:delegate]; +} + +- (void)notifyStreamOpen { + [self.delegate writeStreamDidOpen]; +} + +- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { + [self.delegate writeStreamWasInterruptedWithError:error]; +} + +- (void)writeHandshake { + // The initial request cannot contain mutations, but must contain a projectID. + FSTAssert([self isOpen], @"Not yet open"); + FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn"); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + GCFSWriteRequest *request = [GCFSWriteRequest message]; + request.database = [_serializer encodedDatabaseID]; + // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the + // handshake, ignoring any stream token we might have. + + FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, request); + [self writeRequest:request]; +} + +- (void)writeMutations:(NSArray<FSTMutation *> *)mutations { + FSTAssert([self isOpen], @"Not yet open"); + FSTAssert(self.handshakeComplete, @"Mutations sent out of turn"); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count]; + for (FSTMutation *mutation in mutations) { + [protos addObject:[_serializer encodedMutation:mutation]]; + }; + + GCFSWriteRequest *request = [GCFSWriteRequest message]; + request.writesArray = protos; + request.streamToken = self.lastStreamToken; + + FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, request); + [self writeRequest:request]; +} + +/** + * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass + * that on to the mutationResultsHandler. + */ +- (void)handleStreamMessage:(GCFSWriteResponse *)response { + FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + // Always capture the last stream token. + self.lastStreamToken = response.streamToken; + + if (!self.isHandshakeComplete) { + // The first response is the handshake response + self.handshakeComplete = YES; + + [self.delegate writeStreamDidCompleteHandshake]; + } else { + // A successful first write response means the stream is healthy. + // Note that we could consider a successful handshake healthy, however, the write itself + // might be causing an error we want to back off from. + [self.backoff reset]; + + FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime]; + NSMutableArray<GCFSWriteResult *> *protos = response.writeResultsArray; + NSMutableArray<FSTMutationResult *> *results = [NSMutableArray arrayWithCapacity:protos.count]; + for (GCFSWriteResult *proto in protos) { + [results addObject:[_serializer decodedMutationResult:proto]]; + }; + + [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results]; + } +} + +@end |