aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/Source/Remote
diff options
context:
space:
mode:
authorGravatar Michael Lehenbauer <mikelehen@gmail.com>2018-02-20 13:03:40 -0800
committerGravatar GitHub <noreply@github.com>2018-02-20 13:03:40 -0800
commit9bf73ab5f06c0cf03a43e99a24527e37ccd12ea7 (patch)
tree197de7c5bf86542e1265e9a2fe1a33237f11b74d /Firestore/Source/Remote
parent7a4a2ea10844afd6a58dace46854fae74399f55c (diff)
Fix two stream close issues (b/73167987, b/73382103). (#810)
* Fix b/73167987: Upon receiving a permanent write error when we had additional pendingWrites to send, we were restarting the stream with a new delegate and then immediately setting the delegate to nil, causing the stream to ignore all GRPC events for the stream. * Fix b/73382103: We were attempting to gracefully teardown the write stream (i.e. send an empty WriteRequest) even when the stream was already failed due to an error. This caused no harm other than log pollution, but I fixed it. * Use -[GRPCCall setResponseDispatchQueue] to dispatch GRPC callbacks directly onto the Firestore worker queue. This saves a double-dispatch and simplifies our logic. * Add stricter assertions regarding stream state now that dispatch queue / callback filter race conditions are eliminated.
Diffstat (limited to 'Firestore/Source/Remote')
-rw-r--r--Firestore/Source/Remote/FSTStream.mm101
1 files changed, 51 insertions, 50 deletions
diff --git a/Firestore/Source/Remote/FSTStream.mm b/Firestore/Source/Remote/FSTStream.mm
index 079ae72..a9aa245 100644
--- a/Firestore/Source/Remote/FSTStream.mm
+++ b/Firestore/Source/Remote/FSTStream.mm
@@ -152,7 +152,12 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
#pragma mark - FSTCallbackFilter
-/** Filter class that allows disabling of GRPC callbacks. */
+/**
+ * Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main
+ * FSTStream to allow the stream to be stopped externally (either by the user or via idle timer)
+ * and be able to completely prevent any subsequent events from gRPC from calling back into the
+ * FSTSTream.
+ */
@interface FSTCallbackFilter : NSObject <GRXWriteable>
- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
@@ -269,12 +274,12 @@ static const NSTimeInterval kIdleTimeout = 60.0;
/** Add an access token to our RPC, after obtaining one from the credentials provider. */
- (void)resumeStartWithToken:(const Token &)token error:(NSError *)error {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
if (self.state == FSTStreamStateStopped) {
// Streams can be stopped while waiting for authorization.
return;
}
-
- [self.workerDispatchQueue verifyIsCurrentQueue];
FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
(long)self.state);
@@ -288,6 +293,8 @@ static const NSTimeInterval kIdleTimeout = 60.0;
self.requestsWriter = [[FSTBufferedWriter alloc] init];
_rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
+ [_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue];
+
[FSTDatastore prepareHeadersForRPC:_rpc
databaseID:&self.databaseInfo->database_id()
token:(token.is_valid() ? token.token() : absl::string_view())];
@@ -369,7 +376,10 @@ static const NSTimeInterval kIdleTimeout = 60.0;
[self.backoff resetToMax];
}
- [self tearDown];
+ if (finalState != FSTStreamStateError) {
+ FSTLog(@"%@ %p Performing stream teardown", [self class], (__bridge void *)self);
+ [self tearDown];
+ }
if (self.requestsWriter) {
// Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
@@ -400,8 +410,9 @@ static const NSTimeInterval kIdleTimeout = 60.0;
[self notifyStreamInterruptedWithError:error];
}
- // Clear the delegates to avoid any possible bleed through of events from GRPC.
- _delegate = nil;
+ // PORTING NOTE: notifyStreamInterruptedWithError may have restarted the stream with a new
+ // delegate so we do /not/ want to clear the delegate here. And since we've already suppressed
+ // callbacks via our callbackFilter, there is no worry about bleed through of events from GRPC.
}
- (void)stop {
@@ -530,11 +541,7 @@ static const NSTimeInterval kIdleTimeout = 60.0;
*/
- (void)handleStreamClose:(nullable NSError *)error {
FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
-
- if (![self isStarted]) { // The stream could have already been closed by the idle close timer.
- FSTLog(@"%@ Ignoring server close for already closed stream.", NSStringFromClass([self class]));
- return;
- }
+ FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream.");
// In theory the stream could close cleanly, however, in our current model we never expect this
// to happen because if we stop a stream ourselves, this callback will never be called. To
@@ -547,56 +554,50 @@ static const NSTimeInterval kIdleTimeout = 60.0;
// The GRXWriteable implementation defines the receive side of the RPC stream.
/**
- * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
- * redispatch back onto our own worker queue.
+ * Called by GRPC when it publishes a value.
+ *
+ * GRPC must be configured to use our worker queue by calling
+ * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
+ * the RPC.
*/
-- (void)writeValue:(id)value __used {
- // TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
- // Once released we can set the responseDispatchQueue property on the GRPCCall and then this
- // method can call handleStreamMessage directly.
- FSTWeakify(self);
- [self.workerDispatchQueue dispatchAsync:^{
- FSTStrongify(self);
- if (![self isStarted]) {
- FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class]));
- return;
- }
-
- if (!self.messageReceived) {
- self.messageReceived = YES;
- if ([FIRFirestore isLoggingEnabled]) {
- FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
- (__bridge void *)self,
- [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
- }
- }
- NSError *error;
- id proto = [self parseProto:self.responseMessageClass data:value error:&error];
- if (proto) {
- [self handleStreamMessage:proto];
- } else {
- [_rpc finishWithError:error];
+- (void)writeValue:(id)value {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ FSTAssert([self isStarted], @"writeValue: called for stopped stream.");
+
+ if (!self.messageReceived) {
+ self.messageReceived = YES;
+ if ([FIRFirestore isLoggingEnabled]) {
+ FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
+ (__bridge void *)self,
+ [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
}
- }];
+ }
+ NSError *error;
+ id proto = [self parseProto:self.responseMessageClass data:value error:&error];
+ if (proto) {
+ [self handleStreamMessage:proto];
+ } else {
+ [_rpc finishWithError:error];
+ }
}
/**
* Called by GRPC when it closed the stream with an error representing the final state of the
* stream.
*
- * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
- * directly inform stream-specific logic, or call stop to tear down the stream.
+ * GRPC must be configured to use our worker queue by calling
+ * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
+ * the RPC.
+ *
+ * Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call
+ * stop to tear down the stream.
*/
- (void)writesFinishedWithError:(nullable NSError *)error __used {
error = [FSTDatastore firestoreErrorForError:error];
- FSTWeakify(self);
- [self.workerDispatchQueue dispatchAsync:^{
- FSTStrongify(self);
- if (!self || self.state == FSTStreamStateStopped) {
- return;
- }
- [self handleStreamClose:error];
- }];
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream.");
+
+ [self handleStreamClose:error];
}
@end