aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore
diff options
context:
space:
mode:
authorGravatar Sebastian Schmidt <mrschmidt@google.com>2017-10-30 18:17:16 -0700
committerGravatar GitHub <noreply@github.com>2017-10-30 18:17:16 -0700
commit02ff6bbee95150eacff9563af4dd7a6e1aeaebdd (patch)
tree8a095ae29bdb6daf273f57913af021c2eae981ab /Firestore
parent1db9fd83df8d29abe5e7369ad1cbf3eb8545a78a (diff)
Closing the write and watch stream after 60s of idleness (#388)
Diffstat (limited to 'Firestore')
-rw-r--r--Firestore/Example/Firestore.xcodeproj/project.pbxproj8
-rw-r--r--Firestore/Example/Tests/Integration/API/FIRDatabaseTests.m18
-rw-r--r--Firestore/Example/Tests/Integration/FSTStreamTests.m88
-rw-r--r--Firestore/Example/Tests/SpecTests/FSTMockDatastore.m71
-rw-r--r--Firestore/Example/Tests/Util/FSTIntegrationTestCase.h5
-rw-r--r--Firestore/Example/Tests/Util/FSTIntegrationTestCase.m36
-rw-r--r--Firestore/Example/Tests/Util/FSTTestDispatchQueue.h39
-rw-r--r--Firestore/Example/Tests/Util/FSTTestDispatchQueue.m61
-rw-r--r--Firestore/Source/Remote/FSTDatastore.h44
-rw-r--r--Firestore/Source/Remote/FSTDatastore.m276
-rw-r--r--Firestore/Source/Remote/FSTExponentialBackoff.m5
-rw-r--r--Firestore/Source/Remote/FSTRemoteStore.m48
-rw-r--r--Firestore/Source/Util/FSTDispatchQueue.h13
-rw-r--r--Firestore/Source/Util/FSTDispatchQueue.m5
14 files changed, 540 insertions, 177 deletions
diff --git a/Firestore/Example/Firestore.xcodeproj/project.pbxproj b/Firestore/Example/Firestore.xcodeproj/project.pbxproj
index 2de1066..0197deb 100644
--- a/Firestore/Example/Firestore.xcodeproj/project.pbxproj
+++ b/Firestore/Example/Firestore.xcodeproj/project.pbxproj
@@ -59,6 +59,8 @@
AFE6114F0D4DAECBA7B7C089 /* Pods_Firestore_IntegrationTests.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = B2FA635DF5D116A67A7441CD /* Pods_Firestore_IntegrationTests.framework */; };
C4E749275AD0FBDF9F4716A8 /* Pods_SwiftBuildTest.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 32AD40BF6B0E849B07FFD05E /* Pods_SwiftBuildTest.framework */; };
D5B2532E4676014F57A7EAB9 /* FSTStreamTests.m in Sources */ = {isa = PBXBuildFile; fileRef = D5B25C0D4AADFCA3ADB883E4 /* FSTStreamTests.m */; };
+ D5B25474286C9800CE42B8C2 /* FSTTestDispatchQueue.m in Sources */ = {isa = PBXBuildFile; fileRef = D5B25292CED31B81FDED0411 /* FSTTestDispatchQueue.m */; };
+ D5B259FDEE8094E8D710C5BF /* FSTTestDispatchQueue.m in Sources */ = {isa = PBXBuildFile; fileRef = D5B25292CED31B81FDED0411 /* FSTTestDispatchQueue.m */; };
DE03B2C91F2149D600A30B9C /* FSTTransactionTests.m in Sources */ = {isa = PBXBuildFile; fileRef = DE51B1C61F0D48AC0013853F /* FSTTransactionTests.m */; };
DE03B2D41F2149D600A30B9C /* XCTest.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 6003F5AF195388D20070C39A /* XCTest.framework */; };
DE03B2D51F2149D600A30B9C /* UIKit.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 6003F591195388D20070C39A /* UIKit.framework */; };
@@ -222,6 +224,8 @@
B2FA635DF5D116A67A7441CD /* Pods_Firestore_IntegrationTests.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_Firestore_IntegrationTests.framework; sourceTree = BUILT_PRODUCTS_DIR; };
CE00BABB5A3AAB44A4C209E2 /* Pods-Firestore_Tests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_Tests.debug.xcconfig"; path = "Pods/Target Support Files/Pods-Firestore_Tests/Pods-Firestore_Tests.debug.xcconfig"; sourceTree = "<group>"; };
D3CC3DC5338DCAF43A211155 /* README.md */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = net.daringfireball.markdown; name = README.md; path = ../README.md; sourceTree = "<group>"; };
+ D5B25292CED31B81FDED0411 /* FSTTestDispatchQueue.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = FSTTestDispatchQueue.m; sourceTree = "<group>"; };
+ D5B259DAA9149B80D6245B57 /* FSTTestDispatchQueue.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = FSTTestDispatchQueue.h; sourceTree = "<group>"; };
D5B25C0D4AADFCA3ADB883E4 /* FSTStreamTests.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = FSTStreamTests.m; sourceTree = "<group>"; };
DB17FEDFB80770611A935A60 /* Pods-Firestore_IntegrationTests.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_IntegrationTests.release.xcconfig"; path = "Pods/Target Support Files/Pods-Firestore_IntegrationTests/Pods-Firestore_IntegrationTests.release.xcconfig"; sourceTree = "<group>"; };
DE03B2E91F2149D600A30B9C /* Firestore_IntegrationTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = Firestore_IntegrationTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
@@ -561,6 +565,8 @@
DE51B18A1F0D48AC0013853F /* FSTUtilTests.m */,
54E9282A1F339CAD00C1953E /* XCTestCase+Await.h */,
54E9282B1F339CAD00C1953E /* XCTestCase+Await.m */,
+ D5B259DAA9149B80D6245B57 /* FSTTestDispatchQueue.h */,
+ D5B25292CED31B81FDED0411 /* FSTTestDispatchQueue.m */,
);
path = Util;
sourceTree = "<group>";
@@ -1175,6 +1181,7 @@
DE51B1CD1F0D48CD0013853F /* FSTDatabaseInfoTests.m in Sources */,
DE51B1F21F0D49140013853F /* FSTPathTests.m in Sources */,
DE51B1DD1F0D490D0013853F /* FSTLocalStoreTests.m in Sources */,
+ D5B25474286C9800CE42B8C2 /* FSTTestDispatchQueue.m in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
@@ -1199,6 +1206,7 @@
DE03B2C91F2149D600A30B9C /* FSTTransactionTests.m in Sources */,
54DA12B11F315F3800DD57A1 /* FIRValidationTests.m in Sources */,
D5B2532E4676014F57A7EAB9 /* FSTStreamTests.m in Sources */,
+ D5B259FDEE8094E8D710C5BF /* FSTTestDispatchQueue.m in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
diff --git a/Firestore/Example/Tests/Integration/API/FIRDatabaseTests.m b/Firestore/Example/Tests/Integration/API/FIRDatabaseTests.m
index 68692cc..6a6e49a 100644
--- a/Firestore/Example/Tests/Integration/API/FIRDatabaseTests.m
+++ b/Firestore/Example/Tests/Integration/API/FIRDatabaseTests.m
@@ -913,4 +913,22 @@
[self awaitExpectations];
}
+- (void)testWriteStreamReconnectsAfterIdle {
+ FIRDocumentReference *doc = [self documentRef];
+ FIRFirestore *firestore = doc.firestore;
+
+ [self writeDocumentRef:doc data:@{@"foo" : @"bar"}];
+ [self waitForIdleFirestore:firestore];
+ [self writeDocumentRef:doc data:@{@"foo" : @"bar"}];
+}
+
+- (void)testWatchStreamReconnectsAfterIdle {
+ FIRDocumentReference *doc = [self documentRef];
+ FIRFirestore *firestore = doc.firestore;
+
+ [self readSnapshotForRef:[self documentRef] requireOnline:YES];
+ [self waitForIdleFirestore:firestore];
+ [self readSnapshotForRef:[self documentRef] requireOnline:YES];
+}
+
@end
diff --git a/Firestore/Example/Tests/Integration/FSTStreamTests.m b/Firestore/Example/Tests/Integration/FSTStreamTests.m
index 7ca123d..dccaa70 100644
--- a/Firestore/Example/Tests/Integration/FSTStreamTests.m
+++ b/Firestore/Example/Tests/Integration/FSTStreamTests.m
@@ -22,10 +22,10 @@
#import "Core/FSTDatabaseInfo.h"
#import "FSTHelpers.h"
#import "FSTIntegrationTestCase.h"
+#import "FSTTestDispatchQueue.h"
#import "Model/FSTDatabaseID.h"
#import "Remote/FSTDatastore.h"
#import "Util/FSTAssert.h"
-#import "Util/FSTDispatchQueue.h"
/** Exposes otherwise private methods for testing. */
@interface FSTStream (Testing)
@@ -79,14 +79,14 @@
_expectation = nil;
}
-- (void)writeStreamDidClose:(NSError *_Nullable)error {
- [_states addObject:@"writeStreamDidClose"];
+- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
+ [_states addObject:@"writeStreamWasInterrupted"];
[_expectation fulfill];
_expectation = nil;
}
-- (void)watchStreamDidClose:(NSError *_Nullable)error {
- [_states addObject:@"watchStreamDidClose"];
+- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error {
+ [_states addObject:@"watchStreamWasInterrupted"];
[_expectation fulfill];
_expectation = nil;
}
@@ -126,10 +126,10 @@
@implementation FSTStreamTests {
dispatch_queue_t _testQueue;
+ FSTTestDispatchQueue *_workerDispatchQueue;
FSTDatabaseInfo *_databaseInfo;
FSTEmptyCredentialsProvider *_credentials;
FSTStreamStatusDelegate *_delegate;
- FSTDispatchQueue *_workerDispatchQueue;
/** Single mutation to send to the write stream. */
NSArray<FSTMutation *> *_mutations;
@@ -138,38 +138,37 @@
- (void)setUp {
[super setUp];
- _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ];
-
FIRFirestoreSettings *settings = [FSTIntegrationTestCase settings];
FSTDatabaseID *databaseID =
[FSTDatabaseID databaseIDWithProject:[FSTIntegrationTestCase projectID]
database:kDefaultDatabaseID];
+ _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL);
+ _workerDispatchQueue = [[FSTTestDispatchQueue alloc] initWithQueue:_testQueue];
+
_databaseInfo = [FSTDatabaseInfo databaseInfoWithDatabaseID:databaseID
persistenceKey:@"test-key"
host:settings.host
sslEnabled:settings.sslEnabled];
- _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL);
- _workerDispatchQueue = [FSTDispatchQueue queueWith:_testQueue];
_credentials = [[FSTEmptyCredentialsProvider alloc] init];
+
+ _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue];
+
+ _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ];
}
- (FSTWriteStream *)setUpWriteStream {
FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:_databaseInfo
workerDispatchQueue:_workerDispatchQueue
credentials:_credentials];
-
- _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue];
- return [datastore createWriteStreamWithDelegate:_delegate];
+ return [datastore createWriteStream];
}
- (FSTWatchStream *)setUpWatchStream {
FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:_databaseInfo
workerDispatchQueue:_workerDispatchQueue
credentials:_credentials];
-
- _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue];
- return [datastore createWatchStreamWithDelegate:_delegate];
+ return [datastore createWatchStream];
}
/**
@@ -190,7 +189,7 @@
FSTWatchStream *watchStream = [self setUpWatchStream];
[_delegate awaitNotificationFromBlock:^{
- [watchStream start];
+ [watchStream startWithDelegate:_delegate];
}];
// Stop must not call watchStreamDidClose because the full implementation of the delegate could
@@ -210,7 +209,7 @@
FSTWriteStream *writeStream = [self setUpWriteStream];
[_delegate awaitNotificationFromBlock:^{
- [writeStream start];
+ [writeStream startWithDelegate:_delegate];
}];
// Don't start the handshake.
@@ -231,7 +230,7 @@
FSTWriteStream *writeStream = [self setUpWriteStream];
[_delegate awaitNotificationFromBlock:^{
- [writeStream start];
+ [writeStream startWithDelegate:_delegate];
}];
// Writing before the handshake should throw
@@ -258,4 +257,55 @@
]];
}
+- (void)testStreamClosesWhenIdle {
+ FSTWriteStream *writeStream = [self setUpWriteStream];
+
+ [_delegate awaitNotificationFromBlock:^{
+ [writeStream startWithDelegate:_delegate];
+ }];
+
+ [_delegate awaitNotificationFromBlock:^{
+ [writeStream writeHandshake];
+ }];
+
+ [_delegate awaitNotificationFromBlock:^{
+ [writeStream markIdle];
+ }];
+
+ dispatch_sync(_testQueue, ^{
+ XCTAssertFalse([writeStream isOpen]);
+ });
+
+ [self verifyDelegateObservedStates:@[
+ @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamWasInterrupted"
+ ]];
+}
+
+- (void)testStreamCancelsIdleOnWrite {
+ FSTWriteStream *writeStream = [self setUpWriteStream];
+
+ [_delegate awaitNotificationFromBlock:^{
+ [writeStream startWithDelegate:_delegate];
+ }];
+
+ [_delegate awaitNotificationFromBlock:^{
+ [writeStream writeHandshake];
+ }];
+
+ // Mark the stream idle, but immediately cancel the idle timer by issuing another write.
+ [_delegate awaitNotificationFromBlock:^{
+ [writeStream markIdle];
+ [writeStream writeMutations:_mutations];
+ }];
+
+ dispatch_sync(_testQueue, ^{
+ XCTAssertTrue([writeStream isOpen]);
+ });
+
+ [self verifyDelegateObservedStates:@[
+ @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
+ @"writeStreamDidReceiveResponseWithVersion"
+ ]];
+}
+
@end
diff --git a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m
index 8b95286..6af2053 100644
--- a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m
+++ b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.m
@@ -39,19 +39,17 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer
- delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
+ serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- responseMessageClass:(Class)responseMessageClass
- delegate:(id<FSTWatchStreamDelegate>)delegate NS_UNAVAILABLE;
+ responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
@property(nonatomic, assign) BOOL open;
-
@property(nonatomic, strong, readonly)
NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *activeTargets;
+@property(nonatomic, weak, readwrite, nullable) id<FSTWatchStreamDelegate> delegate;
@end
@@ -60,13 +58,11 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer
- delegate:(id<FSTWatchStreamDelegate>)delegate {
+ serializer:(FSTSerializerBeta *)serializer {
self = [super initWithDatabase:database
workerDispatchQueue:workerDispatchQueue
credentials:credentials
- serializer:serializer
- delegate:delegate];
+ serializer:serializer];
if (self) {
FSTAssert(database, @"Database must not be nil");
_activeTargets = [NSMutableDictionary dictionary];
@@ -76,10 +72,15 @@ NS_ASSUME_NONNULL_BEGIN
#pragma mark - Overridden FSTWatchStream methods.
-- (void)start {
+- (void)startWithDelegate:(id<FSTWatchStreamDelegate>)delegate {
FSTAssert(!self.open, @"Trying to start already started watch stream");
self.open = YES;
- [self handleStreamOpen];
+ self.delegate = delegate;
+ [self notifyStreamOpen];
+}
+
+- (void)stop {
+ self.delegate = nil;
}
- (BOOL)isOpen {
@@ -90,10 +91,14 @@ NS_ASSUME_NONNULL_BEGIN
return self.open;
}
-- (void)handleStreamOpen {
+- (void)notifyStreamOpen {
[self.delegate watchStreamDidOpen];
}
+- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
+ [self.delegate watchStreamWasInterruptedWithError:error];
+}
+
- (void)watchQuery:(FSTQueryData *)query {
FSTLog(@"watchQuery: %d: %@", query.targetID, query.query);
// Snapshot version is ignored on the wire
@@ -110,7 +115,7 @@ NS_ASSUME_NONNULL_BEGIN
- (void)failStreamWithError:(NSError *)error {
self.open = NO;
- [self.delegate watchStreamDidClose:error];
+ [self notifyStreamInterruptedWithError:error];
}
#pragma mark - Helper methods.
@@ -142,17 +147,16 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer
- delegate:(id<FSTWriteStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
+ serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- responseMessageClass:(Class)responseMessageClass
- delegate:(id<FSTWriteStreamDelegate>)delegate NS_UNAVAILABLE;
+ responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
@property(nonatomic, assign) BOOL open;
@property(nonatomic, strong, readonly) NSMutableArray<NSArray<FSTMutation *> *> *sentMutations;
+@property(nonatomic, weak, readwrite, nullable) id<FSTWriteStreamDelegate> delegate;
@end
@@ -161,13 +165,11 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer
- delegate:(id<FSTWriteStreamDelegate>)delegate {
+ serializer:(FSTSerializerBeta *)serializer {
self = [super initWithDatabase:database
workerDispatchQueue:workerDispatchQueue
credentials:credentials
- serializer:serializer
- delegate:delegate];
+ serializer:serializer];
if (self) {
_sentMutations = [NSMutableArray array];
}
@@ -176,11 +178,16 @@ NS_ASSUME_NONNULL_BEGIN
#pragma mark - Overridden FSTWriteStream methods.
-- (void)start {
+- (void)startWithDelegate:(id<FSTWriteStreamDelegate>)delegate {
FSTAssert(!self.open, @"Trying to start already started write stream");
self.open = YES;
[self.sentMutations removeAllObjects];
- [self handleStreamOpen];
+ self.delegate = delegate;
+ [self notifyStreamOpen];
+}
+
+- (void)stop {
+ self.delegate = nil;
}
- (BOOL)isOpen {
@@ -200,10 +207,14 @@ NS_ASSUME_NONNULL_BEGIN
[self.sentMutations addObject:mutations];
}
-- (void)handleStreamOpen {
+- (void)notifyStreamOpen {
[self.delegate writeStreamDidOpen];
}
+- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
+ [self.delegate writeStreamWasInterruptedWithError:error];
+}
+
#pragma mark - Helper methods.
/** Injects a write ack as though it had come from the backend in response to a write. */
@@ -215,7 +226,7 @@ NS_ASSUME_NONNULL_BEGIN
/** Injects a failed write response as though it had come from the backend. */
- (void)failStreamWithError:(NSError *)error {
self.open = NO;
- [self.delegate writeStreamDidClose:error];
+ [self notifyStreamInterruptedWithError:error];
}
/**
@@ -269,27 +280,25 @@ NS_ASSUME_NONNULL_BEGIN
#pragma mark - Overridden FSTDatastore methods.
-- (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate {
+- (FSTWatchStream *)createWatchStream {
FSTAssert(self.databaseInfo, @"DatabaseInfo must not be nil");
self.watchStream = [[FSTMockWatchStream alloc]
initWithDatabase:self.databaseInfo
workerDispatchQueue:self.workerDispatchQueue
credentials:self.credentials
serializer:[[FSTSerializerBeta alloc]
- initWithDatabaseID:self.databaseInfo.databaseID]
- delegate:delegate];
+ initWithDatabaseID:self.databaseInfo.databaseID]];
return self.watchStream;
}
-- (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate {
+- (FSTWriteStream *)createWriteStream {
FSTAssert(self.databaseInfo, @"DatabaseInfo must not be nil");
self.writeStream = [[FSTMockWriteStream alloc]
initWithDatabase:self.databaseInfo
workerDispatchQueue:self.workerDispatchQueue
credentials:self.credentials
serializer:[[FSTSerializerBeta alloc]
- initWithDatabaseID:self.databaseInfo.databaseID]
- delegate:delegate];
+ initWithDatabaseID:self.databaseInfo.databaseID]];
return self.writeStream;
}
diff --git a/Firestore/Example/Tests/Util/FSTIntegrationTestCase.h b/Firestore/Example/Tests/Util/FSTIntegrationTestCase.h
index 3dd5464..a2c08ec 100644
--- a/Firestore/Example/Tests/Util/FSTIntegrationTestCase.h
+++ b/Firestore/Example/Tests/Util/FSTIntegrationTestCase.h
@@ -56,6 +56,8 @@ NS_ASSUME_NONNULL_BEGIN
- (FIRCollectionReference *)collectionRefWithDocuments:
(NSDictionary<NSString *, NSDictionary<NSString *, id> *> *)documents;
+- (void)waitForIdleFirestore:(FIRFirestore *)firestore;
+
- (void)writeAllDocuments:(NSDictionary<NSString *, NSDictionary<NSString *, id> *> *)documents
toCollection:(FIRCollectionReference *)collection;
@@ -67,6 +69,9 @@ NS_ASSUME_NONNULL_BEGIN
- (FIRQuerySnapshot *)readDocumentSetForRef:(FIRQuery *)query;
+- (FIRDocumentSnapshot *)readSnapshotForRef:(FIRDocumentReference *)query
+ requireOnline:(BOOL)online;
+
- (void)writeDocumentRef:(FIRDocumentReference *)ref data:(NSDictionary<NSString *, id> *)data;
- (void)updateDocumentRef:(FIRDocumentReference *)ref data:(NSDictionary<NSString *, id> *)data;
diff --git a/Firestore/Example/Tests/Util/FSTIntegrationTestCase.m b/Firestore/Example/Tests/Util/FSTIntegrationTestCase.m
index 87a78c3..2e1e0a9 100644
--- a/Firestore/Example/Tests/Util/FSTIntegrationTestCase.m
+++ b/Firestore/Example/Tests/Util/FSTIntegrationTestCase.m
@@ -30,9 +30,14 @@
#import "Util/FSTUtil.h"
#import "FSTEventAccumulator.h"
+#import "FSTTestDispatchQueue.h"
NS_ASSUME_NONNULL_BEGIN
+@interface FIRFirestore (Testing)
+@property(nonatomic, strong) FSTDispatchQueue *workerDispatchQueue;
+@end
+
@implementation FSTIntegrationTestCase {
NSMutableArray<FIRFirestore *> *_firestores;
}
@@ -121,7 +126,7 @@ NS_ASSUME_NONNULL_BEGIN
- (FIRFirestore *)firestoreWithProjectID:(NSString *)projectID {
NSString *persistenceKey = [NSString stringWithFormat:@"db%lu", (unsigned long)_firestores.count];
- FSTDispatchQueue *workerDispatchQueue = [FSTDispatchQueue
+ FSTTestDispatchQueue *workerDispatchQueue = [FSTTestDispatchQueue
queueWith:dispatch_queue_create("com.google.firebase.firestore", DISPATCH_QUEUE_SERIAL)];
FSTEmptyCredentialsProvider *credentialsProvider = [[FSTEmptyCredentialsProvider alloc] init];
@@ -142,6 +147,14 @@ NS_ASSUME_NONNULL_BEGIN
return firestore;
}
+- (void)waitForIdleFirestore:(FIRFirestore *)firestore {
+ XCTestExpectation *expectation = [self expectationWithDescription:@"idle"];
+ // Note that we wait on any task that is scheduled with a delay of 60s. Currently, the idle
+ // timeout is the only task that uses this delay.
+ [((FSTTestDispatchQueue *)firestore.workerDispatchQueue) fulfillOnExecution:expectation];
+ [self awaitExpectations];
+}
+
- (void)shutdownFirestore:(FIRFirestore *)firestore {
XCTestExpectation *shutdownCompletion = [self expectationWithDescription:@"shutdown"];
[firestore shutdownWithCompletion:^(NSError *_Nullable error) {
@@ -222,6 +235,27 @@ NS_ASSUME_NONNULL_BEGIN
return result;
}
+- (FIRDocumentSnapshot *)readSnapshotForRef:(FIRDocumentReference *)ref
+ requireOnline:(BOOL)requireOnline {
+ __block FIRDocumentSnapshot *result;
+
+ XCTestExpectation *expectation = [self expectationWithDescription:@"listener"];
+ id<FIRListenerRegistration> listener = [ref
+ addSnapshotListenerWithOptions:[[FIRDocumentListenOptions options] includeMetadataChanges:YES]
+ listener:^(FIRDocumentSnapshot *snapshot, NSError *error) {
+ XCTAssertNil(error);
+ if (!requireOnline || !snapshot.metadata.fromCache) {
+ result = snapshot;
+ [expectation fulfill];
+ }
+ }];
+
+ [self awaitExpectations];
+ [listener remove];
+
+ return result;
+}
+
- (void)writeDocumentRef:(FIRDocumentReference *)ref data:(NSDictionary<NSString *, id> *)data {
XCTestExpectation *expectation = [self expectationWithDescription:@"setData"];
[ref setData:data
diff --git a/Firestore/Example/Tests/Util/FSTTestDispatchQueue.h b/Firestore/Example/Tests/Util/FSTTestDispatchQueue.h
new file mode 100644
index 0000000..4f4e13e
--- /dev/null
+++ b/Firestore/Example/Tests/Util/FSTTestDispatchQueue.h
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "Util/FSTDispatchQueue.h"
+
+@class XCTestExpectation;
+
+NS_ASSUME_NONNULL_BEGIN
+
+/**
+ * Dispatch queue used in the integration tests that caps delayed executions at 1.0 seconds.
+ */
+@interface FSTTestDispatchQueue : FSTDispatchQueue
+
+/** Creates and returns an FSTTestDispatchQueue wrapping the specified dispatch_queue_t. */
++ (instancetype)queueWith:(dispatch_queue_t)dispatchQueue;
+
+/**
+ * Registers a test expectation that is fulfilled when the next delayed callback finished
+ * executing.
+ */
+- (void)fulfillOnExecution:(XCTestExpectation *)expectation;
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Example/Tests/Util/FSTTestDispatchQueue.m b/Firestore/Example/Tests/Util/FSTTestDispatchQueue.m
new file mode 100644
index 0000000..27b62bc
--- /dev/null
+++ b/Firestore/Example/Tests/Util/FSTTestDispatchQueue.m
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FSTTestDispatchQueue.h"
+
+#import <XCTest/XCTestExpectation.h>
+
+#import "Util/FSTAssert.h"
+
+@interface FSTTestDispatchQueue ()
+
+@property(nonatomic, weak) XCTestExpectation* expectation;
+
+@end
+
+@implementation FSTTestDispatchQueue
+
+/** The delay used by the idle timeout */
+static const NSTimeInterval kIdleDispatchDelay = 60.0;
+
+/** The maximum delay we use in a test run. */
+static const NSTimeInterval kTestDispatchDelay = 1.0;
+
++ (instancetype)queueWith:(dispatch_queue_t)dispatchQueue {
+ return [[FSTTestDispatchQueue alloc] initWithQueue:dispatchQueue];
+}
+
+- (instancetype)initWithQueue:(dispatch_queue_t)dispatchQueue {
+ return (self = [super initWithQueue:dispatchQueue]);
+}
+
+- (void)dispatchAfterDelay:(NSTimeInterval)delay block:(void (^)(void))block {
+ [super dispatchAfterDelay:MIN(delay, kTestDispatchDelay)
+ block:^() {
+ block();
+ if (delay == kIdleDispatchDelay) {
+ [_expectation fulfill];
+ _expectation = nil;
+ }
+ }];
+}
+
+- (void)fulfillOnExecution:(XCTestExpectation*)expectation {
+ FSTAssert(_expectation == nil, @"Previous expectation still active");
+ _expectation = expectation;
+}
+
+@end
diff --git a/Firestore/Source/Remote/FSTDatastore.h b/Firestore/Source/Remote/FSTDatastore.h
index ec2affa..b8e53f5 100644
--- a/Firestore/Source/Remote/FSTDatastore.h
+++ b/Firestore/Source/Remote/FSTDatastore.h
@@ -82,10 +82,10 @@ NS_ASSUME_NONNULL_BEGIN
completion:(FSTVoidErrorBlock)completion;
/** Creates a new watch stream. */
-- (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate;
+- (FSTWatchStream *)createWatchStream;
/** Creates a new write stream. */
-- (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate;
+- (FSTWriteStream *)createWriteStream;
/** The name of the database and the backend. */
@property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo;
@@ -122,10 +122,10 @@ NS_ASSUME_NONNULL_BEGIN
*
* An implementation of FSTStream needs to implement the following methods:
* - `createRPCWithRequestsWriter`, should create the specific RPC (a GRPCCall object).
- * - `handleStreamOpen`, should call through to the stream-specific streamDidOpen method.
* - `handleStreamMessage`, receives protocol buffer responses from GRPC and must deserialize and
* delegate to some stream specific response method.
- * - `handleStreamClose`, calls through to the stream-specific streamDidClose method.
+ * - `notifyStreamOpen`, should call through to the stream-specific streamDidOpen method.
+ * - `notifyStreamInterrupted`, calls through to the stream-specific streamWasInterrupted method.
*
* Additionally, beyond these required methods, subclasses will want to implement methods that
* take request models, serialize them, and write them to using writeRequest:.
@@ -139,7 +139,7 @@ NS_ASSUME_NONNULL_BEGIN
*
* See https://github.com/grpc/grpc/issues/10957 for the kinds of things we're trying to avoid.
*/
-@interface FSTStream : NSObject
+@interface FSTStream <__covariant FSTStreamDelegate> : NSObject
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
@@ -175,7 +175,7 @@ NS_ASSUME_NONNULL_BEGIN
*
* When start returns, -isStarted will return YES.
*/
-- (void)start;
+- (void)startWithDelegate:(id)delegate;
/**
* Stops the RPC. This call is idempotent and allowed regardless of the current isStarted state.
@@ -194,6 +194,12 @@ NS_ASSUME_NONNULL_BEGIN
- (void)stop;
/**
+ * Initializes the idle timer. If no write takes place within one minute, the GRPC stream will be
+ * closed.
+ */
+- (void)markIdle;
+
+/**
* After an error the stream will usually back off on the next attempt to start it. If the error
* warrants an immediate restart of the stream, the sender can use this to indicate that the
* receiver should not back off.
@@ -221,14 +227,14 @@ NS_ASSUME_NONNULL_BEGIN
snapshotVersion:(FSTSnapshotVersion *)snapshotVersion;
/**
- * Called by the FSTWatchStream when the underlying streaming RPC is closed for whatever reason,
- * usually because of an error, but possibly due to an idle timeout. The error passed to this
- * method may be nil, in which case the stream was closed without attributable fault.
+ * Called by the FSTWatchStream when the underlying streaming RPC is interrupted for whatever
+ * reason, usually because of an error, but possibly due to an idle timeout. The error passed to
+ * this method may be nil, in which case the stream was closed without attributable fault.
*
* NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping"
* on FSTStream for details.
*/
-- (void)watchStreamDidClose:(NSError *_Nullable)error;
+- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error;
@end
@@ -247,8 +253,7 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer
- delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
+ serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
@@ -267,8 +272,6 @@ NS_ASSUME_NONNULL_BEGIN
/** Unregisters interest in the results of the query associated with the given target ID. */
- (void)unwatchTargetID:(FSTTargetID)targetID;
-@property(nonatomic, weak, readonly) id<FSTWatchStreamDelegate> delegate;
-
@end
#pragma mark - FSTWriteStream
@@ -292,14 +295,14 @@ NS_ASSUME_NONNULL_BEGIN
mutationResults:(NSArray<FSTMutationResult *> *)results;
/**
- * Called when the FSTWriteStream's underlying RPC is closed for whatever reason, usually because
- * of an error, but possibly due to an idle timeout. The error passed to this method may be nil, in
- * which case the stream was closed without attributable fault.
+ * Called when the FSTWriteStream's underlying RPC is interrupted for whatever reason, usually
+ * because of an error, but possibly due to an idle timeout. The error passed to this method may be
+ * nil, in which case the stream was closed without attributable fault.
*
* NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping"
* on FSTStream for details.
*/
-- (void)writeStreamDidClose:(NSError *_Nullable)error;
+- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error;
@end
@@ -324,8 +327,7 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer
- delegate:(id<FSTWriteStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
+ serializer:(FSTSerializerBeta *)serializer;
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
@@ -344,8 +346,6 @@ NS_ASSUME_NONNULL_BEGIN
/** Sends a group of mutations to the Firestore backend to apply. */
- (void)writeMutations:(NSArray<FSTMutation *> *)mutations;
-@property(nonatomic, weak, readonly) id<FSTWriteStreamDelegate> delegate;
-
/**
* Tracks whether or not a handshake has been successfully exchanged and the stream is ready to
* accept mutations.
diff --git a/Firestore/Source/Remote/FSTDatastore.m b/Firestore/Source/Remote/FSTDatastore.m
index 95ee12c..1cabc36 100644
--- a/Firestore/Source/Remote/FSTDatastore.m
+++ b/Firestore/Source/Remote/FSTDatastore.m
@@ -115,14 +115,19 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer
- delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
+ serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- responseMessageClass:(Class)responseMessageClass
- delegate:(id<FSTWatchStreamDelegate>)delegate NS_UNAVAILABLE;
+ responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
+
+@end
+
+@interface FSTStream ()
+
+@property(nonatomic, getter=isIdle) BOOL idle;
+@property(nonatomic, weak, readwrite, nullable) id delegate;
@end
@@ -407,20 +412,18 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
}];
}
-- (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate {
+- (FSTWatchStream *)createWatchStream {
return [[FSTWatchStream alloc] initWithDatabase:_databaseInfo
workerDispatchQueue:_workerDispatchQueue
credentials:_credentials
- serializer:_serializer
- delegate:delegate];
+ serializer:_serializer];
}
-- (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate {
+- (FSTWriteStream *)createWriteStream {
return [[FSTWriteStream alloc] initWithDatabase:_databaseInfo
workerDispatchQueue:_workerDispatchQueue
credentials:_credentials
- serializer:_serializer
- delegate:delegate];
+ serializer:_serializer];
}
/** Adds headers to the RPC including any OAuth access token if provided .*/
@@ -436,10 +439,60 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
@end
+#pragma mark - FSTCallbackFilter
+
+/** Filter class that allows disabling of GRPC callbacks. */
+@interface FSTCallbackFilter : NSObject <GRXWriteable>
+
+- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
+- (instancetype)init NS_UNAVAILABLE;
+
+@property(atomic, readwrite) BOOL callbacksEnabled;
+@property(nonatomic, strong, readonly) FSTStream *stream;
+
+@end
+
+@implementation FSTCallbackFilter
+
+- (instancetype)initWithStream:(FSTStream *)stream {
+ if (self = [super init]) {
+ _callbacksEnabled = YES;
+ _stream = stream;
+ }
+ return self;
+}
+
+- (void)suppressCallbacks {
+ _callbacksEnabled = NO;
+}
+
+- (void)writeValue:(id)value {
+ if (_callbacksEnabled) {
+ [self.stream writeValue:value];
+ }
+}
+
+- (void)writesFinishedWithError:(NSError *)errorOrNil {
+ if (_callbacksEnabled) {
+ [self.stream writesFinishedWithError:errorOrNil];
+ }
+}
+
+@end
+
#pragma mark - FSTStream
+@interface FSTStream ()
+
+@property(nonatomic, strong, readwrite) FSTCallbackFilter *callbackFilter;
+
+@end
+
@implementation FSTStream
+/** The time a stream stays open after it is marked idle. */
+static const NSTimeInterval kIdleTimeout = 60.0;
+
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
@@ -475,11 +528,11 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
@throw FSTAbstractMethodException(); // NOLINT
}
-- (void)start {
+- (void)startWithDelegate:(id)delegate {
[self.workerDispatchQueue verifyIsCurrentQueue];
if (self.state == FSTStreamStateError) {
- [self performBackoff];
+ [self performBackoffWithDelegate:delegate];
return;
}
@@ -487,6 +540,8 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
FSTAssert(self.state == FSTStreamStateInitial, @"Already started");
self.state = FSTStreamStateAuth;
+ FSTAssert(_delegate == nil, @"Delegate must be nil");
+ _delegate = delegate;
[self.credentials
getTokenForcingRefresh:NO
@@ -522,14 +577,16 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
[FSTDatastore prepareHeadersForRPC:_rpc
databaseID:self.databaseInfo.databaseID
token:token.token];
- [_rpc startWithWriteable:self];
+ FSTAssert(_callbackFilter == nil, @"GRX Filter must be nil");
+ _callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self];
+ [_rpc startWithWriteable:_callbackFilter];
self.state = FSTStreamStateOpen;
- [self handleStreamOpen];
+ [self notifyStreamOpen];
}
/** Backs off after an error. */
-- (void)performBackoff {
+- (void)performBackoffWithDelegate:(id)delegate {
FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self);
[self.workerDispatchQueue verifyIsCurrentQueue];
@@ -539,12 +596,12 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
FSTWeakify(self);
[self.backoff backoffAndRunBlock:^{
FSTStrongify(self);
- [self resumeStartFromBackoff];
+ [self resumeStartFromBackoffWithDelegate:delegate];
}];
}
/** Resumes stream start after backing off. */
-- (void)resumeStartFromBackoff {
+- (void)resumeStartFromBackoffWithDelegate:(id)delegate {
if (self.state == FSTStreamStateStopped) {
// Streams can be stopped while waiting for backoff to complete.
return;
@@ -557,21 +614,80 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
// Momentarily set state to FSTStreamStateInitial as `start` expects it.
self.state = FSTStreamStateInitial;
- [self start];
+ [self startWithDelegate:delegate];
FSTAssert([self isStarted], @"Stream should have started.");
}
-- (void)stop {
- FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
+/**
+ * Closes the stream and cleans up as necessary:
+ *
+ * * closes the underlying GRPC stream;
+ * * calls the onClose handler with the given 'error';
+ * * sets internal stream state to 'finalState';
+ * * adjusts the backoff timer based on the error
+ *
+ * A new stream can be opened by calling `start` unless `finalState` is set to
+ * `FSTStreamStateStopped`.
+ *
+ * @param finalState the intended state of the stream after closing.
+ * @param error the NSError the connection was closed with.
+ */
+- (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error {
+ FSTAssert(finalState == FSTStreamStateError || error == nil,
+ @"Can't provide an error when not in an error state.");
+
[self.workerDispatchQueue verifyIsCurrentQueue];
+ [self cancelIdleCheck];
- // Prevent any possible future restart of this stream.
- self.state = FSTStreamStateStopped;
+ if (finalState != FSTStreamStateError) {
+ // If this is an intentional close ensure we don't delay our next connection attempt.
+ [self.backoff reset];
+ } else if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) {
+ FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class],
+ (__bridge void *)self);
+ [self.backoff resetToMax];
+ }
- // Close the stream client side.
- FSTBufferedWriter *requestsWriter = self.requestsWriter;
- @synchronized(requestsWriter) {
- [requestsWriter finishWithError:nil];
+ // This state must be assigned before calling `notifyStreamInterrupted` to allow the callback to
+ // inhibit backoff or otherwise manipulate the state in its non-started state.
+ self.state = finalState;
+
+ if (self.requestsWriter) {
+ // Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
+ // call half-close to avoid secondary failures.
+ if (finalState != FSTStreamStateError) {
+ FSTLog(@"%@ %p Closing stream client-side", [self class], (__bridge void *)self);
+ @synchronized(self.requestsWriter) {
+ [self.requestsWriter finishWithError:nil];
+ }
+ }
+ _requestsWriter = nil;
+ }
+
+ [self.callbackFilter suppressCallbacks];
+ _callbackFilter = nil;
+
+ // Clean up remaining state.
+ _messageReceived = NO;
+ _rpc = nil;
+
+ // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it
+ // could trigger undesirable recovery logic, etc.).
+ if (finalState != FSTStreamStateStopped) {
+ [self notifyStreamInterruptedWithError:error];
+ }
+
+ // Clear the delegates to avoid any possible bleed through of events from GRPC.
+ FSTAssert(_delegate,
+ @"closeWithFinalState should only be called for a started stream that has an active "
+ @"delegate.");
+ _delegate = nil;
+}
+
+- (void)stop {
+ FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
+ if ([self isStarted]) {
+ [self closeWithFinalState:FSTStreamStateStopped error:nil];
}
}
@@ -585,6 +701,32 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
[self.backoff reset];
}
+/** Called by the idle timer when the stream should close due to inactivity. */
+- (void)handleIdleCloseTimer {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ if (self.state == FSTStreamStateOpen && [self isIdle]) {
+ // When timing out an idle stream there's no reason to force the stream into backoff when
+ // it restarts so set the stream state to Initial instead of Error.
+ [self closeWithFinalState:FSTStreamStateInitial error:nil];
+ }
+}
+
+- (void)markIdle {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ if (self.state == FSTStreamStateOpen) {
+ self.idle = YES;
+ [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout
+ block:^() {
+ [self handleIdleCloseTimer];
+ }];
+ }
+}
+
+- (void)cancelIdleCheck {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ self.idle = NO;
+}
+
/**
* Parses a protocol buffer response from the server. If the message fails to parse, generates
* an error and closes the stream.
@@ -620,6 +762,8 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
- (void)writeRequest:(GPBMessage *)request {
NSData *data = [request data];
+ [self cancelIdleCheck];
+
FSTBufferedWriter *requestsWriter = self.requestsWriter;
@synchronized(requestsWriter) {
[requestsWriter writeValue:data];
@@ -629,13 +773,22 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
#pragma mark Template methods for subclasses
/**
- * Called by the stream after the stream has been successfully connected, authenticated, and is now
- * ready to accept messages.
+ * Called by the stream after the stream has opened.
*
- * Subclasses should relay to their stream-specific delegate. Calling [super handleStreamOpen] is
+ * Subclasses should relay to their stream-specific delegate. Calling [super notifyStreamOpen] is
* not required.
*/
-- (void)handleStreamOpen {
+- (void)notifyStreamOpen {
+}
+
+/**
+ * Called by the stream after the stream has been unexpectedly interrupted, either due to an error
+ * or due to idleness.
+ *
+ * Subclasses should relay to their stream-specific delegate. Calling [super
+ * notifyStreamInterrupted] is not required.
+ */
+- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
}
/**
@@ -649,30 +802,16 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
/**
* Called by the stream when the underlying RPC has been closed for whatever reason.
- *
- * Subclasses should first call [super handleStreamClose:] and then call to their
- * stream-specific delegate.
*/
-- (void)handleStreamClose:(NSError *_Nullable)error {
+- (void)handleStreamClose:(nullable NSError *)error {
FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
FSTAssert([self isStarted], @"Can't handle server close in non-started state.");
- [self.workerDispatchQueue verifyIsCurrentQueue];
-
- self.messageReceived = NO;
- self.rpc = nil;
- self.requestsWriter = nil;
// In theory the stream could close cleanly, however, in our current model we never expect this
// to happen because if we stop a stream ourselves, this callback will never be called. To
// prevent cases where we retry without a backoff accidentally, we set the stream to error
// in all cases.
- self.state = FSTStreamStateError;
-
- if (error.code == FIRFirestoreErrorCodeResourceExhausted) {
- FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class],
- (__bridge void *)self);
- [self.backoff resetToMax];
- }
+ [self closeWithFinalState:FSTStreamStateError error:error];
}
#pragma mark GRXWriteable implementation
@@ -717,7 +856,7 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
* Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
* directly inform stream-specific logic, or call stop to tear down the stream.
*/
-- (void)writesFinishedWithError:(NSError *_Nullable)error __used {
+- (void)writesFinishedWithError:(nullable NSError *)error __used {
error = [FSTDatastore firestoreErrorForError:error];
FSTWeakify(self);
[self.workerDispatchQueue dispatchAsync:^{
@@ -744,15 +883,13 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer
- delegate:(id<FSTWatchStreamDelegate>)delegate {
+ serializer:(FSTSerializerBeta *)serializer {
self = [super initWithDatabase:database
workerDispatchQueue:workerDispatchQueue
credentials:credentials
responseMessageClass:[GCFSListenResponse class]];
if (self) {
_serializer = serializer;
- _delegate = delegate;
}
return self;
}
@@ -763,20 +900,12 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
requestsWriter:requestsWriter];
}
-- (void)stop {
- // Clear the delegate to avoid any possible bleed through of events from GRPC.
- _delegate = nil;
-
- [super stop];
-}
-
-- (void)handleStreamOpen {
+- (void)notifyStreamOpen {
[self.delegate watchStreamDidOpen];
}
-- (void)handleStreamClose:(NSError *_Nullable)error {
- [super handleStreamClose:error];
- [self.delegate watchStreamDidClose:error];
+- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
+ [self.delegate watchStreamWasInterruptedWithError:error];
}
- (void)watchQuery:(FSTQueryData *)query {
@@ -835,15 +964,13 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
credentials:(id<FSTCredentialsProvider>)credentials
- serializer:(FSTSerializerBeta *)serializer
- delegate:(id<FSTWriteStreamDelegate>)delegate {
+ serializer:(FSTSerializerBeta *)serializer {
self = [super initWithDatabase:database
workerDispatchQueue:workerDispatchQueue
credentials:credentials
responseMessageClass:[GCFSWriteResponse class]];
if (self) {
_serializer = serializer;
- _delegate = delegate;
}
return self;
}
@@ -854,26 +981,17 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
requestsWriter:requestsWriter];
}
-- (void)start {
+- (void)startWithDelegate:(id)delegate {
self.handshakeComplete = NO;
- [super start];
+ [super startWithDelegate:delegate];
}
-- (void)stop {
- // Clear the delegate to avoid any possible bleed through of events from GRPC.
- _delegate = nil;
-
- [super stop];
-}
-
-- (void)handleStreamOpen {
+- (void)notifyStreamOpen {
[self.delegate writeStreamDidOpen];
}
-- (void)handleStreamClose:(NSError *_Nullable)error {
- [super handleStreamClose:error];
-
- [self.delegate writeStreamDidClose:error];
+- (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
+ [self.delegate writeStreamWasInterruptedWithError:error];
}
- (void)writeHandshake {
@@ -920,7 +1038,7 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
// Always capture the last stream token.
self.lastStreamToken = response.streamToken;
- if (!self.handshakeComplete) {
+ if (!self.isHandshakeComplete) {
// The first response is the handshake response
self.handshakeComplete = YES;
diff --git a/Firestore/Source/Remote/FSTExponentialBackoff.m b/Firestore/Source/Remote/FSTExponentialBackoff.m
index 179b3f9..dc589b5 100644
--- a/Firestore/Source/Remote/FSTExponentialBackoff.m
+++ b/Firestore/Source/Remote/FSTExponentialBackoff.m
@@ -75,9 +75,8 @@
FSTLog(@"Backing off for %.2f seconds (base delay: %.2f seconds)", delayWithJitter,
_currentBase);
}
- dispatch_time_t delay =
- dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delayWithJitter * NSEC_PER_SEC));
- dispatch_after(delay, self.dispatchQueue.queue, block);
+
+ [self.dispatchQueue dispatchAfterDelay:delayWithJitter block:block];
// Apply backoff factor to determine next delay and ensure it is within bounds.
_currentBase *= _backoffFactor;
diff --git a/Firestore/Source/Remote/FSTRemoteStore.m b/Firestore/Source/Remote/FSTRemoteStore.m
index b4a6449..0bb37cc 100644
--- a/Firestore/Source/Remote/FSTRemoteStore.m
+++ b/Firestore/Source/Remote/FSTRemoteStore.m
@@ -204,8 +204,8 @@ static const int kOnlineAttemptsBeforeFailure = 2;
FSTAssert(self.writeStream == nil, @"enableNetwork: called with non-null writeStream.");
// Create new streams (but note they're not started yet).
- self.watchStream = [self.datastore createWatchStreamWithDelegate:self];
- self.writeStream = [self.datastore createWriteStreamWithDelegate:self];
+ self.watchStream = [self.datastore createWatchStream];
+ self.writeStream = [self.datastore createWriteStream];
// Load any saved stream token from persistent storage
self.writeStream.lastStreamToken = [self.localStore lastStreamToken];
@@ -265,7 +265,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
- (void)startWatchStream {
FSTAssert([self shouldStartWatchStream],
@"startWatchStream: called when shouldStartWatchStream: is false.");
- [self.watchStream start];
+ [self.watchStream startWithDelegate:self];
}
- (void)listenToTargetWithQueryData:(FSTQueryData *)queryData {
@@ -276,7 +276,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
self.listenTargets[targetKey] = queryData;
if ([self shouldStartWatchStream]) {
- [self.watchStream start];
+ [self startWatchStream];
} else if ([self isNetworkEnabled] && [self.watchStream isOpen]) {
[self sendWatchRequestWithQueryData:queryData];
}
@@ -295,6 +295,9 @@ static const int kOnlineAttemptsBeforeFailure = 2;
[self.listenTargets removeObjectForKey:targetKey];
if ([self isNetworkEnabled] && [self.watchStream isOpen]) {
[self sendUnwatchRequestForTargetID:targetKey];
+ if ([self.listenTargets count] == 0) {
+ [self.watchStream markIdle];
+ }
}
}
@@ -364,7 +367,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
}
}
-- (void)watchStreamDidClose:(NSError *_Nullable)error {
+- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error {
FSTAssert([self isNetworkEnabled],
@"watchStreamDidClose should only be called when the network is enabled");
@@ -374,7 +377,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
// watch targets.
if ([self shouldStartWatchStream]) {
[self updateOnlineStateAfterFailure];
- [self.watchStream start];
+ [self startWatchStream];
} else {
// We don't need to restart the watch stream because there are no active targets. The online
// state is set to unknown because there is no active attempt at establishing a connection.
@@ -514,7 +517,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
FSTAssert([self shouldStartWriteStream],
@"startWriteStream: called when shouldStartWriteStream: is false.");
- [self.writeStream start];
+ [self.writeStream startWithDelegate:self];
}
- (void)cleanUpWriteStreamState {
@@ -523,12 +526,18 @@ static const int kOnlineAttemptsBeforeFailure = 2;
}
- (void)fillWritePipeline {
- while ([self canWriteMutations]) {
- FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen];
- if (!batch) {
- break;
+ if ([self isNetworkEnabled]) {
+ while ([self canWriteMutations]) {
+ FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen];
+ if (!batch) {
+ break;
+ }
+ [self commitBatch:batch];
+ }
+
+ if ([self.pendingWrites count] == 0) {
+ [self.writeStream markIdle];
}
- [self commitBatch:batch];
}
}
@@ -613,18 +622,13 @@ static const int kOnlineAttemptsBeforeFailure = 2;
* Handles the closing of the StreamingWrite RPC, either because of an error or because the RPC
* has been terminated by the client or the server.
*/
-- (void)writeStreamDidClose:(NSError *_Nullable)error {
+- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
FSTAssert([self isNetworkEnabled],
@"writeStreamDidClose: should only be called when the network is enabled");
- NSMutableArray *pendingWrites = self.pendingWrites;
- // Ignore close if there are no pending writes.
- if (pendingWrites.count == 0) {
- return;
- }
-
- FSTAssert(error, @"There are pending writes, but the write stream closed without an error.");
- if ([FSTDatastore isPermanentWriteError:error]) {
+ // If the write stream closed due to an error, invoke the error callbacks if there are pending
+ // writes.
+ if (error != nil && self.pendingWrites.count > 0) {
if (self.writeStream.handshakeComplete) {
// This error affects the actual writes.
[self handleWriteError:error];
@@ -637,7 +641,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
// The write stream might have been started by refilling the write pipeline for failed writes
if ([self shouldStartWriteStream]) {
- [self.writeStream start];
+ [self startWriteStream];
}
}
diff --git a/Firestore/Source/Util/FSTDispatchQueue.h b/Firestore/Source/Util/FSTDispatchQueue.h
index 256b9c0..fe87887 100644
--- a/Firestore/Source/Util/FSTDispatchQueue.h
+++ b/Firestore/Source/Util/FSTDispatchQueue.h
@@ -23,6 +23,8 @@ NS_ASSUME_NONNULL_BEGIN
/** Creates and returns an FSTDispatchQueue wrapping the specified dispatch_queue_t. */
+ (instancetype)queueWith:(dispatch_queue_t)dispatchQueue;
+- (instancetype)initWithQueue:(dispatch_queue_t)queue NS_DESIGNATED_INITIALIZER;
+
- (instancetype)init __attribute__((unavailable("Use static constructor method.")));
/**
@@ -50,6 +52,17 @@ NS_ASSUME_NONNULL_BEGIN
*/
- (void)dispatchAsyncAllowingSameQueue:(void (^)(void))block;
+/**
+ * Schedules a callback after the specified delay.
+ *
+ * Unlike dispatchAsync: this method does not require you to dispatch to a different queue than
+ * the current one (thus it is equivalent to a raw dispatch_after()).
+ *
+ * @param block The block to run.
+ * @param delay The delay (in seconds) after which to run the block.
+ */
+- (void)dispatchAfterDelay:(NSTimeInterval)delay block:(void (^)(void))block;
+
/** The underlying wrapped dispatch_queue_t */
@property(nonatomic, strong, readonly) dispatch_queue_t queue;
diff --git a/Firestore/Source/Util/FSTDispatchQueue.m b/Firestore/Source/Util/FSTDispatchQueue.m
index 9613102..8c953fe 100644
--- a/Firestore/Source/Util/FSTDispatchQueue.m
+++ b/Firestore/Source/Util/FSTDispatchQueue.m
@@ -56,6 +56,11 @@ NS_ASSUME_NONNULL_BEGIN
dispatch_async(self.queue, block);
}
+- (void)dispatchAfterDelay:(NSTimeInterval)delay block:(void (^)(void))block {
+ dispatch_time_t delayNs = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delay * NSEC_PER_SEC));
+ dispatch_after(delayNs, self.queue, block);
+}
+
#pragma mark - Private Methods
- (NSString *)currentQueueLabel {