From 3f36f5467de4c191fa2903743e5a210420e9d49a Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Fri, 30 Mar 2018 10:18:25 -0700 Subject: Drop FSTWriteGroup (#986) * Drop write group from remote document change buffer * Unwind some group dependendencies in local store * Write group dropped from local store * Drop write group from mutation queue tests * Drop write group usage from query cache tests * Drop write groups from remote document cache tests * Drop write groups from remote document change buffer tests * Drop write groups and the write group tracker * Style * Put the action in transaction * Merge master, fix test * Fix some compiler warnings but mostly trigger travis * Responses to feedback --- Firestore/Source/Local/FSTLocalStore.mm | 472 +++++++++++++++----------------- 1 file changed, 226 insertions(+), 246 deletions(-) (limited to 'Firestore/Source/Local/FSTLocalStore.mm') diff --git a/Firestore/Source/Local/FSTLocalStore.mm b/Firestore/Source/Local/FSTLocalStore.mm index 74eab48..412ade2 100644 --- a/Firestore/Source/Local/FSTLocalStore.mm +++ b/Firestore/Source/Local/FSTLocalStore.mm @@ -31,7 +31,6 @@ #import "Firestore/Source/Local/FSTReferenceSet.h" #import "Firestore/Source/Local/FSTRemoteDocumentCache.h" #import "Firestore/Source/Local/FSTRemoteDocumentChangeBuffer.h" -#import "Firestore/Source/Local/FSTWriteGroup.h" #import "Firestore/Source/Model/FSTDocument.h" #import "Firestore/Source/Model/FSTDocumentDictionary.h" #import "Firestore/Source/Model/FSTMutation.h" @@ -132,27 +131,27 @@ NS_ASSUME_NONNULL_BEGIN } - (void)startMutationQueue { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Start MutationQueue"]; - [self.mutationQueue start]; - - // If we have any leftover mutation batch results from a prior run, just drop them. - // TODO(http://b/33446471): We probably need to repopulate heldBatchResults or similar instead, - // but that is not straightforward since we're not persisting the write ack versions. - [self.heldBatchResults removeAllObjects]; - - // TODO(mikelehen): This is the only usage of getAllMutationBatchesThroughBatchId:. Consider - // removing it in favor of a getAcknowledgedBatches method. - FSTBatchID highestAck = [self.mutationQueue highestAcknowledgedBatchID]; - if (highestAck != kFSTBatchIDUnknown) { - NSArray *batches = - [self.mutationQueue allMutationBatchesThroughBatchID:highestAck]; - if (batches.count > 0) { - // NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but this set - // should be very small and this code should go away eventually. - [self.mutationQueue removeMutationBatches:batches]; + self.persistence.run("Start MutationQueue", [&]() { + [self.mutationQueue start]; + + // If we have any leftover mutation batch results from a prior run, just drop them. + // TODO(http://b/33446471): We probably need to repopulate heldBatchResults or similar instead, + // but that is not straightforward since we're not persisting the write ack versions. + [self.heldBatchResults removeAllObjects]; + + // TODO(mikelehen): This is the only usage of getAllMutationBatchesThroughBatchId:. Consider + // removing it in favor of a getAcknowledgedBatches method. + FSTBatchID highestAck = [self.mutationQueue highestAcknowledgedBatchID]; + if (highestAck != kFSTBatchIDUnknown) { + NSArray *batches = + [self.mutationQueue allMutationBatchesThroughBatchID:highestAck]; + if (batches.count > 0) { + // NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but this set + // should be very small and this code should go away eventually. + [self.mutationQueue removeMutationBatches:batches]; + } } - } - [self.persistence commitGroup:group]; + }); } - (void)startQueryCache { @@ -166,9 +165,9 @@ NS_ASSUME_NONNULL_BEGIN - (FSTMaybeDocumentDictionary *)userDidChange:(const User &)user { // Swap out the mutation queue, grabbing the pending mutation batches before and after. - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"OldBatches"]; - NSArray *oldBatches = [self.mutationQueue allMutationBatches]; - [self.persistence commitGroup:group]; + NSArray *oldBatches = self.persistence.run( + "OldBatches", + [&]() -> NSArray * { return [self.mutationQueue allMutationBatches]; }); [self.garbageCollector removeGarbageSource:self.mutationQueue]; @@ -177,83 +176,79 @@ NS_ASSUME_NONNULL_BEGIN [self startMutationQueue]; - group = [self.persistence startGroupWithAction:@"NewBatches"]; - NSArray *newBatches = [self.mutationQueue allMutationBatches]; - - // Recreate our LocalDocumentsView using the new MutationQueue. - self.localDocuments = [FSTLocalDocumentsView viewWithRemoteDocumentCache:self.remoteDocumentCache - mutationQueue:self.mutationQueue]; - - // Union the old/new changed keys. - FSTDocumentKeySet *changedKeys = [FSTDocumentKeySet keySet]; - for (NSArray *batches in @[ oldBatches, newBatches ]) { - for (FSTMutationBatch *batch in batches) { - for (FSTMutation *mutation in batch.mutations) { - changedKeys = [changedKeys setByAddingObject:mutation.key]; + return self.persistence.run("NewBatches", [&]() -> FSTMaybeDocumentDictionary * { + NSArray *newBatches = [self.mutationQueue allMutationBatches]; + + // Recreate our LocalDocumentsView using the new MutationQueue. + self.localDocuments = + [FSTLocalDocumentsView viewWithRemoteDocumentCache:self.remoteDocumentCache + mutationQueue:self.mutationQueue]; + + // Union the old/new changed keys. + FSTDocumentKeySet *changedKeys = [FSTDocumentKeySet keySet]; + for (NSArray *batches in @[ oldBatches, newBatches ]) { + for (FSTMutationBatch *batch in batches) { + for (FSTMutation *mutation in batch.mutations) { + changedKeys = [changedKeys setByAddingObject:mutation.key]; + } } } - } - // Return the set of all (potentially) changed documents as the result of the user change. - FSTMaybeDocumentDictionary *result = [self.localDocuments documentsForKeys:changedKeys]; - [self.persistence commitGroup:group]; - return result; + // Return the set of all (potentially) changed documents as the result of the user change. + return [self.localDocuments documentsForKeys:changedKeys]; + }); } - (FSTLocalWriteResult *)locallyWriteMutations:(NSArray *)mutations { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Locally write mutations"]; - FIRTimestamp *localWriteTime = [FIRTimestamp timestamp]; - FSTMutationBatch *batch = - [self.mutationQueue addMutationBatchWithWriteTime:localWriteTime mutations:mutations]; - FSTDocumentKeySet *keys = [batch keys]; - FSTMaybeDocumentDictionary *changedDocuments = [self.localDocuments documentsForKeys:keys]; - [self.persistence commitGroup:group]; - return [FSTLocalWriteResult resultForBatchID:batch.batchID changes:changedDocuments]; + return self.persistence.run("Locally write mutations", [&]() -> FSTLocalWriteResult * { + FIRTimestamp *localWriteTime = [FIRTimestamp timestamp]; + FSTMutationBatch *batch = + [self.mutationQueue addMutationBatchWithWriteTime:localWriteTime mutations:mutations]; + FSTDocumentKeySet *keys = [batch keys]; + FSTMaybeDocumentDictionary *changedDocuments = [self.localDocuments documentsForKeys:keys]; + return [FSTLocalWriteResult resultForBatchID:batch.batchID changes:changedDocuments]; + }); } - (FSTMaybeDocumentDictionary *)acknowledgeBatchWithResult:(FSTMutationBatchResult *)batchResult { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Acknowledge batch"]; - id mutationQueue = self.mutationQueue; + return self.persistence.run("Acknowledge batch", [&]() -> FSTMaybeDocumentDictionary * { + id mutationQueue = self.mutationQueue; - [mutationQueue acknowledgeBatch:batchResult.batch streamToken:batchResult.streamToken]; + [mutationQueue acknowledgeBatch:batchResult.batch streamToken:batchResult.streamToken]; - FSTDocumentKeySet *affected; - if ([self shouldHoldBatchResultWithVersion:batchResult.commitVersion]) { - [self.heldBatchResults addObject:batchResult]; - affected = [FSTDocumentKeySet keySet]; - } else { - FSTRemoteDocumentChangeBuffer *remoteDocuments = - [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; + FSTDocumentKeySet *affected; + if ([self shouldHoldBatchResultWithVersion:batchResult.commitVersion]) { + [self.heldBatchResults addObject:batchResult]; + affected = [FSTDocumentKeySet keySet]; + } else { + FSTRemoteDocumentChangeBuffer *remoteDocuments = + [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; - affected = - [self releaseBatchResults:@[ batchResult ] group:group remoteDocuments:remoteDocuments]; + affected = [self releaseBatchResults:@[ batchResult ] remoteDocuments:remoteDocuments]; - [remoteDocuments applyToWriteGroup:group]; - } + [remoteDocuments apply]; + } - [self.mutationQueue performConsistencyCheck]; + [self.mutationQueue performConsistencyCheck]; - FSTMaybeDocumentDictionary *result = [self.localDocuments documentsForKeys:affected]; - [self.persistence commitGroup:group]; - return result; + return [self.localDocuments documentsForKeys:affected]; + }); } - (FSTMaybeDocumentDictionary *)rejectBatchID:(FSTBatchID)batchID { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Reject batch"]; - - FSTMutationBatch *toReject = [self.mutationQueue lookupMutationBatch:batchID]; - FSTAssert(toReject, @"Attempt to reject nonexistent batch!"); + return self.persistence.run("Reject batch", [&]() -> FSTMaybeDocumentDictionary * { + FSTMutationBatch *toReject = [self.mutationQueue lookupMutationBatch:batchID]; + FSTAssert(toReject, @"Attempt to reject nonexistent batch!"); - FSTBatchID lastAcked = [self.mutationQueue highestAcknowledgedBatchID]; - FSTAssert(batchID > lastAcked, @"Acknowledged batches can't be rejected."); + FSTBatchID lastAcked = [self.mutationQueue highestAcknowledgedBatchID]; + FSTAssert(batchID > lastAcked, @"Acknowledged batches can't be rejected."); - FSTDocumentKeySet *affected = [self removeMutationBatch:toReject group:group]; + FSTDocumentKeySet *affected = [self removeMutationBatch:toReject]; - [self.mutationQueue performConsistencyCheck]; + [self.mutationQueue performConsistencyCheck]; - FSTMaybeDocumentDictionary *result = [self.localDocuments documentsForKeys:affected]; - [self.persistence commitGroup:group]; - return result; + return [self.localDocuments documentsForKeys:affected]; + }); } - (nullable NSData *)lastStreamToken { @@ -261,10 +256,8 @@ NS_ASSUME_NONNULL_BEGIN } - (void)setLastStreamToken:(nullable NSData *)streamToken { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Set stream token"]; - - [self.mutationQueue setLastStreamToken:streamToken]; - [self.persistence commitGroup:group]; + self.persistence.run("Set stream token", + [&]() { [self.mutationQueue setLastStreamToken:streamToken]; }); } - (FSTSnapshotVersion *)lastRemoteSnapshotVersion { @@ -272,207 +265,197 @@ NS_ASSUME_NONNULL_BEGIN } - (FSTMaybeDocumentDictionary *)applyRemoteEvent:(FSTRemoteEvent *)remoteEvent { - id queryCache = self.queryCache; - - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Apply remote event"]; - FSTRemoteDocumentChangeBuffer *remoteDocuments = - [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; + return self.persistence.run("Apply remote event", [&]() -> FSTMaybeDocumentDictionary * { + id queryCache = self.queryCache; - [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( - NSNumber *targetIDNumber, FSTTargetChange *change, BOOL *stop) { - FSTTargetID targetID = targetIDNumber.intValue; + FSTRemoteDocumentChangeBuffer *remoteDocuments = + [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; - // Do not ref/unref unassigned targetIDs - it may lead to leaks. - FSTQueryData *queryData = self.targetIDs[targetIDNumber]; - if (!queryData) { - return; - } + [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( + NSNumber *targetIDNumber, FSTTargetChange *change, BOOL *stop) { + FSTTargetID targetID = targetIDNumber.intValue; - FSTTargetMapping *mapping = change.mapping; - if (mapping) { - // First make sure that all references are deleted. - if ([mapping isKindOfClass:[FSTResetMapping class]]) { - FSTResetMapping *reset = (FSTResetMapping *)mapping; - [queryCache removeMatchingKeysForTargetID:targetID]; - [queryCache addMatchingKeys:reset.documents forTargetID:targetID]; + // Do not ref/unref unassigned targetIDs - it may lead to leaks. + FSTQueryData *queryData = self.targetIDs[targetIDNumber]; + if (!queryData) { + return; + } - } else if ([mapping isKindOfClass:[FSTUpdateMapping class]]) { - FSTUpdateMapping *update = (FSTUpdateMapping *)mapping; - [queryCache removeMatchingKeys:update.removedDocuments forTargetID:targetID]; - [queryCache addMatchingKeys:update.addedDocuments forTargetID:targetID]; + FSTTargetMapping *mapping = change.mapping; + if (mapping) { + // First make sure that all references are deleted. + if ([mapping isKindOfClass:[FSTResetMapping class]]) { + FSTResetMapping *reset = (FSTResetMapping *)mapping; + [queryCache removeMatchingKeysForTargetID:targetID]; + [queryCache addMatchingKeys:reset.documents forTargetID:targetID]; + + } else if ([mapping isKindOfClass:[FSTUpdateMapping class]]) { + FSTUpdateMapping *update = (FSTUpdateMapping *)mapping; + [queryCache removeMatchingKeys:update.removedDocuments forTargetID:targetID]; + [queryCache addMatchingKeys:update.addedDocuments forTargetID:targetID]; + + } else { + FSTFail(@"Unknown mapping type: %@", mapping); + } + } + // Update the resume token if the change includes one. Don't clear any preexisting value. + NSData *resumeToken = change.resumeToken; + if (resumeToken.length > 0) { + queryData = [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion + resumeToken:resumeToken]; + self.targetIDs[targetIDNumber] = queryData; + [self.queryCache updateQueryData:queryData]; + } + }]; + + // TODO(klimt): This could probably be an NSMutableDictionary. + FSTDocumentKeySet *changedDocKeys = [FSTDocumentKeySet keySet]; + for (const auto &kv : remoteEvent.documentUpdates) { + const DocumentKey &key = kv.first; + FSTMaybeDocument *doc = kv.second; + changedDocKeys = [changedDocKeys setByAddingObject:key]; + FSTMaybeDocument *existingDoc = [remoteDocuments entryForKey:key]; + // Make sure we don't apply an old document version to the remote cache, though we + // make an exception for [SnapshotVersion noVersion] which can happen for manufactured + // events (e.g. in the case of a limbo document resolution failing). + if (!existingDoc || [doc.version isEqual:[FSTSnapshotVersion noVersion]] || + [doc.version compare:existingDoc.version] != NSOrderedAscending) { + [remoteDocuments addEntry:doc]; } else { - FSTFail(@"Unknown mapping type: %@", mapping); + FSTLog( + @"FSTLocalStore Ignoring outdated watch update for %s. " + "Current version: %@ Watch version: %@", + key.ToString().c_str(), existingDoc.version, doc.version); } - } - // Update the resume token if the change includes one. Don't clear any preexisting value. - NSData *resumeToken = change.resumeToken; - if (resumeToken.length > 0) { - queryData = [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion - resumeToken:resumeToken]; - self.targetIDs[targetIDNumber] = queryData; - [self.queryCache updateQueryData:queryData]; + // The document might be garbage because it was unreferenced by everything. + // Make sure to mark it as garbage if it is... + [self.garbageCollector addPotentialGarbageKey:key]; } - }]; - // TODO(klimt): This could probably be an NSMutableDictionary. - FSTDocumentKeySet *changedDocKeys = [FSTDocumentKeySet keySet]; - for (const auto &kv : remoteEvent.documentUpdates) { - const DocumentKey &key = kv.first; - FSTMaybeDocument *doc = kv.second; - changedDocKeys = [changedDocKeys setByAddingObject:key]; - FSTMaybeDocument *existingDoc = [remoteDocuments entryForKey:key]; - // Make sure we don't apply an old document version to the remote cache, though we - // make an exception for [SnapshotVersion noVersion] which can happen for manufactured - // events (e.g. in the case of a limbo document resolution failing). - if (!existingDoc || [doc.version isEqual:[FSTSnapshotVersion noVersion]] || - [doc.version compare:existingDoc.version] != NSOrderedAscending) { - [remoteDocuments addEntry:doc]; - } else { - FSTLog( - @"FSTLocalStore Ignoring outdated watch update for %s. " - "Current version: %@ Watch version: %@", - key.ToString().c_str(), existingDoc.version, doc.version); + // HACK: The only reason we allow omitting snapshot version is so we can synthesize remote + // events when we get permission denied errors while trying to resolve the state of a locally + // cached document that is in limbo. + FSTSnapshotVersion *lastRemoteVersion = [self.queryCache lastRemoteSnapshotVersion]; + FSTSnapshotVersion *remoteVersion = remoteEvent.snapshotVersion; + if (![remoteVersion isEqual:[FSTSnapshotVersion noVersion]]) { + FSTAssert([remoteVersion compare:lastRemoteVersion] != NSOrderedAscending, + @"Watch stream reverted to previous snapshot?? (%@ < %@)", remoteVersion, + lastRemoteVersion); + [self.queryCache setLastRemoteSnapshotVersion:remoteVersion]; } - // The document might be garbage because it was unreferenced by everything. - // Make sure to mark it as garbage if it is... - [self.garbageCollector addPotentialGarbageKey:key]; - } - - // HACK: The only reason we allow omitting snapshot version is so we can synthesize remote events - // when we get permission denied errors while trying to resolve the state of a locally cached - // document that is in limbo. - FSTSnapshotVersion *lastRemoteVersion = [self.queryCache lastRemoteSnapshotVersion]; - FSTSnapshotVersion *remoteVersion = remoteEvent.snapshotVersion; - if (![remoteVersion isEqual:[FSTSnapshotVersion noVersion]]) { - FSTAssert([remoteVersion compare:lastRemoteVersion] != NSOrderedAscending, - @"Watch stream reverted to previous snapshot?? (%@ < %@)", remoteVersion, - lastRemoteVersion); - [self.queryCache setLastRemoteSnapshotVersion:remoteVersion]; - } - - FSTDocumentKeySet *releasedWriteKeys = - [self releaseHeldBatchResultsWithGroup:group remoteDocuments:remoteDocuments]; + FSTDocumentKeySet *releasedWriteKeys = + [self releaseHeldBatchResultsWithRemoteDocuments:remoteDocuments]; - [remoteDocuments applyToWriteGroup:group]; + [remoteDocuments apply]; - // Union the two key sets. - __block FSTDocumentKeySet *keysToRecalc = changedDocKeys; - [releasedWriteKeys enumerateObjectsUsingBlock:^(FSTDocumentKey *key, BOOL *stop) { - keysToRecalc = [keysToRecalc setByAddingObject:key]; - }]; + // Union the two key sets. + __block FSTDocumentKeySet *keysToRecalc = changedDocKeys; + [releasedWriteKeys enumerateObjectsUsingBlock:^(FSTDocumentKey *key, BOOL *stop) { + keysToRecalc = [keysToRecalc setByAddingObject:key]; + }]; - FSTMaybeDocumentDictionary *result = [self.localDocuments documentsForKeys:keysToRecalc]; - [self.persistence commitGroup:group]; - return result; + return [self.localDocuments documentsForKeys:keysToRecalc]; + }); } - (void)notifyLocalViewChanges:(NSArray *)viewChanges { - FSTReferenceSet *localViewReferences = self.localViewReferences; - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"NotifyLocalViewChanges"]; - for (FSTLocalViewChanges *view in viewChanges) { - FSTQueryData *queryData = [self.queryCache queryDataForQuery:view.query]; - FSTAssert(queryData, @"Local view changes contain unallocated query."); - FSTTargetID targetID = queryData.targetID; - [localViewReferences addReferencesToKeys:view.addedKeys forID:targetID]; - [localViewReferences removeReferencesToKeys:view.removedKeys forID:targetID]; - } - [self.persistence commitGroup:group]; + self.persistence.run("NotifyLocalViewChanges", [&]() { + FSTReferenceSet *localViewReferences = self.localViewReferences; + for (FSTLocalViewChanges *view in viewChanges) { + FSTQueryData *queryData = [self.queryCache queryDataForQuery:view.query]; + FSTAssert(queryData, @"Local view changes contain unallocated query."); + FSTTargetID targetID = queryData.targetID; + [localViewReferences addReferencesToKeys:view.addedKeys forID:targetID]; + [localViewReferences removeReferencesToKeys:view.removedKeys forID:targetID]; + } + }); } - (nullable FSTMutationBatch *)nextMutationBatchAfterBatchID:(FSTBatchID)batchID { - FSTMutationBatch *result = self.persistence.run([&]() -> FSTMutationBatch * { - return [self.mutationQueue nextMutationBatchAfterBatchID:batchID]; - }); + FSTMutationBatch *result = + self.persistence.run("NextMutationBatchAfterBatchID", [&]() -> FSTMutationBatch * { + return [self.mutationQueue nextMutationBatchAfterBatchID:batchID]; + }); return result; } - (nullable FSTMaybeDocument *)readDocument:(const DocumentKey &)key { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"ReadDocument"]; - FSTMaybeDocument *result = [self.localDocuments documentForKey:key]; - [self.persistence commitGroup:group]; - return result; + return self.persistence.run("ReadDocument", [&]() -> FSTMaybeDocument *_Nullable { + return [self.localDocuments documentForKey:key]; + }); } - (FSTQueryData *)allocateQuery:(FSTQuery *)query { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Allocate query"]; - FSTQueryData *cached = [self.queryCache queryDataForQuery:query]; - FSTTargetID targetID; - FSTListenSequenceNumber sequenceNumber = [self.listenSequence next]; - if (cached) { - // This query has been listened to previously, so reuse the previous targetID. - // TODO(mcg): freshen last accessed date? - targetID = cached.targetID; - } else { - targetID = _targetIDGenerator.NextId(); - cached = [[FSTQueryData alloc] initWithQuery:query - targetID:targetID - listenSequenceNumber:sequenceNumber - purpose:FSTQueryPurposeListen]; - [self.queryCache addQueryData:cached]; - } - [self.persistence commitGroup:group]; + FSTQueryData *queryData = self.persistence.run("Allocate query", [&]() -> FSTQueryData * { + FSTQueryData *cached = [self.queryCache queryDataForQuery:query]; + // TODO(mcg): freshen last accessed date if cached exists? + if (!cached) { + cached = [[FSTQueryData alloc] initWithQuery:query + targetID:_targetIDGenerator.NextId() + listenSequenceNumber:[self.listenSequence next] + purpose:FSTQueryPurposeListen]; + [self.queryCache addQueryData:cached]; + } + return cached; + }); // Sanity check to ensure that even when resuming a query it's not currently active. - FSTBoxedTargetID *boxedTargetID = @(targetID); + FSTBoxedTargetID *boxedTargetID = @(queryData.targetID); FSTAssert(!self.targetIDs[boxedTargetID], @"Tried to allocate an already allocated query: %@", query); - self.targetIDs[boxedTargetID] = cached; - return cached; + self.targetIDs[boxedTargetID] = queryData; + return queryData; } - (void)releaseQuery:(FSTQuery *)query { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Release query"]; - - FSTQueryData *queryData = [self.queryCache queryDataForQuery:query]; - FSTAssert(queryData, @"Tried to release nonexistent query: %@", query); - - [self.localViewReferences removeReferencesForID:queryData.targetID]; - if (self.garbageCollector.isEager) { - [self.queryCache removeQueryData:queryData]; - } - [self.targetIDs removeObjectForKey:@(queryData.targetID)]; + self.persistence.run("Release query", [&]() { + FSTQueryData *queryData = [self.queryCache queryDataForQuery:query]; + FSTAssert(queryData, @"Tried to release nonexistent query: %@", query); - // If this was the last watch target, then we won't get any more watch snapshots, so we should - // release any held batch results. - if ([self.targetIDs count] == 0) { - FSTRemoteDocumentChangeBuffer *remoteDocuments = - [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; + [self.localViewReferences removeReferencesForID:queryData.targetID]; + if (self.garbageCollector.isEager) { + [self.queryCache removeQueryData:queryData]; + } + [self.targetIDs removeObjectForKey:@(queryData.targetID)]; - [self releaseHeldBatchResultsWithGroup:group remoteDocuments:remoteDocuments]; + // If this was the last watch target, then we won't get any more watch snapshots, so we should + // release any held batch results. + if ([self.targetIDs count] == 0) { + FSTRemoteDocumentChangeBuffer *remoteDocuments = + [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache]; - [remoteDocuments applyToWriteGroup:group]; - } + [self releaseHeldBatchResultsWithRemoteDocuments:remoteDocuments]; - [self.persistence commitGroup:group]; + [remoteDocuments apply]; + } + }); } - (FSTDocumentDictionary *)executeQuery:(FSTQuery *)query { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"ExecuteQuery"]; - FSTDocumentDictionary *result = [self.localDocuments documentsMatchingQuery:query]; - [self.persistence commitGroup:group]; - return result; + return self.persistence.run("ExecuteQuery", [&]() -> FSTDocumentDictionary * { + return [self.localDocuments documentsMatchingQuery:query]; + }); } - (FSTDocumentKeySet *)remoteDocumentKeysForTarget:(FSTTargetID)targetID { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"RemoteDocumentKeysForTarget"]; - FSTDocumentKeySet *keySet = [self.queryCache matchingKeysForTargetID:targetID]; - [self.persistence commitGroup:group]; - return keySet; + return self.persistence.run("RemoteDocumentKeysForTarget", [&]() -> FSTDocumentKeySet * { + return [self.queryCache matchingKeysForTargetID:targetID]; + }); } - (void)collectGarbage { - FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Garbage Collection"]; - // Call collectGarbage regardless of whether isGCEnabled so the referenceSet doesn't continue to - // accumulate the garbage keys. - std::set garbage = [self.garbageCollector collectGarbage]; - if (garbage.size() > 0) { - for (const DocumentKey &key : garbage) { - [self.remoteDocumentCache removeEntryForKey:key]; + self.persistence.run("Garbage Collection", [&]() { + // Call collectGarbage regardless of whether isGCEnabled so the referenceSet doesn't continue to + // accumulate the garbage keys. + std::set garbage = [self.garbageCollector collectGarbage]; + if (garbage.size() > 0) { + for (const DocumentKey &key : garbage) { + [self.remoteDocumentCache removeEntryForKey:key]; + } } - } - [self.persistence commitGroup:group]; + }); } /** @@ -481,9 +464,8 @@ NS_ASSUME_NONNULL_BEGIN * * @return the set of keys of docs that were modified by those writes. */ -- (FSTDocumentKeySet *)releaseHeldBatchResultsWithGroup:(FSTWriteGroup *)group - remoteDocuments: - (FSTRemoteDocumentChangeBuffer *)remoteDocuments { +- (FSTDocumentKeySet *)releaseHeldBatchResultsWithRemoteDocuments: + (FSTRemoteDocumentChangeBuffer *)remoteDocuments { NSMutableArray *toRelease = [NSMutableArray array]; for (FSTMutationBatchResult *batchResult in self.heldBatchResults) { if (![self isRemoteUpToVersion:batchResult.commitVersion]) { @@ -496,7 +478,7 @@ NS_ASSUME_NONNULL_BEGIN return [FSTDocumentKeySet keySet]; } else { [self.heldBatchResults removeObjectsInRange:NSMakeRange(0, toRelease.count)]; - return [self releaseBatchResults:toRelease group:group remoteDocuments:remoteDocuments]; + return [self releaseBatchResults:toRelease remoteDocuments:remoteDocuments]; } } @@ -512,7 +494,6 @@ NS_ASSUME_NONNULL_BEGIN } - (FSTDocumentKeySet *)releaseBatchResults:(NSArray *)batchResults - group:(FSTWriteGroup *)group remoteDocuments:(FSTRemoteDocumentChangeBuffer *)remoteDocuments { NSMutableArray *batches = [NSMutableArray array]; for (FSTMutationBatchResult *batchResult in batchResults) { @@ -520,16 +501,15 @@ NS_ASSUME_NONNULL_BEGIN [batches addObject:batchResult.batch]; } - return [self removeMutationBatches:batches group:group]; + return [self removeMutationBatches:batches]; } -- (FSTDocumentKeySet *)removeMutationBatch:(FSTMutationBatch *)batch group:(FSTWriteGroup *)group { - return [self removeMutationBatches:@[ batch ] group:group]; +- (FSTDocumentKeySet *)removeMutationBatch:(FSTMutationBatch *)batch { + return [self removeMutationBatches:@[ batch ]]; } /** Removes all the mutation batches named in the given array. */ -- (FSTDocumentKeySet *)removeMutationBatches:(NSArray *)batches - group:(FSTWriteGroup *)group { +- (FSTDocumentKeySet *)removeMutationBatches:(NSArray *)batches { // TODO(klimt): Could this be an NSMutableDictionary? __block FSTDocumentKeySet *affectedDocs = [FSTDocumentKeySet keySet]; -- cgit v1.2.3