/* * Copyright 2017 Google * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #import #import #import #import #include #import "Firestore/Example/Tests/Util/FSTHelpers.h" #import "Firestore/Example/Tests/Util/FSTIntegrationTestCase.h" #import "Firestore/Source/Remote/FSTDatastore.h" #import "Firestore/Source/Remote/FSTStream.h" #include "Firestore/core/src/firebase/firestore/auth/empty_credentials_provider.h" #include "Firestore/core/src/firebase/firestore/core/database_info.h" #include "Firestore/core/src/firebase/firestore/model/database_id.h" #include "Firestore/core/src/firebase/firestore/model/snapshot_version.h" #include "Firestore/core/src/firebase/firestore/util/hard_assert.h" #include "Firestore/core/src/firebase/firestore/util/string_apple.h" namespace util = firebase::firestore::util; using firebase::firestore::auth::EmptyCredentialsProvider; using firebase::firestore::core::DatabaseInfo; using firebase::firestore::model::DatabaseId; using firebase::firestore::model::SnapshotVersion; /** Exposes otherwise private methods for testing. */ @interface FSTStream (Testing) @property(nonatomic, strong, readwrite) id callbackFilter; @end /** * Implements FSTWatchStreamDelegate and FSTWriteStreamDelegate and supports waiting on callbacks * via `fulfillOnCallback`. */ @interface FSTStreamStatusDelegate : NSObject - (instancetype)initWithTestCase:(XCTestCase *)testCase queue:(FSTDispatchQueue *)dispatchQueue NS_DESIGNATED_INITIALIZER; - (instancetype)init NS_UNAVAILABLE; @property(nonatomic, weak, readonly) XCTestCase *testCase; @property(nonatomic, strong, readonly) FSTDispatchQueue *dispatchQueue; @property(nonatomic, readonly) NSMutableArray *states; @property(nonatomic, strong) XCTestExpectation *expectation; @end @implementation FSTStreamStatusDelegate - (instancetype)initWithTestCase:(XCTestCase *)testCase queue:(FSTDispatchQueue *)dispatchQueue { if (self = [super init]) { _testCase = testCase; _dispatchQueue = dispatchQueue; _states = [NSMutableArray new]; } return self; } - (void)watchStreamDidOpen { [_states addObject:@"watchStreamDidOpen"]; [_expectation fulfill]; _expectation = nil; } - (void)writeStreamDidOpen { [_states addObject:@"writeStreamDidOpen"]; [_expectation fulfill]; _expectation = nil; } - (void)writeStreamDidCompleteHandshake { [_states addObject:@"writeStreamDidCompleteHandshake"]; [_expectation fulfill]; _expectation = nil; } - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error { [_states addObject:@"writeStreamWasInterrupted"]; [_expectation fulfill]; _expectation = nil; } - (void)watchStreamWasInterruptedWithError:(nullable NSError *)error { [_states addObject:@"watchStreamWasInterrupted"]; [_expectation fulfill]; _expectation = nil; } - (void)watchStreamDidChange:(FSTWatchChange *)change snapshotVersion:(const SnapshotVersion &)snapshotVersion { [_states addObject:@"watchStreamDidChange"]; [_expectation fulfill]; _expectation = nil; } - (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commitVersion mutationResults:(NSArray *)results { [_states addObject:@"writeStreamDidReceiveResponseWithVersion"]; [_expectation fulfill]; _expectation = nil; } /** * Executes 'block' using the provided FSTDispatchQueue and waits for any callback on this delegate * to be called. */ - (void)awaitNotificationFromBlock:(void (^)(void))block { HARD_ASSERT(_expectation == nil, "Previous expectation still active"); XCTestExpectation *expectation = [self.testCase expectationWithDescription:@"awaitCallbackInBlock"]; _expectation = expectation; [self.dispatchQueue dispatchAsync:block]; [self.testCase awaitExpectations]; } @end @interface FSTStreamTests : XCTestCase @end @implementation FSTStreamTests { dispatch_queue_t _testQueue; FSTDispatchQueue *_workerDispatchQueue; DatabaseInfo _databaseInfo; EmptyCredentialsProvider _credentials; FSTStreamStatusDelegate *_delegate; /** Single mutation to send to the write stream. */ NSArray *_mutations; } - (void)setUp { [super setUp]; FIRFirestoreSettings *settings = [FSTIntegrationTestCase settings]; DatabaseId database_id(util::MakeStringView([FSTIntegrationTestCase projectID]), DatabaseId::kDefault); _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL); _workerDispatchQueue = [[FSTDispatchQueue alloc] initWithQueue:_testQueue]; _databaseInfo = DatabaseInfo(database_id, "test-key", util::MakeStringView(settings.host), settings.sslEnabled); _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue]; _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ]; } - (FSTWriteStream *)setUpWriteStream { FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo workerDispatchQueue:_workerDispatchQueue credentials:&_credentials]; return [datastore createWriteStream]; } - (FSTWatchStream *)setUpWatchStream { FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo workerDispatchQueue:_workerDispatchQueue credentials:&_credentials]; return [datastore createWatchStream]; } /** * Drains the test queue and asserts that all the observed callbacks (up to this point) match * 'expectedStates'. Clears the list of observed callbacks on completion. */ - (void)verifyDelegateObservedStates:(NSArray *)expectedStates { // Drain queue dispatch_sync(_testQueue, ^{ }); XCTAssertEqualObjects(_delegate.states, expectedStates); [_delegate.states removeAllObjects]; } /** Verifies that the watch stream does not issue an onClose callback after a call to stop(). */ - (void)testWatchStreamStopBeforeHandshake { FSTWatchStream *watchStream = [self setUpWatchStream]; [_delegate awaitNotificationFromBlock:^{ [watchStream startWithDelegate:_delegate]; }]; // Stop must not call watchStreamDidClose because the full implementation of the delegate could // attempt to restart the stream in the event it had pending watches. [_workerDispatchQueue dispatchAsync:^{ [watchStream stop]; }]; // Simulate a final callback from GRPC [_workerDispatchQueue dispatchAsync:^{ [watchStream.callbackFilter writesFinishedWithError:nil]; }]; [self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]]; } /** Verifies that the write stream does not issue an onClose callback after a call to stop(). */ - (void)testWriteStreamStopBeforeHandshake { FSTWriteStream *writeStream = [self setUpWriteStream]; [_delegate awaitNotificationFromBlock:^{ [writeStream startWithDelegate:_delegate]; }]; // Don't start the handshake. // Stop must not call watchStreamDidClose because the full implementation of the delegate could // attempt to restart the stream in the event it had pending watches. [_workerDispatchQueue dispatchAsync:^{ [writeStream stop]; }]; // Simulate a final callback from GRPC [_workerDispatchQueue dispatchAsync:^{ [writeStream.callbackFilter writesFinishedWithError:nil]; }]; [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]]; } - (void)testWriteStreamStopAfterHandshake { FSTWriteStream *writeStream = [self setUpWriteStream]; [_delegate awaitNotificationFromBlock:^{ [writeStream startWithDelegate:_delegate]; }]; // Writing before the handshake should throw [_workerDispatchQueue dispatchSync:^{ XCTAssertThrows([writeStream writeMutations:_mutations]); }]; [_delegate awaitNotificationFromBlock:^{ [writeStream writeHandshake]; }]; // Now writes should succeed [_delegate awaitNotificationFromBlock:^{ [writeStream writeMutations:_mutations]; }]; [_workerDispatchQueue dispatchAsync:^{ [writeStream stop]; }]; [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamDidReceiveResponseWithVersion" ]]; } - (void)testStreamClosesWhenIdle { FSTWriteStream *writeStream = [self setUpWriteStream]; [_delegate awaitNotificationFromBlock:^{ [writeStream startWithDelegate:_delegate]; }]; [_delegate awaitNotificationFromBlock:^{ [writeStream writeHandshake]; }]; [_workerDispatchQueue dispatchAsync:^{ [writeStream markIdle]; XCTAssertTrue( [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]); }]; [_workerDispatchQueue runDelayedCallbacksUntil:FSTTimerIDWriteStreamIdle]; [_workerDispatchQueue dispatchSync:^{ XCTAssertFalse([writeStream isOpen]); }]; [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamWasInterrupted" ]]; } - (void)testStreamCancelsIdleOnWrite { FSTWriteStream *writeStream = [self setUpWriteStream]; [_delegate awaitNotificationFromBlock:^{ [writeStream startWithDelegate:_delegate]; }]; [_delegate awaitNotificationFromBlock:^{ [writeStream writeHandshake]; }]; // Mark the stream idle, but immediately cancel the idle timer by issuing another write. [_delegate awaitNotificationFromBlock:^{ [writeStream markIdle]; XCTAssertTrue( [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]); [writeStream writeMutations:_mutations]; XCTAssertFalse( [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]); }]; [_workerDispatchQueue dispatchSync:^{ XCTAssertTrue([writeStream isOpen]); }]; [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamDidReceiveResponseWithVersion" ]]; } class MockCredentialsProvider : public firebase::firestore::auth::EmptyCredentialsProvider { public: MockCredentialsProvider() { observed_states_ = [NSMutableArray new]; } void GetToken(firebase::firestore::auth::TokenListener completion) override { [observed_states_ addObject:@"GetToken"]; EmptyCredentialsProvider::GetToken(std::move(completion)); } void InvalidateToken() override { [observed_states_ addObject:@"InvalidateToken"]; EmptyCredentialsProvider::InvalidateToken(); } NSMutableArray *observed_states() const { return observed_states_; } private: NSMutableArray *observed_states_; }; - (void)testStreamRefreshesTokenUponExpiration { MockCredentialsProvider credentials; FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo workerDispatchQueue:_workerDispatchQueue credentials:&credentials]; FSTWatchStream *watchStream = [datastore createWatchStream]; [_delegate awaitNotificationFromBlock:^{ [watchStream startWithDelegate:_delegate]; }]; // Simulate callback from GRPC with an unauthenticated error -- this should invalidate the token. NSError *unauthenticatedError = [NSError errorWithDomain:FIRFirestoreErrorDomain code:FIRFirestoreErrorCodeUnauthenticated userInfo:nil]; dispatch_async(_testQueue, ^{ [watchStream.callbackFilter writesFinishedWithError:unauthenticatedError]; }); // Drain the queue. dispatch_sync(_testQueue, ^{ }); // Try reconnecting. [_delegate awaitNotificationFromBlock:^{ [watchStream startWithDelegate:_delegate]; }]; // Simulate a different error -- token should not be invalidated this time. NSError *unavailableError = [NSError errorWithDomain:FIRFirestoreErrorDomain code:FIRFirestoreErrorCodeUnavailable userInfo:nil]; dispatch_async(_testQueue, ^{ [watchStream.callbackFilter writesFinishedWithError:unavailableError]; }); dispatch_sync(_testQueue, ^{ }); [_delegate awaitNotificationFromBlock:^{ [watchStream startWithDelegate:_delegate]; }]; dispatch_sync(_testQueue, ^{ }); NSArray *expected = @[ @"GetToken", @"InvalidateToken", @"GetToken", @"GetToken" ]; XCTAssertEqualObjects(credentials.observed_states(), expected); } @end