aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/Source/Remote/FSTRemoteStore.m
diff options
context:
space:
mode:
authorGravatar Sebastian Schmidt <mrschmidt@google.com>2017-10-30 18:17:16 -0700
committerGravatar GitHub <noreply@github.com>2017-10-30 18:17:16 -0700
commit02ff6bbee95150eacff9563af4dd7a6e1aeaebdd (patch)
tree8a095ae29bdb6daf273f57913af021c2eae981ab /Firestore/Source/Remote/FSTRemoteStore.m
parent1db9fd83df8d29abe5e7369ad1cbf3eb8545a78a (diff)
Closing the write and watch stream after 60s of idleness (#388)
Diffstat (limited to 'Firestore/Source/Remote/FSTRemoteStore.m')
-rw-r--r--Firestore/Source/Remote/FSTRemoteStore.m48
1 files changed, 26 insertions, 22 deletions
diff --git a/Firestore/Source/Remote/FSTRemoteStore.m b/Firestore/Source/Remote/FSTRemoteStore.m
index b4a6449..0bb37cc 100644
--- a/Firestore/Source/Remote/FSTRemoteStore.m
+++ b/Firestore/Source/Remote/FSTRemoteStore.m
@@ -204,8 +204,8 @@ static const int kOnlineAttemptsBeforeFailure = 2;
FSTAssert(self.writeStream == nil, @"enableNetwork: called with non-null writeStream.");
// Create new streams (but note they're not started yet).
- self.watchStream = [self.datastore createWatchStreamWithDelegate:self];
- self.writeStream = [self.datastore createWriteStreamWithDelegate:self];
+ self.watchStream = [self.datastore createWatchStream];
+ self.writeStream = [self.datastore createWriteStream];
// Load any saved stream token from persistent storage
self.writeStream.lastStreamToken = [self.localStore lastStreamToken];
@@ -265,7 +265,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
- (void)startWatchStream {
FSTAssert([self shouldStartWatchStream],
@"startWatchStream: called when shouldStartWatchStream: is false.");
- [self.watchStream start];
+ [self.watchStream startWithDelegate:self];
}
- (void)listenToTargetWithQueryData:(FSTQueryData *)queryData {
@@ -276,7 +276,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
self.listenTargets[targetKey] = queryData;
if ([self shouldStartWatchStream]) {
- [self.watchStream start];
+ [self startWatchStream];
} else if ([self isNetworkEnabled] && [self.watchStream isOpen]) {
[self sendWatchRequestWithQueryData:queryData];
}
@@ -295,6 +295,9 @@ static const int kOnlineAttemptsBeforeFailure = 2;
[self.listenTargets removeObjectForKey:targetKey];
if ([self isNetworkEnabled] && [self.watchStream isOpen]) {
[self sendUnwatchRequestForTargetID:targetKey];
+ if ([self.listenTargets count] == 0) {
+ [self.watchStream markIdle];
+ }
}
}
@@ -364,7 +367,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
}
}
-- (void)watchStreamDidClose:(NSError *_Nullable)error {
+- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error {
FSTAssert([self isNetworkEnabled],
@"watchStreamDidClose should only be called when the network is enabled");
@@ -374,7 +377,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
// watch targets.
if ([self shouldStartWatchStream]) {
[self updateOnlineStateAfterFailure];
- [self.watchStream start];
+ [self startWatchStream];
} else {
// We don't need to restart the watch stream because there are no active targets. The online
// state is set to unknown because there is no active attempt at establishing a connection.
@@ -514,7 +517,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
FSTAssert([self shouldStartWriteStream],
@"startWriteStream: called when shouldStartWriteStream: is false.");
- [self.writeStream start];
+ [self.writeStream startWithDelegate:self];
}
- (void)cleanUpWriteStreamState {
@@ -523,12 +526,18 @@ static const int kOnlineAttemptsBeforeFailure = 2;
}
- (void)fillWritePipeline {
- while ([self canWriteMutations]) {
- FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen];
- if (!batch) {
- break;
+ if ([self isNetworkEnabled]) {
+ while ([self canWriteMutations]) {
+ FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen];
+ if (!batch) {
+ break;
+ }
+ [self commitBatch:batch];
+ }
+
+ if ([self.pendingWrites count] == 0) {
+ [self.writeStream markIdle];
}
- [self commitBatch:batch];
}
}
@@ -613,18 +622,13 @@ static const int kOnlineAttemptsBeforeFailure = 2;
* Handles the closing of the StreamingWrite RPC, either because of an error or because the RPC
* has been terminated by the client or the server.
*/
-- (void)writeStreamDidClose:(NSError *_Nullable)error {
+- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
FSTAssert([self isNetworkEnabled],
@"writeStreamDidClose: should only be called when the network is enabled");
- NSMutableArray *pendingWrites = self.pendingWrites;
- // Ignore close if there are no pending writes.
- if (pendingWrites.count == 0) {
- return;
- }
-
- FSTAssert(error, @"There are pending writes, but the write stream closed without an error.");
- if ([FSTDatastore isPermanentWriteError:error]) {
+ // If the write stream closed due to an error, invoke the error callbacks if there are pending
+ // writes.
+ if (error != nil && self.pendingWrites.count > 0) {
if (self.writeStream.handshakeComplete) {
// This error affects the actual writes.
[self handleWriteError:error];
@@ -637,7 +641,7 @@ static const int kOnlineAttemptsBeforeFailure = 2;
// The write stream might have been started by refilling the write pipeline for failed writes
if ([self shouldStartWriteStream]) {
- [self.writeStream start];
+ [self startWriteStream];
}
}