aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/Source/Remote/FSTRemoteEvent.mm
diff options
context:
space:
mode:
Diffstat (limited to 'Firestore/Source/Remote/FSTRemoteEvent.mm')
-rw-r--r--Firestore/Source/Remote/FSTRemoteEvent.mm853
1 files changed, 409 insertions, 444 deletions
diff --git a/Firestore/Source/Remote/FSTRemoteEvent.mm b/Firestore/Source/Remote/FSTRemoteEvent.mm
index 67af650..5e00310 100644
--- a/Firestore/Source/Remote/FSTRemoteEvent.mm
+++ b/Firestore/Source/Remote/FSTRemoteEvent.mm
@@ -17,10 +17,16 @@
#import "Firestore/Source/Remote/FSTRemoteEvent.h"
#include <map>
+#include <set>
+#include <unordered_map>
#include <utility>
+#import "Firestore/Source/Core/FSTQuery.h"
+#import "Firestore/Source/Core/FSTViewSnapshot.h"
#import "Firestore/Source/Local/FSTQueryData.h"
#import "Firestore/Source/Model/FSTDocument.h"
+#import "Firestore/Source/Remote/FSTExistenceFilter.h"
+#import "Firestore/Source/Remote/FSTRemoteStore.h"
#import "Firestore/Source/Remote/FSTWatchChange.h"
#import "Firestore/Source/Util/FSTClasses.h"
@@ -30,619 +36,578 @@
#include "Firestore/core/src/firebase/firestore/util/log.h"
using firebase::firestore::model::DocumentKey;
+using firebase::firestore::model::DocumentKeyHash;
+using firebase::firestore::model::DocumentKeySet;
using firebase::firestore::model::SnapshotVersion;
using firebase::firestore::util::Hash;
-using firebase::firestore::model::DocumentKeySet;
NS_ASSUME_NONNULL_BEGIN
-#pragma mark - FSTTargetMapping
-
-@interface FSTTargetMapping ()
-
-/** Private mutator method to add a document key to the mapping */
-- (void)addDocumentKey:(const DocumentKey &)documentKey;
-
-/** Private mutator method to remove a document key from the mapping */
-- (void)removeDocumentKey:(const DocumentKey &)documentKey;
-
-@end
-
-@implementation FSTTargetMapping
-
-- (void)addDocumentKey:(const DocumentKey &)documentKey {
- @throw FSTAbstractMethodException(); // NOLINT
-}
+#pragma mark - FSTTargetChange
-- (void)removeDocumentKey:(const DocumentKey &)documentKey {
- @throw FSTAbstractMethodException(); // NOLINT
+@implementation FSTTargetChange {
+ DocumentKeySet _addedDocuments;
+ DocumentKeySet _modifiedDocuments;
+ DocumentKeySet _removedDocuments;
}
-- (void)filterUpdatesUsingExistingKeys:(const DocumentKeySet &)existingKeys {
- @throw FSTAbstractMethodException(); // NOLINT
+- (instancetype)initWithResumeToken:(NSData *)resumeToken
+ current:(BOOL)current
+ addedDocuments:(DocumentKeySet)addedDocuments
+ modifiedDocuments:(DocumentKeySet)modifiedDocuments
+ removedDocuments:(DocumentKeySet)removedDocuments {
+ if (self = [super init]) {
+ _resumeToken = [resumeToken copy];
+ _current = current;
+ _addedDocuments = std::move(addedDocuments);
+ _modifiedDocuments = std::move(modifiedDocuments);
+ _removedDocuments = std::move(removedDocuments);
+ }
+ return self;
}
-@end
-
-#pragma mark - FSTResetMapping
-
-@implementation FSTResetMapping {
- DocumentKeySet _documents;
+- (const DocumentKeySet &)addedDocuments {
+ return _addedDocuments;
}
-+ (instancetype)mappingWithDocuments:(NSArray<FSTDocument *> *)documents {
- DocumentKeySet keys;
- for (FSTDocument *doc in documents) {
- keys = keys.insert(doc.key);
- }
- return [[FSTResetMapping alloc] initWithDocuments:std::move(keys)];
+- (const DocumentKeySet &)modifiedDocuments {
+ return _modifiedDocuments;
}
-- (instancetype)initWithDocuments:(DocumentKeySet)documents {
- self = [super init];
- if (self) {
- _documents = std::move(documents);
- }
- return self;
-}
-
-- (const DocumentKeySet &)documents {
- return _documents;
+- (const DocumentKeySet &)removedDocuments {
+ return _removedDocuments;
}
- (BOOL)isEqual:(id)other {
if (other == self) {
return YES;
}
- if (![other isMemberOfClass:[FSTResetMapping class]]) {
+ if (![other isMemberOfClass:[FSTTargetChange class]]) {
return NO;
}
- FSTResetMapping *otherMapping = (FSTResetMapping *)other;
- return _documents == otherMapping.documents;
+ return [self current] == [other current] &&
+ [[self resumeToken] isEqualToData:[other resumeToken]] &&
+ [self addedDocuments] == [other addedDocuments] &&
+ [self modifiedDocuments] == [other modifiedDocuments] &&
+ [self removedDocuments] == [other removedDocuments];
}
-- (NSUInteger)hash {
- return Hash(_documents);
-}
+@end
-- (void)addDocumentKey:(const DocumentKey &)documentKey {
- _documents = _documents.insert(documentKey);
-}
+#pragma mark - FSTTargetState
-- (void)removeDocumentKey:(const DocumentKey &)documentKey {
- _documents = _documents.erase(documentKey);
-}
+/** Tracks the internal state of a Watch target. */
+@interface FSTTargetState : NSObject
-- (void)filterUpdatesUsingExistingKeys:(const DocumentKeySet &)existingKeys {
- // No-op. Resets are not filtered.
-}
+/**
+ * Whether this target has been marked 'current'.
+ *
+ * 'Current' has special meaning in the RPC protocol: It implies that the Watch backend has sent us
+ * all changes up to the point at which the target was added and that the target is consistent with
+ * the rest of the watch stream.
+ */
+@property(nonatomic) BOOL current;
-@end
+/** The last resume token sent to us for this target. */
+@property(nonatomic, readonly, strong) NSData *resumeToken;
-#pragma mark - FSTUpdateMapping
+/** Whether we have modified any state that should trigger a snapshot. */
+@property(nonatomic, readonly) BOOL hasPendingChanges;
-@implementation FSTUpdateMapping {
- DocumentKeySet _addedDocuments;
- DocumentKeySet _removedDocuments;
-}
+/** Whether this target has pending target adds or target removes. */
+- (BOOL)isPending;
-+ (FSTUpdateMapping *)mappingWithAddedDocuments:(NSArray<FSTDocument *> *)added
- removedDocuments:(NSArray<FSTDocument *> *)removed {
- DocumentKeySet addedDocuments;
- DocumentKeySet removedDocuments;
- for (FSTDocument *doc in added) {
- addedDocuments = addedDocuments.insert(doc.key);
- }
- for (FSTDocument *doc in removed) {
- removedDocuments = removedDocuments.insert(doc.key);
- }
- return [[FSTUpdateMapping alloc] initWithAddedDocuments:std::move(addedDocuments)
- removedDocuments:std::move(removedDocuments)];
-}
+/**
+ * Applies the resume token to the TargetChange, but only when it has a new value. Empty
+ * resumeTokens are discarded.
+ */
+- (void)updateResumeToken:(NSData *)resumeToken;
-- (instancetype)initWithAddedDocuments:(DocumentKeySet)addedDocuments
- removedDocuments:(DocumentKeySet)removedDocuments {
- self = [super init];
- if (self) {
- _addedDocuments = std::move(addedDocuments);
- _removedDocuments = std::move(removedDocuments);
- }
- return self;
-}
+/** Resets the document changes and sets `hasPendingChanges` to false. */
+- (void)clearPendingChanges;
+/**
+ * Creates a target change from the current set of changes.
+ *
+ * To reset the document changes after raising this snapshot, call `clearPendingChanges()`.
+ */
+- (FSTTargetChange *)toTargetChange;
-- (const DocumentKeySet &)addedDocuments {
- return _addedDocuments;
-}
+- (void)recordTargetRequest;
+- (void)recordTargetResponse;
+- (void)markCurrent;
+- (void)addDocumentChangeWithType:(FSTDocumentViewChangeType)type
+ forKey:(const DocumentKey &)documentKey;
+- (void)removeDocumentChangeForKey:(const DocumentKey &)documentKey;
-- (const DocumentKeySet &)removedDocuments {
- return _removedDocuments;
-}
+@end
-- (BOOL)isEqual:(id)other {
- if (other == self) {
- return YES;
- }
- if (![other isMemberOfClass:[FSTUpdateMapping class]]) {
- return NO;
- }
+@implementation FSTTargetState {
+ /**
+ * The number of outstanding responses (adds or removes) that we are waiting on. We only consider
+ * targets active that have no outstanding responses.
+ */
+ int _outstandingResponses;
- FSTUpdateMapping *otherMapping = (FSTUpdateMapping *)other;
- return _addedDocuments == otherMapping.addedDocuments &&
- _removedDocuments == otherMapping.removedDocuments;
+ /**
+ * Keeps track of the document changes since the last raised snapshot.
+ *
+ * These changes are continuously updated as we receive document updates and always reflect the
+ * current set of changes against the last issued snapshot.
+ */
+ std::unordered_map<DocumentKey, FSTDocumentViewChangeType, DocumentKeyHash> _documentChanges;
}
-- (NSUInteger)hash {
- return Hash(_addedDocuments, _removedDocuments);
-}
+- (instancetype)init {
+ if (self = [super init]) {
+ _resumeToken = [NSData data];
+ _outstandingResponses = 0;
-- (DocumentKeySet)applyTo:(const DocumentKeySet &)keys {
- DocumentKeySet result = keys;
- for (const DocumentKey &key : _addedDocuments) {
- result = result.insert(key);
+ // We initialize to 'true' so that newly-added targets are included in the next RemoteEvent.
+ _hasPendingChanges = YES;
}
- for (const DocumentKey &key : _removedDocuments) {
- result = result.erase(key);
- }
- return result;
-}
-
-- (void)addDocumentKey:(const DocumentKey &)documentKey {
- _addedDocuments = _addedDocuments.insert(documentKey);
- _removedDocuments = _removedDocuments.erase(documentKey);
+ return self;
}
-- (void)removeDocumentKey:(const DocumentKey &)documentKey {
- _addedDocuments = _addedDocuments.erase(documentKey);
- _removedDocuments = _removedDocuments.insert(documentKey);
+- (BOOL)isPending {
+ return _outstandingResponses != 0;
}
-- (void)filterUpdatesUsingExistingKeys:(const DocumentKeySet &)existingKeys {
- DocumentKeySet result = _addedDocuments;
- for (const DocumentKey &key : _addedDocuments) {
- if (existingKeys.contains(key)) {
- result = result.erase(key);
- }
+- (void)updateResumeToken:(NSData *)resumeToken {
+ if (resumeToken.length > 0) {
+ _hasPendingChanges = YES;
+ _resumeToken = [resumeToken copy];
}
- _addedDocuments = result;
}
-@end
-
-#pragma mark - FSTTargetChange
+- (void)clearPendingChanges {
+ _hasPendingChanges = NO;
+ _documentChanges.clear();
+}
-@interface FSTTargetChange ()
-@property(nonatomic, assign) FSTCurrentStatusUpdate currentStatusUpdate;
-@property(nonatomic, strong, nullable) FSTTargetMapping *mapping;
-@property(nonatomic, strong) NSData *resumeToken;
-@end
+- (void)recordTargetRequest {
+ _outstandingResponses += 1;
+}
-@implementation FSTTargetChange {
- SnapshotVersion _snapshotVersion;
+- (void)recordTargetResponse {
+ _outstandingResponses -= 1;
}
-- (instancetype)init {
- if (self = [super init]) {
- _currentStatusUpdate = FSTCurrentStatusUpdateNone;
- _resumeToken = [NSData data];
- }
- return self;
+- (void)markCurrent {
+ _hasPendingChanges = YES;
+ _current = true;
}
-- (instancetype)initWithSnapshotVersion:(SnapshotVersion)snapshotVersion {
- if (self = [self init]) {
- _snapshotVersion = std::move(snapshotVersion);
- }
- return self;
+- (void)addDocumentChangeWithType:(FSTDocumentViewChangeType)type
+ forKey:(const DocumentKey &)documentKey {
+ _hasPendingChanges = YES;
+ _documentChanges[documentKey] = type;
}
-- (const SnapshotVersion &)snapshotVersion {
- return _snapshotVersion;
+- (void)removeDocumentChangeForKey:(const DocumentKey &)documentKey {
+ _hasPendingChanges = YES;
+ _documentChanges.erase(documentKey);
}
-+ (instancetype)changeWithDocuments:(NSArray<FSTMaybeDocument *> *)docs
- currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate {
+- (FSTTargetChange *)toTargetChange {
DocumentKeySet addedDocuments;
+ DocumentKeySet modifiedDocuments;
DocumentKeySet removedDocuments;
- for (FSTMaybeDocument *doc in docs) {
- if ([doc isKindOfClass:[FSTDeletedDocument class]]) {
- removedDocuments = removedDocuments.insert(doc.key);
- } else {
- addedDocuments = addedDocuments.insert(doc.key);
+
+ for (const auto &entry : _documentChanges) {
+ switch (entry.second) {
+ case FSTDocumentViewChangeTypeAdded:
+ addedDocuments = addedDocuments.insert(entry.first);
+ break;
+ case FSTDocumentViewChangeTypeModified:
+ modifiedDocuments = modifiedDocuments.insert(entry.first);
+ break;
+ case FSTDocumentViewChangeTypeRemoved:
+ removedDocuments = removedDocuments.insert(entry.first);
+ break;
+ default:
+ HARD_FAIL("Encountered invalid change type: %s", entry.second);
}
}
- FSTUpdateMapping *mapping =
- [[FSTUpdateMapping alloc] initWithAddedDocuments:std::move(addedDocuments)
- removedDocuments:std::move(removedDocuments)];
-
- FSTTargetChange *change = [[FSTTargetChange alloc] init];
- change.mapping = mapping;
- change.currentStatusUpdate = currentStatusUpdate;
- return change;
-}
-
-+ (instancetype)changeWithMapping:(FSTTargetMapping *)mapping
- snapshotVersion:(SnapshotVersion)snapshotVersion
- currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate {
- FSTTargetChange *change = [[FSTTargetChange alloc] init];
- change.mapping = mapping;
- change->_snapshotVersion = std::move(snapshotVersion);
- change.currentStatusUpdate = currentStatusUpdate;
- return change;
-}
-
-- (FSTTargetMapping *)mapping {
- if (!_mapping) {
- // Create an FSTUpdateMapping by default, since resets are always explicit
- _mapping = [[FSTUpdateMapping alloc] init];
- }
- return _mapping;
-}
-/**
- * Sets the resume token but only when it has a new value. Empty resumeTokens are
- * discarded.
- */
-- (void)setResumeToken:(NSData *)resumeToken {
- if (resumeToken.length > 0) {
- _resumeToken = resumeToken;
- }
+ return [[FSTTargetChange alloc] initWithResumeToken:_resumeToken
+ current:_current
+ addedDocuments:std::move(addedDocuments)
+ modifiedDocuments:std::move(modifiedDocuments)
+ removedDocuments:std::move(removedDocuments)];
}
-
@end
#pragma mark - FSTRemoteEvent
@implementation FSTRemoteEvent {
SnapshotVersion _snapshotVersion;
- NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *_targetChanges;
- std::map<DocumentKey, FSTMaybeDocument *> _documentUpdates;
+ std::unordered_map<FSTTargetID, FSTTargetChange *> _targetChanges;
+ std::unordered_set<FSTTargetID> _targetMismatches;
+ std::unordered_map<DocumentKey, FSTMaybeDocument *, DocumentKeyHash> _documentUpdates;
DocumentKeySet _limboDocumentChanges;
}
-- (instancetype)initWithSnapshotVersion:(SnapshotVersion)snapshotVersion
- targetChanges:
- (NSMutableDictionary<NSNumber *, FSTTargetChange *> *)targetChanges
- documentUpdates:(std::map<DocumentKey, FSTMaybeDocument *>)documentUpdates
- limboDocuments:(DocumentKeySet)limboDocuments {
+- (instancetype)
+initWithSnapshotVersion:(SnapshotVersion)snapshotVersion
+ targetChanges:(std::unordered_map<FSTTargetID, FSTTargetChange *>)targetChanges
+ targetMismatches:(std::unordered_set<FSTTargetID>)targetMismatches
+ documentUpdates:
+ (std::unordered_map<DocumentKey, FSTMaybeDocument *, DocumentKeyHash>)documentUpdates
+ limboDocuments:(DocumentKeySet)limboDocuments {
self = [super init];
if (self) {
_snapshotVersion = std::move(snapshotVersion);
- _targetChanges = targetChanges;
+ _targetChanges = std::move(targetChanges);
+ _targetMismatches = std::move(targetMismatches);
_documentUpdates = std::move(documentUpdates);
_limboDocumentChanges = std::move(limboDocuments);
}
return self;
}
-- (NSDictionary<FSTBoxedTargetID *, FSTTargetChange *> *)targetChanges {
- return _targetChanges;
+- (const SnapshotVersion &)snapshotVersion {
+ return _snapshotVersion;
}
- (const DocumentKeySet &)limboDocumentChanges {
return _limboDocumentChanges;
}
-- (const std::map<DocumentKey, FSTMaybeDocument *> &)documentUpdates {
- return _documentUpdates;
-}
-
-- (const SnapshotVersion &)snapshotVersion {
- return _snapshotVersion;
-}
-
-- (void)synthesizeDeleteForLimboTargetChange:(FSTTargetChange *)targetChange
- key:(const DocumentKey &)key {
- if (targetChange.currentStatusUpdate == FSTCurrentStatusUpdateMarkCurrent &&
- _documentUpdates.find(key) == _documentUpdates.end()) {
- // When listening to a query the server responds with a snapshot containing documents
- // matching the query and a current marker telling us we're now in sync. It's possible for
- // these to arrive as separate remote events or as a single remote event. For a document
- // query, there will be no documents sent in the response if the document doesn't exist.
- //
- // If the snapshot arrives separately from the current marker, we handle it normally and
- // updateTrackedLimboDocumentsWithChanges:targetID: will resolve the limbo status of the
- // document, removing it from limboDocumentRefs. This works because clients only initiate
- // limbo resolution when a target is current and because all current targets are always at a
- // consistent snapshot.
- //
- // However, if the document doesn't exist and the current marker arrives, the document is
- // not present in the snapshot and our normal view handling would consider the document to
- // remain in limbo indefinitely because there are no updates to the document. To avoid this,
- // we specially handle this case here: synthesizing a delete.
- //
- // TODO(dimond): Ideally we would have an explicit lookup query instead resulting in an
- // explicit delete message and we could remove this special logic.
- _documentUpdates[key] = [FSTDeletedDocument documentWithKey:key version:_snapshotVersion];
- _limboDocumentChanges = _limboDocumentChanges.insert(key);
- }
+- (const std::unordered_map<FSTTargetID, FSTTargetChange *> &)targetChanges {
+ return _targetChanges;
}
-/** Adds a document update to this remote event */
-- (void)addDocumentUpdate:(FSTMaybeDocument *)document {
- _documentUpdates[document.key] = document;
+- (const std::unordered_map<DocumentKey, FSTMaybeDocument *, DocumentKeyHash> &)documentUpdates {
+ return _documentUpdates;
}
-/** Handles an existence filter mismatch */
-- (void)handleExistenceFilterMismatchForTargetID:(FSTBoxedTargetID *)targetID {
- // An existence filter mismatch will reset the query and we need to reset the mapping to contain
- // no documents and an empty resume token.
- //
- // Note:
- // * The reset mapping is empty, specifically forcing the consumer of the change to
- // forget all keys for this targetID;
- // * The resume snapshot for this target must be reset
- // * The target must be unacked because unwatching and rewatching introduces a race for
- // changes.
- //
- // TODO(dimond): keep track of reset targets not to raise.
- FSTTargetChange *targetChange =
- [FSTTargetChange changeWithMapping:[[FSTResetMapping alloc] init]
- snapshotVersion:SnapshotVersion::None()
- currentStatusUpdate:FSTCurrentStatusUpdateMarkNotCurrent];
- _targetChanges[targetID] = targetChange;
+- (const std::unordered_set<FSTTargetID> &)targetMismatches {
+ return _targetMismatches;
}
@end
#pragma mark - FSTWatchChangeAggregator
-@interface FSTWatchChangeAggregator ()
-
-/** Keeps track of the current target mappings */
-@property(nonatomic, strong, readonly)
- NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *targetChanges;
-
-/** The set of open listens on the client */
-@property(nonatomic, strong, readonly)
- NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *listenTargets;
+@implementation FSTWatchChangeAggregator {
+ /** The internal state of all tracked targets. */
+ std::unordered_map<FSTTargetID, FSTTargetState *> _targetStates;
-/** Whether this aggregator was frozen and can no longer be modified */
-@property(nonatomic, assign) BOOL frozen;
+ /** Keeps track of document to update */
+ std::unordered_map<DocumentKey, FSTMaybeDocument *, DocumentKeyHash> _pendingDocumentUpdates;
-@end
+ /** A mapping of document keys to their set of target IDs. */
+ std::unordered_map<DocumentKey, std::set<FSTTargetID>, DocumentKeyHash>
+ _pendingDocumentTargetMappings;
-@implementation FSTWatchChangeAggregator {
- NSMutableDictionary<FSTBoxedTargetID *, FSTExistenceFilter *> *_existenceFilters;
- /** Keeps track of document to update */
- std::map<DocumentKey, FSTMaybeDocument *> _documentUpdates;
+ /**
+ * A list of targets with existence filter mismatches. These targets are known to be inconsistent
+ * and their listens needs to be re-established by RemoteStore.
+ */
+ std::unordered_set<FSTTargetID> _pendingTargetResets;
- DocumentKeySet _limboDocuments;
- /** The snapshot version for every target change this creates. */
- SnapshotVersion _snapshotVersion;
+ id<FSTTargetMetadataProvider> _targetMetadataProvider;
}
-- (instancetype)
-initWithSnapshotVersion:(SnapshotVersion)snapshotVersion
- listenTargets:(NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *)listenTargets
- pendingTargetResponses:(NSDictionary<FSTBoxedTargetID *, NSNumber *> *)pendingTargetResponses {
+- (instancetype)initWithTargetMetadataProvider:
+ (id<FSTTargetMetadataProvider>)targetMetadataProvider {
self = [super init];
if (self) {
- _snapshotVersion = std::move(snapshotVersion);
-
- _frozen = NO;
- _targetChanges = [NSMutableDictionary dictionary];
- _listenTargets = listenTargets;
- _pendingTargetResponses = [NSMutableDictionary dictionaryWithDictionary:pendingTargetResponses];
- _limboDocuments = DocumentKeySet{};
- _existenceFilters = [NSMutableDictionary dictionary];
+ _targetMetadataProvider = targetMetadataProvider;
}
return self;
}
-- (NSDictionary<FSTBoxedTargetID *, FSTExistenceFilter *> *)existenceFilters {
- return _existenceFilters;
-}
-
-- (FSTTargetChange *)targetChangeForTargetID:(FSTBoxedTargetID *)targetID {
- FSTTargetChange *change = self.targetChanges[targetID];
- if (!change) {
- change = [[FSTTargetChange alloc] initWithSnapshotVersion:_snapshotVersion];
- self.targetChanges[targetID] = change;
- }
- return change;
-}
-
-- (void)addWatchChanges:(NSArray<FSTWatchChange *> *)watchChanges {
- HARD_ASSERT(!self.frozen, "Trying to modify frozen FSTWatchChangeAggregator");
- for (FSTWatchChange *watchChange in watchChanges) {
- [self addWatchChange:watchChange];
- }
-}
-
-- (void)addWatchChange:(FSTWatchChange *)watchChange {
- HARD_ASSERT(!self.frozen, "Trying to modify frozen FSTWatchChangeAggregator");
- if ([watchChange isKindOfClass:[FSTDocumentWatchChange class]]) {
- [self addDocumentChange:(FSTDocumentWatchChange *)watchChange];
- } else if ([watchChange isKindOfClass:[FSTWatchTargetChange class]]) {
- [self addTargetChange:(FSTWatchTargetChange *)watchChange];
- } else if ([watchChange isKindOfClass:[FSTExistenceFilterWatchChange class]]) {
- [self addExistenceFilterChange:(FSTExistenceFilterWatchChange *)watchChange];
- } else {
- HARD_FAIL("Unknown watch change: %s", watchChange);
- }
-}
-
-/**
- * Updates limbo document tracking for a given target-document mapping change. If the target is a
- * limbo target, and the change for the document has only seen limbo targets so far, and we are not
- * already tracking a change for this document, then consider this document a limbo document update.
- * Otherwise, ensure that we don't consider this document a limbo document. Returns true if the
- * change still has only seen limbo resolution changes.
- */
-- (BOOL)updateLimboDocumentsForKey:(const DocumentKey &)documentKey
- queryData:(FSTQueryData *)queryData
- isOnlyLimbo:(BOOL)isOnlyLimbo {
- if (!isOnlyLimbo) {
- // It wasn't a limbo doc before, so it definitely isn't now.
- return NO;
- }
- if (_documentUpdates.find(documentKey) == _documentUpdates.end()) {
- // We haven't seen a document update for this key yet.
- if (queryData.purpose == FSTQueryPurposeLimboResolution) {
- // We haven't seen this document before, and this target is a limbo target.
- _limboDocuments = _limboDocuments.insert(documentKey);
- return YES;
- } else {
- // We haven't seen the document before, but this is a non-limbo target.
- // Since we haven't seen it, we know it's not in our set of limbo docs. Return NO to ensure
- // that this key is marked as non-limbo.
- return NO;
- }
- } else if (queryData.purpose == FSTQueryPurposeLimboResolution) {
- // We have only seen limbo targets so far for this document, and this is another limbo target.
- return YES;
- } else {
- // We haven't marked this as non-limbo yet, but this target is not a limbo target.
- // Mark the key as non-limbo and make sure it isn't in our set.
- _limboDocuments = _limboDocuments.erase(documentKey);
- return NO;
- }
-}
-
-- (void)addDocumentChange:(FSTDocumentWatchChange *)docChange {
- BOOL relevant = NO;
- BOOL isOnlyLimbo = YES;
-
- for (FSTBoxedTargetID *targetID in docChange.updatedTargetIDs) {
- FSTQueryData *queryData = [self queryDataForActiveTarget:targetID];
- if (queryData) {
- FSTTargetChange *change = [self targetChangeForTargetID:targetID];
- isOnlyLimbo = [self updateLimboDocumentsForKey:docChange.documentKey
- queryData:queryData
- isOnlyLimbo:isOnlyLimbo];
- [change.mapping addDocumentKey:docChange.documentKey];
- relevant = YES;
- }
- }
-
- for (FSTBoxedTargetID *targetID in docChange.removedTargetIDs) {
- FSTQueryData *queryData = [self queryDataForActiveTarget:targetID];
- if (queryData) {
- FSTTargetChange *change = [self targetChangeForTargetID:targetID];
- isOnlyLimbo = [self updateLimboDocumentsForKey:docChange.documentKey
- queryData:queryData
- isOnlyLimbo:isOnlyLimbo];
- [change.mapping removeDocumentKey:docChange.documentKey];
- relevant = YES;
+- (void)handleDocumentChange:(FSTDocumentWatchChange *)documentChange {
+ for (FSTBoxedTargetID *targetID in documentChange.updatedTargetIDs) {
+ if ([documentChange.document isKindOfClass:[FSTDocument class]]) {
+ [self addDocument:documentChange.document toTarget:targetID.intValue];
+ } else if ([documentChange.document isKindOfClass:[FSTDeletedDocument class]]) {
+ [self removeDocument:documentChange.document
+ withKey:documentChange.documentKey
+ fromTarget:targetID.intValue];
}
}
- // Only update the document if there is a new document to replace, this might be just a target
- // update instead.
- if (docChange.document && relevant) {
- _documentUpdates[docChange.documentKey] = docChange.document;
+ for (FSTBoxedTargetID *targetID in documentChange.removedTargetIDs) {
+ [self removeDocument:documentChange.document
+ withKey:documentChange.documentKey
+ fromTarget:targetID.intValue];
}
}
-- (void)addTargetChange:(FSTWatchTargetChange *)targetChange {
- for (FSTBoxedTargetID *targetID in targetChange.targetIDs) {
- FSTTargetChange *change = [self targetChangeForTargetID:targetID];
+- (void)handleTargetChange:(FSTWatchTargetChange *)targetChange {
+ for (FSTBoxedTargetID *boxedTargetID in targetChange.targetIDs) {
+ int targetID = boxedTargetID.intValue;
+ FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID];
switch (targetChange.state) {
case FSTWatchTargetChangeStateNoChange:
if ([self isActiveTarget:targetID]) {
- // Creating the change above satisfies the semantics of no-change.
- change.resumeToken = targetChange.resumeToken;
+ [targetState updateResumeToken:targetChange.resumeToken];
}
break;
case FSTWatchTargetChangeStateAdded:
- [self recordResponseForTargetID:targetID];
- if (!self.pendingTargetResponses[targetID]) {
- // We have a freshly added target, so we need to reset any state that we had previously
- // This can happen e.g. when remove and add back a target for existence filter
- // mismatches.
- change.mapping = nil;
- change.currentStatusUpdate = FSTCurrentStatusUpdateNone;
- [_existenceFilters removeObjectForKey:targetID];
+ // We need to decrement the number of pending acks needed from watch for this targetId.
+ [targetState recordTargetResponse];
+ if (!targetState.isPending) {
+ // We have a freshly added target, so we need to reset any state that we had previously.
+ // This can happen e.g. when remove and add back a target for existence filter mismatches.
+ [targetState clearPendingChanges];
}
- change.resumeToken = targetChange.resumeToken;
+ [targetState updateResumeToken:targetChange.resumeToken];
break;
case FSTWatchTargetChangeStateRemoved:
// We need to keep track of removed targets to we can post-filter and remove any target
// changes.
- [self recordResponseForTargetID:targetID];
- HARD_ASSERT(!targetChange.cause, "WatchChangeAggregator does not handle errored targets.");
+ [targetState recordTargetResponse];
+ if (!targetState.isPending) {
+ [self removeTarget:targetID];
+ }
+ HARD_ASSERT(!targetChange.cause, "WatchChangeAggregator does not handle errored targets");
break;
case FSTWatchTargetChangeStateCurrent:
if ([self isActiveTarget:targetID]) {
- change.currentStatusUpdate = FSTCurrentStatusUpdateMarkCurrent;
- change.resumeToken = targetChange.resumeToken;
+ [targetState markCurrent];
+ [targetState updateResumeToken:targetChange.resumeToken];
}
break;
case FSTWatchTargetChangeStateReset:
if ([self isActiveTarget:targetID]) {
- // Overwrite any existing target mapping with a reset mapping. Every subsequent update
- // will modify the reset mapping, not an update mapping.
- change.mapping = [[FSTResetMapping alloc] init];
- change.resumeToken = targetChange.resumeToken;
+ // Reset the target and synthesizes removes for all existing documents. The backend will
+ // re-add any documents that still match the target before it sends the next global
+ // snapshot.
+ [self resetTarget:targetID];
+ [targetState updateResumeToken:targetChange.resumeToken];
}
break;
default:
- LOG_WARN("Unknown target watch change type: %s", targetChange.state);
+ HARD_FAIL("Unknown target watch change state: %s", targetChange.state);
}
}
}
+- (void)removeTarget:(FSTTargetID)targetID {
+ _targetStates.erase(targetID);
+}
+
+- (void)handleExistenceFilter:(FSTExistenceFilterWatchChange *)existenceFilter {
+ FSTTargetID targetID = existenceFilter.targetID;
+ int expectedCount = existenceFilter.filter.count;
+
+ FSTQueryData *queryData = [self queryDataForActiveTarget:targetID];
+ if (queryData) {
+ FSTQuery *query = queryData.query;
+ if ([query isDocumentQuery]) {
+ if (expectedCount == 0) {
+ // The existence filter told us the document does not exist. We deduce that this document
+ // does not exist and apply a deleted document to our updates. Without applying this deleted
+ // document there might be another query that will raise this document as part of a snapshot
+ // until it is resolved, essentially exposing inconsistency between queries.
+ FSTDocumentKey *key = [FSTDocumentKey keyWithPath:query.path];
+ [self
+ removeDocument:[FSTDeletedDocument documentWithKey:key version:SnapshotVersion::None()]
+ withKey:key
+ fromTarget:targetID];
+ } else {
+ HARD_ASSERT(expectedCount == 1, "Single document existence filter with count: %s",
+ expectedCount);
+ }
+ } else {
+ int currentSize = [self currentDocumentCountForTarget:targetID];
+ if (currentSize != expectedCount) {
+ // Existence filter mismatch: We reset the mapping and raise a new snapshot with
+ // `isFromCache:true`.
+ [self resetTarget:targetID];
+ _pendingTargetResets.insert(targetID);
+ }
+ }
+ }
+}
+
+- (int)currentDocumentCountForTarget:(FSTTargetID)targetID {
+ FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID];
+ FSTTargetChange *targetChange = [targetState toTargetChange];
+ return ([_targetMetadataProvider remoteKeysForTarget:@(targetID)].size() +
+ targetChange.addedDocuments.size() - targetChange.removedDocuments.size());
+}
+
/**
- * Records that we got a watch target add/remove by decrementing the number of pending target
- * responses that we have.
+ * Resets the state of a Watch target to its initial state (e.g. sets 'current' to false, clears the
+ * resume token and removes its target mapping from all documents).
*/
-- (void)recordResponseForTargetID:(FSTBoxedTargetID *)targetID {
- NSNumber *count = self.pendingTargetResponses[targetID];
- int newCount = count ? [count intValue] - 1 : -1;
- if (newCount == 0) {
- [self.pendingTargetResponses removeObjectForKey:targetID];
+- (void)resetTarget:(FSTTargetID)targetID {
+ auto currentTargetState = _targetStates.find(targetID);
+ HARD_ASSERT(currentTargetState != _targetStates.end() && !(currentTargetState->second.isPending),
+ "Should only reset active targets");
+
+ _targetStates[targetID] = [FSTTargetState new];
+
+ // Trigger removal for any documents currently mapped to this target. These removals will be part
+ // of the initial snapshot if Watch does not resend these documents.
+ DocumentKeySet existingKeys = [_targetMetadataProvider remoteKeysForTarget:@(targetID)];
+
+ for (FSTDocumentKey *key : existingKeys) {
+ [self removeDocument:nil withKey:key fromTarget:targetID];
+ }
+}
+
+/**
+ * Adds the provided document to the internal list of document updates and its document key to the
+ * given target's mapping.
+ */
+- (void)addDocument:(FSTMaybeDocument *)document toTarget:(FSTTargetID)targetID {
+ if (![self isActiveTarget:targetID]) {
+ return;
+ }
+
+ FSTDocumentViewChangeType changeType = [self containsDocument:document.key inTarget:targetID]
+ ? FSTDocumentViewChangeTypeModified
+ : FSTDocumentViewChangeTypeAdded;
+
+ FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID];
+ [targetState addDocumentChangeWithType:changeType forKey:document.key];
+
+ _pendingDocumentUpdates[document.key] = document;
+ _pendingDocumentTargetMappings[document.key].insert(targetID);
+}
+
+/**
+ * Removes the provided document from the target mapping. If the document no longer matches the
+ * target, but the document's state is still known (e.g. we know that the document was deleted or we
+ * received the change that caused the filter mismatch), the new document can be provided to update
+ * the remote document cache.
+ */
+- (void)removeDocument:(FSTMaybeDocument *_Nullable)document
+ withKey:(const DocumentKey &)key
+ fromTarget:(FSTTargetID)targetID {
+ if (![self isActiveTarget:targetID]) {
+ return;
+ }
+
+ FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID];
+
+ if ([self containsDocument:key inTarget:targetID]) {
+ [targetState addDocumentChangeWithType:FSTDocumentViewChangeTypeRemoved forKey:key];
} else {
- self.pendingTargetResponses[targetID] = @(newCount);
+ // The document may have entered and left the target before we raised a snapshot, so we can just
+ // ignore the change.
+ [targetState removeDocumentChangeForKey:key];
+ }
+
+ _pendingDocumentTargetMappings[key].erase(targetID);
+
+ if (document) {
+ _pendingDocumentUpdates[key] = document;
+ }
+}
+
+/**
+ * Returns whether the LocalStore considers the document to be part of the specified target.
+ */
+- (BOOL)containsDocument:(FSTDocumentKey *)key inTarget:(FSTTargetID)targetID {
+ const DocumentKeySet &existingKeys = [_targetMetadataProvider remoteKeysForTarget:@(targetID)];
+ return existingKeys.contains(key);
+}
+
+- (FSTTargetState *)ensureTargetStateForTarget:(FSTTargetID)targetID {
+ if (!_targetStates[targetID]) {
+ _targetStates[targetID] = [FSTTargetState new];
}
+
+ return _targetStates[targetID];
}
/**
- * Returns true if the given targetId is active. Active targets are those for which there are no
+ * Returns YES if the given targetId is active. Active targets are those for which there are no
* pending requests to add a listen and are in the current list of targets the client cares about.
*
* Clients can repeatedly listen and stop listening to targets, so this check is useful in
* preventing in preventing race conditions for a target where events arrive but the server hasn't
* yet acknowledged the intended change in state.
*/
-- (BOOL)isActiveTarget:(FSTBoxedTargetID *)targetID {
+- (BOOL)isActiveTarget:(FSTTargetID)targetID {
return [self queryDataForActiveTarget:targetID] != nil;
}
-- (FSTQueryData *_Nullable)queryDataForActiveTarget:(FSTBoxedTargetID *)targetID {
- FSTQueryData *queryData = self.listenTargets[targetID];
- return (queryData && !self.pendingTargetResponses[targetID]) ? queryData : nil;
+- (nullable FSTQueryData *)queryDataForActiveTarget:(FSTTargetID)targetID {
+ auto targetState = _targetStates.find(targetID);
+ return targetState != _targetStates.end() && targetState->second.isPending
+ ? nil
+ : [_targetMetadataProvider queryDataForTarget:@(targetID)];
}
-- (void)addExistenceFilterChange:(FSTExistenceFilterWatchChange *)existenceFilterChange {
- FSTBoxedTargetID *targetID = @(existenceFilterChange.targetID);
- if ([self isActiveTarget:targetID]) {
- _existenceFilters[targetID] = existenceFilterChange.filter;
+- (FSTRemoteEvent *)remoteEventAtSnapshotVersion:(const SnapshotVersion &)snapshotVersion {
+ std::unordered_map<FSTTargetID, FSTTargetChange *> targetChanges;
+
+ for (const auto &entry : _targetStates) {
+ FSTTargetID targetID = entry.first;
+ FSTTargetState *targetState = entry.second;
+
+ FSTQueryData *queryData = [self queryDataForActiveTarget:targetID];
+ if (queryData) {
+ if (targetState.current && [queryData.query isDocumentQuery]) {
+ // Document queries for document that don't exist can produce an empty result set. To update
+ // our local cache, we synthesize a document delete if we have not previously received the
+ // document. This resolves the limbo state of the document, removing it from
+ // limboDocumentRefs.
+ FSTDocumentKey *key = [FSTDocumentKey keyWithPath:queryData.query.path];
+ if (_pendingDocumentUpdates.find(key) == _pendingDocumentUpdates.end() &&
+ ![self containsDocument:key inTarget:targetID]) {
+ [self removeDocument:[FSTDeletedDocument documentWithKey:key version:snapshotVersion]
+ withKey:key
+ fromTarget:targetID];
+ }
+ }
+
+ if (targetState.hasPendingChanges) {
+ targetChanges[targetID] = [targetState toTargetChange];
+ [targetState clearPendingChanges];
+ }
+ }
}
-}
-- (FSTRemoteEvent *)remoteEvent {
- NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *targetChanges = self.targetChanges;
+ DocumentKeySet resolvedLimboDocuments;
- NSMutableArray *targetsToRemove = [NSMutableArray array];
+ // We extract the set of limbo-only document updates as the GC logic special-cases documents that
+ // do not appear in the query cache.
+ //
+ // TODO(gsoltis): Expand on this comment.
+ for (const auto &entry : _pendingDocumentTargetMappings) {
+ BOOL isOnlyLimboTarget = YES;
+
+ for (FSTTargetID targetID : entry.second) {
+ FSTQueryData *queryData = [self queryDataForActiveTarget:targetID];
+ if (queryData && queryData.purpose != FSTQueryPurposeLimboResolution) {
+ isOnlyLimboTarget = NO;
+ break;
+ }
+ }
- // Apply any inactive targets.
- for (FSTBoxedTargetID *targetID in [targetChanges keyEnumerator]) {
- if (![self isActiveTarget:targetID]) {
- [targetsToRemove addObject:targetID];
+ if (isOnlyLimboTarget) {
+ resolvedLimboDocuments = resolvedLimboDocuments.insert(entry.first);
}
}
- [targetChanges removeObjectsForKeys:targetsToRemove];
+ FSTRemoteEvent *remoteEvent =
+ [[FSTRemoteEvent alloc] initWithSnapshotVersion:snapshotVersion
+ targetChanges:targetChanges
+ targetMismatches:_pendingTargetResets
+ documentUpdates:_pendingDocumentUpdates
+ limboDocuments:resolvedLimboDocuments];
+
+ _pendingDocumentUpdates.clear();
+ _pendingDocumentTargetMappings.clear();
+ _pendingTargetResets.clear();
- // Mark this aggregator as frozen so no further modifications are made.
- self.frozen = YES;
- return [[FSTRemoteEvent alloc] initWithSnapshotVersion:_snapshotVersion
- targetChanges:targetChanges
- documentUpdates:_documentUpdates
- limboDocuments:_limboDocuments];
+ return remoteEvent;
}
+- (void)recordTargetRequest:(FSTBoxedTargetID *)targetID {
+ // For each request we get we need to record we need a response for it.
+ FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID.intValue];
+ [targetState recordTargetRequest];
+}
@end
NS_ASSUME_NONNULL_END