From 7226b4adf38e4732dfb9a840d25f86d3e5533bdb Mon Sep 17 00:00:00 2001 From: zxu Date: Thu, 25 Jan 2018 18:39:46 -0500 Subject: port TargetIdGenerator to iOS (#709) * port TargetIdGenerator to iOS * fix style * move pointer property to instance variable * TriggerTravis --- .../Example/Firestore.xcodeproj/project.pbxproj | 4 - .../Example/Tests/Core/FSTTargetIDGeneratorTests.m | 94 ---- Firestore/Source/Core/FSTSyncEngine.m | 540 -------------------- Firestore/Source/Core/FSTSyncEngine.mm | 542 ++++++++++++++++++++ Firestore/Source/Core/FSTTargetIDGenerator.h | 55 -- Firestore/Source/Core/FSTTargetIDGenerator.m | 105 ---- Firestore/Source/Local/FSTLocalStore.m | 554 -------------------- Firestore/Source/Local/FSTLocalStore.mm | 559 +++++++++++++++++++++ .../firebase/firestore/core/target_id_generator.h | 3 + 9 files changed, 1104 insertions(+), 1352 deletions(-) delete mode 100644 Firestore/Example/Tests/Core/FSTTargetIDGeneratorTests.m delete mode 100644 Firestore/Source/Core/FSTSyncEngine.m create mode 100644 Firestore/Source/Core/FSTSyncEngine.mm delete mode 100644 Firestore/Source/Core/FSTTargetIDGenerator.h delete mode 100644 Firestore/Source/Core/FSTTargetIDGenerator.m delete mode 100644 Firestore/Source/Local/FSTLocalStore.m create mode 100644 Firestore/Source/Local/FSTLocalStore.mm (limited to 'Firestore') diff --git a/Firestore/Example/Firestore.xcodeproj/project.pbxproj b/Firestore/Example/Firestore.xcodeproj/project.pbxproj index 7bc57fc..7c6508a 100644 --- a/Firestore/Example/Firestore.xcodeproj/project.pbxproj +++ b/Firestore/Example/Firestore.xcodeproj/project.pbxproj @@ -111,7 +111,6 @@ DE51B1CE1F0D48CD0013853F /* FSTEventManagerTests.m in Sources */ = {isa = PBXBuildFile; fileRef = DE51B1AA1F0D48AC0013853F /* FSTEventManagerTests.m */; }; DE51B1CF1F0D48CD0013853F /* FSTQueryListenerTests.m in Sources */ = {isa = PBXBuildFile; fileRef = DE51B1AB1F0D48AC0013853F /* FSTQueryListenerTests.m */; }; DE51B1D01F0D48CD0013853F /* FSTQueryTests.m in Sources */ = {isa = PBXBuildFile; fileRef = DE51B1AC1F0D48AC0013853F /* FSTQueryTests.m */; }; - DE51B1D11F0D48CD0013853F /* FSTTargetIDGeneratorTests.m in Sources */ = {isa = PBXBuildFile; fileRef = DE51B1AE1F0D48AC0013853F /* FSTTargetIDGeneratorTests.m */; }; DE51B1D21F0D48CD0013853F /* FSTTimestampTests.m in Sources */ = {isa = PBXBuildFile; fileRef = DE51B1AF1F0D48AC0013853F /* FSTTimestampTests.m */; }; DE51B1D31F0D48CD0013853F /* FSTViewSnapshotTest.m in Sources */ = {isa = PBXBuildFile; fileRef = DE51B1B01F0D48AC0013853F /* FSTViewSnapshotTest.m */; }; DE51B1D41F0D48CD0013853F /* FSTViewTests.m in Sources */ = {isa = PBXBuildFile; fileRef = DE51B1B11F0D48AC0013853F /* FSTViewTests.m */; }; @@ -331,7 +330,6 @@ DE51B1AB1F0D48AC0013853F /* FSTQueryListenerTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = FSTQueryListenerTests.m; sourceTree = ""; }; DE51B1AC1F0D48AC0013853F /* FSTQueryTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = FSTQueryTests.m; sourceTree = ""; }; DE51B1AD1F0D48AC0013853F /* FSTSyncEngine+Testing.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = "FSTSyncEngine+Testing.h"; sourceTree = ""; }; - DE51B1AE1F0D48AC0013853F /* FSTTargetIDGeneratorTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = FSTTargetIDGeneratorTests.m; sourceTree = ""; }; DE51B1AF1F0D48AC0013853F /* FSTTimestampTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = FSTTimestampTests.m; sourceTree = ""; }; DE51B1B01F0D48AC0013853F /* FSTViewSnapshotTest.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = FSTViewSnapshotTest.m; sourceTree = ""; }; DE51B1B11F0D48AC0013853F /* FSTViewTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = FSTViewTests.m; sourceTree = ""; }; @@ -718,7 +716,6 @@ DE51B1AA1F0D48AC0013853F /* FSTEventManagerTests.m */, DE51B1AB1F0D48AC0013853F /* FSTQueryListenerTests.m */, DE51B1AC1F0D48AC0013853F /* FSTQueryTests.m */, - DE51B1AE1F0D48AC0013853F /* FSTTargetIDGeneratorTests.m */, DE51B1AF1F0D48AC0013853F /* FSTTimestampTests.m */, DE51B1B01F0D48AC0013853F /* FSTViewSnapshotTest.m */, DE51B1B11F0D48AC0013853F /* FSTViewTests.m */, @@ -1253,7 +1250,6 @@ DE51B1F41F0D491B0013853F /* FSTRemoteEventTests.m in Sources */, 548DB927200D590300E00ABC /* assert_test.cc in Sources */, 54E928241F33953300C1953E /* FSTEventAccumulator.m in Sources */, - DE51B1D11F0D48CD0013853F /* FSTTargetIDGeneratorTests.m in Sources */, 5436F32420008FAD006E51E3 /* string_printf_test.cc in Sources */, DE51B1EF1F0D49140013853F /* FSTDocumentTests.m in Sources */, DE51B1DC1F0D490D0013853F /* FSTLocalSerializerTests.m in Sources */, diff --git a/Firestore/Example/Tests/Core/FSTTargetIDGeneratorTests.m b/Firestore/Example/Tests/Core/FSTTargetIDGeneratorTests.m deleted file mode 100644 index 6f54fd1..0000000 --- a/Firestore/Example/Tests/Core/FSTTargetIDGeneratorTests.m +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2017 Google - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#import "Firestore/Source/Core/FSTTargetIDGenerator.h" - -#import - -NS_ASSUME_NONNULL_BEGIN - -@interface FSTTargetIDGenerator () -- (instancetype)initWithGeneratorID:(NSInteger)generatorID startingAfterID:(FSTTargetID)after; -@end - -@interface FSTTargetIDGeneratorTests : XCTestCase -@end - -@implementation FSTTargetIDGeneratorTests - -- (void)testConstructor { - XCTAssertEqual([[[FSTTargetIDGenerator alloc] initWithGeneratorID:0 startingAfterID:0] nextID], - 2); - XCTAssertEqual([[[FSTTargetIDGenerator alloc] initWithGeneratorID:1 startingAfterID:0] nextID], - 1); - - XCTAssertEqual([[FSTTargetIDGenerator generatorForLocalStoreStartingAfterID:0] nextID], 2); - XCTAssertEqual([[FSTTargetIDGenerator generatorForSyncEngineStartingAfterID:0] nextID], 1); -} - -- (void)testSkipPast { - FSTTargetIDGenerator *gen = - [[FSTTargetIDGenerator alloc] initWithGeneratorID:1 startingAfterID:-1]; - XCTAssertEqual([gen nextID], 1); - - gen = [[FSTTargetIDGenerator alloc] initWithGeneratorID:1 startingAfterID:2]; - XCTAssertEqual([gen nextID], 3); - - gen = [[FSTTargetIDGenerator alloc] initWithGeneratorID:1 startingAfterID:4]; - XCTAssertEqual([gen nextID], 5); - - for (int i = 4; i < 12; ++i) { - FSTTargetIDGenerator *gen0 = - [[FSTTargetIDGenerator alloc] initWithGeneratorID:0 startingAfterID:i]; - FSTTargetIDGenerator *gen1 = - [[FSTTargetIDGenerator alloc] initWithGeneratorID:1 startingAfterID:i]; - XCTAssertEqual([gen0 nextID], i + 2 & ~1, @"Skip failed for index %d", i); - XCTAssertEqual([gen1 nextID], i + 1 | 1, @"Skip failed for index %d", i); - } - - gen = [[FSTTargetIDGenerator alloc] initWithGeneratorID:1 startingAfterID:12]; - XCTAssertEqual([gen nextID], 13); - - gen = [[FSTTargetIDGenerator alloc] initWithGeneratorID:0 startingAfterID:22]; - XCTAssertEqual([gen nextID], 24); -} - -- (void)testIncrement { - FSTTargetIDGenerator *gen = - [[FSTTargetIDGenerator alloc] initWithGeneratorID:0 startingAfterID:0]; - XCTAssertEqual([gen nextID], 2); - XCTAssertEqual([gen nextID], 4); - XCTAssertEqual([gen nextID], 6); - gen = [[FSTTargetIDGenerator alloc] initWithGeneratorID:0 startingAfterID:46]; - XCTAssertEqual([gen nextID], 48); - XCTAssertEqual([gen nextID], 50); - XCTAssertEqual([gen nextID], 52); - XCTAssertEqual([gen nextID], 54); - - gen = [[FSTTargetIDGenerator alloc] initWithGeneratorID:1 startingAfterID:0]; - XCTAssertEqual([gen nextID], 1); - XCTAssertEqual([gen nextID], 3); - XCTAssertEqual([gen nextID], 5); - gen = [[FSTTargetIDGenerator alloc] initWithGeneratorID:1 startingAfterID:46]; - XCTAssertEqual([gen nextID], 47); - XCTAssertEqual([gen nextID], 49); - XCTAssertEqual([gen nextID], 51); - XCTAssertEqual([gen nextID], 53); -} - -@end - -NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Core/FSTSyncEngine.m b/Firestore/Source/Core/FSTSyncEngine.m deleted file mode 100644 index f90c5dd..0000000 --- a/Firestore/Source/Core/FSTSyncEngine.m +++ /dev/null @@ -1,540 +0,0 @@ -/* - * Copyright 2017 Google - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#import "Firestore/Source/Core/FSTSyncEngine.h" - -#import - -#import "FIRFirestoreErrors.h" -#import "Firestore/Source/Auth/FSTUser.h" -#import "Firestore/Source/Core/FSTQuery.h" -#import "Firestore/Source/Core/FSTSnapshotVersion.h" -#import "Firestore/Source/Core/FSTTargetIDGenerator.h" -#import "Firestore/Source/Core/FSTTransaction.h" -#import "Firestore/Source/Core/FSTView.h" -#import "Firestore/Source/Core/FSTViewSnapshot.h" -#import "Firestore/Source/Local/FSTEagerGarbageCollector.h" -#import "Firestore/Source/Local/FSTLocalStore.h" -#import "Firestore/Source/Local/FSTLocalViewChanges.h" -#import "Firestore/Source/Local/FSTLocalWriteResult.h" -#import "Firestore/Source/Local/FSTQueryData.h" -#import "Firestore/Source/Local/FSTReferenceSet.h" -#import "Firestore/Source/Model/FSTDocument.h" -#import "Firestore/Source/Model/FSTDocumentKey.h" -#import "Firestore/Source/Model/FSTDocumentSet.h" -#import "Firestore/Source/Model/FSTMutationBatch.h" -#import "Firestore/Source/Remote/FSTRemoteEvent.h" -#import "Firestore/Source/Util/FSTAssert.h" -#import "Firestore/Source/Util/FSTDispatchQueue.h" -#import "Firestore/Source/Util/FSTLogger.h" - -NS_ASSUME_NONNULL_BEGIN - -// Limbo documents don't use persistence, and are eagerly GC'd. So, listens for them don't need -// real sequence numbers. -static const FSTListenSequenceNumber kIrrelevantSequenceNumber = -1; - -#pragma mark - FSTQueryView - -/** - * FSTQueryView contains all of the info that FSTSyncEngine needs to track for a particular - * query and view. - */ -@interface FSTQueryView : NSObject - -- (instancetype)initWithQuery:(FSTQuery *)query - targetID:(FSTTargetID)targetID - resumeToken:(NSData *)resumeToken - view:(FSTView *)view NS_DESIGNATED_INITIALIZER; - -- (instancetype)init NS_UNAVAILABLE; - -/** The query itself. */ -@property(nonatomic, strong, readonly) FSTQuery *query; - -/** The targetID created by the client that is used in the watch stream to identify this query. */ -@property(nonatomic, assign, readonly) FSTTargetID targetID; - -/** - * An identifier from the datastore backend that indicates the last state of the results that - * was received. This can be used to indicate where to continue receiving new doc changes for the - * query. - */ -@property(nonatomic, copy, readonly) NSData *resumeToken; - -/** - * The view is responsible for computing the final merged truth of what docs are in the query. - * It gets notified of local and remote changes, and applies the query filters and limits to - * determine the most correct possible results. - */ -@property(nonatomic, strong, readonly) FSTView *view; - -@end - -@implementation FSTQueryView - -- (instancetype)initWithQuery:(FSTQuery *)query - targetID:(FSTTargetID)targetID - resumeToken:(NSData *)resumeToken - view:(FSTView *)view { - if (self = [super init]) { - _query = query; - _targetID = targetID; - _resumeToken = resumeToken; - _view = view; - } - return self; -} - -@end - -#pragma mark - FSTSyncEngine - -@interface FSTSyncEngine () - -/** The local store, used to persist mutations and cached documents. */ -@property(nonatomic, strong, readonly) FSTLocalStore *localStore; - -/** The remote store for sending writes, watches, etc. to the backend. */ -@property(nonatomic, strong, readonly) FSTRemoteStore *remoteStore; - -/** FSTQueryViews for all active queries, indexed by query. */ -@property(nonatomic, strong, readonly) - NSMutableDictionary *queryViewsByQuery; - -/** FSTQueryViews for all active queries, indexed by target ID. */ -@property(nonatomic, strong, readonly) - NSMutableDictionary *queryViewsByTarget; - -/** - * When a document is in limbo, we create a special listen to resolve it. This maps the - * FSTDocumentKey of each limbo document to the FSTTargetID of the listen resolving it. - */ -@property(nonatomic, strong, readonly) - NSMutableDictionary *limboTargetsByKey; - -/** The inverse of limboTargetsByKey, a map of FSTTargetID to the key of the limbo doc. */ -@property(nonatomic, strong, readonly) - NSMutableDictionary *limboKeysByTarget; - -/** Used to track any documents that are currently in limbo. */ -@property(nonatomic, strong, readonly) FSTReferenceSet *limboDocumentRefs; - -/** The garbage collector used to collect documents that are no longer in limbo. */ -@property(nonatomic, strong, readonly) FSTEagerGarbageCollector *limboCollector; - -/** Stores user completion blocks, indexed by user and FSTBatchID. */ -@property(nonatomic, strong) - NSMutableDictionary *> - *mutationCompletionBlocks; - -/** Used for creating the FSTTargetIDs for the listens used to resolve limbo documents. */ -@property(nonatomic, strong, readonly) FSTTargetIDGenerator *targetIdGenerator; - -@property(nonatomic, strong) FSTUser *currentUser; - -@end - -@implementation FSTSyncEngine - -- (instancetype)initWithLocalStore:(FSTLocalStore *)localStore - remoteStore:(FSTRemoteStore *)remoteStore - initialUser:(FSTUser *)initialUser { - if (self = [super init]) { - _localStore = localStore; - _remoteStore = remoteStore; - - _queryViewsByQuery = [NSMutableDictionary dictionary]; - _queryViewsByTarget = [NSMutableDictionary dictionary]; - - _limboTargetsByKey = [NSMutableDictionary dictionary]; - _limboKeysByTarget = [NSMutableDictionary dictionary]; - _limboCollector = [[FSTEagerGarbageCollector alloc] init]; - _limboDocumentRefs = [[FSTReferenceSet alloc] init]; - [_limboCollector addGarbageSource:_limboDocumentRefs]; - - _mutationCompletionBlocks = [NSMutableDictionary dictionary]; - _targetIdGenerator = [FSTTargetIDGenerator generatorForSyncEngineStartingAfterID:0]; - _currentUser = initialUser; - } - return self; -} - -- (FSTTargetID)listenToQuery:(FSTQuery *)query { - [self assertDelegateExistsForSelector:_cmd]; - FSTAssert(self.queryViewsByQuery[query] == nil, @"We already listen to query: %@", query); - - FSTQueryData *queryData = [self.localStore allocateQuery:query]; - FSTDocumentDictionary *docs = [self.localStore executeQuery:query]; - FSTDocumentKeySet *remoteKeys = [self.localStore remoteDocumentKeysForTarget:queryData.targetID]; - - FSTView *view = [[FSTView alloc] initWithQuery:query remoteDocuments:remoteKeys]; - FSTViewDocumentChanges *viewDocChanges = [view computeChangesWithDocuments:docs]; - FSTViewChange *viewChange = [view applyChangesToDocuments:viewDocChanges]; - FSTAssert(viewChange.limboChanges.count == 0, - @"View returned limbo docs before target ack from the server."); - - FSTQueryView *queryView = [[FSTQueryView alloc] initWithQuery:query - targetID:queryData.targetID - resumeToken:queryData.resumeToken - view:view]; - self.queryViewsByQuery[query] = queryView; - self.queryViewsByTarget[@(queryData.targetID)] = queryView; - [self.delegate handleViewSnapshots:@[ viewChange.snapshot ]]; - - [self.remoteStore listenToTargetWithQueryData:queryData]; - return queryData.targetID; -} - -- (void)stopListeningToQuery:(FSTQuery *)query { - [self assertDelegateExistsForSelector:_cmd]; - - FSTQueryView *queryView = self.queryViewsByQuery[query]; - FSTAssert(queryView, @"Trying to stop listening to a query not found"); - - [self.localStore releaseQuery:query]; - [self.remoteStore stopListeningToTargetID:queryView.targetID]; - [self removeAndCleanupQuery:queryView]; - [self.localStore collectGarbage]; -} - -- (void)writeMutations:(NSArray *)mutations - completion:(FSTVoidErrorBlock)completion { - [self assertDelegateExistsForSelector:_cmd]; - - FSTLocalWriteResult *result = [self.localStore locallyWriteMutations:mutations]; - [self addMutationCompletionBlock:completion batchID:result.batchID]; - - [self emitNewSnapshotsWithChanges:result.changes remoteEvent:nil]; - [self.remoteStore fillWritePipeline]; -} - -- (void)addMutationCompletionBlock:(FSTVoidErrorBlock)completion batchID:(FSTBatchID)batchID { - NSMutableDictionary *completionBlocks = - self.mutationCompletionBlocks[self.currentUser]; - if (!completionBlocks) { - completionBlocks = [NSMutableDictionary dictionary]; - self.mutationCompletionBlocks[self.currentUser] = completionBlocks; - } - [completionBlocks setObject:completion forKey:@(batchID)]; -} - -/** - * Takes an updateBlock in which a set of reads and writes can be performed atomically. In the - * updateBlock, user code can read and write values using a transaction object. After the - * updateBlock, all changes will be committed. If someone else has changed any of the data - * referenced, then the updateBlock will be called again. If the updateBlock still fails after the - * given number of retries, then the transaction will be rejected. - * - * The transaction object passed to the updateBlock contains methods for accessing documents - * and collections. Unlike other firestore access, data accessed with the transaction will not - * reflect local changes that have not been committed. For this reason, it is required that all - * reads are performed before any writes. Transactions must be performed while online. - */ -- (void)transactionWithRetries:(int)retries - workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue - updateBlock:(FSTTransactionBlock)updateBlock - completion:(FSTVoidIDErrorBlock)completion { - [workerDispatchQueue verifyIsCurrentQueue]; - FSTAssert(retries >= 0, @"Got negative number of retries for transaction"); - FSTTransaction *transaction = [self.remoteStore transaction]; - updateBlock(transaction, ^(id _Nullable result, NSError *_Nullable error) { - [workerDispatchQueue dispatchAsync:^{ - if (error) { - completion(nil, error); - return; - } - [transaction commitWithCompletion:^(NSError *_Nullable transactionError) { - if (!transactionError) { - completion(result, nil); - return; - } - // TODO(b/35201829): Only retry on real transaction failures. - if (retries == 0) { - NSError *wrappedError = - [NSError errorWithDomain:FIRFirestoreErrorDomain - code:FIRFirestoreErrorCodeFailedPrecondition - userInfo:@{ - NSLocalizedDescriptionKey : @"Transaction failed all retries.", - NSUnderlyingErrorKey : transactionError - }]; - completion(nil, wrappedError); - return; - } - [workerDispatchQueue verifyIsCurrentQueue]; - return [self transactionWithRetries:(retries - 1) - workerDispatchQueue:workerDispatchQueue - updateBlock:updateBlock - completion:completion]; - }]; - }]; - }); -} - -- (void)applyRemoteEvent:(FSTRemoteEvent *)remoteEvent { - [self assertDelegateExistsForSelector:_cmd]; - - // Make sure limbo documents are deleted if there were no results - [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( - FSTBoxedTargetID *_Nonnull targetID, - FSTTargetChange *_Nonnull targetChange, BOOL *_Nonnull stop) { - FSTDocumentKey *limboKey = self.limboKeysByTarget[targetID]; - if (limboKey && targetChange.currentStatusUpdate == FSTCurrentStatusUpdateMarkCurrent && - remoteEvent.documentUpdates[limboKey] == nil) { - // When listening to a query the server responds with a snapshot containing documents - // matching the query and a current marker telling us we're now in sync. It's possible for - // these to arrive as separate remote events or as a single remote event. For a document - // query, there will be no documents sent in the response if the document doesn't exist. - // - // If the snapshot arrives separately from the current marker, we handle it normally and - // updateTrackedLimboDocumentsWithChanges:targetID: will resolve the limbo status of the - // document, removing it from limboDocumentRefs. This works because clients only initiate - // limbo resolution when a target is current and because all current targets are always at a - // consistent snapshot. - // - // However, if the document doesn't exist and the current marker arrives, the document is - // not present in the snapshot and our normal view handling would consider the document to - // remain in limbo indefinitely because there are no updates to the document. To avoid this, - // we specially handle this just this case here: synthesizing a delete. - // - // TODO(dimond): Ideally we would have an explicit lookup query instead resulting in an - // explicit delete message and we could remove this special logic. - [remoteEvent - addDocumentUpdate:[FSTDeletedDocument documentWithKey:limboKey - version:remoteEvent.snapshotVersion]]; - } - }]; - - FSTMaybeDocumentDictionary *changes = [self.localStore applyRemoteEvent:remoteEvent]; - [self emitNewSnapshotsWithChanges:changes remoteEvent:remoteEvent]; -} - -- (void)applyChangedOnlineState:(FSTOnlineState)onlineState { - NSMutableArray *newViewSnapshots = [NSMutableArray array]; - [self.queryViewsByQuery - enumerateKeysAndObjectsUsingBlock:^(FSTQuery *query, FSTQueryView *queryView, BOOL *stop) { - FSTViewChange *viewChange = [queryView.view applyChangedOnlineState:onlineState]; - FSTAssert(viewChange.limboChanges.count == 0, - @"OnlineState should not affect limbo documents."); - if (viewChange.snapshot) { - [newViewSnapshots addObject:viewChange.snapshot]; - } - }]; - - [self.delegate handleViewSnapshots:newViewSnapshots]; -} - -- (void)rejectListenWithTargetID:(FSTBoxedTargetID *)targetID error:(NSError *)error { - [self assertDelegateExistsForSelector:_cmd]; - - FSTDocumentKey *limboKey = self.limboKeysByTarget[targetID]; - if (limboKey) { - // Since this query failed, we won't want to manually unlisten to it. - // So go ahead and remove it from bookkeeping. - [self.limboTargetsByKey removeObjectForKey:limboKey]; - [self.limboKeysByTarget removeObjectForKey:targetID]; - - // TODO(dimond): Retry on transient errors? - - // It's a limbo doc. Create a synthetic event saying it was deleted. This is kind of a hack. - // Ideally, we would have a method in the local store to purge a document. However, it would - // be tricky to keep all of the local store's invariants with another method. - NSMutableDictionary *targetChanges = - [NSMutableDictionary dictionary]; - FSTDeletedDocument *doc = - [FSTDeletedDocument documentWithKey:limboKey version:[FSTSnapshotVersion noVersion]]; - NSMutableDictionary *docUpdate = - [NSMutableDictionary dictionaryWithObject:doc forKey:limboKey]; - FSTRemoteEvent *event = [FSTRemoteEvent eventWithSnapshotVersion:[FSTSnapshotVersion noVersion] - targetChanges:targetChanges - documentUpdates:docUpdate]; - [self applyRemoteEvent:event]; - } else { - FSTQueryView *queryView = self.queryViewsByTarget[targetID]; - FSTAssert(queryView, @"Unknown targetId: %@", targetID); - [self.localStore releaseQuery:queryView.query]; - [self removeAndCleanupQuery:queryView]; - [self.delegate handleError:error forQuery:queryView.query]; - } -} - -- (void)applySuccessfulWriteWithResult:(FSTMutationBatchResult *)batchResult { - [self assertDelegateExistsForSelector:_cmd]; - - // The local store may or may not be able to apply the write result and raise events immediately - // (depending on whether the watcher is caught up), so we raise user callbacks first so that they - // consistently happen before listen events. - [self processUserCallbacksForBatchID:batchResult.batch.batchID error:nil]; - - FSTMaybeDocumentDictionary *changes = [self.localStore acknowledgeBatchWithResult:batchResult]; - [self emitNewSnapshotsWithChanges:changes remoteEvent:nil]; -} - -- (void)rejectFailedWriteWithBatchID:(FSTBatchID)batchID error:(NSError *)error { - [self assertDelegateExistsForSelector:_cmd]; - - // The local store may or may not be able to apply the write result and raise events immediately - // (depending on whether the watcher is caught up), so we raise user callbacks first so that they - // consistently happen before listen events. - [self processUserCallbacksForBatchID:batchID error:error]; - - FSTMaybeDocumentDictionary *changes = [self.localStore rejectBatchID:batchID]; - [self emitNewSnapshotsWithChanges:changes remoteEvent:nil]; -} - -- (void)processUserCallbacksForBatchID:(FSTBatchID)batchID error:(NSError *_Nullable)error { - NSMutableDictionary *completionBlocks = - self.mutationCompletionBlocks[self.currentUser]; - - // NOTE: Mutations restored from persistence won't have completion blocks, so it's okay for - // this (or the completion below) to be nil. - if (completionBlocks) { - NSNumber *boxedBatchID = @(batchID); - FSTVoidErrorBlock completion = completionBlocks[boxedBatchID]; - if (completion) { - completion(error); - [completionBlocks removeObjectForKey:boxedBatchID]; - } - } -} - -- (void)assertDelegateExistsForSelector:(SEL)methodSelector { - FSTAssert(self.delegate, @"Tried to call '%@' before delegate was registered.", - NSStringFromSelector(methodSelector)); -} - -- (void)removeAndCleanupQuery:(FSTQueryView *)queryView { - [self.queryViewsByQuery removeObjectForKey:queryView.query]; - [self.queryViewsByTarget removeObjectForKey:@(queryView.targetID)]; - - [self.limboDocumentRefs removeReferencesForID:queryView.targetID]; - [self garbageCollectLimboDocuments]; -} - -/** - * Computes a new snapshot from the changes and calls the registered callback with the new snapshot. - */ -- (void)emitNewSnapshotsWithChanges:(FSTMaybeDocumentDictionary *)changes - remoteEvent:(FSTRemoteEvent *_Nullable)remoteEvent { - NSMutableArray *newSnapshots = [NSMutableArray array]; - NSMutableArray *documentChangesInAllViews = [NSMutableArray array]; - - [self.queryViewsByQuery - enumerateKeysAndObjectsUsingBlock:^(FSTQuery *query, FSTQueryView *queryView, BOOL *stop) { - FSTView *view = queryView.view; - FSTViewDocumentChanges *viewDocChanges = [view computeChangesWithDocuments:changes]; - if (viewDocChanges.needsRefill) { - // The query has a limit and some docs were removed/updated, so we need to re-run the - // query against the local store to make sure we didn't lose any good docs that had been - // past the limit. - FSTDocumentDictionary *docs = [self.localStore executeQuery:queryView.query]; - viewDocChanges = [view computeChangesWithDocuments:docs previousChanges:viewDocChanges]; - } - FSTTargetChange *_Nullable targetChange = remoteEvent.targetChanges[@(queryView.targetID)]; - FSTViewChange *viewChange = - [queryView.view applyChangesToDocuments:viewDocChanges targetChange:targetChange]; - - [self updateTrackedLimboDocumentsWithChanges:viewChange.limboChanges - targetID:queryView.targetID]; - - if (viewChange.snapshot) { - [newSnapshots addObject:viewChange.snapshot]; - FSTLocalViewChanges *docChanges = - [FSTLocalViewChanges changesForViewSnapshot:viewChange.snapshot]; - [documentChangesInAllViews addObject:docChanges]; - } - }]; - - [self.delegate handleViewSnapshots:newSnapshots]; - [self.localStore notifyLocalViewChanges:documentChangesInAllViews]; - [self.localStore collectGarbage]; -} - -/** Updates the limbo document state for the given targetID. */ -- (void)updateTrackedLimboDocumentsWithChanges:(NSArray *)limboChanges - targetID:(FSTTargetID)targetID { - for (FSTLimboDocumentChange *limboChange in limboChanges) { - switch (limboChange.type) { - case FSTLimboDocumentChangeTypeAdded: - [self.limboDocumentRefs addReferenceToKey:limboChange.key forID:targetID]; - [self trackLimboChange:limboChange]; - break; - - case FSTLimboDocumentChangeTypeRemoved: - FSTLog(@"Document no longer in limbo: %@", limboChange.key); - [self.limboDocumentRefs removeReferenceToKey:limboChange.key forID:targetID]; - break; - - default: - FSTFail(@"Unknown limbo change type: %ld", (long)limboChange.type); - } - } - [self garbageCollectLimboDocuments]; -} - -- (void)trackLimboChange:(FSTLimboDocumentChange *)limboChange { - FSTDocumentKey *key = limboChange.key; - - if (!self.limboTargetsByKey[key]) { - FSTLog(@"New document in limbo: %@", key); - FSTTargetID limboTargetID = [self.targetIdGenerator nextID]; - FSTQuery *query = [FSTQuery queryWithPath:key.path]; - FSTQueryData *queryData = [[FSTQueryData alloc] initWithQuery:query - targetID:limboTargetID - listenSequenceNumber:kIrrelevantSequenceNumber - purpose:FSTQueryPurposeLimboResolution]; - self.limboKeysByTarget[@(limboTargetID)] = key; - [self.remoteStore listenToTargetWithQueryData:queryData]; - self.limboTargetsByKey[key] = @(limboTargetID); - } -} - -/** Garbage collect the limbo documents that we no longer need to track. */ -- (void)garbageCollectLimboDocuments { - NSSet *garbage = [self.limboCollector collectGarbage]; - for (FSTDocumentKey *key in garbage) { - FSTBoxedTargetID *limboTarget = self.limboTargetsByKey[key]; - if (!limboTarget) { - // This target already got removed, because the query failed. - return; - } - FSTTargetID limboTargetID = limboTarget.intValue; - [self.remoteStore stopListeningToTargetID:limboTargetID]; - [self.limboTargetsByKey removeObjectForKey:key]; - [self.limboKeysByTarget removeObjectForKey:limboTarget]; - } -} - -// Used for testing -- (NSDictionary *)currentLimboDocuments { - // Return defensive copy - return [self.limboTargetsByKey copy]; -} - -- (void)userDidChange:(FSTUser *)user { - self.currentUser = user; - - // Notify local store and emit any resulting events from swapping out the mutation queue. - FSTMaybeDocumentDictionary *changes = [self.localStore userDidChange:user]; - [self emitNewSnapshotsWithChanges:changes remoteEvent:nil]; - - // Notify remote store so it can restart its streams. - [self.remoteStore userDidChange:user]; -} - -@end - -NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Core/FSTSyncEngine.mm b/Firestore/Source/Core/FSTSyncEngine.mm new file mode 100644 index 0000000..d82cc99 --- /dev/null +++ b/Firestore/Source/Core/FSTSyncEngine.mm @@ -0,0 +1,542 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "Firestore/Source/Core/FSTSyncEngine.h" + +#import + +#import "FIRFirestoreErrors.h" +#import "Firestore/Source/Auth/FSTUser.h" +#import "Firestore/Source/Core/FSTQuery.h" +#import "Firestore/Source/Core/FSTSnapshotVersion.h" +#import "Firestore/Source/Core/FSTTransaction.h" +#import "Firestore/Source/Core/FSTView.h" +#import "Firestore/Source/Core/FSTViewSnapshot.h" +#import "Firestore/Source/Local/FSTEagerGarbageCollector.h" +#import "Firestore/Source/Local/FSTLocalStore.h" +#import "Firestore/Source/Local/FSTLocalViewChanges.h" +#import "Firestore/Source/Local/FSTLocalWriteResult.h" +#import "Firestore/Source/Local/FSTQueryData.h" +#import "Firestore/Source/Local/FSTReferenceSet.h" +#import "Firestore/Source/Model/FSTDocument.h" +#import "Firestore/Source/Model/FSTDocumentKey.h" +#import "Firestore/Source/Model/FSTDocumentSet.h" +#import "Firestore/Source/Model/FSTMutationBatch.h" +#import "Firestore/Source/Remote/FSTRemoteEvent.h" +#import "Firestore/Source/Util/FSTAssert.h" +#import "Firestore/Source/Util/FSTDispatchQueue.h" +#import "Firestore/Source/Util/FSTLogger.h" + +#include "Firestore/core/src/firebase/firestore/core/target_id_generator.h" + +NS_ASSUME_NONNULL_BEGIN + +// Limbo documents don't use persistence, and are eagerly GC'd. So, listens for them don't need +// real sequence numbers. +static const FSTListenSequenceNumber kIrrelevantSequenceNumber = -1; + +#pragma mark - FSTQueryView + +/** + * FSTQueryView contains all of the info that FSTSyncEngine needs to track for a particular + * query and view. + */ +@interface FSTQueryView : NSObject + +- (instancetype)initWithQuery:(FSTQuery *)query + targetID:(FSTTargetID)targetID + resumeToken:(NSData *)resumeToken + view:(FSTView *)view NS_DESIGNATED_INITIALIZER; + +- (instancetype)init NS_UNAVAILABLE; + +/** The query itself. */ +@property(nonatomic, strong, readonly) FSTQuery *query; + +/** The targetID created by the client that is used in the watch stream to identify this query. */ +@property(nonatomic, assign, readonly) FSTTargetID targetID; + +/** + * An identifier from the datastore backend that indicates the last state of the results that + * was received. This can be used to indicate where to continue receiving new doc changes for the + * query. + */ +@property(nonatomic, copy, readonly) NSData *resumeToken; + +/** + * The view is responsible for computing the final merged truth of what docs are in the query. + * It gets notified of local and remote changes, and applies the query filters and limits to + * determine the most correct possible results. + */ +@property(nonatomic, strong, readonly) FSTView *view; + +@end + +@implementation FSTQueryView + +- (instancetype)initWithQuery:(FSTQuery *)query + targetID:(FSTTargetID)targetID + resumeToken:(NSData *)resumeToken + view:(FSTView *)view { + if (self = [super init]) { + _query = query; + _targetID = targetID; + _resumeToken = resumeToken; + _view = view; + } + return self; +} + +@end + +#pragma mark - FSTSyncEngine + +@interface FSTSyncEngine () + +/** The local store, used to persist mutations and cached documents. */ +@property(nonatomic, strong, readonly) FSTLocalStore *localStore; + +/** The remote store for sending writes, watches, etc. to the backend. */ +@property(nonatomic, strong, readonly) FSTRemoteStore *remoteStore; + +/** FSTQueryViews for all active queries, indexed by query. */ +@property(nonatomic, strong, readonly) + NSMutableDictionary *queryViewsByQuery; + +/** FSTQueryViews for all active queries, indexed by target ID. */ +@property(nonatomic, strong, readonly) + NSMutableDictionary *queryViewsByTarget; + +/** + * When a document is in limbo, we create a special listen to resolve it. This maps the + * FSTDocumentKey of each limbo document to the FSTTargetID of the listen resolving it. + */ +@property(nonatomic, strong, readonly) + NSMutableDictionary *limboTargetsByKey; + +/** The inverse of limboTargetsByKey, a map of FSTTargetID to the key of the limbo doc. */ +@property(nonatomic, strong, readonly) + NSMutableDictionary *limboKeysByTarget; + +/** Used to track any documents that are currently in limbo. */ +@property(nonatomic, strong, readonly) FSTReferenceSet *limboDocumentRefs; + +/** The garbage collector used to collect documents that are no longer in limbo. */ +@property(nonatomic, strong, readonly) FSTEagerGarbageCollector *limboCollector; + +/** Stores user completion blocks, indexed by user and FSTBatchID. */ +@property(nonatomic, strong) + NSMutableDictionary *> + *mutationCompletionBlocks; + +@property(nonatomic, strong) FSTUser *currentUser; + +@end + +@implementation FSTSyncEngine { + /** Used for creating the FSTTargetIDs for the listens used to resolve limbo documents. */ + firebase::firestore::core::TargetIdGenerator _targetIdGenerator; +} + +- (instancetype)initWithLocalStore:(FSTLocalStore *)localStore + remoteStore:(FSTRemoteStore *)remoteStore + initialUser:(FSTUser *)initialUser { + if (self = [super init]) { + _localStore = localStore; + _remoteStore = remoteStore; + + _queryViewsByQuery = [NSMutableDictionary dictionary]; + _queryViewsByTarget = [NSMutableDictionary dictionary]; + + _limboTargetsByKey = [NSMutableDictionary dictionary]; + _limboKeysByTarget = [NSMutableDictionary dictionary]; + _limboCollector = [[FSTEagerGarbageCollector alloc] init]; + _limboDocumentRefs = [[FSTReferenceSet alloc] init]; + [_limboCollector addGarbageSource:_limboDocumentRefs]; + + _mutationCompletionBlocks = [NSMutableDictionary dictionary]; + _targetIdGenerator = + firebase::firestore::core::TargetIdGenerator::SyncEngineTargetIdGenerator(0); + _currentUser = initialUser; + } + return self; +} + +- (FSTTargetID)listenToQuery:(FSTQuery *)query { + [self assertDelegateExistsForSelector:_cmd]; + FSTAssert(self.queryViewsByQuery[query] == nil, @"We already listen to query: %@", query); + + FSTQueryData *queryData = [self.localStore allocateQuery:query]; + FSTDocumentDictionary *docs = [self.localStore executeQuery:query]; + FSTDocumentKeySet *remoteKeys = [self.localStore remoteDocumentKeysForTarget:queryData.targetID]; + + FSTView *view = [[FSTView alloc] initWithQuery:query remoteDocuments:remoteKeys]; + FSTViewDocumentChanges *viewDocChanges = [view computeChangesWithDocuments:docs]; + FSTViewChange *viewChange = [view applyChangesToDocuments:viewDocChanges]; + FSTAssert(viewChange.limboChanges.count == 0, + @"View returned limbo docs before target ack from the server."); + + FSTQueryView *queryView = [[FSTQueryView alloc] initWithQuery:query + targetID:queryData.targetID + resumeToken:queryData.resumeToken + view:view]; + self.queryViewsByQuery[query] = queryView; + self.queryViewsByTarget[@(queryData.targetID)] = queryView; + [self.delegate handleViewSnapshots:@[ viewChange.snapshot ]]; + + [self.remoteStore listenToTargetWithQueryData:queryData]; + return queryData.targetID; +} + +- (void)stopListeningToQuery:(FSTQuery *)query { + [self assertDelegateExistsForSelector:_cmd]; + + FSTQueryView *queryView = self.queryViewsByQuery[query]; + FSTAssert(queryView, @"Trying to stop listening to a query not found"); + + [self.localStore releaseQuery:query]; + [self.remoteStore stopListeningToTargetID:queryView.targetID]; + [self removeAndCleanupQuery:queryView]; + [self.localStore collectGarbage]; +} + +- (void)writeMutations:(NSArray *)mutations + completion:(FSTVoidErrorBlock)completion { + [self assertDelegateExistsForSelector:_cmd]; + + FSTLocalWriteResult *result = [self.localStore locallyWriteMutations:mutations]; + [self addMutationCompletionBlock:completion batchID:result.batchID]; + + [self emitNewSnapshotsWithChanges:result.changes remoteEvent:nil]; + [self.remoteStore fillWritePipeline]; +} + +- (void)addMutationCompletionBlock:(FSTVoidErrorBlock)completion batchID:(FSTBatchID)batchID { + NSMutableDictionary *completionBlocks = + self.mutationCompletionBlocks[self.currentUser]; + if (!completionBlocks) { + completionBlocks = [NSMutableDictionary dictionary]; + self.mutationCompletionBlocks[self.currentUser] = completionBlocks; + } + [completionBlocks setObject:completion forKey:@(batchID)]; +} + +/** + * Takes an updateBlock in which a set of reads and writes can be performed atomically. In the + * updateBlock, user code can read and write values using a transaction object. After the + * updateBlock, all changes will be committed. If someone else has changed any of the data + * referenced, then the updateBlock will be called again. If the updateBlock still fails after the + * given number of retries, then the transaction will be rejected. + * + * The transaction object passed to the updateBlock contains methods for accessing documents + * and collections. Unlike other firestore access, data accessed with the transaction will not + * reflect local changes that have not been committed. For this reason, it is required that all + * reads are performed before any writes. Transactions must be performed while online. + */ +- (void)transactionWithRetries:(int)retries + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + updateBlock:(FSTTransactionBlock)updateBlock + completion:(FSTVoidIDErrorBlock)completion { + [workerDispatchQueue verifyIsCurrentQueue]; + FSTAssert(retries >= 0, @"Got negative number of retries for transaction"); + FSTTransaction *transaction = [self.remoteStore transaction]; + updateBlock(transaction, ^(id _Nullable result, NSError *_Nullable error) { + [workerDispatchQueue dispatchAsync:^{ + if (error) { + completion(nil, error); + return; + } + [transaction commitWithCompletion:^(NSError *_Nullable transactionError) { + if (!transactionError) { + completion(result, nil); + return; + } + // TODO(b/35201829): Only retry on real transaction failures. + if (retries == 0) { + NSError *wrappedError = + [NSError errorWithDomain:FIRFirestoreErrorDomain + code:FIRFirestoreErrorCodeFailedPrecondition + userInfo:@{ + NSLocalizedDescriptionKey : @"Transaction failed all retries.", + NSUnderlyingErrorKey : transactionError + }]; + completion(nil, wrappedError); + return; + } + [workerDispatchQueue verifyIsCurrentQueue]; + return [self transactionWithRetries:(retries - 1) + workerDispatchQueue:workerDispatchQueue + updateBlock:updateBlock + completion:completion]; + }]; + }]; + }); +} + +- (void)applyRemoteEvent:(FSTRemoteEvent *)remoteEvent { + [self assertDelegateExistsForSelector:_cmd]; + + // Make sure limbo documents are deleted if there were no results + [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( + FSTBoxedTargetID *_Nonnull targetID, + FSTTargetChange *_Nonnull targetChange, BOOL *_Nonnull stop) { + FSTDocumentKey *limboKey = self.limboKeysByTarget[targetID]; + if (limboKey && targetChange.currentStatusUpdate == FSTCurrentStatusUpdateMarkCurrent && + remoteEvent.documentUpdates[limboKey] == nil) { + // When listening to a query the server responds with a snapshot containing documents + // matching the query and a current marker telling us we're now in sync. It's possible for + // these to arrive as separate remote events or as a single remote event. For a document + // query, there will be no documents sent in the response if the document doesn't exist. + // + // If the snapshot arrives separately from the current marker, we handle it normally and + // updateTrackedLimboDocumentsWithChanges:targetID: will resolve the limbo status of the + // document, removing it from limboDocumentRefs. This works because clients only initiate + // limbo resolution when a target is current and because all current targets are always at a + // consistent snapshot. + // + // However, if the document doesn't exist and the current marker arrives, the document is + // not present in the snapshot and our normal view handling would consider the document to + // remain in limbo indefinitely because there are no updates to the document. To avoid this, + // we specially handle this just this case here: synthesizing a delete. + // + // TODO(dimond): Ideally we would have an explicit lookup query instead resulting in an + // explicit delete message and we could remove this special logic. + [remoteEvent + addDocumentUpdate:[FSTDeletedDocument documentWithKey:limboKey + version:remoteEvent.snapshotVersion]]; + } + }]; + + FSTMaybeDocumentDictionary *changes = [self.localStore applyRemoteEvent:remoteEvent]; + [self emitNewSnapshotsWithChanges:changes remoteEvent:remoteEvent]; +} + +- (void)applyChangedOnlineState:(FSTOnlineState)onlineState { + NSMutableArray *newViewSnapshots = [NSMutableArray array]; + [self.queryViewsByQuery + enumerateKeysAndObjectsUsingBlock:^(FSTQuery *query, FSTQueryView *queryView, BOOL *stop) { + FSTViewChange *viewChange = [queryView.view applyChangedOnlineState:onlineState]; + FSTAssert(viewChange.limboChanges.count == 0, + @"OnlineState should not affect limbo documents."); + if (viewChange.snapshot) { + [newViewSnapshots addObject:viewChange.snapshot]; + } + }]; + + [self.delegate handleViewSnapshots:newViewSnapshots]; +} + +- (void)rejectListenWithTargetID:(FSTBoxedTargetID *)targetID error:(NSError *)error { + [self assertDelegateExistsForSelector:_cmd]; + + FSTDocumentKey *limboKey = self.limboKeysByTarget[targetID]; + if (limboKey) { + // Since this query failed, we won't want to manually unlisten to it. + // So go ahead and remove it from bookkeeping. + [self.limboTargetsByKey removeObjectForKey:limboKey]; + [self.limboKeysByTarget removeObjectForKey:targetID]; + + // TODO(dimond): Retry on transient errors? + + // It's a limbo doc. Create a synthetic event saying it was deleted. This is kind of a hack. + // Ideally, we would have a method in the local store to purge a document. However, it would + // be tricky to keep all of the local store's invariants with another method. + NSMutableDictionary *targetChanges = + [NSMutableDictionary dictionary]; + FSTDeletedDocument *doc = + [FSTDeletedDocument documentWithKey:limboKey version:[FSTSnapshotVersion noVersion]]; + NSMutableDictionary *docUpdate = + [NSMutableDictionary dictionaryWithObject:doc forKey:limboKey]; + FSTRemoteEvent *event = [FSTRemoteEvent eventWithSnapshotVersion:[FSTSnapshotVersion noVersion] + targetChanges:targetChanges + documentUpdates:docUpdate]; + [self applyRemoteEvent:event]; + } else { + FSTQueryView *queryView = self.queryViewsByTarget[targetID]; + FSTAssert(queryView, @"Unknown targetId: %@", targetID); + [self.localStore releaseQuery:queryView.query]; + [self removeAndCleanupQuery:queryView]; + [self.delegate handleError:error forQuery:queryView.query]; + } +} + +- (void)applySuccessfulWriteWithResult:(FSTMutationBatchResult *)batchResult { + [self assertDelegateExistsForSelector:_cmd]; + + // The local store may or may not be able to apply the write result and raise events immediately + // (depending on whether the watcher is caught up), so we raise user callbacks first so that they + // consistently happen before listen events. + [self processUserCallbacksForBatchID:batchResult.batch.batchID error:nil]; + + FSTMaybeDocumentDictionary *changes = [self.localStore acknowledgeBatchWithResult:batchResult]; + [self emitNewSnapshotsWithChanges:changes remoteEvent:nil]; +} + +- (void)rejectFailedWriteWithBatchID:(FSTBatchID)batchID error:(NSError *)error { + [self assertDelegateExistsForSelector:_cmd]; + + // The local store may or may not be able to apply the write result and raise events immediately + // (depending on whether the watcher is caught up), so we raise user callbacks first so that they + // consistently happen before listen events. + [self processUserCallbacksForBatchID:batchID error:error]; + + FSTMaybeDocumentDictionary *changes = [self.localStore rejectBatchID:batchID]; + [self emitNewSnapshotsWithChanges:changes remoteEvent:nil]; +} + +- (void)processUserCallbacksForBatchID:(FSTBatchID)batchID error:(NSError *_Nullable)error { + NSMutableDictionary *completionBlocks = + self.mutationCompletionBlocks[self.currentUser]; + + // NOTE: Mutations restored from persistence won't have completion blocks, so it's okay for + // this (or the completion below) to be nil. + if (completionBlocks) { + NSNumber *boxedBatchID = @(batchID); + FSTVoidErrorBlock completion = completionBlocks[boxedBatchID]; + if (completion) { + completion(error); + [completionBlocks removeObjectForKey:boxedBatchID]; + } + } +} + +- (void)assertDelegateExistsForSelector:(SEL)methodSelector { + FSTAssert(self.delegate, @"Tried to call '%@' before delegate was registered.", + NSStringFromSelector(methodSelector)); +} + +- (void)removeAndCleanupQuery:(FSTQueryView *)queryView { + [self.queryViewsByQuery removeObjectForKey:queryView.query]; + [self.queryViewsByTarget removeObjectForKey:@(queryView.targetID)]; + + [self.limboDocumentRefs removeReferencesForID:queryView.targetID]; + [self garbageCollectLimboDocuments]; +} + +/** + * Computes a new snapshot from the changes and calls the registered callback with the new snapshot. + */ +- (void)emitNewSnapshotsWithChanges:(FSTMaybeDocumentDictionary *)changes + remoteEvent:(FSTRemoteEvent *_Nullable)remoteEvent { + NSMutableArray *newSnapshots = [NSMutableArray array]; + NSMutableArray *documentChangesInAllViews = [NSMutableArray array]; + + [self.queryViewsByQuery + enumerateKeysAndObjectsUsingBlock:^(FSTQuery *query, FSTQueryView *queryView, BOOL *stop) { + FSTView *view = queryView.view; + FSTViewDocumentChanges *viewDocChanges = [view computeChangesWithDocuments:changes]; + if (viewDocChanges.needsRefill) { + // The query has a limit and some docs were removed/updated, so we need to re-run the + // query against the local store to make sure we didn't lose any good docs that had been + // past the limit. + FSTDocumentDictionary *docs = [self.localStore executeQuery:queryView.query]; + viewDocChanges = [view computeChangesWithDocuments:docs previousChanges:viewDocChanges]; + } + FSTTargetChange *_Nullable targetChange = remoteEvent.targetChanges[@(queryView.targetID)]; + FSTViewChange *viewChange = + [queryView.view applyChangesToDocuments:viewDocChanges targetChange:targetChange]; + + [self updateTrackedLimboDocumentsWithChanges:viewChange.limboChanges + targetID:queryView.targetID]; + + if (viewChange.snapshot) { + [newSnapshots addObject:viewChange.snapshot]; + FSTLocalViewChanges *docChanges = + [FSTLocalViewChanges changesForViewSnapshot:viewChange.snapshot]; + [documentChangesInAllViews addObject:docChanges]; + } + }]; + + [self.delegate handleViewSnapshots:newSnapshots]; + [self.localStore notifyLocalViewChanges:documentChangesInAllViews]; + [self.localStore collectGarbage]; +} + +/** Updates the limbo document state for the given targetID. */ +- (void)updateTrackedLimboDocumentsWithChanges:(NSArray *)limboChanges + targetID:(FSTTargetID)targetID { + for (FSTLimboDocumentChange *limboChange in limboChanges) { + switch (limboChange.type) { + case FSTLimboDocumentChangeTypeAdded: + [self.limboDocumentRefs addReferenceToKey:limboChange.key forID:targetID]; + [self trackLimboChange:limboChange]; + break; + + case FSTLimboDocumentChangeTypeRemoved: + FSTLog(@"Document no longer in limbo: %@", limboChange.key); + [self.limboDocumentRefs removeReferenceToKey:limboChange.key forID:targetID]; + break; + + default: + FSTFail(@"Unknown limbo change type: %ld", (long)limboChange.type); + } + } + [self garbageCollectLimboDocuments]; +} + +- (void)trackLimboChange:(FSTLimboDocumentChange *)limboChange { + FSTDocumentKey *key = limboChange.key; + + if (!self.limboTargetsByKey[key]) { + FSTLog(@"New document in limbo: %@", key); + FSTTargetID limboTargetID = _targetIdGenerator.NextId(); + FSTQuery *query = [FSTQuery queryWithPath:key.path]; + FSTQueryData *queryData = [[FSTQueryData alloc] initWithQuery:query + targetID:limboTargetID + listenSequenceNumber:kIrrelevantSequenceNumber + purpose:FSTQueryPurposeLimboResolution]; + self.limboKeysByTarget[@(limboTargetID)] = key; + [self.remoteStore listenToTargetWithQueryData:queryData]; + self.limboTargetsByKey[key] = @(limboTargetID); + } +} + +/** Garbage collect the limbo documents that we no longer need to track. */ +- (void)garbageCollectLimboDocuments { + NSSet *garbage = [self.limboCollector collectGarbage]; + for (FSTDocumentKey *key in garbage) { + FSTBoxedTargetID *limboTarget = self.limboTargetsByKey[key]; + if (!limboTarget) { + // This target already got removed, because the query failed. + return; + } + FSTTargetID limboTargetID = limboTarget.intValue; + [self.remoteStore stopListeningToTargetID:limboTargetID]; + [self.limboTargetsByKey removeObjectForKey:key]; + [self.limboKeysByTarget removeObjectForKey:limboTarget]; + } +} + +// Used for testing +- (NSDictionary *)currentLimboDocuments { + // Return defensive copy + return [self.limboTargetsByKey copy]; +} + +- (void)userDidChange:(FSTUser *)user { + self.currentUser = user; + + // Notify local store and emit any resulting events from swapping out the mutation queue. + FSTMaybeDocumentDictionary *changes = [self.localStore userDidChange:user]; + [self emitNewSnapshotsWithChanges:changes remoteEvent:nil]; + + // Notify remote store so it can restart its streams. + [self.remoteStore userDidChange:user]; +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Core/FSTTargetIDGenerator.h b/Firestore/Source/Core/FSTTargetIDGenerator.h deleted file mode 100644 index 0b230ae..0000000 --- a/Firestore/Source/Core/FSTTargetIDGenerator.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2017 Google - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#import - -#import "Firestore/Source/Core/FSTTypes.h" - -NS_ASSUME_NONNULL_BEGIN - -/** - * FSTTargetIDGenerator generates monotonically increasing integer IDs. There are separate - * generators for different scopes. While these generators will operate independently of each - * other, they are scoped, such that no two generators will ever produce the same ID. This is - * useful, because sometimes the backend may group IDs from separate parts of the client into the - * same ID space. - */ -@interface FSTTargetIDGenerator : NSObject - -/** - * Creates and returns the FSTTargetIDGenerator for the local store. - * - * @param after An ID to start at. Every call to nextID will return an ID > @a after. - * @return A shared instance of FSTTargetIDGenerator. - */ -+ (instancetype)generatorForLocalStoreStartingAfterID:(FSTTargetID)after; - -/** - * Creates and returns the FSTTargetIDGenerator for the sync engine. - * - * @param after An ID to start at. Every call to nextID will return an ID > @a after. - * @return A shared instance of FSTTargetIDGenerator. - */ -+ (instancetype)generatorForSyncEngineStartingAfterID:(FSTTargetID)after; - -- (id)init __attribute__((unavailable("Use a static constructor method."))); - -/** Returns the next ID in the sequence. */ -- (FSTTargetID)nextID; - -@end - -NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Core/FSTTargetIDGenerator.m b/Firestore/Source/Core/FSTTargetIDGenerator.m deleted file mode 100644 index 58092ec..0000000 --- a/Firestore/Source/Core/FSTTargetIDGenerator.m +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2017 Google - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#import "Firestore/Source/Core/FSTTargetIDGenerator.h" - -#import - -NS_ASSUME_NONNULL_BEGIN - -#pragma mark - FSTTargetIDGenerator - -static const int kReservedBits = 1; - -/** FSTTargetIDGeneratorID is the set of all valid generators. */ -typedef NS_ENUM(NSInteger, FSTTargetIDGeneratorID) { - FSTTargetIDGeneratorIDLocalStore = 0, - FSTTargetIDGeneratorIDSyncEngine = 1 -}; - -@interface FSTTargetIDGenerator () { - // This is volatile so it can be used with OSAtomicAdd32. - volatile FSTTargetID _previousID; -} - -/** - * Initializes the generator. - * - * @param generatorID A unique ID indicating which generator this is. - * @param after Every call to nextID will return a number > @a after. - */ -- (instancetype)initWithGeneratorID:(FSTTargetIDGeneratorID)generatorID - startingAfterID:(FSTTargetID)after NS_DESIGNATED_INITIALIZER; - -// This is typed as FSTTargetID because we need to do bitwise operations with them together. -@property(nonatomic, assign) FSTTargetID generatorID; -@end - -@implementation FSTTargetIDGenerator - -#pragma mark - Constructors - -- (instancetype)initWithGeneratorID:(FSTTargetIDGeneratorID)generatorID - startingAfterID:(FSTTargetID)after { - self = [super init]; - if (self) { - _generatorID = generatorID; - - // Replace the generator part of |after| with this generator's ID. - FSTTargetID afterWithoutGenerator = (after >> kReservedBits) << kReservedBits; - FSTTargetID afterGenerator = after - afterWithoutGenerator; - if (afterGenerator >= _generatorID) { - // For example, if: - // self.generatorID = 0b0000 - // after = 0b1011 - // afterGenerator = 0b0001 - // Then: - // previous = 0b1010 - // next = 0b1100 - _previousID = afterWithoutGenerator | self.generatorID; - } else { - // For example, if: - // self.generatorID = 0b0001 - // after = 0b1010 - // afterGenerator = 0b0000 - // Then: - // previous = 0b1001 - // next = 0b1011 - _previousID = (afterWithoutGenerator | self.generatorID) - (1 << kReservedBits); - } - } - return self; -} - -+ (instancetype)generatorForLocalStoreStartingAfterID:(FSTTargetID)after { - return [[FSTTargetIDGenerator alloc] initWithGeneratorID:FSTTargetIDGeneratorIDLocalStore - startingAfterID:after]; -} - -+ (instancetype)generatorForSyncEngineStartingAfterID:(FSTTargetID)after { - return [[FSTTargetIDGenerator alloc] initWithGeneratorID:FSTTargetIDGeneratorIDSyncEngine - startingAfterID:after]; -} - -#pragma mark - Public methods - -- (FSTTargetID)nextID { - return OSAtomicAdd32(1 << kReservedBits, &_previousID); -} - -@end - -NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Local/FSTLocalStore.m b/Firestore/Source/Local/FSTLocalStore.m deleted file mode 100644 index 3a5b0b4..0000000 --- a/Firestore/Source/Local/FSTLocalStore.m +++ /dev/null @@ -1,554 +0,0 @@ -/* - * Copyright 2017 Google - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#import "Firestore/Source/Local/FSTLocalStore.h" - -#import "Firestore/Source/Auth/FSTUser.h" -#import "Firestore/Source/Core/FSTListenSequence.h" -#import "Firestore/Source/Core/FSTQuery.h" -#import "Firestore/Source/Core/FSTSnapshotVersion.h" -#import "Firestore/Source/Core/FSTTargetIDGenerator.h" -#import "Firestore/Source/Core/FSTTimestamp.h" -#import "Firestore/Source/Local/FSTGarbageCollector.h" -#import "Firestore/Source/Local/FSTLocalDocumentsView.h" -#import "Firestore/Source/Local/FSTLocalViewChanges.h" -#import "Firestore/Source/Local/FSTLocalWriteResult.h" -#import "Firestore/Source/Local/FSTMutationQueue.h" -#import "Firestore/Source/Local/FSTPersistence.h" -#import "Firestore/Source/Local/FSTQueryCache.h" -#import "Firestore/Source/Local/FSTQueryData.h" -#import "Firestore/Source/Local/FSTReferenceSet.h" -#import "Firestore/Source/Local/FSTRemoteDocumentCache.h" -#import "Firestore/Source/Local/FSTRemoteDocumentChangeBuffer.h" -#import "Firestore/Source/Model/FSTDocument.h" -#import "Firestore/Source/Model/FSTDocumentDictionary.h" -#import "Firestore/Source/Model/FSTDocumentKey.h" -#import "Firestore/Source/Model/FSTMutation.h" -#import "Firestore/Source/Model/FSTMutationBatch.h" -#import "Firestore/Source/Remote/FSTRemoteEvent.h" -#import "Firestore/Source/Util/FSTAssert.h" -#import "Firestore/Source/Util/FSTLogger.h" - -NS_ASSUME_NONNULL_BEGIN - -@interface FSTLocalStore () - -/** Manages our in-memory or durable persistence. */ -@property(nonatomic, strong, readonly) id persistence; - -/** The set of all mutations that have been sent but not yet been applied to the backend. */ -@property(nonatomic, strong) id mutationQueue; - -/** The set of all cached remote documents. */ -@property(nonatomic, strong) id remoteDocumentCache; - -/** The "local" view of all documents (layering mutationQueue on top of remoteDocumentCache). */ -@property(nonatomic, strong) FSTLocalDocumentsView *localDocuments; - -/** The set of document references maintained by any local views. */ -@property(nonatomic, strong) FSTReferenceSet *localViewReferences; - -/** - * The garbage collector collects documents that should no longer be cached (e.g. if they are no - * longer retained by the above reference sets and the garbage collector is performing eager - * collection). - */ -@property(nonatomic, strong) id garbageCollector; - -/** Maps a query to the data about that query. */ -@property(nonatomic, strong) id queryCache; - -/** Maps a targetID to data about its query. */ -@property(nonatomic, strong) NSMutableDictionary *targetIDs; - -/** Used to generate targetIDs for queries tracked locally. */ -@property(nonatomic, strong) FSTTargetIDGenerator *targetIDGenerator; - -@property(nonatomic, strong) FSTListenSequence *listenSequence; - -/** - * A heldBatchResult is a mutation batch result (from a write acknowledgement) that arrived before - * the watch stream got notified of a snapshot that includes the write.  So we "hold" it until - * the watch stream catches up. It ensures that the local write remains visible (latency - * compensation) and doesn't temporarily appear reverted because the watch stream is slower than - * the write stream and so wasn't reflecting it. - * - * NOTE: Eventually we want to move this functionality into the remote store. - */ -@property(nonatomic, strong) NSMutableArray *heldBatchResults; - -@end - -@implementation FSTLocalStore - -- (instancetype)initWithPersistence:(id)persistence - garbageCollector:(id)garbageCollector - initialUser:(FSTUser *)initialUser { - if (self = [super init]) { - _persistence = persistence; - _mutationQueue = [persistence mutationQueueForUser:initialUser]; - _remoteDocumentCache = [persistence remoteDocumentCache]; - _queryCache = [persistence queryCache]; - _localDocuments = [FSTLocalDocumentsView viewWithRemoteDocumentCache:_remoteDocumentCache - mutationQueue:_mutationQueue]; - _localViewReferences = [[FSTReferenceSet alloc] init]; - - _garbageCollector = garbageCollector; - [_garbageCollector addGarbageSource:_queryCache]; - [_garbageCollector addGarbageSource:_localViewReferences]; - [_garbageCollector addGarbageSource:_mutationQueue]; - - _targetIDs = [NSMutableDictionary dictionary]; - _heldBatchResults = [NSMutableArray array]; - } - return self; -} - -- (void)start { - [self startMutationQueue]; - [self startQueryCache]; -} - -- (void)startMutationQueue { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Start MutationQueue"]; - [self.mutationQueue startWithGroup:group]; - - // If we have any leftover mutation batch results from a prior run, just drop them. - // TODO(http://b/33446471): We probably need to repopulate heldBatchResults or similar instead, - // but that is not straightforward since we're not persisting the write ack versions. - [self.heldBatchResults removeAllObjects]; - - // TODO(mikelehen): This is the only usage of getAllMutationBatchesThroughBatchId:. Consider - // removing it in favor of a getAcknowledgedBatches method. - FSTBatchID highestAck = [self.mutationQueue highestAcknowledgedBatchID]; - if (highestAck != kFSTBatchIDUnknown) { - NSArray *batches = - [self.mutationQueue allMutationBatchesThroughBatchID:highestAck]; - if (batches.count > 0) { - // NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but this set - // should be very small and this code should go away eventually. - [self.mutationQueue removeMutationBatches:batches group:group]; - } - } - [self.persistence commitGroup:group]; -} - -- (void)startQueryCache { - [self.queryCache start]; - - FSTTargetID targetID = [self.queryCache highestTargetID]; - self.targetIDGenerator = [FSTTargetIDGenerator generatorForLocalStoreStartingAfterID:targetID]; - FSTListenSequenceNumber sequenceNumber = [self.queryCache highestListenSequenceNumber]; - self.listenSequence = [[FSTListenSequence alloc] initStartingAfter:sequenceNumber]; -} - -- (void)shutdown { - [self.mutationQueue shutdown]; - [self.remoteDocumentCache shutdown]; - [self.queryCache shutdown]; -} - -- (FSTMaybeDocumentDictionary *)userDidChange:(FSTUser *)user { - // Swap out the mutation queue, grabbing the pending mutation batches before and after. - NSArray *oldBatches = [self.mutationQueue allMutationBatches]; - - [self.mutationQueue shutdown]; - [self.garbageCollector removeGarbageSource:self.mutationQueue]; - - self.mutationQueue = [self.persistence mutationQueueForUser:user]; - [self.garbageCollector addGarbageSource:self.mutationQueue]; - - [self startMutationQueue]; - - NSArray *newBatches = [self.mutationQueue allMutationBatches]; - - // Recreate our LocalDocumentsView using the new MutationQueue. - self.localDocuments = [FSTLocalDocumentsView viewWithRemoteDocumentCache:self.remoteDocumentCache - mutationQueue:self.mutationQueue]; - - // Union the old/new changed keys. - FSTDocumentKeySet *changedKeys = [FSTDocumentKeySet keySet]; - for (NSArray *batches in @[ oldBatches, newBatches ]) { - for (FSTMutationBatch *batch in batches) { - for (FSTMutation *mutation in batch.mutations) { - changedKeys = [changedKeys setByAddingObject:mutation.key]; - } - } - } - - // Return the set of all (potentially) changed documents as the result of the user change. - return [self.localDocuments documentsForKeys:changedKeys]; -} - -- (FSTLocalWriteResult *)locallyWriteMutations:(NSArray *)mutations { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Locally write mutations"]; - FSTTimestamp *localWriteTime = [FSTTimestamp timestamp]; - FSTMutationBatch *batch = [self.mutationQueue addMutationBatchWithWriteTime:localWriteTime - mutations:mutations - group:group]; - [self.persistence commitGroup:group]; - - FSTDocumentKeySet *keys = [batch keys]; - FSTMaybeDocumentDictionary *changedDocuments = [self.localDocuments documentsForKeys:keys]; - return [FSTLocalWriteResult resultForBatchID:batch.batchID changes:changedDocuments]; -} - -- (FSTMaybeDocumentDictionary *)acknowledgeBatchWithResult:(FSTMutationBatchResult *)batchResult { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Acknowledge batch"]; - id mutationQueue = self.mutationQueue; - - [mutationQueue acknowledgeBatch:batchResult.batch - streamToken:batchResult.streamToken - group:group]; - - FSTDocumentKeySet *affected; - if ([self shouldHoldBatchResultWithVersion:batchResult.commitVersion]) { - [self.heldBatchResults addObject:batchResult]; - affected = [FSTDocumentKeySet keySet]; - } else { - FSTRemoteDocumentChangeBuffer *remoteDocuments = - [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; - - affected = - [self releaseBatchResults:@[ batchResult ] group:group remoteDocuments:remoteDocuments]; - - [remoteDocuments applyToWriteGroup:group]; - } - - [self.persistence commitGroup:group]; - [self.mutationQueue performConsistencyCheck]; - - return [self.localDocuments documentsForKeys:affected]; -} - -- (FSTMaybeDocumentDictionary *)rejectBatchID:(FSTBatchID)batchID { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Reject batch"]; - - FSTMutationBatch *toReject = [self.mutationQueue lookupMutationBatch:batchID]; - FSTAssert(toReject, @"Attempt to reject nonexistent batch!"); - - FSTBatchID lastAcked = [self.mutationQueue highestAcknowledgedBatchID]; - FSTAssert(batchID > lastAcked, @"Acknowledged batches can't be rejected."); - - FSTDocumentKeySet *affected = [self removeMutationBatch:toReject group:group]; - - [self.persistence commitGroup:group]; - [self.mutationQueue performConsistencyCheck]; - - return [self.localDocuments documentsForKeys:affected]; -} - -- (nullable NSData *)lastStreamToken { - return [self.mutationQueue lastStreamToken]; -} - -- (void)setLastStreamToken:(nullable NSData *)streamToken { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Set stream token"]; - - [self.mutationQueue setLastStreamToken:streamToken group:group]; - [self.persistence commitGroup:group]; -} - -- (FSTSnapshotVersion *)lastRemoteSnapshotVersion { - return [self.queryCache lastRemoteSnapshotVersion]; -} - -- (FSTMaybeDocumentDictionary *)applyRemoteEvent:(FSTRemoteEvent *)remoteEvent { - id queryCache = self.queryCache; - - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Apply remote event"]; - FSTRemoteDocumentChangeBuffer *remoteDocuments = - [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; - - [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( - NSNumber *targetIDNumber, FSTTargetChange *change, BOOL *stop) { - FSTTargetID targetID = targetIDNumber.intValue; - - // Do not ref/unref unassigned targetIDs - it may lead to leaks. - FSTQueryData *queryData = self.targetIDs[targetIDNumber]; - if (!queryData) { - return; - } - - FSTTargetMapping *mapping = change.mapping; - if (mapping) { - // First make sure that all references are deleted. - if ([mapping isKindOfClass:[FSTResetMapping class]]) { - FSTResetMapping *reset = (FSTResetMapping *)mapping; - [queryCache removeMatchingKeysForTargetID:targetID group:group]; - [queryCache addMatchingKeys:reset.documents forTargetID:targetID group:group]; - - } else if ([mapping isKindOfClass:[FSTUpdateMapping class]]) { - FSTUpdateMapping *update = (FSTUpdateMapping *)mapping; - [queryCache removeMatchingKeys:update.removedDocuments forTargetID:targetID group:group]; - [queryCache addMatchingKeys:update.addedDocuments forTargetID:targetID group:group]; - - } else { - FSTFail(@"Unknown mapping type: %@", mapping); - } - } - - // Update the resume token if the change includes one. Don't clear any preexisting value. - NSData *resumeToken = change.resumeToken; - if (resumeToken.length > 0) { - queryData = [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion - resumeToken:resumeToken]; - self.targetIDs[targetIDNumber] = queryData; - [self.queryCache addQueryData:queryData group:group]; - } - }]; - - // TODO(klimt): This could probably be an NSMutableDictionary. - __block FSTDocumentKeySet *changedDocKeys = [FSTDocumentKeySet keySet]; - [remoteEvent.documentUpdates - enumerateKeysAndObjectsUsingBlock:^(FSTDocumentKey *key, FSTMaybeDocument *doc, BOOL *stop) { - changedDocKeys = [changedDocKeys setByAddingObject:key]; - FSTMaybeDocument *existingDoc = [remoteDocuments entryForKey:key]; - // Make sure we don't apply an old document version to the remote cache, though we - // make an exception for [SnapshotVersion noVersion] which can happen for manufactured - // events (e.g. in the case of a limbo document resolution failing). - if (!existingDoc || [doc.version isEqual:[FSTSnapshotVersion noVersion]] || - [doc.version compare:existingDoc.version] != NSOrderedAscending) { - [remoteDocuments addEntry:doc]; - } else { - FSTLog( - @"FSTLocalStore Ignoring outdated watch update for %@. " - "Current version: %@ Watch version: %@", - key, existingDoc.version, doc.version); - } - - // The document might be garbage because it was unreferenced by everything. - // Make sure to mark it as garbage if it is... - [self.garbageCollector addPotentialGarbageKey:key]; - }]; - - // HACK: The only reason we allow omitting snapshot version is so we can synthesize remote events - // when we get permission denied errors while trying to resolve the state of a locally cached - // document that is in limbo. - FSTSnapshotVersion *lastRemoteVersion = [self.queryCache lastRemoteSnapshotVersion]; - FSTSnapshotVersion *remoteVersion = remoteEvent.snapshotVersion; - if (![remoteVersion isEqual:[FSTSnapshotVersion noVersion]]) { - FSTAssert([remoteVersion compare:lastRemoteVersion] != NSOrderedAscending, - @"Watch stream reverted to previous snapshot?? (%@ < %@)", remoteVersion, - lastRemoteVersion); - [self.queryCache setLastRemoteSnapshotVersion:remoteVersion group:group]; - } - - FSTDocumentKeySet *releasedWriteKeys = - [self releaseHeldBatchResultsWithGroup:group remoteDocuments:remoteDocuments]; - - [remoteDocuments applyToWriteGroup:group]; - - [self.persistence commitGroup:group]; - - // Union the two key sets. - __block FSTDocumentKeySet *keysToRecalc = changedDocKeys; - [releasedWriteKeys enumerateObjectsUsingBlock:^(FSTDocumentKey *key, BOOL *stop) { - keysToRecalc = [keysToRecalc setByAddingObject:key]; - }]; - - return [self.localDocuments documentsForKeys:keysToRecalc]; -} - -- (void)notifyLocalViewChanges:(NSArray *)viewChanges { - FSTReferenceSet *localViewReferences = self.localViewReferences; - for (FSTLocalViewChanges *view in viewChanges) { - FSTQueryData *queryData = [self.queryCache queryDataForQuery:view.query]; - FSTAssert(queryData, @"Local view changes contain unallocated query."); - FSTTargetID targetID = queryData.targetID; - [localViewReferences addReferencesToKeys:view.addedKeys forID:targetID]; - [localViewReferences removeReferencesToKeys:view.removedKeys forID:targetID]; - } -} - -- (nullable FSTMutationBatch *)nextMutationBatchAfterBatchID:(FSTBatchID)batchID { - return [self.mutationQueue nextMutationBatchAfterBatchID:batchID]; -} - -- (nullable FSTMaybeDocument *)readDocument:(FSTDocumentKey *)key { - return [self.localDocuments documentForKey:key]; -} - -- (FSTQueryData *)allocateQuery:(FSTQuery *)query { - FSTQueryData *cached = [self.queryCache queryDataForQuery:query]; - FSTTargetID targetID; - FSTListenSequenceNumber sequenceNumber = [self.listenSequence next]; - if (cached) { - // This query has been listened to previously, so reuse the previous targetID. - // TODO(mcg): freshen last accessed date? - targetID = cached.targetID; - } else { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Allocate query"]; - - targetID = [self.targetIDGenerator nextID]; - cached = [[FSTQueryData alloc] initWithQuery:query - targetID:targetID - listenSequenceNumber:sequenceNumber - purpose:FSTQueryPurposeListen]; - [self.queryCache addQueryData:cached group:group]; - - [self.persistence commitGroup:group]; - } - - // Sanity check to ensure that even when resuming a query it's not currently active. - FSTBoxedTargetID *boxedTargetID = @(targetID); - FSTAssert(!self.targetIDs[boxedTargetID], @"Tried to allocate an already allocated query: %@", - query); - self.targetIDs[boxedTargetID] = cached; - return cached; -} - -- (void)releaseQuery:(FSTQuery *)query { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Release query"]; - - FSTQueryData *queryData = [self.queryCache queryDataForQuery:query]; - FSTAssert(queryData, @"Tried to release nonexistent query: %@", query); - - [self.localViewReferences removeReferencesForID:queryData.targetID]; - if (self.garbageCollector.isEager) { - [self.queryCache removeQueryData:queryData group:group]; - } - [self.targetIDs removeObjectForKey:@(queryData.targetID)]; - - // If this was the last watch target, then we won't get any more watch snapshots, so we should - // release any held batch results. - if ([self.targetIDs count] == 0) { - FSTRemoteDocumentChangeBuffer *remoteDocuments = - [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; - - [self releaseHeldBatchResultsWithGroup:group remoteDocuments:remoteDocuments]; - - [remoteDocuments applyToWriteGroup:group]; - } - - [self.persistence commitGroup:group]; -} - -- (FSTDocumentDictionary *)executeQuery:(FSTQuery *)query { - return [self.localDocuments documentsMatchingQuery:query]; -} - -- (FSTDocumentKeySet *)remoteDocumentKeysForTarget:(FSTTargetID)targetID { - return [self.queryCache matchingKeysForTargetID:targetID]; -} - -- (void)collectGarbage { - // Call collectGarbage regardless of whether isGCEnabled so the referenceSet doesn't continue to - // accumulate the garbage keys. - NSSet *garbage = [self.garbageCollector collectGarbage]; - if (garbage.count > 0) { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Garbage Collection"]; - for (FSTDocumentKey *key in garbage) { - [self.remoteDocumentCache removeEntryForKey:key group:group]; - } - [self.persistence commitGroup:group]; - } -} - -/** - * Releases all the held mutation batches up to the current remote version received, and - * applies their mutations to the docs in the remote documents cache. - * - * @return the set of keys of docs that were modified by those writes. - */ -- (FSTDocumentKeySet *)releaseHeldBatchResultsWithGroup:(FSTWriteGroup *)group - remoteDocuments: - (FSTRemoteDocumentChangeBuffer *)remoteDocuments { - NSMutableArray *toRelease = [NSMutableArray array]; - for (FSTMutationBatchResult *batchResult in self.heldBatchResults) { - if (![self isRemoteUpToVersion:batchResult.commitVersion]) { - break; - } - [toRelease addObject:batchResult]; - } - - if (toRelease.count == 0) { - return [FSTDocumentKeySet keySet]; - } else { - [self.heldBatchResults removeObjectsInRange:NSMakeRange(0, toRelease.count)]; - return [self releaseBatchResults:toRelease group:group remoteDocuments:remoteDocuments]; - } -} - -- (BOOL)isRemoteUpToVersion:(FSTSnapshotVersion *)version { - // If there are no watch targets, then we won't get remote snapshots, and are always "up-to-date." - return [version compare:self.queryCache.lastRemoteSnapshotVersion] != NSOrderedDescending || - self.targetIDs.count == 0; -} - -- (BOOL)shouldHoldBatchResultWithVersion:(FSTSnapshotVersion *)version { - // Check if watcher isn't up to date or prior results are already held. - return ![self isRemoteUpToVersion:version] || self.heldBatchResults.count > 0; -} - -- (FSTDocumentKeySet *)releaseBatchResults:(NSArray *)batchResults - group:(FSTWriteGroup *)group - remoteDocuments:(FSTRemoteDocumentChangeBuffer *)remoteDocuments { - NSMutableArray *batches = [NSMutableArray array]; - for (FSTMutationBatchResult *batchResult in batchResults) { - [self applyBatchResult:batchResult toRemoteDocuments:remoteDocuments]; - [batches addObject:batchResult.batch]; - } - - return [self removeMutationBatches:batches group:group]; -} - -- (FSTDocumentKeySet *)removeMutationBatch:(FSTMutationBatch *)batch group:(FSTWriteGroup *)group { - return [self removeMutationBatches:@[ batch ] group:group]; -} - -/** Removes all the mutation batches named in the given array. */ -- (FSTDocumentKeySet *)removeMutationBatches:(NSArray *)batches - group:(FSTWriteGroup *)group { - // TODO(klimt): Could this be an NSMutableDictionary? - __block FSTDocumentKeySet *affectedDocs = [FSTDocumentKeySet keySet]; - - for (FSTMutationBatch *batch in batches) { - for (FSTMutation *mutation in batch.mutations) { - FSTDocumentKey *key = mutation.key; - affectedDocs = [affectedDocs setByAddingObject:key]; - } - } - - [self.mutationQueue removeMutationBatches:batches group:group]; - - return affectedDocs; -} - -- (void)applyBatchResult:(FSTMutationBatchResult *)batchResult - toRemoteDocuments:(FSTRemoteDocumentChangeBuffer *)remoteDocuments { - FSTMutationBatch *batch = batchResult.batch; - FSTDocumentKeySet *docKeys = batch.keys; - [docKeys enumerateObjectsUsingBlock:^(FSTDocumentKey *docKey, BOOL *stop) { - FSTMaybeDocument *_Nullable remoteDoc = [remoteDocuments entryForKey:docKey]; - FSTMaybeDocument *_Nullable doc = remoteDoc; - FSTSnapshotVersion *ackVersion = batchResult.docVersions[docKey]; - FSTAssert(ackVersion, @"docVersions should contain every doc in the write."); - if (!doc || [doc.version compare:ackVersion] == NSOrderedAscending) { - doc = [batch applyTo:doc documentKey:docKey mutationBatchResult:batchResult]; - if (!doc) { - FSTAssert(!remoteDoc, @"Mutation batch %@ applied to document %@ resulted in nil.", batch, - remoteDoc); - } else { - [remoteDocuments addEntry:doc]; - } - } - }]; -} - -@end - -NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Local/FSTLocalStore.mm b/Firestore/Source/Local/FSTLocalStore.mm new file mode 100644 index 0000000..fa77e37 --- /dev/null +++ b/Firestore/Source/Local/FSTLocalStore.mm @@ -0,0 +1,559 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "Firestore/Source/Local/FSTLocalStore.h" + +#import "Firestore/Source/Auth/FSTUser.h" +#import "Firestore/Source/Core/FSTListenSequence.h" +#import "Firestore/Source/Core/FSTQuery.h" +#import "Firestore/Source/Core/FSTSnapshotVersion.h" +#import "Firestore/Source/Core/FSTTimestamp.h" +#import "Firestore/Source/Local/FSTGarbageCollector.h" +#import "Firestore/Source/Local/FSTLocalDocumentsView.h" +#import "Firestore/Source/Local/FSTLocalViewChanges.h" +#import "Firestore/Source/Local/FSTLocalWriteResult.h" +#import "Firestore/Source/Local/FSTMutationQueue.h" +#import "Firestore/Source/Local/FSTPersistence.h" +#import "Firestore/Source/Local/FSTQueryCache.h" +#import "Firestore/Source/Local/FSTQueryData.h" +#import "Firestore/Source/Local/FSTReferenceSet.h" +#import "Firestore/Source/Local/FSTRemoteDocumentCache.h" +#import "Firestore/Source/Local/FSTRemoteDocumentChangeBuffer.h" +#import "Firestore/Source/Model/FSTDocument.h" +#import "Firestore/Source/Model/FSTDocumentDictionary.h" +#import "Firestore/Source/Model/FSTDocumentKey.h" +#import "Firestore/Source/Model/FSTMutation.h" +#import "Firestore/Source/Model/FSTMutationBatch.h" +#import "Firestore/Source/Remote/FSTRemoteEvent.h" +#import "Firestore/Source/Util/FSTAssert.h" +#import "Firestore/Source/Util/FSTLogger.h" + +#include "Firestore/core/src/firebase/firestore/core/target_id_generator.h" + +NS_ASSUME_NONNULL_BEGIN + +@interface FSTLocalStore () + +/** Manages our in-memory or durable persistence. */ +@property(nonatomic, strong, readonly) id persistence; + +/** The set of all mutations that have been sent but not yet been applied to the backend. */ +@property(nonatomic, strong) id mutationQueue; + +/** The set of all cached remote documents. */ +@property(nonatomic, strong) id remoteDocumentCache; + +/** The "local" view of all documents (layering mutationQueue on top of remoteDocumentCache). */ +@property(nonatomic, strong) FSTLocalDocumentsView *localDocuments; + +/** The set of document references maintained by any local views. */ +@property(nonatomic, strong) FSTReferenceSet *localViewReferences; + +/** + * The garbage collector collects documents that should no longer be cached (e.g. if they are no + * longer retained by the above reference sets and the garbage collector is performing eager + * collection). + */ +@property(nonatomic, strong) id garbageCollector; + +/** Maps a query to the data about that query. */ +@property(nonatomic, strong) id queryCache; + +/** Maps a targetID to data about its query. */ +@property(nonatomic, strong) NSMutableDictionary *targetIDs; + +@property(nonatomic, strong) FSTListenSequence *listenSequence; + +/** + * A heldBatchResult is a mutation batch result (from a write acknowledgement) that arrived before + * the watch stream got notified of a snapshot that includes the write.  So we "hold" it until + * the watch stream catches up. It ensures that the local write remains visible (latency + * compensation) and doesn't temporarily appear reverted because the watch stream is slower than + * the write stream and so wasn't reflecting it. + * + * NOTE: Eventually we want to move this functionality into the remote store. + */ +@property(nonatomic, strong) NSMutableArray *heldBatchResults; + +@end + +@implementation FSTLocalStore { + /** Used to generate targetIDs for queries tracked locally. */ + firebase::firestore::core::TargetIdGenerator _targetIDGenerator; +} + +- (instancetype)initWithPersistence:(id)persistence + garbageCollector:(id)garbageCollector + initialUser:(FSTUser *)initialUser { + if (self = [super init]) { + _persistence = persistence; + _mutationQueue = [persistence mutationQueueForUser:initialUser]; + _remoteDocumentCache = [persistence remoteDocumentCache]; + _queryCache = [persistence queryCache]; + _localDocuments = [FSTLocalDocumentsView viewWithRemoteDocumentCache:_remoteDocumentCache + mutationQueue:_mutationQueue]; + _localViewReferences = [[FSTReferenceSet alloc] init]; + + _garbageCollector = garbageCollector; + [_garbageCollector addGarbageSource:_queryCache]; + [_garbageCollector addGarbageSource:_localViewReferences]; + [_garbageCollector addGarbageSource:_mutationQueue]; + + _targetIDs = [NSMutableDictionary dictionary]; + _heldBatchResults = [NSMutableArray array]; + + _targetIDGenerator = + firebase::firestore::core::TargetIdGenerator::LocalStoreTargetIdGenerator(0); + } + return self; +} + +- (void)start { + [self startMutationQueue]; + [self startQueryCache]; +} + +- (void)startMutationQueue { + FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Start MutationQueue"]; + [self.mutationQueue startWithGroup:group]; + + // If we have any leftover mutation batch results from a prior run, just drop them. + // TODO(http://b/33446471): We probably need to repopulate heldBatchResults or similar instead, + // but that is not straightforward since we're not persisting the write ack versions. + [self.heldBatchResults removeAllObjects]; + + // TODO(mikelehen): This is the only usage of getAllMutationBatchesThroughBatchId:. Consider + // removing it in favor of a getAcknowledgedBatches method. + FSTBatchID highestAck = [self.mutationQueue highestAcknowledgedBatchID]; + if (highestAck != kFSTBatchIDUnknown) { + NSArray *batches = + [self.mutationQueue allMutationBatchesThroughBatchID:highestAck]; + if (batches.count > 0) { + // NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but this set + // should be very small and this code should go away eventually. + [self.mutationQueue removeMutationBatches:batches group:group]; + } + } + [self.persistence commitGroup:group]; +} + +- (void)startQueryCache { + [self.queryCache start]; + + FSTTargetID targetID = [self.queryCache highestTargetID]; + _targetIDGenerator = + firebase::firestore::core::TargetIdGenerator::LocalStoreTargetIdGenerator(targetID); + FSTListenSequenceNumber sequenceNumber = [self.queryCache highestListenSequenceNumber]; + self.listenSequence = [[FSTListenSequence alloc] initStartingAfter:sequenceNumber]; +} + +- (void)shutdown { + [self.mutationQueue shutdown]; + [self.remoteDocumentCache shutdown]; + [self.queryCache shutdown]; +} + +- (FSTMaybeDocumentDictionary *)userDidChange:(FSTUser *)user { + // Swap out the mutation queue, grabbing the pending mutation batches before and after. + NSArray *oldBatches = [self.mutationQueue allMutationBatches]; + + [self.mutationQueue shutdown]; + [self.garbageCollector removeGarbageSource:self.mutationQueue]; + + self.mutationQueue = [self.persistence mutationQueueForUser:user]; + [self.garbageCollector addGarbageSource:self.mutationQueue]; + + [self startMutationQueue]; + + NSArray *newBatches = [self.mutationQueue allMutationBatches]; + + // Recreate our LocalDocumentsView using the new MutationQueue. + self.localDocuments = [FSTLocalDocumentsView viewWithRemoteDocumentCache:self.remoteDocumentCache + mutationQueue:self.mutationQueue]; + + // Union the old/new changed keys. + FSTDocumentKeySet *changedKeys = [FSTDocumentKeySet keySet]; + for (NSArray *batches in @[ oldBatches, newBatches ]) { + for (FSTMutationBatch *batch in batches) { + for (FSTMutation *mutation in batch.mutations) { + changedKeys = [changedKeys setByAddingObject:mutation.key]; + } + } + } + + // Return the set of all (potentially) changed documents as the result of the user change. + return [self.localDocuments documentsForKeys:changedKeys]; +} + +- (FSTLocalWriteResult *)locallyWriteMutations:(NSArray *)mutations { + FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Locally write mutations"]; + FSTTimestamp *localWriteTime = [FSTTimestamp timestamp]; + FSTMutationBatch *batch = [self.mutationQueue addMutationBatchWithWriteTime:localWriteTime + mutations:mutations + group:group]; + [self.persistence commitGroup:group]; + + FSTDocumentKeySet *keys = [batch keys]; + FSTMaybeDocumentDictionary *changedDocuments = [self.localDocuments documentsForKeys:keys]; + return [FSTLocalWriteResult resultForBatchID:batch.batchID changes:changedDocuments]; +} + +- (FSTMaybeDocumentDictionary *)acknowledgeBatchWithResult:(FSTMutationBatchResult *)batchResult { + FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Acknowledge batch"]; + id mutationQueue = self.mutationQueue; + + [mutationQueue acknowledgeBatch:batchResult.batch + streamToken:batchResult.streamToken + group:group]; + + FSTDocumentKeySet *affected; + if ([self shouldHoldBatchResultWithVersion:batchResult.commitVersion]) { + [self.heldBatchResults addObject:batchResult]; + affected = [FSTDocumentKeySet keySet]; + } else { + FSTRemoteDocumentChangeBuffer *remoteDocuments = + [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; + + affected = + [self releaseBatchResults:@[ batchResult ] group:group remoteDocuments:remoteDocuments]; + + [remoteDocuments applyToWriteGroup:group]; + } + + [self.persistence commitGroup:group]; + [self.mutationQueue performConsistencyCheck]; + + return [self.localDocuments documentsForKeys:affected]; +} + +- (FSTMaybeDocumentDictionary *)rejectBatchID:(FSTBatchID)batchID { + FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Reject batch"]; + + FSTMutationBatch *toReject = [self.mutationQueue lookupMutationBatch:batchID]; + FSTAssert(toReject, @"Attempt to reject nonexistent batch!"); + + FSTBatchID lastAcked = [self.mutationQueue highestAcknowledgedBatchID]; + FSTAssert(batchID > lastAcked, @"Acknowledged batches can't be rejected."); + + FSTDocumentKeySet *affected = [self removeMutationBatch:toReject group:group]; + + [self.persistence commitGroup:group]; + [self.mutationQueue performConsistencyCheck]; + + return [self.localDocuments documentsForKeys:affected]; +} + +- (nullable NSData *)lastStreamToken { + return [self.mutationQueue lastStreamToken]; +} + +- (void)setLastStreamToken:(nullable NSData *)streamToken { + FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Set stream token"]; + + [self.mutationQueue setLastStreamToken:streamToken group:group]; + [self.persistence commitGroup:group]; +} + +- (FSTSnapshotVersion *)lastRemoteSnapshotVersion { + return [self.queryCache lastRemoteSnapshotVersion]; +} + +- (FSTMaybeDocumentDictionary *)applyRemoteEvent:(FSTRemoteEvent *)remoteEvent { + id queryCache = self.queryCache; + + FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Apply remote event"]; + FSTRemoteDocumentChangeBuffer *remoteDocuments = + [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; + + [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( + NSNumber *targetIDNumber, FSTTargetChange *change, BOOL *stop) { + FSTTargetID targetID = targetIDNumber.intValue; + + // Do not ref/unref unassigned targetIDs - it may lead to leaks. + FSTQueryData *queryData = self.targetIDs[targetIDNumber]; + if (!queryData) { + return; + } + + FSTTargetMapping *mapping = change.mapping; + if (mapping) { + // First make sure that all references are deleted. + if ([mapping isKindOfClass:[FSTResetMapping class]]) { + FSTResetMapping *reset = (FSTResetMapping *)mapping; + [queryCache removeMatchingKeysForTargetID:targetID group:group]; + [queryCache addMatchingKeys:reset.documents forTargetID:targetID group:group]; + + } else if ([mapping isKindOfClass:[FSTUpdateMapping class]]) { + FSTUpdateMapping *update = (FSTUpdateMapping *)mapping; + [queryCache removeMatchingKeys:update.removedDocuments forTargetID:targetID group:group]; + [queryCache addMatchingKeys:update.addedDocuments forTargetID:targetID group:group]; + + } else { + FSTFail(@"Unknown mapping type: %@", mapping); + } + } + + // Update the resume token if the change includes one. Don't clear any preexisting value. + NSData *resumeToken = change.resumeToken; + if (resumeToken.length > 0) { + queryData = [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion + resumeToken:resumeToken]; + self.targetIDs[targetIDNumber] = queryData; + [self.queryCache addQueryData:queryData group:group]; + } + }]; + + // TODO(klimt): This could probably be an NSMutableDictionary. + __block FSTDocumentKeySet *changedDocKeys = [FSTDocumentKeySet keySet]; + [remoteEvent.documentUpdates + enumerateKeysAndObjectsUsingBlock:^(FSTDocumentKey *key, FSTMaybeDocument *doc, BOOL *stop) { + changedDocKeys = [changedDocKeys setByAddingObject:key]; + FSTMaybeDocument *existingDoc = [remoteDocuments entryForKey:key]; + // Make sure we don't apply an old document version to the remote cache, though we + // make an exception for [SnapshotVersion noVersion] which can happen for manufactured + // events (e.g. in the case of a limbo document resolution failing). + if (!existingDoc || [doc.version isEqual:[FSTSnapshotVersion noVersion]] || + [doc.version compare:existingDoc.version] != NSOrderedAscending) { + [remoteDocuments addEntry:doc]; + } else { + FSTLog( + @"FSTLocalStore Ignoring outdated watch update for %@. " + "Current version: %@ Watch version: %@", + key, existingDoc.version, doc.version); + } + + // The document might be garbage because it was unreferenced by everything. + // Make sure to mark it as garbage if it is... + [self.garbageCollector addPotentialGarbageKey:key]; + }]; + + // HACK: The only reason we allow omitting snapshot version is so we can synthesize remote events + // when we get permission denied errors while trying to resolve the state of a locally cached + // document that is in limbo. + FSTSnapshotVersion *lastRemoteVersion = [self.queryCache lastRemoteSnapshotVersion]; + FSTSnapshotVersion *remoteVersion = remoteEvent.snapshotVersion; + if (![remoteVersion isEqual:[FSTSnapshotVersion noVersion]]) { + FSTAssert([remoteVersion compare:lastRemoteVersion] != NSOrderedAscending, + @"Watch stream reverted to previous snapshot?? (%@ < %@)", remoteVersion, + lastRemoteVersion); + [self.queryCache setLastRemoteSnapshotVersion:remoteVersion group:group]; + } + + FSTDocumentKeySet *releasedWriteKeys = + [self releaseHeldBatchResultsWithGroup:group remoteDocuments:remoteDocuments]; + + [remoteDocuments applyToWriteGroup:group]; + + [self.persistence commitGroup:group]; + + // Union the two key sets. + __block FSTDocumentKeySet *keysToRecalc = changedDocKeys; + [releasedWriteKeys enumerateObjectsUsingBlock:^(FSTDocumentKey *key, BOOL *stop) { + keysToRecalc = [keysToRecalc setByAddingObject:key]; + }]; + + return [self.localDocuments documentsForKeys:keysToRecalc]; +} + +- (void)notifyLocalViewChanges:(NSArray *)viewChanges { + FSTReferenceSet *localViewReferences = self.localViewReferences; + for (FSTLocalViewChanges *view in viewChanges) { + FSTQueryData *queryData = [self.queryCache queryDataForQuery:view.query]; + FSTAssert(queryData, @"Local view changes contain unallocated query."); + FSTTargetID targetID = queryData.targetID; + [localViewReferences addReferencesToKeys:view.addedKeys forID:targetID]; + [localViewReferences removeReferencesToKeys:view.removedKeys forID:targetID]; + } +} + +- (nullable FSTMutationBatch *)nextMutationBatchAfterBatchID:(FSTBatchID)batchID { + return [self.mutationQueue nextMutationBatchAfterBatchID:batchID]; +} + +- (nullable FSTMaybeDocument *)readDocument:(FSTDocumentKey *)key { + return [self.localDocuments documentForKey:key]; +} + +- (FSTQueryData *)allocateQuery:(FSTQuery *)query { + FSTQueryData *cached = [self.queryCache queryDataForQuery:query]; + FSTTargetID targetID; + FSTListenSequenceNumber sequenceNumber = [self.listenSequence next]; + if (cached) { + // This query has been listened to previously, so reuse the previous targetID. + // TODO(mcg): freshen last accessed date? + targetID = cached.targetID; + } else { + FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Allocate query"]; + + targetID = _targetIDGenerator.NextId(); + cached = [[FSTQueryData alloc] initWithQuery:query + targetID:targetID + listenSequenceNumber:sequenceNumber + purpose:FSTQueryPurposeListen]; + [self.queryCache addQueryData:cached group:group]; + + [self.persistence commitGroup:group]; + } + + // Sanity check to ensure that even when resuming a query it's not currently active. + FSTBoxedTargetID *boxedTargetID = @(targetID); + FSTAssert(!self.targetIDs[boxedTargetID], @"Tried to allocate an already allocated query: %@", + query); + self.targetIDs[boxedTargetID] = cached; + return cached; +} + +- (void)releaseQuery:(FSTQuery *)query { + FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Release query"]; + + FSTQueryData *queryData = [self.queryCache queryDataForQuery:query]; + FSTAssert(queryData, @"Tried to release nonexistent query: %@", query); + + [self.localViewReferences removeReferencesForID:queryData.targetID]; + if (self.garbageCollector.isEager) { + [self.queryCache removeQueryData:queryData group:group]; + } + [self.targetIDs removeObjectForKey:@(queryData.targetID)]; + + // If this was the last watch target, then we won't get any more watch snapshots, so we should + // release any held batch results. + if ([self.targetIDs count] == 0) { + FSTRemoteDocumentChangeBuffer *remoteDocuments = + [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; + + [self releaseHeldBatchResultsWithGroup:group remoteDocuments:remoteDocuments]; + + [remoteDocuments applyToWriteGroup:group]; + } + + [self.persistence commitGroup:group]; +} + +- (FSTDocumentDictionary *)executeQuery:(FSTQuery *)query { + return [self.localDocuments documentsMatchingQuery:query]; +} + +- (FSTDocumentKeySet *)remoteDocumentKeysForTarget:(FSTTargetID)targetID { + return [self.queryCache matchingKeysForTargetID:targetID]; +} + +- (void)collectGarbage { + // Call collectGarbage regardless of whether isGCEnabled so the referenceSet doesn't continue to + // accumulate the garbage keys. + NSSet *garbage = [self.garbageCollector collectGarbage]; + if (garbage.count > 0) { + FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Garbage Collection"]; + for (FSTDocumentKey *key in garbage) { + [self.remoteDocumentCache removeEntryForKey:key group:group]; + } + [self.persistence commitGroup:group]; + } +} + +/** + * Releases all the held mutation batches up to the current remote version received, and + * applies their mutations to the docs in the remote documents cache. + * + * @return the set of keys of docs that were modified by those writes. + */ +- (FSTDocumentKeySet *)releaseHeldBatchResultsWithGroup:(FSTWriteGroup *)group + remoteDocuments: + (FSTRemoteDocumentChangeBuffer *)remoteDocuments { + NSMutableArray *toRelease = [NSMutableArray array]; + for (FSTMutationBatchResult *batchResult in self.heldBatchResults) { + if (![self isRemoteUpToVersion:batchResult.commitVersion]) { + break; + } + [toRelease addObject:batchResult]; + } + + if (toRelease.count == 0) { + return [FSTDocumentKeySet keySet]; + } else { + [self.heldBatchResults removeObjectsInRange:NSMakeRange(0, toRelease.count)]; + return [self releaseBatchResults:toRelease group:group remoteDocuments:remoteDocuments]; + } +} + +- (BOOL)isRemoteUpToVersion:(FSTSnapshotVersion *)version { + // If there are no watch targets, then we won't get remote snapshots, and are always "up-to-date." + return [version compare:self.queryCache.lastRemoteSnapshotVersion] != NSOrderedDescending || + self.targetIDs.count == 0; +} + +- (BOOL)shouldHoldBatchResultWithVersion:(FSTSnapshotVersion *)version { + // Check if watcher isn't up to date or prior results are already held. + return ![self isRemoteUpToVersion:version] || self.heldBatchResults.count > 0; +} + +- (FSTDocumentKeySet *)releaseBatchResults:(NSArray *)batchResults + group:(FSTWriteGroup *)group + remoteDocuments:(FSTRemoteDocumentChangeBuffer *)remoteDocuments { + NSMutableArray *batches = [NSMutableArray array]; + for (FSTMutationBatchResult *batchResult in batchResults) { + [self applyBatchResult:batchResult toRemoteDocuments:remoteDocuments]; + [batches addObject:batchResult.batch]; + } + + return [self removeMutationBatches:batches group:group]; +} + +- (FSTDocumentKeySet *)removeMutationBatch:(FSTMutationBatch *)batch group:(FSTWriteGroup *)group { + return [self removeMutationBatches:@[ batch ] group:group]; +} + +/** Removes all the mutation batches named in the given array. */ +- (FSTDocumentKeySet *)removeMutationBatches:(NSArray *)batches + group:(FSTWriteGroup *)group { + // TODO(klimt): Could this be an NSMutableDictionary? + __block FSTDocumentKeySet *affectedDocs = [FSTDocumentKeySet keySet]; + + for (FSTMutationBatch *batch in batches) { + for (FSTMutation *mutation in batch.mutations) { + FSTDocumentKey *key = mutation.key; + affectedDocs = [affectedDocs setByAddingObject:key]; + } + } + + [self.mutationQueue removeMutationBatches:batches group:group]; + + return affectedDocs; +} + +- (void)applyBatchResult:(FSTMutationBatchResult *)batchResult + toRemoteDocuments:(FSTRemoteDocumentChangeBuffer *)remoteDocuments { + FSTMutationBatch *batch = batchResult.batch; + FSTDocumentKeySet *docKeys = batch.keys; + [docKeys enumerateObjectsUsingBlock:^(FSTDocumentKey *docKey, BOOL *stop) { + FSTMaybeDocument *_Nullable remoteDoc = [remoteDocuments entryForKey:docKey]; + FSTMaybeDocument *_Nullable doc = remoteDoc; + FSTSnapshotVersion *ackVersion = batchResult.docVersions[docKey]; + FSTAssert(ackVersion, @"docVersions should contain every doc in the write."); + if (!doc || [doc.version compare:ackVersion] == NSOrderedAscending) { + doc = [batch applyTo:doc documentKey:docKey mutationBatchResult:batchResult]; + if (!doc) { + FSTAssert(!remoteDoc, @"Mutation batch %@ applied to document %@ resulted in nil.", batch, + remoteDoc); + } else { + [remoteDocuments addEntry:doc]; + } + } + }]; +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/core/src/firebase/firestore/core/target_id_generator.h b/Firestore/core/src/firebase/firestore/core/target_id_generator.h index 345f141..7d30cf9 100644 --- a/Firestore/core/src/firebase/firestore/core/target_id_generator.h +++ b/Firestore/core/src/firebase/firestore/core/target_id_generator.h @@ -37,6 +37,9 @@ enum class TargetIdGeneratorId { LocalStore = 0, SyncEngine = 1 }; */ class TargetIdGenerator { public: + // Makes Objective-C++ code happy to provide a default ctor. + TargetIdGenerator() = default; + TargetIdGenerator(const TargetIdGenerator& value); /** -- cgit v1.2.3