diff options
Diffstat (limited to 'Firebase/Database/Core/FRepo.m')
-rw-r--r-- | Firebase/Database/Core/FRepo.m | 1116 |
1 files changed, 1116 insertions, 0 deletions
diff --git a/Firebase/Database/Core/FRepo.m b/Firebase/Database/Core/FRepo.m new file mode 100644 index 0000000..06cc253 --- /dev/null +++ b/Firebase/Database/Core/FRepo.m @@ -0,0 +1,1116 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <dlfcn.h> +#import "FRepo.h" +#import "FSnapshotUtilities.h" +#import "FConstants.h" +#import "FIRDatabaseQuery_Private.h" +#import "FQuerySpec.h" +#import "FTupleNodePath.h" +#import "FRepo_Private.h" +#import "FRepoManager.h" +#import "FServerValues.h" +#import "FTupleSetIdPath.h" +#import "FSyncTree.h" +#import "FEventRegistration.h" +#import "FAtomicNumber.h" +#import "FSyncTree.h" +#import "FListenProvider.h" +#import "FEventRaiser.h" +#import "FSnapshotHolder.h" +#import "FIRDatabaseConfig_Private.h" +#import "FLevelDBStorageEngine.h" +#import "FPersistenceManager.h" +#import "FWriteRecord.h" +#import "FCachePolicy.h" +#import "FClock.h" +#import "FIRDatabase_Private.h" +#import "FTree.h" +#import "FTupleTransaction.h" +#import "FIRTransactionResult.h" +#import "FIRTransactionResult_Private.h" +#import "FIRMutableData.h" +#import "FIRMutableData_Private.h" +#import "FIRDataSnapshot.h" +#import "FIRDataSnapshot_Private.h" +#import "FValueEventRegistration.h" +#import "FEmptyNode.h" + +#ifdef TARGET_OS_IPHONE +#import <UIKit/UIKit.h> +#endif + +@interface FRepo() + +@property (nonatomic, strong) FOffsetClock *serverClock; +@property (nonatomic, strong) FPersistenceManager* persistenceManager; +@property (nonatomic, strong) FIRDatabase *database; +@property (nonatomic, strong, readwrite) FAuthenticationManager *auth; +@property (nonatomic, strong) FSyncTree *infoSyncTree; +@property (nonatomic) NSInteger writeIdCounter; +@property (nonatomic) BOOL hijackHash; +@property (nonatomic, strong) FTree *transactionQueueTree; +@property (nonatomic) BOOL loggedTransactionPersistenceWarning; + +/** +* Test only. For load testing the server. +*/ +@property (nonatomic, strong) id (^interceptServerDataCallback)(NSString *pathString, id data); +@end + + +@implementation FRepo + +- (id)initWithRepoInfo:(FRepoInfo*)info config:(FIRDatabaseConfig *)config database:(FIRDatabase *)database { + self = [super init]; + if (self) { + self.repoInfo = info; + self.config = config; + self.database = database; + + // Access can occur outside of shared queue, so the clock needs to be initialized here + self.serverClock = [[FOffsetClock alloc] initWithClock:[FSystemClock clock] offset:0]; + + self.connection = [[FPersistentConnection alloc] initWithRepoInfo:self.repoInfo dispatchQueue:[FIRDatabaseQuery sharedQueue] config:self.config]; + + // Needs to be called before authentication manager is instantiated + self.eventRaiser = [[FEventRaiser alloc] initWithQueue:self.config.callbackQueue]; + + dispatch_async([FIRDatabaseQuery sharedQueue], ^{ + [self deferredInit]; + }); + } + return self; +} + +- (void)deferredInit { + // TODO: cleanup on dealloc + __weak FRepo *weakSelf = self; + [self.config.authTokenProvider listenForTokenChanges:^(NSString *token) { + [weakSelf.connection refreshAuthToken:token]; + }]; + + // Open connection now so that by the time we are connected the deferred init has run + // This relies on the fact that all callbacks run on repos queue + self.connection.delegate = self; + [self.connection open]; + + self.dataUpdateCount = 0; + self.rangeMergeUpdateCount = 0; + self.interceptServerDataCallback = nil; + + if (self.config.persistenceEnabled) { + NSString* repoHashString = [NSString stringWithFormat:@"%@_%@", self.repoInfo.host, self.repoInfo.namespace]; + NSString* persistencePrefix = [NSString stringWithFormat:@"%@/%@", self.config.sessionIdentifier, repoHashString]; + + id<FCachePolicy> cachePolicy = [[FLRUCachePolicy alloc] initWithMaxSize:self.config.persistenceCacheSizeBytes]; + + id<FStorageEngine> engine; + if (self.config.forceStorageEngine != nil) { + engine = self.config.forceStorageEngine; + } else { + FLevelDBStorageEngine *levelDBEngine = [[FLevelDBStorageEngine alloc] initWithPath:persistencePrefix]; + // We need the repo info to run the legacy migration. Future migrations will be managed by the database itself + // Remove this once we are confident that no-one is using legacy migration anymore... + [levelDBEngine runLegacyMigration:self.repoInfo]; + engine = levelDBEngine; + } + + self.persistenceManager = [[FPersistenceManager alloc] initWithStorageEngine:engine cachePolicy:cachePolicy]; + } else { + self.persistenceManager = nil; + } + + [self initTransactions]; + + // A list of data pieces and paths to be set when this client disconnects + self.onDisconnect = [[FSparseSnapshotTree alloc] init]; + self.infoData = [[FSnapshotHolder alloc] init]; + + FListenProvider *infoListenProvider = [[FListenProvider alloc] init]; + infoListenProvider.startListening = ^(FQuerySpec *query, + NSNumber *tagId, + id<FSyncTreeHash> hash, + fbt_nsarray_nsstring onComplete) { + NSArray *infoEvents = @[]; + FRepo *strongSelf = weakSelf; + id<FNode> node = [strongSelf.infoData getNode:query.path]; + // This is possibly a hack, but we have different semantics for .info endpoints. We don't raise null events + // on initial data... + if (![node isEmpty]) { + infoEvents = [strongSelf.infoSyncTree applyServerOverwriteAtPath:query.path newData:node]; + [strongSelf.eventRaiser raiseCallback:^{ + onComplete(kFWPResponseForActionStatusOk); + }]; + } + return infoEvents; + }; + infoListenProvider.stopListening = ^(FQuerySpec *query, NSNumber *tagId) {}; + self.infoSyncTree = [[FSyncTree alloc] initWithListenProvider:infoListenProvider]; + + FListenProvider *serverListenProvider = [[FListenProvider alloc] init]; + serverListenProvider.startListening = ^(FQuerySpec *query, + NSNumber *tagId, + id<FSyncTreeHash> hash, + fbt_nsarray_nsstring onComplete) { + [weakSelf.connection listen:query tagId:tagId hash:hash onComplete:^(NSString *status) { + NSArray *events = onComplete(status); + [weakSelf.eventRaiser raiseEvents:events]; + }]; + // No synchronous events for network-backed sync trees + return @[]; + }; + serverListenProvider.stopListening = ^(FQuerySpec *query, NSNumber *tag) { + [weakSelf.connection unlisten:query tagId:tag]; + }; + self.serverSyncTree = [[FSyncTree alloc] initWithPersistenceManager:self.persistenceManager + listenProvider:serverListenProvider]; + + [self restoreWrites]; + + [self updateInfo:kDotInfoConnected withValue:@NO]; + + [self setupNotifications]; +} + + +- (void) restoreWrites { + NSArray *writes = self.persistenceManager.userWrites; + + NSDictionary *serverValues = [FServerValues generateServerValues:self.serverClock]; + __block NSInteger lastWriteId = NSIntegerMin; + [writes enumerateObjectsUsingBlock:^(FWriteRecord *write, NSUInteger idx, BOOL *stop) { + NSInteger writeId = write.writeId; + fbt_void_nsstring_nsstring callback = ^(NSString *status, NSString *errorReason) { + [self warnIfWriteFailedAtPath:write.path status:status message:@"Persisted write"]; + [self ackWrite:writeId rerunTransactionsAtPath:write.path status:status]; + }; + if (lastWriteId >= writeId) { + [NSException raise:NSInternalInconsistencyException format:@"Restored writes were not in order!"]; + } + lastWriteId = writeId; + self.writeIdCounter = writeId + 1; + if ([write isOverwrite]) { + FFLog(@"I-RDB038001", @"Restoring overwrite with id %ld", (long)write.writeId); + [self.connection putData:[write.overwrite valForExport:YES] + forPath:[write.path toString] + withHash:nil + withCallback:callback]; + id<FNode> resolved = [FServerValues resolveDeferredValueSnapshot:write.overwrite withServerValues:serverValues]; + [self.serverSyncTree applyUserOverwriteAtPath:write.path newData:resolved writeId:writeId isVisible:YES]; + } else { + FFLog(@"I-RDB038002", @"Restoring merge with id %ld", (long)write.writeId); + [self.connection mergeData:[write.merge valForExport:YES] + forPath:[write.path toString] + withCallback:callback]; + FCompoundWrite *resolved = [FServerValues resolveDeferredValueCompoundWrite:write.merge withServerValues:serverValues]; + [self.serverSyncTree applyUserMergeAtPath:write.path changedChildren:resolved writeId:writeId]; + } + }]; +} + +- (NSString*)name { + return self.repoInfo.namespace; +} + +- (NSString *) description { + return [self.repoInfo description]; +} + +- (void) interrupt { + [self.connection interruptForReason:kFInterruptReasonRepoInterrupt]; +} + +- (void) resume { + [self.connection resumeForReason:kFInterruptReasonRepoInterrupt]; +} + +// NOTE: Typically if you're calling this, you should be in an @autoreleasepool block to make sure that ARC kicks +// in and cleans up things no longer referenced (i.e. pendingPutsDB). +- (void) dispose { + [self.connection interruptForReason:kFInterruptReasonRepoInterrupt]; + + // We need to nil out any references to LevelDB, to make sure the + // LevelDB exclusive locks are released. + [self.persistenceManager close]; +} + +- (NSInteger) nextWriteId { + return self->_writeIdCounter++; +} + +- (NSTimeInterval) serverTime { + return [self.serverClock currentTime]; +} + +- (void) set:(FPath *)path withNode:(id<FNode>)node withCallback:(fbt_void_nserror_ref)onComplete { + id value = [node valForExport:YES]; + FFLog(@"I-RDB038003", @"Setting: %@ with %@ pri: %@", [path toString], [value description], [[node getPriority] val]); + + // TODO: Optimize this behavior to either (a) store flag to skip resolving where possible and / or + // (b) store unresolved paths on JSON parse + NSDictionary* serverValues = [FServerValues generateServerValues:self.serverClock]; + id<FNode> newNode = [FServerValues resolveDeferredValueSnapshot:node withServerValues:serverValues]; + + NSInteger writeId = [self nextWriteId]; + [self.persistenceManager saveUserOverwrite:node atPath:path writeId:writeId]; + NSArray *events = [self.serverSyncTree applyUserOverwriteAtPath:path newData:newNode writeId:writeId isVisible:YES]; + [self.eventRaiser raiseEvents:events]; + + [self.connection putData:value forPath:[path toString] withHash:nil withCallback:^(NSString *status, NSString *errorReason) { + [self warnIfWriteFailedAtPath:path status:status message:@"setValue: or removeValue:"]; + [self ackWrite:writeId rerunTransactionsAtPath:path status:status]; + [self callOnComplete:onComplete withStatus:status errorReason:errorReason andPath:path]; + }]; + + FPath* affectedPath = [self abortTransactionsAtPath:path error:kFTransactionSet]; + [self rerunTransactionsForPath:affectedPath]; +} + +- (void) update:(FPath *)path withNodes:(FCompoundWrite *)nodes withCallback:(fbt_void_nserror_ref)callback { + NSDictionary *values = [nodes valForExport:YES]; + + FFLog(@"I-RDB038004", @"Updating: %@ with %@", [path toString], [values description]); + NSDictionary* serverValues = [FServerValues generateServerValues:self.serverClock]; + FCompoundWrite *resolved = [FServerValues resolveDeferredValueCompoundWrite:nodes withServerValues:serverValues]; + + if (!resolved.isEmpty) { + NSInteger writeId = [self nextWriteId]; + [self.persistenceManager saveUserMerge:nodes atPath:path writeId:writeId]; + NSArray *events = [self.serverSyncTree applyUserMergeAtPath:path changedChildren:resolved writeId:writeId]; + [self.eventRaiser raiseEvents:events]; + + [self.connection mergeData:values forPath:[path description] withCallback:^(NSString *status, NSString *errorReason) { + [self warnIfWriteFailedAtPath:path status:status message:@"updateChildValues:"]; + [self ackWrite:writeId rerunTransactionsAtPath:path status:status]; + [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path]; + }]; + + [nodes enumerateWrites:^(FPath *childPath, id<FNode> node, BOOL *stop) { + FPath* pathFromRoot = [path child:childPath]; + FFLog(@"I-RDB038005", @"Cancelling transactions at path: %@", pathFromRoot); + FPath *affectedPath = [self abortTransactionsAtPath:pathFromRoot error:kFTransactionSet]; + [self rerunTransactionsForPath:affectedPath]; + }]; + } else { + FFLog(@"I-RDB038006", @"update called with empty data. Doing nothing"); + // Do nothing, just call the callback + [self callOnComplete:callback withStatus:@"ok" errorReason:nil andPath:path]; + } +} + +- (void) onDisconnectCancel:(FPath *)path withCallback:(fbt_void_nserror_ref)callback { + [self.connection onDisconnectCancelPath:path withCallback:^(NSString *status, NSString *errorReason) { + BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk]; + if (success) { + [self.onDisconnect forgetPath:path]; + } else { + FFLog(@"I-RDB038007", @"cancelDisconnectOperations: at %@ failed: %@", path, status); + } + + [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path]; + }]; +} + +- (void) onDisconnectSet:(FPath *)path withNode:(id<FNode>)node withCallback:(fbt_void_nserror_ref)callback { + [self.connection onDisconnectPutData:[node valForExport:YES] forPath:path withCallback:^(NSString *status, NSString *errorReason) { + BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk]; + if (success) { + [self.onDisconnect rememberData:node onPath:path]; + } else { + FFWarn(@"I-RDB038008", @"onDisconnectSetValue: or onDisconnectRemoveValue: at %@ failed: %@", path, status); + } + + [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path]; + }]; +} + +- (void) onDisconnectUpdate:(FPath *)path withNodes:(FCompoundWrite *)nodes withCallback:(fbt_void_nserror_ref)callback { + if (!nodes.isEmpty) { + NSDictionary *values = [nodes valForExport:YES]; + + [self.connection onDisconnectMergeData:values forPath:path withCallback:^(NSString *status, NSString *errorReason) { + BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk]; + if (success) { + [nodes enumerateWrites:^(FPath *relativePath, id<FNode> nodeUnresolved, BOOL *stop) { + FPath* childPath = [path child:relativePath]; + [self.onDisconnect rememberData:nodeUnresolved onPath:childPath]; + }]; + } else { + FFWarn(@"I-RDB038009", @"onDisconnectUpdateChildValues: at %@ failed %@", path, status); + } + + [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path]; + }]; + } else { + // Do nothing, just call the callback + [self callOnComplete:callback withStatus:@"ok" errorReason:nil andPath:path]; + } +} + +- (void) purgeOutstandingWrites { + FFLog(@"I-RDB038010", @"Purging outstanding writes"); + NSArray *events = [self.serverSyncTree removeAllWrites]; + [self.eventRaiser raiseEvents:events]; + // Abort any transactions + [self abortTransactionsAtPath:[FPath empty] error:kFErrorWriteCanceled]; + // Remove outstanding writes from connection + [self.connection purgeOutstandingWrites]; +} + +- (void) addEventRegistration:(id <FEventRegistration>)eventRegistration forQuery:(FQuerySpec *)query { + NSArray *events = nil; + if ([[query.path getFront] isEqualToString:kDotInfoPrefix]) { + events = [self.infoSyncTree addEventRegistration:eventRegistration forQuery:query]; + } else { + events = [self.serverSyncTree addEventRegistration:eventRegistration forQuery:query]; + } + [self.eventRaiser raiseEvents:events]; +} + +- (void) removeEventRegistration:(id<FEventRegistration>)eventRegistration forQuery:(FQuerySpec *)query { + // These are guaranteed not to raise events, since we're not passing in a cancelError. However we can future-proof + // a little bit by handling the return values anyways. + FFLog(@"I-RDB038011", @"Removing event registration with hande: %lu", (unsigned long)eventRegistration.handle); + NSArray *events = nil; + if ([[query.path getFront] isEqualToString:kDotInfoPrefix]) { + events = [self.infoSyncTree removeEventRegistration:eventRegistration forQuery:query cancelError:nil]; + } else { + events = [self.serverSyncTree removeEventRegistration:eventRegistration forQuery:query cancelError:nil]; + } + [self.eventRaiser raiseEvents:events]; +} + +- (void) keepQuery:(FQuerySpec *)query synced:(BOOL)synced { + NSAssert(![[query.path getFront] isEqualToString:kDotInfoPrefix], @"Can't keep .info tree synced!"); + [self.serverSyncTree keepQuery:query synced:synced]; +} + +- (void) updateInfo:(NSString *) pathString withValue:(id)value { + // hack to make serverTimeOffset available in a threadsafe way. Property is marked as atomic + if ([pathString isEqualToString:kDotInfoServerTimeOffset]) { + NSTimeInterval offset = [(NSNumber *)value doubleValue]/1000.0; + self.serverClock = [[FOffsetClock alloc] initWithClock:[FSystemClock clock] offset:offset]; + } + + FPath* path = [[FPath alloc] initWith:[NSString stringWithFormat:@"%@/%@", kDotInfoPrefix, pathString]]; + id<FNode> newNode = [FSnapshotUtilities nodeFrom:value]; + [self.infoData updateSnapshot:path withNewSnapshot:newNode]; + NSArray *events = [self.infoSyncTree applyServerOverwriteAtPath:path newData:newNode]; + [self.eventRaiser raiseEvents:events]; +} + +- (void) callOnComplete:(fbt_void_nserror_ref)onComplete withStatus:(NSString *)status errorReason:(NSString *)errorReason andPath:(FPath *)path { + if (onComplete) { + FIRDatabaseReference * ref = [[FIRDatabaseReference alloc] initWithRepo:self path:path]; + BOOL statusOk = [status isEqualToString:kFWPResponseForActionStatusOk]; + NSError* err = nil; + if (!statusOk) { + err = [FUtilities errorForStatus:status andReason:errorReason]; + } + [self.eventRaiser raiseCallback:^{ + onComplete(err, ref); + }]; + } +} + +- (void)ackWrite:(NSInteger)writeId rerunTransactionsAtPath:(FPath *)path status:(NSString *)status { + if ([status isEqualToString:kFErrorWriteCanceled]) { + // This write was already removed, we just need to ignore it... + } else { + BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk]; + NSArray *clearEvents = [self.serverSyncTree ackUserWriteWithWriteId:writeId revert:!success persist:YES clock:self.serverClock]; + if ([clearEvents count] > 0) { + [self rerunTransactionsForPath:path]; + } + [self.eventRaiser raiseEvents:clearEvents]; + } +} + +- (void) warnIfWriteFailedAtPath:(FPath *)path status:(NSString *)status message:(NSString *)message { + if (!([status isEqualToString:kFWPResponseForActionStatusOk] || [status isEqualToString:kFErrorWriteCanceled])) { + FFWarn(@"I-RDB038012", @"%@ at %@ failed: %@", message, path, status); + } +} + +#pragma mark - +#pragma mark FPersistentConnectionDelegate methods + +- (void) onDataUpdate:(FPersistentConnection *)fpconnection forPath:(NSString *)pathString message:(id)data isMerge:(BOOL)isMerge tagId:(NSNumber *)tagId { + FFLog(@"I-RDB038013", @"onDataUpdateForPath: %@ withMessage: %@", pathString, data); + + // For testing. + self.dataUpdateCount++; + + FPath* path = [[FPath alloc] initWith:pathString]; + data = self.interceptServerDataCallback ? self.interceptServerDataCallback(pathString, data) : data; + NSArray *events = nil; + + if (tagId != nil) { + if (isMerge) { + NSDictionary *message = data; + FCompoundWrite *taggedChildren = [FCompoundWrite compoundWriteWithValueDictionary:message]; + events = [self.serverSyncTree applyTaggedQueryMergeAtPath:path changedChildren:taggedChildren tagId:tagId]; + } else { + id<FNode> taggedSnap = [FSnapshotUtilities nodeFrom:data]; + events = [self.serverSyncTree applyTaggedQueryOverwriteAtPath:path newData:taggedSnap tagId:tagId]; + } + } else if (isMerge) { + NSDictionary *message = data; + FCompoundWrite *changedChildren = [FCompoundWrite compoundWriteWithValueDictionary:message]; + events = [self.serverSyncTree applyServerMergeAtPath:path changedChildren:changedChildren]; + } else { + id<FNode> snap = [FSnapshotUtilities nodeFrom:data]; + events = [self.serverSyncTree applyServerOverwriteAtPath:path newData:snap]; + } + + if ([events count] > 0) { + // Since we have a listener outstanding for each transaction, receiving any events + // is a proxy for some change having occurred. + [self rerunTransactionsForPath:path]; + } + + [self.eventRaiser raiseEvents:events]; +} + +- (void)onRangeMerge:(NSArray *)ranges forPath:(NSString *)pathString tagId:(NSNumber *)tag { + FFLog(@"I-RDB038014", @"onRangeMerge: %@ => %@", pathString, ranges); + + // For testing + self.rangeMergeUpdateCount++; + + FPath* path = [[FPath alloc] initWith:pathString]; + NSArray *events; + if (tag != nil) { + events = [self.serverSyncTree applyTaggedServerRangeMergeAtPath:path updates:ranges tagId:tag]; + } else { + events = [self.serverSyncTree applyServerRangeMergeAtPath:path updates:ranges]; + } + if (events.count > 0) { + // Since we have a listener outstanding for each transaction, receiving any events + // is a proxy for some change having occurred. + [self rerunTransactionsForPath:path]; + } + + [self.eventRaiser raiseEvents:events]; +} + +- (void)onConnect:(FPersistentConnection *)fpconnection { + [self updateInfo:kDotInfoConnected withValue:@true]; +} + +- (void)onDisconnect:(FPersistentConnection *)fpconnection { + [self updateInfo:kDotInfoConnected withValue:@false]; + [self runOnDisconnectEvents]; +} + +- (void)onServerInfoUpdate:(FPersistentConnection *)fpconnection updates:(NSDictionary *)updates { + for (NSString* key in updates) { + id val = [updates objectForKey:key]; + [self updateInfo:key withValue:val]; + } +} + +- (void) setupNotifications { + NSString * const *backgroundConstant = (NSString * const *) dlsym(RTLD_DEFAULT, "UIApplicationDidEnterBackgroundNotification"); + if (backgroundConstant) { + FFLog(@"I-RDB038015", @"Registering for background notification."); + [[NSNotificationCenter defaultCenter] addObserver:self + selector:@selector(didEnterBackground) + name:*backgroundConstant + object:nil]; + } else { + FFLog(@"I-RDB038016", @"Skipped registering for background notification."); + } +} + +- (void) didEnterBackground { + if (!self.config.persistenceEnabled) + return; + + // Targetted compilation is ONLY for testing. UIKit is weak-linked in actual release build. +#if TARGET_OS_IPHONE + // The idea is to wait until any outstanding sets get written to disk. Since the sets might still be in our + // dispatch queue, we wait for the dispatch queue to catch up and for persistence to catch up. + // This may be undesirable though. The dispatch queue might just be processing a bunch of incoming data or + // something. We might want to keep track of whether there are any unpersisted sets or something. + FFLog(@"I-RDB038017", @"Entering background. Starting background task to finish work."); + Class uiApplicationClass = NSClassFromString(@"UIApplication"); + assert(uiApplicationClass); // If we are here, we should be on iOS and UIApplication should be available. + + UIApplication *application = [uiApplicationClass sharedApplication]; + __block UIBackgroundTaskIdentifier bgTask = [application beginBackgroundTaskWithExpirationHandler:^{ + [application endBackgroundTask:bgTask]; + }]; + + NSDate *start = [NSDate date]; + dispatch_async([FIRDatabaseQuery sharedQueue], ^{ + NSTimeInterval finishTime = [start timeIntervalSinceNow]*-1; + FFLog(@"I-RDB038018", @"Background task completed. Queue time: %f", finishTime); + [application endBackgroundTask:bgTask]; + }); +#endif +} + +#pragma mark - +#pragma mark Internal methods + +/** +* Applies all the changes stored up in the onDisconnect tree +*/ +- (void) runOnDisconnectEvents { + FFLog(@"I-RDB038019", @"Running onDisconnectEvents"); + NSDictionary* serverValues = [FServerValues generateServerValues:self.serverClock]; + FSparseSnapshotTree* resolvedTree = [FServerValues resolveDeferredValueTree:self.onDisconnect withServerValues:serverValues]; + NSMutableArray *events = [[NSMutableArray alloc] init]; + + [resolvedTree forEachTreeAtPath:[FPath empty] do:^(FPath *path, id<FNode> node) { + [events addObjectsFromArray:[self.serverSyncTree applyServerOverwriteAtPath:path newData:node]]; + FPath* affectedPath = [self abortTransactionsAtPath:path error:kFTransactionSet]; + [self rerunTransactionsForPath:affectedPath]; + }]; + + self.onDisconnect = [[FSparseSnapshotTree alloc] init]; + [self.eventRaiser raiseEvents:events]; +} + +- (NSDictionary *) dumpListens { + return [self.connection dumpListens]; +} + +#pragma mark - +#pragma mark Transactions + +/** + * Setup the transaction data structures + */ +- (void) initTransactions { + self.transactionQueueTree = [[FTree alloc] init]; + self.hijackHash = NO; + self.loggedTransactionPersistenceWarning = NO; +} + +/** + * Creates a new transaction, add its to the transactions we're tracking, and sends it to the server if possible + */ +- (void) startTransactionOnPath:(FPath *)path update:(fbt_transactionresult_mutabledata)update onComplete:(fbt_void_nserror_bool_datasnapshot)onComplete withLocalEvents:(BOOL)applyLocally { + if (self.config.persistenceEnabled && !self.loggedTransactionPersistenceWarning) { + self.loggedTransactionPersistenceWarning = YES; + FFInfo(@"I-RDB038020", @"runTransactionBlock: usage detected while persistence is enabled. Please be aware that transactions " + @"*will not* be persisted across app restarts. " + @"See https://www.firebase.com/docs/ios/guide/offline-capabilities.html#section-handling-transactions-offline for more details."); + } + + FIRDatabaseReference * watchRef = [[FIRDatabaseReference alloc] initWithRepo:self path:path]; + // make sure we're listening on this node + // Note: we can't do this asynchronously. To preserve event ordering, it has to be done in this block. + // This is ok, this block is guaranteed to be our own event loop + NSUInteger handle = [[FUtilities LUIDGenerator] integerValue]; + fbt_void_datasnapshot cb = ^(FIRDataSnapshot *snapshot) {}; + FValueEventRegistration *registration = [[FValueEventRegistration alloc] initWithRepo:self + handle:handle + callback:cb + cancelCallback:nil]; + [watchRef.repo addEventRegistration:registration forQuery:watchRef.querySpec]; + fbt_void_void unwatcher = ^{ [watchRef removeObserverWithHandle:handle]; }; + + // Save all the data that represents this transaction + FTupleTransaction* transaction = [[FTupleTransaction alloc] init]; + transaction.path = path; + transaction.update = update; + transaction.onComplete = onComplete; + transaction.status = FTransactionInitializing; + transaction.order = [FUtilities LUIDGenerator]; + transaction.applyLocally = applyLocally; + transaction.retryCount = 0; + transaction.unwatcher = unwatcher; + transaction.currentWriteId = nil; + transaction.currentInputSnapshot = nil; + transaction.currentOutputSnapshotRaw = nil; + transaction.currentOutputSnapshotResolved = nil; + + // Run transaction initially + id<FNode> currentState = [self latestStateAtPath:path excludeWriteIds:nil]; + transaction.currentInputSnapshot = currentState; + FIRMutableData * mutableCurrent = [[FIRMutableData alloc] initWithNode:currentState]; + FIRTransactionResult * result = transaction.update(mutableCurrent); + + if (!result.isSuccess) { + // Abort the transaction + transaction.unwatcher(); + transaction.currentOutputSnapshotRaw = nil; + transaction.currentOutputSnapshotResolved = nil; + if (transaction.onComplete) { + FIRDatabaseReference *ref = [[FIRDatabaseReference alloc] initWithRepo:self path:transaction.path]; + FIndexedNode *indexedNode = [FIndexedNode indexedNodeWithNode:transaction.currentInputSnapshot]; + FIRDataSnapshot *snap = [[FIRDataSnapshot alloc] initWithRef:ref indexedNode:indexedNode]; + [self.eventRaiser raiseCallback:^{ + transaction.onComplete(nil, NO, snap); + }]; + } + } else { + // Note: different from js. We don't need to validate, FIRMutableData does validation. + // We also don't have to worry about priorities. Just mark as run and add to queue. + transaction.status = FTransactionRun; + FTree* queueNode = [self.transactionQueueTree subTree:transaction.path]; + NSMutableArray* nodeQueue = [queueNode getValue]; + if (nodeQueue == nil) { + nodeQueue = [[NSMutableArray alloc] init]; + } + [nodeQueue addObject:transaction]; + [queueNode setValue:nodeQueue]; + + // Update visibleData and raise events + // Note: We intentionally raise events after updating all of our transaction state, since the user could + // start new transactions from the event callbacks + NSDictionary* serverValues = [FServerValues generateServerValues:self.serverClock]; + id<FNode> newValUnresolved = [result.update nodeValue]; + id<FNode> newVal = [FServerValues resolveDeferredValueSnapshot:newValUnresolved withServerValues:serverValues]; + transaction.currentOutputSnapshotRaw = newValUnresolved; + transaction.currentOutputSnapshotResolved = newVal; + transaction.currentWriteId = [NSNumber numberWithInteger:[self nextWriteId]]; + + NSArray *events = [self.serverSyncTree applyUserOverwriteAtPath:path newData:newVal + writeId:[transaction.currentWriteId integerValue] + isVisible:transaction.applyLocally]; + [self.eventRaiser raiseEvents:events]; + + [self sendAllReadyTransactions]; + } +} + +/** + * @param writeIdsToExclude A specific set to exclude + */ +- (id<FNode>) latestStateAtPath:(FPath *)path excludeWriteIds:(NSArray *)writeIdsToExclude { + id<FNode> latestState = [self.serverSyncTree calcCompleteEventCacheAtPath:path excludeWriteIds:writeIdsToExclude]; + return latestState ? latestState : [FEmptyNode emptyNode]; +} + +/** + * Sends any already-run transactions that aren't waiting for outstanding transactions to complete. + * + * Externally, call the version with no arguments. + * Internally, calls itself recursively with a particular transactionQueueTree node to recurse through the tree + */ +- (void) sendAllReadyTransactions { + FTree* node = self.transactionQueueTree; + + [self pruneCompletedTransactionsBelowNode:node]; + [self sendReadyTransactionsForTree:node]; +} + +- (void) sendReadyTransactionsForTree:(FTree *)node { + NSMutableArray* queue = [node getValue]; + if (queue != nil) { + queue = [self buildTransactionQueueAtNode:node]; + NSAssert([queue count] > 0, @"Sending zero length transaction queue"); + + NSUInteger notRunIndex = [queue indexOfObjectPassingTest:^BOOL(id obj, NSUInteger idx, BOOL *stop) { + return ((FTupleTransaction*)obj).status != FTransactionRun; + }]; + + // If they're all run (and not sent), we can send them. Else, we must wait. + if (notRunIndex == NSNotFound) { + [self sendTransactionQueue:queue atPath:node.path]; + } + } else if ([node hasChildren]) { + [node forEachChild:^(FTree *child) { + [self sendReadyTransactionsForTree:child]; + }]; + } +} + +/** + * Given a list of run transactions, send them to the server and then handle the result (success or failure). + */ +- (void) sendTransactionQueue:(NSMutableArray *)queue atPath:(FPath *)path { + // Mark transactions as sent and bump the retry count + NSMutableArray *writeIdsToExclude = [[NSMutableArray alloc] init]; + for (FTupleTransaction *transaction in queue) { + [writeIdsToExclude addObject:transaction.currentWriteId]; + } + id<FNode> latestState = [self latestStateAtPath:path excludeWriteIds:writeIdsToExclude]; + id<FNode> snapToSend = latestState; + NSString *latestHash = [latestState dataHash]; + for (FTupleTransaction* transaction in queue) { + NSAssert(transaction.status == FTransactionRun, @"[FRepo sendTransactionQueue:] items in queue should all be run."); + FFLog(@"I-RDB038021", @"Transaction at %@ set to SENT", transaction.path); + transaction.status = FTransactionSent; + transaction.retryCount++; + FPath *relativePath = [FPath relativePathFrom:path to:transaction.path]; + // If we've gotten to this point, the output snapshot must be defined. + snapToSend = [snapToSend updateChild:relativePath withNewChild:transaction.currentOutputSnapshotRaw]; + } + + id dataToSend = [snapToSend valForExport:YES]; + NSString *pathToSend = [path description]; + latestHash = self.hijackHash ? @"badhash" : latestHash; + + // Send the put + [self.connection putData:dataToSend forPath:pathToSend withHash:latestHash withCallback:^(NSString *status, NSString *errorReason) { + FFLog(@"I-RDB038022", @"Transaction put response: %@ : %@", pathToSend, status); + + NSMutableArray *events = [[NSMutableArray alloc] init]; + if ([status isEqualToString:kFWPResponseForActionStatusOk]) { + // Queue up the callbacks and fire them after cleaning up all of our transaction state, since + // the callback could trigger more transactions or sets. + NSMutableArray *callbacks = [[NSMutableArray alloc] init]; + for (FTupleTransaction *transaction in queue) { + transaction.status = FTransactionCompleted; + [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue] + revert:NO + persist:NO + clock:self.serverClock]]; + if (transaction.onComplete) { + // We never unset the output snapshot, and given that this transaction is complete, it should be set + id <FNode> node = transaction.currentOutputSnapshotResolved; + FIndexedNode *indexedNode = [FIndexedNode indexedNodeWithNode:node]; + FIRDatabaseReference *ref = [[FIRDatabaseReference alloc] initWithRepo:self path:transaction.path]; + FIRDataSnapshot *snapshot = [[FIRDataSnapshot alloc] initWithRef:ref indexedNode:indexedNode]; + fbt_void_void cb = ^{ + transaction.onComplete(nil, YES, snapshot); + }; + [callbacks addObject:[cb copy]]; + } + transaction.unwatcher(); + } + + // Now remove the completed transactions. + [self pruneCompletedTransactionsBelowNode:[self.transactionQueueTree subTree:path]]; + // There may be pending transactions that we can now send. + [self sendAllReadyTransactions]; + + // Finally, trigger onComplete callbacks + [self.eventRaiser raiseCallbacks:callbacks]; + } else { + // transactions are no longer sent. Update their status appropriately. + if ([status isEqualToString:kFWPResponseForActionStatusDataStale]) { + for (FTupleTransaction *transaction in queue) { + if (transaction.status == FTransactionSentNeedsAbort) { + transaction.status = FTransactionNeedsAbort; + } else { + transaction.status = FTransactionRun; + } + } + } else { + FFWarn(@"I-RDB038023", @"runTransactionBlock: at %@ failed: %@", path, status); + for (FTupleTransaction *transaction in queue) { + transaction.status = FTransactionNeedsAbort; + [transaction setAbortStatus:status reason:errorReason]; + } + } + } + + [self rerunTransactionsForPath:path]; + [self.eventRaiser raiseEvents:events]; + }]; +} + +/** + * Finds all transactions dependent on the data at changed Path and reruns them. + * + * Should be called any time cached data changes. + * + * Return the highest path that was affected by rerunning transactions. This is the path at which events need to + * be raised for. + */ +- (FPath *) rerunTransactionsForPath:(FPath *)changedPath { + // For the common case that there are no transactions going on, skip all this! + if ([self.transactionQueueTree isEmpty]) { + return changedPath; + } else { + FTree* rootMostTransactionNode = [self getAncestorTransactionNodeForPath:changedPath]; + FPath* path = rootMostTransactionNode.path; + + NSArray* queue = [self buildTransactionQueueAtNode:rootMostTransactionNode]; + [self rerunTransactionQueue:queue atPath:path]; + + return path; + } +} + +/** + * Does all the work of rerunning transactions (as well as cleans up aborted transactions and whatnot). + */ +- (void) rerunTransactionQueue:(NSArray *)queue atPath:(FPath *)path { + if (queue.count == 0) { + return; // nothing to do + } + + // Queue up the callbacks and fire them after cleaning up all of our transaction state, since + // the callback could trigger more transactions or sets. + NSMutableArray *events = [[NSMutableArray alloc] init]; + NSMutableArray *callbacks = [[NSMutableArray alloc] init]; + + // Ignore, by default, all of the sets in this queue, since we're re-running all of them. However, we want to include + // the results of new sets triggered as part of this re-run, so we don't want to ignore a range, just these specific + // sets. + NSMutableArray *writeIdsToExclude = [[NSMutableArray alloc] init]; + for (FTupleTransaction *transaction in queue) { + [writeIdsToExclude addObject:transaction.currentWriteId]; + } + + for (FTupleTransaction* transaction in queue) { + FPath* relativePath __unused = [FPath relativePathFrom:path to:transaction.path]; + BOOL abortTransaction = NO; + NSAssert(relativePath != nil, @"[FRepo rerunTransactionsQueue:] relativePath should not be null."); + + if (transaction.status == FTransactionNeedsAbort) { + abortTransaction = YES; + if (![transaction.abortStatus isEqualToString:kFErrorWriteCanceled]) { + NSArray *ackEvents = [self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue] + revert:YES + persist:NO + clock:self.serverClock]; + [events addObjectsFromArray:ackEvents]; + } + } else if (transaction.status == FTransactionRun) { + if (transaction.retryCount >= kFTransactionMaxRetries) { + abortTransaction = YES; + [transaction setAbortStatus:kFTransactionTooManyRetries reason:nil]; + [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue] + revert:YES + persist:NO + clock:self.serverClock]]; + } else { + // This code reruns a transaction + id<FNode> currentNode = [self latestStateAtPath:transaction.path excludeWriteIds:writeIdsToExclude]; + transaction.currentInputSnapshot = currentNode; + FIRMutableData * mutableCurrent = [[FIRMutableData alloc] initWithNode:currentNode]; + FIRTransactionResult * result = transaction.update(mutableCurrent); + if (result.isSuccess) { + NSNumber *oldWriteId = transaction.currentWriteId; + NSDictionary* serverValues = [FServerValues generateServerValues:self.serverClock]; + + id<FNode> newVal = [result.update nodeValue]; + id<FNode> newValResolved = [FServerValues resolveDeferredValueSnapshot:newVal withServerValues:serverValues]; + + transaction.currentOutputSnapshotRaw = newVal; + transaction.currentOutputSnapshotResolved = newValResolved; + + transaction.currentWriteId = [NSNumber numberWithInteger:[self nextWriteId]]; + // Mutates writeIdsToExclude in place + [writeIdsToExclude removeObject:oldWriteId]; + [events addObjectsFromArray:[self.serverSyncTree applyUserOverwriteAtPath:transaction.path + newData:transaction.currentOutputSnapshotResolved + writeId:[transaction.currentWriteId integerValue] + isVisible:transaction.applyLocally]]; + [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[oldWriteId integerValue] + revert:YES + persist:NO + clock:self.serverClock]]; + } else { + abortTransaction = YES; + // The user aborted the transaction. JS treats ths as a "nodata" abort, but it's not an error, so we don't send them an error. + [transaction setAbortStatus:nil reason:nil]; + [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue] + revert:YES + persist:NO + clock:self.serverClock]]; + } + } + } + + [self.eventRaiser raiseEvents:events]; + events = nil; + + if (abortTransaction) { + // Abort + transaction.status = FTransactionCompleted; + transaction.unwatcher(); + if (transaction.onComplete) { + FIRDatabaseReference * ref = [[FIRDatabaseReference alloc] initWithRepo:self path:transaction.path]; + FIndexedNode *lastInput = [FIndexedNode indexedNodeWithNode:transaction.currentInputSnapshot]; + FIRDataSnapshot * snap = [[FIRDataSnapshot alloc] initWithRef:ref indexedNode:lastInput]; + fbt_void_void cb = ^{ + // Unlike JS, no need to check for "nodata" because ObjC has abortError = nil + transaction.onComplete(transaction.abortError, NO, snap); + }; + [callbacks addObject:[cb copy]]; + } + } + } + + // Note: unlike current js client, we don't need to preserve priority. Users can set priority via FIRMutableData + + // Clean up completed transactions. + [self pruneCompletedTransactionsBelowNode:self.transactionQueueTree]; + + // Now fire callbacks, now that we're in a good, known state. + [self.eventRaiser raiseCallbacks:callbacks]; + + // Try to send the transaction result to the server + [self sendAllReadyTransactions]; +} + +- (FTree *) getAncestorTransactionNodeForPath:(FPath *)path { + FTree* transactionNode = self.transactionQueueTree; + + while (![path isEmpty] && [transactionNode getValue] == nil) { + NSString* front = [path getFront]; + transactionNode = [transactionNode subTree:[[FPath alloc] initWith:front]]; + path = [path popFront]; + } + + return transactionNode; +} + +- (NSMutableArray *) buildTransactionQueueAtNode:(FTree *)node { + NSMutableArray* queue = [[NSMutableArray alloc] init]; + [self aggregateTransactionQueuesForNode:node andQueue:queue]; + + [queue sortUsingComparator:^NSComparisonResult(FTupleTransaction* obj1, FTupleTransaction* obj2) { + return [obj1.order compare:obj2.order]; + }]; + + return queue; +} + +- (void) aggregateTransactionQueuesForNode:(FTree *)node andQueue:(NSMutableArray *)queue { + NSArray* nodeQueue = [node getValue]; + [queue addObjectsFromArray:nodeQueue]; + + [node forEachChild:^(FTree *child) { + [self aggregateTransactionQueuesForNode:child andQueue:queue]; + }]; +} + +/** + * Remove COMPLETED transactions at or below this node in the transactionQueueTree + */ +- (void) pruneCompletedTransactionsBelowNode:(FTree *)node { + NSMutableArray* queue = [node getValue]; + if (queue != nil) { + int i = 0; + // remove all of the completed transactions from the queue + while (i < queue.count) { + FTupleTransaction* transaction = [queue objectAtIndex:i]; + if (transaction.status == FTransactionCompleted) { + [queue removeObjectAtIndex:i]; + } else { + i++; + } + } + if (queue.count > 0) { + [node setValue:queue]; + } else { + [node setValue:nil]; + } + } + + [node forEachChildMutationSafe:^(FTree *child) { + [self pruneCompletedTransactionsBelowNode:child]; + }]; +} + +/** + * Aborts all transactions on ancestors or descendants of the specified path. Called when doing a setValue: or + * updateChildValues: since we consider them incompatible with transactions + * + * @param path path for which we want to abort related transactions. + */ +- (FPath *) abortTransactionsAtPath:(FPath *)path error:(NSString *)error { + // For the common case that there are no transactions going on, skip all this! + if ([self.transactionQueueTree isEmpty]) { + return path; + } else { + FPath* affectedPath = [self getAncestorTransactionNodeForPath:path].path; + + FTree* transactionNode = [self.transactionQueueTree subTree:path]; + [transactionNode forEachAncestor:^BOOL(FTree *ancestor) { + [self abortTransactionsAtNode:ancestor error:error]; + return NO; + }]; + + [self abortTransactionsAtNode:transactionNode error:error]; + + [transactionNode forEachDescendant:^(FTree *child) { + [self abortTransactionsAtNode:child error:error]; + }]; + + return affectedPath; + } +} + +/** + * Abort transactions stored in this transactions queue node. + * + * @param node Node to abort transactions for. + */ +- (void) abortTransactionsAtNode:(FTree *)node error:(NSString *)error { + NSMutableArray* queue = [node getValue]; + if (queue != nil) { + + // Queue up the callbacks and fire them after cleaning up all of our transaction state, since + // can be immediately aborted and removed. + NSMutableArray* callbacks = [[NSMutableArray alloc] init]; + + // Go through queue. Any already-sent transactions must be marked for abort, while the unsent ones + // can be immediately aborted and removed + NSMutableArray *events = [[NSMutableArray alloc] init]; + int lastSent = -1; + // Note: all of the sent transactions will be at the front of the queue, so safe to increment lastSent + for (FTupleTransaction* transaction in queue) { + if (transaction.status == FTransactionSentNeedsAbort) { + // No-op. already marked. + } else if (transaction.status == FTransactionSent) { + // Mark this transaction for abort when it returns + lastSent++; + transaction.status = FTransactionSentNeedsAbort; + [transaction setAbortStatus:error reason:nil]; + } else { + // we can abort this immediately + transaction.unwatcher(); + if ([error isEqualToString:kFTransactionSet]) { + [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue] + revert:YES + persist:NO + clock:self.serverClock]]; + } else { + // If it was cancelled it was already removed from the sync tree, no need to ack + NSAssert([error isEqualToString:kFErrorWriteCanceled], nil); + } + + if (transaction.onComplete) { + NSError* abortReason = [FUtilities errorForStatus:error andReason:nil]; + FIRDataSnapshot * snapshot = nil; + fbt_void_void cb = ^{ + transaction.onComplete(abortReason, NO, snapshot); + }; + [callbacks addObject:[cb copy]]; + } + } + } + if (lastSent == -1) { + // We're not waiting for any sent transactions. We can clear the queue. + [node setValue:nil]; + } else { + // Remove the transactions we aborted + NSRange theRange; + theRange.location = lastSent + 1; + theRange.length = queue.count - theRange.location; + [queue removeObjectsInRange:theRange]; + } + + // Now fire the callbacks + [self.eventRaiser raiseEvents:events]; + [self.eventRaiser raiseCallbacks:callbacks]; + } +} + +@end |