diff options
author | Sebastian Schmidt <mrschmidt@google.com> | 2017-10-30 18:17:16 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-30 18:17:16 -0700 |
commit | 02ff6bbee95150eacff9563af4dd7a6e1aeaebdd (patch) | |
tree | 8a095ae29bdb6daf273f57913af021c2eae981ab /Firestore | |
parent | 1db9fd83df8d29abe5e7369ad1cbf3eb8545a78a (diff) |
Closing the write and watch stream after 60s of idleness (#388)
Diffstat (limited to 'Firestore')
-rw-r--r-- | Firestore/Example/Firestore.xcodeproj/project.pbxproj | 8 | ||||
-rw-r--r-- | Firestore/Example/Tests/Integration/API/FIRDatabaseTests.m | 18 | ||||
-rw-r--r-- | Firestore/Example/Tests/Integration/FSTStreamTests.m | 88 | ||||
-rw-r--r-- | Firestore/Example/Tests/SpecTests/FSTMockDatastore.m | 71 | ||||
-rw-r--r-- | Firestore/Example/Tests/Util/FSTIntegrationTestCase.h | 5 | ||||
-rw-r--r-- | Firestore/Example/Tests/Util/FSTIntegrationTestCase.m | 36 | ||||
-rw-r--r-- | Firestore/Example/Tests/Util/FSTTestDispatchQueue.h | 39 | ||||
-rw-r--r-- | Firestore/Example/Tests/Util/FSTTestDispatchQueue.m | 61 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTDatastore.h | 44 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTDatastore.m | 276 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTExponentialBackoff.m | 5 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTRemoteStore.m | 48 | ||||
-rw-r--r-- | Firestore/Source/Util/FSTDispatchQueue.h | 13 | ||||
-rw-r--r-- | Firestore/Source/Util/FSTDispatchQueue.m | 5 |
14 files changed, 540 insertions, 177 deletions
diff --git a/Firestore/Example/Firestore.xcodeproj/project.pbxproj b/Firestore/Example/Firestore.xcodeproj/project.pbxproj index 2de1066..0197deb 100644 --- a/Firestore/Example/Firestore.xcodeproj/project.pbxproj +++ b/Firestore/Example/Firestore.xcodeproj/project.pbxproj @@ -59,6 +59,8 @@ AFE6114F0D4DAECBA7B7C089 /* Pods_Firestore_IntegrationTests.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = B2FA635DF5D116A67A7441CD /* Pods_Firestore_IntegrationTests.framework */; }; C4E749275AD0FBDF9F4716A8 /* Pods_SwiftBuildTest.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 32AD40BF6B0E849B07FFD05E /* Pods_SwiftBuildTest.framework */; }; D5B2532E4676014F57A7EAB9 /* FSTStreamTests.m in Sources */ = {isa = PBXBuildFile; fileRef = D5B25C0D4AADFCA3ADB883E4 /* FSTStreamTests.m */; }; + D5B25474286C9800CE42B8C2 /* FSTTestDispatchQueue.m in Sources */ = {isa = PBXBuildFile; fileRef = D5B25292CED31B81FDED0411 /* FSTTestDispatchQueue.m */; }; + D5B259FDEE8094E8D710C5BF /* FSTTestDispatchQueue.m in Sources */ = {isa = PBXBuildFile; fileRef = D5B25292CED31B81FDED0411 /* FSTTestDispatchQueue.m */; }; DE03B2C91F2149D600A30B9C /* FSTTransactionTests.m in Sources */ = {isa = PBXBuildFile; fileRef = DE51B1C61F0D48AC0013853F /* FSTTransactionTests.m */; }; DE03B2D41F2149D600A30B9C /* XCTest.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 6003F5AF195388D20070C39A /* XCTest.framework */; }; DE03B2D51F2149D600A30B9C /* UIKit.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 6003F591195388D20070C39A /* UIKit.framework */; }; @@ -222,6 +224,8 @@ B2FA635DF5D116A67A7441CD /* Pods_Firestore_IntegrationTests.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_Firestore_IntegrationTests.framework; sourceTree = BUILT_PRODUCTS_DIR; }; CE00BABB5A3AAB44A4C209E2 /* Pods-Firestore_Tests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_Tests.debug.xcconfig"; path = "Pods/Target Support Files/Pods-Firestore_Tests/Pods-Firestore_Tests.debug.xcconfig"; sourceTree = "<group>"; }; D3CC3DC5338DCAF43A211155 /* README.md */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = net.daringfireball.markdown; name = README.md; path = ../README.md; sourceTree = "<group>"; }; + D5B25292CED31B81FDED0411 /* FSTTestDispatchQueue.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = FSTTestDispatchQueue.m; sourceTree = "<group>"; }; + D5B259DAA9149B80D6245B57 /* FSTTestDispatchQueue.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = FSTTestDispatchQueue.h; sourceTree = "<group>"; }; D5B25C0D4AADFCA3ADB883E4 /* FSTStreamTests.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = FSTStreamTests.m; sourceTree = "<group>"; }; DB17FEDFB80770611A935A60 /* Pods-Firestore_IntegrationTests.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_IntegrationTests.release.xcconfig"; path = "Pods/Target Support Files/Pods-Firestore_IntegrationTests/Pods-Firestore_IntegrationTests.release.xcconfig"; sourceTree = "<group>"; }; DE03B2E91F2149D600A30B9C /* Firestore_IntegrationTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = Firestore_IntegrationTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; }; @@ -561,6 +565,8 @@ DE51B18A1F0D48AC0013853F /* FSTUtilTests.m */, 54E9282A1F339CAD00C1953E /* XCTestCase+Await.h */, 54E9282B1F339CAD00C1953E /* XCTestCase+Await.m */, + D5B259DAA9149B80D6245B57 /* FSTTestDispatchQueue.h */, + D5B25292CED31B81FDED0411 /* FSTTestDispatchQueue.m */, ); path = Util; sourceTree = "<group>"; @@ -1175,6 +1181,7 @@ DE51B1CD1F0D48CD0013853F /* FSTDatabaseInfoTests.m in Sources */, DE51B1F21F0D49140013853F /* FSTPathTests.m in Sources */, DE51B1DD1F0D490D0013853F /* FSTLocalStoreTests.m in Sources */, + D5B25474286C9800CE42B8C2 /* FSTTestDispatchQueue.m in Sources */, ); runOnlyForDeploymentPostprocessing = 0; }; @@ -1199,6 +1206,7 @@ DE03B2C91F2149D600A30B9C /* FSTTransactionTests.m in Sources */, 54DA12B11F315F3800DD57A1 /* FIRValidationTests.m in Sources */, D5B2532E4676014F57A7EAB9 /* FSTStreamTests.m in Sources */, + D5B259FDEE8094E8D710C5BF /* FSTTestDispatchQueue.m in Sources */, ); runOnlyForDeploymentPostprocessing = 0; }; diff --git a/Firestore/Example/Tests/Integration/API/FIRDatabaseTests.m b/Firestore/Example/Tests/Integration/API/FIRDatabaseTests.m index 68692cc..6a6e49a 100644 --- a/Firestore/Example/Tests/Integration/API/FIRDatabaseTests.m +++ b/Firestore/Example/Tests/Integration/API/FIRDatabaseTests.m @@ -913,4 +913,22 @@ [self awaitExpectations]; } +- (void)testWriteStreamReconnectsAfterIdle { + FIRDocumentReference *doc = [self documentRef]; + FIRFirestore *firestore = doc.firestore; + + [self writeDocumentRef:doc data:@{@"foo" : @"bar"}]; + [self waitForIdleFirestore:firestore]; + [self writeDocumentRef:doc data:@{@"foo" : @"bar"}]; +} + +- (void)testWatchStreamReconnectsAfterIdle { + FIRDocumentReference *doc = [self documentRef]; + FIRFirestore *firestore = doc.firestore; + + [self readSnapshotForRef:[self documentRef] requireOnline:YES]; + [self waitForIdleFirestore:firestore]; + [self readSnapshotForRef:[self documentRef] requireOnline:YES]; +} + @end diff --git a/Firestore/Example/Tests/Integration/FSTStreamTests.m b/Firestore/Example/Tests/Integration/FSTStreamTests.m index 7ca123d..dccaa70 100644 --- a/Firestore/Example/Tests/Integration/FSTStreamTests.m +++ b/Firestore/Example/Tests/Integration/FSTStreamTests.m @@ -22,10 +22,10 @@ #import "Core/FSTDatabaseInfo.h" #import "FSTHelpers.h" #import "FSTIntegrationTestCase.h" +#import "FSTTestDispatchQueue.h" #import "Model/FSTDatabaseID.h" #import "Remote/FSTDatastore.h" #import "Util/FSTAssert.h" -#import "Util/FSTDispatchQueue.h" /** Exposes otherwise private methods for testing. */ @interface FSTStream (Testing) @@ -79,14 +79,14 @@ _expectation = nil; } -- (void)writeStreamDidClose:(NSError *_Nullable)error { - [_states addObject:@"writeStreamDidClose"]; +- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error { + [_states addObject:@"writeStreamWasInterrupted"]; [_expectation fulfill]; _expectation = nil; } -- (void)watchStreamDidClose:(NSError *_Nullable)error { - [_states addObject:@"watchStreamDidClose"]; +- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error { + [_states addObject:@"watchStreamWasInterrupted"]; [_expectation fulfill]; _expectation = nil; } @@ -126,10 +126,10 @@ @implementation FSTStreamTests { dispatch_queue_t _testQueue; + FSTTestDispatchQueue *_workerDispatchQueue; FSTDatabaseInfo *_databaseInfo; FSTEmptyCredentialsProvider *_credentials; FSTStreamStatusDelegate *_delegate; - FSTDispatchQueue *_workerDispatchQueue; /** Single mutation to send to the write stream. */ NSArray<FSTMutation *> *_mutations; @@ -138,38 +138,37 @@ - (void)setUp { [super setUp]; - _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ]; - FIRFirestoreSettings *settings = [FSTIntegrationTestCase settings]; FSTDatabaseID *databaseID = [FSTDatabaseID databaseIDWithProject:[FSTIntegrationTestCase projectID] database:kDefaultDatabaseID]; + _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL); + _workerDispatchQueue = [[FSTTestDispatchQueue alloc] initWithQueue:_testQueue]; + _databaseInfo = [FSTDatabaseInfo databaseInfoWithDatabaseID:databaseID persistenceKey:@"test-key" host:settings.host sslEnabled:settings.sslEnabled]; - _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL); - _workerDispatchQueue = [FSTDispatchQueue queueWith:_testQueue]; _credentials = [[FSTEmptyCredentialsProvider alloc] init]; + + _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue]; + + _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ]; } - (FSTWriteStream *)setUpWriteStream { FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:_databaseInfo workerDispatchQueue:_workerDispatchQueue credentials:_credentials]; - - _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue]; - return [datastore createWriteStreamWithDelegate:_delegate]; + return [datastore createWriteStream]; } - (FSTWatchStream *)setUpWatchStream { FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:_databaseInfo workerDispatchQueue:_workerDispatchQueue credentials:_credentials]; - - _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue]; - return [datastore createWatchStreamWithDelegate:_delegate]; + return [datastore createWatchStream]; } /** @@ -190,7 +189,7 @@ FSTWatchStream *watchStream = [self setUpWatchStream]; [_delegate awaitNotificationFromBlock:^{ - [watchStream start]; + [watchStream startWithDelegate:_delegate]; }]; // Stop must not call watchStreamDidClose because the full implementation of the delegate could @@ -210,7 +209,7 @@ FSTWriteStream *writeStream = [self setUpWriteStream]; [_delegate awaitNotificationFromBlock:^{ - [writeStream start]; + [writeStream startWithDelegate:_delegate]; }]; // Don't start the handshake. @@ -231,7 +230,7 @@ FSTWriteStream *writeStream = [self setUpWriteStream]; [_delegate awaitNotificationFromBlock:^{ - [writeStream start]; + [writeStream startWithDelegate:_delegate]; }]; // Writing before the handshake should throw @@ -258,4 +257,55 @@ ]]; } +- (void)testStreamClosesWhenIdle { + FSTWriteStream *writeStream = [self setUpWriteStream]; + + [_delegate awaitNotificationFromBlock:^{ + [writeStream startWithDelegate:_delegate]; + }]; + + [_delegate awaitNotificationFromBlock:^{ + [writeStream writeHandshake]; + }]; + + [_delegate awaitNotificationFromBlock:^{ + [writeStream markIdle]; + }]; + + dispatch_sync(_testQueue, ^{ + XCTAssertFalse([writeStream isOpen]); + }); + + [self verifyDelegateObservedStates:@[ + @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamWasInterrupted" + ]]; +} + +- (void)testStreamCancelsIdleOnWrite { + FSTWriteStream *writeStream = [self setUpWriteStream]; + + [_delegate awaitNotificationFromBlock:^{ + [writeStream startWithDelegate:_delegate]; + }]; + + [_delegate awaitNotificationFromBlock:^{ + [writeStream writeHandshake]; + }]; + + // Mark the stream idle, but immediately cancel the idle timer by issuing another write. + [_delegate awaitNotificationFromBlock:^{ + [writeStream markIdle]; + [writeStream writeMutations:_mutations]; + }]; + + dispatch_sync(_testQueue, ^{ + XCTAssertTrue([writeStream isOpen]); + }); + + [self verifyDelegateObservedStates:@[ + @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", + @"writeStreamDidReceiveResponseWithVersion" + ]]; +} + @end diff --git a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m index 8b95286..6af2053 100644 --- a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m +++ b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m @@ -39,19 +39,17 @@ NS_ASSUME_NONNULL_BEGIN - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer - delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER; + serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER; - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - responseMessageClass:(Class)responseMessageClass - delegate:(id<FSTWatchStreamDelegate>)delegate NS_UNAVAILABLE; + responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; @property(nonatomic, assign) BOOL open; - @property(nonatomic, strong, readonly) NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *activeTargets; +@property(nonatomic, weak, readwrite, nullable) id<FSTWatchStreamDelegate> delegate; @end @@ -60,13 +58,11 @@ NS_ASSUME_NONNULL_BEGIN - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer - delegate:(id<FSTWatchStreamDelegate>)delegate { + serializer:(FSTSerializerBeta *)serializer { self = [super initWithDatabase:database workerDispatchQueue:workerDispatchQueue credentials:credentials - serializer:serializer - delegate:delegate]; + serializer:serializer]; if (self) { FSTAssert(database, @"Database must not be nil"); _activeTargets = [NSMutableDictionary dictionary]; @@ -76,10 +72,15 @@ NS_ASSUME_NONNULL_BEGIN #pragma mark - Overridden FSTWatchStream methods. -- (void)start { +- (void)startWithDelegate:(id<FSTWatchStreamDelegate>)delegate { FSTAssert(!self.open, @"Trying to start already started watch stream"); self.open = YES; - [self handleStreamOpen]; + self.delegate = delegate; + [self notifyStreamOpen]; +} + +- (void)stop { + self.delegate = nil; } - (BOOL)isOpen { @@ -90,10 +91,14 @@ NS_ASSUME_NONNULL_BEGIN return self.open; } -- (void)handleStreamOpen { +- (void)notifyStreamOpen { [self.delegate watchStreamDidOpen]; } +- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { + [self.delegate watchStreamWasInterruptedWithError:error]; +} + - (void)watchQuery:(FSTQueryData *)query { FSTLog(@"watchQuery: %d: %@", query.targetID, query.query); // Snapshot version is ignored on the wire @@ -110,7 +115,7 @@ NS_ASSUME_NONNULL_BEGIN - (void)failStreamWithError:(NSError *)error { self.open = NO; - [self.delegate watchStreamDidClose:error]; + [self notifyStreamInterruptedWithError:error]; } #pragma mark - Helper methods. @@ -142,17 +147,16 @@ NS_ASSUME_NONNULL_BEGIN - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer - delegate:(id<FSTWriteStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER; + serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER; - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - responseMessageClass:(Class)responseMessageClass - delegate:(id<FSTWriteStreamDelegate>)delegate NS_UNAVAILABLE; + responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; @property(nonatomic, assign) BOOL open; @property(nonatomic, strong, readonly) NSMutableArray<NSArray<FSTMutation *> *> *sentMutations; +@property(nonatomic, weak, readwrite, nullable) id<FSTWriteStreamDelegate> delegate; @end @@ -161,13 +165,11 @@ NS_ASSUME_NONNULL_BEGIN - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer - delegate:(id<FSTWriteStreamDelegate>)delegate { + serializer:(FSTSerializerBeta *)serializer { self = [super initWithDatabase:database workerDispatchQueue:workerDispatchQueue credentials:credentials - serializer:serializer - delegate:delegate]; + serializer:serializer]; if (self) { _sentMutations = [NSMutableArray array]; } @@ -176,11 +178,16 @@ NS_ASSUME_NONNULL_BEGIN #pragma mark - Overridden FSTWriteStream methods. -- (void)start { +- (void)startWithDelegate:(id<FSTWriteStreamDelegate>)delegate { FSTAssert(!self.open, @"Trying to start already started write stream"); self.open = YES; [self.sentMutations removeAllObjects]; - [self handleStreamOpen]; + self.delegate = delegate; + [self notifyStreamOpen]; +} + +- (void)stop { + self.delegate = nil; } - (BOOL)isOpen { @@ -200,10 +207,14 @@ NS_ASSUME_NONNULL_BEGIN [self.sentMutations addObject:mutations]; } -- (void)handleStreamOpen { +- (void)notifyStreamOpen { [self.delegate writeStreamDidOpen]; } +- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { + [self.delegate writeStreamWasInterruptedWithError:error]; +} + #pragma mark - Helper methods. /** Injects a write ack as though it had come from the backend in response to a write. */ @@ -215,7 +226,7 @@ NS_ASSUME_NONNULL_BEGIN /** Injects a failed write response as though it had come from the backend. */ - (void)failStreamWithError:(NSError *)error { self.open = NO; - [self.delegate writeStreamDidClose:error]; + [self notifyStreamInterruptedWithError:error]; } /** @@ -269,27 +280,25 @@ NS_ASSUME_NONNULL_BEGIN #pragma mark - Overridden FSTDatastore methods. -- (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate { +- (FSTWatchStream *)createWatchStream { FSTAssert(self.databaseInfo, @"DatabaseInfo must not be nil"); self.watchStream = [[FSTMockWatchStream alloc] initWithDatabase:self.databaseInfo workerDispatchQueue:self.workerDispatchQueue credentials:self.credentials serializer:[[FSTSerializerBeta alloc] - initWithDatabaseID:self.databaseInfo.databaseID] - delegate:delegate]; + initWithDatabaseID:self.databaseInfo.databaseID]]; return self.watchStream; } -- (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate { +- (FSTWriteStream *)createWriteStream { FSTAssert(self.databaseInfo, @"DatabaseInfo must not be nil"); self.writeStream = [[FSTMockWriteStream alloc] initWithDatabase:self.databaseInfo workerDispatchQueue:self.workerDispatchQueue credentials:self.credentials serializer:[[FSTSerializerBeta alloc] - initWithDatabaseID:self.databaseInfo.databaseID] - delegate:delegate]; + initWithDatabaseID:self.databaseInfo.databaseID]]; return self.writeStream; } diff --git a/Firestore/Example/Tests/Util/FSTIntegrationTestCase.h b/Firestore/Example/Tests/Util/FSTIntegrationTestCase.h index 3dd5464..a2c08ec 100644 --- a/Firestore/Example/Tests/Util/FSTIntegrationTestCase.h +++ b/Firestore/Example/Tests/Util/FSTIntegrationTestCase.h @@ -56,6 +56,8 @@ NS_ASSUME_NONNULL_BEGIN - (FIRCollectionReference *)collectionRefWithDocuments: (NSDictionary<NSString *, NSDictionary<NSString *, id> *> *)documents; +- (void)waitForIdleFirestore:(FIRFirestore *)firestore; + - (void)writeAllDocuments:(NSDictionary<NSString *, NSDictionary<NSString *, id> *> *)documents toCollection:(FIRCollectionReference *)collection; @@ -67,6 +69,9 @@ NS_ASSUME_NONNULL_BEGIN - (FIRQuerySnapshot *)readDocumentSetForRef:(FIRQuery *)query; +- (FIRDocumentSnapshot *)readSnapshotForRef:(FIRDocumentReference *)query + requireOnline:(BOOL)online; + - (void)writeDocumentRef:(FIRDocumentReference *)ref data:(NSDictionary<NSString *, id> *)data; - (void)updateDocumentRef:(FIRDocumentReference *)ref data:(NSDictionary<NSString *, id> *)data; diff --git a/Firestore/Example/Tests/Util/FSTIntegrationTestCase.m b/Firestore/Example/Tests/Util/FSTIntegrationTestCase.m index 87a78c3..2e1e0a9 100644 --- a/Firestore/Example/Tests/Util/FSTIntegrationTestCase.m +++ b/Firestore/Example/Tests/Util/FSTIntegrationTestCase.m @@ -30,9 +30,14 @@ #import "Util/FSTUtil.h" #import "FSTEventAccumulator.h" +#import "FSTTestDispatchQueue.h" NS_ASSUME_NONNULL_BEGIN +@interface FIRFirestore (Testing) +@property(nonatomic, strong) FSTDispatchQueue *workerDispatchQueue; +@end + @implementation FSTIntegrationTestCase { NSMutableArray<FIRFirestore *> *_firestores; } @@ -121,7 +126,7 @@ NS_ASSUME_NONNULL_BEGIN - (FIRFirestore *)firestoreWithProjectID:(NSString *)projectID { NSString *persistenceKey = [NSString stringWithFormat:@"db%lu", (unsigned long)_firestores.count]; - FSTDispatchQueue *workerDispatchQueue = [FSTDispatchQueue + FSTTestDispatchQueue *workerDispatchQueue = [FSTTestDispatchQueue queueWith:dispatch_queue_create("com.google.firebase.firestore", DISPATCH_QUEUE_SERIAL)]; FSTEmptyCredentialsProvider *credentialsProvider = [[FSTEmptyCredentialsProvider alloc] init]; @@ -142,6 +147,14 @@ NS_ASSUME_NONNULL_BEGIN return firestore; } +- (void)waitForIdleFirestore:(FIRFirestore *)firestore { + XCTestExpectation *expectation = [self expectationWithDescription:@"idle"]; + // Note that we wait on any task that is scheduled with a delay of 60s. Currently, the idle + // timeout is the only task that uses this delay. + [((FSTTestDispatchQueue *)firestore.workerDispatchQueue) fulfillOnExecution:expectation]; + [self awaitExpectations]; +} + - (void)shutdownFirestore:(FIRFirestore *)firestore { XCTestExpectation *shutdownCompletion = [self expectationWithDescription:@"shutdown"]; [firestore shutdownWithCompletion:^(NSError *_Nullable error) { @@ -222,6 +235,27 @@ NS_ASSUME_NONNULL_BEGIN return result; } +- (FIRDocumentSnapshot *)readSnapshotForRef:(FIRDocumentReference *)ref + requireOnline:(BOOL)requireOnline { + __block FIRDocumentSnapshot *result; + + XCTestExpectation *expectation = [self expectationWithDescription:@"listener"]; + id<FIRListenerRegistration> listener = [ref + addSnapshotListenerWithOptions:[[FIRDocumentListenOptions options] includeMetadataChanges:YES] + listener:^(FIRDocumentSnapshot *snapshot, NSError *error) { + XCTAssertNil(error); + if (!requireOnline || !snapshot.metadata.fromCache) { + result = snapshot; + [expectation fulfill]; + } + }]; + + [self awaitExpectations]; + [listener remove]; + + return result; +} + - (void)writeDocumentRef:(FIRDocumentReference *)ref data:(NSDictionary<NSString *, id> *)data { XCTestExpectation *expectation = [self expectationWithDescription:@"setData"]; [ref setData:data diff --git a/Firestore/Example/Tests/Util/FSTTestDispatchQueue.h b/Firestore/Example/Tests/Util/FSTTestDispatchQueue.h new file mode 100644 index 0000000..4f4e13e --- /dev/null +++ b/Firestore/Example/Tests/Util/FSTTestDispatchQueue.h @@ -0,0 +1,39 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "Util/FSTDispatchQueue.h" + +@class XCTestExpectation; + +NS_ASSUME_NONNULL_BEGIN + +/** + * Dispatch queue used in the integration tests that caps delayed executions at 1.0 seconds. + */ +@interface FSTTestDispatchQueue : FSTDispatchQueue + +/** Creates and returns an FSTTestDispatchQueue wrapping the specified dispatch_queue_t. */ ++ (instancetype)queueWith:(dispatch_queue_t)dispatchQueue; + +/** + * Registers a test expectation that is fulfilled when the next delayed callback finished + * executing. + */ +- (void)fulfillOnExecution:(XCTestExpectation *)expectation; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Example/Tests/Util/FSTTestDispatchQueue.m b/Firestore/Example/Tests/Util/FSTTestDispatchQueue.m new file mode 100644 index 0000000..27b62bc --- /dev/null +++ b/Firestore/Example/Tests/Util/FSTTestDispatchQueue.m @@ -0,0 +1,61 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "FSTTestDispatchQueue.h" + +#import <XCTest/XCTestExpectation.h> + +#import "Util/FSTAssert.h" + +@interface FSTTestDispatchQueue () + +@property(nonatomic, weak) XCTestExpectation* expectation; + +@end + +@implementation FSTTestDispatchQueue + +/** The delay used by the idle timeout */ +static const NSTimeInterval kIdleDispatchDelay = 60.0; + +/** The maximum delay we use in a test run. */ +static const NSTimeInterval kTestDispatchDelay = 1.0; + ++ (instancetype)queueWith:(dispatch_queue_t)dispatchQueue { + return [[FSTTestDispatchQueue alloc] initWithQueue:dispatchQueue]; +} + +- (instancetype)initWithQueue:(dispatch_queue_t)dispatchQueue { + return (self = [super initWithQueue:dispatchQueue]); +} + +- (void)dispatchAfterDelay:(NSTimeInterval)delay block:(void (^)(void))block { + [super dispatchAfterDelay:MIN(delay, kTestDispatchDelay) + block:^() { + block(); + if (delay == kIdleDispatchDelay) { + [_expectation fulfill]; + _expectation = nil; + } + }]; +} + +- (void)fulfillOnExecution:(XCTestExpectation*)expectation { + FSTAssert(_expectation == nil, @"Previous expectation still active"); + _expectation = expectation; +} + +@end diff --git a/Firestore/Source/Remote/FSTDatastore.h b/Firestore/Source/Remote/FSTDatastore.h index ec2affa..b8e53f5 100644 --- a/Firestore/Source/Remote/FSTDatastore.h +++ b/Firestore/Source/Remote/FSTDatastore.h @@ -82,10 +82,10 @@ NS_ASSUME_NONNULL_BEGIN completion:(FSTVoidErrorBlock)completion; /** Creates a new watch stream. */ -- (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate; +- (FSTWatchStream *)createWatchStream; /** Creates a new write stream. */ -- (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate; +- (FSTWriteStream *)createWriteStream; /** The name of the database and the backend. */ @property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo; @@ -122,10 +122,10 @@ NS_ASSUME_NONNULL_BEGIN * * An implementation of FSTStream needs to implement the following methods: * - `createRPCWithRequestsWriter`, should create the specific RPC (a GRPCCall object). - * - `handleStreamOpen`, should call through to the stream-specific streamDidOpen method. * - `handleStreamMessage`, receives protocol buffer responses from GRPC and must deserialize and * delegate to some stream specific response method. - * - `handleStreamClose`, calls through to the stream-specific streamDidClose method. + * - `notifyStreamOpen`, should call through to the stream-specific streamDidOpen method. + * - `notifyStreamInterrupted`, calls through to the stream-specific streamWasInterrupted method. * * Additionally, beyond these required methods, subclasses will want to implement methods that * take request models, serialize them, and write them to using writeRequest:. @@ -139,7 +139,7 @@ NS_ASSUME_NONNULL_BEGIN * * See https://github.com/grpc/grpc/issues/10957 for the kinds of things we're trying to avoid. */ -@interface FSTStream : NSObject +@interface FSTStream <__covariant FSTStreamDelegate> : NSObject - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue @@ -175,7 +175,7 @@ NS_ASSUME_NONNULL_BEGIN * * When start returns, -isStarted will return YES. */ -- (void)start; +- (void)startWithDelegate:(id)delegate; /** * Stops the RPC. This call is idempotent and allowed regardless of the current isStarted state. @@ -194,6 +194,12 @@ NS_ASSUME_NONNULL_BEGIN - (void)stop; /** + * Initializes the idle timer. If no write takes place within one minute, the GRPC stream will be + * closed. + */ +- (void)markIdle; + +/** * After an error the stream will usually back off on the next attempt to start it. If the error * warrants an immediate restart of the stream, the sender can use this to indicate that the * receiver should not back off. @@ -221,14 +227,14 @@ NS_ASSUME_NONNULL_BEGIN snapshotVersion:(FSTSnapshotVersion *)snapshotVersion; /** - * Called by the FSTWatchStream when the underlying streaming RPC is closed for whatever reason, - * usually because of an error, but possibly due to an idle timeout. The error passed to this - * method may be nil, in which case the stream was closed without attributable fault. + * Called by the FSTWatchStream when the underlying streaming RPC is interrupted for whatever + * reason, usually because of an error, but possibly due to an idle timeout. The error passed to + * this method may be nil, in which case the stream was closed without attributable fault. * * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" * on FSTStream for details. */ -- (void)watchStreamDidClose:(NSError *_Nullable)error; +- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error; @end @@ -247,8 +253,7 @@ NS_ASSUME_NONNULL_BEGIN - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer - delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER; + serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER; - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue @@ -267,8 +272,6 @@ NS_ASSUME_NONNULL_BEGIN /** Unregisters interest in the results of the query associated with the given target ID. */ - (void)unwatchTargetID:(FSTTargetID)targetID; -@property(nonatomic, weak, readonly) id<FSTWatchStreamDelegate> delegate; - @end #pragma mark - FSTWriteStream @@ -292,14 +295,14 @@ NS_ASSUME_NONNULL_BEGIN mutationResults:(NSArray<FSTMutationResult *> *)results; /** - * Called when the FSTWriteStream's underlying RPC is closed for whatever reason, usually because - * of an error, but possibly due to an idle timeout. The error passed to this method may be nil, in - * which case the stream was closed without attributable fault. + * Called when the FSTWriteStream's underlying RPC is interrupted for whatever reason, usually + * because of an error, but possibly due to an idle timeout. The error passed to this method may be + * nil, in which case the stream was closed without attributable fault. * * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" * on FSTStream for details. */ -- (void)writeStreamDidClose:(NSError *_Nullable)error; +- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error; @end @@ -324,8 +327,7 @@ NS_ASSUME_NONNULL_BEGIN - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer - delegate:(id<FSTWriteStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER; + serializer:(FSTSerializerBeta *)serializer; - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue @@ -344,8 +346,6 @@ NS_ASSUME_NONNULL_BEGIN /** Sends a group of mutations to the Firestore backend to apply. */ - (void)writeMutations:(NSArray<FSTMutation *> *)mutations; -@property(nonatomic, weak, readonly) id<FSTWriteStreamDelegate> delegate; - /** * Tracks whether or not a handshake has been successfully exchanged and the stream is ready to * accept mutations. diff --git a/Firestore/Source/Remote/FSTDatastore.m b/Firestore/Source/Remote/FSTDatastore.m index 95ee12c..1cabc36 100644 --- a/Firestore/Source/Remote/FSTDatastore.m +++ b/Firestore/Source/Remote/FSTDatastore.m @@ -115,14 +115,19 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer - delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER; + serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER; - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - responseMessageClass:(Class)responseMessageClass - delegate:(id<FSTWatchStreamDelegate>)delegate NS_UNAVAILABLE; + responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; + +@end + +@interface FSTStream () + +@property(nonatomic, getter=isIdle) BOOL idle; +@property(nonatomic, weak, readwrite, nullable) id delegate; @end @@ -407,20 +412,18 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { }]; } -- (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate { +- (FSTWatchStream *)createWatchStream { return [[FSTWatchStream alloc] initWithDatabase:_databaseInfo workerDispatchQueue:_workerDispatchQueue credentials:_credentials - serializer:_serializer - delegate:delegate]; + serializer:_serializer]; } -- (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate { +- (FSTWriteStream *)createWriteStream { return [[FSTWriteStream alloc] initWithDatabase:_databaseInfo workerDispatchQueue:_workerDispatchQueue credentials:_credentials - serializer:_serializer - delegate:delegate]; + serializer:_serializer]; } /** Adds headers to the RPC including any OAuth access token if provided .*/ @@ -436,10 +439,60 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { @end +#pragma mark - FSTCallbackFilter + +/** Filter class that allows disabling of GRPC callbacks. */ +@interface FSTCallbackFilter : NSObject <GRXWriteable> + +- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER; +- (instancetype)init NS_UNAVAILABLE; + +@property(atomic, readwrite) BOOL callbacksEnabled; +@property(nonatomic, strong, readonly) FSTStream *stream; + +@end + +@implementation FSTCallbackFilter + +- (instancetype)initWithStream:(FSTStream *)stream { + if (self = [super init]) { + _callbacksEnabled = YES; + _stream = stream; + } + return self; +} + +- (void)suppressCallbacks { + _callbacksEnabled = NO; +} + +- (void)writeValue:(id)value { + if (_callbacksEnabled) { + [self.stream writeValue:value]; + } +} + +- (void)writesFinishedWithError:(NSError *)errorOrNil { + if (_callbacksEnabled) { + [self.stream writesFinishedWithError:errorOrNil]; + } +} + +@end + #pragma mark - FSTStream +@interface FSTStream () + +@property(nonatomic, strong, readwrite) FSTCallbackFilter *callbackFilter; + +@end + @implementation FSTStream +/** The time a stream stays open after it is marked idle. */ +static const NSTimeInterval kIdleTimeout = 60.0; + - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials @@ -475,11 +528,11 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { @throw FSTAbstractMethodException(); // NOLINT } -- (void)start { +- (void)startWithDelegate:(id)delegate { [self.workerDispatchQueue verifyIsCurrentQueue]; if (self.state == FSTStreamStateError) { - [self performBackoff]; + [self performBackoffWithDelegate:delegate]; return; } @@ -487,6 +540,8 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { FSTAssert(self.state == FSTStreamStateInitial, @"Already started"); self.state = FSTStreamStateAuth; + FSTAssert(_delegate == nil, @"Delegate must be nil"); + _delegate = delegate; [self.credentials getTokenForcingRefresh:NO @@ -522,14 +577,16 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { [FSTDatastore prepareHeadersForRPC:_rpc databaseID:self.databaseInfo.databaseID token:token.token]; - [_rpc startWithWriteable:self]; + FSTAssert(_callbackFilter == nil, @"GRX Filter must be nil"); + _callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self]; + [_rpc startWithWriteable:_callbackFilter]; self.state = FSTStreamStateOpen; - [self handleStreamOpen]; + [self notifyStreamOpen]; } /** Backs off after an error. */ -- (void)performBackoff { +- (void)performBackoffWithDelegate:(id)delegate { FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self); [self.workerDispatchQueue verifyIsCurrentQueue]; @@ -539,12 +596,12 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { FSTWeakify(self); [self.backoff backoffAndRunBlock:^{ FSTStrongify(self); - [self resumeStartFromBackoff]; + [self resumeStartFromBackoffWithDelegate:delegate]; }]; } /** Resumes stream start after backing off. */ -- (void)resumeStartFromBackoff { +- (void)resumeStartFromBackoffWithDelegate:(id)delegate { if (self.state == FSTStreamStateStopped) { // Streams can be stopped while waiting for backoff to complete. return; @@ -557,21 +614,80 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { // Momentarily set state to FSTStreamStateInitial as `start` expects it. self.state = FSTStreamStateInitial; - [self start]; + [self startWithDelegate:delegate]; FSTAssert([self isStarted], @"Stream should have started."); } -- (void)stop { - FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self); +/** + * Closes the stream and cleans up as necessary: + * + * * closes the underlying GRPC stream; + * * calls the onClose handler with the given 'error'; + * * sets internal stream state to 'finalState'; + * * adjusts the backoff timer based on the error + * + * A new stream can be opened by calling `start` unless `finalState` is set to + * `FSTStreamStateStopped`. + * + * @param finalState the intended state of the stream after closing. + * @param error the NSError the connection was closed with. + */ +- (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error { + FSTAssert(finalState == FSTStreamStateError || error == nil, + @"Can't provide an error when not in an error state."); + [self.workerDispatchQueue verifyIsCurrentQueue]; + [self cancelIdleCheck]; - // Prevent any possible future restart of this stream. - self.state = FSTStreamStateStopped; + if (finalState != FSTStreamStateError) { + // If this is an intentional close ensure we don't delay our next connection attempt. + [self.backoff reset]; + } else if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) { + FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class], + (__bridge void *)self); + [self.backoff resetToMax]; + } - // Close the stream client side. - FSTBufferedWriter *requestsWriter = self.requestsWriter; - @synchronized(requestsWriter) { - [requestsWriter finishWithError:nil]; + // This state must be assigned before calling `notifyStreamInterrupted` to allow the callback to + // inhibit backoff or otherwise manipulate the state in its non-started state. + self.state = finalState; + + if (self.requestsWriter) { + // Clean up the underlying RPC. If this close: is in response to an error, don't attempt to + // call half-close to avoid secondary failures. + if (finalState != FSTStreamStateError) { + FSTLog(@"%@ %p Closing stream client-side", [self class], (__bridge void *)self); + @synchronized(self.requestsWriter) { + [self.requestsWriter finishWithError:nil]; + } + } + _requestsWriter = nil; + } + + [self.callbackFilter suppressCallbacks]; + _callbackFilter = nil; + + // Clean up remaining state. + _messageReceived = NO; + _rpc = nil; + + // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it + // could trigger undesirable recovery logic, etc.). + if (finalState != FSTStreamStateStopped) { + [self notifyStreamInterruptedWithError:error]; + } + + // Clear the delegates to avoid any possible bleed through of events from GRPC. + FSTAssert(_delegate, + @"closeWithFinalState should only be called for a started stream that has an active " + @"delegate."); + _delegate = nil; +} + +- (void)stop { + FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self); + if ([self isStarted]) { + [self closeWithFinalState:FSTStreamStateStopped error:nil]; } } @@ -585,6 +701,32 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { [self.backoff reset]; } +/** Called by the idle timer when the stream should close due to inactivity. */ +- (void)handleIdleCloseTimer { + [self.workerDispatchQueue verifyIsCurrentQueue]; + if (self.state == FSTStreamStateOpen && [self isIdle]) { + // When timing out an idle stream there's no reason to force the stream into backoff when + // it restarts so set the stream state to Initial instead of Error. + [self closeWithFinalState:FSTStreamStateInitial error:nil]; + } +} + +- (void)markIdle { + [self.workerDispatchQueue verifyIsCurrentQueue]; + if (self.state == FSTStreamStateOpen) { + self.idle = YES; + [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout + block:^() { + [self handleIdleCloseTimer]; + }]; + } +} + +- (void)cancelIdleCheck { + [self.workerDispatchQueue verifyIsCurrentQueue]; + self.idle = NO; +} + /** * Parses a protocol buffer response from the server. If the message fails to parse, generates * an error and closes the stream. @@ -620,6 +762,8 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { - (void)writeRequest:(GPBMessage *)request { NSData *data = [request data]; + [self cancelIdleCheck]; + FSTBufferedWriter *requestsWriter = self.requestsWriter; @synchronized(requestsWriter) { [requestsWriter writeValue:data]; @@ -629,13 +773,22 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { #pragma mark Template methods for subclasses /** - * Called by the stream after the stream has been successfully connected, authenticated, and is now - * ready to accept messages. + * Called by the stream after the stream has opened. * - * Subclasses should relay to their stream-specific delegate. Calling [super handleStreamOpen] is + * Subclasses should relay to their stream-specific delegate. Calling [super notifyStreamOpen] is * not required. */ -- (void)handleStreamOpen { +- (void)notifyStreamOpen { +} + +/** + * Called by the stream after the stream has been unexpectedly interrupted, either due to an error + * or due to idleness. + * + * Subclasses should relay to their stream-specific delegate. Calling [super + * notifyStreamInterrupted] is not required. + */ +- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { } /** @@ -649,30 +802,16 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { /** * Called by the stream when the underlying RPC has been closed for whatever reason. - * - * Subclasses should first call [super handleStreamClose:] and then call to their - * stream-specific delegate. */ -- (void)handleStreamClose:(NSError *_Nullable)error { +- (void)handleStreamClose:(nullable NSError *)error { FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error); FSTAssert([self isStarted], @"Can't handle server close in non-started state."); - [self.workerDispatchQueue verifyIsCurrentQueue]; - - self.messageReceived = NO; - self.rpc = nil; - self.requestsWriter = nil; // In theory the stream could close cleanly, however, in our current model we never expect this // to happen because if we stop a stream ourselves, this callback will never be called. To // prevent cases where we retry without a backoff accidentally, we set the stream to error // in all cases. - self.state = FSTStreamStateError; - - if (error.code == FIRFirestoreErrorCodeResourceExhausted) { - FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class], - (__bridge void *)self); - [self.backoff resetToMax]; - } + [self closeWithFinalState:FSTStreamStateError error:error]; } #pragma mark GRXWriteable implementation @@ -717,7 +856,7 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to * directly inform stream-specific logic, or call stop to tear down the stream. */ -- (void)writesFinishedWithError:(NSError *_Nullable)error __used { +- (void)writesFinishedWithError:(nullable NSError *)error __used { error = [FSTDatastore firestoreErrorForError:error]; FSTWeakify(self); [self.workerDispatchQueue dispatchAsync:^{ @@ -744,15 +883,13 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer - delegate:(id<FSTWatchStreamDelegate>)delegate { + serializer:(FSTSerializerBeta *)serializer { self = [super initWithDatabase:database workerDispatchQueue:workerDispatchQueue credentials:credentials responseMessageClass:[GCFSListenResponse class]]; if (self) { _serializer = serializer; - _delegate = delegate; } return self; } @@ -763,20 +900,12 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { requestsWriter:requestsWriter]; } -- (void)stop { - // Clear the delegate to avoid any possible bleed through of events from GRPC. - _delegate = nil; - - [super stop]; -} - -- (void)handleStreamOpen { +- (void)notifyStreamOpen { [self.delegate watchStreamDidOpen]; } -- (void)handleStreamClose:(NSError *_Nullable)error { - [super handleStreamClose:error]; - [self.delegate watchStreamDidClose:error]; +- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { + [self.delegate watchStreamWasInterruptedWithError:error]; } - (void)watchQuery:(FSTQueryData *)query { @@ -835,15 +964,13 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue credentials:(id<FSTCredentialsProvider>)credentials - serializer:(FSTSerializerBeta *)serializer - delegate:(id<FSTWriteStreamDelegate>)delegate { + serializer:(FSTSerializerBeta *)serializer { self = [super initWithDatabase:database workerDispatchQueue:workerDispatchQueue credentials:credentials responseMessageClass:[GCFSWriteResponse class]]; if (self) { _serializer = serializer; - _delegate = delegate; } return self; } @@ -854,26 +981,17 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { requestsWriter:requestsWriter]; } -- (void)start { +- (void)startWithDelegate:(id)delegate { self.handshakeComplete = NO; - [super start]; + [super startWithDelegate:delegate]; } -- (void)stop { - // Clear the delegate to avoid any possible bleed through of events from GRPC. - _delegate = nil; - - [super stop]; -} - -- (void)handleStreamOpen { +- (void)notifyStreamOpen { [self.delegate writeStreamDidOpen]; } -- (void)handleStreamClose:(NSError *_Nullable)error { - [super handleStreamClose:error]; - - [self.delegate writeStreamDidClose:error]; +- (void)notifyStreamInterruptedWithError:(nullable NSError *)error { + [self.delegate writeStreamWasInterruptedWithError:error]; } - (void)writeHandshake { @@ -920,7 +1038,7 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { // Always capture the last stream token. self.lastStreamToken = response.streamToken; - if (!self.handshakeComplete) { + if (!self.isHandshakeComplete) { // The first response is the handshake response self.handshakeComplete = YES; diff --git a/Firestore/Source/Remote/FSTExponentialBackoff.m b/Firestore/Source/Remote/FSTExponentialBackoff.m index 179b3f9..dc589b5 100644 --- a/Firestore/Source/Remote/FSTExponentialBackoff.m +++ b/Firestore/Source/Remote/FSTExponentialBackoff.m @@ -75,9 +75,8 @@ FSTLog(@"Backing off for %.2f seconds (base delay: %.2f seconds)", delayWithJitter, _currentBase); } - dispatch_time_t delay = - dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delayWithJitter * NSEC_PER_SEC)); - dispatch_after(delay, self.dispatchQueue.queue, block); + + [self.dispatchQueue dispatchAfterDelay:delayWithJitter block:block]; // Apply backoff factor to determine next delay and ensure it is within bounds. _currentBase *= _backoffFactor; diff --git a/Firestore/Source/Remote/FSTRemoteStore.m b/Firestore/Source/Remote/FSTRemoteStore.m index b4a6449..0bb37cc 100644 --- a/Firestore/Source/Remote/FSTRemoteStore.m +++ b/Firestore/Source/Remote/FSTRemoteStore.m @@ -204,8 +204,8 @@ static const int kOnlineAttemptsBeforeFailure = 2; FSTAssert(self.writeStream == nil, @"enableNetwork: called with non-null writeStream."); // Create new streams (but note they're not started yet). - self.watchStream = [self.datastore createWatchStreamWithDelegate:self]; - self.writeStream = [self.datastore createWriteStreamWithDelegate:self]; + self.watchStream = [self.datastore createWatchStream]; + self.writeStream = [self.datastore createWriteStream]; // Load any saved stream token from persistent storage self.writeStream.lastStreamToken = [self.localStore lastStreamToken]; @@ -265,7 +265,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; - (void)startWatchStream { FSTAssert([self shouldStartWatchStream], @"startWatchStream: called when shouldStartWatchStream: is false."); - [self.watchStream start]; + [self.watchStream startWithDelegate:self]; } - (void)listenToTargetWithQueryData:(FSTQueryData *)queryData { @@ -276,7 +276,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; self.listenTargets[targetKey] = queryData; if ([self shouldStartWatchStream]) { - [self.watchStream start]; + [self startWatchStream]; } else if ([self isNetworkEnabled] && [self.watchStream isOpen]) { [self sendWatchRequestWithQueryData:queryData]; } @@ -295,6 +295,9 @@ static const int kOnlineAttemptsBeforeFailure = 2; [self.listenTargets removeObjectForKey:targetKey]; if ([self isNetworkEnabled] && [self.watchStream isOpen]) { [self sendUnwatchRequestForTargetID:targetKey]; + if ([self.listenTargets count] == 0) { + [self.watchStream markIdle]; + } } } @@ -364,7 +367,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; } } -- (void)watchStreamDidClose:(NSError *_Nullable)error { +- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error { FSTAssert([self isNetworkEnabled], @"watchStreamDidClose should only be called when the network is enabled"); @@ -374,7 +377,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; // watch targets. if ([self shouldStartWatchStream]) { [self updateOnlineStateAfterFailure]; - [self.watchStream start]; + [self startWatchStream]; } else { // We don't need to restart the watch stream because there are no active targets. The online // state is set to unknown because there is no active attempt at establishing a connection. @@ -514,7 +517,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; FSTAssert([self shouldStartWriteStream], @"startWriteStream: called when shouldStartWriteStream: is false."); - [self.writeStream start]; + [self.writeStream startWithDelegate:self]; } - (void)cleanUpWriteStreamState { @@ -523,12 +526,18 @@ static const int kOnlineAttemptsBeforeFailure = 2; } - (void)fillWritePipeline { - while ([self canWriteMutations]) { - FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen]; - if (!batch) { - break; + if ([self isNetworkEnabled]) { + while ([self canWriteMutations]) { + FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen]; + if (!batch) { + break; + } + [self commitBatch:batch]; + } + + if ([self.pendingWrites count] == 0) { + [self.writeStream markIdle]; } - [self commitBatch:batch]; } } @@ -613,18 +622,13 @@ static const int kOnlineAttemptsBeforeFailure = 2; * Handles the closing of the StreamingWrite RPC, either because of an error or because the RPC * has been terminated by the client or the server. */ -- (void)writeStreamDidClose:(NSError *_Nullable)error { +- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error { FSTAssert([self isNetworkEnabled], @"writeStreamDidClose: should only be called when the network is enabled"); - NSMutableArray *pendingWrites = self.pendingWrites; - // Ignore close if there are no pending writes. - if (pendingWrites.count == 0) { - return; - } - - FSTAssert(error, @"There are pending writes, but the write stream closed without an error."); - if ([FSTDatastore isPermanentWriteError:error]) { + // If the write stream closed due to an error, invoke the error callbacks if there are pending + // writes. + if (error != nil && self.pendingWrites.count > 0) { if (self.writeStream.handshakeComplete) { // This error affects the actual writes. [self handleWriteError:error]; @@ -637,7 +641,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; // The write stream might have been started by refilling the write pipeline for failed writes if ([self shouldStartWriteStream]) { - [self.writeStream start]; + [self startWriteStream]; } } diff --git a/Firestore/Source/Util/FSTDispatchQueue.h b/Firestore/Source/Util/FSTDispatchQueue.h index 256b9c0..fe87887 100644 --- a/Firestore/Source/Util/FSTDispatchQueue.h +++ b/Firestore/Source/Util/FSTDispatchQueue.h @@ -23,6 +23,8 @@ NS_ASSUME_NONNULL_BEGIN /** Creates and returns an FSTDispatchQueue wrapping the specified dispatch_queue_t. */ + (instancetype)queueWith:(dispatch_queue_t)dispatchQueue; +- (instancetype)initWithQueue:(dispatch_queue_t)queue NS_DESIGNATED_INITIALIZER; + - (instancetype)init __attribute__((unavailable("Use static constructor method."))); /** @@ -50,6 +52,17 @@ NS_ASSUME_NONNULL_BEGIN */ - (void)dispatchAsyncAllowingSameQueue:(void (^)(void))block; +/** + * Schedules a callback after the specified delay. + * + * Unlike dispatchAsync: this method does not require you to dispatch to a different queue than + * the current one (thus it is equivalent to a raw dispatch_after()). + * + * @param block The block to run. + * @param delay The delay (in seconds) after which to run the block. + */ +- (void)dispatchAfterDelay:(NSTimeInterval)delay block:(void (^)(void))block; + /** The underlying wrapped dispatch_queue_t */ @property(nonatomic, strong, readonly) dispatch_queue_t queue; diff --git a/Firestore/Source/Util/FSTDispatchQueue.m b/Firestore/Source/Util/FSTDispatchQueue.m index 9613102..8c953fe 100644 --- a/Firestore/Source/Util/FSTDispatchQueue.m +++ b/Firestore/Source/Util/FSTDispatchQueue.m @@ -56,6 +56,11 @@ NS_ASSUME_NONNULL_BEGIN dispatch_async(self.queue, block); } +- (void)dispatchAfterDelay:(NSTimeInterval)delay block:(void (^)(void))block { + dispatch_time_t delayNs = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delay * NSEC_PER_SEC)); + dispatch_after(delayNs, self.queue, block); +} + #pragma mark - Private Methods - (NSString *)currentQueueLabel { |