diff options
author | 2017-10-30 18:17:16 -0700 | |
---|---|---|
committer | 2017-10-30 18:17:16 -0700 | |
commit | 02ff6bbee95150eacff9563af4dd7a6e1aeaebdd (patch) | |
tree | 8a095ae29bdb6daf273f57913af021c2eae981ab /Firestore/Source/Remote/FSTRemoteStore.m | |
parent | 1db9fd83df8d29abe5e7369ad1cbf3eb8545a78a (diff) |
Closing the write and watch stream after 60s of idleness (#388)
Diffstat (limited to 'Firestore/Source/Remote/FSTRemoteStore.m')
-rw-r--r-- | Firestore/Source/Remote/FSTRemoteStore.m | 48 |
1 files changed, 26 insertions, 22 deletions
diff --git a/Firestore/Source/Remote/FSTRemoteStore.m b/Firestore/Source/Remote/FSTRemoteStore.m index b4a6449..0bb37cc 100644 --- a/Firestore/Source/Remote/FSTRemoteStore.m +++ b/Firestore/Source/Remote/FSTRemoteStore.m @@ -204,8 +204,8 @@ static const int kOnlineAttemptsBeforeFailure = 2; FSTAssert(self.writeStream == nil, @"enableNetwork: called with non-null writeStream."); // Create new streams (but note they're not started yet). - self.watchStream = [self.datastore createWatchStreamWithDelegate:self]; - self.writeStream = [self.datastore createWriteStreamWithDelegate:self]; + self.watchStream = [self.datastore createWatchStream]; + self.writeStream = [self.datastore createWriteStream]; // Load any saved stream token from persistent storage self.writeStream.lastStreamToken = [self.localStore lastStreamToken]; @@ -265,7 +265,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; - (void)startWatchStream { FSTAssert([self shouldStartWatchStream], @"startWatchStream: called when shouldStartWatchStream: is false."); - [self.watchStream start]; + [self.watchStream startWithDelegate:self]; } - (void)listenToTargetWithQueryData:(FSTQueryData *)queryData { @@ -276,7 +276,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; self.listenTargets[targetKey] = queryData; if ([self shouldStartWatchStream]) { - [self.watchStream start]; + [self startWatchStream]; } else if ([self isNetworkEnabled] && [self.watchStream isOpen]) { [self sendWatchRequestWithQueryData:queryData]; } @@ -295,6 +295,9 @@ static const int kOnlineAttemptsBeforeFailure = 2; [self.listenTargets removeObjectForKey:targetKey]; if ([self isNetworkEnabled] && [self.watchStream isOpen]) { [self sendUnwatchRequestForTargetID:targetKey]; + if ([self.listenTargets count] == 0) { + [self.watchStream markIdle]; + } } } @@ -364,7 +367,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; } } -- (void)watchStreamDidClose:(NSError *_Nullable)error { +- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error { FSTAssert([self isNetworkEnabled], @"watchStreamDidClose should only be called when the network is enabled"); @@ -374,7 +377,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; // watch targets. if ([self shouldStartWatchStream]) { [self updateOnlineStateAfterFailure]; - [self.watchStream start]; + [self startWatchStream]; } else { // We don't need to restart the watch stream because there are no active targets. The online // state is set to unknown because there is no active attempt at establishing a connection. @@ -514,7 +517,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; FSTAssert([self shouldStartWriteStream], @"startWriteStream: called when shouldStartWriteStream: is false."); - [self.writeStream start]; + [self.writeStream startWithDelegate:self]; } - (void)cleanUpWriteStreamState { @@ -523,12 +526,18 @@ static const int kOnlineAttemptsBeforeFailure = 2; } - (void)fillWritePipeline { - while ([self canWriteMutations]) { - FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen]; - if (!batch) { - break; + if ([self isNetworkEnabled]) { + while ([self canWriteMutations]) { + FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen]; + if (!batch) { + break; + } + [self commitBatch:batch]; + } + + if ([self.pendingWrites count] == 0) { + [self.writeStream markIdle]; } - [self commitBatch:batch]; } } @@ -613,18 +622,13 @@ static const int kOnlineAttemptsBeforeFailure = 2; * Handles the closing of the StreamingWrite RPC, either because of an error or because the RPC * has been terminated by the client or the server. */ -- (void)writeStreamDidClose:(NSError *_Nullable)error { +- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error { FSTAssert([self isNetworkEnabled], @"writeStreamDidClose: should only be called when the network is enabled"); - NSMutableArray *pendingWrites = self.pendingWrites; - // Ignore close if there are no pending writes. - if (pendingWrites.count == 0) { - return; - } - - FSTAssert(error, @"There are pending writes, but the write stream closed without an error."); - if ([FSTDatastore isPermanentWriteError:error]) { + // If the write stream closed due to an error, invoke the error callbacks if there are pending + // writes. + if (error != nil && self.pendingWrites.count > 0) { if (self.writeStream.handshakeComplete) { // This error affects the actual writes. [self handleWriteError:error]; @@ -637,7 +641,7 @@ static const int kOnlineAttemptsBeforeFailure = 2; // The write stream might have been started by refilling the write pipeline for failed writes if ([self shouldStartWriteStream]) { - [self.writeStream start]; + [self startWriteStream]; } } |