diff options
Diffstat (limited to 'Firestore/Source/Remote')
-rw-r--r-- | Firestore/Source/Remote/FSTStream.mm | 101 |
1 files changed, 51 insertions, 50 deletions
diff --git a/Firestore/Source/Remote/FSTStream.mm b/Firestore/Source/Remote/FSTStream.mm index 079ae72..a9aa245 100644 --- a/Firestore/Source/Remote/FSTStream.mm +++ b/Firestore/Source/Remote/FSTStream.mm @@ -152,7 +152,12 @@ typedef NS_ENUM(NSInteger, FSTStreamState) { #pragma mark - FSTCallbackFilter -/** Filter class that allows disabling of GRPC callbacks. */ +/** + * Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main + * FSTStream to allow the stream to be stopped externally (either by the user or via idle timer) + * and be able to completely prevent any subsequent events from gRPC from calling back into the + * FSTSTream. + */ @interface FSTCallbackFilter : NSObject <GRXWriteable> - (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER; @@ -269,12 +274,12 @@ static const NSTimeInterval kIdleTimeout = 60.0; /** Add an access token to our RPC, after obtaining one from the credentials provider. */ - (void)resumeStartWithToken:(const Token &)token error:(NSError *)error { + [self.workerDispatchQueue verifyIsCurrentQueue]; + if (self.state == FSTStreamStateStopped) { // Streams can be stopped while waiting for authorization. return; } - - [self.workerDispatchQueue verifyIsCurrentQueue]; FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)", (long)self.state); @@ -288,6 +293,8 @@ static const NSTimeInterval kIdleTimeout = 60.0; self.requestsWriter = [[FSTBufferedWriter alloc] init]; _rpc = [self createRPCWithRequestsWriter:self.requestsWriter]; + [_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue]; + [FSTDatastore prepareHeadersForRPC:_rpc databaseID:&self.databaseInfo->database_id() token:(token.is_valid() ? token.token() : absl::string_view())]; @@ -369,7 +376,10 @@ static const NSTimeInterval kIdleTimeout = 60.0; [self.backoff resetToMax]; } - [self tearDown]; + if (finalState != FSTStreamStateError) { + FSTLog(@"%@ %p Performing stream teardown", [self class], (__bridge void *)self); + [self tearDown]; + } if (self.requestsWriter) { // Clean up the underlying RPC. If this close: is in response to an error, don't attempt to @@ -400,8 +410,9 @@ static const NSTimeInterval kIdleTimeout = 60.0; [self notifyStreamInterruptedWithError:error]; } - // Clear the delegates to avoid any possible bleed through of events from GRPC. - _delegate = nil; + // PORTING NOTE: notifyStreamInterruptedWithError may have restarted the stream with a new + // delegate so we do /not/ want to clear the delegate here. And since we've already suppressed + // callbacks via our callbackFilter, there is no worry about bleed through of events from GRPC. } - (void)stop { @@ -530,11 +541,7 @@ static const NSTimeInterval kIdleTimeout = 60.0; */ - (void)handleStreamClose:(nullable NSError *)error { FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error); - - if (![self isStarted]) { // The stream could have already been closed by the idle close timer. - FSTLog(@"%@ Ignoring server close for already closed stream.", NSStringFromClass([self class])); - return; - } + FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream."); // In theory the stream could close cleanly, however, in our current model we never expect this // to happen because if we stop a stream ourselves, this callback will never be called. To @@ -547,56 +554,50 @@ static const NSTimeInterval kIdleTimeout = 60.0; // The GRXWriteable implementation defines the receive side of the RPC stream. /** - * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately - * redispatch back onto our own worker queue. + * Called by GRPC when it publishes a value. + * + * GRPC must be configured to use our worker queue by calling + * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting + * the RPC. */ -- (void)writeValue:(id)value __used { - // TODO(mcg): remove the double-dispatch once GRPCCall at head is released. - // Once released we can set the responseDispatchQueue property on the GRPCCall and then this - // method can call handleStreamMessage directly. - FSTWeakify(self); - [self.workerDispatchQueue dispatchAsync:^{ - FSTStrongify(self); - if (![self isStarted]) { - FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class])); - return; - } - - if (!self.messageReceived) { - self.messageReceived = YES; - if ([FIRFirestore isLoggingEnabled]) { - FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]), - (__bridge void *)self, - [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]); - } - } - NSError *error; - id proto = [self parseProto:self.responseMessageClass data:value error:&error]; - if (proto) { - [self handleStreamMessage:proto]; - } else { - [_rpc finishWithError:error]; +- (void)writeValue:(id)value { + [self.workerDispatchQueue verifyIsCurrentQueue]; + FSTAssert([self isStarted], @"writeValue: called for stopped stream."); + + if (!self.messageReceived) { + self.messageReceived = YES; + if ([FIRFirestore isLoggingEnabled]) { + FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]), + (__bridge void *)self, + [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]); } - }]; + } + NSError *error; + id proto = [self parseProto:self.responseMessageClass data:value error:&error]; + if (proto) { + [self handleStreamMessage:proto]; + } else { + [_rpc finishWithError:error]; + } } /** * Called by GRPC when it closed the stream with an error representing the final state of the * stream. * - * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to - * directly inform stream-specific logic, or call stop to tear down the stream. + * GRPC must be configured to use our worker queue by calling + * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting + * the RPC. + * + * Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call + * stop to tear down the stream. */ - (void)writesFinishedWithError:(nullable NSError *)error __used { error = [FSTDatastore firestoreErrorForError:error]; - FSTWeakify(self); - [self.workerDispatchQueue dispatchAsync:^{ - FSTStrongify(self); - if (!self || self.state == FSTStreamStateStopped) { - return; - } - [self handleStreamClose:error]; - }]; + [self.workerDispatchQueue verifyIsCurrentQueue]; + FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream."); + + [self handleStreamClose:error]; } @end |