aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/Source/Local/FSTLocalStore.mm
diff options
context:
space:
mode:
authorGravatar Greg Soltis <gsoltis@google.com>2018-03-30 10:18:25 -0700
committerGravatar GitHub <noreply@github.com>2018-03-30 10:18:25 -0700
commit3f36f5467de4c191fa2903743e5a210420e9d49a (patch)
treee73fa2cd4fa299d6220e4ab04daab515ceeca89a /Firestore/Source/Local/FSTLocalStore.mm
parentea490a2c6492e41e892397e044477f778ce358b8 (diff)
Drop FSTWriteGroup (#986)
* Drop write group from remote document change buffer * Unwind some group dependendencies in local store * Write group dropped from local store * Drop write group from mutation queue tests * Drop write group usage from query cache tests * Drop write groups from remote document cache tests * Drop write groups from remote document change buffer tests * Drop write groups and the write group tracker * Style * Put the action in transaction * Merge master, fix test * Fix some compiler warnings but mostly trigger travis * Responses to feedback
Diffstat (limited to 'Firestore/Source/Local/FSTLocalStore.mm')
-rw-r--r--Firestore/Source/Local/FSTLocalStore.mm472
1 files changed, 226 insertions, 246 deletions
diff --git a/Firestore/Source/Local/FSTLocalStore.mm b/Firestore/Source/Local/FSTLocalStore.mm
index 74eab48..412ade2 100644
--- a/Firestore/Source/Local/FSTLocalStore.mm
+++ b/Firestore/Source/Local/FSTLocalStore.mm
@@ -31,7 +31,6 @@
#import "Firestore/Source/Local/FSTReferenceSet.h"
#import "Firestore/Source/Local/FSTRemoteDocumentCache.h"
#import "Firestore/Source/Local/FSTRemoteDocumentChangeBuffer.h"
-#import "Firestore/Source/Local/FSTWriteGroup.h"
#import "Firestore/Source/Model/FSTDocument.h"
#import "Firestore/Source/Model/FSTDocumentDictionary.h"
#import "Firestore/Source/Model/FSTMutation.h"
@@ -132,27 +131,27 @@ NS_ASSUME_NONNULL_BEGIN
}
- (void)startMutationQueue {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Start MutationQueue"];
- [self.mutationQueue start];
-
- // If we have any leftover mutation batch results from a prior run, just drop them.
- // TODO(http://b/33446471): We probably need to repopulate heldBatchResults or similar instead,
- // but that is not straightforward since we're not persisting the write ack versions.
- [self.heldBatchResults removeAllObjects];
-
- // TODO(mikelehen): This is the only usage of getAllMutationBatchesThroughBatchId:. Consider
- // removing it in favor of a getAcknowledgedBatches method.
- FSTBatchID highestAck = [self.mutationQueue highestAcknowledgedBatchID];
- if (highestAck != kFSTBatchIDUnknown) {
- NSArray<FSTMutationBatch *> *batches =
- [self.mutationQueue allMutationBatchesThroughBatchID:highestAck];
- if (batches.count > 0) {
- // NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but this set
- // should be very small and this code should go away eventually.
- [self.mutationQueue removeMutationBatches:batches];
+ self.persistence.run("Start MutationQueue", [&]() {
+ [self.mutationQueue start];
+
+ // If we have any leftover mutation batch results from a prior run, just drop them.
+ // TODO(http://b/33446471): We probably need to repopulate heldBatchResults or similar instead,
+ // but that is not straightforward since we're not persisting the write ack versions.
+ [self.heldBatchResults removeAllObjects];
+
+ // TODO(mikelehen): This is the only usage of getAllMutationBatchesThroughBatchId:. Consider
+ // removing it in favor of a getAcknowledgedBatches method.
+ FSTBatchID highestAck = [self.mutationQueue highestAcknowledgedBatchID];
+ if (highestAck != kFSTBatchIDUnknown) {
+ NSArray<FSTMutationBatch *> *batches =
+ [self.mutationQueue allMutationBatchesThroughBatchID:highestAck];
+ if (batches.count > 0) {
+ // NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but this set
+ // should be very small and this code should go away eventually.
+ [self.mutationQueue removeMutationBatches:batches];
+ }
}
- }
- [self.persistence commitGroup:group];
+ });
}
- (void)startQueryCache {
@@ -166,9 +165,9 @@ NS_ASSUME_NONNULL_BEGIN
- (FSTMaybeDocumentDictionary *)userDidChange:(const User &)user {
// Swap out the mutation queue, grabbing the pending mutation batches before and after.
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"OldBatches"];
- NSArray<FSTMutationBatch *> *oldBatches = [self.mutationQueue allMutationBatches];
- [self.persistence commitGroup:group];
+ NSArray<FSTMutationBatch *> *oldBatches = self.persistence.run(
+ "OldBatches",
+ [&]() -> NSArray<FSTMutationBatch *> * { return [self.mutationQueue allMutationBatches]; });
[self.garbageCollector removeGarbageSource:self.mutationQueue];
@@ -177,83 +176,79 @@ NS_ASSUME_NONNULL_BEGIN
[self startMutationQueue];
- group = [self.persistence startGroupWithAction:@"NewBatches"];
- NSArray<FSTMutationBatch *> *newBatches = [self.mutationQueue allMutationBatches];
-
- // Recreate our LocalDocumentsView using the new MutationQueue.
- self.localDocuments = [FSTLocalDocumentsView viewWithRemoteDocumentCache:self.remoteDocumentCache
- mutationQueue:self.mutationQueue];
-
- // Union the old/new changed keys.
- FSTDocumentKeySet *changedKeys = [FSTDocumentKeySet keySet];
- for (NSArray<FSTMutationBatch *> *batches in @[ oldBatches, newBatches ]) {
- for (FSTMutationBatch *batch in batches) {
- for (FSTMutation *mutation in batch.mutations) {
- changedKeys = [changedKeys setByAddingObject:mutation.key];
+ return self.persistence.run("NewBatches", [&]() -> FSTMaybeDocumentDictionary * {
+ NSArray<FSTMutationBatch *> *newBatches = [self.mutationQueue allMutationBatches];
+
+ // Recreate our LocalDocumentsView using the new MutationQueue.
+ self.localDocuments =
+ [FSTLocalDocumentsView viewWithRemoteDocumentCache:self.remoteDocumentCache
+ mutationQueue:self.mutationQueue];
+
+ // Union the old/new changed keys.
+ FSTDocumentKeySet *changedKeys = [FSTDocumentKeySet keySet];
+ for (NSArray<FSTMutationBatch *> *batches in @[ oldBatches, newBatches ]) {
+ for (FSTMutationBatch *batch in batches) {
+ for (FSTMutation *mutation in batch.mutations) {
+ changedKeys = [changedKeys setByAddingObject:mutation.key];
+ }
}
}
- }
- // Return the set of all (potentially) changed documents as the result of the user change.
- FSTMaybeDocumentDictionary *result = [self.localDocuments documentsForKeys:changedKeys];
- [self.persistence commitGroup:group];
- return result;
+ // Return the set of all (potentially) changed documents as the result of the user change.
+ return [self.localDocuments documentsForKeys:changedKeys];
+ });
}
- (FSTLocalWriteResult *)locallyWriteMutations:(NSArray<FSTMutation *> *)mutations {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Locally write mutations"];
- FIRTimestamp *localWriteTime = [FIRTimestamp timestamp];
- FSTMutationBatch *batch =
- [self.mutationQueue addMutationBatchWithWriteTime:localWriteTime mutations:mutations];
- FSTDocumentKeySet *keys = [batch keys];
- FSTMaybeDocumentDictionary *changedDocuments = [self.localDocuments documentsForKeys:keys];
- [self.persistence commitGroup:group];
- return [FSTLocalWriteResult resultForBatchID:batch.batchID changes:changedDocuments];
+ return self.persistence.run("Locally write mutations", [&]() -> FSTLocalWriteResult * {
+ FIRTimestamp *localWriteTime = [FIRTimestamp timestamp];
+ FSTMutationBatch *batch =
+ [self.mutationQueue addMutationBatchWithWriteTime:localWriteTime mutations:mutations];
+ FSTDocumentKeySet *keys = [batch keys];
+ FSTMaybeDocumentDictionary *changedDocuments = [self.localDocuments documentsForKeys:keys];
+ return [FSTLocalWriteResult resultForBatchID:batch.batchID changes:changedDocuments];
+ });
}
- (FSTMaybeDocumentDictionary *)acknowledgeBatchWithResult:(FSTMutationBatchResult *)batchResult {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Acknowledge batch"];
- id<FSTMutationQueue> mutationQueue = self.mutationQueue;
+ return self.persistence.run("Acknowledge batch", [&]() -> FSTMaybeDocumentDictionary * {
+ id<FSTMutationQueue> mutationQueue = self.mutationQueue;
- [mutationQueue acknowledgeBatch:batchResult.batch streamToken:batchResult.streamToken];
+ [mutationQueue acknowledgeBatch:batchResult.batch streamToken:batchResult.streamToken];
- FSTDocumentKeySet *affected;
- if ([self shouldHoldBatchResultWithVersion:batchResult.commitVersion]) {
- [self.heldBatchResults addObject:batchResult];
- affected = [FSTDocumentKeySet keySet];
- } else {
- FSTRemoteDocumentChangeBuffer *remoteDocuments =
- [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache];
+ FSTDocumentKeySet *affected;
+ if ([self shouldHoldBatchResultWithVersion:batchResult.commitVersion]) {
+ [self.heldBatchResults addObject:batchResult];
+ affected = [FSTDocumentKeySet keySet];
+ } else {
+ FSTRemoteDocumentChangeBuffer *remoteDocuments =
+ [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache];
- affected =
- [self releaseBatchResults:@[ batchResult ] group:group remoteDocuments:remoteDocuments];
+ affected = [self releaseBatchResults:@[ batchResult ] remoteDocuments:remoteDocuments];
- [remoteDocuments applyToWriteGroup:group];
- }
+ [remoteDocuments apply];
+ }
- [self.mutationQueue performConsistencyCheck];
+ [self.mutationQueue performConsistencyCheck];
- FSTMaybeDocumentDictionary *result = [self.localDocuments documentsForKeys:affected];
- [self.persistence commitGroup:group];
- return result;
+ return [self.localDocuments documentsForKeys:affected];
+ });
}
- (FSTMaybeDocumentDictionary *)rejectBatchID:(FSTBatchID)batchID {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Reject batch"];
-
- FSTMutationBatch *toReject = [self.mutationQueue lookupMutationBatch:batchID];
- FSTAssert(toReject, @"Attempt to reject nonexistent batch!");
+ return self.persistence.run("Reject batch", [&]() -> FSTMaybeDocumentDictionary * {
+ FSTMutationBatch *toReject = [self.mutationQueue lookupMutationBatch:batchID];
+ FSTAssert(toReject, @"Attempt to reject nonexistent batch!");
- FSTBatchID lastAcked = [self.mutationQueue highestAcknowledgedBatchID];
- FSTAssert(batchID > lastAcked, @"Acknowledged batches can't be rejected.");
+ FSTBatchID lastAcked = [self.mutationQueue highestAcknowledgedBatchID];
+ FSTAssert(batchID > lastAcked, @"Acknowledged batches can't be rejected.");
- FSTDocumentKeySet *affected = [self removeMutationBatch:toReject group:group];
+ FSTDocumentKeySet *affected = [self removeMutationBatch:toReject];
- [self.mutationQueue performConsistencyCheck];
+ [self.mutationQueue performConsistencyCheck];
- FSTMaybeDocumentDictionary *result = [self.localDocuments documentsForKeys:affected];
- [self.persistence commitGroup:group];
- return result;
+ return [self.localDocuments documentsForKeys:affected];
+ });
}
- (nullable NSData *)lastStreamToken {
@@ -261,10 +256,8 @@ NS_ASSUME_NONNULL_BEGIN
}
- (void)setLastStreamToken:(nullable NSData *)streamToken {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Set stream token"];
-
- [self.mutationQueue setLastStreamToken:streamToken];
- [self.persistence commitGroup:group];
+ self.persistence.run("Set stream token",
+ [&]() { [self.mutationQueue setLastStreamToken:streamToken]; });
}
- (FSTSnapshotVersion *)lastRemoteSnapshotVersion {
@@ -272,207 +265,197 @@ NS_ASSUME_NONNULL_BEGIN
}
- (FSTMaybeDocumentDictionary *)applyRemoteEvent:(FSTRemoteEvent *)remoteEvent {
- id<FSTQueryCache> queryCache = self.queryCache;
-
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Apply remote event"];
- FSTRemoteDocumentChangeBuffer *remoteDocuments =
- [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache];
+ return self.persistence.run("Apply remote event", [&]() -> FSTMaybeDocumentDictionary * {
+ id<FSTQueryCache> queryCache = self.queryCache;
- [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^(
- NSNumber *targetIDNumber, FSTTargetChange *change, BOOL *stop) {
- FSTTargetID targetID = targetIDNumber.intValue;
+ FSTRemoteDocumentChangeBuffer *remoteDocuments =
+ [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache];
- // Do not ref/unref unassigned targetIDs - it may lead to leaks.
- FSTQueryData *queryData = self.targetIDs[targetIDNumber];
- if (!queryData) {
- return;
- }
+ [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^(
+ NSNumber *targetIDNumber, FSTTargetChange *change, BOOL *stop) {
+ FSTTargetID targetID = targetIDNumber.intValue;
- FSTTargetMapping *mapping = change.mapping;
- if (mapping) {
- // First make sure that all references are deleted.
- if ([mapping isKindOfClass:[FSTResetMapping class]]) {
- FSTResetMapping *reset = (FSTResetMapping *)mapping;
- [queryCache removeMatchingKeysForTargetID:targetID];
- [queryCache addMatchingKeys:reset.documents forTargetID:targetID];
+ // Do not ref/unref unassigned targetIDs - it may lead to leaks.
+ FSTQueryData *queryData = self.targetIDs[targetIDNumber];
+ if (!queryData) {
+ return;
+ }
- } else if ([mapping isKindOfClass:[FSTUpdateMapping class]]) {
- FSTUpdateMapping *update = (FSTUpdateMapping *)mapping;
- [queryCache removeMatchingKeys:update.removedDocuments forTargetID:targetID];
- [queryCache addMatchingKeys:update.addedDocuments forTargetID:targetID];
+ FSTTargetMapping *mapping = change.mapping;
+ if (mapping) {
+ // First make sure that all references are deleted.
+ if ([mapping isKindOfClass:[FSTResetMapping class]]) {
+ FSTResetMapping *reset = (FSTResetMapping *)mapping;
+ [queryCache removeMatchingKeysForTargetID:targetID];
+ [queryCache addMatchingKeys:reset.documents forTargetID:targetID];
+
+ } else if ([mapping isKindOfClass:[FSTUpdateMapping class]]) {
+ FSTUpdateMapping *update = (FSTUpdateMapping *)mapping;
+ [queryCache removeMatchingKeys:update.removedDocuments forTargetID:targetID];
+ [queryCache addMatchingKeys:update.addedDocuments forTargetID:targetID];
+
+ } else {
+ FSTFail(@"Unknown mapping type: %@", mapping);
+ }
+ }
+ // Update the resume token if the change includes one. Don't clear any preexisting value.
+ NSData *resumeToken = change.resumeToken;
+ if (resumeToken.length > 0) {
+ queryData = [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion
+ resumeToken:resumeToken];
+ self.targetIDs[targetIDNumber] = queryData;
+ [self.queryCache updateQueryData:queryData];
+ }
+ }];
+
+ // TODO(klimt): This could probably be an NSMutableDictionary.
+ FSTDocumentKeySet *changedDocKeys = [FSTDocumentKeySet keySet];
+ for (const auto &kv : remoteEvent.documentUpdates) {
+ const DocumentKey &key = kv.first;
+ FSTMaybeDocument *doc = kv.second;
+ changedDocKeys = [changedDocKeys setByAddingObject:key];
+ FSTMaybeDocument *existingDoc = [remoteDocuments entryForKey:key];
+ // Make sure we don't apply an old document version to the remote cache, though we
+ // make an exception for [SnapshotVersion noVersion] which can happen for manufactured
+ // events (e.g. in the case of a limbo document resolution failing).
+ if (!existingDoc || [doc.version isEqual:[FSTSnapshotVersion noVersion]] ||
+ [doc.version compare:existingDoc.version] != NSOrderedAscending) {
+ [remoteDocuments addEntry:doc];
} else {
- FSTFail(@"Unknown mapping type: %@", mapping);
+ FSTLog(
+ @"FSTLocalStore Ignoring outdated watch update for %s. "
+ "Current version: %@ Watch version: %@",
+ key.ToString().c_str(), existingDoc.version, doc.version);
}
- }
- // Update the resume token if the change includes one. Don't clear any preexisting value.
- NSData *resumeToken = change.resumeToken;
- if (resumeToken.length > 0) {
- queryData = [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion
- resumeToken:resumeToken];
- self.targetIDs[targetIDNumber] = queryData;
- [self.queryCache updateQueryData:queryData];
+ // The document might be garbage because it was unreferenced by everything.
+ // Make sure to mark it as garbage if it is...
+ [self.garbageCollector addPotentialGarbageKey:key];
}
- }];
- // TODO(klimt): This could probably be an NSMutableDictionary.
- FSTDocumentKeySet *changedDocKeys = [FSTDocumentKeySet keySet];
- for (const auto &kv : remoteEvent.documentUpdates) {
- const DocumentKey &key = kv.first;
- FSTMaybeDocument *doc = kv.second;
- changedDocKeys = [changedDocKeys setByAddingObject:key];
- FSTMaybeDocument *existingDoc = [remoteDocuments entryForKey:key];
- // Make sure we don't apply an old document version to the remote cache, though we
- // make an exception for [SnapshotVersion noVersion] which can happen for manufactured
- // events (e.g. in the case of a limbo document resolution failing).
- if (!existingDoc || [doc.version isEqual:[FSTSnapshotVersion noVersion]] ||
- [doc.version compare:existingDoc.version] != NSOrderedAscending) {
- [remoteDocuments addEntry:doc];
- } else {
- FSTLog(
- @"FSTLocalStore Ignoring outdated watch update for %s. "
- "Current version: %@ Watch version: %@",
- key.ToString().c_str(), existingDoc.version, doc.version);
+ // HACK: The only reason we allow omitting snapshot version is so we can synthesize remote
+ // events when we get permission denied errors while trying to resolve the state of a locally
+ // cached document that is in limbo.
+ FSTSnapshotVersion *lastRemoteVersion = [self.queryCache lastRemoteSnapshotVersion];
+ FSTSnapshotVersion *remoteVersion = remoteEvent.snapshotVersion;
+ if (![remoteVersion isEqual:[FSTSnapshotVersion noVersion]]) {
+ FSTAssert([remoteVersion compare:lastRemoteVersion] != NSOrderedAscending,
+ @"Watch stream reverted to previous snapshot?? (%@ < %@)", remoteVersion,
+ lastRemoteVersion);
+ [self.queryCache setLastRemoteSnapshotVersion:remoteVersion];
}
- // The document might be garbage because it was unreferenced by everything.
- // Make sure to mark it as garbage if it is...
- [self.garbageCollector addPotentialGarbageKey:key];
- }
-
- // HACK: The only reason we allow omitting snapshot version is so we can synthesize remote events
- // when we get permission denied errors while trying to resolve the state of a locally cached
- // document that is in limbo.
- FSTSnapshotVersion *lastRemoteVersion = [self.queryCache lastRemoteSnapshotVersion];
- FSTSnapshotVersion *remoteVersion = remoteEvent.snapshotVersion;
- if (![remoteVersion isEqual:[FSTSnapshotVersion noVersion]]) {
- FSTAssert([remoteVersion compare:lastRemoteVersion] != NSOrderedAscending,
- @"Watch stream reverted to previous snapshot?? (%@ < %@)", remoteVersion,
- lastRemoteVersion);
- [self.queryCache setLastRemoteSnapshotVersion:remoteVersion];
- }
-
- FSTDocumentKeySet *releasedWriteKeys =
- [self releaseHeldBatchResultsWithGroup:group remoteDocuments:remoteDocuments];
+ FSTDocumentKeySet *releasedWriteKeys =
+ [self releaseHeldBatchResultsWithRemoteDocuments:remoteDocuments];
- [remoteDocuments applyToWriteGroup:group];
+ [remoteDocuments apply];
- // Union the two key sets.
- __block FSTDocumentKeySet *keysToRecalc = changedDocKeys;
- [releasedWriteKeys enumerateObjectsUsingBlock:^(FSTDocumentKey *key, BOOL *stop) {
- keysToRecalc = [keysToRecalc setByAddingObject:key];
- }];
+ // Union the two key sets.
+ __block FSTDocumentKeySet *keysToRecalc = changedDocKeys;
+ [releasedWriteKeys enumerateObjectsUsingBlock:^(FSTDocumentKey *key, BOOL *stop) {
+ keysToRecalc = [keysToRecalc setByAddingObject:key];
+ }];
- FSTMaybeDocumentDictionary *result = [self.localDocuments documentsForKeys:keysToRecalc];
- [self.persistence commitGroup:group];
- return result;
+ return [self.localDocuments documentsForKeys:keysToRecalc];
+ });
}
- (void)notifyLocalViewChanges:(NSArray<FSTLocalViewChanges *> *)viewChanges {
- FSTReferenceSet *localViewReferences = self.localViewReferences;
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"NotifyLocalViewChanges"];
- for (FSTLocalViewChanges *view in viewChanges) {
- FSTQueryData *queryData = [self.queryCache queryDataForQuery:view.query];
- FSTAssert(queryData, @"Local view changes contain unallocated query.");
- FSTTargetID targetID = queryData.targetID;
- [localViewReferences addReferencesToKeys:view.addedKeys forID:targetID];
- [localViewReferences removeReferencesToKeys:view.removedKeys forID:targetID];
- }
- [self.persistence commitGroup:group];
+ self.persistence.run("NotifyLocalViewChanges", [&]() {
+ FSTReferenceSet *localViewReferences = self.localViewReferences;
+ for (FSTLocalViewChanges *view in viewChanges) {
+ FSTQueryData *queryData = [self.queryCache queryDataForQuery:view.query];
+ FSTAssert(queryData, @"Local view changes contain unallocated query.");
+ FSTTargetID targetID = queryData.targetID;
+ [localViewReferences addReferencesToKeys:view.addedKeys forID:targetID];
+ [localViewReferences removeReferencesToKeys:view.removedKeys forID:targetID];
+ }
+ });
}
- (nullable FSTMutationBatch *)nextMutationBatchAfterBatchID:(FSTBatchID)batchID {
- FSTMutationBatch *result = self.persistence.run([&]() -> FSTMutationBatch * {
- return [self.mutationQueue nextMutationBatchAfterBatchID:batchID];
- });
+ FSTMutationBatch *result =
+ self.persistence.run("NextMutationBatchAfterBatchID", [&]() -> FSTMutationBatch * {
+ return [self.mutationQueue nextMutationBatchAfterBatchID:batchID];
+ });
return result;
}
- (nullable FSTMaybeDocument *)readDocument:(const DocumentKey &)key {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"ReadDocument"];
- FSTMaybeDocument *result = [self.localDocuments documentForKey:key];
- [self.persistence commitGroup:group];
- return result;
+ return self.persistence.run("ReadDocument", [&]() -> FSTMaybeDocument *_Nullable {
+ return [self.localDocuments documentForKey:key];
+ });
}
- (FSTQueryData *)allocateQuery:(FSTQuery *)query {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Allocate query"];
- FSTQueryData *cached = [self.queryCache queryDataForQuery:query];
- FSTTargetID targetID;
- FSTListenSequenceNumber sequenceNumber = [self.listenSequence next];
- if (cached) {
- // This query has been listened to previously, so reuse the previous targetID.
- // TODO(mcg): freshen last accessed date?
- targetID = cached.targetID;
- } else {
- targetID = _targetIDGenerator.NextId();
- cached = [[FSTQueryData alloc] initWithQuery:query
- targetID:targetID
- listenSequenceNumber:sequenceNumber
- purpose:FSTQueryPurposeListen];
- [self.queryCache addQueryData:cached];
- }
- [self.persistence commitGroup:group];
+ FSTQueryData *queryData = self.persistence.run("Allocate query", [&]() -> FSTQueryData * {
+ FSTQueryData *cached = [self.queryCache queryDataForQuery:query];
+ // TODO(mcg): freshen last accessed date if cached exists?
+ if (!cached) {
+ cached = [[FSTQueryData alloc] initWithQuery:query
+ targetID:_targetIDGenerator.NextId()
+ listenSequenceNumber:[self.listenSequence next]
+ purpose:FSTQueryPurposeListen];
+ [self.queryCache addQueryData:cached];
+ }
+ return cached;
+ });
// Sanity check to ensure that even when resuming a query it's not currently active.
- FSTBoxedTargetID *boxedTargetID = @(targetID);
+ FSTBoxedTargetID *boxedTargetID = @(queryData.targetID);
FSTAssert(!self.targetIDs[boxedTargetID], @"Tried to allocate an already allocated query: %@",
query);
- self.targetIDs[boxedTargetID] = cached;
- return cached;
+ self.targetIDs[boxedTargetID] = queryData;
+ return queryData;
}
- (void)releaseQuery:(FSTQuery *)query {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Release query"];
-
- FSTQueryData *queryData = [self.queryCache queryDataForQuery:query];
- FSTAssert(queryData, @"Tried to release nonexistent query: %@", query);
-
- [self.localViewReferences removeReferencesForID:queryData.targetID];
- if (self.garbageCollector.isEager) {
- [self.queryCache removeQueryData:queryData];
- }
- [self.targetIDs removeObjectForKey:@(queryData.targetID)];
+ self.persistence.run("Release query", [&]() {
+ FSTQueryData *queryData = [self.queryCache queryDataForQuery:query];
+ FSTAssert(queryData, @"Tried to release nonexistent query: %@", query);
- // If this was the last watch target, then we won't get any more watch snapshots, so we should
- // release any held batch results.
- if ([self.targetIDs count] == 0) {
- FSTRemoteDocumentChangeBuffer *remoteDocuments =
- [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache];
+ [self.localViewReferences removeReferencesForID:queryData.targetID];
+ if (self.garbageCollector.isEager) {
+ [self.queryCache removeQueryData:queryData];
+ }
+ [self.targetIDs removeObjectForKey:@(queryData.targetID)];
- [self releaseHeldBatchResultsWithGroup:group remoteDocuments:remoteDocuments];
+ // If this was the last watch target, then we won't get any more watch snapshots, so we should
+ // release any held batch results.
+ if ([self.targetIDs count] == 0) {
+ FSTRemoteDocumentChangeBuffer *remoteDocuments =
+ [FSTRemoteDocumentChangeBuffer changeBufferWithCache:self.remoteDocumentCache];
- [remoteDocuments applyToWriteGroup:group];
- }
+ [self releaseHeldBatchResultsWithRemoteDocuments:remoteDocuments];
- [self.persistence commitGroup:group];
+ [remoteDocuments apply];
+ }
+ });
}
- (FSTDocumentDictionary *)executeQuery:(FSTQuery *)query {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"ExecuteQuery"];
- FSTDocumentDictionary *result = [self.localDocuments documentsMatchingQuery:query];
- [self.persistence commitGroup:group];
- return result;
+ return self.persistence.run("ExecuteQuery", [&]() -> FSTDocumentDictionary * {
+ return [self.localDocuments documentsMatchingQuery:query];
+ });
}
- (FSTDocumentKeySet *)remoteDocumentKeysForTarget:(FSTTargetID)targetID {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"RemoteDocumentKeysForTarget"];
- FSTDocumentKeySet *keySet = [self.queryCache matchingKeysForTargetID:targetID];
- [self.persistence commitGroup:group];
- return keySet;
+ return self.persistence.run("RemoteDocumentKeysForTarget", [&]() -> FSTDocumentKeySet * {
+ return [self.queryCache matchingKeysForTargetID:targetID];
+ });
}
- (void)collectGarbage {
- FSTWriteGroup *group = [self.persistence startGroupWithAction:@"Garbage Collection"];
- // Call collectGarbage regardless of whether isGCEnabled so the referenceSet doesn't continue to
- // accumulate the garbage keys.
- std::set<DocumentKey> garbage = [self.garbageCollector collectGarbage];
- if (garbage.size() > 0) {
- for (const DocumentKey &key : garbage) {
- [self.remoteDocumentCache removeEntryForKey:key];
+ self.persistence.run("Garbage Collection", [&]() {
+ // Call collectGarbage regardless of whether isGCEnabled so the referenceSet doesn't continue to
+ // accumulate the garbage keys.
+ std::set<DocumentKey> garbage = [self.garbageCollector collectGarbage];
+ if (garbage.size() > 0) {
+ for (const DocumentKey &key : garbage) {
+ [self.remoteDocumentCache removeEntryForKey:key];
+ }
}
- }
- [self.persistence commitGroup:group];
+ });
}
/**
@@ -481,9 +464,8 @@ NS_ASSUME_NONNULL_BEGIN
*
* @return the set of keys of docs that were modified by those writes.
*/
-- (FSTDocumentKeySet *)releaseHeldBatchResultsWithGroup:(FSTWriteGroup *)group
- remoteDocuments:
- (FSTRemoteDocumentChangeBuffer *)remoteDocuments {
+- (FSTDocumentKeySet *)releaseHeldBatchResultsWithRemoteDocuments:
+ (FSTRemoteDocumentChangeBuffer *)remoteDocuments {
NSMutableArray<FSTMutationBatchResult *> *toRelease = [NSMutableArray array];
for (FSTMutationBatchResult *batchResult in self.heldBatchResults) {
if (![self isRemoteUpToVersion:batchResult.commitVersion]) {
@@ -496,7 +478,7 @@ NS_ASSUME_NONNULL_BEGIN
return [FSTDocumentKeySet keySet];
} else {
[self.heldBatchResults removeObjectsInRange:NSMakeRange(0, toRelease.count)];
- return [self releaseBatchResults:toRelease group:group remoteDocuments:remoteDocuments];
+ return [self releaseBatchResults:toRelease remoteDocuments:remoteDocuments];
}
}
@@ -512,7 +494,6 @@ NS_ASSUME_NONNULL_BEGIN
}
- (FSTDocumentKeySet *)releaseBatchResults:(NSArray<FSTMutationBatchResult *> *)batchResults
- group:(FSTWriteGroup *)group
remoteDocuments:(FSTRemoteDocumentChangeBuffer *)remoteDocuments {
NSMutableArray<FSTMutationBatch *> *batches = [NSMutableArray array];
for (FSTMutationBatchResult *batchResult in batchResults) {
@@ -520,16 +501,15 @@ NS_ASSUME_NONNULL_BEGIN
[batches addObject:batchResult.batch];
}
- return [self removeMutationBatches:batches group:group];
+ return [self removeMutationBatches:batches];
}
-- (FSTDocumentKeySet *)removeMutationBatch:(FSTMutationBatch *)batch group:(FSTWriteGroup *)group {
- return [self removeMutationBatches:@[ batch ] group:group];
+- (FSTDocumentKeySet *)removeMutationBatch:(FSTMutationBatch *)batch {
+ return [self removeMutationBatches:@[ batch ]];
}
/** Removes all the mutation batches named in the given array. */
-- (FSTDocumentKeySet *)removeMutationBatches:(NSArray<FSTMutationBatch *> *)batches
- group:(FSTWriteGroup *)group {
+- (FSTDocumentKeySet *)removeMutationBatches:(NSArray<FSTMutationBatch *> *)batches {
// TODO(klimt): Could this be an NSMutableDictionary?
__block FSTDocumentKeySet *affectedDocs = [FSTDocumentKeySet keySet];