diff options
Diffstat (limited to 'Firestore/Source/Remote/FSTRemoteStore.mm')
-rw-r--r-- | Firestore/Source/Remote/FSTRemoteStore.mm | 712 |
1 files changed, 712 insertions, 0 deletions
diff --git a/Firestore/Source/Remote/FSTRemoteStore.mm b/Firestore/Source/Remote/FSTRemoteStore.mm new file mode 100644 index 0000000..123df49 --- /dev/null +++ b/Firestore/Source/Remote/FSTRemoteStore.mm @@ -0,0 +1,712 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "Firestore/Source/Remote/FSTRemoteStore.h" + +#include <inttypes.h> + +#import "Firestore/Source/Core/FSTQuery.h" +#import "Firestore/Source/Core/FSTSnapshotVersion.h" +#import "Firestore/Source/Core/FSTTransaction.h" +#import "Firestore/Source/Local/FSTLocalStore.h" +#import "Firestore/Source/Local/FSTQueryData.h" +#import "Firestore/Source/Model/FSTDocument.h" +#import "Firestore/Source/Model/FSTDocumentKey.h" +#import "Firestore/Source/Model/FSTMutation.h" +#import "Firestore/Source/Model/FSTMutationBatch.h" +#import "Firestore/Source/Remote/FSTDatastore.h" +#import "Firestore/Source/Remote/FSTExistenceFilter.h" +#import "Firestore/Source/Remote/FSTRemoteEvent.h" +#import "Firestore/Source/Remote/FSTStream.h" +#import "Firestore/Source/Remote/FSTWatchChange.h" +#import "Firestore/Source/Util/FSTAssert.h" +#import "Firestore/Source/Util/FSTLogger.h" + +NS_ASSUME_NONNULL_BEGIN + +/** + * The maximum number of pending writes to allow. + * TODO(bjornick): Negotiate this value with the backend. + */ +static const int kMaxPendingWrites = 10; + +/** + * The FSTRemoteStore notifies an onlineStateDelegate with FSTOnlineStateFailed if we fail to + * connect to the backend. This subsequently triggers get() requests to fail or use cached data, + * etc. Unfortunately, our connections have historically been subject to various transient failures. + * So we wait for multiple failures before notifying the onlineStateDelegate. + */ +static const int kOnlineAttemptsBeforeFailure = 2; + +#pragma mark - FSTRemoteStore + +@interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate> + +- (instancetype)initWithLocalStore:(FSTLocalStore *)localStore + datastore:(FSTDatastore *)datastore NS_DESIGNATED_INITIALIZER; + +/** + * The local store, used to fill the write pipeline with outbound mutations and resolve existence + * filter mismatches. Immutable after initialization. + */ +@property(nonatomic, strong, readonly) FSTLocalStore *localStore; + +/** The client-side proxy for interacting with the backend. Immutable after initialization. */ +@property(nonatomic, strong, readonly) FSTDatastore *datastore; + +#pragma mark Watch Stream +// The watchStream is null when the network is disabled. The non-null check is performed by +// isNetworkEnabled. +@property(nonatomic, strong, nullable) FSTWatchStream *watchStream; + +/** + * A mapping of watched targets that the client cares about tracking and the + * user has explicitly called a 'listen' for this target. + * + * These targets may or may not have been sent to or acknowledged by the + * server. On re-establishing the listen stream, these targets should be sent + * to the server. The targets removed with unlistens are removed eagerly + * without waiting for confirmation from the listen stream. */ +@property(nonatomic, strong, readonly) + NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *listenTargets; + +/** + * A mapping of targetId to pending acks needed. + * + * If a targetId is present in this map, then we're waiting for watch to + * acknowledge a removal or addition of the target. If a target is not in this + * mapping, and it's in the listenTargets map, then we consider the target to + * be active. + * + * We increment the count here everytime we issue a request over the stream to + * watch or unwatch. We then decrement the count everytime we get a target + * added or target removed message from the server. Once the count is equal to + * 0 we know that the client and server are in the same state (once this state + * is reached the targetId is removed from the map to free the memory). + */ +@property(nonatomic, strong, readonly) + NSMutableDictionary<FSTBoxedTargetID *, NSNumber *> *pendingTargetResponses; + +@property(nonatomic, strong) NSMutableArray<FSTWatchChange *> *accumulatedChanges; +@property(nonatomic, assign) FSTBatchID lastBatchSeen; + +/** + * The online state of the watch stream. The state is set to healthy if and only if there are + * messages received by the backend. + */ +@property(nonatomic, assign) FSTOnlineState watchStreamOnlineState; + +/** A count of consecutive failures to open the stream. */ +@property(nonatomic, assign) int watchStreamFailures; + +/** Whether the client should fire offline warning. */ +@property(nonatomic, assign) BOOL shouldWarnOffline; + +#pragma mark Write Stream +// The writeStream is null when the network is disabled. The non-null check is performed by +// isNetworkEnabled. +@property(nonatomic, strong, nullable) FSTWriteStream *writeStream; + +/** + * The approximate time the StreamingWrite stream was opened. Used to estimate if stream was + * closed due to an auth expiration (a recoverable error) or some other more permanent error. + */ +@property(nonatomic, strong, nullable) NSDate *writeStreamOpenTime; + +/** + * A FIFO queue of in-flight writes. This is in-flight from the point of view of the caller of + * writeMutations, not from the point of view from the Datastore itself. In particular, these + * requests may not have been sent to the Datastore server if the write stream is not yet running. + */ +@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *pendingWrites; +@end + +@implementation FSTRemoteStore + ++ (instancetype)remoteStoreWithLocalStore:(FSTLocalStore *)localStore + datastore:(FSTDatastore *)datastore { + return [[FSTRemoteStore alloc] initWithLocalStore:localStore datastore:datastore]; +} + +- (instancetype)initWithLocalStore:(FSTLocalStore *)localStore datastore:(FSTDatastore *)datastore { + if (self = [super init]) { + _localStore = localStore; + _datastore = datastore; + _listenTargets = [NSMutableDictionary dictionary]; + _pendingTargetResponses = [NSMutableDictionary dictionary]; + _accumulatedChanges = [NSMutableArray array]; + + _lastBatchSeen = kFSTBatchIDUnknown; + _watchStreamOnlineState = FSTOnlineStateUnknown; + _shouldWarnOffline = YES; + _pendingWrites = [NSMutableArray array]; + } + return self; +} + +- (void)start { + // For now, all setup is handled by enableNetwork(). We might expand on this in the future. + [self enableNetwork]; +} + +/** + * Updates our OnlineState to the new state, updating local state and notifying the + * onlineStateHandler as appropriate. + */ +- (void)updateOnlineState:(FSTOnlineState)newState { + // Update and broadcast the new state. + if (newState != self.watchStreamOnlineState) { + if (newState == FSTOnlineStateHealthy) { + // We've connected to watch at least once. Don't warn the developer about being offline going + // forward. + self.shouldWarnOffline = NO; + } else if (newState == FSTOnlineStateUnknown) { + // The state is set to unknown when a healthy stream is closed (e.g. due to a token timeout) + // or when we have no active listens and therefore there's no need to start the stream. + // Assuming there is (possibly in the future) an active listen, then we will eventually move + // to state Online or Failed, but we always want to make at least kOnlineAttemptsBeforeFailure + // attempts before failing, so we reset the count here. + self.watchStreamFailures = 0; + } + self.watchStreamOnlineState = newState; + [self.onlineStateDelegate applyChangedOnlineState:newState]; + } +} + +/** + * Updates our FSTOnlineState as appropriate after the watch stream reports a failure. The first + * failure moves us to the 'Unknown' state. We then may allow multiple failures (based on + * kOnlineAttemptsBeforeFailure) before we actually transition to FSTOnlineStateFailed. + */ +- (void)updateOnlineStateAfterFailure { + if (self.watchStreamOnlineState == FSTOnlineStateHealthy) { + [self updateOnlineState:FSTOnlineStateUnknown]; + } else { + self.watchStreamFailures++; + if (self.watchStreamFailures >= kOnlineAttemptsBeforeFailure) { + if (self.shouldWarnOffline) { + FSTWarn(@"Could not reach Firestore backend."); + self.shouldWarnOffline = NO; + } + [self updateOnlineState:FSTOnlineStateFailed]; + } + } +} + +#pragma mark Online/Offline state + +- (BOOL)isNetworkEnabled { + FSTAssert((self.watchStream == nil) == (self.writeStream == nil), + @"WatchStream and WriteStream should both be null or non-null"); + return self.watchStream != nil; +} + +- (void)enableNetwork { + if ([self isNetworkEnabled]) { + return; + } + + // Create new streams (but note they're not started yet). + self.watchStream = [self.datastore createWatchStream]; + self.writeStream = [self.datastore createWriteStream]; + + // Load any saved stream token from persistent storage + self.writeStream.lastStreamToken = [self.localStore lastStreamToken]; + + if ([self shouldStartWatchStream]) { + [self startWatchStream]; + } + + [self fillWritePipeline]; // This may start the writeStream. + + // We move back to the unknown state because we might not want to re-open the stream + [self updateOnlineState:FSTOnlineStateUnknown]; +} + +- (void)disableNetwork { + [self disableNetworkInternal]; + // Set the FSTOnlineState to failed so get()'s return from cache, etc. + [self updateOnlineState:FSTOnlineStateFailed]; +} + +/** Disables the network, setting the FSTOnlineState to the specified targetOnlineState. */ +- (void)disableNetworkInternal { + if ([self isNetworkEnabled]) { + // NOTE: We're guaranteed not to get any further events from these streams (not even a close + // event). + [self.watchStream stop]; + [self.writeStream stop]; + + [self cleanUpWatchStreamState]; + [self cleanUpWriteStreamState]; + + self.writeStream = nil; + self.watchStream = nil; + } +} + +#pragma mark Shutdown + +- (void)shutdown { + FSTLog(@"FSTRemoteStore %p shutting down", (__bridge void *)self); + [self disableNetworkInternal]; + // Set the FSTOnlineState to Unknown (rather than Failed) to avoid potentially triggering + // spurious listener events with cached data, etc. + [self updateOnlineState:FSTOnlineStateUnknown]; +} + +- (void)userDidChange:(FSTUser *)user { + FSTLog(@"FSTRemoteStore %p changing users: %@", (__bridge void *)self, user); + if ([self isNetworkEnabled]) { + // Tear down and re-create our network streams. This will ensure we get a fresh auth token + // for the new user and re-fill the write pipeline with new mutations from the LocalStore + // (since mutations are per-user). + [self disableNetworkInternal]; + [self updateOnlineState:FSTOnlineStateUnknown]; + [self enableNetwork]; + } +} + +#pragma mark Watch Stream + +- (void)startWatchStream { + FSTAssert([self shouldStartWatchStream], + @"startWatchStream: called when shouldStartWatchStream: is false."); + [self.watchStream startWithDelegate:self]; +} + +- (void)listenToTargetWithQueryData:(FSTQueryData *)queryData { + NSNumber *targetKey = @(queryData.targetID); + FSTAssert(!self.listenTargets[targetKey], @"listenToQuery called with duplicate target id: %@", + targetKey); + + self.listenTargets[targetKey] = queryData; + + if ([self shouldStartWatchStream]) { + [self startWatchStream]; + } else if ([self isNetworkEnabled] && [self.watchStream isOpen]) { + [self sendWatchRequestWithQueryData:queryData]; + } +} + +- (void)sendWatchRequestWithQueryData:(FSTQueryData *)queryData { + [self recordPendingRequestForTargetID:@(queryData.targetID)]; + [self.watchStream watchQuery:queryData]; +} + +- (void)stopListeningToTargetID:(FSTTargetID)targetID { + FSTBoxedTargetID *targetKey = @(targetID); + FSTQueryData *queryData = self.listenTargets[targetKey]; + FSTAssert(queryData, @"unlistenToTarget: target not currently watched: %@", targetKey); + + [self.listenTargets removeObjectForKey:targetKey]; + if ([self isNetworkEnabled] && [self.watchStream isOpen]) { + [self sendUnwatchRequestForTargetID:targetKey]; + if ([self.listenTargets count] == 0) { + [self.watchStream markIdle]; + } + } +} + +- (void)sendUnwatchRequestForTargetID:(FSTBoxedTargetID *)targetID { + [self recordPendingRequestForTargetID:targetID]; + [self.watchStream unwatchTargetID:[targetID intValue]]; +} + +- (void)recordPendingRequestForTargetID:(FSTBoxedTargetID *)targetID { + NSNumber *count = [self.pendingTargetResponses objectForKey:targetID]; + count = @([count intValue] + 1); + [self.pendingTargetResponses setObject:count forKey:targetID]; +} + +/** + * Returns YES if the network is enabled, the watch stream has not yet been started and there are + * active watch targets. + */ +- (BOOL)shouldStartWatchStream { + return [self isNetworkEnabled] && ![self.watchStream isStarted] && self.listenTargets.count > 0; +} + +- (void)cleanUpWatchStreamState { + // If the connection is closed then we'll never get a snapshot version for the accumulated + // changes and so we'll never be able to complete the batch. When we start up again the server + // is going to resend these changes anyway, so just toss the accumulated state. + [self.accumulatedChanges removeAllObjects]; + [self.pendingTargetResponses removeAllObjects]; +} + +- (void)watchStreamDidOpen { + // Restore any existing watches. + for (FSTQueryData *queryData in [self.listenTargets objectEnumerator]) { + [self sendWatchRequestWithQueryData:queryData]; + } +} + +- (void)watchStreamDidChange:(FSTWatchChange *)change + snapshotVersion:(FSTSnapshotVersion *)snapshotVersion { + // Mark the connection as healthy because we got a message from the server. + [self updateOnlineState:FSTOnlineStateHealthy]; + + FSTWatchTargetChange *watchTargetChange = + [change isKindOfClass:[FSTWatchTargetChange class]] ? (FSTWatchTargetChange *)change : nil; + + if (watchTargetChange && watchTargetChange.state == FSTWatchTargetChangeStateRemoved && + watchTargetChange.cause) { + // There was an error on a target, don't wait for a consistent snapshot to raise events + [self processTargetErrorForWatchChange:(FSTWatchTargetChange *)change]; + } else { + // Accumulate watch changes but don't process them if there's no snapshotVersion or it's + // older than a previous snapshot we've processed (can happen after we resume a target + // using a resume token). + [self.accumulatedChanges addObject:change]; + FSTAssert(snapshotVersion, @"snapshotVersion must not be nil."); + if ([snapshotVersion isEqual:[FSTSnapshotVersion noVersion]] || + [snapshotVersion compare:[self.localStore lastRemoteSnapshotVersion]] == + NSOrderedAscending) { + return; + } + + // Create a batch, giving it the accumulatedChanges array. + NSArray<FSTWatchChange *> *changes = self.accumulatedChanges; + self.accumulatedChanges = [NSMutableArray array]; + + [self processBatchedWatchChanges:changes snapshotVersion:snapshotVersion]; + } +} + +- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error { + FSTAssert([self isNetworkEnabled], + @"watchStreamDidClose should only be called when the network is enabled"); + + [self cleanUpWatchStreamState]; + + // If the watch stream closed due to an error, retry the connection if there are any active + // watch targets. + if ([self shouldStartWatchStream]) { + [self updateOnlineStateAfterFailure]; + [self startWatchStream]; + } else { + // We don't need to restart the watch stream because there are no active targets. The online + // state is set to unknown because there is no active attempt at establishing a connection. + [self updateOnlineState:FSTOnlineStateUnknown]; + } +} + +/** + * Takes a batch of changes from the Datastore, repackages them as a RemoteEvent, and passes that + * on to the SyncEngine. + */ +- (void)processBatchedWatchChanges:(NSArray<FSTWatchChange *> *)changes + snapshotVersion:(FSTSnapshotVersion *)snapshotVersion { + FSTWatchChangeAggregator *aggregator = + [[FSTWatchChangeAggregator alloc] initWithSnapshotVersion:snapshotVersion + listenTargets:self.listenTargets + pendingTargetResponses:self.pendingTargetResponses]; + [aggregator addWatchChanges:changes]; + FSTRemoteEvent *remoteEvent = [aggregator remoteEvent]; + [self.pendingTargetResponses removeAllObjects]; + [self.pendingTargetResponses setDictionary:aggregator.pendingTargetResponses]; + + // Handle existence filters and existence filter mismatches + [aggregator.existenceFilters enumerateKeysAndObjectsUsingBlock:^(FSTBoxedTargetID *target, + FSTExistenceFilter *filter, + BOOL *stop) { + FSTTargetID targetID = target.intValue; + + FSTQueryData *queryData = self.listenTargets[target]; + FSTQuery *query = queryData.query; + if (!queryData) { + // A watched target might have been removed already. + return; + + } else if ([query isDocumentQuery]) { + if (filter.count == 0) { + // The existence filter told us the document does not exist. + // We need to deduce that this document does not exist and apply a deleted document to our + // updates. Without applying a deleted document there might be another query that will + // raise this document as part of a snapshot until it is resolved, essentially exposing + // inconsistency between queries + FSTDocumentKey *key = [FSTDocumentKey keyWithPath:query.path]; + FSTDeletedDocument *deletedDoc = + [FSTDeletedDocument documentWithKey:key version:snapshotVersion]; + [remoteEvent addDocumentUpdate:deletedDoc]; + } else { + FSTAssert(filter.count == 1, @"Single document existence filter with count: %" PRId32, + filter.count); + } + + } else { + // Not a document query. + FSTDocumentKeySet *trackedRemote = [self.localStore remoteDocumentKeysForTarget:targetID]; + FSTTargetMapping *mapping = remoteEvent.targetChanges[target].mapping; + if (mapping) { + if ([mapping isKindOfClass:[FSTUpdateMapping class]]) { + FSTUpdateMapping *update = (FSTUpdateMapping *)mapping; + trackedRemote = [update applyTo:trackedRemote]; + } else { + FSTAssert([mapping isKindOfClass:[FSTResetMapping class]], + @"Expected either reset or update mapping but got something else %@", mapping); + trackedRemote = ((FSTResetMapping *)mapping).documents; + } + } + + if (trackedRemote.count != (NSUInteger)filter.count) { + FSTLog(@"Existence filter mismatch, resetting mapping"); + + // Make sure the mismatch is exposed in the remote event + [remoteEvent handleExistenceFilterMismatchForTargetID:target]; + + // Clear the resume token for the query, since we're in a known mismatch state. + queryData = [[FSTQueryData alloc] initWithQuery:query + targetID:targetID + listenSequenceNumber:queryData.sequenceNumber + purpose:queryData.purpose]; + self.listenTargets[target] = queryData; + + // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't + // send a resume token so that we get a full update. + [self sendUnwatchRequestForTargetID:@(targetID)]; + + // Mark the query we send as being on behalf of an existence filter mismatch, but don't + // actually retain that in listenTargets. This ensures that we flag the first re-listen + // this way without impacting future listens of this target (that might happen e.g. on + // reconnect). + FSTQueryData *requestQueryData = + [[FSTQueryData alloc] initWithQuery:query + targetID:targetID + listenSequenceNumber:queryData.sequenceNumber + purpose:FSTQueryPurposeExistenceFilterMismatch]; + [self sendWatchRequestWithQueryData:requestQueryData]; + } + } + }]; + + // Update in-memory resume tokens. FSTLocalStore will update the persistent view of these when + // applying the completed FSTRemoteEvent. + [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( + FSTBoxedTargetID *target, FSTTargetChange *change, BOOL *stop) { + NSData *resumeToken = change.resumeToken; + if (resumeToken.length > 0) { + FSTQueryData *queryData = _listenTargets[target]; + // A watched target might have been removed already. + if (queryData) { + _listenTargets[target] = + [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion + resumeToken:resumeToken]; + } + } + }]; + + // Finally handle remote event + [self.syncEngine applyRemoteEvent:remoteEvent]; +} + +/** Process a target error and passes the error along to SyncEngine. */ +- (void)processTargetErrorForWatchChange:(FSTWatchTargetChange *)change { + FSTAssert(change.cause, @"Handling target error without a cause"); + // Ignore targets that have been removed already. + for (FSTBoxedTargetID *targetID in change.targetIDs) { + if (self.listenTargets[targetID]) { + [self.listenTargets removeObjectForKey:targetID]; + [self.syncEngine rejectListenWithTargetID:targetID error:change.cause]; + } + } +} + +#pragma mark Write Stream + +/** + * Returns YES if the network is enabled, the write stream has not yet been started and there are + * pending writes. + */ +- (BOOL)shouldStartWriteStream { + return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.pendingWrites.count > 0; +} + +- (void)startWriteStream { + FSTAssert([self shouldStartWriteStream], + @"startWriteStream: called when shouldStartWriteStream: is false."); + + [self.writeStream startWithDelegate:self]; +} + +- (void)cleanUpWriteStreamState { + self.lastBatchSeen = kFSTBatchIDUnknown; + FSTLog(@"Stopping write stream with %lu pending writes", + (unsigned long)[self.pendingWrites count]); + [self.pendingWrites removeAllObjects]; +} + +- (void)fillWritePipeline { + if ([self isNetworkEnabled]) { + while ([self canWriteMutations]) { + FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen]; + if (!batch) { + break; + } + [self commitBatch:batch]; + } + + if ([self.pendingWrites count] == 0) { + [self.writeStream markIdle]; + } + } +} + +/** + * Returns YES if the backend can accept additional write requests. + * + * When sending mutations to the write stream (e.g. in -fillWritePipeline), call this method first + * to check if more mutations can be sent. + * + * Currently the only thing that can prevent the backend from accepting write requests is if + * there are too many requests already outstanding. As writes complete the backend will be able + * to accept more. + */ +- (BOOL)canWriteMutations { + return [self isNetworkEnabled] && self.pendingWrites.count < kMaxPendingWrites; +} + +/** Given mutations to commit, actually commits them to the backend. */ +- (void)commitBatch:(FSTMutationBatch *)batch { + FSTAssert([self canWriteMutations], @"commitBatch called when mutations can't be written"); + self.lastBatchSeen = batch.batchID; + + [self.pendingWrites addObject:batch]; + + if ([self shouldStartWriteStream]) { + [self startWriteStream]; + } else if ([self isNetworkEnabled] && self.writeStream.handshakeComplete) { + [self.writeStream writeMutations:batch.mutations]; + } +} + +- (void)writeStreamDidOpen { + self.writeStreamOpenTime = [NSDate date]; + + [self.writeStream writeHandshake]; +} + +/** + * Handles a successful handshake response from the server, which is our cue to send any pending + * writes. + */ +- (void)writeStreamDidCompleteHandshake { + // Record the stream token. + [self.localStore setLastStreamToken:self.writeStream.lastStreamToken]; + + // Drain any pending writes. + // + // Note that at this point pendingWrites contains mutations that have already been accepted by + // fillWritePipeline/commitBatch. If the pipeline is full, canWriteMutations will be NO, despite + // the fact that we actually need to send mutations over. + // + // This also means that this method indirectly respects the limits imposed by canWriteMutations + // since writes can't be added to the pendingWrites array when canWriteMutations is NO. If the + // limits imposed by canWriteMutations actually protect us from DOSing ourselves then those limits + // won't be exceeded here and we'll continue to make progress. + for (FSTMutationBatch *write in self.pendingWrites) { + [self.writeStream writeMutations:write.mutations]; + } +} + +/** Handles a successful StreamingWriteResponse from the server that contains a mutation result. */ +- (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion + mutationResults:(NSArray<FSTMutationResult *> *)results { + // This is a response to a write containing mutations and should be correlated to the first + // pending write. + NSMutableArray *pendingWrites = self.pendingWrites; + FSTMutationBatch *batch = pendingWrites[0]; + [pendingWrites removeObjectAtIndex:0]; + + FSTMutationBatchResult *batchResult = + [FSTMutationBatchResult resultWithBatch:batch + commitVersion:commitVersion + mutationResults:results + streamToken:self.writeStream.lastStreamToken]; + [self.syncEngine applySuccessfulWriteWithResult:batchResult]; + + // It's possible that with the completion of this mutation another slot has freed up. + [self fillWritePipeline]; +} + +/** + * Handles the closing of the StreamingWrite RPC, either because of an error or because the RPC + * has been terminated by the client or the server. + */ +- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error { + FSTAssert([self isNetworkEnabled], + @"writeStreamDidClose: should only be called when the network is enabled"); + + // If the write stream closed due to an error, invoke the error callbacks if there are pending + // writes. + if (error != nil && self.pendingWrites.count > 0) { + if (self.writeStream.handshakeComplete) { + // This error affects the actual writes. + [self handleWriteError:error]; + } else { + // If there was an error before the handshake finished, it's possible that the server is + // unable to process the stream token we're sending. (Perhaps it's too old?) + [self handleHandshakeError:error]; + } + } + + // The write stream might have been started by refilling the write pipeline for failed writes + if ([self shouldStartWriteStream]) { + [self startWriteStream]; + } +} + +- (void)handleHandshakeError:(NSError *)error { + // Reset the token if it's a permanent error or the error code is ABORTED, signaling the write + // stream is no longer valid. + if ([FSTDatastore isPermanentWriteError:error] || [FSTDatastore isAbortedError:error]) { + NSString *token = [self.writeStream.lastStreamToken base64EncodedStringWithOptions:0]; + FSTLog(@"FSTRemoteStore %p error before completed handshake; resetting stream token %@: %@", + (__bridge void *)self, token, error); + self.writeStream.lastStreamToken = nil; + [self.localStore setLastStreamToken:nil]; + } +} + +- (void)handleWriteError:(NSError *)error { + // Only handle permanent error. If it's transient, just let the retry logic kick in. + if (![FSTDatastore isPermanentWriteError:error]) { + return; + } + + // If this was a permanent error, the request itself was the problem so it's not going to + // succeed if we resend it. + FSTMutationBatch *batch = self.pendingWrites[0]; + [self.pendingWrites removeObjectAtIndex:0]; + + // In this case it's also unlikely that the server itself is melting down--this was just a + // bad request so inhibit backoff on the next restart. + [self.writeStream inhibitBackoff]; + + [self.syncEngine rejectFailedWriteWithBatchID:batch.batchID error:error]; + + // It's possible that with the completion of this mutation another slot has freed up. + [self fillWritePipeline]; +} + +- (FSTTransaction *)transaction { + return [FSTTransaction transactionWithDatastore:self.datastore]; +} + +@end + +NS_ASSUME_NONNULL_END |