aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--Firestore/Example/Tests/Integration/FSTStreamTests.mm12
-rw-r--r--Firestore/Source/Remote/FSTStream.mm101
2 files changed, 60 insertions, 53 deletions
diff --git a/Firestore/Example/Tests/Integration/FSTStreamTests.mm b/Firestore/Example/Tests/Integration/FSTStreamTests.mm
index a36361a..cb967b1 100644
--- a/Firestore/Example/Tests/Integration/FSTStreamTests.mm
+++ b/Firestore/Example/Tests/Integration/FSTStreamTests.mm
@@ -16,6 +16,8 @@
#import <XCTest/XCTest.h>
+#import <GRPCClient/GRPCCall.h>
+
#import <FirebaseFirestore/FIRFirestoreSettings.h>
#import "Firestore/Example/Tests/Util/FSTHelpers.h"
@@ -35,7 +37,7 @@ using firebase::firestore::model::DatabaseId;
/** Exposes otherwise private methods for testing. */
@interface FSTStream (Testing)
-- (void)writesFinishedWithError:(NSError *_Nullable)error;
+@property(nonatomic, strong, readwrite) id<GRXWriteable> callbackFilter;
@end
/**
@@ -202,7 +204,9 @@ using firebase::firestore::model::DatabaseId;
}];
// Simulate a final callback from GRPC
- [watchStream writesFinishedWithError:nil];
+ [_workerDispatchQueue dispatchAsync:^{
+ [watchStream.callbackFilter writesFinishedWithError:nil];
+ }];
[self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
}
@@ -224,7 +228,9 @@ using firebase::firestore::model::DatabaseId;
}];
// Simulate a final callback from GRPC
- [writeStream writesFinishedWithError:nil];
+ [_workerDispatchQueue dispatchAsync:^{
+ [writeStream.callbackFilter writesFinishedWithError:nil];
+ }];
[self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
}
diff --git a/Firestore/Source/Remote/FSTStream.mm b/Firestore/Source/Remote/FSTStream.mm
index 079ae72..a9aa245 100644
--- a/Firestore/Source/Remote/FSTStream.mm
+++ b/Firestore/Source/Remote/FSTStream.mm
@@ -152,7 +152,12 @@ typedef NS_ENUM(NSInteger, FSTStreamState) {
#pragma mark - FSTCallbackFilter
-/** Filter class that allows disabling of GRPC callbacks. */
+/**
+ * Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main
+ * FSTStream to allow the stream to be stopped externally (either by the user or via idle timer)
+ * and be able to completely prevent any subsequent events from gRPC from calling back into the
+ * FSTSTream.
+ */
@interface FSTCallbackFilter : NSObject <GRXWriteable>
- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
@@ -269,12 +274,12 @@ static const NSTimeInterval kIdleTimeout = 60.0;
/** Add an access token to our RPC, after obtaining one from the credentials provider. */
- (void)resumeStartWithToken:(const Token &)token error:(NSError *)error {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
if (self.state == FSTStreamStateStopped) {
// Streams can be stopped while waiting for authorization.
return;
}
-
- [self.workerDispatchQueue verifyIsCurrentQueue];
FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
(long)self.state);
@@ -288,6 +293,8 @@ static const NSTimeInterval kIdleTimeout = 60.0;
self.requestsWriter = [[FSTBufferedWriter alloc] init];
_rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
+ [_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue];
+
[FSTDatastore prepareHeadersForRPC:_rpc
databaseID:&self.databaseInfo->database_id()
token:(token.is_valid() ? token.token() : absl::string_view())];
@@ -369,7 +376,10 @@ static const NSTimeInterval kIdleTimeout = 60.0;
[self.backoff resetToMax];
}
- [self tearDown];
+ if (finalState != FSTStreamStateError) {
+ FSTLog(@"%@ %p Performing stream teardown", [self class], (__bridge void *)self);
+ [self tearDown];
+ }
if (self.requestsWriter) {
// Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
@@ -400,8 +410,9 @@ static const NSTimeInterval kIdleTimeout = 60.0;
[self notifyStreamInterruptedWithError:error];
}
- // Clear the delegates to avoid any possible bleed through of events from GRPC.
- _delegate = nil;
+ // PORTING NOTE: notifyStreamInterruptedWithError may have restarted the stream with a new
+ // delegate so we do /not/ want to clear the delegate here. And since we've already suppressed
+ // callbacks via our callbackFilter, there is no worry about bleed through of events from GRPC.
}
- (void)stop {
@@ -530,11 +541,7 @@ static const NSTimeInterval kIdleTimeout = 60.0;
*/
- (void)handleStreamClose:(nullable NSError *)error {
FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
-
- if (![self isStarted]) { // The stream could have already been closed by the idle close timer.
- FSTLog(@"%@ Ignoring server close for already closed stream.", NSStringFromClass([self class]));
- return;
- }
+ FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream.");
// In theory the stream could close cleanly, however, in our current model we never expect this
// to happen because if we stop a stream ourselves, this callback will never be called. To
@@ -547,56 +554,50 @@ static const NSTimeInterval kIdleTimeout = 60.0;
// The GRXWriteable implementation defines the receive side of the RPC stream.
/**
- * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
- * redispatch back onto our own worker queue.
+ * Called by GRPC when it publishes a value.
+ *
+ * GRPC must be configured to use our worker queue by calling
+ * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
+ * the RPC.
*/
-- (void)writeValue:(id)value __used {
- // TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
- // Once released we can set the responseDispatchQueue property on the GRPCCall and then this
- // method can call handleStreamMessage directly.
- FSTWeakify(self);
- [self.workerDispatchQueue dispatchAsync:^{
- FSTStrongify(self);
- if (![self isStarted]) {
- FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class]));
- return;
- }
-
- if (!self.messageReceived) {
- self.messageReceived = YES;
- if ([FIRFirestore isLoggingEnabled]) {
- FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
- (__bridge void *)self,
- [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
- }
- }
- NSError *error;
- id proto = [self parseProto:self.responseMessageClass data:value error:&error];
- if (proto) {
- [self handleStreamMessage:proto];
- } else {
- [_rpc finishWithError:error];
+- (void)writeValue:(id)value {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ FSTAssert([self isStarted], @"writeValue: called for stopped stream.");
+
+ if (!self.messageReceived) {
+ self.messageReceived = YES;
+ if ([FIRFirestore isLoggingEnabled]) {
+ FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
+ (__bridge void *)self,
+ [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
}
- }];
+ }
+ NSError *error;
+ id proto = [self parseProto:self.responseMessageClass data:value error:&error];
+ if (proto) {
+ [self handleStreamMessage:proto];
+ } else {
+ [_rpc finishWithError:error];
+ }
}
/**
* Called by GRPC when it closed the stream with an error representing the final state of the
* stream.
*
- * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
- * directly inform stream-specific logic, or call stop to tear down the stream.
+ * GRPC must be configured to use our worker queue by calling
+ * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
+ * the RPC.
+ *
+ * Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call
+ * stop to tear down the stream.
*/
- (void)writesFinishedWithError:(nullable NSError *)error __used {
error = [FSTDatastore firestoreErrorForError:error];
- FSTWeakify(self);
- [self.workerDispatchQueue dispatchAsync:^{
- FSTStrongify(self);
- if (!self || self.state == FSTStreamStateStopped) {
- return;
- }
- [self handleStreamClose:error];
- }];
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream.");
+
+ [self handleStreamClose:error];
}
@end