aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firebase/Database/Core/FSyncTree.m
diff options
context:
space:
mode:
Diffstat (limited to 'Firebase/Database/Core/FSyncTree.m')
-rw-r--r--Firebase/Database/Core/FSyncTree.m817
1 files changed, 817 insertions, 0 deletions
diff --git a/Firebase/Database/Core/FSyncTree.m b/Firebase/Database/Core/FSyncTree.m
new file mode 100644
index 0000000..37100c1
--- /dev/null
+++ b/Firebase/Database/Core/FSyncTree.m
@@ -0,0 +1,817 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FSyncTree.h"
+#import "FListenProvider.h"
+#import "FWriteTree.h"
+#import "FNode.h"
+#import "FPath.h"
+#import "FEventRegistration.h"
+#import "FImmutableTree.h"
+#import "FOperation.h"
+#import "FWriteTreeRef.h"
+#import "FOverwrite.h"
+#import "FOperationSource.h"
+#import "FMerge.h"
+#import "FAckUserWrite.h"
+#import "FView.h"
+#import "FSyncPoint.h"
+#import "FEmptyNode.h"
+#import "FQueryParams.h"
+#import "FQuerySpec.h"
+#import "FSnapshotHolder.h"
+#import "FChildrenNode.h"
+#import "FTupleRemovedQueriesEvents.h"
+#import "FAtomicNumber.h"
+#import "FEventRaiser.h"
+#import "FListenComplete.h"
+#import "FSnapshotUtilities.h"
+#import "FCacheNode.h"
+#import "FUtilities.h"
+#import "FCompoundWrite.h"
+#import "FWriteRecord.h"
+#import "FPersistenceManager.h"
+#import "FKeepSyncedEventRegistration.h"
+#import "FServerValues.h"
+#import "FCompoundHash.h"
+#import "FRangeMerge.h"
+
+// Size after which we start including the compound hash
+static const NSUInteger kFSizeThresholdForCompoundHash = 1024;
+
+@interface FListenContainer : NSObject<FSyncTreeHash>
+
+@property (nonatomic, strong) FView *view;
+@property (nonatomic, copy) fbt_nsarray_nsstring onComplete;
+
+@end
+
+@implementation FListenContainer
+
+- (instancetype)initWithView:(FView *)view onComplete:(fbt_nsarray_nsstring)onComplete {
+ self = [super init];
+ if (self != nil) {
+ self->_view = view;
+ self->_onComplete = onComplete;
+ }
+ return self;
+}
+
+- (id<FNode>)serverCache {
+ return self.view.serverCache;
+}
+
+- (FCompoundHash *)compoundHash {
+ return [FCompoundHash fromNode:[self serverCache]];
+}
+
+- (NSString *)simpleHash {
+ return [[self serverCache] dataHash];
+}
+
+- (BOOL)includeCompoundHash {
+ return [FSnapshotUtilities estimateSerializedNodeSize:[self serverCache]] > kFSizeThresholdForCompoundHash;
+}
+
+@end
+
+@interface FSyncTree ()
+
+/**
+* Tree of SyncPoints. There's a SyncPoint at any location that has 1 or more views.
+*/
+@property (nonatomic, strong) FImmutableTree *syncPointTree;
+
+/**
+* A tree of all pending user writes (user-initiated set, transactions, updates, etc)
+*/
+@property (nonatomic, strong) FWriteTree *pendingWriteTree;
+
+/**
+* Maps tagId -> FTuplePathQueryParams
+*/
+@property (nonatomic, strong) NSMutableDictionary *tagToQueryMap;
+@property (nonatomic, strong) NSMutableDictionary *queryToTagMap;
+@property (nonatomic, strong) FListenProvider *listenProvider;
+@property (nonatomic, strong) FPersistenceManager *persistenceManager;
+@property (nonatomic, strong) FAtomicNumber *queryTagCounter;
+@property (nonatomic, strong) NSMutableSet *keepSyncedQueries;
+
+@end
+
+/**
+* SyncTree is the central class for managing event callback registration, data caching, views
+* (query processing), and event generation. There are typically two SyncTree instances for
+* each Repo, one for the normal Firebase data, and one for the .info data.
+*
+* It has a number of responsibilities, including:
+* - Tracking all user event callbacks (registered via addEventRegistration: and removeEventRegistration:).
+* - Applying and caching data changes for user setValue:, runTransactionBlock:, and updateChildValues: calls
+* (applyUserOverwriteAtPath:, applyUserMergeAtPath:).
+* - Applying and caching data changes for server data changes (applyServerOverwriteAtPath:,
+* applyServerMergeAtPath:).
+* - Generating user-facing events for server and user changes (all of the apply* methods
+* return the set of events that need to be raised as a result).
+* - Maintaining the appropriate set of server listens to ensure we are always subscribed
+* to the correct set of paths and queries to satisfy the current set of user event
+* callbacks (listens are started/stopped using the provided listenProvider).
+*
+* NOTE: Although SyncTree tracks event callbacks and calculates events to raise, the actual
+* events are returned to the caller rather than raised synchronously.
+*/
+@implementation FSyncTree
+
+- (id) initWithListenProvider:(FListenProvider *)provider {
+ return [self initWithPersistenceManager:nil listenProvider:provider];
+}
+
+- (id) initWithPersistenceManager:(FPersistenceManager *)persistenceManager listenProvider:(FListenProvider *)provider {
+ self = [super init];
+ if (self) {
+ self.syncPointTree = [FImmutableTree empty];
+ self.pendingWriteTree = [[FWriteTree alloc] init];
+ self.tagToQueryMap = [[NSMutableDictionary alloc] init];
+ self.queryToTagMap = [[NSMutableDictionary alloc] init];
+ self.listenProvider = provider;
+ self.persistenceManager = persistenceManager;
+ self.queryTagCounter = [[FAtomicNumber alloc] init];
+ self.keepSyncedQueries = [NSMutableSet set];
+ }
+ return self;
+}
+
+#pragma mark -
+#pragma mark Apply Operations
+
+/**
+* Apply data changes for a user-generated setValue: runTransactionBlock: updateChildValues:, etc.
+* @return NSArray of FEvent to raise.
+*/
+- (NSArray *) applyUserOverwriteAtPath:(FPath *)path newData:(id <FNode>)newData writeId:(NSInteger)writeId isVisible:(BOOL)visible {
+ // Record pending write
+ [self.pendingWriteTree addOverwriteAtPath:path newData:newData writeId:writeId isVisible:visible];
+ if (!visible) {
+ return @[];
+ } else {
+ FOverwrite *operation = [[FOverwrite alloc] initWithSource:[FOperationSource userInstance] path:path snap:newData];
+ return [self applyOperationToSyncPoints:operation];
+ }
+}
+
+/**
+* Apply the data from a user-generated updateChildValues: call
+* @return NSArray of FEvent to raise.
+*/
+- (NSArray *) applyUserMergeAtPath:(FPath *)path changedChildren:(FCompoundWrite *)changedChildren writeId:(NSInteger)writeId {
+ // Record pending merge
+ [self.pendingWriteTree addMergeAtPath:path changedChildren:changedChildren writeId:writeId];
+
+ FMerge *operation = [[FMerge alloc] initWithSource:[FOperationSource userInstance] path:path children:changedChildren];
+ return [self applyOperationToSyncPoints:operation];
+}
+
+/**
+ * Acknowledge a pending user write that was previously registered with applyUserOverwriteAtPath: or applyUserMergeAtPath:
+ * TODO[offline]: Taking a serverClock here is awkward, but server values are awkward. :-(
+ * @return NSArray of FEvent to raise.
+ */
+- (NSArray *) ackUserWriteWithWriteId:(NSInteger)writeId revert:(BOOL)revert persist:(BOOL)persist clock:(id<FClock>)clock {
+ FWriteRecord *write = [self.pendingWriteTree writeForId:writeId];
+ BOOL needToReevaluate = [self.pendingWriteTree removeWriteId:writeId];
+ if (write.visible) {
+ if (persist) {
+ [self.persistenceManager removeUserWrite:writeId];
+ }
+ if (!revert) {
+ NSDictionary *serverValues = [FServerValues generateServerValues:clock];
+ if ([write isOverwrite]) {
+ id<FNode> resolvedNode = [FServerValues resolveDeferredValueSnapshot:write.overwrite withServerValues:serverValues];
+ [self.persistenceManager applyUserWrite:resolvedNode toServerCacheAtPath:write.path];
+ } else {
+ FCompoundWrite *resolvedMerge = [FServerValues resolveDeferredValueCompoundWrite:write.merge withServerValues:serverValues];
+ [self.persistenceManager applyUserMerge:resolvedMerge toServerCacheAtPath:write.path];
+ }
+ }
+ }
+ if (!needToReevaluate) {
+ return @[];
+ } else {
+ __block FImmutableTree *affectedTree = [FImmutableTree empty];
+ if (write.isOverwrite) {
+ affectedTree = [affectedTree setValue:@YES atPath:[FPath empty]];
+ } else {
+ [write.merge enumerateWrites:^(FPath *path, id <FNode> node, BOOL *stop) {
+ affectedTree = [affectedTree setValue:@YES atPath:path];
+ }];
+ }
+ FAckUserWrite *operation = [[FAckUserWrite alloc] initWithPath:write.path affectedTree:affectedTree revert:revert];
+ return [self applyOperationToSyncPoints:operation];
+ }
+}
+
+/**
+* Apply new server data for the specified path
+* @return NSArray of FEvent to raise.
+*/
+- (NSArray *) applyServerOverwriteAtPath:(FPath *)path newData:(id <FNode>)newData {
+ [self.persistenceManager updateServerCacheWithNode:newData forQuery:[FQuerySpec defaultQueryAtPath:path]];
+ FOverwrite *operation = [[FOverwrite alloc] initWithSource:[FOperationSource serverInstance] path:path snap:newData];
+ return [self applyOperationToSyncPoints:operation];
+}
+
+/**
+* Applied new server data to be merged in at the specified path
+* @return NSArray of FEvent to raise.
+*/
+- (NSArray *) applyServerMergeAtPath:(FPath *)path changedChildren:(FCompoundWrite *)changedChildren {
+ [self.persistenceManager updateServerCacheWithMerge:changedChildren atPath:path];
+ FMerge *operation = [[FMerge alloc] initWithSource:[FOperationSource serverInstance] path:path children:changedChildren];
+ return [self applyOperationToSyncPoints:operation];
+}
+
+- (NSArray *) applyServerRangeMergeAtPath:(FPath *)path updates:(NSArray *)ranges {
+ FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
+ if (syncPoint == nil) {
+ // Removed view, so it's safe to just ignore this update
+ return @[];
+ } else {
+ // This could be for any "complete" (unfiltered) view, and if there is more than one complete view, they should
+ // each have the same cache so it doesn't matter which one we use.
+ FView *view = [syncPoint completeView];
+ if (view != nil) {
+ id<FNode> serverNode = [view serverCache];
+ for (FRangeMerge *merge in ranges) {
+ serverNode = [merge applyToNode:serverNode];
+ }
+ return [self applyServerOverwriteAtPath:path newData:serverNode];
+ } else {
+ // There doesn't exist a view for this update, so it was removed and it's safe to just ignore this range
+ // merge
+ return @[];
+ }
+ }
+}
+
+/**
+* Apply a listen complete to a path
+* @return NSArray of FEvent to raise.
+*/
+- (NSArray *) applyListenCompleteAtPath:(FPath *)path {
+ [self.persistenceManager setQueryComplete:[FQuerySpec defaultQueryAtPath:path]];
+ id<FOperation> operation = [[FListenComplete alloc] initWithSource:[FOperationSource serverInstance] path:path];
+ return [self applyOperationToSyncPoints:operation];
+}
+
+/**
+* Apply a listen complete to a path
+* @return NSArray of FEvent to raise.
+*/
+- (NSArray *) applyTaggedListenCompleteAtPath:(FPath *)path tagId:(NSNumber *)tagId {
+ FQuerySpec *query = [self queryForTag:tagId];
+ if (query != nil) {
+ [self.persistenceManager setQueryComplete:query];
+ FPath *relativePath = [FPath relativePathFrom:query.path to:path];
+ id<FOperation> op = [[FListenComplete alloc] initWithSource:[FOperationSource forServerTaggedQuery:query.params]
+ path:relativePath];
+ return [self applyTaggedOperation:op atPath:query.path];
+ } else {
+ // We've already removed the query. No big deal, ignore the update.
+ return @[];
+ }
+}
+
+/**
+* Internal helper method to apply tagged operation
+*/
+- (NSArray *) applyTaggedOperation:(id<FOperation>)operation atPath:(FPath *)path {
+ FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
+ NSAssert(syncPoint != nil, @"Missing sync point for query tag that we're tracking.");
+ FWriteTreeRef *writesCache = [self.pendingWriteTree childWritesForPath:path];
+ return [syncPoint applyOperation:operation writesCache:writesCache serverCache:nil];
+}
+
+/**
+* Apply new server data for the specified tagged query
+* @return NSArray of FEvent to raise.
+*/
+- (NSArray *) applyTaggedQueryOverwriteAtPath:(FPath *)path newData:(id <FNode>)newData tagId:(NSNumber *)tagId {
+ FQuerySpec *query = [self queryForTag:tagId];
+ if (query != nil) {
+ FPath *relativePath = [FPath relativePathFrom:query.path to:path];
+ FQuerySpec *queryToOverwrite = relativePath.isEmpty ? query : [FQuerySpec defaultQueryAtPath:path];
+ [self.persistenceManager updateServerCacheWithNode:newData forQuery:queryToOverwrite];
+ FOverwrite *operation = [[FOverwrite alloc] initWithSource:[FOperationSource forServerTaggedQuery:query.params]
+ path:relativePath snap:newData];
+ return [self applyTaggedOperation:operation atPath:query.path];
+ } else {
+ // Query must have been removed already
+ return @[];
+ }
+}
+
+/**
+* Apply server data to be merged in for the specified tagged query
+* @return NSArray of FEvent to raise.
+*/
+- (NSArray *) applyTaggedQueryMergeAtPath:(FPath *)path changedChildren:(FCompoundWrite *)changedChildren tagId:(NSNumber *)tagId {
+ FQuerySpec *query = [self queryForTag:tagId];
+ if (query != nil) {
+ FPath *relativePath = [FPath relativePathFrom:query.path to:path];
+ [self.persistenceManager updateServerCacheWithMerge:changedChildren atPath:path];
+ FMerge *operation = [[FMerge alloc] initWithSource:[FOperationSource forServerTaggedQuery:query.params]
+ path:relativePath
+ children:changedChildren];
+ return [self applyTaggedOperation:operation atPath:query.path];
+ } else {
+ // We've already removed the query. No big deal, ignore the update.
+ return @[];
+ }
+}
+
+- (NSArray *) applyTaggedServerRangeMergeAtPath:(FPath *)path updates:(NSArray *)ranges tagId:(NSNumber *)tagId {
+ FQuerySpec *query = [self queryForTag:tagId];
+ if (query != nil) {
+ NSAssert([path isEqual:query.path], @"Tagged update path and query path must match");
+ FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
+ NSAssert(syncPoint != nil, @"Missing sync point for query tag that we're tracking.");
+ FView *view = [syncPoint viewForQuery:query];
+ NSAssert(view != nil, @"Missing view for query tag that we're tracking");
+ id<FNode> serverNode = [view serverCache];
+ for (FRangeMerge *merge in ranges) {
+ serverNode = [merge applyToNode:serverNode];
+ }
+ return [self applyTaggedQueryOverwriteAtPath:path newData:serverNode tagId:tagId];
+ } else {
+ // We've already removed the query. No big deal, ignore the update.
+ return @[];
+ }
+}
+
+/**
+* Add an event callback for the specified query
+* @return NSArray of FEvent to raise.
+*/
+- (NSArray *) addEventRegistration:(id<FEventRegistration>)eventRegistration forQuery:(FQuerySpec *)query {
+ FPath *path = query.path;
+
+ __block BOOL foundAncestorDefaultView = NO;
+ [self.syncPointTree forEachOnPath:query.path whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
+ foundAncestorDefaultView = foundAncestorDefaultView || [syncPoint hasCompleteView];
+ return !foundAncestorDefaultView;
+ }];
+
+ [self.persistenceManager setQueryActive:query];
+
+ FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
+ if (syncPoint == nil) {
+ syncPoint = [[FSyncPoint alloc] initWithPersistenceManager:self.persistenceManager];
+ self.syncPointTree = [self.syncPointTree setValue:syncPoint atPath:path];
+ }
+
+ BOOL viewAlreadyExists = [syncPoint viewExistsForQuery:query];
+ NSArray *events;
+ if (viewAlreadyExists) {
+ events = [syncPoint addEventRegistration:eventRegistration forExistingViewForQuery:query];
+ } else {
+ if (![query loadsAllData]) {
+ // We need to track a tag for this query
+ NSAssert(self.queryToTagMap[query] == nil, @"View does not exist, but we have a tag");
+ NSNumber *tagId = [self.queryTagCounter getAndIncrement];
+ self.queryToTagMap[query] = tagId;
+ self.tagToQueryMap[tagId] = query;
+ }
+
+ FWriteTreeRef *writesCache = [self.pendingWriteTree childWritesForPath:path];
+ FCacheNode *serverCache = [self serverCacheForQuery:query];
+ events = [syncPoint addEventRegistration:eventRegistration
+ forNonExistingViewForQuery:query
+ writesCache:writesCache
+ serverCache:serverCache];
+
+ // There was no view and no default listen
+ if (!foundAncestorDefaultView) {
+ FView *view = [syncPoint viewForQuery:query];
+ NSMutableArray *mutableEvents = [events mutableCopy];
+ [mutableEvents addObjectsFromArray:[self setupListenerOnQuery:query view:view]];
+ events = mutableEvents;
+ }
+ }
+
+ return events;
+}
+
+- (FCacheNode *)serverCacheForQuery:(FQuerySpec *)query {
+ __block id<FNode> serverCacheNode = nil;
+
+ [self.syncPointTree forEachOnPath:query.path whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
+ FPath *relativePath = [FPath relativePathFrom:pathToSyncPoint to:query.path];
+ serverCacheNode = [syncPoint completeServerCacheAtPath:relativePath];
+ return serverCacheNode == nil;
+ }];
+
+ FCacheNode *serverCache;
+ if (serverCacheNode != nil) {
+ FIndexedNode *indexed = [FIndexedNode indexedNodeWithNode:serverCacheNode index:query.index];
+ serverCache = [[FCacheNode alloc] initWithIndexedNode:indexed isFullyInitialized:YES isFiltered:NO];
+ } else {
+ FCacheNode *persistenceServerCache = [self.persistenceManager serverCacheForQuery:query];
+ if (persistenceServerCache.isFullyInitialized) {
+ serverCache = persistenceServerCache;
+ } else {
+ serverCacheNode = [FEmptyNode emptyNode];
+
+ FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:query.path];
+ [subtree forEachChild:^(NSString *childKey, FSyncPoint *childSyncPoint) {
+ id<FNode> completeCache = [childSyncPoint completeServerCacheAtPath:[FPath empty]];
+ if (completeCache) {
+ serverCacheNode = [serverCacheNode updateImmediateChild:childKey withNewChild:completeCache];
+ }
+ }];
+ // Fill the node with any available children we have
+ [persistenceServerCache.node enumerateChildrenUsingBlock:^(NSString *key, id<FNode> node, BOOL *stop) {
+ if (![serverCacheNode hasChild:key]) {
+ serverCacheNode = [serverCacheNode updateImmediateChild:key withNewChild:node];
+ }
+ }];
+ FIndexedNode *indexed = [FIndexedNode indexedNodeWithNode:serverCacheNode index:query.index];
+ serverCache = [[FCacheNode alloc] initWithIndexedNode:indexed isFullyInitialized:NO isFiltered:NO];
+ }
+ }
+
+ return serverCache;
+}
+
+/**
+* Remove event callback(s).
+*
+* If query is the default query, we'll check all queries for the specified eventRegistration.
+* If eventRegistration is null, we'll remove all callbacks for the specified query/queries.
+*
+* @param eventRegistration if nil, all callbacks are removed
+* @param cancelError If provided, appropriate cancel events will be returned
+* @return NSArray of FEvent to raise.
+*/
+- (NSArray *) removeEventRegistration:(id <FEventRegistration>)eventRegistration
+ forQuery:(FQuerySpec *)query
+ cancelError:(NSError *)cancelError {
+ // Find the syncPoint first. Then deal with whether or not it has matching listeners
+ FPath *path = query.path;
+ FSyncPoint *maybeSyncPoint = [self.syncPointTree valueAtPath:path];
+ NSArray *cancelEvents = @[];
+
+ // A removal on a default query affects all queries at that location. A removal on an indexed query, even one without
+ // other query constraints, does *not* affect all queries at that location. So this check must be for 'default', and
+ // not loadsAllData:
+ if (maybeSyncPoint && ([query isDefault] || [maybeSyncPoint viewExistsForQuery:query])) {
+ FTupleRemovedQueriesEvents *removedAndEvents = [maybeSyncPoint removeEventRegistration:eventRegistration forQuery:query cancelError:cancelError];
+ if ([maybeSyncPoint isEmpty]) {
+ self.syncPointTree = [self.syncPointTree removeValueAtPath:path];
+ }
+ NSArray *removed = removedAndEvents.removedQueries;
+ cancelEvents = removedAndEvents.cancelEvents;
+
+ // We may have just removed one of many listeners and can short-circuit this whole process
+ // We may also not have removed a default listener, in which case all of the descendant listeners should already
+ // be properly set up.
+ //
+ // Since indexed queries can shadow if they don't have other query constraints, check for loadsAllData: instead
+ // of isDefault:
+ NSUInteger defaultQueryIndex = [removed indexOfObjectPassingTest:^BOOL(FQuerySpec *q, NSUInteger idx, BOOL *stop) {
+ return [q loadsAllData];
+ }];
+ BOOL removingDefault = defaultQueryIndex != NSNotFound;
+ [removed enumerateObjectsUsingBlock:^(FQuerySpec *query, NSUInteger idx, BOOL *stop) {
+ [self.persistenceManager setQueryInactive:query];
+ }];
+ NSNumber *covered = [self.syncPointTree findOnPath:path andApplyBlock:^id(FPath *relativePath, FSyncPoint *parentSyncPoint) {
+ return [NSNumber numberWithBool:[parentSyncPoint hasCompleteView]];
+ }];
+
+ if (removingDefault && ![covered boolValue]) {
+ FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
+ // There are potentially child listeners. Determine what if any listens we need to send before executing
+ // the removal
+ if (![subtree isEmpty]) {
+ // We need to fold over our subtree and collect the listeners to send
+ NSArray *newViews = [self collectDistinctViewsForSubTree:subtree];
+
+ // Ok, we've collected all the listens we need. Set them up.
+ [newViews enumerateObjectsUsingBlock:^(FView *view, NSUInteger idx, BOOL *stop) {
+ FQuerySpec *newQuery = view.query;
+ FListenContainer *listenContainer = [self createListenerForView:view];
+ self.listenProvider.startListening([self queryForListening:newQuery], [self tagForQuery:newQuery],
+ listenContainer, listenContainer.onComplete);
+ }];
+ } else {
+ // There's nothing below us, so nothing we need to start listening on
+ }
+ }
+
+ // If we removed anything and we're not covered by a higher up listen, we need to stop listening on this query.
+ // The above block has us covered in terms of making sure we're set up on listens lower in the tree.
+ // Also, note that if we have a cancelError, it's already been removed at the provider level.
+ if (![covered boolValue] && [removed count] > 0 && cancelError == nil) {
+ // If we removed a default, then we weren't listening on any of the other queries here. Just cancel the one
+ // default. Otherwise, we need to iterate through and cancel each individual query
+ if (removingDefault) {
+ // We don't tag default listeners
+ self.listenProvider.stopListening([self queryForListening:query], nil);
+ } else {
+ [removed enumerateObjectsUsingBlock:^(FQuerySpec *queryToRemove, NSUInteger idx, BOOL *stop) {
+ NSNumber *tagToRemove = [self.queryToTagMap objectForKey:queryToRemove];
+ self.listenProvider.stopListening([self queryForListening:queryToRemove], tagToRemove);
+ }];
+ }
+ }
+ // Now, clear all the tags we're tracking for the removed listens.
+ [self removeTags:removed];
+ } else {
+ // No-op, this listener must've been already removed
+ }
+ return cancelEvents;
+}
+
+- (void)keepQuery:(FQuerySpec *)query synced:(BOOL)keepSynced {
+ // Only do something if we actually need to add/remove an event registration
+ if (keepSynced && ![self.keepSyncedQueries containsObject:query]) {
+ [self addEventRegistration:[FKeepSyncedEventRegistration instance] forQuery:query];
+ [self.keepSyncedQueries addObject:query];
+ } else if (!keepSynced && [self.keepSyncedQueries containsObject:query]) {
+ [self removeEventRegistration:[FKeepSyncedEventRegistration instance] forQuery:query cancelError:nil];
+ [self.keepSyncedQueries removeObject:query];
+ }
+}
+
+- (NSArray *) removeAllWrites {
+ [self.persistenceManager removeAllUserWrites];
+ NSArray *removedWrites = [self.pendingWriteTree removeAllWrites];
+ if (removedWrites.count > 0) {
+ FImmutableTree *affectedTree = [[FImmutableTree empty] setValue:@YES atPath:[FPath empty]];
+ return [self applyOperationToSyncPoints:[[FAckUserWrite alloc] initWithPath:[FPath empty]
+ affectedTree:affectedTree revert:YES]];
+ } else {
+ return @[];
+ }
+}
+
+/**
+* Returns a complete cache, if we have one, of the data at a particular path. The location must have a listener above
+* it, but as this is only used by transaction code, that should always be the case anyways.
+*
+* Note: this method will *include* hidden writes from transaction with applyLocally set to false.
+* @param path The path to the data we want
+* @param writeIdsToExclude A specific set to be excluded
+*/
+- (id <FNode>) calcCompleteEventCacheAtPath:(FPath *)path excludeWriteIds:(NSArray *)writeIdsToExclude {
+ BOOL includeHiddenSets = YES;
+ FWriteTree *writeTree = self.pendingWriteTree;
+ id<FNode> serverCache = [self.syncPointTree findOnPath:path andApplyBlock:^id<FNode>(FPath *pathSoFar, FSyncPoint *syncPoint) {
+ FPath *relativePath = [FPath relativePathFrom:pathSoFar to:path];
+ id<FNode> serverCache = [syncPoint completeServerCacheAtPath:relativePath];
+ if (serverCache) {
+ return serverCache;
+ } else {
+ return nil;
+ }
+ }];
+ return [writeTree calculateCompleteEventCacheAtPath:path completeServerCache:serverCache excludeWriteIds:writeIdsToExclude includeHiddenWrites:includeHiddenSets];
+}
+
+#pragma mark -
+#pragma mark Private Methods
+/**
+* This collapses multiple unfiltered views into a single view, since we only need a single
+* listener for them.
+* @return NSArray of FView
+*/
+- (NSArray *) collectDistinctViewsForSubTree:(FImmutableTree *)subtree {
+ return [subtree foldWithBlock:^NSArray *(FPath *relativePath, FSyncPoint *maybeChildSyncPoint, NSDictionary *childMap) {
+ if (maybeChildSyncPoint && [maybeChildSyncPoint hasCompleteView]) {
+ FView *completeView = [maybeChildSyncPoint completeView];
+ return @[completeView];
+ } else {
+ // No complete view here, flatten any deeper listens into an array
+ NSMutableArray *views = [[NSMutableArray alloc] init];
+ if (maybeChildSyncPoint) {
+ views = [[maybeChildSyncPoint queryViews] mutableCopy];
+ }
+ [childMap enumerateKeysAndObjectsUsingBlock:^(NSString *childKey, NSArray *childViews, BOOL *stop) {
+ [views addObjectsFromArray:childViews];
+ }];
+ return views;
+ }
+ }];
+}
+
+/**
+* @param queries NSArray of FQuerySpec
+*/
+- (void) removeTags:(NSArray *)queries {
+ [queries enumerateObjectsUsingBlock:^(FQuerySpec *removedQuery, NSUInteger idx, BOOL *stop) {
+ if (![removedQuery loadsAllData]) {
+ // We should have a tag for this
+ NSNumber *removedQueryTag = self.queryToTagMap[removedQuery];
+ [self.queryToTagMap removeObjectForKey:removedQuery];
+ [self.tagToQueryMap removeObjectForKey:removedQueryTag];
+ }
+ }];
+}
+
+- (FQuerySpec *) queryForListening:(FQuerySpec *)query {
+ if (query.loadsAllData && !query.isDefault) {
+ // We treat queries that load all data as default queries
+ return [FQuerySpec defaultQueryAtPath:query.path];
+ } else {
+ return query;
+ }
+}
+
+/**
+* For a given new listen, manage the de-duplication of outstanding subscriptions.
+* @return NSArray of FEvent events to support synchronous data sources
+*/
+- (NSArray *) setupListenerOnQuery:(FQuerySpec *)query view:(FView *)view {
+ FPath *path = query.path;
+ NSNumber *tagId = [self tagForQuery:query];
+ FListenContainer *listenContainer = [self createListenerForView:view];
+
+ NSArray *events = self.listenProvider.startListening([self queryForListening:query], tagId, listenContainer,
+ listenContainer.onComplete);
+
+ FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
+ // The root of this subtree has our query. We're here because we definitely need to send a listen for that, but we
+ // may need to shadow other listens as well.
+ if (tagId != nil) {
+ NSAssert(![subtree.value hasCompleteView], @"If we're adding a query, it shouldn't be shadowed");
+ } else {
+ // Shadow everything at or below this location, this is a default listener.
+ NSArray *queriesToStop = [subtree foldWithBlock:^id(FPath *relativePath, FSyncPoint *maybeChildSyncPoint, NSDictionary *childMap) {
+ if (![relativePath isEmpty] && maybeChildSyncPoint != nil && [maybeChildSyncPoint hasCompleteView]) {
+ return @[[maybeChildSyncPoint completeView].query];
+ } else {
+ // No default listener here, flatten any deeper queries into an array
+ NSMutableArray *queries = [[NSMutableArray alloc] init];
+ if (maybeChildSyncPoint != nil) {
+ for (FView *view in [maybeChildSyncPoint queryViews]) {
+ [queries addObject:view.query];
+ }
+ }
+ [childMap enumerateKeysAndObjectsUsingBlock:^(NSString *key, NSArray *childQueries, BOOL *stop) {
+ [queries addObjectsFromArray:childQueries];
+ }];
+ return queries;
+ }
+ }];
+ for (FQuerySpec *queryToStop in queriesToStop) {
+ self.listenProvider.stopListening([self queryForListening:queryToStop], [self tagForQuery:queryToStop]);
+ }
+ }
+ return events;
+}
+
+- (FListenContainer *) createListenerForView:(FView *)view {
+ FQuerySpec *query = view.query;
+ NSNumber *tagId = [self tagForQuery:query];
+
+ FListenContainer *listenContainer = [[FListenContainer alloc] initWithView:view
+ onComplete:^(NSString *status) {
+ if ([status isEqualToString:@"ok"]) {
+ if (tagId != nil) {
+ return [self applyTaggedListenCompleteAtPath:query.path tagId:tagId];
+ } else {
+ return [self applyListenCompleteAtPath:query.path];
+ }
+ } else {
+ // If a listen failed, kill all of the listeners here, not just the one that triggered the error.
+ // Note that this may need to be scoped to just this listener if we change permissions on filtered children
+ NSError *error = [FUtilities errorForStatus:status andReason:nil];
+ FFWarn(@"I-RDB038012", @"Listener at %@ failed: %@", query.path, status);
+ return [self removeEventRegistration:nil forQuery:query cancelError:error];
+ }
+ }];
+
+ return listenContainer;
+}
+
+/**
+* @return The query associated with the given tag, if we have one
+*/
+- (FQuerySpec *) queryForTag:(NSNumber *)tagId {
+ return self.tagToQueryMap[tagId];
+}
+
+/**
+* @return The tag associated with the given query
+*/
+- (NSNumber *) tagForQuery:(FQuerySpec *)query {
+ return self.queryToTagMap[query];
+}
+
+#pragma mark -
+#pragma mark applyOperation Helpers
+
+/**
+* A helper method that visits all descendant and ancestor SyncPoints, applying the operation.
+*
+* NOTES:
+* - Descendant SyncPoints will be visited first (since we raise events depth-first).
+
+* - We call applyOperation: on each SyncPoint passing three things:
+* 1. A version of the Operation that has been made relative to the SyncPoint location.
+* 2. A WriteTreeRef of any writes we have cached at the SyncPoint location.
+* 3. A snapshot Node with cached server data, if we have it.
+
+* - We concatenate all of the events returned by each SyncPoint and return the result.
+*
+* @return Array of FEvent
+*/
+- (NSArray *) applyOperationToSyncPoints:(id<FOperation>)operation {
+ return [self applyOperationHelper:operation syncPointTree:self.syncPointTree serverCache:nil
+ writesCache:[self.pendingWriteTree childWritesForPath:[FPath empty]]];
+}
+
+/**
+* Recursive helper for applyOperationToSyncPoints_
+*/
+- (NSArray *) applyOperationHelper:(id<FOperation>)operation syncPointTree:(FImmutableTree *)syncPointTree
+ serverCache:(id<FNode>)serverCache writesCache:(FWriteTreeRef *)writesCache {
+ if ([operation.path isEmpty]) {
+ return [self applyOperationDescendantsHelper:operation syncPointTree:syncPointTree serverCache:serverCache writesCache:writesCache];
+ } else {
+ FSyncPoint *syncPoint = syncPointTree.value;
+
+ // If we don't have cached server data, see if we can get it from this SyncPoint
+ if (serverCache == nil && syncPoint != nil) {
+ serverCache = [syncPoint completeServerCacheAtPath:[FPath empty]];
+ }
+
+ NSMutableArray *events = [[NSMutableArray alloc] init];
+ NSString *childKey = [operation.path getFront];
+ id<FOperation> childOperation = [operation operationForChild:childKey];
+ FImmutableTree *childTree = [syncPointTree.children get:childKey];
+ if (childTree != nil && childOperation != nil) {
+ id<FNode> childServerCache = serverCache ? [serverCache getImmediateChild:childKey] : nil;
+ FWriteTreeRef *childWritesCache = [writesCache childWriteTreeRef:childKey];
+ [events addObjectsFromArray:[self applyOperationHelper:childOperation syncPointTree:childTree serverCache:childServerCache writesCache:childWritesCache]];
+ }
+
+ if (syncPoint) {
+ [events addObjectsFromArray:[syncPoint applyOperation:operation writesCache:writesCache serverCache:serverCache]];
+ }
+
+ return events;
+ }
+}
+
+/**
+* Recursive helper for applyOperationToSyncPoints:
+*/
+- (NSArray *) applyOperationDescendantsHelper:(id<FOperation>)operation syncPointTree:(FImmutableTree *)syncPointTree
+ serverCache:(id<FNode>)serverCache writesCache:(FWriteTreeRef *)writesCache {
+ FSyncPoint *syncPoint = syncPointTree.value;
+
+ // If we don't have cached server data, see if we can get it from this SyncPoint
+ id<FNode> resolvedServerCache;
+ if (serverCache == nil & syncPoint != nil) {
+ resolvedServerCache = [syncPoint completeServerCacheAtPath:[FPath empty]];
+ } else {
+ resolvedServerCache = serverCache;
+ }
+
+ NSMutableArray *events = [[NSMutableArray alloc] init];
+ [syncPointTree.children enumerateKeysAndObjectsUsingBlock:^(NSString *childKey, FImmutableTree *childTree, BOOL *stop) {
+ id<FNode> childServerCache = nil;
+ if (resolvedServerCache != nil) {
+ childServerCache = [resolvedServerCache getImmediateChild:childKey];
+ }
+ FWriteTreeRef *childWritesCache = [writesCache childWriteTreeRef:childKey];
+ id<FOperation> childOperation = [operation operationForChild:childKey];
+ if (childOperation != nil) {
+ [events addObjectsFromArray:[self applyOperationDescendantsHelper:childOperation
+ syncPointTree:childTree
+ serverCache:childServerCache
+ writesCache:childWritesCache]];
+ }
+ }];
+
+ if (syncPoint) {
+ [events addObjectsFromArray:[syncPoint applyOperation:operation writesCache:writesCache serverCache:resolvedServerCache]];
+ }
+
+ return events;
+}
+
+@end