aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/Source/Remote/FSTRemoteStore.mm
diff options
context:
space:
mode:
Diffstat (limited to 'Firestore/Source/Remote/FSTRemoteStore.mm')
-rw-r--r--Firestore/Source/Remote/FSTRemoteStore.mm204
1 files changed, 76 insertions, 128 deletions
diff --git a/Firestore/Source/Remote/FSTRemoteStore.mm b/Firestore/Source/Remote/FSTRemoteStore.mm
index 7338ffb..54d00c4 100644
--- a/Firestore/Source/Remote/FSTRemoteStore.mm
+++ b/Firestore/Source/Remote/FSTRemoteStore.mm
@@ -96,14 +96,13 @@ static const int kMaxPendingWrites = 10;
* 0 we know that the client and server are in the same state (once this state
* is reached the targetId is removed from the map to free the memory).
*/
-@property(nonatomic, strong, readonly)
- NSMutableDictionary<FSTBoxedTargetID *, NSNumber *> *pendingTargetResponses;
-@property(nonatomic, strong) NSMutableArray<FSTWatchChange *> *accumulatedChanges;
@property(nonatomic, assign) FSTBatchID lastBatchSeen;
@property(nonatomic, strong, readonly) FSTOnlineStateTracker *onlineStateTracker;
+@property(nonatomic, strong, nullable) FSTWatchChangeAggregator *watchChangeAggregator;
+
#pragma mark Write Stream
// The writeStream is null when the network is disabled. The non-null check is performed by
// isNetworkEnabled.
@@ -126,8 +125,6 @@ static const int kMaxPendingWrites = 10;
_localStore = localStore;
_datastore = datastore;
_listenTargets = [NSMutableDictionary dictionary];
- _pendingTargetResponses = [NSMutableDictionary dictionary];
- _accumulatedChanges = [NSMutableArray array];
_lastBatchSeen = kFSTBatchIDUnknown;
_pendingWrites = [NSMutableArray array];
@@ -229,6 +226,7 @@ static const int kMaxPendingWrites = 10;
- (void)startWatchStream {
HARD_ASSERT([self shouldStartWatchStream],
"startWatchStream: called when shouldStartWatchStream: is false.");
+ _watchChangeAggregator = [[FSTWatchChangeAggregator alloc] initWithTargetMetadataProvider:self];
[self.watchStream startWithDelegate:self];
[self.onlineStateTracker handleWatchStreamStart];
}
@@ -248,7 +246,7 @@ static const int kMaxPendingWrites = 10;
}
- (void)sendWatchRequestWithQueryData:(FSTQueryData *)queryData {
- [self recordPendingRequestForTargetID:@(queryData.targetID)];
+ [self.watchChangeAggregator recordTargetRequest:@(queryData.targetID)];
[self.watchStream watchQuery:queryData];
}
@@ -267,16 +265,10 @@ static const int kMaxPendingWrites = 10;
}
- (void)sendUnwatchRequestForTargetID:(FSTBoxedTargetID *)targetID {
- [self recordPendingRequestForTargetID:targetID];
+ [self.watchChangeAggregator recordTargetRequest:targetID];
[self.watchStream unwatchTargetID:[targetID intValue]];
}
-- (void)recordPendingRequestForTargetID:(FSTBoxedTargetID *)targetID {
- NSNumber *count = [self.pendingTargetResponses objectForKey:targetID];
- count = @([count intValue] + 1);
- [self.pendingTargetResponses setObject:count forKey:targetID];
-}
-
/**
* Returns YES if the network is enabled, the watch stream has not yet been started and there are
* active watch targets.
@@ -286,11 +278,7 @@ static const int kMaxPendingWrites = 10;
}
- (void)cleanUpWatchStreamState {
- // If the connection is closed then we'll never get a snapshot version for the accumulated
- // changes and so we'll never be able to complete the batch. When we start up again the server
- // is going to resend these changes anyway, so just toss the accumulated state.
- [self.accumulatedChanges removeAllObjects];
- [self.pendingTargetResponses removeAllObjects];
+ _watchChangeAggregator = nil;
}
- (void)watchStreamDidOpen {
@@ -305,28 +293,27 @@ static const int kMaxPendingWrites = 10;
// Mark the connection as Online because we got a message from the server.
[self.onlineStateTracker updateState:FSTOnlineStateOnline];
- FSTWatchTargetChange *watchTargetChange =
- [change isKindOfClass:[FSTWatchTargetChange class]] ? (FSTWatchTargetChange *)change : nil;
-
- if (watchTargetChange && watchTargetChange.state == FSTWatchTargetChangeStateRemoved &&
- watchTargetChange.cause) {
- // There was an error on a target, don't wait for a consistent snapshot to raise events
- [self processTargetErrorForWatchChange:(FSTWatchTargetChange *)change];
- } else {
- // Accumulate watch changes but don't process them if there's no snapshotVersion or it's
- // older than a previous snapshot we've processed (can happen after we resume a target
- // using a resume token).
- [self.accumulatedChanges addObject:change];
- if (snapshotVersion == SnapshotVersion::None() ||
- snapshotVersion < [self.localStore lastRemoteSnapshotVersion]) {
- return;
+ if ([change isKindOfClass:[FSTWatchTargetChange class]]) {
+ FSTWatchTargetChange *watchTargetChange = (FSTWatchTargetChange *)change;
+ if (watchTargetChange.state == FSTWatchTargetChangeStateRemoved && watchTargetChange.cause) {
+ // There was an error on a target, don't wait for a consistent snapshot to raise events
+ return [self processTargetErrorForWatchChange:watchTargetChange];
+ } else {
+ [self.watchChangeAggregator handleTargetChange:watchTargetChange];
}
+ } else if ([change isKindOfClass:[FSTDocumentWatchChange class]]) {
+ [self.watchChangeAggregator handleDocumentChange:(FSTDocumentWatchChange *)change];
+ } else {
+ HARD_ASSERT([change isKindOfClass:[FSTExistenceFilterWatchChange class]],
+ "Expected watchChange to be an instance of FSTExistenceFilterWatchChange");
+ [self.watchChangeAggregator handleExistenceFilter:(FSTExistenceFilterWatchChange *)change];
+ }
- // Create a batch, giving it the accumulatedChanges array.
- NSArray<FSTWatchChange *> *changes = self.accumulatedChanges;
- self.accumulatedChanges = [NSMutableArray array];
-
- [self processBatchedWatchChanges:changes snapshotVersion:snapshotVersion];
+ if (snapshotVersion != SnapshotVersion::None() &&
+ snapshotVersion >= [self.localStore lastRemoteSnapshotVersion]) {
+ // We have received a target change with a global snapshot if the snapshot version is not equal
+ // to SnapshotVersion.None().
+ [self raiseWatchSnapshotWithSnapshotVersion:snapshotVersion];
}
}
@@ -353,107 +340,60 @@ static const int kMaxPendingWrites = 10;
* Takes a batch of changes from the Datastore, repackages them as a RemoteEvent, and passes that
* on to the SyncEngine.
*/
-- (void)processBatchedWatchChanges:(NSArray<FSTWatchChange *> *)changes
- snapshotVersion:(const SnapshotVersion &)snapshotVersion {
- FSTWatchChangeAggregator *aggregator =
- [[FSTWatchChangeAggregator alloc] initWithSnapshotVersion:snapshotVersion
- listenTargets:self.listenTargets
- pendingTargetResponses:self.pendingTargetResponses];
- [aggregator addWatchChanges:changes];
- FSTRemoteEvent *remoteEvent = [aggregator remoteEvent];
- [self.pendingTargetResponses removeAllObjects];
- [self.pendingTargetResponses setDictionary:aggregator.pendingTargetResponses];
-
- // Handle existence filters and existence filter mismatches
- [aggregator.existenceFilters enumerateKeysAndObjectsUsingBlock:^(FSTBoxedTargetID *target,
- FSTExistenceFilter *filter,
- BOOL *stop) {
- FSTTargetID targetID = target.intValue;
-
- FSTQueryData *queryData = self.listenTargets[target];
- FSTQuery *query = queryData.query;
- if (!queryData) {
- // A watched target might have been removed already.
- return;
-
- } else if ([query isDocumentQuery]) {
- if (filter.count == 0) {
- // The existence filter told us the document does not exist.
- // We need to deduce that this document does not exist and apply a deleted document to our
- // updates. Without applying a deleted document there might be another query that will
- // raise this document as part of a snapshot until it is resolved, essentially exposing
- // inconsistency between queries
- const DocumentKey key{query.path};
- FSTDeletedDocument *deletedDoc =
- [FSTDeletedDocument documentWithKey:key version:snapshotVersion];
- [remoteEvent addDocumentUpdate:deletedDoc];
- } else {
- HARD_ASSERT(filter.count == 1, "Single document existence filter with count: %s",
- filter.count);
- }
-
- } else {
- // Not a document query.
- DocumentKeySet trackedRemote = [self.localStore remoteDocumentKeysForTarget:targetID];
- FSTTargetMapping *mapping = remoteEvent.targetChanges[target].mapping;
- if (mapping) {
- if ([mapping isKindOfClass:[FSTUpdateMapping class]]) {
- FSTUpdateMapping *update = (FSTUpdateMapping *)mapping;
- trackedRemote = [update applyTo:trackedRemote];
- } else {
- HARD_ASSERT([mapping isKindOfClass:[FSTResetMapping class]],
- "Expected either reset or update mapping but got something else %s", mapping);
- trackedRemote = ((FSTResetMapping *)mapping).documents;
- }
- }
+- (void)raiseWatchSnapshotWithSnapshotVersion:(const SnapshotVersion &)snapshotVersion {
+ HARD_ASSERT(snapshotVersion != SnapshotVersion::None(),
+ "Can't raise event for unknown SnapshotVersion");
- if (trackedRemote.size() != static_cast<size_t>(filter.count)) {
- LOG_DEBUG("Existence filter mismatch, resetting mapping");
-
- // Make sure the mismatch is exposed in the remote event
- [remoteEvent handleExistenceFilterMismatchForTargetID:target];
-
- // Clear the resume token for the query, since we're in a known mismatch state.
- queryData = [[FSTQueryData alloc] initWithQuery:query
- targetID:targetID
- listenSequenceNumber:queryData.sequenceNumber
- purpose:queryData.purpose];
- self.listenTargets[target] = queryData;
-
- // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't
- // send a resume token so that we get a full update.
- [self sendUnwatchRequestForTargetID:@(targetID)];
-
- // Mark the query we send as being on behalf of an existence filter mismatch, but don't
- // actually retain that in listenTargets. This ensures that we flag the first re-listen
- // this way without impacting future listens of this target (that might happen e.g. on
- // reconnect).
- FSTQueryData *requestQueryData =
- [[FSTQueryData alloc] initWithQuery:query
- targetID:targetID
- listenSequenceNumber:queryData.sequenceNumber
- purpose:FSTQueryPurposeExistenceFilterMismatch];
- [self sendWatchRequestWithQueryData:requestQueryData];
- }
- }
- }];
+ FSTRemoteEvent *remoteEvent =
+ [self.watchChangeAggregator remoteEventAtSnapshotVersion:snapshotVersion];
// Update in-memory resume tokens. FSTLocalStore will update the persistent view of these when
// applying the completed FSTRemoteEvent.
- [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^(
- FSTBoxedTargetID *target, FSTTargetChange *change, BOOL *stop) {
- NSData *resumeToken = change.resumeToken;
+ for (const auto &entry : remoteEvent.targetChanges) {
+ NSData *resumeToken = entry.second.resumeToken;
if (resumeToken.length > 0) {
- FSTQueryData *queryData = self->_listenTargets[target];
+ FSTBoxedTargetID *targetID = @(entry.first);
+ FSTQueryData *queryData = _listenTargets[targetID];
// A watched target might have been removed already.
if (queryData) {
- self->_listenTargets[target] =
- [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion
+ _listenTargets[targetID] =
+ [queryData queryDataByReplacingSnapshotVersion:snapshotVersion
resumeToken:resumeToken
sequenceNumber:queryData.sequenceNumber];
}
}
- }];
+ }
+
+ // Re-establish listens for the targets that have been invalidated by existence filter mismatches.
+ for (FSTTargetID targetID : remoteEvent.targetMismatches) {
+ FSTQueryData *queryData = self.listenTargets[@(targetID)];
+
+ if (!queryData) {
+ // A watched target might have been removed already.
+ continue;
+ }
+
+ // Clear the resume token for the query, since we're in a known mismatch state.
+ queryData = [[FSTQueryData alloc] initWithQuery:queryData.query
+ targetID:targetID
+ listenSequenceNumber:queryData.sequenceNumber
+ purpose:queryData.purpose];
+ self.listenTargets[@(targetID)] = queryData;
+
+ // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't send a
+ // resume token so that we get a full update.
+ [self sendUnwatchRequestForTargetID:@(targetID)];
+
+ // Mark the query we send as being on behalf of an existence filter mismatch, but don't actually
+ // retain that in listenTargets. This ensures that we flag the first re-listen this way without
+ // impacting future listens of this target (that might happen e.g. on reconnect).
+ FSTQueryData *requestQueryData =
+ [[FSTQueryData alloc] initWithQuery:queryData.query
+ targetID:targetID
+ listenSequenceNumber:queryData.sequenceNumber
+ purpose:FSTQueryPurposeExistenceFilterMismatch];
+ [self sendWatchRequestWithQueryData:requestQueryData];
+ }
// Finally handle remote event
[self.syncEngine applyRemoteEvent:remoteEvent];
@@ -471,6 +411,14 @@ static const int kMaxPendingWrites = 10;
}
}
+- (firebase::firestore::model::DocumentKeySet)remoteKeysForTarget:(FSTBoxedTargetID *)targetID {
+ return [self.syncEngine remoteKeysForTarget:targetID];
+}
+
+- (nullable FSTQueryData *)queryDataForTarget:(FSTBoxedTargetID *)targetID {
+ return self.listenTargets[targetID];
+}
+
#pragma mark Write Stream
/**