aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/Source/Remote/FSTStream.mm
diff options
context:
space:
mode:
Diffstat (limited to 'Firestore/Source/Remote/FSTStream.mm')
-rw-r--r--Firestore/Source/Remote/FSTStream.mm80
1 files changed, 39 insertions, 41 deletions
diff --git a/Firestore/Source/Remote/FSTStream.mm b/Firestore/Source/Remote/FSTStream.mm
index f4ec675..3a6c035 100644
--- a/Firestore/Source/Remote/FSTStream.mm
+++ b/Firestore/Source/Remote/FSTStream.mm
@@ -26,10 +26,8 @@
#import "Firestore/Source/Remote/FSTExponentialBackoff.h"
#import "Firestore/Source/Remote/FSTSerializerBeta.h"
#import "Firestore/Source/Remote/FSTStream.h"
-#import "Firestore/Source/Util/FSTAssert.h"
#import "Firestore/Source/Util/FSTClasses.h"
#import "Firestore/Source/Util/FSTDispatchQueue.h"
-#import "Firestore/Source/Util/FSTLogger.h"
#import "Firestore/Protos/objc/google/firestore/v1beta1/Firestore.pbrpc.h"
@@ -38,6 +36,8 @@
#include "Firestore/core/src/firebase/firestore/model/database_id.h"
#include "Firestore/core/src/firebase/firestore/model/snapshot_version.h"
#include "Firestore/core/src/firebase/firestore/util/error_apple.h"
+#include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
+#include "Firestore/core/src/firebase/firestore/util/log.h"
#include "Firestore/core/src/firebase/firestore/util/string_apple.h"
namespace util = firebase::firestore::util;
@@ -258,11 +258,11 @@ static const NSTimeInterval kIdleTimeout = 60.0;
return;
}
- FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self);
- FSTAssert(self.state == FSTStreamStateInitial, @"Already started");
+ LOG_DEBUG("%s %s start", NSStringFromClass([self class]), (__bridge void *)self);
+ HARD_ASSERT(self.state == FSTStreamStateInitial, "Already started");
self.state = FSTStreamStateAuth;
- FSTAssert(_delegate == nil, @"Delegate must be nil");
+ HARD_ASSERT(_delegate == nil, "Delegate must be nil");
_delegate = delegate;
_credentials->GetToken(
@@ -281,8 +281,7 @@ static const NSTimeInterval kIdleTimeout = 60.0;
// Streams can be stopped while waiting for authorization.
return;
}
- FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
- (long)self.state);
+ HARD_ASSERT(self.state == FSTStreamStateAuth, "State should still be auth (was %s)", self.state);
// TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
// but I'm not sure how to detect that right now. http://b/32762461
@@ -301,7 +300,7 @@ static const NSTimeInterval kIdleTimeout = 60.0;
prepareHeadersForRPC:_rpc
databaseID:&self.databaseInfo->database_id()
token:(token.user().is_authenticated() ? token.token() : absl::string_view())];
- FSTAssert(_callbackFilter == nil, @"GRX Filter must be nil");
+ HARD_ASSERT(_callbackFilter == nil, "GRX Filter must be nil");
_callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self];
[_rpc startWithWriteable:_callbackFilter];
@@ -311,10 +310,10 @@ static const NSTimeInterval kIdleTimeout = 60.0;
/** Backs off after an error. */
- (void)performBackoffWithDelegate:(id)delegate {
- FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self);
+ LOG_DEBUG("%s %s backoff", NSStringFromClass([self class]), (__bridge void *)self);
[self.workerDispatchQueue verifyIsCurrentQueue];
- FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case");
+ HARD_ASSERT(self.state == FSTStreamStateError, "Should only perform backoff in an error case");
self.state = FSTStreamStateBackoff;
FSTWeakify(self);
@@ -334,13 +333,13 @@ static const NSTimeInterval kIdleTimeout = 60.0;
// In order to have performed a backoff the stream must have been in an error state just prior
// to entering the backoff state. If we weren't stopped we must be in the backoff state.
- FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)",
- (long)self.state);
+ HARD_ASSERT(self.state == FSTStreamStateBackoff, "State should still be backoff (was %s)",
+ self.state);
// Momentarily set state to FSTStreamStateInitial as `start` expects it.
self.state = FSTStreamStateInitial;
[self startWithDelegate:delegate];
- FSTAssert([self isStarted], @"Stream should have started.");
+ HARD_ASSERT([self isStarted], "Stream should have started.");
}
/**
@@ -365,8 +364,8 @@ static const NSTimeInterval kIdleTimeout = 60.0;
* @param error the NSError the connection was closed with.
*/
- (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error {
- FSTAssert(finalState == FSTStreamStateError || error == nil,
- @"Can't provide an error when not in an error state.");
+ HARD_ASSERT(finalState == FSTStreamStateError || error == nil,
+ "Can't provide an error when not in an error state.");
[self.workerDispatchQueue verifyIsCurrentQueue];
@@ -381,13 +380,13 @@ static const NSTimeInterval kIdleTimeout = 60.0;
// If this is an intentional close ensure we don't delay our next connection attempt.
[self.backoff reset];
} else if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) {
- FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class],
- (__bridge void *)self);
+ LOG_DEBUG("%s %s Using maximum backoff delay to prevent overloading the backend.", [self class],
+ (__bridge void *)self);
[self.backoff resetToMax];
}
if (finalState != FSTStreamStateError) {
- FSTLog(@"%@ %p Performing stream teardown", [self class], (__bridge void *)self);
+ LOG_DEBUG("%s %s Performing stream teardown", [self class], (__bridge void *)self);
[self tearDown];
}
@@ -395,7 +394,7 @@ static const NSTimeInterval kIdleTimeout = 60.0;
// Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
// call half-close to avoid secondary failures.
if (finalState != FSTStreamStateError) {
- FSTLog(@"%@ %p Closing stream client-side", [self class], (__bridge void *)self);
+ LOG_DEBUG("%s %s Closing stream client-side", [self class], (__bridge void *)self);
@synchronized(self.requestsWriter) {
[self.requestsWriter finishWithError:nil];
}
@@ -426,15 +425,14 @@ static const NSTimeInterval kIdleTimeout = 60.0;
}
- (void)stop {
- FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
+ LOG_DEBUG("%s %s stop", NSStringFromClass([self class]), (__bridge void *)self);
if ([self isStarted]) {
[self closeWithFinalState:FSTStreamStateStopped error:nil];
}
}
- (void)inhibitBackoff {
- FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)",
- (long)self.state);
+ HARD_ASSERT(![self isStarted], "Can only inhibit backoff after an error (was %s)", self.state);
[self.workerDispatchQueue verifyIsCurrentQueue];
// Clear the error condition.
@@ -550,8 +548,8 @@ static const NSTimeInterval kIdleTimeout = 60.0;
* Called by the stream when the underlying RPC has been closed for whatever reason.
*/
- (void)handleStreamClose:(nullable NSError *)error {
- FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
- FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream.");
+ LOG_DEBUG("%s %s close: %s", NSStringFromClass([self class]), (__bridge void *)self, error);
+ HARD_ASSERT([self isStarted], "handleStreamClose: called for non-started stream.");
// In theory the stream could close cleanly, however, in our current model we never expect this
// to happen because if we stop a stream ourselves, this callback will never be called. To
@@ -572,14 +570,14 @@ static const NSTimeInterval kIdleTimeout = 60.0;
*/
- (void)writeValue:(id)value {
[self.workerDispatchQueue enterCheckedOperation:^{
- FSTAssert([self isStarted], @"writeValue: called for stopped stream.");
+ HARD_ASSERT([self isStarted], "writeValue: called for stopped stream.");
if (!self.messageReceived) {
self.messageReceived = YES;
if ([FIRFirestore isLoggingEnabled]) {
- FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
- (__bridge void *)self,
- [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
+ LOG_DEBUG("%s %s headers (whitelisted): %s", NSStringFromClass([self class]),
+ (__bridge void *)self,
+ [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
}
}
NSError *error;
@@ -606,7 +604,7 @@ static const NSTimeInterval kIdleTimeout = 60.0;
- (void)writesFinishedWithError:(nullable NSError *)error __used {
error = [FSTDatastore firestoreErrorForError:error];
[self.workerDispatchQueue enterCheckedOperation:^{
- FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream.");
+ HARD_ASSERT([self isStarted], "writesFinishedWithError: called for stopped stream.");
[self handleStreamClose:error];
}];
@@ -657,7 +655,7 @@ static const NSTimeInterval kIdleTimeout = 60.0;
}
- (void)watchQuery:(FSTQueryData *)query {
- FSTAssert([self isOpen], @"Not yet open");
+ HARD_ASSERT([self isOpen], "Not yet open");
[self.workerDispatchQueue verifyIsCurrentQueue];
GCFSListenRequest *request = [GCFSListenRequest message];
@@ -665,19 +663,19 @@ static const NSTimeInterval kIdleTimeout = 60.0;
request.addTarget = [_serializer encodedTarget:query];
request.labels = [_serializer encodedListenRequestLabelsForQueryData:query];
- FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, request);
+ LOG_DEBUG("FSTWatchStream %s watch: %s", (__bridge void *)self, request);
[self writeRequest:request];
}
- (void)unwatchTargetID:(FSTTargetID)targetID {
- FSTAssert([self isOpen], @"Not yet open");
+ HARD_ASSERT([self isOpen], "Not yet open");
[self.workerDispatchQueue verifyIsCurrentQueue];
GCFSListenRequest *request = [GCFSListenRequest message];
request.database = [_serializer encodedDatabaseID];
request.removeTarget = targetID;
- FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, request);
+ LOG_DEBUG("FSTWatchStream %s unwatch: %s", (__bridge void *)self, request);
[self writeRequest:request];
}
@@ -686,7 +684,7 @@ static const NSTimeInterval kIdleTimeout = 60.0;
* watchStreamDidChange:snapshotVersion: callback.
*/
- (void)handleStreamMessage:(GCFSListenResponse *)proto {
- FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto);
+ LOG_DEBUG("FSTWatchStream %s response: %s", (__bridge void *)self, proto);
[self.workerDispatchQueue verifyIsCurrentQueue];
// A successful response means the stream is healthy.
@@ -756,8 +754,8 @@ static const NSTimeInterval kIdleTimeout = 60.0;
- (void)writeHandshake {
// The initial request cannot contain mutations, but must contain a projectID.
- FSTAssert([self isOpen], @"Not yet open");
- FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn");
+ HARD_ASSERT([self isOpen], "Not yet open");
+ HARD_ASSERT(!self.handshakeComplete, "Handshake sent out of turn");
[self.workerDispatchQueue verifyIsCurrentQueue];
GCFSWriteRequest *request = [GCFSWriteRequest message];
@@ -765,13 +763,13 @@ static const NSTimeInterval kIdleTimeout = 60.0;
// TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the
// handshake, ignoring any stream token we might have.
- FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, request);
+ LOG_DEBUG("FSTWriteStream %s initial request: %s", (__bridge void *)self, request);
[self writeRequest:request];
}
- (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
- FSTAssert([self isOpen], @"Not yet open");
- FSTAssert(self.handshakeComplete, @"Mutations sent out of turn");
+ HARD_ASSERT([self isOpen], "Not yet open");
+ HARD_ASSERT(self.handshakeComplete, "Mutations sent out of turn");
[self.workerDispatchQueue verifyIsCurrentQueue];
NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count];
@@ -783,7 +781,7 @@ static const NSTimeInterval kIdleTimeout = 60.0;
request.writesArray = protos;
request.streamToken = self.lastStreamToken;
- FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, request);
+ LOG_DEBUG("FSTWriteStream %s mutation request: %s", (__bridge void *)self, request);
[self writeRequest:request];
}
@@ -792,7 +790,7 @@ static const NSTimeInterval kIdleTimeout = 60.0;
* that on to the mutationResultsHandler.
*/
- (void)handleStreamMessage:(GCFSWriteResponse *)response {
- FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response);
+ LOG_DEBUG("FSTWriteStream %s response: %s", (__bridge void *)self, response);
[self.workerDispatchQueue verifyIsCurrentQueue];
// Always capture the last stream token.