From afea8d5aacf474b57b4364feda08be9ca195594b Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Tue, 12 Jun 2018 10:58:35 -0700 Subject: Refactor Remote Event (#1396) --- Firestore/Source/Core/FSTSyncEngine.mm | 45 +- Firestore/Source/Core/FSTView.mm | 30 +- Firestore/Source/Local/FSTLocalStore.mm | 38 +- Firestore/Source/Remote/FSTRemoteEvent.h | 199 +++---- Firestore/Source/Remote/FSTRemoteEvent.mm | 853 ++++++++++++++---------------- Firestore/Source/Remote/FSTRemoteStore.h | 10 +- Firestore/Source/Remote/FSTRemoteStore.mm | 204 +++---- Firestore/Source/Remote/FSTWatchChange.mm | 2 +- 8 files changed, 617 insertions(+), 764 deletions(-) (limited to 'Firestore/Source') diff --git a/Firestore/Source/Core/FSTSyncEngine.mm b/Firestore/Source/Core/FSTSyncEngine.mm index 7d0c1a3..89cb774 100644 --- a/Firestore/Source/Core/FSTSyncEngine.mm +++ b/Firestore/Source/Core/FSTSyncEngine.mm @@ -295,21 +295,6 @@ static const FSTListenSequenceNumber kIrrelevantSequenceNumber = -1; - (void)applyRemoteEvent:(FSTRemoteEvent *)remoteEvent { [self assertDelegateExistsForSelector:_cmd]; - // Make sure limbo documents are deleted if there were no results. - // Filter out document additions to targets that they already belong to. - [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( - FSTBoxedTargetID *_Nonnull targetID, - FSTTargetChange *_Nonnull targetChange, BOOL *_Nonnull stop) { - const auto iter = self->_limboKeysByTarget.find([targetID intValue]); - if (iter == self->_limboKeysByTarget.end()) { - FSTQueryView *qv = self.queryViewsByTarget[targetID]; - HARD_ASSERT(qv, "Missing queryview for non-limbo query: %s", [targetID intValue]); - [targetChange.mapping filterUpdatesUsingExistingKeys:qv.view.syncedDocuments]; - } else { - [remoteEvent synthesizeDeleteForLimboTargetChange:targetChange key:iter->second]; - } - }]; - FSTMaybeDocumentDictionary *changes = [self.localStore applyRemoteEvent:remoteEvent]; [self emitNewSnapshotsWithChanges:changes remoteEvent:remoteEvent]; } @@ -345,18 +330,16 @@ static const FSTListenSequenceNumber kIrrelevantSequenceNumber = -1; // It's a limbo doc. Create a synthetic event saying it was deleted. This is kind of a hack. // Ideally, we would have a method in the local store to purge a document. However, it would // be tricky to keep all of the local store's invariants with another method. - NSMutableDictionary *targetChanges = - [NSMutableDictionary dictionary]; FSTDeletedDocument *doc = [FSTDeletedDocument documentWithKey:limboKey version:SnapshotVersion::None()]; DocumentKeySet limboDocuments = DocumentKeySet{doc.key}; - FSTRemoteEvent *event = - [[FSTRemoteEvent alloc] initWithSnapshotVersion:SnapshotVersion::None() - targetChanges:targetChanges - documentUpdates:{ - { limboKey, doc } - } - limboDocuments:std::move(limboDocuments)]; + FSTRemoteEvent *event = [[FSTRemoteEvent alloc] initWithSnapshotVersion:SnapshotVersion::None() + targetChanges:{} + targetMismatches:{} + documentUpdates:{ + { limboKey, doc } + } + limboDocuments:std::move(limboDocuments)]; [self applyRemoteEvent:event]; } else { FSTQueryView *queryView = self.queryViewsByTarget[@(targetID)]; @@ -439,7 +422,14 @@ static const FSTListenSequenceNumber kIrrelevantSequenceNumber = -1; FSTDocumentDictionary *docs = [self.localStore executeQuery:queryView.query]; viewDocChanges = [view computeChangesWithDocuments:docs previousChanges:viewDocChanges]; } - FSTTargetChange *_Nullable targetChange = remoteEvent.targetChanges[@(queryView.targetID)]; + + FSTTargetChange *_Nullable targetChange = nil; + if (remoteEvent) { + auto it = remoteEvent.targetChanges.find(queryView.targetID); + if (it != remoteEvent.targetChanges.end()) { + targetChange = it->second; + } + } FSTViewChange *viewChange = [queryView.view applyChangesToDocuments:viewDocChanges targetChange:targetChange]; @@ -531,6 +521,11 @@ static const FSTListenSequenceNumber kIrrelevantSequenceNumber = -1; [self.remoteStore userDidChange:user]; } +- (firebase::firestore::model::DocumentKeySet)remoteKeysForTarget:(FSTBoxedTargetID *)targetId { + FSTQueryView *queryView = self.queryViewsByTarget[targetId]; + return queryView ? queryView.view.syncedDocuments : DocumentKeySet{}; +} + @end NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Core/FSTView.mm b/Firestore/Source/Core/FSTView.mm index 63efd4e..f954731 100644 --- a/Firestore/Source/Core/FSTView.mm +++ b/Firestore/Source/Core/FSTView.mm @@ -401,28 +401,18 @@ static NSComparisonResult FSTCompareDocumentViewChangeTypes(FSTDocumentViewChang */ - (void)applyTargetChange:(nullable FSTTargetChange *)targetChange { if (targetChange) { - FSTTargetMapping *targetMapping = targetChange.mapping; - if ([targetMapping isKindOfClass:[FSTResetMapping class]]) { - _syncedDocuments = ((FSTResetMapping *)targetMapping).documents; - } else if ([targetMapping isKindOfClass:[FSTUpdateMapping class]]) { - for (const DocumentKey &key : ((FSTUpdateMapping *)targetMapping).addedDocuments) { - _syncedDocuments = _syncedDocuments.insert(key); - } - for (const DocumentKey &key : ((FSTUpdateMapping *)targetMapping).removedDocuments) { - _syncedDocuments = _syncedDocuments.erase(key); - } + for (const DocumentKey &key : targetChange.addedDocuments) { + _syncedDocuments = _syncedDocuments.insert(key); } - - switch (targetChange.currentStatusUpdate) { - case FSTCurrentStatusUpdateMarkCurrent: - self.current = YES; - break; - case FSTCurrentStatusUpdateMarkNotCurrent: - self.current = NO; - break; - case FSTCurrentStatusUpdateNone: - break; + for (const DocumentKey &key : targetChange.modifiedDocuments) { + HARD_ASSERT(_syncedDocuments.find(key) != _syncedDocuments.end(), + "Modified document %s not found in view.", key.ToString()); } + for (const DocumentKey &key : targetChange.removedDocuments) { + _syncedDocuments = _syncedDocuments.erase(key); + } + + self.current = targetChange.current; } } diff --git a/Firestore/Source/Local/FSTLocalStore.mm b/Firestore/Source/Local/FSTLocalStore.mm index 7469c71..6aab78e 100644 --- a/Firestore/Source/Local/FSTLocalStore.mm +++ b/Firestore/Source/Local/FSTLocalStore.mm @@ -268,46 +268,32 @@ NS_ASSUME_NONNULL_BEGIN FSTListenSequenceNumber sequenceNumber = [self.listenSequence next]; id queryCache = self.queryCache; - [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( - NSNumber *targetIDNumber, FSTTargetChange *change, BOOL *stop) { - FSTTargetID targetID = targetIDNumber.intValue; + for (const auto &entry : remoteEvent.targetChanges) { + FSTTargetID targetID = entry.first; + FSTBoxedTargetID *boxedTargetID = @(targetID); + FSTTargetChange *change = entry.second; // Do not ref/unref unassigned targetIDs - it may lead to leaks. - FSTQueryData *queryData = self.targetIDs[targetIDNumber]; + FSTQueryData *queryData = self.targetIDs[boxedTargetID]; if (!queryData) { - return; + continue; } + [queryCache removeMatchingKeys:change.removedDocuments forTargetID:targetID]; + [queryCache addMatchingKeys:change.addedDocuments forTargetID:targetID]; + // Update the resume token if the change includes one. Don't clear any preexisting value. // Bump the sequence number as well, so that documents being removed now are ordered later // than documents that were previously removed from this target. NSData *resumeToken = change.resumeToken; if (resumeToken.length > 0) { - queryData = [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion + queryData = [queryData queryDataByReplacingSnapshotVersion:remoteEvent.snapshotVersion resumeToken:resumeToken sequenceNumber:sequenceNumber]; - self.targetIDs[targetIDNumber] = queryData; + self.targetIDs[boxedTargetID] = queryData; [self.queryCache updateQueryData:queryData]; } - - FSTTargetMapping *mapping = change.mapping; - if (mapping) { - // First make sure that all references are deleted. - if ([mapping isKindOfClass:[FSTResetMapping class]]) { - FSTResetMapping *reset = (FSTResetMapping *)mapping; - [queryCache removeMatchingKeysForTargetID:targetID]; - [queryCache addMatchingKeys:reset.documents forTargetID:targetID]; - - } else if ([mapping isKindOfClass:[FSTUpdateMapping class]]) { - FSTUpdateMapping *update = (FSTUpdateMapping *)mapping; - [queryCache removeMatchingKeys:update.removedDocuments forTargetID:targetID]; - [queryCache addMatchingKeys:update.addedDocuments forTargetID:targetID]; - - } else { - HARD_FAIL("Unknown mapping type: %s", mapping); - } - } - }]; + } // TODO(klimt): This could probably be an NSMutableDictionary. DocumentKeySet changedDocKeys; diff --git a/Firestore/Source/Remote/FSTRemoteEvent.h b/Firestore/Source/Remote/FSTRemoteEvent.h index c84e34d..9ea0f9c 100644 --- a/Firestore/Source/Remote/FSTRemoteEvent.h +++ b/Firestore/Source/Remote/FSTRemoteEvent.h @@ -17,6 +17,9 @@ #import #include +#include +#include +#include #import "Firestore/Source/Core/FSTTypes.h" #import "Firestore/Source/Model/FSTDocumentDictionary.h" @@ -30,128 +33,82 @@ @class FSTMaybeDocument; @class FSTWatchChange; @class FSTQueryData; +@class FSTDocumentWatchChange; +@class FSTWatchTargetChange; +@class FSTExistenceFilterWatchChange; NS_ASSUME_NONNULL_BEGIN -#pragma mark - FSTTargetMapping - -/** - * TargetMapping represents a change to the documents in a query from the server. This can either - * be an incremental Update or a full Reset. - * - *

This is an empty abstract class so that all the different kinds of changes can have a common - * base class. - */ -@interface FSTTargetMapping : NSObject - -/** - * Strips out mapping changes that aren't actually changes. That is, if the document already - * existed in the target, and is being added in the target, and this is not a reset, we can - * skip doing the work to associate the document with the target because it has already been done. - */ -- (void)filterUpdatesUsingExistingKeys: - (const firebase::firestore::model::DocumentKeySet &)existingKeys; - -@end - -#pragma mark - FSTResetMapping - -/** The new set of documents to replace the current documents for a target. */ -@interface FSTResetMapping : FSTTargetMapping - /** - * Creates a new mapping with the keys for the given documents added. This is intended primarily - * for testing. + * Interface implemented by RemoteStore to expose target metadata to the FSTWatchChangeAggregator. */ -+ (FSTResetMapping *)mappingWithDocuments:(NSArray *)documents; - -/** The new set of documents for the target. */ -- (const firebase::firestore::model::DocumentKeySet &)documents; -@end - -#pragma mark - FSTUpdateMapping +@protocol FSTTargetMetadataProvider /** - * A target should update its set of documents with the given added/removed set of documents. + * Returns the set of remote document keys for the given target ID as of the last raised snapshot. */ -@interface FSTUpdateMapping : FSTTargetMapping +- (firebase::firestore::model::DocumentKeySet)remoteKeysForTarget:(FSTBoxedTargetID *)targetID; /** - * Creates a new mapping with the keys for the given documents added. This is intended primarily - * for testing. + * Returns the FSTQueryData for an active target ID or 'null' if this query has become inactive */ -+ (FSTUpdateMapping *)mappingWithAddedDocuments:(NSArray *)added - removedDocuments:(NSArray *)removed; +- (nullable FSTQueryData *)queryDataForTarget:(FSTBoxedTargetID *)targetID; -- (firebase::firestore::model::DocumentKeySet)applyTo: - (const firebase::firestore::model::DocumentKeySet &)keys; - -/** The documents added to the target. */ -- (const firebase::firestore::model::DocumentKeySet &)addedDocuments; -/** The documents removed from the target. */ -- (const firebase::firestore::model::DocumentKeySet &)removedDocuments; @end #pragma mark - FSTTargetChange /** - * Represents an update to the current status of a target, either explicitly having no new state, or - * the new value to set. Note "current" has special meaning in the RPC protocol that implies that a - * target is both up-to-date and consistent with the rest of the watch stream. - */ -typedef NS_ENUM(NSUInteger, FSTCurrentStatusUpdate) { - /** The current status is not affected and should not be modified */ - FSTCurrentStatusUpdateNone, - /** The target must be marked as no longer "current" */ - FSTCurrentStatusUpdateMarkNotCurrent, - /** The target must be marked as "current" */ - FSTCurrentStatusUpdateMarkCurrent, -}; - -/** - * A part of an FSTRemoteEvent specifying set of changes to a specific target. These changes track - * what documents are currently included in the target as well as the current snapshot version and - * resume token but the actual changes *to* documents are not part of the FSTTargetChange since - * documents may be part of multiple targets. + * An FSTTargetChange specifies the set of changes for a specific target as part of an + * FSTRemoteEvent. These changes track which documents are added, modified or emoved, as well as the + * target's resume token and whether the target is marked CURRENT. + * + * The actual changes *to* documents are not part of the FSTTargetChange since documents may be part + * of multiple targets. */ @interface FSTTargetChange : NSObject /** * Creates a new target change with the given SnapshotVersion. */ -- (instancetype)initWithSnapshotVersion: - (firebase::firestore::model::SnapshotVersion)snapshotVersion; +- (instancetype)initWithResumeToken:(NSData *)resumeToken + current:(BOOL)current + addedDocuments:(firebase::firestore::model::DocumentKeySet)addedDocuments + modifiedDocuments:(firebase::firestore::model::DocumentKeySet)modifiedDocuments + removedDocuments:(firebase::firestore::model::DocumentKeySet)removedDocuments + NS_DESIGNATED_INITIALIZER; + +- (instancetype)init NS_UNAVAILABLE; /** - * Creates a new target change with the given documents. Instances of FSTDocument are considered - * added. Instance of FSTDeletedDocument are considered removed. This is intended primarily for - * testing. + * An opaque, server-assigned token that allows watching a query to be resumed after + * disconnecting without retransmitting all the data that matches the query. The resume token + * essentially identifies a point in time from which the server should resume sending results. */ -+ (instancetype)changeWithDocuments:(NSArray *)docs - currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate; +@property(nonatomic, strong, readonly) NSData *resumeToken; /** - * The snapshot version representing the last state at which this target received a consistent - * snapshot from the backend. + * The "current" (synced) status of this target. Note that "current" has special meaning in the RPC + * protocol that implies that a target is both up-to-date and consistent with the rest of the watch + * stream. */ -- (const firebase::firestore::model::SnapshotVersion &)snapshotVersion; +@property(nonatomic, assign, readonly) BOOL current; /** - * The new "current" (synced) status of this target. Set to CurrentStatusUpdateNone if the status - * should not be updated. Note "current" has special meaning for in the RPC protocol that implies - * that a target is both up-to-date and consistent with the rest of the watch stream. + * The set of documents that were newly assigned to this target as part of this remote event. */ -@property(nonatomic, assign, readonly) FSTCurrentStatusUpdate currentStatusUpdate; +- (const firebase::firestore::model::DocumentKeySet &)addedDocuments; -/** A set of changes to documents in this target. */ -@property(nonatomic, strong, readonly) FSTTargetMapping *mapping; +/** + * The set of documents that were already assigned to this target but received an update during this + * remote event. + */ +- (const firebase::firestore::model::DocumentKeySet &)modifiedDocuments; /** - * An opaque, server-assigned token that allows watching a query to be resumed after disconnecting - * without retransmitting all the data that matches the query. The resume token essentially - * identifies a point in time from which the server should resume sending results. + * The set of documents that were removed from this target as part of this remote event. */ -@property(nonatomic, strong, readonly) NSData *resumeToken; +- (const firebase::firestore::model::DocumentKeySet &)removedDocuments; @end @@ -165,34 +122,38 @@ typedef NS_ENUM(NSUInteger, FSTCurrentStatusUpdate) { - (instancetype) initWithSnapshotVersion:(firebase::firestore::model::SnapshotVersion)snapshotVersion - targetChanges:(NSMutableDictionary *)targetChanges + targetChanges:(std::unordered_map)targetChanges + targetMismatches:(std::unordered_set)targetMismatches documentUpdates: - (std::map)documentUpdates + (std::unordered_map)documentUpdates limboDocuments:(firebase::firestore::model::DocumentKeySet)limboDocuments; /** The snapshot version this event brings us up to. */ - (const firebase::firestore::model::SnapshotVersion &)snapshotVersion; -/** A map from target to changes to the target. See TargetChange. */ -@property(nonatomic, strong, readonly) - NSDictionary *targetChanges; - /** - * A set of which documents have changed or been deleted, along with the doc's new values - * (if not deleted). + * A set of which document updates are due only to limbo resolution targets. */ -- (const std::map &)documentUpdates; - - (const firebase::firestore::model::DocumentKeySet &)limboDocumentChanges; -/** Adds a document update to this remote event */ -- (void)addDocumentUpdate:(FSTMaybeDocument *)document; +/** A map from target to changes to the target. See TargetChange. */ +- (const std::unordered_map &)targetChanges; -/** Handles an existence filter mismatch */ -- (void)handleExistenceFilterMismatchForTargetID:(FSTBoxedTargetID *)targetID; +/** + * A set of targets that is known to be inconsistent. Listens for these targets should be + * re-established without resume tokens. + */ +- (const std::unordered_set &)targetMismatches; -- (void)synthesizeDeleteForLimboTargetChange:(FSTTargetChange *)targetChange - key:(const firebase::firestore::model::DocumentKey &)key; +/** + * A set of which documents have changed or been deleted, along with the doc's new values (if not + * deleted). + */ +- (const std::unordered_map &)documentUpdates; @end @@ -204,33 +165,35 @@ initWithSnapshotVersion:(firebase::firestore::model::SnapshotVersion)snapshotVer */ @interface FSTWatchChangeAggregator : NSObject -- (instancetype) -initWithSnapshotVersion:(firebase::firestore::model::SnapshotVersion)snapshotVersion - listenTargets:(NSDictionary *)listenTargets - pendingTargetResponses:(NSDictionary *)pendingTargetResponses +- (instancetype)initWithTargetMetadataProvider:(id)targetMetadataProvider NS_DESIGNATED_INITIALIZER; - (instancetype)init NS_UNAVAILABLE; -/** The number of pending responses that are being waited on from watch */ -@property(nonatomic, strong, readonly) - NSMutableDictionary *pendingTargetResponses; +/** Processes and adds the FSTDocumentWatchChange to the current set of changes. */ +- (void)handleDocumentChange:(FSTDocumentWatchChange *)documentChange; -/** Aggregates a watch change into the current state */ -- (void)addWatchChange:(FSTWatchChange *)watchChange; +/** Processes and adds the WatchTargetChange to the current set of changes. */ +- (void)handleTargetChange:(FSTWatchTargetChange *)targetChange; -/** Aggregates all provided watch changes to the current state in order */ -- (void)addWatchChanges:(NSArray *)watchChanges; +/** + * Handles existence filters and synthesizes deletes for filter mismatches. Targets that are + * invalidated by filter mismatches are added to `targetMismatches`. + */ +- (void)handleExistenceFilter:(FSTExistenceFilterWatchChange *)existenceFilter; + +/** + * Increment the number of acks needed from watch before we can consider the server to be 'in-sync' + * with the client's active targets. + */ +- (void)recordTargetRequest:(FSTBoxedTargetID *)targetID; /** * Converts the current state into a remote event with the snapshot version taken from the * initializer. */ -- (FSTRemoteEvent *)remoteEvent; - -/** The existence filters - if any - for the given target IDs. */ -@property(nonatomic, strong, readonly) - NSDictionary *existenceFilters; +- (FSTRemoteEvent *)remoteEventAtSnapshotVersion: + (const firebase::firestore::model::SnapshotVersion &)snapshotVersion; @end diff --git a/Firestore/Source/Remote/FSTRemoteEvent.mm b/Firestore/Source/Remote/FSTRemoteEvent.mm index 67af650..5e00310 100644 --- a/Firestore/Source/Remote/FSTRemoteEvent.mm +++ b/Firestore/Source/Remote/FSTRemoteEvent.mm @@ -17,10 +17,16 @@ #import "Firestore/Source/Remote/FSTRemoteEvent.h" #include +#include +#include #include +#import "Firestore/Source/Core/FSTQuery.h" +#import "Firestore/Source/Core/FSTViewSnapshot.h" #import "Firestore/Source/Local/FSTQueryData.h" #import "Firestore/Source/Model/FSTDocument.h" +#import "Firestore/Source/Remote/FSTExistenceFilter.h" +#import "Firestore/Source/Remote/FSTRemoteStore.h" #import "Firestore/Source/Remote/FSTWatchChange.h" #import "Firestore/Source/Util/FSTClasses.h" @@ -30,619 +36,578 @@ #include "Firestore/core/src/firebase/firestore/util/log.h" using firebase::firestore::model::DocumentKey; +using firebase::firestore::model::DocumentKeyHash; +using firebase::firestore::model::DocumentKeySet; using firebase::firestore::model::SnapshotVersion; using firebase::firestore::util::Hash; -using firebase::firestore::model::DocumentKeySet; NS_ASSUME_NONNULL_BEGIN -#pragma mark - FSTTargetMapping - -@interface FSTTargetMapping () - -/** Private mutator method to add a document key to the mapping */ -- (void)addDocumentKey:(const DocumentKey &)documentKey; - -/** Private mutator method to remove a document key from the mapping */ -- (void)removeDocumentKey:(const DocumentKey &)documentKey; - -@end - -@implementation FSTTargetMapping - -- (void)addDocumentKey:(const DocumentKey &)documentKey { - @throw FSTAbstractMethodException(); // NOLINT -} +#pragma mark - FSTTargetChange -- (void)removeDocumentKey:(const DocumentKey &)documentKey { - @throw FSTAbstractMethodException(); // NOLINT +@implementation FSTTargetChange { + DocumentKeySet _addedDocuments; + DocumentKeySet _modifiedDocuments; + DocumentKeySet _removedDocuments; } -- (void)filterUpdatesUsingExistingKeys:(const DocumentKeySet &)existingKeys { - @throw FSTAbstractMethodException(); // NOLINT +- (instancetype)initWithResumeToken:(NSData *)resumeToken + current:(BOOL)current + addedDocuments:(DocumentKeySet)addedDocuments + modifiedDocuments:(DocumentKeySet)modifiedDocuments + removedDocuments:(DocumentKeySet)removedDocuments { + if (self = [super init]) { + _resumeToken = [resumeToken copy]; + _current = current; + _addedDocuments = std::move(addedDocuments); + _modifiedDocuments = std::move(modifiedDocuments); + _removedDocuments = std::move(removedDocuments); + } + return self; } -@end - -#pragma mark - FSTResetMapping - -@implementation FSTResetMapping { - DocumentKeySet _documents; +- (const DocumentKeySet &)addedDocuments { + return _addedDocuments; } -+ (instancetype)mappingWithDocuments:(NSArray *)documents { - DocumentKeySet keys; - for (FSTDocument *doc in documents) { - keys = keys.insert(doc.key); - } - return [[FSTResetMapping alloc] initWithDocuments:std::move(keys)]; +- (const DocumentKeySet &)modifiedDocuments { + return _modifiedDocuments; } -- (instancetype)initWithDocuments:(DocumentKeySet)documents { - self = [super init]; - if (self) { - _documents = std::move(documents); - } - return self; -} - -- (const DocumentKeySet &)documents { - return _documents; +- (const DocumentKeySet &)removedDocuments { + return _removedDocuments; } - (BOOL)isEqual:(id)other { if (other == self) { return YES; } - if (![other isMemberOfClass:[FSTResetMapping class]]) { + if (![other isMemberOfClass:[FSTTargetChange class]]) { return NO; } - FSTResetMapping *otherMapping = (FSTResetMapping *)other; - return _documents == otherMapping.documents; + return [self current] == [other current] && + [[self resumeToken] isEqualToData:[other resumeToken]] && + [self addedDocuments] == [other addedDocuments] && + [self modifiedDocuments] == [other modifiedDocuments] && + [self removedDocuments] == [other removedDocuments]; } -- (NSUInteger)hash { - return Hash(_documents); -} +@end -- (void)addDocumentKey:(const DocumentKey &)documentKey { - _documents = _documents.insert(documentKey); -} +#pragma mark - FSTTargetState -- (void)removeDocumentKey:(const DocumentKey &)documentKey { - _documents = _documents.erase(documentKey); -} +/** Tracks the internal state of a Watch target. */ +@interface FSTTargetState : NSObject -- (void)filterUpdatesUsingExistingKeys:(const DocumentKeySet &)existingKeys { - // No-op. Resets are not filtered. -} +/** + * Whether this target has been marked 'current'. + * + * 'Current' has special meaning in the RPC protocol: It implies that the Watch backend has sent us + * all changes up to the point at which the target was added and that the target is consistent with + * the rest of the watch stream. + */ +@property(nonatomic) BOOL current; -@end +/** The last resume token sent to us for this target. */ +@property(nonatomic, readonly, strong) NSData *resumeToken; -#pragma mark - FSTUpdateMapping +/** Whether we have modified any state that should trigger a snapshot. */ +@property(nonatomic, readonly) BOOL hasPendingChanges; -@implementation FSTUpdateMapping { - DocumentKeySet _addedDocuments; - DocumentKeySet _removedDocuments; -} +/** Whether this target has pending target adds or target removes. */ +- (BOOL)isPending; -+ (FSTUpdateMapping *)mappingWithAddedDocuments:(NSArray *)added - removedDocuments:(NSArray *)removed { - DocumentKeySet addedDocuments; - DocumentKeySet removedDocuments; - for (FSTDocument *doc in added) { - addedDocuments = addedDocuments.insert(doc.key); - } - for (FSTDocument *doc in removed) { - removedDocuments = removedDocuments.insert(doc.key); - } - return [[FSTUpdateMapping alloc] initWithAddedDocuments:std::move(addedDocuments) - removedDocuments:std::move(removedDocuments)]; -} +/** + * Applies the resume token to the TargetChange, but only when it has a new value. Empty + * resumeTokens are discarded. + */ +- (void)updateResumeToken:(NSData *)resumeToken; -- (instancetype)initWithAddedDocuments:(DocumentKeySet)addedDocuments - removedDocuments:(DocumentKeySet)removedDocuments { - self = [super init]; - if (self) { - _addedDocuments = std::move(addedDocuments); - _removedDocuments = std::move(removedDocuments); - } - return self; -} +/** Resets the document changes and sets `hasPendingChanges` to false. */ +- (void)clearPendingChanges; +/** + * Creates a target change from the current set of changes. + * + * To reset the document changes after raising this snapshot, call `clearPendingChanges()`. + */ +- (FSTTargetChange *)toTargetChange; -- (const DocumentKeySet &)addedDocuments { - return _addedDocuments; -} +- (void)recordTargetRequest; +- (void)recordTargetResponse; +- (void)markCurrent; +- (void)addDocumentChangeWithType:(FSTDocumentViewChangeType)type + forKey:(const DocumentKey &)documentKey; +- (void)removeDocumentChangeForKey:(const DocumentKey &)documentKey; -- (const DocumentKeySet &)removedDocuments { - return _removedDocuments; -} +@end -- (BOOL)isEqual:(id)other { - if (other == self) { - return YES; - } - if (![other isMemberOfClass:[FSTUpdateMapping class]]) { - return NO; - } +@implementation FSTTargetState { + /** + * The number of outstanding responses (adds or removes) that we are waiting on. We only consider + * targets active that have no outstanding responses. + */ + int _outstandingResponses; - FSTUpdateMapping *otherMapping = (FSTUpdateMapping *)other; - return _addedDocuments == otherMapping.addedDocuments && - _removedDocuments == otherMapping.removedDocuments; + /** + * Keeps track of the document changes since the last raised snapshot. + * + * These changes are continuously updated as we receive document updates and always reflect the + * current set of changes against the last issued snapshot. + */ + std::unordered_map _documentChanges; } -- (NSUInteger)hash { - return Hash(_addedDocuments, _removedDocuments); -} +- (instancetype)init { + if (self = [super init]) { + _resumeToken = [NSData data]; + _outstandingResponses = 0; -- (DocumentKeySet)applyTo:(const DocumentKeySet &)keys { - DocumentKeySet result = keys; - for (const DocumentKey &key : _addedDocuments) { - result = result.insert(key); + // We initialize to 'true' so that newly-added targets are included in the next RemoteEvent. + _hasPendingChanges = YES; } - for (const DocumentKey &key : _removedDocuments) { - result = result.erase(key); - } - return result; -} - -- (void)addDocumentKey:(const DocumentKey &)documentKey { - _addedDocuments = _addedDocuments.insert(documentKey); - _removedDocuments = _removedDocuments.erase(documentKey); + return self; } -- (void)removeDocumentKey:(const DocumentKey &)documentKey { - _addedDocuments = _addedDocuments.erase(documentKey); - _removedDocuments = _removedDocuments.insert(documentKey); +- (BOOL)isPending { + return _outstandingResponses != 0; } -- (void)filterUpdatesUsingExistingKeys:(const DocumentKeySet &)existingKeys { - DocumentKeySet result = _addedDocuments; - for (const DocumentKey &key : _addedDocuments) { - if (existingKeys.contains(key)) { - result = result.erase(key); - } +- (void)updateResumeToken:(NSData *)resumeToken { + if (resumeToken.length > 0) { + _hasPendingChanges = YES; + _resumeToken = [resumeToken copy]; } - _addedDocuments = result; } -@end - -#pragma mark - FSTTargetChange +- (void)clearPendingChanges { + _hasPendingChanges = NO; + _documentChanges.clear(); +} -@interface FSTTargetChange () -@property(nonatomic, assign) FSTCurrentStatusUpdate currentStatusUpdate; -@property(nonatomic, strong, nullable) FSTTargetMapping *mapping; -@property(nonatomic, strong) NSData *resumeToken; -@end +- (void)recordTargetRequest { + _outstandingResponses += 1; +} -@implementation FSTTargetChange { - SnapshotVersion _snapshotVersion; +- (void)recordTargetResponse { + _outstandingResponses -= 1; } -- (instancetype)init { - if (self = [super init]) { - _currentStatusUpdate = FSTCurrentStatusUpdateNone; - _resumeToken = [NSData data]; - } - return self; +- (void)markCurrent { + _hasPendingChanges = YES; + _current = true; } -- (instancetype)initWithSnapshotVersion:(SnapshotVersion)snapshotVersion { - if (self = [self init]) { - _snapshotVersion = std::move(snapshotVersion); - } - return self; +- (void)addDocumentChangeWithType:(FSTDocumentViewChangeType)type + forKey:(const DocumentKey &)documentKey { + _hasPendingChanges = YES; + _documentChanges[documentKey] = type; } -- (const SnapshotVersion &)snapshotVersion { - return _snapshotVersion; +- (void)removeDocumentChangeForKey:(const DocumentKey &)documentKey { + _hasPendingChanges = YES; + _documentChanges.erase(documentKey); } -+ (instancetype)changeWithDocuments:(NSArray *)docs - currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate { +- (FSTTargetChange *)toTargetChange { DocumentKeySet addedDocuments; + DocumentKeySet modifiedDocuments; DocumentKeySet removedDocuments; - for (FSTMaybeDocument *doc in docs) { - if ([doc isKindOfClass:[FSTDeletedDocument class]]) { - removedDocuments = removedDocuments.insert(doc.key); - } else { - addedDocuments = addedDocuments.insert(doc.key); + + for (const auto &entry : _documentChanges) { + switch (entry.second) { + case FSTDocumentViewChangeTypeAdded: + addedDocuments = addedDocuments.insert(entry.first); + break; + case FSTDocumentViewChangeTypeModified: + modifiedDocuments = modifiedDocuments.insert(entry.first); + break; + case FSTDocumentViewChangeTypeRemoved: + removedDocuments = removedDocuments.insert(entry.first); + break; + default: + HARD_FAIL("Encountered invalid change type: %s", entry.second); } } - FSTUpdateMapping *mapping = - [[FSTUpdateMapping alloc] initWithAddedDocuments:std::move(addedDocuments) - removedDocuments:std::move(removedDocuments)]; - - FSTTargetChange *change = [[FSTTargetChange alloc] init]; - change.mapping = mapping; - change.currentStatusUpdate = currentStatusUpdate; - return change; -} - -+ (instancetype)changeWithMapping:(FSTTargetMapping *)mapping - snapshotVersion:(SnapshotVersion)snapshotVersion - currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate { - FSTTargetChange *change = [[FSTTargetChange alloc] init]; - change.mapping = mapping; - change->_snapshotVersion = std::move(snapshotVersion); - change.currentStatusUpdate = currentStatusUpdate; - return change; -} - -- (FSTTargetMapping *)mapping { - if (!_mapping) { - // Create an FSTUpdateMapping by default, since resets are always explicit - _mapping = [[FSTUpdateMapping alloc] init]; - } - return _mapping; -} -/** - * Sets the resume token but only when it has a new value. Empty resumeTokens are - * discarded. - */ -- (void)setResumeToken:(NSData *)resumeToken { - if (resumeToken.length > 0) { - _resumeToken = resumeToken; - } + return [[FSTTargetChange alloc] initWithResumeToken:_resumeToken + current:_current + addedDocuments:std::move(addedDocuments) + modifiedDocuments:std::move(modifiedDocuments) + removedDocuments:std::move(removedDocuments)]; } - @end #pragma mark - FSTRemoteEvent @implementation FSTRemoteEvent { SnapshotVersion _snapshotVersion; - NSMutableDictionary *_targetChanges; - std::map _documentUpdates; + std::unordered_map _targetChanges; + std::unordered_set _targetMismatches; + std::unordered_map _documentUpdates; DocumentKeySet _limboDocumentChanges; } -- (instancetype)initWithSnapshotVersion:(SnapshotVersion)snapshotVersion - targetChanges: - (NSMutableDictionary *)targetChanges - documentUpdates:(std::map)documentUpdates - limboDocuments:(DocumentKeySet)limboDocuments { +- (instancetype) +initWithSnapshotVersion:(SnapshotVersion)snapshotVersion + targetChanges:(std::unordered_map)targetChanges + targetMismatches:(std::unordered_set)targetMismatches + documentUpdates: + (std::unordered_map)documentUpdates + limboDocuments:(DocumentKeySet)limboDocuments { self = [super init]; if (self) { _snapshotVersion = std::move(snapshotVersion); - _targetChanges = targetChanges; + _targetChanges = std::move(targetChanges); + _targetMismatches = std::move(targetMismatches); _documentUpdates = std::move(documentUpdates); _limboDocumentChanges = std::move(limboDocuments); } return self; } -- (NSDictionary *)targetChanges { - return _targetChanges; +- (const SnapshotVersion &)snapshotVersion { + return _snapshotVersion; } - (const DocumentKeySet &)limboDocumentChanges { return _limboDocumentChanges; } -- (const std::map &)documentUpdates { - return _documentUpdates; -} - -- (const SnapshotVersion &)snapshotVersion { - return _snapshotVersion; -} - -- (void)synthesizeDeleteForLimboTargetChange:(FSTTargetChange *)targetChange - key:(const DocumentKey &)key { - if (targetChange.currentStatusUpdate == FSTCurrentStatusUpdateMarkCurrent && - _documentUpdates.find(key) == _documentUpdates.end()) { - // When listening to a query the server responds with a snapshot containing documents - // matching the query and a current marker telling us we're now in sync. It's possible for - // these to arrive as separate remote events or as a single remote event. For a document - // query, there will be no documents sent in the response if the document doesn't exist. - // - // If the snapshot arrives separately from the current marker, we handle it normally and - // updateTrackedLimboDocumentsWithChanges:targetID: will resolve the limbo status of the - // document, removing it from limboDocumentRefs. This works because clients only initiate - // limbo resolution when a target is current and because all current targets are always at a - // consistent snapshot. - // - // However, if the document doesn't exist and the current marker arrives, the document is - // not present in the snapshot and our normal view handling would consider the document to - // remain in limbo indefinitely because there are no updates to the document. To avoid this, - // we specially handle this case here: synthesizing a delete. - // - // TODO(dimond): Ideally we would have an explicit lookup query instead resulting in an - // explicit delete message and we could remove this special logic. - _documentUpdates[key] = [FSTDeletedDocument documentWithKey:key version:_snapshotVersion]; - _limboDocumentChanges = _limboDocumentChanges.insert(key); - } +- (const std::unordered_map &)targetChanges { + return _targetChanges; } -/** Adds a document update to this remote event */ -- (void)addDocumentUpdate:(FSTMaybeDocument *)document { - _documentUpdates[document.key] = document; +- (const std::unordered_map &)documentUpdates { + return _documentUpdates; } -/** Handles an existence filter mismatch */ -- (void)handleExistenceFilterMismatchForTargetID:(FSTBoxedTargetID *)targetID { - // An existence filter mismatch will reset the query and we need to reset the mapping to contain - // no documents and an empty resume token. - // - // Note: - // * The reset mapping is empty, specifically forcing the consumer of the change to - // forget all keys for this targetID; - // * The resume snapshot for this target must be reset - // * The target must be unacked because unwatching and rewatching introduces a race for - // changes. - // - // TODO(dimond): keep track of reset targets not to raise. - FSTTargetChange *targetChange = - [FSTTargetChange changeWithMapping:[[FSTResetMapping alloc] init] - snapshotVersion:SnapshotVersion::None() - currentStatusUpdate:FSTCurrentStatusUpdateMarkNotCurrent]; - _targetChanges[targetID] = targetChange; +- (const std::unordered_set &)targetMismatches { + return _targetMismatches; } @end #pragma mark - FSTWatchChangeAggregator -@interface FSTWatchChangeAggregator () - -/** Keeps track of the current target mappings */ -@property(nonatomic, strong, readonly) - NSMutableDictionary *targetChanges; - -/** The set of open listens on the client */ -@property(nonatomic, strong, readonly) - NSDictionary *listenTargets; +@implementation FSTWatchChangeAggregator { + /** The internal state of all tracked targets. */ + std::unordered_map _targetStates; -/** Whether this aggregator was frozen and can no longer be modified */ -@property(nonatomic, assign) BOOL frozen; + /** Keeps track of document to update */ + std::unordered_map _pendingDocumentUpdates; -@end + /** A mapping of document keys to their set of target IDs. */ + std::unordered_map, DocumentKeyHash> + _pendingDocumentTargetMappings; -@implementation FSTWatchChangeAggregator { - NSMutableDictionary *_existenceFilters; - /** Keeps track of document to update */ - std::map _documentUpdates; + /** + * A list of targets with existence filter mismatches. These targets are known to be inconsistent + * and their listens needs to be re-established by RemoteStore. + */ + std::unordered_set _pendingTargetResets; - DocumentKeySet _limboDocuments; - /** The snapshot version for every target change this creates. */ - SnapshotVersion _snapshotVersion; + id _targetMetadataProvider; } -- (instancetype) -initWithSnapshotVersion:(SnapshotVersion)snapshotVersion - listenTargets:(NSDictionary *)listenTargets - pendingTargetResponses:(NSDictionary *)pendingTargetResponses { +- (instancetype)initWithTargetMetadataProvider: + (id)targetMetadataProvider { self = [super init]; if (self) { - _snapshotVersion = std::move(snapshotVersion); - - _frozen = NO; - _targetChanges = [NSMutableDictionary dictionary]; - _listenTargets = listenTargets; - _pendingTargetResponses = [NSMutableDictionary dictionaryWithDictionary:pendingTargetResponses]; - _limboDocuments = DocumentKeySet{}; - _existenceFilters = [NSMutableDictionary dictionary]; + _targetMetadataProvider = targetMetadataProvider; } return self; } -- (NSDictionary *)existenceFilters { - return _existenceFilters; -} - -- (FSTTargetChange *)targetChangeForTargetID:(FSTBoxedTargetID *)targetID { - FSTTargetChange *change = self.targetChanges[targetID]; - if (!change) { - change = [[FSTTargetChange alloc] initWithSnapshotVersion:_snapshotVersion]; - self.targetChanges[targetID] = change; - } - return change; -} - -- (void)addWatchChanges:(NSArray *)watchChanges { - HARD_ASSERT(!self.frozen, "Trying to modify frozen FSTWatchChangeAggregator"); - for (FSTWatchChange *watchChange in watchChanges) { - [self addWatchChange:watchChange]; - } -} - -- (void)addWatchChange:(FSTWatchChange *)watchChange { - HARD_ASSERT(!self.frozen, "Trying to modify frozen FSTWatchChangeAggregator"); - if ([watchChange isKindOfClass:[FSTDocumentWatchChange class]]) { - [self addDocumentChange:(FSTDocumentWatchChange *)watchChange]; - } else if ([watchChange isKindOfClass:[FSTWatchTargetChange class]]) { - [self addTargetChange:(FSTWatchTargetChange *)watchChange]; - } else if ([watchChange isKindOfClass:[FSTExistenceFilterWatchChange class]]) { - [self addExistenceFilterChange:(FSTExistenceFilterWatchChange *)watchChange]; - } else { - HARD_FAIL("Unknown watch change: %s", watchChange); - } -} - -/** - * Updates limbo document tracking for a given target-document mapping change. If the target is a - * limbo target, and the change for the document has only seen limbo targets so far, and we are not - * already tracking a change for this document, then consider this document a limbo document update. - * Otherwise, ensure that we don't consider this document a limbo document. Returns true if the - * change still has only seen limbo resolution changes. - */ -- (BOOL)updateLimboDocumentsForKey:(const DocumentKey &)documentKey - queryData:(FSTQueryData *)queryData - isOnlyLimbo:(BOOL)isOnlyLimbo { - if (!isOnlyLimbo) { - // It wasn't a limbo doc before, so it definitely isn't now. - return NO; - } - if (_documentUpdates.find(documentKey) == _documentUpdates.end()) { - // We haven't seen a document update for this key yet. - if (queryData.purpose == FSTQueryPurposeLimboResolution) { - // We haven't seen this document before, and this target is a limbo target. - _limboDocuments = _limboDocuments.insert(documentKey); - return YES; - } else { - // We haven't seen the document before, but this is a non-limbo target. - // Since we haven't seen it, we know it's not in our set of limbo docs. Return NO to ensure - // that this key is marked as non-limbo. - return NO; - } - } else if (queryData.purpose == FSTQueryPurposeLimboResolution) { - // We have only seen limbo targets so far for this document, and this is another limbo target. - return YES; - } else { - // We haven't marked this as non-limbo yet, but this target is not a limbo target. - // Mark the key as non-limbo and make sure it isn't in our set. - _limboDocuments = _limboDocuments.erase(documentKey); - return NO; - } -} - -- (void)addDocumentChange:(FSTDocumentWatchChange *)docChange { - BOOL relevant = NO; - BOOL isOnlyLimbo = YES; - - for (FSTBoxedTargetID *targetID in docChange.updatedTargetIDs) { - FSTQueryData *queryData = [self queryDataForActiveTarget:targetID]; - if (queryData) { - FSTTargetChange *change = [self targetChangeForTargetID:targetID]; - isOnlyLimbo = [self updateLimboDocumentsForKey:docChange.documentKey - queryData:queryData - isOnlyLimbo:isOnlyLimbo]; - [change.mapping addDocumentKey:docChange.documentKey]; - relevant = YES; - } - } - - for (FSTBoxedTargetID *targetID in docChange.removedTargetIDs) { - FSTQueryData *queryData = [self queryDataForActiveTarget:targetID]; - if (queryData) { - FSTTargetChange *change = [self targetChangeForTargetID:targetID]; - isOnlyLimbo = [self updateLimboDocumentsForKey:docChange.documentKey - queryData:queryData - isOnlyLimbo:isOnlyLimbo]; - [change.mapping removeDocumentKey:docChange.documentKey]; - relevant = YES; +- (void)handleDocumentChange:(FSTDocumentWatchChange *)documentChange { + for (FSTBoxedTargetID *targetID in documentChange.updatedTargetIDs) { + if ([documentChange.document isKindOfClass:[FSTDocument class]]) { + [self addDocument:documentChange.document toTarget:targetID.intValue]; + } else if ([documentChange.document isKindOfClass:[FSTDeletedDocument class]]) { + [self removeDocument:documentChange.document + withKey:documentChange.documentKey + fromTarget:targetID.intValue]; } } - // Only update the document if there is a new document to replace, this might be just a target - // update instead. - if (docChange.document && relevant) { - _documentUpdates[docChange.documentKey] = docChange.document; + for (FSTBoxedTargetID *targetID in documentChange.removedTargetIDs) { + [self removeDocument:documentChange.document + withKey:documentChange.documentKey + fromTarget:targetID.intValue]; } } -- (void)addTargetChange:(FSTWatchTargetChange *)targetChange { - for (FSTBoxedTargetID *targetID in targetChange.targetIDs) { - FSTTargetChange *change = [self targetChangeForTargetID:targetID]; +- (void)handleTargetChange:(FSTWatchTargetChange *)targetChange { + for (FSTBoxedTargetID *boxedTargetID in targetChange.targetIDs) { + int targetID = boxedTargetID.intValue; + FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID]; switch (targetChange.state) { case FSTWatchTargetChangeStateNoChange: if ([self isActiveTarget:targetID]) { - // Creating the change above satisfies the semantics of no-change. - change.resumeToken = targetChange.resumeToken; + [targetState updateResumeToken:targetChange.resumeToken]; } break; case FSTWatchTargetChangeStateAdded: - [self recordResponseForTargetID:targetID]; - if (!self.pendingTargetResponses[targetID]) { - // We have a freshly added target, so we need to reset any state that we had previously - // This can happen e.g. when remove and add back a target for existence filter - // mismatches. - change.mapping = nil; - change.currentStatusUpdate = FSTCurrentStatusUpdateNone; - [_existenceFilters removeObjectForKey:targetID]; + // We need to decrement the number of pending acks needed from watch for this targetId. + [targetState recordTargetResponse]; + if (!targetState.isPending) { + // We have a freshly added target, so we need to reset any state that we had previously. + // This can happen e.g. when remove and add back a target for existence filter mismatches. + [targetState clearPendingChanges]; } - change.resumeToken = targetChange.resumeToken; + [targetState updateResumeToken:targetChange.resumeToken]; break; case FSTWatchTargetChangeStateRemoved: // We need to keep track of removed targets to we can post-filter and remove any target // changes. - [self recordResponseForTargetID:targetID]; - HARD_ASSERT(!targetChange.cause, "WatchChangeAggregator does not handle errored targets."); + [targetState recordTargetResponse]; + if (!targetState.isPending) { + [self removeTarget:targetID]; + } + HARD_ASSERT(!targetChange.cause, "WatchChangeAggregator does not handle errored targets"); break; case FSTWatchTargetChangeStateCurrent: if ([self isActiveTarget:targetID]) { - change.currentStatusUpdate = FSTCurrentStatusUpdateMarkCurrent; - change.resumeToken = targetChange.resumeToken; + [targetState markCurrent]; + [targetState updateResumeToken:targetChange.resumeToken]; } break; case FSTWatchTargetChangeStateReset: if ([self isActiveTarget:targetID]) { - // Overwrite any existing target mapping with a reset mapping. Every subsequent update - // will modify the reset mapping, not an update mapping. - change.mapping = [[FSTResetMapping alloc] init]; - change.resumeToken = targetChange.resumeToken; + // Reset the target and synthesizes removes for all existing documents. The backend will + // re-add any documents that still match the target before it sends the next global + // snapshot. + [self resetTarget:targetID]; + [targetState updateResumeToken:targetChange.resumeToken]; } break; default: - LOG_WARN("Unknown target watch change type: %s", targetChange.state); + HARD_FAIL("Unknown target watch change state: %s", targetChange.state); } } } +- (void)removeTarget:(FSTTargetID)targetID { + _targetStates.erase(targetID); +} + +- (void)handleExistenceFilter:(FSTExistenceFilterWatchChange *)existenceFilter { + FSTTargetID targetID = existenceFilter.targetID; + int expectedCount = existenceFilter.filter.count; + + FSTQueryData *queryData = [self queryDataForActiveTarget:targetID]; + if (queryData) { + FSTQuery *query = queryData.query; + if ([query isDocumentQuery]) { + if (expectedCount == 0) { + // The existence filter told us the document does not exist. We deduce that this document + // does not exist and apply a deleted document to our updates. Without applying this deleted + // document there might be another query that will raise this document as part of a snapshot + // until it is resolved, essentially exposing inconsistency between queries. + FSTDocumentKey *key = [FSTDocumentKey keyWithPath:query.path]; + [self + removeDocument:[FSTDeletedDocument documentWithKey:key version:SnapshotVersion::None()] + withKey:key + fromTarget:targetID]; + } else { + HARD_ASSERT(expectedCount == 1, "Single document existence filter with count: %s", + expectedCount); + } + } else { + int currentSize = [self currentDocumentCountForTarget:targetID]; + if (currentSize != expectedCount) { + // Existence filter mismatch: We reset the mapping and raise a new snapshot with + // `isFromCache:true`. + [self resetTarget:targetID]; + _pendingTargetResets.insert(targetID); + } + } + } +} + +- (int)currentDocumentCountForTarget:(FSTTargetID)targetID { + FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID]; + FSTTargetChange *targetChange = [targetState toTargetChange]; + return ([_targetMetadataProvider remoteKeysForTarget:@(targetID)].size() + + targetChange.addedDocuments.size() - targetChange.removedDocuments.size()); +} + /** - * Records that we got a watch target add/remove by decrementing the number of pending target - * responses that we have. + * Resets the state of a Watch target to its initial state (e.g. sets 'current' to false, clears the + * resume token and removes its target mapping from all documents). */ -- (void)recordResponseForTargetID:(FSTBoxedTargetID *)targetID { - NSNumber *count = self.pendingTargetResponses[targetID]; - int newCount = count ? [count intValue] - 1 : -1; - if (newCount == 0) { - [self.pendingTargetResponses removeObjectForKey:targetID]; +- (void)resetTarget:(FSTTargetID)targetID { + auto currentTargetState = _targetStates.find(targetID); + HARD_ASSERT(currentTargetState != _targetStates.end() && !(currentTargetState->second.isPending), + "Should only reset active targets"); + + _targetStates[targetID] = [FSTTargetState new]; + + // Trigger removal for any documents currently mapped to this target. These removals will be part + // of the initial snapshot if Watch does not resend these documents. + DocumentKeySet existingKeys = [_targetMetadataProvider remoteKeysForTarget:@(targetID)]; + + for (FSTDocumentKey *key : existingKeys) { + [self removeDocument:nil withKey:key fromTarget:targetID]; + } +} + +/** + * Adds the provided document to the internal list of document updates and its document key to the + * given target's mapping. + */ +- (void)addDocument:(FSTMaybeDocument *)document toTarget:(FSTTargetID)targetID { + if (![self isActiveTarget:targetID]) { + return; + } + + FSTDocumentViewChangeType changeType = [self containsDocument:document.key inTarget:targetID] + ? FSTDocumentViewChangeTypeModified + : FSTDocumentViewChangeTypeAdded; + + FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID]; + [targetState addDocumentChangeWithType:changeType forKey:document.key]; + + _pendingDocumentUpdates[document.key] = document; + _pendingDocumentTargetMappings[document.key].insert(targetID); +} + +/** + * Removes the provided document from the target mapping. If the document no longer matches the + * target, but the document's state is still known (e.g. we know that the document was deleted or we + * received the change that caused the filter mismatch), the new document can be provided to update + * the remote document cache. + */ +- (void)removeDocument:(FSTMaybeDocument *_Nullable)document + withKey:(const DocumentKey &)key + fromTarget:(FSTTargetID)targetID { + if (![self isActiveTarget:targetID]) { + return; + } + + FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID]; + + if ([self containsDocument:key inTarget:targetID]) { + [targetState addDocumentChangeWithType:FSTDocumentViewChangeTypeRemoved forKey:key]; } else { - self.pendingTargetResponses[targetID] = @(newCount); + // The document may have entered and left the target before we raised a snapshot, so we can just + // ignore the change. + [targetState removeDocumentChangeForKey:key]; + } + + _pendingDocumentTargetMappings[key].erase(targetID); + + if (document) { + _pendingDocumentUpdates[key] = document; + } +} + +/** + * Returns whether the LocalStore considers the document to be part of the specified target. + */ +- (BOOL)containsDocument:(FSTDocumentKey *)key inTarget:(FSTTargetID)targetID { + const DocumentKeySet &existingKeys = [_targetMetadataProvider remoteKeysForTarget:@(targetID)]; + return existingKeys.contains(key); +} + +- (FSTTargetState *)ensureTargetStateForTarget:(FSTTargetID)targetID { + if (!_targetStates[targetID]) { + _targetStates[targetID] = [FSTTargetState new]; } + + return _targetStates[targetID]; } /** - * Returns true if the given targetId is active. Active targets are those for which there are no + * Returns YES if the given targetId is active. Active targets are those for which there are no * pending requests to add a listen and are in the current list of targets the client cares about. * * Clients can repeatedly listen and stop listening to targets, so this check is useful in * preventing in preventing race conditions for a target where events arrive but the server hasn't * yet acknowledged the intended change in state. */ -- (BOOL)isActiveTarget:(FSTBoxedTargetID *)targetID { +- (BOOL)isActiveTarget:(FSTTargetID)targetID { return [self queryDataForActiveTarget:targetID] != nil; } -- (FSTQueryData *_Nullable)queryDataForActiveTarget:(FSTBoxedTargetID *)targetID { - FSTQueryData *queryData = self.listenTargets[targetID]; - return (queryData && !self.pendingTargetResponses[targetID]) ? queryData : nil; +- (nullable FSTQueryData *)queryDataForActiveTarget:(FSTTargetID)targetID { + auto targetState = _targetStates.find(targetID); + return targetState != _targetStates.end() && targetState->second.isPending + ? nil + : [_targetMetadataProvider queryDataForTarget:@(targetID)]; } -- (void)addExistenceFilterChange:(FSTExistenceFilterWatchChange *)existenceFilterChange { - FSTBoxedTargetID *targetID = @(existenceFilterChange.targetID); - if ([self isActiveTarget:targetID]) { - _existenceFilters[targetID] = existenceFilterChange.filter; +- (FSTRemoteEvent *)remoteEventAtSnapshotVersion:(const SnapshotVersion &)snapshotVersion { + std::unordered_map targetChanges; + + for (const auto &entry : _targetStates) { + FSTTargetID targetID = entry.first; + FSTTargetState *targetState = entry.second; + + FSTQueryData *queryData = [self queryDataForActiveTarget:targetID]; + if (queryData) { + if (targetState.current && [queryData.query isDocumentQuery]) { + // Document queries for document that don't exist can produce an empty result set. To update + // our local cache, we synthesize a document delete if we have not previously received the + // document. This resolves the limbo state of the document, removing it from + // limboDocumentRefs. + FSTDocumentKey *key = [FSTDocumentKey keyWithPath:queryData.query.path]; + if (_pendingDocumentUpdates.find(key) == _pendingDocumentUpdates.end() && + ![self containsDocument:key inTarget:targetID]) { + [self removeDocument:[FSTDeletedDocument documentWithKey:key version:snapshotVersion] + withKey:key + fromTarget:targetID]; + } + } + + if (targetState.hasPendingChanges) { + targetChanges[targetID] = [targetState toTargetChange]; + [targetState clearPendingChanges]; + } + } } -} -- (FSTRemoteEvent *)remoteEvent { - NSMutableDictionary *targetChanges = self.targetChanges; + DocumentKeySet resolvedLimboDocuments; - NSMutableArray *targetsToRemove = [NSMutableArray array]; + // We extract the set of limbo-only document updates as the GC logic special-cases documents that + // do not appear in the query cache. + // + // TODO(gsoltis): Expand on this comment. + for (const auto &entry : _pendingDocumentTargetMappings) { + BOOL isOnlyLimboTarget = YES; + + for (FSTTargetID targetID : entry.second) { + FSTQueryData *queryData = [self queryDataForActiveTarget:targetID]; + if (queryData && queryData.purpose != FSTQueryPurposeLimboResolution) { + isOnlyLimboTarget = NO; + break; + } + } - // Apply any inactive targets. - for (FSTBoxedTargetID *targetID in [targetChanges keyEnumerator]) { - if (![self isActiveTarget:targetID]) { - [targetsToRemove addObject:targetID]; + if (isOnlyLimboTarget) { + resolvedLimboDocuments = resolvedLimboDocuments.insert(entry.first); } } - [targetChanges removeObjectsForKeys:targetsToRemove]; + FSTRemoteEvent *remoteEvent = + [[FSTRemoteEvent alloc] initWithSnapshotVersion:snapshotVersion + targetChanges:targetChanges + targetMismatches:_pendingTargetResets + documentUpdates:_pendingDocumentUpdates + limboDocuments:resolvedLimboDocuments]; + + _pendingDocumentUpdates.clear(); + _pendingDocumentTargetMappings.clear(); + _pendingTargetResets.clear(); - // Mark this aggregator as frozen so no further modifications are made. - self.frozen = YES; - return [[FSTRemoteEvent alloc] initWithSnapshotVersion:_snapshotVersion - targetChanges:targetChanges - documentUpdates:_documentUpdates - limboDocuments:_limboDocuments]; + return remoteEvent; } +- (void)recordTargetRequest:(FSTBoxedTargetID *)targetID { + // For each request we get we need to record we need a response for it. + FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID.intValue]; + [targetState recordTargetRequest]; +} @end NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTRemoteStore.h b/Firestore/Source/Remote/FSTRemoteStore.h index 9b01ce4..302d56a 100644 --- a/Firestore/Source/Remote/FSTRemoteStore.h +++ b/Firestore/Source/Remote/FSTRemoteStore.h @@ -17,6 +17,7 @@ #import #import "Firestore/Source/Core/FSTTypes.h" +#import "Firestore/Source/Remote/FSTRemoteEvent.h" #include "Firestore/core/src/firebase/firestore/auth/user.h" @@ -24,7 +25,6 @@ @class FSTLocalStore; @class FSTMutationBatch; @class FSTMutationBatchResult; -@class FSTQuery; @class FSTQueryData; @class FSTRemoteEvent; @class FSTTransaction; @@ -73,6 +73,12 @@ NS_ASSUME_NONNULL_BEGIN */ - (void)rejectFailedWriteWithBatchID:(FSTBatchID)batchID error:(NSError *)error; +/** + * Returns the set of remote document keys for the given target ID. This list includes the + * documents that were assigned to the target when we received the last snapshot. + */ +- (firebase::firestore::model::DocumentKeySet)remoteKeysForTarget:(FSTBoxedTargetID *)targetId; + @end /** @@ -93,7 +99,7 @@ NS_ASSUME_NONNULL_BEGIN * FSTRemoteStore handles all interaction with the backend through a simple, clean interface. This * class is not thread safe and should be only called from the worker dispatch queue. */ -@interface FSTRemoteStore : NSObject +@interface FSTRemoteStore : NSObject - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore datastore:(FSTDatastore *)datastore diff --git a/Firestore/Source/Remote/FSTRemoteStore.mm b/Firestore/Source/Remote/FSTRemoteStore.mm index 7338ffb..54d00c4 100644 --- a/Firestore/Source/Remote/FSTRemoteStore.mm +++ b/Firestore/Source/Remote/FSTRemoteStore.mm @@ -96,14 +96,13 @@ static const int kMaxPendingWrites = 10; * 0 we know that the client and server are in the same state (once this state * is reached the targetId is removed from the map to free the memory). */ -@property(nonatomic, strong, readonly) - NSMutableDictionary *pendingTargetResponses; -@property(nonatomic, strong) NSMutableArray *accumulatedChanges; @property(nonatomic, assign) FSTBatchID lastBatchSeen; @property(nonatomic, strong, readonly) FSTOnlineStateTracker *onlineStateTracker; +@property(nonatomic, strong, nullable) FSTWatchChangeAggregator *watchChangeAggregator; + #pragma mark Write Stream // The writeStream is null when the network is disabled. The non-null check is performed by // isNetworkEnabled. @@ -126,8 +125,6 @@ static const int kMaxPendingWrites = 10; _localStore = localStore; _datastore = datastore; _listenTargets = [NSMutableDictionary dictionary]; - _pendingTargetResponses = [NSMutableDictionary dictionary]; - _accumulatedChanges = [NSMutableArray array]; _lastBatchSeen = kFSTBatchIDUnknown; _pendingWrites = [NSMutableArray array]; @@ -229,6 +226,7 @@ static const int kMaxPendingWrites = 10; - (void)startWatchStream { HARD_ASSERT([self shouldStartWatchStream], "startWatchStream: called when shouldStartWatchStream: is false."); + _watchChangeAggregator = [[FSTWatchChangeAggregator alloc] initWithTargetMetadataProvider:self]; [self.watchStream startWithDelegate:self]; [self.onlineStateTracker handleWatchStreamStart]; } @@ -248,7 +246,7 @@ static const int kMaxPendingWrites = 10; } - (void)sendWatchRequestWithQueryData:(FSTQueryData *)queryData { - [self recordPendingRequestForTargetID:@(queryData.targetID)]; + [self.watchChangeAggregator recordTargetRequest:@(queryData.targetID)]; [self.watchStream watchQuery:queryData]; } @@ -267,16 +265,10 @@ static const int kMaxPendingWrites = 10; } - (void)sendUnwatchRequestForTargetID:(FSTBoxedTargetID *)targetID { - [self recordPendingRequestForTargetID:targetID]; + [self.watchChangeAggregator recordTargetRequest:targetID]; [self.watchStream unwatchTargetID:[targetID intValue]]; } -- (void)recordPendingRequestForTargetID:(FSTBoxedTargetID *)targetID { - NSNumber *count = [self.pendingTargetResponses objectForKey:targetID]; - count = @([count intValue] + 1); - [self.pendingTargetResponses setObject:count forKey:targetID]; -} - /** * Returns YES if the network is enabled, the watch stream has not yet been started and there are * active watch targets. @@ -286,11 +278,7 @@ static const int kMaxPendingWrites = 10; } - (void)cleanUpWatchStreamState { - // If the connection is closed then we'll never get a snapshot version for the accumulated - // changes and so we'll never be able to complete the batch. When we start up again the server - // is going to resend these changes anyway, so just toss the accumulated state. - [self.accumulatedChanges removeAllObjects]; - [self.pendingTargetResponses removeAllObjects]; + _watchChangeAggregator = nil; } - (void)watchStreamDidOpen { @@ -305,28 +293,27 @@ static const int kMaxPendingWrites = 10; // Mark the connection as Online because we got a message from the server. [self.onlineStateTracker updateState:FSTOnlineStateOnline]; - FSTWatchTargetChange *watchTargetChange = - [change isKindOfClass:[FSTWatchTargetChange class]] ? (FSTWatchTargetChange *)change : nil; - - if (watchTargetChange && watchTargetChange.state == FSTWatchTargetChangeStateRemoved && - watchTargetChange.cause) { - // There was an error on a target, don't wait for a consistent snapshot to raise events - [self processTargetErrorForWatchChange:(FSTWatchTargetChange *)change]; - } else { - // Accumulate watch changes but don't process them if there's no snapshotVersion or it's - // older than a previous snapshot we've processed (can happen after we resume a target - // using a resume token). - [self.accumulatedChanges addObject:change]; - if (snapshotVersion == SnapshotVersion::None() || - snapshotVersion < [self.localStore lastRemoteSnapshotVersion]) { - return; + if ([change isKindOfClass:[FSTWatchTargetChange class]]) { + FSTWatchTargetChange *watchTargetChange = (FSTWatchTargetChange *)change; + if (watchTargetChange.state == FSTWatchTargetChangeStateRemoved && watchTargetChange.cause) { + // There was an error on a target, don't wait for a consistent snapshot to raise events + return [self processTargetErrorForWatchChange:watchTargetChange]; + } else { + [self.watchChangeAggregator handleTargetChange:watchTargetChange]; } + } else if ([change isKindOfClass:[FSTDocumentWatchChange class]]) { + [self.watchChangeAggregator handleDocumentChange:(FSTDocumentWatchChange *)change]; + } else { + HARD_ASSERT([change isKindOfClass:[FSTExistenceFilterWatchChange class]], + "Expected watchChange to be an instance of FSTExistenceFilterWatchChange"); + [self.watchChangeAggregator handleExistenceFilter:(FSTExistenceFilterWatchChange *)change]; + } - // Create a batch, giving it the accumulatedChanges array. - NSArray *changes = self.accumulatedChanges; - self.accumulatedChanges = [NSMutableArray array]; - - [self processBatchedWatchChanges:changes snapshotVersion:snapshotVersion]; + if (snapshotVersion != SnapshotVersion::None() && + snapshotVersion >= [self.localStore lastRemoteSnapshotVersion]) { + // We have received a target change with a global snapshot if the snapshot version is not equal + // to SnapshotVersion.None(). + [self raiseWatchSnapshotWithSnapshotVersion:snapshotVersion]; } } @@ -353,107 +340,60 @@ static const int kMaxPendingWrites = 10; * Takes a batch of changes from the Datastore, repackages them as a RemoteEvent, and passes that * on to the SyncEngine. */ -- (void)processBatchedWatchChanges:(NSArray *)changes - snapshotVersion:(const SnapshotVersion &)snapshotVersion { - FSTWatchChangeAggregator *aggregator = - [[FSTWatchChangeAggregator alloc] initWithSnapshotVersion:snapshotVersion - listenTargets:self.listenTargets - pendingTargetResponses:self.pendingTargetResponses]; - [aggregator addWatchChanges:changes]; - FSTRemoteEvent *remoteEvent = [aggregator remoteEvent]; - [self.pendingTargetResponses removeAllObjects]; - [self.pendingTargetResponses setDictionary:aggregator.pendingTargetResponses]; - - // Handle existence filters and existence filter mismatches - [aggregator.existenceFilters enumerateKeysAndObjectsUsingBlock:^(FSTBoxedTargetID *target, - FSTExistenceFilter *filter, - BOOL *stop) { - FSTTargetID targetID = target.intValue; - - FSTQueryData *queryData = self.listenTargets[target]; - FSTQuery *query = queryData.query; - if (!queryData) { - // A watched target might have been removed already. - return; - - } else if ([query isDocumentQuery]) { - if (filter.count == 0) { - // The existence filter told us the document does not exist. - // We need to deduce that this document does not exist and apply a deleted document to our - // updates. Without applying a deleted document there might be another query that will - // raise this document as part of a snapshot until it is resolved, essentially exposing - // inconsistency between queries - const DocumentKey key{query.path}; - FSTDeletedDocument *deletedDoc = - [FSTDeletedDocument documentWithKey:key version:snapshotVersion]; - [remoteEvent addDocumentUpdate:deletedDoc]; - } else { - HARD_ASSERT(filter.count == 1, "Single document existence filter with count: %s", - filter.count); - } - - } else { - // Not a document query. - DocumentKeySet trackedRemote = [self.localStore remoteDocumentKeysForTarget:targetID]; - FSTTargetMapping *mapping = remoteEvent.targetChanges[target].mapping; - if (mapping) { - if ([mapping isKindOfClass:[FSTUpdateMapping class]]) { - FSTUpdateMapping *update = (FSTUpdateMapping *)mapping; - trackedRemote = [update applyTo:trackedRemote]; - } else { - HARD_ASSERT([mapping isKindOfClass:[FSTResetMapping class]], - "Expected either reset or update mapping but got something else %s", mapping); - trackedRemote = ((FSTResetMapping *)mapping).documents; - } - } +- (void)raiseWatchSnapshotWithSnapshotVersion:(const SnapshotVersion &)snapshotVersion { + HARD_ASSERT(snapshotVersion != SnapshotVersion::None(), + "Can't raise event for unknown SnapshotVersion"); - if (trackedRemote.size() != static_cast(filter.count)) { - LOG_DEBUG("Existence filter mismatch, resetting mapping"); - - // Make sure the mismatch is exposed in the remote event - [remoteEvent handleExistenceFilterMismatchForTargetID:target]; - - // Clear the resume token for the query, since we're in a known mismatch state. - queryData = [[FSTQueryData alloc] initWithQuery:query - targetID:targetID - listenSequenceNumber:queryData.sequenceNumber - purpose:queryData.purpose]; - self.listenTargets[target] = queryData; - - // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't - // send a resume token so that we get a full update. - [self sendUnwatchRequestForTargetID:@(targetID)]; - - // Mark the query we send as being on behalf of an existence filter mismatch, but don't - // actually retain that in listenTargets. This ensures that we flag the first re-listen - // this way without impacting future listens of this target (that might happen e.g. on - // reconnect). - FSTQueryData *requestQueryData = - [[FSTQueryData alloc] initWithQuery:query - targetID:targetID - listenSequenceNumber:queryData.sequenceNumber - purpose:FSTQueryPurposeExistenceFilterMismatch]; - [self sendWatchRequestWithQueryData:requestQueryData]; - } - } - }]; + FSTRemoteEvent *remoteEvent = + [self.watchChangeAggregator remoteEventAtSnapshotVersion:snapshotVersion]; // Update in-memory resume tokens. FSTLocalStore will update the persistent view of these when // applying the completed FSTRemoteEvent. - [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( - FSTBoxedTargetID *target, FSTTargetChange *change, BOOL *stop) { - NSData *resumeToken = change.resumeToken; + for (const auto &entry : remoteEvent.targetChanges) { + NSData *resumeToken = entry.second.resumeToken; if (resumeToken.length > 0) { - FSTQueryData *queryData = self->_listenTargets[target]; + FSTBoxedTargetID *targetID = @(entry.first); + FSTQueryData *queryData = _listenTargets[targetID]; // A watched target might have been removed already. if (queryData) { - self->_listenTargets[target] = - [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion + _listenTargets[targetID] = + [queryData queryDataByReplacingSnapshotVersion:snapshotVersion resumeToken:resumeToken sequenceNumber:queryData.sequenceNumber]; } } - }]; + } + + // Re-establish listens for the targets that have been invalidated by existence filter mismatches. + for (FSTTargetID targetID : remoteEvent.targetMismatches) { + FSTQueryData *queryData = self.listenTargets[@(targetID)]; + + if (!queryData) { + // A watched target might have been removed already. + continue; + } + + // Clear the resume token for the query, since we're in a known mismatch state. + queryData = [[FSTQueryData alloc] initWithQuery:queryData.query + targetID:targetID + listenSequenceNumber:queryData.sequenceNumber + purpose:queryData.purpose]; + self.listenTargets[@(targetID)] = queryData; + + // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't send a + // resume token so that we get a full update. + [self sendUnwatchRequestForTargetID:@(targetID)]; + + // Mark the query we send as being on behalf of an existence filter mismatch, but don't actually + // retain that in listenTargets. This ensures that we flag the first re-listen this way without + // impacting future listens of this target (that might happen e.g. on reconnect). + FSTQueryData *requestQueryData = + [[FSTQueryData alloc] initWithQuery:queryData.query + targetID:targetID + listenSequenceNumber:queryData.sequenceNumber + purpose:FSTQueryPurposeExistenceFilterMismatch]; + [self sendWatchRequestWithQueryData:requestQueryData]; + } // Finally handle remote event [self.syncEngine applyRemoteEvent:remoteEvent]; @@ -471,6 +411,14 @@ static const int kMaxPendingWrites = 10; } } +- (firebase::firestore::model::DocumentKeySet)remoteKeysForTarget:(FSTBoxedTargetID *)targetID { + return [self.syncEngine remoteKeysForTarget:targetID]; +} + +- (nullable FSTQueryData *)queryDataForTarget:(FSTBoxedTargetID *)targetID { + return self.listenTargets[targetID]; +} + #pragma mark Write Stream /** diff --git a/Firestore/Source/Remote/FSTWatchChange.mm b/Firestore/Source/Remote/FSTWatchChange.mm index 284e980..04656b9 100644 --- a/Firestore/Source/Remote/FSTWatchChange.mm +++ b/Firestore/Source/Remote/FSTWatchChange.mm @@ -127,7 +127,7 @@ NS_ASSUME_NONNULL_BEGIN if (self) { _state = state; _targetIDs = targetIDs; - _resumeToken = resumeToken; + _resumeToken = [resumeToken copy]; _cause = cause; } return self; -- cgit v1.2.3