From 81ee594e325a922a91557d82563132f22977c947 Mon Sep 17 00:00:00 2001 From: Michael Lehenbauer Date: Thu, 15 Feb 2018 16:17:44 -0800 Subject: DispatchQueue delayed callback improvements + testing (#784) Basically a port of https://github.com/firebase/firebase-js-sdk/commit/a1e346ff93c6cbcc0a1b3b33f0fbc3a7b66e7e12 and https://github.com/firebase/firebase-js-sdk/commit/fce4168309f42aa038125f39818fbf654b65b05f * Introduces a DelayedCallback helper class in FSTDispatchQueue to encapsulate delayed callback logic. * Adds cancellation support. * Updates the idle timer in FSTStream to use new cancellation support. * Adds a FSTTimerId enum for identifying delayed operations on the queue and uses it to identify our existing backoff and idle timers. * Added containsDelayedCallback: and runDelayedCallbacksUntil: methods to FSTDispatchQueue which can be used from tests to check for the presence of a callback or to schedule them to run early. * Removes FSTTestDispatchQueue and changes idle tests to use new test methods. --- Firestore/Source/Remote/FSTExponentialBackoff.h | 20 ++- Firestore/Source/Remote/FSTExponentialBackoff.mm | 24 +-- Firestore/Source/Remote/FSTStream.h | 12 +- Firestore/Source/Remote/FSTStream.mm | 40 +++-- Firestore/Source/Util/FSTDispatchQueue.h | 52 +++++- Firestore/Source/Util/FSTDispatchQueue.mm | 201 ++++++++++++++++++++++- 6 files changed, 305 insertions(+), 44 deletions(-) (limited to 'Firestore/Source') diff --git a/Firestore/Source/Remote/FSTExponentialBackoff.h b/Firestore/Source/Remote/FSTExponentialBackoff.h index 674b1ac..03558ee 100644 --- a/Firestore/Source/Remote/FSTExponentialBackoff.h +++ b/Firestore/Source/Remote/FSTExponentialBackoff.h @@ -16,7 +16,7 @@ #import -@class FSTDispatchQueue; +#import "Firestore/Source/Util/FSTDispatchQueue.h" NS_ASSUME_NONNULL_BEGIN @@ -29,7 +29,7 @@ NS_ASSUME_NONNULL_BEGIN @interface FSTExponentialBackoff : NSObject /** - * Creates and returns a helper for running delayed tasks following an exponential backoff curve + * Initializes a helper for running delayed tasks following an exponential backoff curve * between attempts. * * Each delay is made up of a "base" delay which follows the exponential backoff curve, and a @@ -37,6 +37,7 @@ NS_ASSUME_NONNULL_BEGIN * accidentally synchronizing their delays causing spikes of load to the backend. * * @param dispatchQueue The dispatch queue to run tasks on. + * @param timerID The ID to use when scheduling backoff operations on the FSTDispatchQueue. * @param initialDelay The initial delay (used as the base delay on the first retry attempt). * Note that jitter will still be applied, so the actual delay could be as little as * 0.5*initialDelay. @@ -45,13 +46,13 @@ NS_ASSUME_NONNULL_BEGIN * @param maxDelay The maximum base delay after which no further backoff is performed. Note that * jitter will still be applied, so the actual delay could be as much as 1.5*maxDelay. */ -+ (instancetype)exponentialBackoffWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue - initialDelay:(NSTimeInterval)initialDelay - backoffFactor:(double)backoffFactor - maxDelay:(NSTimeInterval)maxDelay; +- (instancetype)initWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue + timerID:(FSTTimerID)timerID + initialDelay:(NSTimeInterval)initialDelay + backoffFactor:(double)backoffFactor + maxDelay:(NSTimeInterval)maxDelay NS_DESIGNATED_INITIALIZER; -- (instancetype)init - __attribute__((unavailable("Use exponentialBackoffWithDispatchQueue constructor method."))); +- (instancetype)init NS_UNAVAILABLE; /** * Resets the backoff delay. @@ -68,7 +69,8 @@ NS_ASSUME_NONNULL_BEGIN - (void)resetToMax; /** - * Waits for currentDelay seconds, increases the delay and runs the specified block. + * Waits for currentDelay seconds, increases the delay and runs the specified block. If there was + * a pending block waiting to be run already, it will be canceled. * * @param block The block to run. */ diff --git a/Firestore/Source/Remote/FSTExponentialBackoff.mm b/Firestore/Source/Remote/FSTExponentialBackoff.mm index 8077357..dddf164 100644 --- a/Firestore/Source/Remote/FSTExponentialBackoff.mm +++ b/Firestore/Source/Remote/FSTExponentialBackoff.mm @@ -26,16 +26,14 @@ using firebase::firestore::util::SecureRandom; @interface FSTExponentialBackoff () -- (instancetype)initWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue - initialDelay:(NSTimeInterval)initialDelay - backoffFactor:(double)backoffFactor - maxDelay:(NSTimeInterval)maxDelay NS_DESIGNATED_INITIALIZER; @property(nonatomic, strong) FSTDispatchQueue *dispatchQueue; +@property(nonatomic, assign, readonly) FSTTimerID timerID; @property(nonatomic) double backoffFactor; @property(nonatomic) NSTimeInterval initialDelay; @property(nonatomic) NSTimeInterval maxDelay; @property(nonatomic) NSTimeInterval currentBase; +@property(nonatomic, strong, nullable) FSTDelayedCallback *timerCallback; @end @implementation FSTExponentialBackoff { @@ -43,11 +41,13 @@ using firebase::firestore::util::SecureRandom; } - (instancetype)initWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue + timerID:(FSTTimerID)timerID initialDelay:(NSTimeInterval)initialDelay backoffFactor:(double)backoffFactor maxDelay:(NSTimeInterval)maxDelay { if (self = [super init]) { _dispatchQueue = dispatchQueue; + _timerID = timerID; _initialDelay = initialDelay; _backoffFactor = backoffFactor; _maxDelay = maxDelay; @@ -57,16 +57,6 @@ using firebase::firestore::util::SecureRandom; return self; } -+ (instancetype)exponentialBackoffWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue - initialDelay:(NSTimeInterval)initialDelay - backoffFactor:(double)backoffFactor - maxDelay:(NSTimeInterval)maxDelay { - return [[FSTExponentialBackoff alloc] initWithDispatchQueue:dispatchQueue - initialDelay:initialDelay - backoffFactor:backoffFactor - maxDelay:maxDelay]; -} - - (void)reset { _currentBase = 0; } @@ -76,6 +66,9 @@ using firebase::firestore::util::SecureRandom; } - (void)backoffAndRunBlock:(void (^)(void))block { + if (self.timerCallback) { + [self.timerCallback cancel]; + } // First schedule the block using the current base (which may be 0 and should be honored as such). NSTimeInterval delayWithJitter = _currentBase + [self jitterDelay]; if (_currentBase > 0) { @@ -83,7 +76,8 @@ using firebase::firestore::util::SecureRandom; _currentBase); } - [self.dispatchQueue dispatchAfterDelay:delayWithJitter block:block]; + self.timerCallback = + [self.dispatchQueue dispatchAfterDelay:delayWithJitter timerID:self.timerID block:block]; // Apply backoff factor to determine next delay and ensure it is within bounds. _currentBase *= _backoffFactor; diff --git a/Firestore/Source/Remote/FSTStream.h b/Firestore/Source/Remote/FSTStream.h index c390dbb..6630083 100644 --- a/Firestore/Source/Remote/FSTStream.h +++ b/Firestore/Source/Remote/FSTStream.h @@ -17,6 +17,7 @@ #import #import "Firestore/Source/Core/FSTTypes.h" +#import "Firestore/Source/Util/FSTDispatchQueue.h" #include "Firestore/core/src/firebase/firestore/core/database_info.h" @@ -91,6 +92,8 @@ NS_ASSUME_NONNULL_BEGIN - (instancetype)initWithDatabase:(const firebase::firestore::core::DatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + connectionTimerID:(FSTTimerID)connectionTimerID + idleTimerID:(FSTTimerID)idleTimerID credentials:(id)credentials responseMessageClass:(Class)responseMessageClass NS_DESIGNATED_INITIALIZER; @@ -142,8 +145,13 @@ NS_ASSUME_NONNULL_BEGIN - (void)stop; /** - * Initializes the idle timer. If no write takes place within one minute, the GRPC stream will be - * closed. + * Marks this stream as idle. If no further actions are performed on the stream for one minute, the + * stream will automatically close itself and notify the stream's close handler. The stream will + * then be in a non-started state, requiring the caller to start the stream again before further + * use. + * + * Only streams that are in state 'Open' can be marked idle, as all other states imply pending + * network operations. */ - (void)markIdle; diff --git a/Firestore/Source/Remote/FSTStream.mm b/Firestore/Source/Remote/FSTStream.mm index c1479c5..f859fbb 100644 --- a/Firestore/Source/Remote/FSTStream.mm +++ b/Firestore/Source/Remote/FSTStream.mm @@ -113,7 +113,8 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { @interface FSTStream () -@property(nonatomic, getter=isIdle) BOOL idle; +@property(nonatomic, assign, readonly) FSTTimerID idleTimerID; +@property(nonatomic, strong, nullable) FSTDelayedCallback *idleTimerCallback; @property(nonatomic, weak, readwrite, nullable) id delegate; @end @@ -203,18 +204,22 @@ static const NSTimeInterval kIdleTimeout = 60.0; - (instancetype)initWithDatabase:(const DatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + connectionTimerID:(FSTTimerID)connectionTimerID + idleTimerID:(FSTTimerID)idleTimerID credentials:(id)credentials responseMessageClass:(Class)responseMessageClass { if (self = [super init]) { _databaseInfo = database; _workerDispatchQueue = workerDispatchQueue; + _idleTimerID = idleTimerID; _credentials = credentials; _responseMessageClass = responseMessageClass; - _backoff = [FSTExponentialBackoff exponentialBackoffWithDispatchQueue:workerDispatchQueue - initialDelay:kBackoffInitialDelay - backoffFactor:kBackoffFactor - maxDelay:kBackoffMaxDelay]; + _backoff = [[FSTExponentialBackoff alloc] initWithDispatchQueue:workerDispatchQueue + timerID:connectionTimerID + initialDelay:kBackoffInitialDelay + backoffFactor:kBackoffFactor + maxDelay:kBackoffMaxDelay]; _state = FSTStreamStateInitial; } return self; @@ -418,7 +423,7 @@ static const NSTimeInterval kIdleTimeout = 60.0; /** Called by the idle timer when the stream should close due to inactivity. */ - (void)handleIdleCloseTimer { [self.workerDispatchQueue verifyIsCurrentQueue]; - if (self.state == FSTStreamStateOpen && [self isIdle]) { + if ([self isOpen]) { // When timing out an idle stream there's no reason to force the stream into backoff when // it restarts so set the stream state to Initial instead of Error. [self closeWithFinalState:FSTStreamStateInitial error:nil]; @@ -427,18 +432,23 @@ static const NSTimeInterval kIdleTimeout = 60.0; - (void)markIdle { [self.workerDispatchQueue verifyIsCurrentQueue]; - if (self.state == FSTStreamStateOpen) { - self.idle = YES; - [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout - block:^() { - [self handleIdleCloseTimer]; - }]; + // Starts the idle timer if we are in state 'Open' and are not yet already running a timer (in + // which case the previous idle timeout still applies). + if ([self isOpen] && !self.idleTimerCallback) { + self.idleTimerCallback = [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout + timerID:self.idleTimerID + block:^() { + [self handleIdleCloseTimer]; + }]; } } - (void)cancelIdleCheck { [self.workerDispatchQueue verifyIsCurrentQueue]; - self.idle = NO; + if (self.idleTimerCallback) { + [self.idleTimerCallback cancel]; + self.idleTimerCallback = nil; + } } /** @@ -606,6 +616,8 @@ static const NSTimeInterval kIdleTimeout = 60.0; serializer:(FSTSerializerBeta *)serializer { self = [super initWithDatabase:database workerDispatchQueue:workerDispatchQueue + connectionTimerID:FSTTimerIDListenStreamConnection + idleTimerID:FSTTimerIDListenStreamIdle credentials:credentials responseMessageClass:[GCFSListenResponse class]]; if (self) { @@ -689,6 +701,8 @@ static const NSTimeInterval kIdleTimeout = 60.0; serializer:(FSTSerializerBeta *)serializer { self = [super initWithDatabase:database workerDispatchQueue:workerDispatchQueue + connectionTimerID:FSTTimerIDWriteStreamConnection + idleTimerID:FSTTimerIDWriteStreamIdle credentials:credentials responseMessageClass:[GCFSWriteResponse class]]; if (self) { diff --git a/Firestore/Source/Util/FSTDispatchQueue.h b/Firestore/Source/Util/FSTDispatchQueue.h index fe87887..9b28c9c 100644 --- a/Firestore/Source/Util/FSTDispatchQueue.h +++ b/Firestore/Source/Util/FSTDispatchQueue.h @@ -18,6 +18,34 @@ NS_ASSUME_NONNULL_BEGIN +/** + * Well-known "timer" IDs used when scheduling delayed callbacks on the FSTDispatchQueue. These IDs + * can then be used from tests to check for the presence of callbacks or to run them early. + */ +typedef NS_ENUM(NSInteger, FSTTimerID) { + FSTTimerIDAll, // Sentinel value to be used with runDelayedCallbacksUntil: to run all blocks. + FSTTimerIDListenStreamIdle, + FSTTimerIDListenStreamConnection, + FSTTimerIDWriteStreamIdle, + FSTTimerIDWriteStreamConnection +}; + +/** + * Handle to a callback scheduled via [FSTDispatchQueue dispatchAfterDelay:]. Supports cancellation + * via the cancel method. + */ +@interface FSTDelayedCallback : NSObject + +/** + * Cancels the callback if it hasn't already been executed or canceled. + * + * As long as the callback has not yet been run, calling cancel() (from a callback already running + * on the dispatch queue) provides a guarantee that the operation will not be run. + */ +- (void)cancel; + +@end + @interface FSTDispatchQueue : NSObject /** Creates and returns an FSTDispatchQueue wrapping the specified dispatch_queue_t. */ @@ -56,12 +84,32 @@ NS_ASSUME_NONNULL_BEGIN * Schedules a callback after the specified delay. * * Unlike dispatchAsync: this method does not require you to dispatch to a different queue than - * the current one (thus it is equivalent to a raw dispatch_after()). + * the current one. + * + * The returned FSTDelayedCallback handle can be used to cancel the callback prior to its running. * * @param block The block to run. * @param delay The delay (in seconds) after which to run the block. + * @param timerID An FSTTimerID that can be used from tests to check for the presence of this + * callback or to schedule it to run early. + * @return A FSTDelayedCallback instance that can be used for cancellation. + */ +- (FSTDelayedCallback *)dispatchAfterDelay:(NSTimeInterval)delay + timerID:(FSTTimerID)timerID + block:(void (^)(void))block; + +/** + * For Tests: Determine if a delayed callback with a particular FSTTimerID exists. + */ +- (BOOL)containsDelayedCallbackWithTimerID:(FSTTimerID)timerID; + +/** + * For Tests: Runs delayed callbacks early, blocking until completion. + * + * @param lastTimerID Only delayed callbacks up to and including one that was scheduled using this + * FSTTimerID will be run. Method throws if no matching callback exists. */ -- (void)dispatchAfterDelay:(NSTimeInterval)delay block:(void (^)(void))block; +- (void)runDelayedCallbacksUntil:(FSTTimerID)lastTimerID; /** The underlying wrapped dispatch_queue_t */ @property(nonatomic, strong, readonly) dispatch_queue_t queue; diff --git a/Firestore/Source/Util/FSTDispatchQueue.mm b/Firestore/Source/Util/FSTDispatchQueue.mm index 6ce5d74..5bd7f27 100644 --- a/Firestore/Source/Util/FSTDispatchQueue.mm +++ b/Firestore/Source/Util/FSTDispatchQueue.mm @@ -21,8 +21,138 @@ NS_ASSUME_NONNULL_BEGIN +/** + * removeDelayedCallback is used by FSTDelayedCallback and so we pre-declare it before the rest of + * the FSTDispatchQueue private interface. + */ @interface FSTDispatchQueue () +- (void)removeDelayedCallback:(FSTDelayedCallback *)callback; +@end + +#pragma mark - FSTDelayedCallback + +/** + * Represents a callback scheduled to be run in the future on an FSTDispatchQueue. + * + * It is created via [FSTDelayedCallback createAndScheduleWithQueue]. + * + * Supports cancellation (via cancel) and early execution (via skipDelay). + */ +@interface FSTDelayedCallback () + +@property(nonatomic, strong, readonly) FSTDispatchQueue *queue; +@property(nonatomic, assign, readonly) FSTTimerID timerID; +@property(nonatomic, assign, readonly) NSTimeInterval targetTime; +@property(nonatomic, copy) void (^callback)(); +/** YES if the callback has been run or canceled. */ +@property(nonatomic, getter=isDone) BOOL done; + +/** + * Creates and returns an FSTDelayedCallback that has been scheduled on the provided queue with the + * provided delay. + * + * @param queue The FSTDispatchQueue to run the callback on. + * @param timerID A FSTTimerID identifying the type of the delayed callback. + * @param delay The delay before the callback should be scheduled. + * @param callback The callback block to run. + * @return The created FSTDelayedCallback instance. + */ ++ (instancetype)createAndScheduleWithQueue:(FSTDispatchQueue *)queue + timerID:(FSTTimerID)timerID + delay:(NSTimeInterval)delay + callback:(void (^)(void))callback; + +/** + * Queues the callback to run immediately (if it hasn't already been run or canceled). + */ +- (void)skipDelay; + +@end + +@implementation FSTDelayedCallback + +- (instancetype)initWithQueue:(FSTDispatchQueue *)queue + timerID:(FSTTimerID)timerID + targetTime:(NSTimeInterval)targetTime + callback:(void (^)(void))callback { + if (self = [super init]) { + _queue = queue; + _timerID = timerID; + _targetTime = targetTime; + _callback = callback; + _done = NO; + } + return self; +} + ++ (instancetype)createAndScheduleWithQueue:(FSTDispatchQueue *)queue + timerID:(FSTTimerID)timerID + delay:(NSTimeInterval)delay + callback:(void (^)(void))callback { + NSTimeInterval targetTime = [[NSDate date] timeIntervalSince1970] + delay; + FSTDelayedCallback *delayedCallback = [[FSTDelayedCallback alloc] initWithQueue:queue + timerID:timerID + targetTime:targetTime + callback:callback]; + [delayedCallback startWithDelay:delay]; + return delayedCallback; +} + +/** + * Starts the timer. This is called immediately after construction by createAndScheduleWithQueue. + */ +- (void)startWithDelay:(NSTimeInterval)delay { + dispatch_time_t delayNs = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delay * NSEC_PER_SEC)); + dispatch_after(delayNs, self.queue.queue, ^{ + [self delayDidElapse]; + }); +} + +- (void)skipDelay { + [self.queue dispatchAsyncAllowingSameQueue:^{ + [self delayDidElapse]; + }]; +} + +- (void)cancel { + [self.queue verifyIsCurrentQueue]; + if (!self.isDone) { + // PORTING NOTE: There's no way to actually cancel the dispatched callback, but it'll be a no-op + // since we set done to YES. + [self markDone]; + } +} + +- (void)delayDidElapse { + [self.queue verifyIsCurrentQueue]; + if (!self.isDone) { + [self markDone]; + self.callback(); + } +} + +/** + * Marks this delayed callback as done, and notifies the FSTDispatchQueue that it should be removed. + */ +- (void)markDone { + self.done = YES; + [self.queue removeDelayedCallback:self]; +} + +@end + +#pragma mark - FSTDispatchQueue + +@interface FSTDispatchQueue () + +/** + * Callbacks scheduled to be queued in the future. Callbacks are automatically removed after they + * are run or canceled. + */ +@property(nonatomic, strong, readonly) NSMutableArray *delayedCallbacks; + - (instancetype)initWithQueue:(dispatch_queue_t)queue NS_DESIGNATED_INITIALIZER; + @end @implementation FSTDispatchQueue @@ -34,6 +164,7 @@ NS_ASSUME_NONNULL_BEGIN - (instancetype)initWithQueue:(dispatch_queue_t)queue { if (self = [super init]) { _queue = queue; + _delayedCallbacks = [NSMutableArray array]; } return self; } @@ -56,9 +187,73 @@ NS_ASSUME_NONNULL_BEGIN dispatch_async(self.queue, block); } -- (void)dispatchAfterDelay:(NSTimeInterval)delay block:(void (^)(void))block { - dispatch_time_t delayNs = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delay * NSEC_PER_SEC)); - dispatch_after(delayNs, self.queue, block); +- (FSTDelayedCallback *)dispatchAfterDelay:(NSTimeInterval)delay + timerID:(FSTTimerID)timerID + block:(void (^)(void))block { + // While not necessarily harmful, we currently don't expect to have multiple callbacks with the + // same timerID in the queue, so defensively reject them. + FSTAssert(![self containsDelayedCallbackWithTimerID:timerID], + @"Attempted to schedule multiple callbacks with id %ld", (unsigned long)timerID); + FSTDelayedCallback *delayedCallback = [FSTDelayedCallback createAndScheduleWithQueue:self + timerID:timerID + delay:delay + callback:block]; + [self.delayedCallbacks addObject:delayedCallback]; + return delayedCallback; +} + +- (BOOL)containsDelayedCallbackWithTimerID:(FSTTimerID)timerID { + NSUInteger matchIndex = [self.delayedCallbacks + indexOfObjectPassingTest:^BOOL(FSTDelayedCallback *obj, NSUInteger idx, BOOL *stop) { + return obj.timerID == timerID; + }]; + return matchIndex != NSNotFound; +} + +- (void)runDelayedCallbacksUntil:(FSTTimerID)lastTimerID { + dispatch_semaphore_t doneSemaphore = dispatch_semaphore_create(0); + + [self dispatchAsync:^{ + FSTAssert(lastTimerID == FSTTimerIDAll || [self containsDelayedCallbackWithTimerID:lastTimerID], + @"Attempted to run callbacks until missing timer ID: %ld", + (unsigned long)lastTimerID); + + [self sortDelayedCallbacks]; + for (FSTDelayedCallback *callback in self.delayedCallbacks) { + [callback skipDelay]; + if (lastTimerID != FSTTimerIDAll && callback.timerID == lastTimerID) { + break; + } + } + + // Now that the callbacks are queued, we want to enqueue an additional item to release the + // 'done' semaphore. + [self dispatchAsyncAllowingSameQueue:^{ + dispatch_semaphore_signal(doneSemaphore); + }]; + }]; + + dispatch_semaphore_wait(doneSemaphore, DISPATCH_TIME_FOREVER); +} + +// NOTE: For performance we could store the callbacks sorted (e.g. using std::priority_queue), +// but this sort only happens in tests (if runDelayedCallbacksUntil: is called), and the size +// is guaranteed to be small since we don't allow duplicate TimerIds (of which there are only 4). +- (void)sortDelayedCallbacks { + // We want to run callbacks in the same order they'd run if they ran naturally. + [self.delayedCallbacks + sortUsingComparator:^NSComparisonResult(FSTDelayedCallback *a, FSTDelayedCallback *b) { + return a.targetTime < b.targetTime + ? NSOrderedAscending + : a.targetTime > b.targetTime ? NSOrderedDescending : NSOrderedSame; + }]; +} + +/** Called by FSTDelayedCallback when a callback is run or canceled. */ +- (void)removeDelayedCallback:(FSTDelayedCallback *)callback { + NSUInteger index = [self.delayedCallbacks indexOfObject:callback]; + FSTAssert(index != NSNotFound, @"Delayed callback not found."); + [self.delayedCallbacks removeObjectAtIndex:index]; } #pragma mark - Private Methods -- cgit v1.2.3