diff options
Diffstat (limited to 'Firestore/Source/Remote/FSTRemoteStore.mm')
-rw-r--r-- | Firestore/Source/Remote/FSTRemoteStore.mm | 204 |
1 files changed, 76 insertions, 128 deletions
diff --git a/Firestore/Source/Remote/FSTRemoteStore.mm b/Firestore/Source/Remote/FSTRemoteStore.mm index 7338ffb..54d00c4 100644 --- a/Firestore/Source/Remote/FSTRemoteStore.mm +++ b/Firestore/Source/Remote/FSTRemoteStore.mm @@ -96,14 +96,13 @@ static const int kMaxPendingWrites = 10; * 0 we know that the client and server are in the same state (once this state * is reached the targetId is removed from the map to free the memory). */ -@property(nonatomic, strong, readonly) - NSMutableDictionary<FSTBoxedTargetID *, NSNumber *> *pendingTargetResponses; -@property(nonatomic, strong) NSMutableArray<FSTWatchChange *> *accumulatedChanges; @property(nonatomic, assign) FSTBatchID lastBatchSeen; @property(nonatomic, strong, readonly) FSTOnlineStateTracker *onlineStateTracker; +@property(nonatomic, strong, nullable) FSTWatchChangeAggregator *watchChangeAggregator; + #pragma mark Write Stream // The writeStream is null when the network is disabled. The non-null check is performed by // isNetworkEnabled. @@ -126,8 +125,6 @@ static const int kMaxPendingWrites = 10; _localStore = localStore; _datastore = datastore; _listenTargets = [NSMutableDictionary dictionary]; - _pendingTargetResponses = [NSMutableDictionary dictionary]; - _accumulatedChanges = [NSMutableArray array]; _lastBatchSeen = kFSTBatchIDUnknown; _pendingWrites = [NSMutableArray array]; @@ -229,6 +226,7 @@ static const int kMaxPendingWrites = 10; - (void)startWatchStream { HARD_ASSERT([self shouldStartWatchStream], "startWatchStream: called when shouldStartWatchStream: is false."); + _watchChangeAggregator = [[FSTWatchChangeAggregator alloc] initWithTargetMetadataProvider:self]; [self.watchStream startWithDelegate:self]; [self.onlineStateTracker handleWatchStreamStart]; } @@ -248,7 +246,7 @@ static const int kMaxPendingWrites = 10; } - (void)sendWatchRequestWithQueryData:(FSTQueryData *)queryData { - [self recordPendingRequestForTargetID:@(queryData.targetID)]; + [self.watchChangeAggregator recordTargetRequest:@(queryData.targetID)]; [self.watchStream watchQuery:queryData]; } @@ -267,16 +265,10 @@ static const int kMaxPendingWrites = 10; } - (void)sendUnwatchRequestForTargetID:(FSTBoxedTargetID *)targetID { - [self recordPendingRequestForTargetID:targetID]; + [self.watchChangeAggregator recordTargetRequest:targetID]; [self.watchStream unwatchTargetID:[targetID intValue]]; } -- (void)recordPendingRequestForTargetID:(FSTBoxedTargetID *)targetID { - NSNumber *count = [self.pendingTargetResponses objectForKey:targetID]; - count = @([count intValue] + 1); - [self.pendingTargetResponses setObject:count forKey:targetID]; -} - /** * Returns YES if the network is enabled, the watch stream has not yet been started and there are * active watch targets. @@ -286,11 +278,7 @@ static const int kMaxPendingWrites = 10; } - (void)cleanUpWatchStreamState { - // If the connection is closed then we'll never get a snapshot version for the accumulated - // changes and so we'll never be able to complete the batch. When we start up again the server - // is going to resend these changes anyway, so just toss the accumulated state. - [self.accumulatedChanges removeAllObjects]; - [self.pendingTargetResponses removeAllObjects]; + _watchChangeAggregator = nil; } - (void)watchStreamDidOpen { @@ -305,28 +293,27 @@ static const int kMaxPendingWrites = 10; // Mark the connection as Online because we got a message from the server. [self.onlineStateTracker updateState:FSTOnlineStateOnline]; - FSTWatchTargetChange *watchTargetChange = - [change isKindOfClass:[FSTWatchTargetChange class]] ? (FSTWatchTargetChange *)change : nil; - - if (watchTargetChange && watchTargetChange.state == FSTWatchTargetChangeStateRemoved && - watchTargetChange.cause) { - // There was an error on a target, don't wait for a consistent snapshot to raise events - [self processTargetErrorForWatchChange:(FSTWatchTargetChange *)change]; - } else { - // Accumulate watch changes but don't process them if there's no snapshotVersion or it's - // older than a previous snapshot we've processed (can happen after we resume a target - // using a resume token). - [self.accumulatedChanges addObject:change]; - if (snapshotVersion == SnapshotVersion::None() || - snapshotVersion < [self.localStore lastRemoteSnapshotVersion]) { - return; + if ([change isKindOfClass:[FSTWatchTargetChange class]]) { + FSTWatchTargetChange *watchTargetChange = (FSTWatchTargetChange *)change; + if (watchTargetChange.state == FSTWatchTargetChangeStateRemoved && watchTargetChange.cause) { + // There was an error on a target, don't wait for a consistent snapshot to raise events + return [self processTargetErrorForWatchChange:watchTargetChange]; + } else { + [self.watchChangeAggregator handleTargetChange:watchTargetChange]; } + } else if ([change isKindOfClass:[FSTDocumentWatchChange class]]) { + [self.watchChangeAggregator handleDocumentChange:(FSTDocumentWatchChange *)change]; + } else { + HARD_ASSERT([change isKindOfClass:[FSTExistenceFilterWatchChange class]], + "Expected watchChange to be an instance of FSTExistenceFilterWatchChange"); + [self.watchChangeAggregator handleExistenceFilter:(FSTExistenceFilterWatchChange *)change]; + } - // Create a batch, giving it the accumulatedChanges array. - NSArray<FSTWatchChange *> *changes = self.accumulatedChanges; - self.accumulatedChanges = [NSMutableArray array]; - - [self processBatchedWatchChanges:changes snapshotVersion:snapshotVersion]; + if (snapshotVersion != SnapshotVersion::None() && + snapshotVersion >= [self.localStore lastRemoteSnapshotVersion]) { + // We have received a target change with a global snapshot if the snapshot version is not equal + // to SnapshotVersion.None(). + [self raiseWatchSnapshotWithSnapshotVersion:snapshotVersion]; } } @@ -353,107 +340,60 @@ static const int kMaxPendingWrites = 10; * Takes a batch of changes from the Datastore, repackages them as a RemoteEvent, and passes that * on to the SyncEngine. */ -- (void)processBatchedWatchChanges:(NSArray<FSTWatchChange *> *)changes - snapshotVersion:(const SnapshotVersion &)snapshotVersion { - FSTWatchChangeAggregator *aggregator = - [[FSTWatchChangeAggregator alloc] initWithSnapshotVersion:snapshotVersion - listenTargets:self.listenTargets - pendingTargetResponses:self.pendingTargetResponses]; - [aggregator addWatchChanges:changes]; - FSTRemoteEvent *remoteEvent = [aggregator remoteEvent]; - [self.pendingTargetResponses removeAllObjects]; - [self.pendingTargetResponses setDictionary:aggregator.pendingTargetResponses]; - - // Handle existence filters and existence filter mismatches - [aggregator.existenceFilters enumerateKeysAndObjectsUsingBlock:^(FSTBoxedTargetID *target, - FSTExistenceFilter *filter, - BOOL *stop) { - FSTTargetID targetID = target.intValue; - - FSTQueryData *queryData = self.listenTargets[target]; - FSTQuery *query = queryData.query; - if (!queryData) { - // A watched target might have been removed already. - return; - - } else if ([query isDocumentQuery]) { - if (filter.count == 0) { - // The existence filter told us the document does not exist. - // We need to deduce that this document does not exist and apply a deleted document to our - // updates. Without applying a deleted document there might be another query that will - // raise this document as part of a snapshot until it is resolved, essentially exposing - // inconsistency between queries - const DocumentKey key{query.path}; - FSTDeletedDocument *deletedDoc = - [FSTDeletedDocument documentWithKey:key version:snapshotVersion]; - [remoteEvent addDocumentUpdate:deletedDoc]; - } else { - HARD_ASSERT(filter.count == 1, "Single document existence filter with count: %s", - filter.count); - } - - } else { - // Not a document query. - DocumentKeySet trackedRemote = [self.localStore remoteDocumentKeysForTarget:targetID]; - FSTTargetMapping *mapping = remoteEvent.targetChanges[target].mapping; - if (mapping) { - if ([mapping isKindOfClass:[FSTUpdateMapping class]]) { - FSTUpdateMapping *update = (FSTUpdateMapping *)mapping; - trackedRemote = [update applyTo:trackedRemote]; - } else { - HARD_ASSERT([mapping isKindOfClass:[FSTResetMapping class]], - "Expected either reset or update mapping but got something else %s", mapping); - trackedRemote = ((FSTResetMapping *)mapping).documents; - } - } +- (void)raiseWatchSnapshotWithSnapshotVersion:(const SnapshotVersion &)snapshotVersion { + HARD_ASSERT(snapshotVersion != SnapshotVersion::None(), + "Can't raise event for unknown SnapshotVersion"); - if (trackedRemote.size() != static_cast<size_t>(filter.count)) { - LOG_DEBUG("Existence filter mismatch, resetting mapping"); - - // Make sure the mismatch is exposed in the remote event - [remoteEvent handleExistenceFilterMismatchForTargetID:target]; - - // Clear the resume token for the query, since we're in a known mismatch state. - queryData = [[FSTQueryData alloc] initWithQuery:query - targetID:targetID - listenSequenceNumber:queryData.sequenceNumber - purpose:queryData.purpose]; - self.listenTargets[target] = queryData; - - // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't - // send a resume token so that we get a full update. - [self sendUnwatchRequestForTargetID:@(targetID)]; - - // Mark the query we send as being on behalf of an existence filter mismatch, but don't - // actually retain that in listenTargets. This ensures that we flag the first re-listen - // this way without impacting future listens of this target (that might happen e.g. on - // reconnect). - FSTQueryData *requestQueryData = - [[FSTQueryData alloc] initWithQuery:query - targetID:targetID - listenSequenceNumber:queryData.sequenceNumber - purpose:FSTQueryPurposeExistenceFilterMismatch]; - [self sendWatchRequestWithQueryData:requestQueryData]; - } - } - }]; + FSTRemoteEvent *remoteEvent = + [self.watchChangeAggregator remoteEventAtSnapshotVersion:snapshotVersion]; // Update in-memory resume tokens. FSTLocalStore will update the persistent view of these when // applying the completed FSTRemoteEvent. - [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( - FSTBoxedTargetID *target, FSTTargetChange *change, BOOL *stop) { - NSData *resumeToken = change.resumeToken; + for (const auto &entry : remoteEvent.targetChanges) { + NSData *resumeToken = entry.second.resumeToken; if (resumeToken.length > 0) { - FSTQueryData *queryData = self->_listenTargets[target]; + FSTBoxedTargetID *targetID = @(entry.first); + FSTQueryData *queryData = _listenTargets[targetID]; // A watched target might have been removed already. if (queryData) { - self->_listenTargets[target] = - [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion + _listenTargets[targetID] = + [queryData queryDataByReplacingSnapshotVersion:snapshotVersion resumeToken:resumeToken sequenceNumber:queryData.sequenceNumber]; } } - }]; + } + + // Re-establish listens for the targets that have been invalidated by existence filter mismatches. + for (FSTTargetID targetID : remoteEvent.targetMismatches) { + FSTQueryData *queryData = self.listenTargets[@(targetID)]; + + if (!queryData) { + // A watched target might have been removed already. + continue; + } + + // Clear the resume token for the query, since we're in a known mismatch state. + queryData = [[FSTQueryData alloc] initWithQuery:queryData.query + targetID:targetID + listenSequenceNumber:queryData.sequenceNumber + purpose:queryData.purpose]; + self.listenTargets[@(targetID)] = queryData; + + // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't send a + // resume token so that we get a full update. + [self sendUnwatchRequestForTargetID:@(targetID)]; + + // Mark the query we send as being on behalf of an existence filter mismatch, but don't actually + // retain that in listenTargets. This ensures that we flag the first re-listen this way without + // impacting future listens of this target (that might happen e.g. on reconnect). + FSTQueryData *requestQueryData = + [[FSTQueryData alloc] initWithQuery:queryData.query + targetID:targetID + listenSequenceNumber:queryData.sequenceNumber + purpose:FSTQueryPurposeExistenceFilterMismatch]; + [self sendWatchRequestWithQueryData:requestQueryData]; + } // Finally handle remote event [self.syncEngine applyRemoteEvent:remoteEvent]; @@ -471,6 +411,14 @@ static const int kMaxPendingWrites = 10; } } +- (firebase::firestore::model::DocumentKeySet)remoteKeysForTarget:(FSTBoxedTargetID *)targetID { + return [self.syncEngine remoteKeysForTarget:targetID]; +} + +- (nullable FSTQueryData *)queryDataForTarget:(FSTBoxedTargetID *)targetID { + return self.listenTargets[targetID]; +} + #pragma mark Write Stream /** |