diff options
author | Gil <mcg@google.com> | 2017-10-03 08:55:22 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-03 08:55:22 -0700 |
commit | bde743ed25166a0b320ae157bfb1d68064f531c9 (patch) | |
tree | 4dd7525d9df32fa5dbdb721d4b0d4f9b87f5e884 /Firestore/Source/Remote | |
parent | bf550507ffa8beee149383a5bf1e2363bccefbb4 (diff) |
Release 4.3.0 (#327)
Initial release of Firestore at 0.8.0
Bump FirebaseCommunity to 0.1.3
Diffstat (limited to 'Firestore/Source/Remote')
-rw-r--r-- | Firestore/Source/Remote/FSTBufferedWriter.h | 44 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTBufferedWriter.m | 134 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTDatastore.h | 365 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTDatastore.m | 1027 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTExistenceFilter.h | 31 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTExistenceFilter.m | 53 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTExponentialBackoff.h | 79 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTExponentialBackoff.m | 97 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTRemoteEvent.h | 213 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTRemoteEvent.m | 516 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTRemoteStore.h | 143 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTRemoteStore.m | 599 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTSerializerBeta.h | 110 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTSerializerBeta.m | 1084 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTWatchChange.h | 118 | ||||
-rw-r--r-- | Firestore/Source/Remote/FSTWatchChange.m | 150 |
16 files changed, 4763 insertions, 0 deletions
diff --git a/Firestore/Source/Remote/FSTBufferedWriter.h b/Firestore/Source/Remote/FSTBufferedWriter.h new file mode 100644 index 0000000..83fada6 --- /dev/null +++ b/Firestore/Source/Remote/FSTBufferedWriter.h @@ -0,0 +1,44 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <Foundation/Foundation.h> +#import <RxLibrary/GRXWriteable.h> +#import <RxLibrary/GRXWriter.h> + +NS_ASSUME_NONNULL_BEGIN + +/** + * A buffered GRXWriter. + * + * GRPC only allows a single message to be written to a channel at a time. While the channel is + * sending, GRPC sets the state of the GRXWriter representing the request stream to + * GRXWriterStatePaused. Once the channel is ready to accept more messages GRPC sets the state of + * the writer to GRXWriterStateStarted. + * + * This class is NOT thread safe, even though it is accessed from multiple threads. To conform with + * the contract GRPC uses, all method calls on the FSTBufferedWriter must be @synchronized on the + * receiver. + */ +@interface FSTBufferedWriter : GRXWriter <GRXWriteable> + +/** + * Writes a message into the buffer. Must be called inside an @synchronized block on the receiver. + */ +- (void)writeValue:(id)value; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTBufferedWriter.m b/Firestore/Source/Remote/FSTBufferedWriter.m new file mode 100644 index 0000000..d86e03a --- /dev/null +++ b/Firestore/Source/Remote/FSTBufferedWriter.m @@ -0,0 +1,134 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <Protobuf/GPBProtocolBuffers.h> + +#import "FSTBufferedWriter.h" + +NS_ASSUME_NONNULL_BEGIN + +@implementation FSTBufferedWriter { + GRXWriterState _state; + NSMutableArray<NSData *> *_queue; + + id<GRXWriteable> _writeable; +} + +- (instancetype)init { + if (self = [super init]) { + _state = GRXWriterStateNotStarted; + _queue = [[NSMutableArray alloc] init]; + } + return self; +} + +#pragma mark - GRXWriteable implementation + +/** Push the next value of the sequence to the receiving object. */ +- (void)writeValue:(id)value { + if (_state == GRXWriterStateStarted && _queue.count == 0) { + // Skip the queue. + [_writeable writeValue:value]; + } else { + // Buffer the new value. Note that the value is assumed to be transient and doesn't need to + // be copied. + [_queue addObject:value]; + } +} + +/** + * Signal that the sequence is completed, or that an error ocurred. After this message is sent to + * the receiver, neither it nor writeValue: may be called again. + */ +- (void)writesFinishedWithError:(nullable NSError *)error { + // Unimplemented. If we ever wanted to implement sender-side initiated half close we could do so + // by buffering (or sending) and error. + [self doesNotRecognizeSelector:_cmd]; +} + +#pragma mark GRXWriter implementation +// The GRXWriter implementation defines the send side of the RPC stream. Once the RPC is ready it +// will call startWithWriteable passing a GRXWriteable into which requests can be written but only +// when the GRXWriter is in the started state. + +/** + * Called by GRPCCall when it is ready to accept for the first request. Requests should be written + * to the passed writeable. + * + * GRPCCall will synchronize on the receiver around this call. + */ +- (void)startWithWriteable:(id<GRXWriteable>)writeable { + _state = GRXWriterStateStarted; + _writeable = writeable; +} + +/** + * Called by GRPCCall to implement flow control on the sending side of the stream. After each + * writeValue: on the requestsWriteable, GRPCCall will call setState:GRXWriterStatePaused to apply + * backpressure. Once the stream is ready to accept another message, GRPCCall will call + * setState:GRXWriterStateStarted. + * + * GRPCCall will synchronize on the receiver around this call. + */ +- (void)setState:(GRXWriterState)newState { + // Manual transitions are only allowed from the started or paused states. + if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { + return; + } + + switch (newState) { + case GRXWriterStateFinished: + _state = newState; + // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the + // writeable to be messaged anymore. + _queue = nil; + _writeable = nil; + return; + case GRXWriterStatePaused: + _state = newState; + return; + case GRXWriterStateStarted: + if (_state == GRXWriterStatePaused) { + _state = newState; + [self writeBufferedMessages]; + } + return; + case GRXWriterStateNotStarted: + return; + } +} + +- (void)finishWithError:(nullable NSError *)error { + [_writeable writesFinishedWithError:error]; + self.state = GRXWriterStateFinished; +} + +- (void)writeBufferedMessages { + while (_state == GRXWriterStateStarted && _queue.count > 0) { + id value = _queue[0]; + [_queue removeObjectAtIndex:0]; + + // In addition to writing the value here GRPC will apply backpressure by pausing the GRXWriter + // wrapping this buffer. That writer must call -pauseMessages which will cause this loop to + // exit. Synchronization is not required since the callback happens within the body of the + // writeValue implementation. + [_writeable writeValue:value]; + } +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTDatastore.h b/Firestore/Source/Remote/FSTDatastore.h new file mode 100644 index 0000000..840d2fe --- /dev/null +++ b/Firestore/Source/Remote/FSTDatastore.h @@ -0,0 +1,365 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <Foundation/Foundation.h> + +#import "FSTTypes.h" + +@class FSTDatabaseInfo; +@class FSTDocumentKey; +@class FSTDispatchQueue; +@class FSTMutation; +@class FSTMutationResult; +@class FSTQueryData; +@class FSTSnapshotVersion; +@class FSTWatchChange; +@class FSTWatchStream; +@class FSTWriteStream; +@class GRPCCall; +@class GRXWriter; + +@protocol FSTCredentialsProvider; +@protocol FSTWatchStreamDelegate; +@protocol FSTWriteStreamDelegate; + +NS_ASSUME_NONNULL_BEGIN + +/** + * FSTDatastore represents a proxy for the remote server, hiding details of the RPC layer. It: + * + * - Manages connections to the server + * - Authenticates to the server + * - Manages threading and keeps higher-level code running on the worker queue + * - Serializes internal model objects to and from protocol buffers + * + * The FSTDatastore is generally not responsible for understanding the higher-level protocol + * involved in actually making changes or reading data, and aside from the connections it manages + * is otherwise stateless. + */ +@interface FSTDatastore : NSObject + +/** Creates a new Datastore instance with the given database info. */ ++ (instancetype)datastoreWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials; + +- (instancetype)init __attribute__((unavailable("Use a static constructor method."))); + +- (instancetype)initWithDatabaseInfo:(FSTDatabaseInfo *)databaseInfo + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + NS_DESIGNATED_INITIALIZER; + +/** Converts the error to a FIRFirestoreErrorDomain error. */ ++ (NSError *)firestoreErrorForError:(NSError *)error; + +/** Returns YES if the given error indicates the RPC associated with it may not be retried. */ ++ (BOOL)isPermanentWriteError:(NSError *)error; + +/** Returns YES if the given error is a GRPC ABORTED error. **/ ++ (BOOL)isAbortedError:(NSError *)error; + +/** Looks up a list of documents in datastore. */ +- (void)lookupDocuments:(NSArray<FSTDocumentKey *> *)keys + completion:(FSTVoidMaybeDocumentArrayErrorBlock)completion; + +/** Commits data to datastore. */ +- (void)commitMutations:(NSArray<FSTMutation *> *)mutations + completion:(FSTVoidErrorBlock)completion; + +/** Creates a new watch stream. */ +- (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate; + +/** Creates a new write stream. */ +- (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate; + +/** The name of the database and the backend. */ +@property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo; + +@end + +/** + * An FSTStream is an abstract base class that represents a restartable streaming RPC to the + * Firestore backend. It's built on top of GRPC's own support for streaming RPCs, and adds several + * critical features for our clients: + * + * - Restarting a stream is allowed (after failure) + * - Exponential backoff on failure (independent of the underlying channel) + * - Authentication via FSTCredentialsProvider + * - Dispatching all callbacks into the shared worker queue + * + * Subclasses of FSTStream implement serialization of models to and from bytes (via protocol + * buffers) for a specific streaming RPC and emit events specific to the stream. + * + * ## Starting and Stopping + * + * Streaming RPCs are stateful and need to be started before messages can be sent and received. + * The FSTStream will call its delegate's specific streamDidOpen method once the stream is ready + * to accept requests. + * + * Should a `start` fail, FSTStream will call its delegate's specific streamDidClose method with an + * NSError indicating what went wrong. The delegate is free to call start again. + * + * An FSTStream can also be explicitly stopped which indicates that the caller has discarded the + * stream and no further events should be emitted. Once explicitly stopped, a stream cannot be + * restarted. + * + * ## Subclassing Notes + * + * An implementation of FSTStream needs to implement the following methods: + * - `createRPCWithRequestsWriter`, should create the specific RPC (a GRPCCall object). + * - `handleStreamOpen`, should call through to the stream-specific streamDidOpen method. + * - `handleStreamMessage`, receives protocol buffer responses from GRPC and must deserialize and + * delegate to some stream specific response method. + * - `handleStreamClose`, calls through to the stream-specific streamDidClose method. + * + * Additionally, beyond these required methods, subclasses will want to implement methods that + * take request models, serialize them, and write them to using writeRequest:. + * + * ## RPC Message Type + * + * FSTStream intentionally uses the GRPCCall interface to GRPC directly, bypassing both GRPCProtoRPC + * and GRXBufferedPipe for sending data. This has been done to avoid race conditions that come out + * of a loosely specified locking contract on GRXWriter. There's essentially no way to safely use + * any of the wrapper objects for GRXWriter (that perform buffering or conversion to/from protos). + * + * See https://github.com/grpc/grpc/issues/10957 for the kinds of things we're trying to avoid. + */ +@interface FSTStream : NSObject + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass NS_DESIGNATED_INITIALIZER; + +- (instancetype)init NS_UNAVAILABLE; + +/** + * An abstract method used by `start` to create a streaming RPC specific to this type of stream. + * The RPC should be created such that requests are taken from `self`. + * + * Note that the returned GRPCCall must not be a GRPCProtoRPC, since the rest of the streaming + * mechanism assumes it is dealing in bytes-level requests and responses. + */ +- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter; + +/** + * Returns YES if `start` has been called and no error has occurred. YES indicates the stream is + * open or in the process of opening (which encompasses respecting backoff, getting auth tokens, + * and starting the actual RPC). Use `isOpen` to determine if the stream is open and ready for + * outbound requests. + */ +- (BOOL)isStarted; + +/** Returns YES if the underlying RPC is open and the stream is ready for outbound requests. */ +- (BOOL)isOpen; + +/** + * Starts the RPC. Only allowed if isStarted returns NO. The stream is not immediately ready for + * use: the delegate's watchStreamDidOpen method will be invoked when the RPC is ready for outbound + * requests, at which point `isOpen` will return YES. + * + * When start returns, -isStarted will return YES. + */ +- (void)start; + +/** + * Stops the RPC. This call is idempotent and allowed regardless of the current isStarted state. + * + * Unlike a transient stream close, stopping a stream is permanent. This is guaranteed NOT to emit + * any further events on the stream-specific delegate, including the streamDidClose method. + * + * NOTE: This no-events contract may seem counter-intuitive but allows the caller to + * straightforwardly sequence stream tear-down without having to worry about when the delegate's + * streamDidClose methods will get called. For example if the stream must be exchanged for another + * during a user change this allows `stop` to be called eagerly without worrying about the + * streamDidClose method accidentally restarting the stream before the new one is ready. + * + * When stop returns, -isStarted and -isOpen will both return NO. + */ +- (void)stop; + +/** + * After an error the stream will usually back off on the next attempt to start it. If the error + * warrants an immediate restart of the stream, the sender can use this to indicate that the + * receiver should not back off. + * + * Each error will call the stream-specific streamDidClose method. That method can decide to + * inhibit backoff if required. + */ +- (void)inhibitBackoff; + +@end + +#pragma mark - FSTWatchStream + +/** A protocol defining the events that can be emitted by the FSTWatchStream. */ +@protocol FSTWatchStreamDelegate <NSObject> + +/** Called by the FSTWatchStream when it is ready to accept outbound request messages. */ +- (void)watchStreamDidOpen; + +/** + * Called by the FSTWatchStream with changes and the snapshot versions included in in the + * WatchChange responses sent back by the server. + */ +- (void)watchStreamDidChange:(FSTWatchChange *)change + snapshotVersion:(FSTSnapshotVersion *)snapshotVersion; + +/** + * Called by the FSTWatchStream when the underlying streaming RPC is closed for whatever reason, + * usually because of an error, but possibly due to an idle timeout. The error passed to this + * method may be nil, in which case the stream was closed without attributable fault. + * + * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" + * on FSTStream for details. + */ +- (void)watchStreamDidClose:(NSError *_Nullable)error; + +@end + +/** + * An FSTStream that implements the StreamingWatch RPC. + * + * Once the FSTWatchStream has called the streamDidOpen method, any number of watchQuery and + * unwatchTargetId calls can be sent to control what changes will be sent from the server for + * WatchChanges. + */ +@interface FSTWatchStream : FSTStream + +/** + * Initializes the watch stream with its dependencies. + */ +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass + delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER; + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; + +- (instancetype)init NS_UNAVAILABLE; + +/** + * Registers interest in the results of the given query. If the query includes a resumeToken it + * will be included in the request. Results that affect the query will be streamed back as + * WatchChange messages that reference the targetID included in |query|. + */ +- (void)watchQuery:(FSTQueryData *)query; + +/** Unregisters interest in the results of the query associated with the given target ID. */ +- (void)unwatchTargetID:(FSTTargetID)targetID; + +@property(nonatomic, weak, readonly) id<FSTWatchStreamDelegate> delegate; + +@end + +#pragma mark - FSTWriteStream + +@protocol FSTWriteStreamDelegate <NSObject> + +/** Called by the FSTWriteStream when it is ready to accept outbound request messages. */ +- (void)writeStreamDidOpen; + +/** + * Called by the FSTWriteStream upon a successful handshake response from the server, which is the + * receiver's cue to send any pending writes. + */ +- (void)writeStreamDidCompleteHandshake; + +/** + * Called by the FSTWriteStream upon receiving a StreamingWriteResponse from the server that + * contains mutation results. + */ +- (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion + mutationResults:(NSArray<FSTMutationResult *> *)results; + +/** + * Called when the FSTWriteStream's underlying RPC is closed for whatever reason, usually because + * of an error, but possibly due to an idle timeout. The error passed to this method may be nil, in + * which case the stream was closed without attributable fault. + * + * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" + * on FSTStream for details. + */ +- (void)writeStreamDidClose:(NSError *_Nullable)error; + +@end + +/** + * An FSTStream that implements the StreamingWrite RPC. + * + * The StreamingWrite RPC requires the caller to maintain special `streamToken` state in between + * calls, to help the server understand which responses the client has processed by the time the + * next request is made. Every response may contain a `streamToken`; this value must be passed to + * the next request. + * + * After calling `start` on this stream, the next request must be a handshake, containing whatever + * streamToken is on hand. Once a response to this request is received, all pending mutations may + * be submitted. When submitting multiple batches of mutations at the same time, it's okay to use + * the same streamToken for the calls to `writeMutations:`. + */ +@interface FSTWriteStream : FSTStream + +/** + * Initializes the write stream with its dependencies. + */ +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass + delegate:(id<FSTWriteStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER; + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE; + +- (instancetype)init NS_UNAVAILABLE; + +/** + * Sends an initial streamToken to the server, performing the handshake required to make the + * StreamingWrite RPC work. Subsequent `writeMutations:` calls should wait until a response has + * been delivered to the delegate's writeStreamDidCompleteHandshake method. + */ +- (void)writeHandshake; + +/** Sends a group of mutations to the Firestore backend to apply. */ +- (void)writeMutations:(NSArray<FSTMutation *> *)mutations; + +@property(nonatomic, weak, readonly) id<FSTWriteStreamDelegate> delegate; + +/** + * Tracks whether or not a handshake has been successfully exchanged and the stream is ready to + * accept mutations. + */ +@property(nonatomic, assign, readwrite, getter=isHandshakeComplete) BOOL handshakeComplete; + +/** + * The last received stream token from the server, used to acknowledge which responses the client + * has processed. Stream tokens are opaque checkpoint markers whose only real value is their + * inclusion in the next request. + * + * FSTWriteStream manages propagating this value from responses to the next request. + */ +@property(nonatomic, strong, nullable) NSData *lastStreamToken; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTDatastore.m b/Firestore/Source/Remote/FSTDatastore.m new file mode 100644 index 0000000..3ed2729 --- /dev/null +++ b/Firestore/Source/Remote/FSTDatastore.m @@ -0,0 +1,1027 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "FSTDatastore.h" + +#import <GRPCClient/GRPCCall+OAuth2.h> +#import <GRPCClient/GRPCCall.h> +#import <ProtoRPC/ProtoRPC.h> + +#import "FIRFirestore+Internal.h" +#import "FIRFirestoreErrors.h" +#import "FIRFirestoreVersion.h" +#import "FSTAssert.h" +#import "FSTBufferedWriter.h" +#import "FSTClasses.h" +#import "FSTCredentialsProvider.h" +#import "FSTDatabaseID.h" +#import "FSTDatabaseInfo.h" +#import "FSTDispatchQueue.h" +#import "FSTDocument.h" +#import "FSTDocumentKey.h" +#import "FSTExponentialBackoff.h" +#import "FSTLocalStore.h" +#import "FSTLogger.h" +#import "FSTMutation.h" +#import "FSTQueryData.h" +#import "FSTSerializerBeta.h" + +#import "Firestore.pbrpc.h" + +NS_ASSUME_NONNULL_BEGIN + +// GRPC does not publicly declare a means of disabling SSL, which we need for testing. Firestore +// directly exposes an sslEnabled setting so this is required to plumb that through. Note that our +// own tests depend on this working so we'll know if this changes upstream. +@interface GRPCHost ++ (nullable instancetype)hostWithAddress:(NSString *)address; +@property(nonatomic, getter=isSecure) BOOL secure; +@end + +/** + * Initial backoff time in seconds after an error. + * Set to 1s according to https://cloud.google.com/apis/design/errors. + */ +static const NSTimeInterval kBackoffInitialDelay = 1; +static const NSTimeInterval kBackoffMaxDelay = 60.0; +static const double kBackoffFactor = 1.5; +static NSString *const kXGoogAPIClientHeader = @"x-goog-api-client"; +static NSString *const kGoogleCloudResourcePrefix = @"google-cloud-resource-prefix"; + +/** Function typedef used to create RPCs. */ +typedef GRPCProtoCall * (^RPCFactory)(); + +#pragma mark - FSTStream + +/** The state of a stream. */ +typedef NS_ENUM(NSInteger, FSTStreamState) { + /** + * The streaming RPC is not running and there's no error condition. Calling `start` will + * start the stream immediately without backoff. While in this state -isStarted will return NO. + */ + FSTStreamStateInitial = 0, + + /** + * The stream is starting, and is waiting for an auth token to attach to the initial request. + * While in this state, isStarted will return YES but isOpen will return NO. + */ + FSTStreamStateAuth, + + /** + * The streaming RPC is up and running. Requests and responses can flow freely. Both + * isStarted and isOpen will return YES. + */ + FSTStreamStateOpen, + + /** + * The stream encountered an error. The next start attempt will back off. While in this state + * -isStarted will return NO. + */ + FSTStreamStateError, + + /** + * An in-between state after an error where the stream is waiting before re-starting. After + * waiting is complete, the stream will try to open. While in this state -isStarted will + * return YES but isOpen will return NO. + */ + FSTStreamStateBackoff, + + /** + * The stream has been explicitly stopped; no further events will be emitted. + */ + FSTStreamStateStopped, +}; + +// We need to declare these classes first so that Datastore can alloc them. + +@interface FSTWatchStream () + +/** The delegate that will receive events generated by the watch stream. */ +@property(nonatomic, weak, nullable) id<FSTWatchStreamDelegate> delegate; + +@end + +@interface FSTBetaWatchStream : FSTWatchStream + +/** + * Initializes the watch stream with its dependencies. + */ +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + serializer:(FSTSerializerBeta *)serializer + delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER; + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass + delegate:(id<FSTWatchStreamDelegate>)delegate NS_UNAVAILABLE; + +@end + +@interface FSTWriteStream () + +@property(nonatomic, weak, nullable) id<FSTWriteStreamDelegate> delegate; + +@end + +@interface FSTBetaWriteStream : FSTWriteStream + +/** + * Initializes the write stream with its dependencies. + */ +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + serializer:(FSTSerializerBeta *)serializer + delegate:(id<FSTWriteStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER; + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass + delegate:(id<FSTWriteStreamDelegate>)delegate NS_UNAVAILABLE; + +@end + +@interface FSTStream () <GRXWriteable> + +@property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo; +@property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue; +@property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials; +@property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass; +@property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff; + +/** A flag tracking whether the stream received a message from the backend. */ +@property(nonatomic, assign) BOOL messageReceived; + +/** + * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the + * state of the stream. + */ +@property(nonatomic, assign) FSTStreamState state; + +/** The RPC handle. Used for cancellation. */ +@property(nonatomic, strong, nullable) GRPCCall *rpc; + +/** + * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has + * started. + */ +@property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter; + +@end + +#pragma mark - FSTDatastore + +@interface FSTDatastore () + +/** The GRPC service for Firestore. */ +@property(nonatomic, strong, readonly) GCFSFirestore *service; + +@property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue; + +/** An object for getting an auth token before each request. */ +@property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials; + +@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer; + +@end + +@implementation FSTDatastore + ++ (instancetype)datastoreWithDatabase:(FSTDatabaseInfo *)databaseInfo + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials { + return [[FSTDatastore alloc] initWithDatabaseInfo:databaseInfo + workerDispatchQueue:workerDispatchQueue + credentials:credentials]; +} + +- (instancetype)initWithDatabaseInfo:(FSTDatabaseInfo *)databaseInfo + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials { + if (self = [super init]) { + _databaseInfo = databaseInfo; + if (!databaseInfo.isSSLEnabled) { + GRPCHost *hostConfig = [GRPCHost hostWithAddress:databaseInfo.host]; + hostConfig.secure = NO; + } + _service = [GCFSFirestore serviceWithHost:databaseInfo.host]; + _workerDispatchQueue = workerDispatchQueue; + _credentials = credentials; + _serializer = [[FSTSerializerBeta alloc] initWithDatabaseID:databaseInfo.databaseID]; + } + return self; +} + +- (NSString *)description { + return [NSString stringWithFormat:@"<FSTDatastore: %@>", self.databaseInfo]; +} + +/** + * Converts the error to an error within the domain FIRFirestoreErrorDomain. + */ ++ (NSError *)firestoreErrorForError:(NSError *)error { + if (!error) { + return error; + } else if ([error.domain isEqualToString:FIRFirestoreErrorDomain]) { + return error; + } else if ([error.domain isEqualToString:kGRPCErrorDomain]) { + FSTAssert(error.code >= GRPCErrorCodeCancelled && error.code <= GRPCErrorCodeUnauthenticated, + @"Unknown GRPC error code: %ld", (long)error.code); + return + [NSError errorWithDomain:FIRFirestoreErrorDomain code:error.code userInfo:error.userInfo]; + } else { + return [NSError errorWithDomain:FIRFirestoreErrorDomain + code:FIRFirestoreErrorCodeUnknown + userInfo:@{NSUnderlyingErrorKey : error}]; + } +} + ++ (BOOL)isAbortedError:(NSError *)error { + FSTAssert([error.domain isEqualToString:FIRFirestoreErrorDomain], + @"isAbortedError: only works with errors emitted by FSTDatastore."); + return error.code == FIRFirestoreErrorCodeAborted; +} + ++ (BOOL)isPermanentWriteError:(NSError *)error { + FSTAssert([error.domain isEqualToString:FIRFirestoreErrorDomain], + @"isPerminanteWriteError: only works with errors emitted by FSTDatastore."); + switch (error.code) { + case FIRFirestoreErrorCodeCancelled: + case FIRFirestoreErrorCodeUnknown: + case FIRFirestoreErrorCodeDeadlineExceeded: + case FIRFirestoreErrorCodeResourceExhausted: + case FIRFirestoreErrorCodeInternal: + case FIRFirestoreErrorCodeUnavailable: + case FIRFirestoreErrorCodeUnauthenticated: + // Unauthenticated means something went wrong with our token and we need + // to retry with new credentials which will happen automatically. + // TODO(b/37325376): Give up after second unauthenticated error. + return NO; + case FIRFirestoreErrorCodeInvalidArgument: + case FIRFirestoreErrorCodeNotFound: + case FIRFirestoreErrorCodeAlreadyExists: + case FIRFirestoreErrorCodePermissionDenied: + case FIRFirestoreErrorCodeFailedPrecondition: + case FIRFirestoreErrorCodeAborted: + // Aborted might be retried in some scenarios, but that is dependant on + // the context and should handled individually by the calling code. + // See https://cloud.google.com/apis/design/errors + case FIRFirestoreErrorCodeOutOfRange: + case FIRFirestoreErrorCodeUnimplemented: + case FIRFirestoreErrorCodeDataLoss: + default: + return YES; + } +} + +/** Returns the string to be used as x-goog-api-client header value. */ ++ (NSString *)googAPIClientHeaderValue { + // TODO(dimond): This should ideally also include the grpc version, however, gRPC defines the + // version as a macro, so it would be hardcoded based on version we have at compile time of + // the Firestore library, rather than the version available at runtime/at compile time by the + // user of the library. + return [NSString stringWithFormat:@"gl-objc/ fire/%s grpc/", FirebaseFirestoreVersionString]; +} + +/** Returns the string to be used as google-cloud-resource-prefix header value. */ ++ (NSString *)googleCloudResourcePrefixForDatabaseID:(FSTDatabaseID *)databaseID { + return [NSString + stringWithFormat:@"projects/%@/databases/%@", databaseID.projectID, databaseID.databaseID]; +} +/** + * Takes a dictionary of (HTTP) response headers and returns the set of whitelisted headers + * (for logging purposes). + */ ++ (NSDictionary<NSString *, NSString *> *)extractWhiteListedHeaders: + (NSDictionary<NSString *, NSString *> *)headers { + NSMutableDictionary<NSString *, NSString *> *whiteListedHeaders = + [NSMutableDictionary dictionary]; + NSArray<NSString *> *whiteList = @[ + @"date", @"x-google-backends", @"x-google-netmon-label", @"x-google-service", + @"x-google-gfe-request-trace" + ]; + [headers + enumerateKeysAndObjectsUsingBlock:^(NSString *headerName, NSString *headerValue, BOOL *stop) { + if ([whiteList containsObject:[headerName lowercaseString]]) { + whiteListedHeaders[headerName] = headerValue; + } + }]; + return whiteListedHeaders; +} + +/** Logs the (whitelisted) headers returned for an GRPCProtoCall RPC. */ ++ (void)logHeadersForRPC:(GRPCProtoCall *)rpc RPCName:(NSString *)rpcName { + if ([FIRFirestore isLoggingEnabled]) { + FSTLog(@"RPC %@ returned headers (whitelisted): %@", rpcName, + [FSTDatastore extractWhiteListedHeaders:rpc.responseHeaders]); + } +} + +- (void)commitMutations:(NSArray<FSTMutation *> *)mutations + completion:(FSTVoidErrorBlock)completion { + GCFSCommitRequest *request = [GCFSCommitRequest message]; + request.database = [self.serializer encodedDatabaseID]; + + NSMutableArray<GCFSWrite *> *mutationProtos = [NSMutableArray array]; + for (FSTMutation *mutation in mutations) { + [mutationProtos addObject:[self.serializer encodedMutation:mutation]]; + } + request.writesArray = mutationProtos; + + RPCFactory rpcFactory = ^GRPCProtoCall * { + __block GRPCProtoCall *rpc = [self.service + RPCToCommitWithRequest:request + handler:^(GCFSCommitResponse *response, NSError *_Nullable error) { + error = [FSTDatastore firestoreErrorForError:error]; + [self.workerDispatchQueue dispatchAsync:^{ + FSTLog(@"RPC CommitRequest completed. Error: %@", error); + [FSTDatastore logHeadersForRPC:rpc RPCName:@"CommitRequest"]; + completion(error); + }]; + }]; + return rpc; + }; + + [self invokeRPCWithFactory:rpcFactory errorHandler:completion]; +} + +- (void)lookupDocuments:(NSArray<FSTDocumentKey *> *)keys + completion:(FSTVoidMaybeDocumentArrayErrorBlock)completion { + GCFSBatchGetDocumentsRequest *request = [GCFSBatchGetDocumentsRequest message]; + request.database = [self.serializer encodedDatabaseID]; + for (FSTDocumentKey *key in keys) { + [request.documentsArray addObject:[self.serializer encodedDocumentKey:key]]; + } + + __block FSTMaybeDocumentDictionary *results = + [FSTMaybeDocumentDictionary maybeDocumentDictionary]; + + RPCFactory rpcFactory = ^GRPCProtoCall * { + __block GRPCProtoCall *rpc = [self.service + RPCToBatchGetDocumentsWithRequest:request + eventHandler:^(BOOL done, + GCFSBatchGetDocumentsResponse *_Nullable response, + NSError *_Nullable error) { + error = [FSTDatastore firestoreErrorForError:error]; + [self.workerDispatchQueue dispatchAsync:^{ + if (error) { + FSTLog(@"RPC BatchGetDocuments completed. Error: %@", error); + [FSTDatastore logHeadersForRPC:rpc RPCName:@"BatchGetDocuments"]; + completion(nil, error); + return; + } + + if (!done) { + // Streaming response, accumulate result + FSTMaybeDocument *doc = + [self.serializer decodedMaybeDocumentFromBatch:response]; + results = [results dictionaryBySettingObject:doc forKey:doc.key]; + } else { + // Streaming response is done, call completion + FSTLog(@"RPC BatchGetDocuments completed successfully."); + [FSTDatastore logHeadersForRPC:rpc RPCName:@"BatchGetDocuments"]; + FSTAssert(!response, @"Got response after done."); + NSMutableArray<FSTMaybeDocument *> *docs = + [NSMutableArray arrayWithCapacity:keys.count]; + for (FSTDocumentKey *key in keys) { + [docs addObject:results[key]]; + } + completion(docs, nil); + } + }]; + }]; + return rpc; + }; + + [self invokeRPCWithFactory:rpcFactory + errorHandler:^(NSError *_Nonnull error) { + error = [FSTDatastore firestoreErrorForError:error]; + completion(nil, error); + }]; +} + +- (void)invokeRPCWithFactory:(GRPCProtoCall * (^)())rpcFactory + errorHandler:(FSTVoidErrorBlock)errorHandler { + // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token, + // but I'm not sure how to detect that right now. http://b/32762461 + [self.credentials + getTokenForcingRefresh:NO + completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) { + error = [FSTDatastore firestoreErrorForError:error]; + [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{ + if (error) { + errorHandler(error); + } else { + GRPCProtoCall *rpc = rpcFactory(); + [FSTDatastore prepareHeadersForRPC:rpc + databaseID:self.databaseInfo.databaseID + token:result.token]; + [rpc start]; + } + }]; + }]; +} + +- (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate { + return [[FSTBetaWatchStream alloc] initWithDatabase:_databaseInfo + workerDispatchQueue:_workerDispatchQueue + credentials:_credentials + serializer:_serializer + delegate:delegate]; +} + +- (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate { + return [[FSTBetaWriteStream alloc] initWithDatabase:_databaseInfo + workerDispatchQueue:_workerDispatchQueue + credentials:_credentials + serializer:_serializer + delegate:delegate]; +} + +/** Adds headers to the RPC including any OAuth access token if provided .*/ ++ (void)prepareHeadersForRPC:(GRPCCall *)rpc + databaseID:(FSTDatabaseID *)databaseID + token:(nullable NSString *)token { + rpc.oauth2AccessToken = token; + rpc.requestHeaders[kXGoogAPIClientHeader] = [FSTDatastore googAPIClientHeaderValue]; + // This header is used to improve routing and project isolation by the backend. + rpc.requestHeaders[kGoogleCloudResourcePrefix] = + [FSTDatastore googleCloudResourcePrefixForDatabaseID:databaseID]; +} + +@end + +#pragma mark - FSTStream + +@implementation FSTStream + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass { + if (self = [super init]) { + _databaseInfo = database; + _workerDispatchQueue = workerDispatchQueue; + _credentials = credentials; + _responseMessageClass = responseMessageClass; + + _backoff = [FSTExponentialBackoff exponentialBackoffWithDispatchQueue:workerDispatchQueue + initialDelay:kBackoffInitialDelay + backoffFactor:kBackoffFactor + maxDelay:kBackoffMaxDelay]; + _state = FSTStreamStateInitial; + } + return self; +} + +- (BOOL)isStarted { + [self.workerDispatchQueue verifyIsCurrentQueue]; + FSTStreamState state = self.state; + return state == FSTStreamStateBackoff || state == FSTStreamStateAuth || + state == FSTStreamStateOpen; +} + +- (BOOL)isOpen { + [self.workerDispatchQueue verifyIsCurrentQueue]; + return self.state == FSTStreamStateOpen; +} + +- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter { + @throw FSTAbstractMethodException(); // NOLINT +} + +- (void)start { + [self.workerDispatchQueue verifyIsCurrentQueue]; + + if (self.state == FSTStreamStateError) { + [self performBackoff]; + return; + } + + FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self); + FSTAssert(self.state == FSTStreamStateInitial, @"Already started"); + + self.state = FSTStreamStateAuth; + + [self.credentials + getTokenForcingRefresh:NO + completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) { + error = [FSTDatastore firestoreErrorForError:error]; + [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{ + [self resumeStartWithToken:result error:error]; + }]; + }]; +} + +/** Add an access token to our RPC, after obtaining one from the credentials provider. */ +- (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error { + if (self.state == FSTStreamStateStopped) { + // Streams can be stopped while waiting for authorization. + return; + } + + [self.workerDispatchQueue verifyIsCurrentQueue]; + FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)", + (long)self.state); + + // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token, + // but I'm not sure how to detect that right now. http://b/32762461 + if (error) { + // RPC has not been started yet, so just invoke higher-level close handler. + [self handleStreamClose:error]; + return; + } + + self.requestsWriter = [[FSTBufferedWriter alloc] init]; + _rpc = [self createRPCWithRequestsWriter:self.requestsWriter]; + [FSTDatastore prepareHeadersForRPC:_rpc + databaseID:self.databaseInfo.databaseID + token:token.token]; + [_rpc startWithWriteable:self]; + + self.state = FSTStreamStateOpen; + [self handleStreamOpen]; +} + +/** Backs off after an error. */ +- (void)performBackoff { + FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case"); + self.state = FSTStreamStateBackoff; + + FSTWeakify(self); + [self.backoff backoffAndRunBlock:^{ + FSTStrongify(self); + [self resumeStartFromBackoff]; + }]; +} + +/** Resumes stream start after backing off. */ +- (void)resumeStartFromBackoff { + if (self.state == FSTStreamStateStopped) { + // Streams can be stopped while waiting for backoff to complete. + return; + } + + // In order to have performed a backoff the stream must have been in an error state just prior + // to entering the backoff state. If we weren't stopped we must be in the backoff state. + FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)", + (long)self.state); + + // Momentarily set state to FSTStreamStateInitial as `start` expects it. + self.state = FSTStreamStateInitial; + [self start]; + FSTAssert([self isStarted], @"Stream should have started."); +} + +- (void)stop { + FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + // Prevent any possible future restart of this stream. + self.state = FSTStreamStateStopped; + + // Close the stream client side. + FSTBufferedWriter *requestsWriter = self.requestsWriter; + @synchronized(requestsWriter) { + [requestsWriter finishWithError:nil]; + } +} + +- (void)inhibitBackoff { + FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)", + (long)self.state); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + // Clear the error condition. + self.state = FSTStreamStateInitial; + [self.backoff reset]; +} + +/** + * Parses a protocol buffer response from the server. If the message fails to parse, generates + * an error and closes the stream. + * + * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:. + * @param data The bytes in the response as returned from GRPC. + * @return An instance of the protocol buffer message, parsed from the data if parsing was + * successful, or nil otherwise. + */ +- (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error { + NSError *parseError; + id parsed = [protoClass parseFromData:data error:&parseError]; + if (parsed) { + *error = nil; + return parsed; + } else { + NSDictionary *info = @{ + NSLocalizedDescriptionKey : @"Unable to parse response from the server", + NSUnderlyingErrorKey : parseError, + @"Expected class" : protoClass, + @"Received value" : data, + }; + *error = [NSError errorWithDomain:FIRFirestoreErrorDomain + code:FIRFirestoreErrorCodeInternal + userInfo:info]; + return nil; + } +} + +/** + * Writes a request proto into the stream. + */ +- (void)writeRequest:(GPBMessage *)request { + NSData *data = [request data]; + + FSTBufferedWriter *requestsWriter = self.requestsWriter; + @synchronized(requestsWriter) { + [requestsWriter writeValue:data]; + } +} + +#pragma mark Template methods for subclasses + +/** + * Called by the stream after the stream has been successfully connected, authenticated, and is now + * ready to accept messages. + * + * Subclasses should relay to their stream-specific delegate. Calling [super handleStreamOpen] is + * not required. + */ +- (void)handleStreamOpen { +} + +/** + * Called by the stream for each incoming protocol message coming from the server. + * + * Subclasses should implement this to deserialize the value and relay to their stream-specific + * delegate, if appropriate. Calling [super handleStreamMessage] is not required. + */ +- (void)handleStreamMessage:(id)value { +} + +/** + * Called by the stream when the underlying RPC has been closed for whatever reason. + * + * Subclasses should first call [super handleStreamClose:] and then call to their + * stream-specific delegate. + */ +- (void)handleStreamClose:(NSError *_Nullable)error { + FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error); + FSTAssert([self isStarted], @"Can't handle server close in non-started state."); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + self.messageReceived = NO; + self.rpc = nil; + self.requestsWriter = nil; + + // In theory the stream could close cleanly, however, in our current model we never expect this + // to happen because if we stop a stream ourselves, this callback will never be called. To + // prevent cases where we retry without a backoff accidentally, we set the stream to error + // in all cases. + self.state = FSTStreamStateError; + + if (error.code == FIRFirestoreErrorCodeResourceExhausted) { + FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class], + (__bridge void *)self); + [self.backoff resetToMax]; + } +} + +#pragma mark GRXWriteable implementation +// The GRXWriteable implementation defines the receive side of the RPC stream. + +/** + * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately + * redispatch back onto our own worker queue. + */ +- (void)writeValue:(id)value __used { + // TODO(mcg): remove the double-dispatch once GRPCCall at head is released. + // Once released we can set the responseDispatchQueue property on the GRPCCall and then this + // method can call handleStreamMessage directly. + FSTWeakify(self); + [self.workerDispatchQueue dispatchAsync:^{ + FSTStrongify(self); + if (!self || self.state == FSTStreamStateStopped) { + return; + } + if (!self.messageReceived) { + self.messageReceived = YES; + if ([FIRFirestore isLoggingEnabled]) { + FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]), + (__bridge void *)self, + [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]); + } + } + NSError *error; + id proto = [self parseProto:self.responseMessageClass data:value error:&error]; + if (proto) { + [self handleStreamMessage:proto]; + } else { + [_rpc finishWithError:error]; + } + }]; +} + +/** + * Called by GRPC when it closed the stream with an error representing the final state of the + * stream. + * + * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to + * directly inform stream-specific logic, or call stop to tear down the stream. + */ +- (void)writesFinishedWithError:(NSError *_Nullable)error __used { + error = [FSTDatastore firestoreErrorForError:error]; + FSTWeakify(self); + [self.workerDispatchQueue dispatchAsync:^{ + FSTStrongify(self); + if (!self || self.state == FSTStreamStateStopped) { + return; + } + [self handleStreamClose:error]; + }]; +} + +@end + +#pragma mark - FSTWatchStream + +@implementation FSTWatchStream + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass + delegate:(id<FSTWatchStreamDelegate>)delegate { + self = [super initWithDatabase:database + workerDispatchQueue:workerDispatchQueue + credentials:credentials + responseMessageClass:responseMessageClass]; + if (self) { + _delegate = delegate; + } + return self; +} + +- (void)stop { + // Clear the delegate to avoid any possible bleed through of events from GRPC. + self.delegate = nil; + + [super stop]; +} + +- (void)watchQuery:(FSTQueryData *)query { + @throw FSTAbstractMethodException(); // NOLINT +} + +- (void)unwatchTargetID:(FSTTargetID)targetID { + @throw FSTAbstractMethodException(); // NOLINT +} + +- (void)handleStreamOpen { + [self.delegate watchStreamDidOpen]; +} + +- (void)handleStreamClose:(NSError *_Nullable)error { + [super handleStreamClose:error]; + [self.delegate watchStreamDidClose:error]; +} + +@end + +#pragma mark - FSTBetaWatchStream + +@implementation FSTBetaWatchStream { + FSTSerializerBeta *_serializer; +} + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + serializer:(FSTSerializerBeta *)serializer + delegate:(id<FSTWatchStreamDelegate>)delegate { + self = [super initWithDatabase:database + workerDispatchQueue:workerDispatchQueue + credentials:credentials + responseMessageClass:[GCFSListenResponse class] + delegate:delegate]; + if (self) { + _serializer = serializer; + } + return self; +} + +- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter { + return [[GRPCCall alloc] initWithHost:self.databaseInfo.host + path:@"/google.firestore.v1beta1.Firestore/Listen" + requestsWriter:requestsWriter]; +} + +- (void)watchQuery:(FSTQueryData *)query { + FSTAssert([self isOpen], @"Not yet open"); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + GCFSListenRequest *request = [GCFSListenRequest message]; + request.database = [_serializer encodedDatabaseID]; + request.addTarget = [_serializer encodedTarget:query]; + request.labels = [_serializer encodedListenRequestLabelsForQueryData:query]; + + FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, request); + [self writeRequest:request]; +} + +- (void)unwatchTargetID:(FSTTargetID)targetID { + FSTAssert([self isOpen], @"Not yet open"); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + GCFSListenRequest *request = [GCFSListenRequest message]; + request.database = [_serializer encodedDatabaseID]; + request.removeTarget = targetID; + + FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, request); + [self writeRequest:request]; +} + +/** + * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's + * watchStreamDidChange:snapshotVersion: callback. + */ +- (void)handleStreamMessage:(GCFSListenResponse *)proto { + FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + // A successful response means the stream is healthy. + [self.backoff reset]; + + FSTWatchChange *change = [_serializer decodedWatchChange:proto]; + FSTSnapshotVersion *snap = [_serializer versionFromListenResponse:proto]; + [self.delegate watchStreamDidChange:change snapshotVersion:snap]; +} + +@end + +#pragma mark - FSTWriteStream + +@implementation FSTWriteStream + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + responseMessageClass:(Class)responseMessageClass + delegate:(id<FSTWriteStreamDelegate>)delegate { + self = [super initWithDatabase:database + workerDispatchQueue:workerDispatchQueue + credentials:credentials + responseMessageClass:responseMessageClass]; + if (self) { + _delegate = delegate; + } + return self; +} + +- (void)start { + self.handshakeComplete = NO; + [super start]; +} + +- (void)stop { + // Clear the delegate to avoid any possible bleed through of events from GRPC. + self.delegate = nil; + + [super stop]; +} + +- (void)writeHandshake { + @throw FSTAbstractMethodException(); // NOLINT +} + +- (void)writeMutations:(NSArray<FSTMutation *> *)mutations { + @throw FSTAbstractMethodException(); // NOLINT +} + +- (void)handleStreamOpen { + [self.delegate writeStreamDidOpen]; +} + +- (void)handleStreamClose:(NSError *_Nullable)error { + [super handleStreamClose:error]; + + [self.delegate writeStreamDidClose:error]; +} + +@end + +#pragma mark - FSTBetaWriteStream + +@implementation FSTBetaWriteStream { + FSTSerializerBeta *_serializer; +} + +- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database + workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue + credentials:(id<FSTCredentialsProvider>)credentials + serializer:(FSTSerializerBeta *)serializer + delegate:(id<FSTWriteStreamDelegate>)delegate { + self = [super initWithDatabase:database + workerDispatchQueue:workerDispatchQueue + credentials:credentials + responseMessageClass:[GCFSWriteResponse class] + delegate:delegate]; + if (self) { + _serializer = serializer; + } + return self; +} + +- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter { + return [[GRPCCall alloc] initWithHost:self.databaseInfo.host + path:@"/google.firestore.v1beta1.Firestore/Write" + requestsWriter:requestsWriter]; +} + +- (void)writeHandshake { + // The initial request cannot contain mutations, but must contain a projectID. + FSTAssert([self isOpen], @"Not yet open"); + FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn"); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + GCFSWriteRequest *request = [GCFSWriteRequest message]; + request.database = [_serializer encodedDatabaseID]; + // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the + // handshake, ignoring any stream token we might have. + + FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, request); + [self writeRequest:request]; +} + +- (void)writeMutations:(NSArray<FSTMutation *> *)mutations { + FSTAssert([self isOpen], @"Not yet open"); + FSTAssert(self.handshakeComplete, @"Mutations sent out of turn"); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count]; + for (FSTMutation *mutation in mutations) { + [protos addObject:[_serializer encodedMutation:mutation]]; + }; + + GCFSWriteRequest *request = [GCFSWriteRequest message]; + request.writesArray = protos; + request.streamToken = self.lastStreamToken; + + FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, request); + [self writeRequest:request]; +} + +/** + * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass + * that on to the mutationResultsHandler. + */ +- (void)handleStreamMessage:(GCFSWriteResponse *)response { + FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response); + [self.workerDispatchQueue verifyIsCurrentQueue]; + + // A successful response means the stream is healthy. + [self.backoff reset]; + + // Always capture the last stream token. + self.lastStreamToken = response.streamToken; + + if (!self.handshakeComplete) { + // The first response is the handshake response + self.handshakeComplete = YES; + + [self.delegate writeStreamDidCompleteHandshake]; + } else { + FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime]; + NSMutableArray<GCFSWriteResult *> *protos = response.writeResultsArray; + NSMutableArray<FSTMutationResult *> *results = [NSMutableArray arrayWithCapacity:protos.count]; + for (GCFSWriteResult *proto in protos) { + [results addObject:[_serializer decodedMutationResult:proto]]; + }; + + [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results]; + } +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTExistenceFilter.h b/Firestore/Source/Remote/FSTExistenceFilter.h new file mode 100644 index 0000000..df95950 --- /dev/null +++ b/Firestore/Source/Remote/FSTExistenceFilter.h @@ -0,0 +1,31 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <Foundation/Foundation.h> + +NS_ASSUME_NONNULL_BEGIN + +@interface FSTExistenceFilter : NSObject + ++ (instancetype)filterWithCount:(int32_t)count; + +- (instancetype)init __attribute__((unavailable("Use a static constructor"))); + +@property(nonatomic, assign, readonly) int32_t count; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTExistenceFilter.m b/Firestore/Source/Remote/FSTExistenceFilter.m new file mode 100644 index 0000000..7c0ded2 --- /dev/null +++ b/Firestore/Source/Remote/FSTExistenceFilter.m @@ -0,0 +1,53 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "FSTExistenceFilter.h" + +@interface FSTExistenceFilter () + +- (instancetype)initWithCount:(int32_t)count NS_DESIGNATED_INITIALIZER; + +@end + +@implementation FSTExistenceFilter + ++ (instancetype)filterWithCount:(int32_t)count { + return [[FSTExistenceFilter alloc] initWithCount:count]; +} + +- (instancetype)initWithCount:(int32_t)count { + if (self = [super init]) { + _count = count; + } + return self; +} + +- (BOOL)isEqual:(id)other { + if (other == self) { + return YES; + } + if (![other isMemberOfClass:[FSTExistenceFilter class]]) { + return NO; + } + + return _count == ((FSTExistenceFilter *)other).count; +} + +- (NSUInteger)hash { + return _count; +} + +@end diff --git a/Firestore/Source/Remote/FSTExponentialBackoff.h b/Firestore/Source/Remote/FSTExponentialBackoff.h new file mode 100644 index 0000000..0bee2bd --- /dev/null +++ b/Firestore/Source/Remote/FSTExponentialBackoff.h @@ -0,0 +1,79 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <Foundation/Foundation.h> + +@class FSTDispatchQueue; + +NS_ASSUME_NONNULL_BEGIN + +/** + * Helper to implement exponential backoff. + * + * In general, call -reset after each successful round-trip. Call -backoffAndRunBlock before + * retrying after an error. Each backoffAndRunBlock will increase the delay between retries. + */ +@interface FSTExponentialBackoff : NSObject + +/** + * Creates and returns a helper for running delayed tasks following an exponential backoff curve + * between attempts. + * + * Each delay is made up of a "base" delay which follows the exponential backoff curve, and a + * +/- 50% "jitter" that is calculated and added to the base delay. This prevents clients from + * accidentally synchronizing their delays causing spikes of load to the backend. + * + * @param dispatchQueue The dispatch queue to run tasks on. + * @param initialDelay The initial delay (used as the base delay on the first retry attempt). + * Note that jitter will still be applied, so the actual delay could be as little as + * 0.5*initialDelay. + * @param backoffFactor The multiplier to use to determine the extended base delay after each + * attempt. + * @param maxDelay The maximum base delay after which no further backoff is performed. Note that + * jitter will still be applied, so the actual delay could be as much as 1.5*maxDelay. + */ ++ (instancetype)exponentialBackoffWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue + initialDelay:(NSTimeInterval)initialDelay + backoffFactor:(double)backoffFactor + maxDelay:(NSTimeInterval)maxDelay; + +- (instancetype)init + __attribute__((unavailable("Use exponentialBackoffWithDispatchQueue constructor method."))); + +/** + * Resets the backoff delay. + * + * The very next backoffAndRunBlock: will have no delay. If it is called again (i.e. due to an + * error), initialDelay (plus jitter) will be used, and subsequent ones will increase according + * to the backoffFactor. + */ +- (void)reset; + +/** + * Resets the backoff to the maximum delay (e.g. for use after a RESOURCE_EXHAUSTED error). + */ +- (void)resetToMax; + +/** + * Waits for currentDelay seconds, increases the delay and runs the specified block. + * + * @param block The block to run. + */ +- (void)backoffAndRunBlock:(void (^)())block; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTExponentialBackoff.m b/Firestore/Source/Remote/FSTExponentialBackoff.m new file mode 100644 index 0000000..ec21282 --- /dev/null +++ b/Firestore/Source/Remote/FSTExponentialBackoff.m @@ -0,0 +1,97 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "FSTExponentialBackoff.h" + +#import "FSTDispatchQueue.h" +#import "FSTLogger.h" +#import "FSTUtil.h" + +@interface FSTExponentialBackoff () +- (instancetype)initWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue + initialDelay:(NSTimeInterval)initialDelay + backoffFactor:(double)backoffFactor + maxDelay:(NSTimeInterval)maxDelay NS_DESIGNATED_INITIALIZER; + +@property(nonatomic, strong) FSTDispatchQueue *dispatchQueue; +@property(nonatomic) double backoffFactor; +@property(nonatomic) NSTimeInterval initialDelay; +@property(nonatomic) NSTimeInterval maxDelay; +@property(nonatomic) NSTimeInterval currentBase; +@end + +@implementation FSTExponentialBackoff + +- (instancetype)initWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue + initialDelay:(NSTimeInterval)initialDelay + backoffFactor:(double)backoffFactor + maxDelay:(NSTimeInterval)maxDelay { + if (self = [super init]) { + _dispatchQueue = dispatchQueue; + _initialDelay = initialDelay; + _backoffFactor = backoffFactor; + _maxDelay = maxDelay; + + [self reset]; + } + return self; +} + ++ (instancetype)exponentialBackoffWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue + initialDelay:(NSTimeInterval)initialDelay + backoffFactor:(double)backoffFactor + maxDelay:(NSTimeInterval)maxDelay { + return [[FSTExponentialBackoff alloc] initWithDispatchQueue:dispatchQueue + initialDelay:initialDelay + backoffFactor:backoffFactor + maxDelay:maxDelay]; +} + +- (void)reset { + _currentBase = 0; +} + +- (void)resetToMax { + _currentBase = _maxDelay; +} + +- (void)backoffAndRunBlock:(void (^)())block { + // First schedule the block using the current base (which may be 0 and should be honored as such). + NSTimeInterval delayWithJitter = _currentBase + [self jitterDelay]; + if (_currentBase > 0) { + FSTLog(@"Backing off for %.2f seconds (base delay: %.2f seconds)", delayWithJitter, + _currentBase); + } + dispatch_time_t delay = + dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delayWithJitter * NSEC_PER_SEC)); + dispatch_after(delay, self.dispatchQueue.queue, block); + + // Apply backoff factor to determine next delay and ensure it is within bounds. + _currentBase *= _backoffFactor; + if (_currentBase < _initialDelay) { + _currentBase = _initialDelay; + } + if (_currentBase > _maxDelay) { + _currentBase = _maxDelay; + } +} + +/** Returns a random value in the range [-currentBase/2, currentBase/2] */ +- (NSTimeInterval)jitterDelay { + return ([FSTUtil randomDouble] - 0.5) * _currentBase; +} + +@end diff --git a/Firestore/Source/Remote/FSTRemoteEvent.h b/Firestore/Source/Remote/FSTRemoteEvent.h new file mode 100644 index 0000000..939a027 --- /dev/null +++ b/Firestore/Source/Remote/FSTRemoteEvent.h @@ -0,0 +1,213 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <Foundation/Foundation.h> + +#import "FSTDocumentDictionary.h" +#import "FSTDocumentKeySet.h" +#import "FSTTypes.h" + +@class FSTDocument; +@class FSTDocumentKey; +@class FSTExistenceFilter; +@class FSTMaybeDocument; +@class FSTSnapshotVersion; +@class FSTWatchChange; +@class FSTQueryData; + +NS_ASSUME_NONNULL_BEGIN + +#pragma mark - FSTTargetMapping + +/** + * TargetMapping represents a change to the documents in a query from the server. This can either + * be an incremental Update or a full Reset. + * + * <p>This is an empty abstract class so that all the different kinds of changes can have a common + * base class. + */ +@interface FSTTargetMapping : NSObject +@end + +#pragma mark - FSTResetMapping + +/** The new set of documents to replace the current documents for a target. */ +@interface FSTResetMapping : FSTTargetMapping + +/** + * Creates a new mapping with the keys for the given documents added. This is intended primarily + * for testing. + */ ++ (FSTResetMapping *)mappingWithDocuments:(NSArray<FSTDocument *> *)documents; + +/** The new set of documents for the target. */ +@property(nonatomic, strong, readonly) FSTDocumentKeySet *documents; +@end + +#pragma mark - FSTUpdateMapping + +/** + * A target should update its set of documents with the given added/removed set of documents. + */ +@interface FSTUpdateMapping : FSTTargetMapping + +/** + * Creates a new mapping with the keys for the given documents added. This is intended primarily + * for testing. + */ ++ (FSTUpdateMapping *)mappingWithAddedDocuments:(NSArray<FSTDocument *> *)added + removedDocuments:(NSArray<FSTDocument *> *)removed; + +- (FSTDocumentKeySet *)applyTo:(FSTDocumentKeySet *)keys; + +/** The documents added to the target. */ +@property(nonatomic, strong, readonly) FSTDocumentKeySet *addedDocuments; +/** The documents removed from the target. */ +@property(nonatomic, strong, readonly) FSTDocumentKeySet *removedDocuments; +@end + +#pragma mark - FSTTargetChange + +/** + * Represents an update to the current status of a target, either explicitly having no new state, or + * the new value to set. Note "current" has special meaning in the RPC protocol that implies that a + * target is both up-to-date and consistent with the rest of the watch stream. + */ +typedef NS_ENUM(NSUInteger, FSTCurrentStatusUpdate) { + /** The current status is not affected and should not be modified */ + FSTCurrentStatusUpdateNone, + /** The target must be marked as no longer "current" */ + FSTCurrentStatusUpdateMarkNotCurrent, + /** The target must be marked as "current" */ + FSTCurrentStatusUpdateMarkCurrent, +}; + +/** + * A part of an FSTRemoteEvent specifying set of changes to a specific target. These changes track + * what documents are currently included in the target as well as the current snapshot version and + * resume token but the actual changes *to* documents are not part of the FSTTargetChange since + * documents may be part of multiple targets. + */ +@interface FSTTargetChange : NSObject + +/** + * Creates a new target change with the given documents. Instances of FSTDocument are considered + * added. Instance of FSTDeletedDocument are considered removed. This is intended primarily for + * testing. + */ ++ (instancetype)changeWithDocuments:(NSArray<FSTMaybeDocument *> *)docs + currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate; + +/** + * The new "current" (synced) status of this target. Set to CurrentStatusUpdateNone if the status + * should not be updated. Note "current" has special meaning for in the RPC protocol that implies + * that a target is both up-to-date and consistent with the rest of the watch stream. + */ +@property(nonatomic, assign, readonly) FSTCurrentStatusUpdate currentStatusUpdate; + +/** A set of changes to documents in this target. */ +@property(nonatomic, strong, readonly) FSTTargetMapping *mapping; + +/** + * The snapshot version representing the last state at which this target received a consistent + * snapshot from the backend. + */ +@property(nonatomic, strong, readonly) FSTSnapshotVersion *snapshotVersion; + +/** + * An opaque, server-assigned token that allows watching a query to be resumed after disconnecting + * without retransmitting all the data that matches the query. The resume token essentially + * identifies a point in time from which the server should resume sending results. + */ +@property(nonatomic, strong, readonly) NSData *resumeToken; + +@end + +#pragma mark - FSTRemoteEvent + +/** + * An event from the RemoteStore. It is split into targetChanges (changes to the state or the set + * of documents in our watched targets) and documentUpdates (changes to the actual documents). + */ +@interface FSTRemoteEvent : NSObject + ++ (instancetype) +eventWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion + targetChanges:(NSMutableDictionary<NSNumber *, FSTTargetChange *> *)targetChanges + documentUpdates: + (NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *)documentUpdates; + +/** The snapshot version this event brings us up to. */ +@property(nonatomic, strong, readonly) FSTSnapshotVersion *snapshotVersion; + +/** A map from target to changes to the target. See TargetChange. */ +@property(nonatomic, strong, readonly) + NSDictionary<FSTBoxedTargetID *, FSTTargetChange *> *targetChanges; + +/** + * A set of which documents have changed or been deleted, along with the doc's new values + * (if not deleted). + */ +@property(nonatomic, strong, readonly) + NSDictionary<FSTDocumentKey *, FSTMaybeDocument *> *documentUpdates; + +/** Adds a document update to this remote event */ +- (void)addDocumentUpdate:(FSTMaybeDocument *)document; + +/** Handles an existence filter mismatch */ +- (void)handleExistenceFilterMismatchForTargetID:(FSTBoxedTargetID *)targetID; + +@end + +#pragma mark - FSTWatchChangeAggregator + +/** + * A helper class to accumulate watch changes into a FSTRemoteEvent and other target + * information. + */ +@interface FSTWatchChangeAggregator : NSObject + +- (instancetype) +initWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion + listenTargets:(NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *)listenTargets + pendingTargetResponses:(NSDictionary<FSTBoxedTargetID *, NSNumber *> *)pendingTargetResponses + NS_DESIGNATED_INITIALIZER; + +- (instancetype)init NS_UNAVAILABLE; + +/** The number of pending responses that are being waited on from watch */ +@property(nonatomic, strong, readonly) + NSMutableDictionary<FSTBoxedTargetID *, NSNumber *> *pendingTargetResponses; + +/** Aggregates a watch change into the current state */ +- (void)addWatchChange:(FSTWatchChange *)watchChange; + +/** Aggregates all provided watch changes to the current state in order */ +- (void)addWatchChanges:(NSArray<FSTWatchChange *> *)watchChanges; + +/** + * Converts the current state into a remote event with the snapshot version taken from the + * initializer. + */ +- (FSTRemoteEvent *)remoteEvent; + +/** The existence filters - if any - for the given target IDs. */ +@property(nonatomic, strong, readonly) + NSDictionary<FSTBoxedTargetID *, FSTExistenceFilter *> *existenceFilters; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTRemoteEvent.m b/Firestore/Source/Remote/FSTRemoteEvent.m new file mode 100644 index 0000000..5c75998 --- /dev/null +++ b/Firestore/Source/Remote/FSTRemoteEvent.m @@ -0,0 +1,516 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "FSTRemoteEvent.h" + +#import "FSTAssert.h" +#import "FSTClasses.h" +#import "FSTDocument.h" +#import "FSTDocumentKey.h" +#import "FSTLogger.h" +#import "FSTSnapshotVersion.h" +#import "FSTWatchChange.h" + +NS_ASSUME_NONNULL_BEGIN + +#pragma mark - FSTTargetMapping + +@interface FSTTargetMapping () + +/** Private mutator method to add a document key to the mapping */ +- (void)addDocumentKey:(FSTDocumentKey *)documentKey; + +/** Private mutator method to remove a document key from the mapping */ +- (void)removeDocumentKey:(FSTDocumentKey *)documentKey; + +@end + +@implementation FSTTargetMapping + +- (void)addDocumentKey:(FSTDocumentKey *)documentKey { + @throw FSTAbstractMethodException(); // NOLINT +} + +- (void)removeDocumentKey:(FSTDocumentKey *)documentKey { + @throw FSTAbstractMethodException(); // NOLINT +} + +@end + +#pragma mark - FSTResetMapping + +@interface FSTResetMapping () +@property(nonatomic, strong) FSTDocumentKeySet *documents; +@end + +@implementation FSTResetMapping + ++ (instancetype)mappingWithDocuments:(NSArray<FSTDocument *> *)documents { + FSTResetMapping *mapping = [[FSTResetMapping alloc] init]; + for (FSTDocument *doc in documents) { + mapping.documents = [mapping.documents setByAddingObject:doc.key]; + } + return mapping; +} + +- (instancetype)init { + self = [super init]; + if (self) { + _documents = [FSTDocumentKeySet keySet]; + } + return self; +} + +- (BOOL)isEqual:(id)other { + if (other == self) { + return YES; + } + if (![other isMemberOfClass:[FSTResetMapping class]]) { + return NO; + } + + FSTResetMapping *otherMapping = (FSTResetMapping *)other; + return [self.documents isEqual:otherMapping.documents]; +} + +- (NSUInteger)hash { + return self.documents.hash; +} + +- (void)addDocumentKey:(FSTDocumentKey *)documentKey { + self.documents = [self.documents setByAddingObject:documentKey]; +} + +- (void)removeDocumentKey:(FSTDocumentKey *)documentKey { + self.documents = [self.documents setByRemovingObject:documentKey]; +} + +@end + +#pragma mark - FSTUpdateMapping + +@interface FSTUpdateMapping () +@property(nonatomic, strong) FSTDocumentKeySet *addedDocuments; +@property(nonatomic, strong) FSTDocumentKeySet *removedDocuments; +@end + +@implementation FSTUpdateMapping + ++ (FSTUpdateMapping *)mappingWithAddedDocuments:(NSArray<FSTDocument *> *)added + removedDocuments:(NSArray<FSTDocument *> *)removed { + FSTUpdateMapping *mapping = [[FSTUpdateMapping alloc] init]; + for (FSTDocument *doc in added) { + mapping.addedDocuments = [mapping.addedDocuments setByAddingObject:doc.key]; + } + for (FSTDocument *doc in removed) { + mapping.removedDocuments = [mapping.removedDocuments setByAddingObject:doc.key]; + } + return mapping; +} + +- (instancetype)init { + self = [super init]; + if (self) { + _addedDocuments = [FSTDocumentKeySet keySet]; + _removedDocuments = [FSTDocumentKeySet keySet]; + } + return self; +} + +- (BOOL)isEqual:(id)other { + if (other == self) { + return YES; + } + if (![other isMemberOfClass:[FSTUpdateMapping class]]) { + return NO; + } + + FSTUpdateMapping *otherMapping = (FSTUpdateMapping *)other; + return [self.addedDocuments isEqual:otherMapping.addedDocuments] && + [self.removedDocuments isEqual:otherMapping.removedDocuments]; +} + +- (NSUInteger)hash { + return self.addedDocuments.hash * 31 + self.removedDocuments.hash; +} + +- (FSTDocumentKeySet *)applyTo:(FSTDocumentKeySet *)keys { + __block FSTDocumentKeySet *result = keys; + [self.addedDocuments enumerateObjectsUsingBlock:^(FSTDocumentKey *key, BOOL *stop) { + result = [result setByAddingObject:key]; + }]; + [self.removedDocuments enumerateObjectsUsingBlock:^(FSTDocumentKey *key, BOOL *stop) { + result = [result setByRemovingObject:key]; + }]; + return result; +} + +- (void)addDocumentKey:(FSTDocumentKey *)documentKey { + self.addedDocuments = [self.addedDocuments setByAddingObject:documentKey]; + self.removedDocuments = [self.removedDocuments setByRemovingObject:documentKey]; +} + +- (void)removeDocumentKey:(FSTDocumentKey *)documentKey { + self.addedDocuments = [self.addedDocuments setByRemovingObject:documentKey]; + self.removedDocuments = [self.removedDocuments setByAddingObject:documentKey]; +} + +@end + +#pragma mark - FSTTargetChange + +@interface FSTTargetChange () +@property(nonatomic, assign) FSTCurrentStatusUpdate currentStatusUpdate; +@property(nonatomic, strong, nullable) FSTTargetMapping *mapping; +@property(nonatomic, strong) FSTSnapshotVersion *snapshotVersion; +@property(nonatomic, strong) NSData *resumeToken; +@end + +@implementation FSTTargetChange + +- (instancetype)init { + if (self = [super init]) { + _currentStatusUpdate = FSTCurrentStatusUpdateNone; + _resumeToken = [NSData data]; + } + return self; +} + ++ (instancetype)changeWithDocuments:(NSArray<FSTMaybeDocument *> *)docs + currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate { + FSTUpdateMapping *mapping = [[FSTUpdateMapping alloc] init]; + for (FSTMaybeDocument *doc in docs) { + if ([doc isKindOfClass:[FSTDeletedDocument class]]) { + mapping.removedDocuments = [mapping.removedDocuments setByAddingObject:doc.key]; + } else { + mapping.addedDocuments = [mapping.addedDocuments setByAddingObject:doc.key]; + } + } + FSTTargetChange *change = [[FSTTargetChange alloc] init]; + change.mapping = mapping; + change.currentStatusUpdate = currentStatusUpdate; + return change; +} + ++ (instancetype)changeWithMapping:(FSTTargetMapping *)mapping + snapshotVersion:(FSTSnapshotVersion *)snapshotVersion + currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate { + FSTTargetChange *change = [[FSTTargetChange alloc] init]; + change.mapping = mapping; + change.snapshotVersion = snapshotVersion; + change.currentStatusUpdate = currentStatusUpdate; + return change; +} + +- (FSTTargetMapping *)mapping { + if (!_mapping) { + // Create an FSTUpdateMapping by default, since resets are always explicit + _mapping = [[FSTUpdateMapping alloc] init]; + } + return _mapping; +} + +/** + * Sets the resume token but only when it has a new value. Empty resumeTokens are + * discarded. + */ +- (void)setResumeToken:(NSData *)resumeToken { + if (resumeToken.length > 0) { + _resumeToken = resumeToken; + } +} + +@end + +#pragma mark - FSTRemoteEvent + +@interface FSTRemoteEvent () { + NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *_documentUpdates; + NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *_targetChanges; +} + +- (instancetype) +initWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion + targetChanges:(NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *)targetChanges + documentUpdates: + (NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *)documentUpdates; + +@property(nonatomic, strong) FSTSnapshotVersion *snapshotVersion; + +@end + +@implementation FSTRemoteEvent + ++ (instancetype) +eventWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion + targetChanges:(NSMutableDictionary<NSNumber *, FSTTargetChange *> *)targetChanges + documentUpdates: + (NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *)documentUpdates { + return [[FSTRemoteEvent alloc] initWithSnapshotVersion:snapshotVersion + targetChanges:targetChanges + documentUpdates:documentUpdates]; +} + +- (instancetype) +initWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion + targetChanges:(NSMutableDictionary<NSNumber *, FSTTargetChange *> *)targetChanges + documentUpdates: + (NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *)documentUpdates { + self = [super init]; + if (self) { + _snapshotVersion = snapshotVersion; + _targetChanges = targetChanges; + _documentUpdates = documentUpdates; + } + return self; +} + +/** Adds a document update to this remote event */ +- (void)addDocumentUpdate:(FSTMaybeDocument *)document { + _documentUpdates[document.key] = document; +} + +/** Handles an existence filter mismatch */ +- (void)handleExistenceFilterMismatchForTargetID:(FSTBoxedTargetID *)targetID { + // An existence filter mismatch will reset the query and we need to reset the mapping to contain + // no documents and an empty resume token. + // + // Note: + // * The reset mapping is empty, specifically forcing the consumer of the change to + // forget all keys for this targetID; + // * The resume snapshot for this target must be reset + // * The target must be unacked because unwatching and rewatching introduces a race for + // changes. + // + // TODO(dimond): keep track of reset targets not to raise. + FSTTargetChange *targetChange = + [FSTTargetChange changeWithMapping:[[FSTResetMapping alloc] init] + snapshotVersion:[FSTSnapshotVersion noVersion] + currentStatusUpdate:FSTCurrentStatusUpdateMarkNotCurrent]; + _targetChanges[targetID] = targetChange; +} + +@end + +#pragma mark - FSTWatchChangeAggregator + +@interface FSTWatchChangeAggregator () + +/** The snapshot version for every target change this creates. */ +@property(nonatomic, strong, readonly) FSTSnapshotVersion *snapshotVersion; + +/** Keeps track of the current target mappings */ +@property(nonatomic, strong, readonly) + NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *targetChanges; + +/** Keeps track of document to update */ +@property(nonatomic, strong, readonly) + NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *documentUpdates; + +/** The set of open listens on the client */ +@property(nonatomic, strong, readonly) + NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *listenTargets; + +/** Whether this aggregator was frozen and can no longer be modified */ +@property(nonatomic, assign) BOOL frozen; + +@end + +@implementation FSTWatchChangeAggregator { + NSMutableDictionary<FSTBoxedTargetID *, FSTExistenceFilter *> *_existenceFilters; +} + +- (instancetype) +initWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion + listenTargets:(NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *)listenTargets + pendingTargetResponses:(NSDictionary<FSTBoxedTargetID *, NSNumber *> *)pendingTargetResponses { + self = [super init]; + if (self) { + _snapshotVersion = snapshotVersion; + + _frozen = NO; + _targetChanges = [NSMutableDictionary dictionary]; + _listenTargets = listenTargets; + _pendingTargetResponses = [NSMutableDictionary dictionaryWithDictionary:pendingTargetResponses]; + + _existenceFilters = [NSMutableDictionary dictionary]; + _documentUpdates = [NSMutableDictionary dictionary]; + } + return self; +} + +- (FSTTargetChange *)targetChangeForTargetID:(FSTBoxedTargetID *)targetID { + FSTTargetChange *change = self.targetChanges[targetID]; + if (!change) { + change = [[FSTTargetChange alloc] init]; + change.snapshotVersion = self.snapshotVersion; + self.targetChanges[targetID] = change; + } + return change; +} + +- (void)addWatchChanges:(NSArray<FSTWatchChange *> *)watchChanges { + FSTAssert(!self.frozen, @"Trying to modify frozen FSTWatchChangeAggregator"); + for (FSTWatchChange *watchChange in watchChanges) { + [self addWatchChange:watchChange]; + } +} + +- (void)addWatchChange:(FSTWatchChange *)watchChange { + FSTAssert(!self.frozen, @"Trying to modify frozen FSTWatchChangeAggregator"); + if ([watchChange isKindOfClass:[FSTDocumentWatchChange class]]) { + [self addDocumentChange:(FSTDocumentWatchChange *)watchChange]; + } else if ([watchChange isKindOfClass:[FSTWatchTargetChange class]]) { + [self addTargetChange:(FSTWatchTargetChange *)watchChange]; + } else if ([watchChange isKindOfClass:[FSTExistenceFilterWatchChange class]]) { + [self addExistenceFilterChange:(FSTExistenceFilterWatchChange *)watchChange]; + } else { + FSTFail(@"Unknown watch change: %@", watchChange); + } +} + +- (void)addDocumentChange:(FSTDocumentWatchChange *)docChange { + BOOL relevant = NO; + + for (FSTBoxedTargetID *targetID in docChange.updatedTargetIDs) { + if ([self isActiveTarget:targetID]) { + FSTTargetChange *change = [self targetChangeForTargetID:targetID]; + [change.mapping addDocumentKey:docChange.documentKey]; + relevant = YES; + } + } + + for (FSTBoxedTargetID *targetID in docChange.removedTargetIDs) { + if ([self isActiveTarget:targetID]) { + FSTTargetChange *change = [self targetChangeForTargetID:targetID]; + [change.mapping removeDocumentKey:docChange.documentKey]; + relevant = YES; + } + } + + // Only update the document if there is a new document to replace, this might be just a target + // update instead. + if (docChange.document && relevant) { + self.documentUpdates[docChange.documentKey] = docChange.document; + } +} + +- (void)addTargetChange:(FSTWatchTargetChange *)targetChange { + for (FSTBoxedTargetID *targetID in targetChange.targetIDs) { + FSTTargetChange *change = [self targetChangeForTargetID:targetID]; + switch (targetChange.state) { + case FSTWatchTargetChangeStateNoChange: + if ([self isActiveTarget:targetID]) { + // Creating the change above satisfies the semantics of no-change. + change.resumeToken = targetChange.resumeToken; + } + break; + case FSTWatchTargetChangeStateAdded: + [self recordResponseForTargetID:targetID]; + if (![self.pendingTargetResponses objectForKey:targetID]) { + // We have a freshly added target, so we need to reset any state that we had previously + // This can happen e.g. when remove and add back a target for existence filter + // mismatches. + change.mapping = nil; + change.currentStatusUpdate = FSTCurrentStatusUpdateNone; + [_existenceFilters removeObjectForKey:targetID]; + } + change.resumeToken = targetChange.resumeToken; + break; + case FSTWatchTargetChangeStateRemoved: + // We need to keep track of removed targets to we can post-filter and remove any target + // changes. + [self recordResponseForTargetID:targetID]; + FSTAssert(!targetChange.cause, @"WatchChangeAggregator does not handle errored targets."); + break; + case FSTWatchTargetChangeStateCurrent: + if ([self isActiveTarget:targetID]) { + change.currentStatusUpdate = FSTCurrentStatusUpdateMarkCurrent; + change.resumeToken = targetChange.resumeToken; + } + break; + case FSTWatchTargetChangeStateReset: + if ([self isActiveTarget:targetID]) { + // Overwrite any existing target mapping with a reset mapping. Every subsequent update + // will modify the reset mapping, not an update mapping. + change.mapping = [[FSTResetMapping alloc] init]; + change.resumeToken = targetChange.resumeToken; + } + break; + default: + FSTWarn(@"Unknown target watch change type: %ld", (long)targetChange.state); + } + } +} + +/** + * Records that we got a watch target add/remove by decrementing the number of pending target + * responses that we have. + */ +- (void)recordResponseForTargetID:(FSTBoxedTargetID *)targetID { + NSNumber *count = [self.pendingTargetResponses objectForKey:targetID]; + int newCount = count ? [count intValue] - 1 : -1; + if (newCount == 0) { + [self.pendingTargetResponses removeObjectForKey:targetID]; + } else { + [self.pendingTargetResponses setObject:[NSNumber numberWithInt:newCount] forKey:targetID]; + } +} + +/** + * Returns true if the given targetId is active. Active targets are those for which there are no + * pending requests to add a listen and are in the current list of targets the client cares about. + * + * Clients can repeatedly listen and stop listening to targets, so this check is useful in + * preventing in preventing race conditions for a target where events arrive but the server hasn't + * yet acknowledged the intended change in state. + */ +- (BOOL)isActiveTarget:(FSTBoxedTargetID *)targetID { + return [self.listenTargets objectForKey:targetID] && + ![self.pendingTargetResponses objectForKey:targetID]; +} + +- (void)addExistenceFilterChange:(FSTExistenceFilterWatchChange *)existenceFilterChange { + FSTBoxedTargetID *targetID = @(existenceFilterChange.targetID); + if ([self isActiveTarget:targetID]) { + _existenceFilters[targetID] = existenceFilterChange.filter; + } +} + +- (FSTRemoteEvent *)remoteEvent { + NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *targetChanges = self.targetChanges; + + NSMutableArray *targetsToRemove = [NSMutableArray array]; + + // Apply any inactive targets. + for (FSTBoxedTargetID *targetID in [targetChanges keyEnumerator]) { + if (![self isActiveTarget:targetID]) { + [targetsToRemove addObject:targetID]; + } + } + + [targetChanges removeObjectsForKeys:targetsToRemove]; + + // Mark this aggregator as frozen so no further modifications are made. + self.frozen = YES; + return [FSTRemoteEvent eventWithSnapshotVersion:self.snapshotVersion + targetChanges:targetChanges + documentUpdates:self.documentUpdates]; +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTRemoteStore.h b/Firestore/Source/Remote/FSTRemoteStore.h new file mode 100644 index 0000000..94208e1 --- /dev/null +++ b/Firestore/Source/Remote/FSTRemoteStore.h @@ -0,0 +1,143 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <Foundation/Foundation.h> + +#import "FSTDocumentVersionDictionary.h" +#import "FSTTypes.h" + +@class FSTDatabaseInfo; +@class FSTDatastore; +@class FSTDocumentKey; +@class FSTLocalStore; +@class FSTMutationBatch; +@class FSTMutationBatchResult; +@class FSTQuery; +@class FSTQueryData; +@class FSTRemoteEvent; +@class FSTTransaction; +@class FSTUser; + +NS_ASSUME_NONNULL_BEGIN + +#pragma mark - FSTRemoteSyncer + +/** + * A protocol that describes the actions the FSTRemoteStore needs to perform on a cooperating + * synchronization engine. + */ +@protocol FSTRemoteSyncer + +/** + * Applies one remote event to the sync engine, notifying any views of the changes, and releasing + * any pending mutation batches that would become visible because of the snapshot version the + * remote event contains. + */ +- (void)applyRemoteEvent:(FSTRemoteEvent *)remoteEvent; + +/** + * Rejects the listen for the given targetID. This can be triggered by the backend for any active + * target. + * + * @param targetID The targetID corresponding to a listen initiated via + * -listenToTargetWithQueryData: on FSTRemoteStore. + * @param error A description of the condition that has forced the rejection. Nearly always this + * will be an indication that the user is no longer authorized to see the data matching the + * target. + */ +- (void)rejectListenWithTargetID:(FSTBoxedTargetID *)targetID error:(NSError *)error; + +/** + * Applies the result of a successful write of a mutation batch to the sync engine, emitting + * snapshots in any views that the mutation applies to, and removing the batch from the mutation + * queue. + */ +- (void)applySuccessfulWriteWithResult:(FSTMutationBatchResult *)batchResult; + +/** + * Rejects the batch, removing the batch from the mutation queue, recomputing the local view of + * any documents affected by the batch and then, emitting snapshots with the reverted value. + */ +- (void)rejectFailedWriteWithBatchID:(FSTBatchID)batchID error:(NSError *)error; + +@end + +/** + * A protocol for the FSTRemoteStore online state delegate, called whenever the state of the + * online streams of the FSTRemoteStore changes. + * Note that this protocol only supports the watch stream for now. + */ +@protocol FSTOnlineStateDelegate <NSObject> + +/** Called whenever the online state of the watch stream changes */ +- (void)watchStreamDidChangeOnlineState:(FSTOnlineState)onlineState; + +@end + +#pragma mark - FSTRemoteStore + +/** + * FSTRemoteStore handles all interaction with the backend through a simple, clean interface. This + * class is not thread safe and should be only called from the worker dispatch queue. + */ +@interface FSTRemoteStore : NSObject + ++ (instancetype)remoteStoreWithLocalStore:(FSTLocalStore *)localStore + datastore:(FSTDatastore *)datastore; + +- (instancetype)init __attribute__((unavailable("Use static constructor method."))); + +@property(nonatomic, weak) id<FSTRemoteSyncer> syncEngine; + +@property(nonatomic, weak) id<FSTOnlineStateDelegate> onlineStateDelegate; + +/** Starts up the remote store, creating streams, restoring state from LocalStore, etc. */ +- (void)start; + +/** Shuts down the remote store, tearing down connections and otherwise cleaning up. */ +- (void)shutdown; + +/** + * Tells the FSTRemoteStore that the currently authenticated user has changed. + * + * In response the remote store tears down streams and clears up any tracked operations that should + * not persist across users. Restarts the streams if appropriate. + */ +- (void)userDidChange:(FSTUser *)user; + +/** Listens to the target identified by the given FSTQueryData. */ +- (void)listenToTargetWithQueryData:(FSTQueryData *)queryData; + +/** Stops listening to the target with the given target ID. */ +- (void)stopListeningToTargetID:(FSTTargetID)targetID; + +/** + * Tells the FSTRemoteStore that there are new mutations to process in the queue. This is typically + * called by FSTSyncEngine after it has sent mutations to FSTLocalStore. + * + * In response the remote store will pull mutations from the local store until the datastore + * instance reports that it cannot accept further in-progress writes. This mechanism serves to + * maintain a pipeline of in-flight requests between the FSTDatastore and the server that + * applies them. + */ +- (void)fillWritePipeline; + +/** Returns a new transaction backed by this remote store. */ +- (FSTTransaction *)transaction; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTRemoteStore.m b/Firestore/Source/Remote/FSTRemoteStore.m new file mode 100644 index 0000000..cea2ce8 --- /dev/null +++ b/Firestore/Source/Remote/FSTRemoteStore.m @@ -0,0 +1,599 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "FSTRemoteStore.h" + +#import "FSTAssert.h" +#import "FSTDatastore.h" +#import "FSTDocument.h" +#import "FSTDocumentKey.h" +#import "FSTExistenceFilter.h" +#import "FSTLocalStore.h" +#import "FSTLogger.h" +#import "FSTMutation.h" +#import "FSTMutationBatch.h" +#import "FSTQuery.h" +#import "FSTQueryData.h" +#import "FSTRemoteEvent.h" +#import "FSTSnapshotVersion.h" +#import "FSTTransaction.h" +#import "FSTWatchChange.h" + +NS_ASSUME_NONNULL_BEGIN + +/** + * The maximum number of pending writes to allow. + * TODO(bjornick): Negotiate this value with the backend. + */ +static const NSUInteger kMaxPendingWrites = 10; + +#pragma mark - FSTRemoteStore + +@interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate> + +- (instancetype)initWithLocalStore:(FSTLocalStore *)localStore + datastore:(FSTDatastore *)datastore NS_DESIGNATED_INITIALIZER; + +/** + * The local store, used to fill the write pipeline with outbound mutations and resolve existence + * filter mismatches. Immutable after initialization. + */ +@property(nonatomic, strong, readonly) FSTLocalStore *localStore; + +/** The client-side proxy for interacting with the backend. Immutable after initialization. */ +@property(nonatomic, strong, readonly) FSTDatastore *datastore; + +#pragma mark Watch Stream +@property(nonatomic, strong, nullable) FSTWatchStream *watchStream; + +/** + * A mapping of watched targets that the client cares about tracking and the + * user has explicitly called a 'listen' for this target. + * + * These targets may or may not have been sent to or acknowledged by the + * server. On re-establishing the listen stream, these targets should be sent + * to the server. The targets removed with unlistens are removed eagerly + * without waiting for confirmation from the listen stream. */ +@property(nonatomic, strong, readonly) + NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *listenTargets; + +/** + * A mapping of targetId to pending acks needed. + * + * If a targetId is present in this map, then we're waiting for watch to + * acknowledge a removal or addition of the target. If a target is not in this + * mapping, and it's in the listenTargets map, then we consider the target to + * be active. + * + * We increment the count here everytime we issue a request over the stream to + * watch or unwatch. We then decrement the count everytime we get a target + * added or target removed message from the server. Once the count is equal to + * 0 we know that the client and server are in the same state (once this state + * is reached the targetId is removed from the map to free the memory). + */ +@property(nonatomic, strong, readonly) + NSMutableDictionary<FSTBoxedTargetID *, NSNumber *> *pendingTargetResponses; + +@property(nonatomic, strong) NSMutableArray<FSTWatchChange *> *accumulatedChanges; +@property(nonatomic, assign) FSTBatchID lastBatchSeen; + +/** + * The online state of the watch stream. The state is set to healthy if and only if there are + * messages received by the backend. + */ +@property(nonatomic, assign) FSTOnlineState watchStreamOnlineState; + +#pragma mark Write Stream +@property(nonatomic, strong, nullable) FSTWriteStream *writeStream; + +/** + * The approximate time the StreamingWrite stream was opened. Used to estimate if stream was + * closed due to an auth expiration (a recoverable error) or some other more permanent error. + */ +@property(nonatomic, strong, nullable) NSDate *writeStreamOpenTime; + +/** + * A FIFO queue of in-flight writes. This is in-flight from the point of view of the caller of + * writeMutations, not from the point of view from the Datastore itself. In particular, these + * requests may not have been sent to the Datastore server if the write stream is not yet running. + */ +@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *pendingWrites; +@end + +@implementation FSTRemoteStore + ++ (instancetype)remoteStoreWithLocalStore:(FSTLocalStore *)localStore + datastore:(FSTDatastore *)datastore { + return [[FSTRemoteStore alloc] initWithLocalStore:localStore datastore:datastore]; +} + +- (instancetype)initWithLocalStore:(FSTLocalStore *)localStore datastore:(FSTDatastore *)datastore { + if (self = [super init]) { + _localStore = localStore; + _datastore = datastore; + _listenTargets = [NSMutableDictionary dictionary]; + _pendingTargetResponses = [NSMutableDictionary dictionary]; + _accumulatedChanges = [NSMutableArray array]; + + _lastBatchSeen = kFSTBatchIDUnknown; + _watchStreamOnlineState = FSTOnlineStateUnknown; + _pendingWrites = [NSMutableArray array]; + } + return self; +} + +- (void)start { + [self setupStreams]; + + // Resume any writes + [self fillWritePipeline]; +} + +- (void)updateAndNotifyAboutOnlineState:(FSTOnlineState)watchStreamOnlineState { + BOOL didChange = (watchStreamOnlineState != self.watchStreamOnlineState); + self.watchStreamOnlineState = watchStreamOnlineState; + if (didChange) { + [self.onlineStateDelegate watchStreamDidChangeOnlineState:watchStreamOnlineState]; + } +} + +- (void)setupStreams { + self.watchStream = [self.datastore createWatchStreamWithDelegate:self]; + self.writeStream = [self.datastore createWriteStreamWithDelegate:self]; + + // Load any saved stream token from persistent storage + self.writeStream.lastStreamToken = [self.localStore lastStreamToken]; +} + +#pragma mark Shutdown + +- (void)shutdown { + FSTLog(@"FSTRemoteStore %p shutting down", (__bridge void *)self); + + self.watchStreamOnlineState = FSTOnlineStateUnknown; + [self cleanupWatchStreamState]; + [self.watchStream stop]; + [self.writeStream stop]; +} + +- (void)userDidChange:(FSTUser *)user { + FSTLog(@"FSTRemoteStore %p changing users: %@", (__bridge void *)self, user); + + // Clear pending writes because those are per-user. Watched targets persist across users so + // don't clear those. + _lastBatchSeen = kFSTBatchIDUnknown; + [self.pendingWrites removeAllObjects]; + + // Stop the streams. They promise not to call us back. + [self.watchStream stop]; + [self.writeStream stop]; + + [self cleanupWatchStreamState]; + + // Create new streams (but note they're not started yet). + [self setupStreams]; + + // If there are any watchedTargets properly handle the stream restart now that FSTRemoteStore + // is ready to handle them. + if ([self shouldStartWatchStream]) { + [self.watchStream start]; + } + + // Resume any writes + [self fillWritePipeline]; + + // User change moves us back to the unknown state because we might not + // want to re-open the stream + [self updateAndNotifyAboutOnlineState:FSTOnlineStateUnknown]; +} + +#pragma mark Watch Stream + +- (void)listenToTargetWithQueryData:(FSTQueryData *)queryData { + NSNumber *targetKey = @(queryData.targetID); + FSTAssert(!self.listenTargets[targetKey], @"listenToQuery called with duplicate target id: %@", + targetKey); + + self.listenTargets[targetKey] = queryData; + if ([self.watchStream isOpen]) { + [self sendWatchRequestWithQueryData:queryData]; + } else if (![self.watchStream isStarted]) { + [self.watchStream start]; + } +} + +- (void)sendWatchRequestWithQueryData:(FSTQueryData *)queryData { + [self recordPendingRequestForTargetID:@(queryData.targetID)]; + [self.watchStream watchQuery:queryData]; +} + +- (void)stopListeningToTargetID:(FSTTargetID)targetID { + FSTBoxedTargetID *targetKey = @(targetID); + FSTQueryData *queryData = self.listenTargets[targetKey]; + FSTAssert(queryData, @"unlistenToTarget: target not currently watched: %@", targetKey); + + [self.listenTargets removeObjectForKey:targetKey]; + if ([self.watchStream isOpen]) { + [self sendUnwatchRequestForTargetID:targetKey]; + } +} + +- (void)sendUnwatchRequestForTargetID:(FSTBoxedTargetID *)targetID { + [self recordPendingRequestForTargetID:targetID]; + [self.watchStream unwatchTargetID:[targetID intValue]]; +} + +- (void)recordPendingRequestForTargetID:(FSTBoxedTargetID *)targetID { + NSNumber *count = [self.pendingTargetResponses objectForKey:targetID]; + count = @([count intValue] + 1); + [self.pendingTargetResponses setObject:count forKey:targetID]; +} + +/** + * Returns whether the watch stream should be started because there are active targets trying to + * be listened to. + */ +- (BOOL)shouldStartWatchStream { + return self.listenTargets.count > 0; +} + +- (void)cleanupWatchStreamState { + // If the connection is closed then we'll never get a snapshot version for the accumulated + // changes and so we'll never be able to complete the batch. When we start up again the server + // is going to resend these changes anyway, so just toss the accumulated state. + [self.accumulatedChanges removeAllObjects]; + [self.pendingTargetResponses removeAllObjects]; +} + +- (void)watchStreamDidOpen { + // Restore any existing watches. + for (FSTQueryData *queryData in [self.listenTargets objectEnumerator]) { + [self sendWatchRequestWithQueryData:queryData]; + } +} + +- (void)watchStreamDidChange:(FSTWatchChange *)change + snapshotVersion:(FSTSnapshotVersion *)snapshotVersion { + // Mark the connection as healthy because we got a message from the server. + [self updateAndNotifyAboutOnlineState:FSTOnlineStateHealthy]; + + FSTWatchTargetChange *watchTargetChange = + [change isKindOfClass:[FSTWatchTargetChange class]] ? (FSTWatchTargetChange *)change : nil; + + if (watchTargetChange && watchTargetChange.state == FSTWatchTargetChangeStateRemoved && + watchTargetChange.cause) { + // There was an error on a target, don't wait for a consistent snapshot to raise events + [self processTargetErrorForWatchChange:(FSTWatchTargetChange *)change]; + } else { + // Accumulate watch changes but don't process them if there's no snapshotVersion or it's + // older than a previous snapshot we've processed (can happen after we resume a target + // using a resume token). + [self.accumulatedChanges addObject:change]; + FSTAssert(snapshotVersion, @"snapshotVersion must not be nil."); + if ([snapshotVersion isEqual:[FSTSnapshotVersion noVersion]] || + [snapshotVersion compare:[self.localStore lastRemoteSnapshotVersion]] == + NSOrderedAscending) { + return; + } + + // Create a batch, giving it the accumulatedChanges array. + NSArray<FSTWatchChange *> *changes = self.accumulatedChanges; + self.accumulatedChanges = [NSMutableArray array]; + + [self processBatchedWatchChanges:changes snapshotVersion:snapshotVersion]; + } +} + +- (void)watchStreamDidClose:(NSError *_Nullable)error { + [self cleanupWatchStreamState]; + + // If there was an error, retry the connection. + if ([self shouldStartWatchStream]) { + // If the connection fails before the stream has become healthy, consider the online state + // failed. Otherwise consider the online state unknown and the next connection attempt will + // resolve the online state. For example, if a healthy stream is closed due to an expired token + // we want to have one more try at reconnecting before we consider the connection unhealthy. + if (self.watchStreamOnlineState == FSTOnlineStateHealthy) { + [self updateAndNotifyAboutOnlineState:FSTOnlineStateUnknown]; + } else { + [self updateAndNotifyAboutOnlineState:FSTOnlineStateFailed]; + } + [self.watchStream start]; + } else { + // No need to restart watch stream because there are no active targets. The online state is set + // to unknown because there is no active attempt at establishing a connection. + [self updateAndNotifyAboutOnlineState:FSTOnlineStateUnknown]; + } +} + +/** + * Takes a batch of changes from the Datastore, repackages them as a RemoteEvent, and passes that + * on to the SyncEngine. + */ +- (void)processBatchedWatchChanges:(NSArray<FSTWatchChange *> *)changes + snapshotVersion:(FSTSnapshotVersion *)snapshotVersion { + FSTWatchChangeAggregator *aggregator = + [[FSTWatchChangeAggregator alloc] initWithSnapshotVersion:snapshotVersion + listenTargets:self.listenTargets + pendingTargetResponses:self.pendingTargetResponses]; + [aggregator addWatchChanges:changes]; + FSTRemoteEvent *remoteEvent = [aggregator remoteEvent]; + [self.pendingTargetResponses removeAllObjects]; + [self.pendingTargetResponses setDictionary:aggregator.pendingTargetResponses]; + + // Handle existence filters and existence filter mismatches + [aggregator.existenceFilters enumerateKeysAndObjectsUsingBlock:^(FSTBoxedTargetID *target, + FSTExistenceFilter *filter, + BOOL *stop) { + FSTTargetID targetID = target.intValue; + + FSTQueryData *queryData = self.listenTargets[target]; + FSTQuery *query = queryData.query; + if (!queryData) { + // A watched target might have been removed already. + return; + + } else if ([query isDocumentQuery]) { + if (filter.count == 0) { + // The existence filter told us the document does not exist. + // We need to deduce that this document does not exist and apply a deleted document to our + // updates. Without applying a deleted document there might be another query that will + // raise this document as part of a snapshot until it is resolved, essentially exposing + // inconsistency between queries + FSTDocumentKey *key = [FSTDocumentKey keyWithPath:query.path]; + FSTDeletedDocument *deletedDoc = + [FSTDeletedDocument documentWithKey:key version:snapshotVersion]; + [remoteEvent addDocumentUpdate:deletedDoc]; + } else { + FSTAssert(filter.count == 1, @"Single document existence filter with count: %" PRId32, + filter.count); + } + + } else { + // Not a document query. + FSTDocumentKeySet *trackedRemote = [self.localStore remoteDocumentKeysForTarget:targetID]; + FSTTargetMapping *mapping = remoteEvent.targetChanges[target].mapping; + if (mapping) { + if ([mapping isKindOfClass:[FSTUpdateMapping class]]) { + FSTUpdateMapping *update = (FSTUpdateMapping *)mapping; + trackedRemote = [update applyTo:trackedRemote]; + } else { + FSTAssert([mapping isKindOfClass:[FSTResetMapping class]], + @"Expected either reset or update mapping but got something else %@", mapping); + trackedRemote = ((FSTResetMapping *)mapping).documents; + } + } + + if (trackedRemote.count != (NSUInteger)filter.count) { + FSTLog(@"Existence filter mismatch, resetting mapping"); + + // Make sure the mismatch is exposed in the remote event + [remoteEvent handleExistenceFilterMismatchForTargetID:target]; + + // Clear the resume token for the query, since we're in a known mismatch state. + queryData = + [[FSTQueryData alloc] initWithQuery:query targetID:targetID purpose:queryData.purpose]; + self.listenTargets[target] = queryData; + + // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't + // send a resume token so that we get a full update. + [self sendUnwatchRequestForTargetID:@(targetID)]; + + // Mark the query we send as being on behalf of an existence filter mismatch, but don't + // actually retain that in listenTargets. This ensures that we flag the first re-listen + // this way without impacting future listens of this target (that might happen e.g. on + // reconnect). + FSTQueryData *requestQueryData = + [[FSTQueryData alloc] initWithQuery:query + targetID:targetID + purpose:FSTQueryPurposeExistenceFilterMismatch]; + [self sendWatchRequestWithQueryData:requestQueryData]; + } + } + }]; + + // Update in-memory resume tokens. FSTLocalStore will update the persistent view of these when + // applying the completed FSTRemoteEvent. + [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^( + FSTBoxedTargetID *target, FSTTargetChange *change, BOOL *stop) { + NSData *resumeToken = change.resumeToken; + if (resumeToken.length > 0) { + FSTQueryData *queryData = _listenTargets[target]; + // A watched target might have been removed already. + if (queryData) { + _listenTargets[target] = + [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion + resumeToken:resumeToken]; + } + } + }]; + + // Finally handle remote event + [self.syncEngine applyRemoteEvent:remoteEvent]; +} + +/** Process a target error and passes the error along to SyncEngine. */ +- (void)processTargetErrorForWatchChange:(FSTWatchTargetChange *)change { + FSTAssert(change.cause, @"Handling target error without a cause"); + // Ignore targets that have been removed already. + for (FSTBoxedTargetID *targetID in change.targetIDs) { + if (self.listenTargets[targetID]) { + [self.listenTargets removeObjectForKey:targetID]; + [self.syncEngine rejectListenWithTargetID:targetID error:change.cause]; + } + } +} + +#pragma mark Write Stream + +- (void)fillWritePipeline { + while ([self canWriteMutations]) { + FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen]; + if (!batch) { + break; + } + [self commitBatch:batch]; + } +} + +/** + * Returns YES if the backend can accept additional write requests. + * + * When sending mutations to the write stream (e.g. in -fillWritePipeline), call this method first + * to check if more mutations can be sent. + * + * Currently the only thing that can prevent the backend from accepting write requests is if + * there are too many requests already outstanding. As writes complete the backend will be able + * to accept more. + */ +- (BOOL)canWriteMutations { + return self.pendingWrites.count < kMaxPendingWrites; +} + +/** Given mutations to commit, actually commits them to the backend. */ +- (void)commitBatch:(FSTMutationBatch *)batch { + FSTAssert([self canWriteMutations], @"commitBatch called when mutations can't be written"); + self.lastBatchSeen = batch.batchID; + + if (!self.writeStream.isStarted) { + [self.writeStream start]; + } + + [self.pendingWrites addObject:batch]; + + if (self.writeStream.handshakeComplete) { + [self.writeStream writeMutations:batch.mutations]; + } +} + +- (void)writeStreamDidOpen { + self.writeStreamOpenTime = [NSDate date]; + + [self.writeStream writeHandshake]; +} + +/** + * Handles a successful handshake response from the server, which is our cue to send any pending + * writes. + */ +- (void)writeStreamDidCompleteHandshake { + // Record the stream token. + [self.localStore setLastStreamToken:self.writeStream.lastStreamToken]; + + // Drain any pending writes. + // + // Note that at this point pendingWrites contains mutations that have already been accepted by + // fillWritePipeline/commitBatch. If the pipeline is full, canWriteMutations will be NO, despite + // the fact that we actually need to send mutations over. + // + // This also means that this method indirectly respects the limits imposed by canWriteMutations + // since writes can't be added to the pendingWrites array when canWriteMutations is NO. If the + // limits imposed by canWriteMutations actually protect us from DOSing ourselves then those limits + // won't be exceeded here and we'll continue to make progress. + for (FSTMutationBatch *write in self.pendingWrites) { + [self.writeStream writeMutations:write.mutations]; + } +} + +/** Handles a successful StreamingWriteResponse from the server that contains a mutation result. */ +- (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion + mutationResults:(NSArray<FSTMutationResult *> *)results { + // This is a response to a write containing mutations and should be correlated to the first + // pending write. + NSMutableArray *pendingWrites = self.pendingWrites; + FSTMutationBatch *batch = pendingWrites[0]; + [pendingWrites removeObjectAtIndex:0]; + + FSTMutationBatchResult *batchResult = + [FSTMutationBatchResult resultWithBatch:batch + commitVersion:commitVersion + mutationResults:results + streamToken:self.writeStream.lastStreamToken]; + [self.syncEngine applySuccessfulWriteWithResult:batchResult]; + + // It's possible that with the completion of this mutation another slot has freed up. + [self fillWritePipeline]; +} + +/** + * Handles the closing of the StreamingWrite RPC, either because of an error or because the RPC + * has been terminated by the client or the server. + */ +- (void)writeStreamDidClose:(NSError *_Nullable)error { + NSMutableArray *pendingWrites = self.pendingWrites; + // Ignore close if there are no pending writes. + if (pendingWrites.count == 0) { + return; + } + + FSTAssert(error, @"There are pending writes, but the write stream closed without an error."); + if ([FSTDatastore isPermanentWriteError:error]) { + if (self.writeStream.handshakeComplete) { + // This error affects the actual writes. + [self handleWriteError:error]; + } else { + // If there was an error before the handshake finished, it's possible that the server is + // unable to process the stream token we're sending. (Perhaps it's too old?) + [self handleHandshakeError:error]; + } + } + + // The write stream might have been started by refilling the write pipeline for failed writes + if (pendingWrites.count > 0 && !self.writeStream.isStarted) { + [self.writeStream start]; + } +} + +- (void)handleHandshakeError:(NSError *)error { + // Reset the token if it's a permanent error or the error code is ABORTED, signaling the write + // stream is no longer valid. + if ([FSTDatastore isPermanentWriteError:error] || [FSTDatastore isAbortedError:error]) { + NSString *token = [self.writeStream.lastStreamToken base64EncodedStringWithOptions:0]; + FSTLog(@"FSTRemoteStore %p error before completed handshake; resetting stream token %@: %@", + (__bridge void *)self, token, error); + self.writeStream.lastStreamToken = nil; + [self.localStore setLastStreamToken:nil]; + } +} + +- (void)handleWriteError:(NSError *)error { + // Only handle permanent error. If it's transient, just let the retry logic kick in. + if (![FSTDatastore isPermanentWriteError:error]) { + return; + } + + // If this was a permanent error, the request itself was the problem so it's not going to + // succeed if we resend it. + FSTMutationBatch *batch = self.pendingWrites[0]; + [self.pendingWrites removeObjectAtIndex:0]; + + // In this case it's also unlikely that the server itself is melting down--this was just a + // bad request so inhibit backoff on the next restart. + [self.writeStream inhibitBackoff]; + + [self.syncEngine rejectFailedWriteWithBatchID:batch.batchID error:error]; + + // It's possible that with the completion of this mutation another slot has freed up. + [self fillWritePipeline]; +} + +- (FSTTransaction *)transaction { + return [FSTTransaction transactionWithDatastore:self.datastore]; +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTSerializerBeta.h b/Firestore/Source/Remote/FSTSerializerBeta.h new file mode 100644 index 0000000..973f866 --- /dev/null +++ b/Firestore/Source/Remote/FSTSerializerBeta.h @@ -0,0 +1,110 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <Foundation/Foundation.h> + +@class FSTDatabaseID; +@class FSTDocumentKey; +@class FSTFieldValue; +@class FSTMaybeDocument; +@class FSTMutation; +@class FSTMutationBatch; +@class FSTMutationResult; +@class FSTObjectValue; +@class FSTQuery; +@class FSTQueryData; +@class FSTSnapshotVersion; +@class FSTTimestamp; +@class FSTWatchChange; + +@class GCFSBatchGetDocumentsResponse; +@class GCFSDocument; +@class GCFSDocumentMask; +@class GCFSListenResponse; +@class GCFSTarget; +@class GCFSTarget_DocumentsTarget; +@class GCFSTarget_QueryTarget; +@class GCFSValue; +@class GCFSWrite; +@class GCFSWriteResult; + +@class GPBTimestamp; + +NS_ASSUME_NONNULL_BEGIN + +/** + * Converts internal model objects to their equivalent protocol buffer form. Methods starting with + * "encoded" convert to a protocol buffer and methods starting with "decoded" convert from a + * protocol buffer. + * + * Throws an exception if a protocol buffer is missing a critical field or has a value we can't + * interpret. + */ +@interface FSTSerializerBeta : NSObject + +- (instancetype)init NS_UNAVAILABLE; + +- (instancetype)initWithDatabaseID:(FSTDatabaseID *)databaseID NS_DESIGNATED_INITIALIZER; + +- (GPBTimestamp *)encodedTimestamp:(FSTTimestamp *)timestamp; +- (FSTTimestamp *)decodedTimestamp:(GPBTimestamp *)timestamp; + +- (GPBTimestamp *)encodedVersion:(FSTSnapshotVersion *)version; +- (FSTSnapshotVersion *)decodedVersion:(GPBTimestamp *)version; + +/** Returns the database ID, such as `projects/{project id}/databases/{database_id}`. */ +- (NSString *)encodedDatabaseID; + +- (NSString *)encodedDocumentKey:(FSTDocumentKey *)key; +- (FSTDocumentKey *)decodedDocumentKey:(NSString *)key; + +- (GCFSValue *)encodedFieldValue:(FSTFieldValue *)fieldValue; +- (FSTFieldValue *)decodedFieldValue:(GCFSValue *)valueProto; + +- (GCFSWrite *)encodedMutation:(FSTMutation *)mutation; +- (FSTMutation *)decodedMutation:(GCFSWrite *)mutation; + +- (FSTMutationResult *)decodedMutationResult:(GCFSWriteResult *)mutation; + +- (nullable NSMutableDictionary<NSString *, NSString *> *)encodedListenRequestLabelsForQueryData: + (FSTQueryData *)queryData; + +- (GCFSTarget *)encodedTarget:(FSTQueryData *)queryData; + +- (GCFSTarget_DocumentsTarget *)encodedDocumentsTarget:(FSTQuery *)query; +- (FSTQuery *)decodedQueryFromDocumentsTarget:(GCFSTarget_DocumentsTarget *)target; + +- (GCFSTarget_QueryTarget *)encodedQueryTarget:(FSTQuery *)query; +- (FSTQuery *)decodedQueryFromQueryTarget:(GCFSTarget_QueryTarget *)target; + +- (FSTWatchChange *)decodedWatchChange:(GCFSListenResponse *)watchChange; +- (FSTSnapshotVersion *)versionFromListenResponse:(GCFSListenResponse *)watchChange; + +- (GCFSDocument *)encodedDocumentWithFields:(FSTObjectValue *)objectValue key:(FSTDocumentKey *)key; + +/** + * Encodes an FSTObjectValue into a dictionary. + * @return a new dictionary that can be assigned to a field in another proto. + */ +- (NSMutableDictionary<NSString *, GCFSValue *> *)encodedFields:(FSTObjectValue *)value; + +- (FSTObjectValue *)decodedFields:(NSDictionary<NSString *, GCFSValue *> *)fields; + +- (FSTMaybeDocument *)decodedMaybeDocumentFromBatch:(GCFSBatchGetDocumentsResponse *)response; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTSerializerBeta.m b/Firestore/Source/Remote/FSTSerializerBeta.m new file mode 100644 index 0000000..418dabd --- /dev/null +++ b/Firestore/Source/Remote/FSTSerializerBeta.m @@ -0,0 +1,1084 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "FSTSerializerBeta.h" + +#import <GRPCClient/GRPCCall.h> + +#import "Common.pbobjc.h" +#import "Document.pbobjc.h" +#import "Firestore.pbobjc.h" +#import "Latlng.pbobjc.h" +#import "Query.pbobjc.h" +#import "Status.pbobjc.h" +#import "Write.pbobjc.h" + +#import "FIRFirestoreErrors.h" +#import "FIRGeoPoint.h" +#import "FSTAssert.h" +#import "FSTDatabaseID.h" +#import "FSTDocument.h" +#import "FSTDocumentKey.h" +#import "FSTExistenceFilter.h" +#import "FSTFieldValue.h" +#import "FSTMutation.h" +#import "FSTMutationBatch.h" +#import "FSTPath.h" +#import "FSTQuery.h" +#import "FSTQueryData.h" +#import "FSTSnapshotVersion.h" +#import "FSTTimestamp.h" +#import "FSTWatchChange.h" + +NS_ASSUME_NONNULL_BEGIN + +@interface FSTSerializerBeta () +@property(nonatomic, strong, readonly) FSTDatabaseID *databaseID; +@end + +@implementation FSTSerializerBeta + +- (instancetype)initWithDatabaseID:(FSTDatabaseID *)databaseID { + self = [super init]; + if (self) { + _databaseID = databaseID; + } + return self; +} + +#pragma mark - FSTSnapshotVersion <=> GPBTimestamp + +- (GPBTimestamp *)encodedTimestamp:(FSTTimestamp *)timestamp { + GPBTimestamp *result = [GPBTimestamp message]; + result.seconds = timestamp.seconds; + result.nanos = timestamp.nanos; + return result; +} + +- (FSTTimestamp *)decodedTimestamp:(GPBTimestamp *)timestamp { + return [[FSTTimestamp alloc] initWithSeconds:timestamp.seconds nanos:timestamp.nanos]; +} + +- (GPBTimestamp *)encodedVersion:(FSTSnapshotVersion *)version { + return [self encodedTimestamp:version.timestamp]; +} + +- (FSTSnapshotVersion *)decodedVersion:(GPBTimestamp *)version { + return [FSTSnapshotVersion versionWithTimestamp:[self decodedTimestamp:version]]; +} + +#pragma mark - FIRGeoPoint <=> GTPLatLng + +- (GTPLatLng *)encodedGeoPoint:(FIRGeoPoint *)geoPoint { + GTPLatLng *latLng = [GTPLatLng message]; + latLng.latitude = geoPoint.latitude; + latLng.longitude = geoPoint.longitude; + return latLng; +} + +- (FIRGeoPoint *)decodedGeoPoint:(GTPLatLng *)latLng { + return [[FIRGeoPoint alloc] initWithLatitude:latLng.latitude longitude:latLng.longitude]; +} + +#pragma mark - FSTDocumentKey <=> Key proto + +- (NSString *)encodedDocumentKey:(FSTDocumentKey *)key { + return [self encodedResourcePathForDatabaseID:self.databaseID path:key.path]; +} + +- (FSTDocumentKey *)decodedDocumentKey:(NSString *)name { + FSTResourcePath *path = [self decodedResourcePathWithDatabaseID:name]; + FSTAssert([[path segmentAtIndex:1] isEqualToString:self.databaseID.projectID], + @"Tried to deserialize key from different project."); + FSTAssert([[path segmentAtIndex:3] isEqualToString:self.databaseID.databaseID], + @"Tried to deserialize key from different datbase."); + return [FSTDocumentKey keyWithPath:[self localResourcePathForQualifiedResourcePath:path]]; +} + +- (NSString *)encodedResourcePathForDatabaseID:(FSTDatabaseID *)databaseID + path:(FSTResourcePath *)path { + return [[[[self encodedResourcePathForDatabaseID:databaseID] pathByAppendingSegment:@"documents"] + pathByAppendingPath:path] canonicalString]; +} + +- (FSTResourcePath *)decodedResourcePathWithDatabaseID:(NSString *)name { + FSTResourcePath *path = [FSTResourcePath pathWithString:name]; + FSTAssert([self validQualifiedResourcePath:path], @"Tried to deserialize invalid key %@", path); + return path; +} + +- (NSString *)encodedQueryPath:(FSTResourcePath *)path { + if (path.length == 0) { + // If the path is empty, the backend requires we leave off the /documents at the end. + return [self encodedDatabaseID]; + } + return [self encodedResourcePathForDatabaseID:self.databaseID path:path]; +} + +- (FSTResourcePath *)decodedQueryPath:(NSString *)name { + FSTResourcePath *resource = [self decodedResourcePathWithDatabaseID:name]; + if (resource.length == 4) { + return [FSTResourcePath pathWithSegments:@[]]; + } else { + return [self localResourcePathForQualifiedResourcePath:resource]; + } +} + +- (FSTResourcePath *)encodedResourcePathForDatabaseID:(FSTDatabaseID *)databaseID { + return [FSTResourcePath + pathWithSegments:@[ @"projects", databaseID.projectID, @"databases", databaseID.databaseID ]]; +} + +- (FSTResourcePath *)localResourcePathForQualifiedResourcePath:(FSTResourcePath *)resourceName { + FSTAssert( + resourceName.length > 4 && [[resourceName segmentAtIndex:4] isEqualToString:@"documents"], + @"Tried to deserialize invalid key %@", resourceName); + return [resourceName pathByRemovingFirstSegments:5]; +} + +- (BOOL)validQualifiedResourcePath:(FSTResourcePath *)path { + return path.length >= 4 && [[path segmentAtIndex:0] isEqualToString:@"projects"] && + [[path segmentAtIndex:2] isEqualToString:@"databases"]; +} + +- (NSString *)encodedDatabaseID { + return [[self encodedResourcePathForDatabaseID:self.databaseID] canonicalString]; +} + +#pragma mark - FSTFieldValue <=> Value proto + +- (GCFSValue *)encodedFieldValue:(FSTFieldValue *)fieldValue { + Class class = [fieldValue class]; + if (class == [FSTNullValue class]) { + return [self encodedNull]; + + } else if (class == [FSTBooleanValue class]) { + return [self encodedBool:[[fieldValue value] boolValue]]; + + } else if (class == [FSTIntegerValue class]) { + return [self encodedInteger:[[fieldValue value] longLongValue]]; + + } else if (class == [FSTDoubleValue class]) { + return [self encodedDouble:[[fieldValue value] doubleValue]]; + + } else if (class == [FSTStringValue class]) { + return [self encodedString:[fieldValue value]]; + + } else if (class == [FSTTimestampValue class]) { + return [self encodedTimestampValue:((FSTTimestampValue *)fieldValue).internalValue]; + + } else if (class == [FSTGeoPointValue class]) { + return [self encodedGeoPointValue:[fieldValue value]]; + + } else if (class == [FSTBlobValue class]) { + return [self encodedBlobValue:[fieldValue value]]; + + } else if (class == [FSTReferenceValue class]) { + FSTReferenceValue *ref = (FSTReferenceValue *)fieldValue; + return [self encodedReferenceValueForDatabaseID:[ref databaseID] key:[ref value]]; + + } else if (class == [FSTObjectValue class]) { + GCFSValue *result = [GCFSValue message]; + result.mapValue = [self encodedMapValue:(FSTObjectValue *)fieldValue]; + return result; + + } else if (class == [FSTArrayValue class]) { + GCFSValue *result = [GCFSValue message]; + result.arrayValue = [self encodedArrayValue:(FSTArrayValue *)fieldValue]; + return result; + + } else { + FSTFail(@"Unhandled type %@ on %@", NSStringFromClass([fieldValue class]), fieldValue); + } +} + +- (FSTFieldValue *)decodedFieldValue:(GCFSValue *)valueProto { + switch (valueProto.valueTypeOneOfCase) { + case GCFSValue_ValueType_OneOfCase_NullValue: + return [FSTNullValue nullValue]; + + case GCFSValue_ValueType_OneOfCase_BooleanValue: + return [FSTBooleanValue booleanValue:valueProto.booleanValue]; + + case GCFSValue_ValueType_OneOfCase_IntegerValue: + return [FSTIntegerValue integerValue:valueProto.integerValue]; + + case GCFSValue_ValueType_OneOfCase_DoubleValue: + return [FSTDoubleValue doubleValue:valueProto.doubleValue]; + + case GCFSValue_ValueType_OneOfCase_StringValue: + return [FSTStringValue stringValue:valueProto.stringValue]; + + case GCFSValue_ValueType_OneOfCase_TimestampValue: + return [FSTTimestampValue timestampValue:[self decodedTimestamp:valueProto.timestampValue]]; + + case GCFSValue_ValueType_OneOfCase_GeoPointValue: + return [FSTGeoPointValue geoPointValue:[self decodedGeoPoint:valueProto.geoPointValue]]; + + case GCFSValue_ValueType_OneOfCase_BytesValue: + return [FSTBlobValue blobValue:valueProto.bytesValue]; + + case GCFSValue_ValueType_OneOfCase_ReferenceValue: + return [self decodedReferenceValue:valueProto.referenceValue]; + + case GCFSValue_ValueType_OneOfCase_ArrayValue: + return [self decodedArrayValue:valueProto.arrayValue]; + + case GCFSValue_ValueType_OneOfCase_MapValue: + return [self decodedMapValue:valueProto.mapValue]; + + default: + FSTFail(@"Unhandled type %d on %@", valueProto.valueTypeOneOfCase, valueProto); + } +} + +- (GCFSValue *)encodedNull { + GCFSValue *result = [GCFSValue message]; + result.nullValue = GPBNullValue_NullValue; + return result; +} + +- (GCFSValue *)encodedBool:(BOOL)value { + GCFSValue *result = [GCFSValue message]; + result.booleanValue = value; + return result; +} + +- (GCFSValue *)encodedDouble:(double)value { + GCFSValue *result = [GCFSValue message]; + result.doubleValue = value; + return result; +} + +- (GCFSValue *)encodedInteger:(int64_t)value { + GCFSValue *result = [GCFSValue message]; + result.integerValue = value; + return result; +} + +- (GCFSValue *)encodedString:(NSString *)value { + GCFSValue *result = [GCFSValue message]; + result.stringValue = value; + return result; +} + +- (GCFSValue *)encodedTimestampValue:(FSTTimestamp *)value { + GCFSValue *result = [GCFSValue message]; + result.timestampValue = [self encodedTimestamp:value]; + return result; +} + +- (GCFSValue *)encodedGeoPointValue:(FIRGeoPoint *)value { + GCFSValue *result = [GCFSValue message]; + result.geoPointValue = [self encodedGeoPoint:value]; + return result; +} + +- (GCFSValue *)encodedBlobValue:(NSData *)value { + GCFSValue *result = [GCFSValue message]; + result.bytesValue = value; + return result; +} + +- (GCFSValue *)encodedReferenceValueForDatabaseID:(FSTDatabaseID *)databaseID + key:(FSTDocumentKey *)key { + GCFSValue *result = [GCFSValue message]; + result.referenceValue = [self encodedResourcePathForDatabaseID:databaseID path:key.path]; + return result; +} + +- (FSTReferenceValue *)decodedReferenceValue:(NSString *)resourceName { + FSTResourcePath *path = [self decodedResourcePathWithDatabaseID:resourceName]; + NSString *project = [path segmentAtIndex:1]; + NSString *database = [path segmentAtIndex:3]; + FSTDatabaseID *databaseID = [FSTDatabaseID databaseIDWithProject:project database:database]; + FSTDocumentKey *key = + [FSTDocumentKey keyWithPath:[self localResourcePathForQualifiedResourcePath:path]]; + return [FSTReferenceValue referenceValue:key databaseID:databaseID]; +} + +- (GCFSArrayValue *)encodedArrayValue:(FSTArrayValue *)arrayValue { + GCFSArrayValue *proto = [GCFSArrayValue message]; + NSMutableArray<GCFSValue *> *protoContents = [proto valuesArray]; + + [[arrayValue internalValue] + enumerateObjectsUsingBlock:^(FSTFieldValue *value, NSUInteger idx, BOOL *stop) { + GCFSValue *converted = [self encodedFieldValue:value]; + [protoContents addObject:converted]; + }]; + return proto; +} + +- (FSTArrayValue *)decodedArrayValue:(GCFSArrayValue *)arrayValue { + NSMutableArray<FSTFieldValue *> *contents = + [NSMutableArray arrayWithCapacity:arrayValue.valuesArray_Count]; + + [arrayValue.valuesArray + enumerateObjectsUsingBlock:^(GCFSValue *value, NSUInteger idx, BOOL *stop) { + [contents addObject:[self decodedFieldValue:value]]; + }]; + return [[FSTArrayValue alloc] initWithValueNoCopy:contents]; +} + +- (GCFSMapValue *)encodedMapValue:(FSTObjectValue *)value { + GCFSMapValue *result = [GCFSMapValue message]; + result.fields = [self encodedFields:value]; + return result; +} + +- (FSTObjectValue *)decodedMapValue:(GCFSMapValue *)map { + return [self decodedFields:map.fields]; +} + +/** + * Encodes an FSTObjectValue into a dictionary. + * @return a new dictionary that can be assigned to a field in another proto. + */ +- (NSMutableDictionary<NSString *, GCFSValue *> *)encodedFields:(FSTObjectValue *)value { + FSTImmutableSortedDictionary<NSString *, FSTFieldValue *> *fields = value.internalValue; + NSMutableDictionary<NSString *, GCFSValue *> *result = [NSMutableDictionary dictionary]; + [fields enumerateKeysAndObjectsUsingBlock:^(NSString *key, FSTFieldValue *obj, BOOL *stop) { + GCFSValue *converted = [self encodedFieldValue:obj]; + result[key] = converted; + }]; + return result; +} + +- (FSTObjectValue *)decodedFields:(NSDictionary<NSString *, GCFSValue *> *)fields { + __block FSTObjectValue *result = [FSTObjectValue objectValue]; + [fields enumerateKeysAndObjectsUsingBlock:^(NSString *_Nonnull key, GCFSValue *_Nonnull obj, + BOOL *_Nonnull stop) { + FSTFieldPath *path = [FSTFieldPath pathWithSegments:@[ key ]]; + FSTFieldValue *value = [self decodedFieldValue:obj]; + result = [result objectBySettingValue:value forPath:path]; + }]; + return result; +} + +#pragma mark - FSTObjectValue <=> Document proto + +- (GCFSDocument *)encodedDocumentWithFields:(FSTObjectValue *)objectValue + key:(FSTDocumentKey *)key { + GCFSDocument *proto = [GCFSDocument message]; + proto.name = [self encodedDocumentKey:key]; + proto.fields = [self encodedFields:objectValue]; + return proto; +} + +#pragma mark - FSTMaybeDocument <= BatchGetDocumentsResponse proto + +- (FSTMaybeDocument *)decodedMaybeDocumentFromBatch:(GCFSBatchGetDocumentsResponse *)response { + switch (response.resultOneOfCase) { + case GCFSBatchGetDocumentsResponse_Result_OneOfCase_Found: + return [self decodedFoundDocument:response]; + case GCFSBatchGetDocumentsResponse_Result_OneOfCase_Missing: + return [self decodedDeletedDocument:response]; + default: + FSTFail(@"Unknown document type: %@", response); + } +} + +- (FSTDocument *)decodedFoundDocument:(GCFSBatchGetDocumentsResponse *)response { + FSTAssert(!!response.found, @"Tried to deserialize a found document from a deleted document."); + FSTDocumentKey *key = [self decodedDocumentKey:response.found.name]; + FSTObjectValue *value = [self decodedFields:response.found.fields]; + FSTSnapshotVersion *version = [self decodedVersion:response.found.updateTime]; + FSTAssert(![version isEqual:[FSTSnapshotVersion noVersion]], + @"Got a document response with no snapshot version"); + + return [FSTDocument documentWithData:value key:key version:version hasLocalMutations:NO]; +} + +- (FSTDeletedDocument *)decodedDeletedDocument:(GCFSBatchGetDocumentsResponse *)response { + FSTAssert(!!response.missing, @"Tried to deserialize a deleted document from a found document."); + FSTDocumentKey *key = [self decodedDocumentKey:response.missing]; + FSTSnapshotVersion *version = [self decodedVersion:response.readTime]; + FSTAssert(![version isEqual:[FSTSnapshotVersion noVersion]], + @"Got a no document response with no snapshot version"); + return [FSTDeletedDocument documentWithKey:key version:version]; +} + +#pragma mark - FSTMutation => GCFSWrite proto + +- (GCFSWrite *)encodedMutation:(FSTMutation *)mutation { + GCFSWrite *proto = [GCFSWrite message]; + + Class mutationClass = [mutation class]; + if (mutationClass == [FSTSetMutation class]) { + FSTSetMutation *set = (FSTSetMutation *)mutation; + proto.update = [self encodedDocumentWithFields:set.value key:set.key]; + + } else if (mutationClass == [FSTPatchMutation class]) { + FSTPatchMutation *patch = (FSTPatchMutation *)mutation; + proto.update = [self encodedDocumentWithFields:patch.value key:patch.key]; + proto.updateMask = [self encodedFieldMask:patch.fieldMask]; + + } else if (mutationClass == [FSTTransformMutation class]) { + FSTTransformMutation *transform = (FSTTransformMutation *)mutation; + + proto.transform = [GCFSDocumentTransform message]; + proto.transform.document = [self encodedDocumentKey:transform.key]; + proto.transform.fieldTransformsArray = [self encodedFieldTransforms:transform.fieldTransforms]; + // NOTE: We set a precondition of exists: true as a safety-check, since we always combine + // FSTTransformMutations with an FSTSetMutation or FSTPatchMutation which (if successful) should + // end up with an existing document. + proto.currentDocument.exists = YES; + + } else if (mutationClass == [FSTDeleteMutation class]) { + FSTDeleteMutation *delete = (FSTDeleteMutation *)mutation; + proto.delete_p = [self encodedDocumentKey:delete.key]; + + } else { + FSTFail(@"Unknown mutation type %@", NSStringFromClass(mutationClass)); + } + + if (!mutation.precondition.isNone) { + proto.currentDocument = [self encodedPrecondition:mutation.precondition]; + } + + return proto; +} + +- (FSTMutation *)decodedMutation:(GCFSWrite *)mutation { + FSTPrecondition *precondition = [mutation hasCurrentDocument] + ? [self decodedPrecondition:mutation.currentDocument] + : [FSTPrecondition none]; + + switch (mutation.operationOneOfCase) { + case GCFSWrite_Operation_OneOfCase_Update: + if (mutation.hasUpdateMask) { + return [[FSTPatchMutation alloc] initWithKey:[self decodedDocumentKey:mutation.update.name] + fieldMask:[self decodedFieldMask:mutation.updateMask] + value:[self decodedFields:mutation.update.fields] + precondition:precondition]; + } else { + return [[FSTSetMutation alloc] initWithKey:[self decodedDocumentKey:mutation.update.name] + value:[self decodedFields:mutation.update.fields] + precondition:precondition]; + } + + case GCFSWrite_Operation_OneOfCase_Delete_p: + return [[FSTDeleteMutation alloc] initWithKey:[self decodedDocumentKey:mutation.delete_p] + precondition:precondition]; + + case GCFSWrite_Operation_OneOfCase_Transform: { + FSTPreconditionExists exists = precondition.exists; + FSTAssert(exists == FSTPreconditionExistsYes, + @"Transforms must have precondition \"exists == true\""); + + return [[FSTTransformMutation alloc] + initWithKey:[self decodedDocumentKey:mutation.transform.document] + fieldTransforms:[self decodedFieldTransforms:mutation.transform.fieldTransformsArray]]; + } + + default: + // Note that insert is intentionally unhandled, since we don't ever deal in them. + FSTFail(@"Unknown mutation operation: %d", mutation.operationOneOfCase); + } +} + +- (GCFSPrecondition *)encodedPrecondition:(FSTPrecondition *)precondition { + FSTAssert(!precondition.isNone, @"Can't serialize an empty precondition"); + GCFSPrecondition *message = [GCFSPrecondition message]; + if (precondition.updateTime) { + message.updateTime = [self encodedVersion:precondition.updateTime]; + } else if (precondition.exists != FSTPreconditionExistsNotSet) { + message.exists = precondition.exists == FSTPreconditionExistsYes; + } else { + FSTFail(@"Unknown precondition: %@", precondition); + } + return message; +} + +- (FSTPrecondition *)decodedPrecondition:(GCFSPrecondition *)precondition { + switch (precondition.conditionTypeOneOfCase) { + case GCFSPrecondition_ConditionType_OneOfCase_GPBUnsetOneOfCase: + return [FSTPrecondition none]; + + case GCFSPrecondition_ConditionType_OneOfCase_Exists: + return [FSTPrecondition preconditionWithExists:precondition.exists]; + + case GCFSPrecondition_ConditionType_OneOfCase_UpdateTime: + return [FSTPrecondition + preconditionWithUpdateTime:[self decodedVersion:precondition.updateTime]]; + + default: + FSTFail(@"Unrecognized Precondition one-of case %@", precondition); + } +} + +- (GCFSDocumentMask *)encodedFieldMask:(FSTFieldMask *)fieldMask { + GCFSDocumentMask *mask = [GCFSDocumentMask message]; + for (FSTFieldPath *field in fieldMask.fields) { + [mask.fieldPathsArray addObject:field.canonicalString]; + } + return mask; +} + +- (FSTFieldMask *)decodedFieldMask:(GCFSDocumentMask *)fieldMask { + NSMutableArray<FSTFieldPath *> *fields = + [NSMutableArray arrayWithCapacity:fieldMask.fieldPathsArray_Count]; + for (NSString *path in fieldMask.fieldPathsArray) { + [fields addObject:[FSTFieldPath pathWithServerFormat:path]]; + } + return [[FSTFieldMask alloc] initWithFields:fields]; +} + +- (NSMutableArray<GCFSDocumentTransform_FieldTransform *> *)encodedFieldTransforms: + (NSArray<FSTFieldTransform *> *)fieldTransforms { + NSMutableArray *protos = [NSMutableArray array]; + for (FSTFieldTransform *fieldTransform in fieldTransforms) { + FSTAssert([fieldTransform.transform isKindOfClass:[FSTServerTimestampTransform class]], + @"Unknown transform: %@", fieldTransform.transform); + GCFSDocumentTransform_FieldTransform *proto = [GCFSDocumentTransform_FieldTransform message]; + proto.fieldPath = fieldTransform.path.canonicalString; + proto.setToServerValue = GCFSDocumentTransform_FieldTransform_ServerValue_RequestTime; + [protos addObject:proto]; + } + return protos; +} + +- (NSArray<FSTFieldTransform *> *)decodedFieldTransforms: + (NSArray<GCFSDocumentTransform_FieldTransform *> *)protos { + NSMutableArray<FSTFieldTransform *> *fieldTransforms = [NSMutableArray array]; + for (GCFSDocumentTransform_FieldTransform *proto in protos) { + FSTAssert( + proto.setToServerValue == GCFSDocumentTransform_FieldTransform_ServerValue_RequestTime, + @"Unknown transform setToServerValue: %d", proto.setToServerValue); + [fieldTransforms + addObject:[[FSTFieldTransform alloc] + initWithPath:[FSTFieldPath pathWithServerFormat:proto.fieldPath] + transform:[FSTServerTimestampTransform serverTimestampTransform]]]; + } + return fieldTransforms; +} + +#pragma mark - FSTMutationResult <= GCFSWriteResult proto + +- (FSTMutationResult *)decodedMutationResult:(GCFSWriteResult *)mutation { + // NOTE: Deletes don't have an updateTime. + FSTSnapshotVersion *_Nullable version = + mutation.updateTime ? [self decodedVersion:mutation.updateTime] : nil; + NSMutableArray *_Nullable transformResults = nil; + if (mutation.transformResultsArray.count > 0) { + transformResults = [NSMutableArray array]; + for (GCFSValue *result in mutation.transformResultsArray) { + [transformResults addObject:[self decodedFieldValue:result]]; + } + } + return [[FSTMutationResult alloc] initWithVersion:version transformResults:transformResults]; +} + +#pragma mark - FSTQueryData => GCFSTarget proto + +- (nullable NSMutableDictionary<NSString *, NSString *> *)encodedListenRequestLabelsForQueryData: + (FSTQueryData *)queryData { + NSString *value = [self encodedLabelForPurpose:queryData.purpose]; + if (!value) { + return nil; + } + + NSMutableDictionary<NSString *, NSString *> *result = + [NSMutableDictionary dictionaryWithCapacity:1]; + [result setObject:value forKey:@"goog-listen-tags"]; + return result; +} + +- (nullable NSString *)encodedLabelForPurpose:(FSTQueryPurpose)purpose { + switch (purpose) { + case FSTQueryPurposeListen: + return nil; + case FSTQueryPurposeExistenceFilterMismatch: + return @"existence-filter-mismatch"; + case FSTQueryPurposeLimboResolution: + return @"limbo-document"; + default: + FSTFail(@"Unrecognized query purpose: %lu", (unsigned long)purpose); + } +} + +- (GCFSTarget *)encodedTarget:(FSTQueryData *)queryData { + GCFSTarget *result = [GCFSTarget message]; + FSTQuery *query = queryData.query; + + if ([query isDocumentQuery]) { + result.documents = [self encodedDocumentsTarget:query]; + } else { + result.query = [self encodedQueryTarget:query]; + } + + result.targetId = queryData.targetID; + if (queryData.resumeToken.length > 0) { + result.resumeToken = queryData.resumeToken; + } + + return result; +} + +- (GCFSTarget_DocumentsTarget *)encodedDocumentsTarget:(FSTQuery *)query { + GCFSTarget_DocumentsTarget *result = [GCFSTarget_DocumentsTarget message]; + NSMutableArray<NSString *> *docs = result.documentsArray; + [docs addObject:[self encodedQueryPath:query.path]]; + return result; +} + +- (FSTQuery *)decodedQueryFromDocumentsTarget:(GCFSTarget_DocumentsTarget *)target { + NSArray<NSString *> *documents = target.documentsArray; + FSTAssert(documents.count == 1, @"DocumentsTarget contained other than 1 document %lu", + (unsigned long)documents.count); + + NSString *name = documents[0]; + return [FSTQuery queryWithPath:[self decodedQueryPath:name]]; +} + +- (GCFSTarget_QueryTarget *)encodedQueryTarget:(FSTQuery *)query { + // Dissect the path into parent, collectionId, and optional key filter. + GCFSTarget_QueryTarget *queryTarget = [GCFSTarget_QueryTarget message]; + if (query.path.length == 0) { + queryTarget.parent = [self encodedQueryPath:query.path]; + } else { + FSTResourcePath *path = query.path; + FSTAssert(path.length % 2 != 0, @"Document queries with filters are not supported."); + queryTarget.parent = [self encodedQueryPath:[path pathByRemovingLastSegment]]; + GCFSStructuredQuery_CollectionSelector *from = [GCFSStructuredQuery_CollectionSelector message]; + from.collectionId = path.lastSegment; + [queryTarget.structuredQuery.fromArray addObject:from]; + } + + // Encode the filters. + GCFSStructuredQuery_Filter *_Nullable where = [self encodedFilters:query.filters]; + if (where) { + queryTarget.structuredQuery.where = where; + } + + NSArray<GCFSStructuredQuery_Order *> *orders = [self encodedSortOrders:query.sortOrders]; + if (orders.count) { + [queryTarget.structuredQuery.orderByArray addObjectsFromArray:orders]; + } + + if (query.limit != NSNotFound) { + queryTarget.structuredQuery.limit.value = (int32_t)query.limit; + } + + if (query.startAt) { + queryTarget.structuredQuery.startAt = [self encodedBound:query.startAt]; + } + + if (query.endAt) { + queryTarget.structuredQuery.endAt = [self encodedBound:query.endAt]; + } + + return queryTarget; +} + +- (FSTQuery *)decodedQueryFromQueryTarget:(GCFSTarget_QueryTarget *)target { + FSTResourcePath *path = [self decodedQueryPath:target.parent]; + + GCFSStructuredQuery *query = target.structuredQuery; + NSUInteger fromCount = query.fromArray_Count; + if (fromCount > 0) { + FSTAssert(fromCount == 1, + @"StructuredQuery.from with more than one collection is not supported."); + + GCFSStructuredQuery_CollectionSelector *from = query.fromArray[0]; + path = [path pathByAppendingSegment:from.collectionId]; + } + + NSArray<id<FSTFilter>> *filterBy; + if (query.hasWhere) { + filterBy = [self decodedFilters:query.where]; + } else { + filterBy = @[]; + } + + NSArray<FSTSortOrder *> *orderBy; + if (query.orderByArray_Count > 0) { + orderBy = [self decodedSortOrders:query.orderByArray]; + } else { + orderBy = @[]; + } + + NSInteger limit = NSNotFound; + if (query.hasLimit) { + limit = query.limit.value; + } + + FSTBound *_Nullable startAt; + if (query.hasStartAt) { + startAt = [self decodedBound:query.startAt]; + } + + FSTBound *_Nullable endAt; + if (query.hasEndAt) { + endAt = [self decodedBound:query.endAt]; + } + + return [[FSTQuery alloc] initWithPath:path + filterBy:filterBy + orderBy:orderBy + limit:limit + startAt:startAt + endAt:endAt]; +} + +#pragma mark Filters + +- (GCFSStructuredQuery_Filter *_Nullable)encodedFilters:(NSArray<id<FSTFilter>> *)filters { + if (filters.count == 0) { + return nil; + } + NSMutableArray<GCFSStructuredQuery_Filter *> *protos = [NSMutableArray array]; + for (id<FSTFilter> filter in filters) { + if ([filter isKindOfClass:[FSTRelationFilter class]]) { + [protos addObject:[self encodedRelationFilter:filter]]; + } else { + [protos addObject:[self encodedUnaryFilter:filter]]; + } + } + if (protos.count == 1) { + // Special case: no existing filters and we only need to add one filter. This can be made the + // single root filter without a composite filter. + return protos[0]; + } + GCFSStructuredQuery_Filter *composite = [GCFSStructuredQuery_Filter message]; + composite.compositeFilter.op = GCFSStructuredQuery_CompositeFilter_Operator_And; + composite.compositeFilter.filtersArray = protos; + return composite; +} + +- (NSArray<id<FSTFilter>> *)decodedFilters:(GCFSStructuredQuery_Filter *)proto { + NSMutableArray<id<FSTFilter>> *result = [NSMutableArray array]; + + NSArray<GCFSStructuredQuery_Filter *> *filters; + if (proto.filterTypeOneOfCase == + GCFSStructuredQuery_Filter_FilterType_OneOfCase_CompositeFilter) { + FSTAssert(proto.compositeFilter.op == GCFSStructuredQuery_CompositeFilter_Operator_And, + @"Only AND-type composite filters are supported, got %d", proto.compositeFilter.op); + filters = proto.compositeFilter.filtersArray; + } else { + filters = @[ proto ]; + } + + for (GCFSStructuredQuery_Filter *filter in filters) { + switch (filter.filterTypeOneOfCase) { + case GCFSStructuredQuery_Filter_FilterType_OneOfCase_CompositeFilter: + FSTFail(@"Nested composite filters are not supported"); + + case GCFSStructuredQuery_Filter_FilterType_OneOfCase_FieldFilter: + [result addObject:[self decodedRelationFilter:filter.fieldFilter]]; + break; + + case GCFSStructuredQuery_Filter_FilterType_OneOfCase_UnaryFilter: + [result addObject:[self decodedUnaryFilter:filter.unaryFilter]]; + break; + + default: + FSTFail(@"Unrecognized Filter.filterType %d", filter.filterTypeOneOfCase); + } + } + return result; +} + +- (GCFSStructuredQuery_Filter *)encodedRelationFilter:(FSTRelationFilter *)filter { + GCFSStructuredQuery_Filter *proto = [GCFSStructuredQuery_Filter message]; + GCFSStructuredQuery_FieldFilter *fieldFilter = proto.fieldFilter; + fieldFilter.field = [self encodedFieldPath:filter.field]; + fieldFilter.op = [self encodedRelationFilterOperator:filter.filterOperator]; + fieldFilter.value = [self encodedFieldValue:filter.value]; + return proto; +} + +- (FSTRelationFilter *)decodedRelationFilter:(GCFSStructuredQuery_FieldFilter *)proto { + FSTFieldPath *fieldPath = [FSTFieldPath pathWithServerFormat:proto.field.fieldPath]; + FSTRelationFilterOperator filterOperator = [self decodedRelationFilterOperator:proto.op]; + FSTFieldValue *value = [self decodedFieldValue:proto.value]; + return [FSTRelationFilter filterWithField:fieldPath filterOperator:filterOperator value:value]; +} + +- (GCFSStructuredQuery_Filter *)encodedUnaryFilter:(id<FSTFilter>)filter { + GCFSStructuredQuery_Filter *proto = [GCFSStructuredQuery_Filter message]; + proto.unaryFilter.field = [self encodedFieldPath:filter.field]; + if ([filter isKindOfClass:[FSTNanFilter class]]) { + proto.unaryFilter.op = GCFSStructuredQuery_UnaryFilter_Operator_IsNan; + } else if ([filter isKindOfClass:[FSTNullFilter class]]) { + proto.unaryFilter.op = GCFSStructuredQuery_UnaryFilter_Operator_IsNull; + } else { + FSTFail(@"Unrecognized filter: %@", filter); + } + return proto; +} + +- (id<FSTFilter>)decodedUnaryFilter:(GCFSStructuredQuery_UnaryFilter *)proto { + FSTFieldPath *field = [FSTFieldPath pathWithServerFormat:proto.field.fieldPath]; + switch (proto.op) { + case GCFSStructuredQuery_UnaryFilter_Operator_IsNan: + return [[FSTNanFilter alloc] initWithField:field]; + + case GCFSStructuredQuery_UnaryFilter_Operator_IsNull: + return [[FSTNullFilter alloc] initWithField:field]; + + default: + FSTFail(@"Unrecognized UnaryFilter.operator %d", proto.op); + } +} + +- (GCFSStructuredQuery_FieldReference *)encodedFieldPath:(FSTFieldPath *)fieldPath { + GCFSStructuredQuery_FieldReference *ref = [GCFSStructuredQuery_FieldReference message]; + ref.fieldPath = fieldPath.canonicalString; + return ref; +} + +- (GCFSStructuredQuery_FieldFilter_Operator)encodedRelationFilterOperator: + (FSTRelationFilterOperator)filterOperator { + switch (filterOperator) { + case FSTRelationFilterOperatorLessThan: + return GCFSStructuredQuery_FieldFilter_Operator_LessThan; + case FSTRelationFilterOperatorLessThanOrEqual: + return GCFSStructuredQuery_FieldFilter_Operator_LessThanOrEqual; + case FSTRelationFilterOperatorEqual: + return GCFSStructuredQuery_FieldFilter_Operator_Equal; + case FSTRelationFilterOperatorGreaterThanOrEqual: + return GCFSStructuredQuery_FieldFilter_Operator_GreaterThanOrEqual; + case FSTRelationFilterOperatorGreaterThan: + return GCFSStructuredQuery_FieldFilter_Operator_GreaterThan; + default: + FSTFail(@"Unhandled FSTRelationFilterOperator: %ld", (long)filterOperator); + } +} + +- (FSTRelationFilterOperator)decodedRelationFilterOperator: + (GCFSStructuredQuery_FieldFilter_Operator)filterOperator { + switch (filterOperator) { + case GCFSStructuredQuery_FieldFilter_Operator_LessThan: + return FSTRelationFilterOperatorLessThan; + case GCFSStructuredQuery_FieldFilter_Operator_LessThanOrEqual: + return FSTRelationFilterOperatorLessThanOrEqual; + case GCFSStructuredQuery_FieldFilter_Operator_Equal: + return FSTRelationFilterOperatorEqual; + case GCFSStructuredQuery_FieldFilter_Operator_GreaterThanOrEqual: + return FSTRelationFilterOperatorGreaterThanOrEqual; + case GCFSStructuredQuery_FieldFilter_Operator_GreaterThan: + return FSTRelationFilterOperatorGreaterThan; + default: + FSTFail(@"Unhandled FieldFilter.operator: %d", filterOperator); + } +} + +#pragma mark Property Orders + +- (NSArray<GCFSStructuredQuery_Order *> *)encodedSortOrders:(NSArray<FSTSortOrder *> *)orders { + NSMutableArray<GCFSStructuredQuery_Order *> *protos = [NSMutableArray array]; + for (FSTSortOrder *order in orders) { + [protos addObject:[self encodedSortOrder:order]]; + } + return protos; +} + +- (NSArray<FSTSortOrder *> *)decodedSortOrders:(NSArray<GCFSStructuredQuery_Order *> *)protos { + NSMutableArray<FSTSortOrder *> *result = [NSMutableArray arrayWithCapacity:protos.count]; + for (GCFSStructuredQuery_Order *orderProto in protos) { + [result addObject:[self decodedSortOrder:orderProto]]; + } + return result; +} + +- (GCFSStructuredQuery_Order *)encodedSortOrder:(FSTSortOrder *)sortOrder { + GCFSStructuredQuery_Order *proto = [GCFSStructuredQuery_Order message]; + proto.field = [self encodedFieldPath:sortOrder.field]; + if (sortOrder.ascending) { + proto.direction = GCFSStructuredQuery_Direction_Ascending; + } else { + proto.direction = GCFSStructuredQuery_Direction_Descending; + } + return proto; +} + +- (FSTSortOrder *)decodedSortOrder:(GCFSStructuredQuery_Order *)proto { + FSTFieldPath *fieldPath = [FSTFieldPath pathWithServerFormat:proto.field.fieldPath]; + BOOL ascending; + switch (proto.direction) { + case GCFSStructuredQuery_Direction_Ascending: + ascending = YES; + break; + case GCFSStructuredQuery_Direction_Descending: + ascending = NO; + break; + default: + FSTFail(@"Unrecognized GCFSStructuredQuery_Direction %d", proto.direction); + } + return [FSTSortOrder sortOrderWithFieldPath:fieldPath ascending:ascending]; +} + +#pragma mark - Bounds/Cursors + +- (GCFSCursor *)encodedBound:(FSTBound *)bound { + GCFSCursor *proto = [GCFSCursor message]; + proto.before = bound.isBefore; + for (FSTFieldValue *fieldValue in bound.position) { + GCFSValue *value = [self encodedFieldValue:fieldValue]; + [proto.valuesArray addObject:value]; + } + return proto; +} + +- (FSTBound *)decodedBound:(GCFSCursor *)proto { + NSMutableArray<FSTFieldValue *> *indexComponents = [NSMutableArray array]; + + for (GCFSValue *valueProto in proto.valuesArray) { + FSTFieldValue *value = [self decodedFieldValue:valueProto]; + [indexComponents addObject:value]; + } + + return [FSTBound boundWithPosition:indexComponents isBefore:proto.before]; +} + +#pragma mark - FSTWatchChange <= GCFSListenResponse proto + +- (FSTWatchChange *)decodedWatchChange:(GCFSListenResponse *)watchChange { + switch (watchChange.responseTypeOneOfCase) { + case GCFSListenResponse_ResponseType_OneOfCase_TargetChange: + return [self decodedTargetChangeFromWatchChange:watchChange.targetChange]; + + case GCFSListenResponse_ResponseType_OneOfCase_DocumentChange: + return [self decodedDocumentChange:watchChange.documentChange]; + + case GCFSListenResponse_ResponseType_OneOfCase_DocumentDelete: + return [self decodedDocumentDelete:watchChange.documentDelete]; + + case GCFSListenResponse_ResponseType_OneOfCase_DocumentRemove: + return [self decodedDocumentRemove:watchChange.documentRemove]; + + case GCFSListenResponse_ResponseType_OneOfCase_Filter: + return [self decodedExistenceFilterWatchChange:watchChange.filter]; + + default: + FSTFail(@"Unknown WatchChange.changeType %" PRId32, watchChange.responseTypeOneOfCase); + } +} + +- (FSTSnapshotVersion *)versionFromListenResponse:(GCFSListenResponse *)watchChange { + // We have only reached a consistent snapshot for the entire stream if there is a read_time set + // and it applies to all targets (i.e. the list of targets is empty). The backend is guaranteed to + // send such responses. + if (watchChange.responseTypeOneOfCase != GCFSListenResponse_ResponseType_OneOfCase_TargetChange) { + return [FSTSnapshotVersion noVersion]; + } + if (watchChange.targetChange.targetIdsArray.count != 0) { + return [FSTSnapshotVersion noVersion]; + } + return [self decodedVersion:watchChange.targetChange.readTime]; +} + +- (FSTWatchTargetChange *)decodedTargetChangeFromWatchChange:(GCFSTargetChange *)change { + FSTWatchTargetChangeState state = [self decodedWatchTargetChangeState:change.targetChangeType]; + NSMutableArray<NSNumber *> *targetIDs = + [NSMutableArray arrayWithCapacity:change.targetIdsArray_Count]; + + [change.targetIdsArray enumerateValuesWithBlock:^(int32_t value, NSUInteger idx, BOOL *stop) { + [targetIDs addObject:@(value)]; + }]; + + NSError *cause = nil; + if (change.hasCause) { + cause = [NSError errorWithDomain:FIRFirestoreErrorDomain + code:change.cause.code + userInfo:@{NSLocalizedDescriptionKey : change.cause.message}]; + } + + return [[FSTWatchTargetChange alloc] initWithState:state + targetIDs:targetIDs + resumeToken:change.resumeToken + cause:cause]; +} + +- (FSTWatchTargetChangeState)decodedWatchTargetChangeState: + (GCFSTargetChange_TargetChangeType)state { + switch (state) { + case GCFSTargetChange_TargetChangeType_NoChange: + return FSTWatchTargetChangeStateNoChange; + case GCFSTargetChange_TargetChangeType_Add: + return FSTWatchTargetChangeStateAdded; + case GCFSTargetChange_TargetChangeType_Remove: + return FSTWatchTargetChangeStateRemoved; + case GCFSTargetChange_TargetChangeType_Current: + return FSTWatchTargetChangeStateCurrent; + case GCFSTargetChange_TargetChangeType_Reset: + return FSTWatchTargetChangeStateReset; + default: + FSTFail(@"Unexpected TargetChange.state: %" PRId32, state); + } +} + +- (NSArray<NSNumber *> *)decodedIntegerArray:(GPBInt32Array *)values { + NSMutableArray<NSNumber *> *result = [NSMutableArray arrayWithCapacity:values.count]; + [values enumerateValuesWithBlock:^(int32_t value, NSUInteger idx, BOOL *stop) { + [result addObject:@(value)]; + }]; + return result; +} + +- (FSTDocumentWatchChange *)decodedDocumentChange:(GCFSDocumentChange *)change { + FSTObjectValue *value = [self decodedFields:change.document.fields]; + FSTDocumentKey *key = [self decodedDocumentKey:change.document.name]; + FSTSnapshotVersion *version = [self decodedVersion:change.document.updateTime]; + FSTAssert(![version isEqual:[FSTSnapshotVersion noVersion]], + @"Got a document change with no snapshot version"); + FSTMaybeDocument *document = + [FSTDocument documentWithData:value key:key version:version hasLocalMutations:NO]; + + NSArray<NSNumber *> *updatedTargetIds = [self decodedIntegerArray:change.targetIdsArray]; + NSArray<NSNumber *> *removedTargetIds = [self decodedIntegerArray:change.removedTargetIdsArray]; + + return [[FSTDocumentWatchChange alloc] initWithUpdatedTargetIDs:updatedTargetIds + removedTargetIDs:removedTargetIds + documentKey:document.key + document:document]; +} + +- (FSTDocumentWatchChange *)decodedDocumentDelete:(GCFSDocumentDelete *)change { + FSTDocumentKey *key = [self decodedDocumentKey:change.document]; + // Note that version might be unset in which case we use [FSTSnapshotVersion noVersion] + FSTSnapshotVersion *version = [self decodedVersion:change.readTime]; + FSTMaybeDocument *document = [FSTDeletedDocument documentWithKey:key version:version]; + + NSArray<NSNumber *> *removedTargetIds = [self decodedIntegerArray:change.removedTargetIdsArray]; + + return [[FSTDocumentWatchChange alloc] initWithUpdatedTargetIDs:@[] + removedTargetIDs:removedTargetIds + documentKey:document.key + document:document]; +} + +- (FSTDocumentWatchChange *)decodedDocumentRemove:(GCFSDocumentRemove *)change { + FSTDocumentKey *key = [self decodedDocumentKey:change.document]; + NSArray<NSNumber *> *removedTargetIds = [self decodedIntegerArray:change.removedTargetIdsArray]; + + return [[FSTDocumentWatchChange alloc] initWithUpdatedTargetIDs:@[] + removedTargetIDs:removedTargetIds + documentKey:key + document:nil]; +} + +- (FSTExistenceFilterWatchChange *)decodedExistenceFilterWatchChange:(GCFSExistenceFilter *)filter { + // TODO(dimond): implement existence filter parsing + FSTExistenceFilter *existenceFilter = [FSTExistenceFilter filterWithCount:filter.count]; + FSTTargetID targetID = filter.targetId; + return [FSTExistenceFilterWatchChange changeWithFilter:existenceFilter targetID:targetID]; +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTWatchChange.h b/Firestore/Source/Remote/FSTWatchChange.h new file mode 100644 index 0000000..6b65279 --- /dev/null +++ b/Firestore/Source/Remote/FSTWatchChange.h @@ -0,0 +1,118 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import <Foundation/Foundation.h> + +#import "FSTTypes.h" + +@class FSTDocumentKey; +@class FSTExistenceFilter; +@class FSTMaybeDocument; +@class FSTSnapshotVersion; + +NS_ASSUME_NONNULL_BEGIN + +/** + * FSTWatchChange is the internal representation of the watcher API protocol buffers. + * This is an empty abstract class so that all the different kinds of changes can have a common + * base class. + */ +@interface FSTWatchChange : NSObject +@end + +/** + * FSTDocumentWatchChange represents a changed document and a list of target ids to which this + * change applies. If the document has been deleted, the deleted document will be provided. + */ +@interface FSTDocumentWatchChange : FSTWatchChange + +- (instancetype)initWithUpdatedTargetIDs:(NSArray<NSNumber *> *)updatedTargetIDs + removedTargetIDs:(NSArray<NSNumber *> *)removedTargetIDs + documentKey:(FSTDocumentKey *)documentKey + document:(nullable FSTMaybeDocument *)document + NS_DESIGNATED_INITIALIZER; + +- (instancetype)init NS_UNAVAILABLE; + +/** The new document applies to all of these targets. */ +@property(nonatomic, strong, readonly) NSArray<NSNumber *> *updatedTargetIDs; + +/** The new document is removed from all of these targets. */ +@property(nonatomic, strong, readonly) NSArray<NSNumber *> *removedTargetIDs; + +/** The key of the document for this change. */ +@property(nonatomic, strong, readonly) FSTDocumentKey *documentKey; + +/** + * The new document or DeletedDocument if it was deleted. Is null if the document went out of + * view without the server sending a new document. + */ +@property(nonatomic, strong, readonly, nullable) FSTMaybeDocument *document; + +@end + +/** + * An ExistenceFilterWatchChange applies to the targets and is required to verify the current client + * state against expected state sent from the server. + */ +@interface FSTExistenceFilterWatchChange : FSTWatchChange + ++ (instancetype)changeWithFilter:(FSTExistenceFilter *)filter targetID:(FSTTargetID)targetID; + +- (instancetype)init NS_UNAVAILABLE; + +@property(nonatomic, strong, readonly) FSTExistenceFilter *filter; +@property(nonatomic, assign, readonly) FSTTargetID targetID; +@end + +/** FSTWatchTargetChangeState is the kind of change that happened to the watch target. */ +typedef NS_ENUM(NSInteger, FSTWatchTargetChangeState) { + FSTWatchTargetChangeStateNoChange, + FSTWatchTargetChangeStateAdded, + FSTWatchTargetChangeStateRemoved, + FSTWatchTargetChangeStateCurrent, + FSTWatchTargetChangeStateReset, +}; + +/** FSTWatchTargetChange is a change to a watch target. */ +@interface FSTWatchTargetChange : FSTWatchChange + +- (instancetype)initWithState:(FSTWatchTargetChangeState)state + targetIDs:(NSArray<NSNumber *> *)targetIDs + resumeToken:(NSData *)resumeToken + cause:(nullable NSError *)cause NS_DESIGNATED_INITIALIZER; + +- (instancetype)init NS_UNAVAILABLE; + +/** What kind of change occurred to the watch target. */ +@property(nonatomic, assign, readonly) FSTWatchTargetChangeState state; + +/** The target IDs that were added/removed/set. */ +@property(nonatomic, strong, readonly) NSArray<NSNumber *> *targetIDs; + +/** + * An opaque, server-assigned token that allows watching a query to be resumed after disconnecting + * without retransmitting all the data that matches the query. The resume token essentially + * identifies a point in time from which the server should resume sending results. + */ +@property(nonatomic, strong, readonly) NSData *resumeToken; + +/** An RPC error indicating why the watch failed. */ +@property(nonatomic, strong, readonly, nullable) NSError *cause; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Firestore/Source/Remote/FSTWatchChange.m b/Firestore/Source/Remote/FSTWatchChange.m new file mode 100644 index 0000000..1ace26e --- /dev/null +++ b/Firestore/Source/Remote/FSTWatchChange.m @@ -0,0 +1,150 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "FSTWatchChange.h" + +#import "FSTDocument.h" +#import "FSTDocumentKey.h" +#import "FSTExistenceFilter.h" + +NS_ASSUME_NONNULL_BEGIN + +@implementation FSTWatchChange +@end + +@implementation FSTDocumentWatchChange + +- (instancetype)initWithUpdatedTargetIDs:(NSArray<NSNumber *> *)updatedTargetIDs + removedTargetIDs:(NSArray<NSNumber *> *)removedTargetIDs + documentKey:(FSTDocumentKey *)documentKey + document:(nullable FSTMaybeDocument *)document { + self = [super init]; + if (self) { + _updatedTargetIDs = updatedTargetIDs; + _removedTargetIDs = removedTargetIDs; + _documentKey = documentKey; + _document = document; + } + return self; +} + +- (BOOL)isEqual:(id)other { + if (other == self) { + return YES; + } + if (![other isMemberOfClass:[FSTDocumentWatchChange class]]) { + return NO; + } + + FSTDocumentWatchChange *otherChange = (FSTDocumentWatchChange *)other; + return [_updatedTargetIDs isEqual:otherChange.updatedTargetIDs] && + [_removedTargetIDs isEqual:otherChange.removedTargetIDs] && + [_documentKey isEqual:otherChange.documentKey] && + (_document == otherChange.document || [_document isEqual:otherChange.document]); +} + +- (NSUInteger)hash { + NSUInteger hash = self.updatedTargetIDs.hash; + hash = hash * 31 + self.removedTargetIDs.hash; + hash = hash * 31 + self.documentKey.hash; + hash = hash * 31 + self.document.hash; + return hash; +} + +@end + +@interface FSTExistenceFilterWatchChange () + +- (instancetype)initWithFilter:(FSTExistenceFilter *)filter + targetID:(FSTTargetID)targetID NS_DESIGNATED_INITIALIZER; + +@end + +@implementation FSTExistenceFilterWatchChange + ++ (instancetype)changeWithFilter:(FSTExistenceFilter *)filter targetID:(FSTTargetID)targetID { + return [[FSTExistenceFilterWatchChange alloc] initWithFilter:filter targetID:targetID]; +} + +- (instancetype)initWithFilter:(FSTExistenceFilter *)filter targetID:(FSTTargetID)targetID { + self = [super init]; + if (self) { + _filter = filter; + _targetID = targetID; + } + return self; +} + +- (BOOL)isEqual:(id)other { + if (other == self) { + return YES; + } + if (![other isMemberOfClass:[FSTExistenceFilterWatchChange class]]) { + return NO; + } + + FSTExistenceFilterWatchChange *otherChange = (FSTExistenceFilterWatchChange *)other; + return [_filter isEqual:otherChange->_filter] && _targetID == otherChange->_targetID; +} + +- (NSUInteger)hash { + return self.filter.hash; +} + +@end + +@implementation FSTWatchTargetChange + +- (instancetype)initWithState:(FSTWatchTargetChangeState)state + targetIDs:(NSArray<NSNumber *> *)targetIDs + resumeToken:(NSData *)resumeToken + cause:(nullable NSError *)cause { + self = [super init]; + if (self) { + _state = state; + _targetIDs = targetIDs; + _resumeToken = resumeToken; + _cause = cause; + } + return self; +} + +- (BOOL)isEqual:(id)other { + if (other == self) { + return YES; + } + if (![other isMemberOfClass:[FSTWatchTargetChange class]]) { + return NO; + } + + FSTWatchTargetChange *otherChange = (FSTWatchTargetChange *)other; + return _state == otherChange->_state && [_targetIDs isEqual:otherChange->_targetIDs] && + [_resumeToken isEqual:otherChange->_resumeToken] && + (_cause == otherChange->_cause || [_cause isEqual:otherChange->_cause]); +} + +- (NSUInteger)hash { + NSUInteger hash = (NSUInteger)self.state; + + hash = hash * 31 + self.targetIDs.hash; + hash = hash * 31 + self.resumeToken.hash; + hash = hash * 31 + self.cause.hash; + return hash; +} + +@end + +NS_ASSUME_NONNULL_END |