aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/Source/Remote
diff options
context:
space:
mode:
authorGravatar Gil <mcg@google.com>2017-10-03 08:55:22 -0700
committerGravatar GitHub <noreply@github.com>2017-10-03 08:55:22 -0700
commitbde743ed25166a0b320ae157bfb1d68064f531c9 (patch)
tree4dd7525d9df32fa5dbdb721d4b0d4f9b87f5e884 /Firestore/Source/Remote
parentbf550507ffa8beee149383a5bf1e2363bccefbb4 (diff)
Release 4.3.0 (#327)
Initial release of Firestore at 0.8.0 Bump FirebaseCommunity to 0.1.3
Diffstat (limited to 'Firestore/Source/Remote')
-rw-r--r--Firestore/Source/Remote/FSTBufferedWriter.h44
-rw-r--r--Firestore/Source/Remote/FSTBufferedWriter.m134
-rw-r--r--Firestore/Source/Remote/FSTDatastore.h365
-rw-r--r--Firestore/Source/Remote/FSTDatastore.m1027
-rw-r--r--Firestore/Source/Remote/FSTExistenceFilter.h31
-rw-r--r--Firestore/Source/Remote/FSTExistenceFilter.m53
-rw-r--r--Firestore/Source/Remote/FSTExponentialBackoff.h79
-rw-r--r--Firestore/Source/Remote/FSTExponentialBackoff.m97
-rw-r--r--Firestore/Source/Remote/FSTRemoteEvent.h213
-rw-r--r--Firestore/Source/Remote/FSTRemoteEvent.m516
-rw-r--r--Firestore/Source/Remote/FSTRemoteStore.h143
-rw-r--r--Firestore/Source/Remote/FSTRemoteStore.m599
-rw-r--r--Firestore/Source/Remote/FSTSerializerBeta.h110
-rw-r--r--Firestore/Source/Remote/FSTSerializerBeta.m1084
-rw-r--r--Firestore/Source/Remote/FSTWatchChange.h118
-rw-r--r--Firestore/Source/Remote/FSTWatchChange.m150
16 files changed, 4763 insertions, 0 deletions
diff --git a/Firestore/Source/Remote/FSTBufferedWriter.h b/Firestore/Source/Remote/FSTBufferedWriter.h
new file mode 100644
index 0000000..83fada6
--- /dev/null
+++ b/Firestore/Source/Remote/FSTBufferedWriter.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import <Foundation/Foundation.h>
+#import <RxLibrary/GRXWriteable.h>
+#import <RxLibrary/GRXWriter.h>
+
+NS_ASSUME_NONNULL_BEGIN
+
+/**
+ * A buffered GRXWriter.
+ *
+ * GRPC only allows a single message to be written to a channel at a time. While the channel is
+ * sending, GRPC sets the state of the GRXWriter representing the request stream to
+ * GRXWriterStatePaused. Once the channel is ready to accept more messages GRPC sets the state of
+ * the writer to GRXWriterStateStarted.
+ *
+ * This class is NOT thread safe, even though it is accessed from multiple threads. To conform with
+ * the contract GRPC uses, all method calls on the FSTBufferedWriter must be @synchronized on the
+ * receiver.
+ */
+@interface FSTBufferedWriter : GRXWriter <GRXWriteable>
+
+/**
+ * Writes a message into the buffer. Must be called inside an @synchronized block on the receiver.
+ */
+- (void)writeValue:(id)value;
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTBufferedWriter.m b/Firestore/Source/Remote/FSTBufferedWriter.m
new file mode 100644
index 0000000..d86e03a
--- /dev/null
+++ b/Firestore/Source/Remote/FSTBufferedWriter.m
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import <Protobuf/GPBProtocolBuffers.h>
+
+#import "FSTBufferedWriter.h"
+
+NS_ASSUME_NONNULL_BEGIN
+
+@implementation FSTBufferedWriter {
+ GRXWriterState _state;
+ NSMutableArray<NSData *> *_queue;
+
+ id<GRXWriteable> _writeable;
+}
+
+- (instancetype)init {
+ if (self = [super init]) {
+ _state = GRXWriterStateNotStarted;
+ _queue = [[NSMutableArray alloc] init];
+ }
+ return self;
+}
+
+#pragma mark - GRXWriteable implementation
+
+/** Push the next value of the sequence to the receiving object. */
+- (void)writeValue:(id)value {
+ if (_state == GRXWriterStateStarted && _queue.count == 0) {
+ // Skip the queue.
+ [_writeable writeValue:value];
+ } else {
+ // Buffer the new value. Note that the value is assumed to be transient and doesn't need to
+ // be copied.
+ [_queue addObject:value];
+ }
+}
+
+/**
+ * Signal that the sequence is completed, or that an error ocurred. After this message is sent to
+ * the receiver, neither it nor writeValue: may be called again.
+ */
+- (void)writesFinishedWithError:(nullable NSError *)error {
+ // Unimplemented. If we ever wanted to implement sender-side initiated half close we could do so
+ // by buffering (or sending) and error.
+ [self doesNotRecognizeSelector:_cmd];
+}
+
+#pragma mark GRXWriter implementation
+// The GRXWriter implementation defines the send side of the RPC stream. Once the RPC is ready it
+// will call startWithWriteable passing a GRXWriteable into which requests can be written but only
+// when the GRXWriter is in the started state.
+
+/**
+ * Called by GRPCCall when it is ready to accept for the first request. Requests should be written
+ * to the passed writeable.
+ *
+ * GRPCCall will synchronize on the receiver around this call.
+ */
+- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ _state = GRXWriterStateStarted;
+ _writeable = writeable;
+}
+
+/**
+ * Called by GRPCCall to implement flow control on the sending side of the stream. After each
+ * writeValue: on the requestsWriteable, GRPCCall will call setState:GRXWriterStatePaused to apply
+ * backpressure. Once the stream is ready to accept another message, GRPCCall will call
+ * setState:GRXWriterStateStarted.
+ *
+ * GRPCCall will synchronize on the receiver around this call.
+ */
+- (void)setState:(GRXWriterState)newState {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
+ return;
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
+ _state = newState;
+ // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
+ // writeable to be messaged anymore.
+ _queue = nil;
+ _writeable = nil;
+ return;
+ case GRXWriterStatePaused:
+ _state = newState;
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ [self writeBufferedMessages];
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
+}
+
+- (void)finishWithError:(nullable NSError *)error {
+ [_writeable writesFinishedWithError:error];
+ self.state = GRXWriterStateFinished;
+}
+
+- (void)writeBufferedMessages {
+ while (_state == GRXWriterStateStarted && _queue.count > 0) {
+ id value = _queue[0];
+ [_queue removeObjectAtIndex:0];
+
+ // In addition to writing the value here GRPC will apply backpressure by pausing the GRXWriter
+ // wrapping this buffer. That writer must call -pauseMessages which will cause this loop to
+ // exit. Synchronization is not required since the callback happens within the body of the
+ // writeValue implementation.
+ [_writeable writeValue:value];
+ }
+}
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTDatastore.h b/Firestore/Source/Remote/FSTDatastore.h
new file mode 100644
index 0000000..840d2fe
--- /dev/null
+++ b/Firestore/Source/Remote/FSTDatastore.h
@@ -0,0 +1,365 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import <Foundation/Foundation.h>
+
+#import "FSTTypes.h"
+
+@class FSTDatabaseInfo;
+@class FSTDocumentKey;
+@class FSTDispatchQueue;
+@class FSTMutation;
+@class FSTMutationResult;
+@class FSTQueryData;
+@class FSTSnapshotVersion;
+@class FSTWatchChange;
+@class FSTWatchStream;
+@class FSTWriteStream;
+@class GRPCCall;
+@class GRXWriter;
+
+@protocol FSTCredentialsProvider;
+@protocol FSTWatchStreamDelegate;
+@protocol FSTWriteStreamDelegate;
+
+NS_ASSUME_NONNULL_BEGIN
+
+/**
+ * FSTDatastore represents a proxy for the remote server, hiding details of the RPC layer. It:
+ *
+ * - Manages connections to the server
+ * - Authenticates to the server
+ * - Manages threading and keeps higher-level code running on the worker queue
+ * - Serializes internal model objects to and from protocol buffers
+ *
+ * The FSTDatastore is generally not responsible for understanding the higher-level protocol
+ * involved in actually making changes or reading data, and aside from the connections it manages
+ * is otherwise stateless.
+ */
+@interface FSTDatastore : NSObject
+
+/** Creates a new Datastore instance with the given database info. */
++ (instancetype)datastoreWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials;
+
+- (instancetype)init __attribute__((unavailable("Use a static constructor method.")));
+
+- (instancetype)initWithDatabaseInfo:(FSTDatabaseInfo *)databaseInfo
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ NS_DESIGNATED_INITIALIZER;
+
+/** Converts the error to a FIRFirestoreErrorDomain error. */
++ (NSError *)firestoreErrorForError:(NSError *)error;
+
+/** Returns YES if the given error indicates the RPC associated with it may not be retried. */
++ (BOOL)isPermanentWriteError:(NSError *)error;
+
+/** Returns YES if the given error is a GRPC ABORTED error. **/
++ (BOOL)isAbortedError:(NSError *)error;
+
+/** Looks up a list of documents in datastore. */
+- (void)lookupDocuments:(NSArray<FSTDocumentKey *> *)keys
+ completion:(FSTVoidMaybeDocumentArrayErrorBlock)completion;
+
+/** Commits data to datastore. */
+- (void)commitMutations:(NSArray<FSTMutation *> *)mutations
+ completion:(FSTVoidErrorBlock)completion;
+
+/** Creates a new watch stream. */
+- (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate;
+
+/** Creates a new write stream. */
+- (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate;
+
+/** The name of the database and the backend. */
+@property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo;
+
+@end
+
+/**
+ * An FSTStream is an abstract base class that represents a restartable streaming RPC to the
+ * Firestore backend. It's built on top of GRPC's own support for streaming RPCs, and adds several
+ * critical features for our clients:
+ *
+ * - Restarting a stream is allowed (after failure)
+ * - Exponential backoff on failure (independent of the underlying channel)
+ * - Authentication via FSTCredentialsProvider
+ * - Dispatching all callbacks into the shared worker queue
+ *
+ * Subclasses of FSTStream implement serialization of models to and from bytes (via protocol
+ * buffers) for a specific streaming RPC and emit events specific to the stream.
+ *
+ * ## Starting and Stopping
+ *
+ * Streaming RPCs are stateful and need to be started before messages can be sent and received.
+ * The FSTStream will call its delegate's specific streamDidOpen method once the stream is ready
+ * to accept requests.
+ *
+ * Should a `start` fail, FSTStream will call its delegate's specific streamDidClose method with an
+ * NSError indicating what went wrong. The delegate is free to call start again.
+ *
+ * An FSTStream can also be explicitly stopped which indicates that the caller has discarded the
+ * stream and no further events should be emitted. Once explicitly stopped, a stream cannot be
+ * restarted.
+ *
+ * ## Subclassing Notes
+ *
+ * An implementation of FSTStream needs to implement the following methods:
+ * - `createRPCWithRequestsWriter`, should create the specific RPC (a GRPCCall object).
+ * - `handleStreamOpen`, should call through to the stream-specific streamDidOpen method.
+ * - `handleStreamMessage`, receives protocol buffer responses from GRPC and must deserialize and
+ * delegate to some stream specific response method.
+ * - `handleStreamClose`, calls through to the stream-specific streamDidClose method.
+ *
+ * Additionally, beyond these required methods, subclasses will want to implement methods that
+ * take request models, serialize them, and write them to using writeRequest:.
+ *
+ * ## RPC Message Type
+ *
+ * FSTStream intentionally uses the GRPCCall interface to GRPC directly, bypassing both GRPCProtoRPC
+ * and GRXBufferedPipe for sending data. This has been done to avoid race conditions that come out
+ * of a loosely specified locking contract on GRXWriter. There's essentially no way to safely use
+ * any of the wrapper objects for GRXWriter (that perform buffering or conversion to/from protos).
+ *
+ * See https://github.com/grpc/grpc/issues/10957 for the kinds of things we're trying to avoid.
+ */
+@interface FSTStream : NSObject
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)init NS_UNAVAILABLE;
+
+/**
+ * An abstract method used by `start` to create a streaming RPC specific to this type of stream.
+ * The RPC should be created such that requests are taken from `self`.
+ *
+ * Note that the returned GRPCCall must not be a GRPCProtoRPC, since the rest of the streaming
+ * mechanism assumes it is dealing in bytes-level requests and responses.
+ */
+- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter;
+
+/**
+ * Returns YES if `start` has been called and no error has occurred. YES indicates the stream is
+ * open or in the process of opening (which encompasses respecting backoff, getting auth tokens,
+ * and starting the actual RPC). Use `isOpen` to determine if the stream is open and ready for
+ * outbound requests.
+ */
+- (BOOL)isStarted;
+
+/** Returns YES if the underlying RPC is open and the stream is ready for outbound requests. */
+- (BOOL)isOpen;
+
+/**
+ * Starts the RPC. Only allowed if isStarted returns NO. The stream is not immediately ready for
+ * use: the delegate's watchStreamDidOpen method will be invoked when the RPC is ready for outbound
+ * requests, at which point `isOpen` will return YES.
+ *
+ * When start returns, -isStarted will return YES.
+ */
+- (void)start;
+
+/**
+ * Stops the RPC. This call is idempotent and allowed regardless of the current isStarted state.
+ *
+ * Unlike a transient stream close, stopping a stream is permanent. This is guaranteed NOT to emit
+ * any further events on the stream-specific delegate, including the streamDidClose method.
+ *
+ * NOTE: This no-events contract may seem counter-intuitive but allows the caller to
+ * straightforwardly sequence stream tear-down without having to worry about when the delegate's
+ * streamDidClose methods will get called. For example if the stream must be exchanged for another
+ * during a user change this allows `stop` to be called eagerly without worrying about the
+ * streamDidClose method accidentally restarting the stream before the new one is ready.
+ *
+ * When stop returns, -isStarted and -isOpen will both return NO.
+ */
+- (void)stop;
+
+/**
+ * After an error the stream will usually back off on the next attempt to start it. If the error
+ * warrants an immediate restart of the stream, the sender can use this to indicate that the
+ * receiver should not back off.
+ *
+ * Each error will call the stream-specific streamDidClose method. That method can decide to
+ * inhibit backoff if required.
+ */
+- (void)inhibitBackoff;
+
+@end
+
+#pragma mark - FSTWatchStream
+
+/** A protocol defining the events that can be emitted by the FSTWatchStream. */
+@protocol FSTWatchStreamDelegate <NSObject>
+
+/** Called by the FSTWatchStream when it is ready to accept outbound request messages. */
+- (void)watchStreamDidOpen;
+
+/**
+ * Called by the FSTWatchStream with changes and the snapshot versions included in in the
+ * WatchChange responses sent back by the server.
+ */
+- (void)watchStreamDidChange:(FSTWatchChange *)change
+ snapshotVersion:(FSTSnapshotVersion *)snapshotVersion;
+
+/**
+ * Called by the FSTWatchStream when the underlying streaming RPC is closed for whatever reason,
+ * usually because of an error, but possibly due to an idle timeout. The error passed to this
+ * method may be nil, in which case the stream was closed without attributable fault.
+ *
+ * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping"
+ * on FSTStream for details.
+ */
+- (void)watchStreamDidClose:(NSError *_Nullable)error;
+
+@end
+
+/**
+ * An FSTStream that implements the StreamingWatch RPC.
+ *
+ * Once the FSTWatchStream has called the streamDidOpen method, any number of watchQuery and
+ * unwatchTargetId calls can be sent to control what changes will be sent from the server for
+ * WatchChanges.
+ */
+@interface FSTWatchStream : FSTStream
+
+/**
+ * Initializes the watch stream with its dependencies.
+ */
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass
+ delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
+
+- (instancetype)init NS_UNAVAILABLE;
+
+/**
+ * Registers interest in the results of the given query. If the query includes a resumeToken it
+ * will be included in the request. Results that affect the query will be streamed back as
+ * WatchChange messages that reference the targetID included in |query|.
+ */
+- (void)watchQuery:(FSTQueryData *)query;
+
+/** Unregisters interest in the results of the query associated with the given target ID. */
+- (void)unwatchTargetID:(FSTTargetID)targetID;
+
+@property(nonatomic, weak, readonly) id<FSTWatchStreamDelegate> delegate;
+
+@end
+
+#pragma mark - FSTWriteStream
+
+@protocol FSTWriteStreamDelegate <NSObject>
+
+/** Called by the FSTWriteStream when it is ready to accept outbound request messages. */
+- (void)writeStreamDidOpen;
+
+/**
+ * Called by the FSTWriteStream upon a successful handshake response from the server, which is the
+ * receiver's cue to send any pending writes.
+ */
+- (void)writeStreamDidCompleteHandshake;
+
+/**
+ * Called by the FSTWriteStream upon receiving a StreamingWriteResponse from the server that
+ * contains mutation results.
+ */
+- (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion
+ mutationResults:(NSArray<FSTMutationResult *> *)results;
+
+/**
+ * Called when the FSTWriteStream's underlying RPC is closed for whatever reason, usually because
+ * of an error, but possibly due to an idle timeout. The error passed to this method may be nil, in
+ * which case the stream was closed without attributable fault.
+ *
+ * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping"
+ * on FSTStream for details.
+ */
+- (void)writeStreamDidClose:(NSError *_Nullable)error;
+
+@end
+
+/**
+ * An FSTStream that implements the StreamingWrite RPC.
+ *
+ * The StreamingWrite RPC requires the caller to maintain special `streamToken` state in between
+ * calls, to help the server understand which responses the client has processed by the time the
+ * next request is made. Every response may contain a `streamToken`; this value must be passed to
+ * the next request.
+ *
+ * After calling `start` on this stream, the next request must be a handshake, containing whatever
+ * streamToken is on hand. Once a response to this request is received, all pending mutations may
+ * be submitted. When submitting multiple batches of mutations at the same time, it's okay to use
+ * the same streamToken for the calls to `writeMutations:`.
+ */
+@interface FSTWriteStream : FSTStream
+
+/**
+ * Initializes the write stream with its dependencies.
+ */
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass
+ delegate:(id<FSTWriteStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
+
+- (instancetype)init NS_UNAVAILABLE;
+
+/**
+ * Sends an initial streamToken to the server, performing the handshake required to make the
+ * StreamingWrite RPC work. Subsequent `writeMutations:` calls should wait until a response has
+ * been delivered to the delegate's writeStreamDidCompleteHandshake method.
+ */
+- (void)writeHandshake;
+
+/** Sends a group of mutations to the Firestore backend to apply. */
+- (void)writeMutations:(NSArray<FSTMutation *> *)mutations;
+
+@property(nonatomic, weak, readonly) id<FSTWriteStreamDelegate> delegate;
+
+/**
+ * Tracks whether or not a handshake has been successfully exchanged and the stream is ready to
+ * accept mutations.
+ */
+@property(nonatomic, assign, readwrite, getter=isHandshakeComplete) BOOL handshakeComplete;
+
+/**
+ * The last received stream token from the server, used to acknowledge which responses the client
+ * has processed. Stream tokens are opaque checkpoint markers whose only real value is their
+ * inclusion in the next request.
+ *
+ * FSTWriteStream manages propagating this value from responses to the next request.
+ */
+@property(nonatomic, strong, nullable) NSData *lastStreamToken;
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTDatastore.m b/Firestore/Source/Remote/FSTDatastore.m
new file mode 100644
index 0000000..3ed2729
--- /dev/null
+++ b/Firestore/Source/Remote/FSTDatastore.m
@@ -0,0 +1,1027 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FSTDatastore.h"
+
+#import <GRPCClient/GRPCCall+OAuth2.h>
+#import <GRPCClient/GRPCCall.h>
+#import <ProtoRPC/ProtoRPC.h>
+
+#import "FIRFirestore+Internal.h"
+#import "FIRFirestoreErrors.h"
+#import "FIRFirestoreVersion.h"
+#import "FSTAssert.h"
+#import "FSTBufferedWriter.h"
+#import "FSTClasses.h"
+#import "FSTCredentialsProvider.h"
+#import "FSTDatabaseID.h"
+#import "FSTDatabaseInfo.h"
+#import "FSTDispatchQueue.h"
+#import "FSTDocument.h"
+#import "FSTDocumentKey.h"
+#import "FSTExponentialBackoff.h"
+#import "FSTLocalStore.h"
+#import "FSTLogger.h"
+#import "FSTMutation.h"
+#import "FSTQueryData.h"
+#import "FSTSerializerBeta.h"
+
+#import "Firestore.pbrpc.h"
+
+NS_ASSUME_NONNULL_BEGIN
+
+// GRPC does not publicly declare a means of disabling SSL, which we need for testing. Firestore
+// directly exposes an sslEnabled setting so this is required to plumb that through. Note that our
+// own tests depend on this working so we'll know if this changes upstream.
+@interface GRPCHost
++ (nullable instancetype)hostWithAddress:(NSString *)address;
+@property(nonatomic, getter=isSecure) BOOL secure;
+@end
+
+/**
+ * Initial backoff time in seconds after an error.
+ * Set to 1s according to https://cloud.google.com/apis/design/errors.
+ */
+static const NSTimeInterval kBackoffInitialDelay = 1;
+static const NSTimeInterval kBackoffMaxDelay = 60.0;
+static const double kBackoffFactor = 1.5;
+static NSString *const kXGoogAPIClientHeader = @"x-goog-api-client";
+static NSString *const kGoogleCloudResourcePrefix = @"google-cloud-resource-prefix";
+
+/** Function typedef used to create RPCs. */
+typedef GRPCProtoCall * (^RPCFactory)();
+
+#pragma mark - FSTStream
+
+/** The state of a stream. */
+typedef NS_ENUM(NSInteger, FSTStreamState) {
+ /**
+ * The streaming RPC is not running and there's no error condition. Calling `start` will
+ * start the stream immediately without backoff. While in this state -isStarted will return NO.
+ */
+ FSTStreamStateInitial = 0,
+
+ /**
+ * The stream is starting, and is waiting for an auth token to attach to the initial request.
+ * While in this state, isStarted will return YES but isOpen will return NO.
+ */
+ FSTStreamStateAuth,
+
+ /**
+ * The streaming RPC is up and running. Requests and responses can flow freely. Both
+ * isStarted and isOpen will return YES.
+ */
+ FSTStreamStateOpen,
+
+ /**
+ * The stream encountered an error. The next start attempt will back off. While in this state
+ * -isStarted will return NO.
+ */
+ FSTStreamStateError,
+
+ /**
+ * An in-between state after an error where the stream is waiting before re-starting. After
+ * waiting is complete, the stream will try to open. While in this state -isStarted will
+ * return YES but isOpen will return NO.
+ */
+ FSTStreamStateBackoff,
+
+ /**
+ * The stream has been explicitly stopped; no further events will be emitted.
+ */
+ FSTStreamStateStopped,
+};
+
+// We need to declare these classes first so that Datastore can alloc them.
+
+@interface FSTWatchStream ()
+
+/** The delegate that will receive events generated by the watch stream. */
+@property(nonatomic, weak, nullable) id<FSTWatchStreamDelegate> delegate;
+
+@end
+
+@interface FSTBetaWatchStream : FSTWatchStream
+
+/**
+ * Initializes the watch stream with its dependencies.
+ */
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ serializer:(FSTSerializerBeta *)serializer
+ delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass
+ delegate:(id<FSTWatchStreamDelegate>)delegate NS_UNAVAILABLE;
+
+@end
+
+@interface FSTWriteStream ()
+
+@property(nonatomic, weak, nullable) id<FSTWriteStreamDelegate> delegate;
+
+@end
+
+@interface FSTBetaWriteStream : FSTWriteStream
+
+/**
+ * Initializes the write stream with its dependencies.
+ */
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ serializer:(FSTSerializerBeta *)serializer
+ delegate:(id<FSTWriteStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass
+ delegate:(id<FSTWriteStreamDelegate>)delegate NS_UNAVAILABLE;
+
+@end
+
+@interface FSTStream () <GRXWriteable>
+
+@property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo;
+@property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
+@property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials;
+@property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass;
+@property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff;
+
+/** A flag tracking whether the stream received a message from the backend. */
+@property(nonatomic, assign) BOOL messageReceived;
+
+/**
+ * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the
+ * state of the stream.
+ */
+@property(nonatomic, assign) FSTStreamState state;
+
+/** The RPC handle. Used for cancellation. */
+@property(nonatomic, strong, nullable) GRPCCall *rpc;
+
+/**
+ * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has
+ * started.
+ */
+@property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter;
+
+@end
+
+#pragma mark - FSTDatastore
+
+@interface FSTDatastore ()
+
+/** The GRPC service for Firestore. */
+@property(nonatomic, strong, readonly) GCFSFirestore *service;
+
+@property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
+
+/** An object for getting an auth token before each request. */
+@property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials;
+
+@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
+
+@end
+
+@implementation FSTDatastore
+
++ (instancetype)datastoreWithDatabase:(FSTDatabaseInfo *)databaseInfo
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials {
+ return [[FSTDatastore alloc] initWithDatabaseInfo:databaseInfo
+ workerDispatchQueue:workerDispatchQueue
+ credentials:credentials];
+}
+
+- (instancetype)initWithDatabaseInfo:(FSTDatabaseInfo *)databaseInfo
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials {
+ if (self = [super init]) {
+ _databaseInfo = databaseInfo;
+ if (!databaseInfo.isSSLEnabled) {
+ GRPCHost *hostConfig = [GRPCHost hostWithAddress:databaseInfo.host];
+ hostConfig.secure = NO;
+ }
+ _service = [GCFSFirestore serviceWithHost:databaseInfo.host];
+ _workerDispatchQueue = workerDispatchQueue;
+ _credentials = credentials;
+ _serializer = [[FSTSerializerBeta alloc] initWithDatabaseID:databaseInfo.databaseID];
+ }
+ return self;
+}
+
+- (NSString *)description {
+ return [NSString stringWithFormat:@"<FSTDatastore: %@>", self.databaseInfo];
+}
+
+/**
+ * Converts the error to an error within the domain FIRFirestoreErrorDomain.
+ */
++ (NSError *)firestoreErrorForError:(NSError *)error {
+ if (!error) {
+ return error;
+ } else if ([error.domain isEqualToString:FIRFirestoreErrorDomain]) {
+ return error;
+ } else if ([error.domain isEqualToString:kGRPCErrorDomain]) {
+ FSTAssert(error.code >= GRPCErrorCodeCancelled && error.code <= GRPCErrorCodeUnauthenticated,
+ @"Unknown GRPC error code: %ld", (long)error.code);
+ return
+ [NSError errorWithDomain:FIRFirestoreErrorDomain code:error.code userInfo:error.userInfo];
+ } else {
+ return [NSError errorWithDomain:FIRFirestoreErrorDomain
+ code:FIRFirestoreErrorCodeUnknown
+ userInfo:@{NSUnderlyingErrorKey : error}];
+ }
+}
+
++ (BOOL)isAbortedError:(NSError *)error {
+ FSTAssert([error.domain isEqualToString:FIRFirestoreErrorDomain],
+ @"isAbortedError: only works with errors emitted by FSTDatastore.");
+ return error.code == FIRFirestoreErrorCodeAborted;
+}
+
++ (BOOL)isPermanentWriteError:(NSError *)error {
+ FSTAssert([error.domain isEqualToString:FIRFirestoreErrorDomain],
+ @"isPerminanteWriteError: only works with errors emitted by FSTDatastore.");
+ switch (error.code) {
+ case FIRFirestoreErrorCodeCancelled:
+ case FIRFirestoreErrorCodeUnknown:
+ case FIRFirestoreErrorCodeDeadlineExceeded:
+ case FIRFirestoreErrorCodeResourceExhausted:
+ case FIRFirestoreErrorCodeInternal:
+ case FIRFirestoreErrorCodeUnavailable:
+ case FIRFirestoreErrorCodeUnauthenticated:
+ // Unauthenticated means something went wrong with our token and we need
+ // to retry with new credentials which will happen automatically.
+ // TODO(b/37325376): Give up after second unauthenticated error.
+ return NO;
+ case FIRFirestoreErrorCodeInvalidArgument:
+ case FIRFirestoreErrorCodeNotFound:
+ case FIRFirestoreErrorCodeAlreadyExists:
+ case FIRFirestoreErrorCodePermissionDenied:
+ case FIRFirestoreErrorCodeFailedPrecondition:
+ case FIRFirestoreErrorCodeAborted:
+ // Aborted might be retried in some scenarios, but that is dependant on
+ // the context and should handled individually by the calling code.
+ // See https://cloud.google.com/apis/design/errors
+ case FIRFirestoreErrorCodeOutOfRange:
+ case FIRFirestoreErrorCodeUnimplemented:
+ case FIRFirestoreErrorCodeDataLoss:
+ default:
+ return YES;
+ }
+}
+
+/** Returns the string to be used as x-goog-api-client header value. */
++ (NSString *)googAPIClientHeaderValue {
+ // TODO(dimond): This should ideally also include the grpc version, however, gRPC defines the
+ // version as a macro, so it would be hardcoded based on version we have at compile time of
+ // the Firestore library, rather than the version available at runtime/at compile time by the
+ // user of the library.
+ return [NSString stringWithFormat:@"gl-objc/ fire/%s grpc/", FirebaseFirestoreVersionString];
+}
+
+/** Returns the string to be used as google-cloud-resource-prefix header value. */
++ (NSString *)googleCloudResourcePrefixForDatabaseID:(FSTDatabaseID *)databaseID {
+ return [NSString
+ stringWithFormat:@"projects/%@/databases/%@", databaseID.projectID, databaseID.databaseID];
+}
+/**
+ * Takes a dictionary of (HTTP) response headers and returns the set of whitelisted headers
+ * (for logging purposes).
+ */
++ (NSDictionary<NSString *, NSString *> *)extractWhiteListedHeaders:
+ (NSDictionary<NSString *, NSString *> *)headers {
+ NSMutableDictionary<NSString *, NSString *> *whiteListedHeaders =
+ [NSMutableDictionary dictionary];
+ NSArray<NSString *> *whiteList = @[
+ @"date", @"x-google-backends", @"x-google-netmon-label", @"x-google-service",
+ @"x-google-gfe-request-trace"
+ ];
+ [headers
+ enumerateKeysAndObjectsUsingBlock:^(NSString *headerName, NSString *headerValue, BOOL *stop) {
+ if ([whiteList containsObject:[headerName lowercaseString]]) {
+ whiteListedHeaders[headerName] = headerValue;
+ }
+ }];
+ return whiteListedHeaders;
+}
+
+/** Logs the (whitelisted) headers returned for an GRPCProtoCall RPC. */
++ (void)logHeadersForRPC:(GRPCProtoCall *)rpc RPCName:(NSString *)rpcName {
+ if ([FIRFirestore isLoggingEnabled]) {
+ FSTLog(@"RPC %@ returned headers (whitelisted): %@", rpcName,
+ [FSTDatastore extractWhiteListedHeaders:rpc.responseHeaders]);
+ }
+}
+
+- (void)commitMutations:(NSArray<FSTMutation *> *)mutations
+ completion:(FSTVoidErrorBlock)completion {
+ GCFSCommitRequest *request = [GCFSCommitRequest message];
+ request.database = [self.serializer encodedDatabaseID];
+
+ NSMutableArray<GCFSWrite *> *mutationProtos = [NSMutableArray array];
+ for (FSTMutation *mutation in mutations) {
+ [mutationProtos addObject:[self.serializer encodedMutation:mutation]];
+ }
+ request.writesArray = mutationProtos;
+
+ RPCFactory rpcFactory = ^GRPCProtoCall * {
+ __block GRPCProtoCall *rpc = [self.service
+ RPCToCommitWithRequest:request
+ handler:^(GCFSCommitResponse *response, NSError *_Nullable error) {
+ error = [FSTDatastore firestoreErrorForError:error];
+ [self.workerDispatchQueue dispatchAsync:^{
+ FSTLog(@"RPC CommitRequest completed. Error: %@", error);
+ [FSTDatastore logHeadersForRPC:rpc RPCName:@"CommitRequest"];
+ completion(error);
+ }];
+ }];
+ return rpc;
+ };
+
+ [self invokeRPCWithFactory:rpcFactory errorHandler:completion];
+}
+
+- (void)lookupDocuments:(NSArray<FSTDocumentKey *> *)keys
+ completion:(FSTVoidMaybeDocumentArrayErrorBlock)completion {
+ GCFSBatchGetDocumentsRequest *request = [GCFSBatchGetDocumentsRequest message];
+ request.database = [self.serializer encodedDatabaseID];
+ for (FSTDocumentKey *key in keys) {
+ [request.documentsArray addObject:[self.serializer encodedDocumentKey:key]];
+ }
+
+ __block FSTMaybeDocumentDictionary *results =
+ [FSTMaybeDocumentDictionary maybeDocumentDictionary];
+
+ RPCFactory rpcFactory = ^GRPCProtoCall * {
+ __block GRPCProtoCall *rpc = [self.service
+ RPCToBatchGetDocumentsWithRequest:request
+ eventHandler:^(BOOL done,
+ GCFSBatchGetDocumentsResponse *_Nullable response,
+ NSError *_Nullable error) {
+ error = [FSTDatastore firestoreErrorForError:error];
+ [self.workerDispatchQueue dispatchAsync:^{
+ if (error) {
+ FSTLog(@"RPC BatchGetDocuments completed. Error: %@", error);
+ [FSTDatastore logHeadersForRPC:rpc RPCName:@"BatchGetDocuments"];
+ completion(nil, error);
+ return;
+ }
+
+ if (!done) {
+ // Streaming response, accumulate result
+ FSTMaybeDocument *doc =
+ [self.serializer decodedMaybeDocumentFromBatch:response];
+ results = [results dictionaryBySettingObject:doc forKey:doc.key];
+ } else {
+ // Streaming response is done, call completion
+ FSTLog(@"RPC BatchGetDocuments completed successfully.");
+ [FSTDatastore logHeadersForRPC:rpc RPCName:@"BatchGetDocuments"];
+ FSTAssert(!response, @"Got response after done.");
+ NSMutableArray<FSTMaybeDocument *> *docs =
+ [NSMutableArray arrayWithCapacity:keys.count];
+ for (FSTDocumentKey *key in keys) {
+ [docs addObject:results[key]];
+ }
+ completion(docs, nil);
+ }
+ }];
+ }];
+ return rpc;
+ };
+
+ [self invokeRPCWithFactory:rpcFactory
+ errorHandler:^(NSError *_Nonnull error) {
+ error = [FSTDatastore firestoreErrorForError:error];
+ completion(nil, error);
+ }];
+}
+
+- (void)invokeRPCWithFactory:(GRPCProtoCall * (^)())rpcFactory
+ errorHandler:(FSTVoidErrorBlock)errorHandler {
+ // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
+ // but I'm not sure how to detect that right now. http://b/32762461
+ [self.credentials
+ getTokenForcingRefresh:NO
+ completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) {
+ error = [FSTDatastore firestoreErrorForError:error];
+ [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
+ if (error) {
+ errorHandler(error);
+ } else {
+ GRPCProtoCall *rpc = rpcFactory();
+ [FSTDatastore prepareHeadersForRPC:rpc
+ databaseID:self.databaseInfo.databaseID
+ token:result.token];
+ [rpc start];
+ }
+ }];
+ }];
+}
+
+- (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate {
+ return [[FSTBetaWatchStream alloc] initWithDatabase:_databaseInfo
+ workerDispatchQueue:_workerDispatchQueue
+ credentials:_credentials
+ serializer:_serializer
+ delegate:delegate];
+}
+
+- (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate {
+ return [[FSTBetaWriteStream alloc] initWithDatabase:_databaseInfo
+ workerDispatchQueue:_workerDispatchQueue
+ credentials:_credentials
+ serializer:_serializer
+ delegate:delegate];
+}
+
+/** Adds headers to the RPC including any OAuth access token if provided .*/
++ (void)prepareHeadersForRPC:(GRPCCall *)rpc
+ databaseID:(FSTDatabaseID *)databaseID
+ token:(nullable NSString *)token {
+ rpc.oauth2AccessToken = token;
+ rpc.requestHeaders[kXGoogAPIClientHeader] = [FSTDatastore googAPIClientHeaderValue];
+ // This header is used to improve routing and project isolation by the backend.
+ rpc.requestHeaders[kGoogleCloudResourcePrefix] =
+ [FSTDatastore googleCloudResourcePrefixForDatabaseID:databaseID];
+}
+
+@end
+
+#pragma mark - FSTStream
+
+@implementation FSTStream
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass {
+ if (self = [super init]) {
+ _databaseInfo = database;
+ _workerDispatchQueue = workerDispatchQueue;
+ _credentials = credentials;
+ _responseMessageClass = responseMessageClass;
+
+ _backoff = [FSTExponentialBackoff exponentialBackoffWithDispatchQueue:workerDispatchQueue
+ initialDelay:kBackoffInitialDelay
+ backoffFactor:kBackoffFactor
+ maxDelay:kBackoffMaxDelay];
+ _state = FSTStreamStateInitial;
+ }
+ return self;
+}
+
+- (BOOL)isStarted {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ FSTStreamState state = self.state;
+ return state == FSTStreamStateBackoff || state == FSTStreamStateAuth ||
+ state == FSTStreamStateOpen;
+}
+
+- (BOOL)isOpen {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ return self.state == FSTStreamStateOpen;
+}
+
+- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
+ @throw FSTAbstractMethodException(); // NOLINT
+}
+
+- (void)start {
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ if (self.state == FSTStreamStateError) {
+ [self performBackoff];
+ return;
+ }
+
+ FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self);
+ FSTAssert(self.state == FSTStreamStateInitial, @"Already started");
+
+ self.state = FSTStreamStateAuth;
+
+ [self.credentials
+ getTokenForcingRefresh:NO
+ completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) {
+ error = [FSTDatastore firestoreErrorForError:error];
+ [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
+ [self resumeStartWithToken:result error:error];
+ }];
+ }];
+}
+
+/** Add an access token to our RPC, after obtaining one from the credentials provider. */
+- (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error {
+ if (self.state == FSTStreamStateStopped) {
+ // Streams can be stopped while waiting for authorization.
+ return;
+ }
+
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+ FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
+ (long)self.state);
+
+ // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
+ // but I'm not sure how to detect that right now. http://b/32762461
+ if (error) {
+ // RPC has not been started yet, so just invoke higher-level close handler.
+ [self handleStreamClose:error];
+ return;
+ }
+
+ self.requestsWriter = [[FSTBufferedWriter alloc] init];
+ _rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
+ [FSTDatastore prepareHeadersForRPC:_rpc
+ databaseID:self.databaseInfo.databaseID
+ token:token.token];
+ [_rpc startWithWriteable:self];
+
+ self.state = FSTStreamStateOpen;
+ [self handleStreamOpen];
+}
+
+/** Backs off after an error. */
+- (void)performBackoff {
+ FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self);
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case");
+ self.state = FSTStreamStateBackoff;
+
+ FSTWeakify(self);
+ [self.backoff backoffAndRunBlock:^{
+ FSTStrongify(self);
+ [self resumeStartFromBackoff];
+ }];
+}
+
+/** Resumes stream start after backing off. */
+- (void)resumeStartFromBackoff {
+ if (self.state == FSTStreamStateStopped) {
+ // Streams can be stopped while waiting for backoff to complete.
+ return;
+ }
+
+ // In order to have performed a backoff the stream must have been in an error state just prior
+ // to entering the backoff state. If we weren't stopped we must be in the backoff state.
+ FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)",
+ (long)self.state);
+
+ // Momentarily set state to FSTStreamStateInitial as `start` expects it.
+ self.state = FSTStreamStateInitial;
+ [self start];
+ FSTAssert([self isStarted], @"Stream should have started.");
+}
+
+- (void)stop {
+ FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ // Prevent any possible future restart of this stream.
+ self.state = FSTStreamStateStopped;
+
+ // Close the stream client side.
+ FSTBufferedWriter *requestsWriter = self.requestsWriter;
+ @synchronized(requestsWriter) {
+ [requestsWriter finishWithError:nil];
+ }
+}
+
+- (void)inhibitBackoff {
+ FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)",
+ (long)self.state);
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ // Clear the error condition.
+ self.state = FSTStreamStateInitial;
+ [self.backoff reset];
+}
+
+/**
+ * Parses a protocol buffer response from the server. If the message fails to parse, generates
+ * an error and closes the stream.
+ *
+ * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:.
+ * @param data The bytes in the response as returned from GRPC.
+ * @return An instance of the protocol buffer message, parsed from the data if parsing was
+ * successful, or nil otherwise.
+ */
+- (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error {
+ NSError *parseError;
+ id parsed = [protoClass parseFromData:data error:&parseError];
+ if (parsed) {
+ *error = nil;
+ return parsed;
+ } else {
+ NSDictionary *info = @{
+ NSLocalizedDescriptionKey : @"Unable to parse response from the server",
+ NSUnderlyingErrorKey : parseError,
+ @"Expected class" : protoClass,
+ @"Received value" : data,
+ };
+ *error = [NSError errorWithDomain:FIRFirestoreErrorDomain
+ code:FIRFirestoreErrorCodeInternal
+ userInfo:info];
+ return nil;
+ }
+}
+
+/**
+ * Writes a request proto into the stream.
+ */
+- (void)writeRequest:(GPBMessage *)request {
+ NSData *data = [request data];
+
+ FSTBufferedWriter *requestsWriter = self.requestsWriter;
+ @synchronized(requestsWriter) {
+ [requestsWriter writeValue:data];
+ }
+}
+
+#pragma mark Template methods for subclasses
+
+/**
+ * Called by the stream after the stream has been successfully connected, authenticated, and is now
+ * ready to accept messages.
+ *
+ * Subclasses should relay to their stream-specific delegate. Calling [super handleStreamOpen] is
+ * not required.
+ */
+- (void)handleStreamOpen {
+}
+
+/**
+ * Called by the stream for each incoming protocol message coming from the server.
+ *
+ * Subclasses should implement this to deserialize the value and relay to their stream-specific
+ * delegate, if appropriate. Calling [super handleStreamMessage] is not required.
+ */
+- (void)handleStreamMessage:(id)value {
+}
+
+/**
+ * Called by the stream when the underlying RPC has been closed for whatever reason.
+ *
+ * Subclasses should first call [super handleStreamClose:] and then call to their
+ * stream-specific delegate.
+ */
+- (void)handleStreamClose:(NSError *_Nullable)error {
+ FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
+ FSTAssert([self isStarted], @"Can't handle server close in non-started state.");
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ self.messageReceived = NO;
+ self.rpc = nil;
+ self.requestsWriter = nil;
+
+ // In theory the stream could close cleanly, however, in our current model we never expect this
+ // to happen because if we stop a stream ourselves, this callback will never be called. To
+ // prevent cases where we retry without a backoff accidentally, we set the stream to error
+ // in all cases.
+ self.state = FSTStreamStateError;
+
+ if (error.code == FIRFirestoreErrorCodeResourceExhausted) {
+ FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class],
+ (__bridge void *)self);
+ [self.backoff resetToMax];
+ }
+}
+
+#pragma mark GRXWriteable implementation
+// The GRXWriteable implementation defines the receive side of the RPC stream.
+
+/**
+ * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
+ * redispatch back onto our own worker queue.
+ */
+- (void)writeValue:(id)value __used {
+ // TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
+ // Once released we can set the responseDispatchQueue property on the GRPCCall and then this
+ // method can call handleStreamMessage directly.
+ FSTWeakify(self);
+ [self.workerDispatchQueue dispatchAsync:^{
+ FSTStrongify(self);
+ if (!self || self.state == FSTStreamStateStopped) {
+ return;
+ }
+ if (!self.messageReceived) {
+ self.messageReceived = YES;
+ if ([FIRFirestore isLoggingEnabled]) {
+ FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
+ (__bridge void *)self,
+ [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
+ }
+ }
+ NSError *error;
+ id proto = [self parseProto:self.responseMessageClass data:value error:&error];
+ if (proto) {
+ [self handleStreamMessage:proto];
+ } else {
+ [_rpc finishWithError:error];
+ }
+ }];
+}
+
+/**
+ * Called by GRPC when it closed the stream with an error representing the final state of the
+ * stream.
+ *
+ * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
+ * directly inform stream-specific logic, or call stop to tear down the stream.
+ */
+- (void)writesFinishedWithError:(NSError *_Nullable)error __used {
+ error = [FSTDatastore firestoreErrorForError:error];
+ FSTWeakify(self);
+ [self.workerDispatchQueue dispatchAsync:^{
+ FSTStrongify(self);
+ if (!self || self.state == FSTStreamStateStopped) {
+ return;
+ }
+ [self handleStreamClose:error];
+ }];
+}
+
+@end
+
+#pragma mark - FSTWatchStream
+
+@implementation FSTWatchStream
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass
+ delegate:(id<FSTWatchStreamDelegate>)delegate {
+ self = [super initWithDatabase:database
+ workerDispatchQueue:workerDispatchQueue
+ credentials:credentials
+ responseMessageClass:responseMessageClass];
+ if (self) {
+ _delegate = delegate;
+ }
+ return self;
+}
+
+- (void)stop {
+ // Clear the delegate to avoid any possible bleed through of events from GRPC.
+ self.delegate = nil;
+
+ [super stop];
+}
+
+- (void)watchQuery:(FSTQueryData *)query {
+ @throw FSTAbstractMethodException(); // NOLINT
+}
+
+- (void)unwatchTargetID:(FSTTargetID)targetID {
+ @throw FSTAbstractMethodException(); // NOLINT
+}
+
+- (void)handleStreamOpen {
+ [self.delegate watchStreamDidOpen];
+}
+
+- (void)handleStreamClose:(NSError *_Nullable)error {
+ [super handleStreamClose:error];
+ [self.delegate watchStreamDidClose:error];
+}
+
+@end
+
+#pragma mark - FSTBetaWatchStream
+
+@implementation FSTBetaWatchStream {
+ FSTSerializerBeta *_serializer;
+}
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ serializer:(FSTSerializerBeta *)serializer
+ delegate:(id<FSTWatchStreamDelegate>)delegate {
+ self = [super initWithDatabase:database
+ workerDispatchQueue:workerDispatchQueue
+ credentials:credentials
+ responseMessageClass:[GCFSListenResponse class]
+ delegate:delegate];
+ if (self) {
+ _serializer = serializer;
+ }
+ return self;
+}
+
+- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
+ return [[GRPCCall alloc] initWithHost:self.databaseInfo.host
+ path:@"/google.firestore.v1beta1.Firestore/Listen"
+ requestsWriter:requestsWriter];
+}
+
+- (void)watchQuery:(FSTQueryData *)query {
+ FSTAssert([self isOpen], @"Not yet open");
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ GCFSListenRequest *request = [GCFSListenRequest message];
+ request.database = [_serializer encodedDatabaseID];
+ request.addTarget = [_serializer encodedTarget:query];
+ request.labels = [_serializer encodedListenRequestLabelsForQueryData:query];
+
+ FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, request);
+ [self writeRequest:request];
+}
+
+- (void)unwatchTargetID:(FSTTargetID)targetID {
+ FSTAssert([self isOpen], @"Not yet open");
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ GCFSListenRequest *request = [GCFSListenRequest message];
+ request.database = [_serializer encodedDatabaseID];
+ request.removeTarget = targetID;
+
+ FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, request);
+ [self writeRequest:request];
+}
+
+/**
+ * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's
+ * watchStreamDidChange:snapshotVersion: callback.
+ */
+- (void)handleStreamMessage:(GCFSListenResponse *)proto {
+ FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto);
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ // A successful response means the stream is healthy.
+ [self.backoff reset];
+
+ FSTWatchChange *change = [_serializer decodedWatchChange:proto];
+ FSTSnapshotVersion *snap = [_serializer versionFromListenResponse:proto];
+ [self.delegate watchStreamDidChange:change snapshotVersion:snap];
+}
+
+@end
+
+#pragma mark - FSTWriteStream
+
+@implementation FSTWriteStream
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ responseMessageClass:(Class)responseMessageClass
+ delegate:(id<FSTWriteStreamDelegate>)delegate {
+ self = [super initWithDatabase:database
+ workerDispatchQueue:workerDispatchQueue
+ credentials:credentials
+ responseMessageClass:responseMessageClass];
+ if (self) {
+ _delegate = delegate;
+ }
+ return self;
+}
+
+- (void)start {
+ self.handshakeComplete = NO;
+ [super start];
+}
+
+- (void)stop {
+ // Clear the delegate to avoid any possible bleed through of events from GRPC.
+ self.delegate = nil;
+
+ [super stop];
+}
+
+- (void)writeHandshake {
+ @throw FSTAbstractMethodException(); // NOLINT
+}
+
+- (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
+ @throw FSTAbstractMethodException(); // NOLINT
+}
+
+- (void)handleStreamOpen {
+ [self.delegate writeStreamDidOpen];
+}
+
+- (void)handleStreamClose:(NSError *_Nullable)error {
+ [super handleStreamClose:error];
+
+ [self.delegate writeStreamDidClose:error];
+}
+
+@end
+
+#pragma mark - FSTBetaWriteStream
+
+@implementation FSTBetaWriteStream {
+ FSTSerializerBeta *_serializer;
+}
+
+- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
+ workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
+ credentials:(id<FSTCredentialsProvider>)credentials
+ serializer:(FSTSerializerBeta *)serializer
+ delegate:(id<FSTWriteStreamDelegate>)delegate {
+ self = [super initWithDatabase:database
+ workerDispatchQueue:workerDispatchQueue
+ credentials:credentials
+ responseMessageClass:[GCFSWriteResponse class]
+ delegate:delegate];
+ if (self) {
+ _serializer = serializer;
+ }
+ return self;
+}
+
+- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
+ return [[GRPCCall alloc] initWithHost:self.databaseInfo.host
+ path:@"/google.firestore.v1beta1.Firestore/Write"
+ requestsWriter:requestsWriter];
+}
+
+- (void)writeHandshake {
+ // The initial request cannot contain mutations, but must contain a projectID.
+ FSTAssert([self isOpen], @"Not yet open");
+ FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn");
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ GCFSWriteRequest *request = [GCFSWriteRequest message];
+ request.database = [_serializer encodedDatabaseID];
+ // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the
+ // handshake, ignoring any stream token we might have.
+
+ FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, request);
+ [self writeRequest:request];
+}
+
+- (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
+ FSTAssert([self isOpen], @"Not yet open");
+ FSTAssert(self.handshakeComplete, @"Mutations sent out of turn");
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count];
+ for (FSTMutation *mutation in mutations) {
+ [protos addObject:[_serializer encodedMutation:mutation]];
+ };
+
+ GCFSWriteRequest *request = [GCFSWriteRequest message];
+ request.writesArray = protos;
+ request.streamToken = self.lastStreamToken;
+
+ FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, request);
+ [self writeRequest:request];
+}
+
+/**
+ * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass
+ * that on to the mutationResultsHandler.
+ */
+- (void)handleStreamMessage:(GCFSWriteResponse *)response {
+ FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response);
+ [self.workerDispatchQueue verifyIsCurrentQueue];
+
+ // A successful response means the stream is healthy.
+ [self.backoff reset];
+
+ // Always capture the last stream token.
+ self.lastStreamToken = response.streamToken;
+
+ if (!self.handshakeComplete) {
+ // The first response is the handshake response
+ self.handshakeComplete = YES;
+
+ [self.delegate writeStreamDidCompleteHandshake];
+ } else {
+ FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime];
+ NSMutableArray<GCFSWriteResult *> *protos = response.writeResultsArray;
+ NSMutableArray<FSTMutationResult *> *results = [NSMutableArray arrayWithCapacity:protos.count];
+ for (GCFSWriteResult *proto in protos) {
+ [results addObject:[_serializer decodedMutationResult:proto]];
+ };
+
+ [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results];
+ }
+}
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTExistenceFilter.h b/Firestore/Source/Remote/FSTExistenceFilter.h
new file mode 100644
index 0000000..df95950
--- /dev/null
+++ b/Firestore/Source/Remote/FSTExistenceFilter.h
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import <Foundation/Foundation.h>
+
+NS_ASSUME_NONNULL_BEGIN
+
+@interface FSTExistenceFilter : NSObject
+
++ (instancetype)filterWithCount:(int32_t)count;
+
+- (instancetype)init __attribute__((unavailable("Use a static constructor")));
+
+@property(nonatomic, assign, readonly) int32_t count;
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTExistenceFilter.m b/Firestore/Source/Remote/FSTExistenceFilter.m
new file mode 100644
index 0000000..7c0ded2
--- /dev/null
+++ b/Firestore/Source/Remote/FSTExistenceFilter.m
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FSTExistenceFilter.h"
+
+@interface FSTExistenceFilter ()
+
+- (instancetype)initWithCount:(int32_t)count NS_DESIGNATED_INITIALIZER;
+
+@end
+
+@implementation FSTExistenceFilter
+
++ (instancetype)filterWithCount:(int32_t)count {
+ return [[FSTExistenceFilter alloc] initWithCount:count];
+}
+
+- (instancetype)initWithCount:(int32_t)count {
+ if (self = [super init]) {
+ _count = count;
+ }
+ return self;
+}
+
+- (BOOL)isEqual:(id)other {
+ if (other == self) {
+ return YES;
+ }
+ if (![other isMemberOfClass:[FSTExistenceFilter class]]) {
+ return NO;
+ }
+
+ return _count == ((FSTExistenceFilter *)other).count;
+}
+
+- (NSUInteger)hash {
+ return _count;
+}
+
+@end
diff --git a/Firestore/Source/Remote/FSTExponentialBackoff.h b/Firestore/Source/Remote/FSTExponentialBackoff.h
new file mode 100644
index 0000000..0bee2bd
--- /dev/null
+++ b/Firestore/Source/Remote/FSTExponentialBackoff.h
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import <Foundation/Foundation.h>
+
+@class FSTDispatchQueue;
+
+NS_ASSUME_NONNULL_BEGIN
+
+/**
+ * Helper to implement exponential backoff.
+ *
+ * In general, call -reset after each successful round-trip. Call -backoffAndRunBlock before
+ * retrying after an error. Each backoffAndRunBlock will increase the delay between retries.
+ */
+@interface FSTExponentialBackoff : NSObject
+
+/**
+ * Creates and returns a helper for running delayed tasks following an exponential backoff curve
+ * between attempts.
+ *
+ * Each delay is made up of a "base" delay which follows the exponential backoff curve, and a
+ * +/- 50% "jitter" that is calculated and added to the base delay. This prevents clients from
+ * accidentally synchronizing their delays causing spikes of load to the backend.
+ *
+ * @param dispatchQueue The dispatch queue to run tasks on.
+ * @param initialDelay The initial delay (used as the base delay on the first retry attempt).
+ * Note that jitter will still be applied, so the actual delay could be as little as
+ * 0.5*initialDelay.
+ * @param backoffFactor The multiplier to use to determine the extended base delay after each
+ * attempt.
+ * @param maxDelay The maximum base delay after which no further backoff is performed. Note that
+ * jitter will still be applied, so the actual delay could be as much as 1.5*maxDelay.
+ */
++ (instancetype)exponentialBackoffWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue
+ initialDelay:(NSTimeInterval)initialDelay
+ backoffFactor:(double)backoffFactor
+ maxDelay:(NSTimeInterval)maxDelay;
+
+- (instancetype)init
+ __attribute__((unavailable("Use exponentialBackoffWithDispatchQueue constructor method.")));
+
+/**
+ * Resets the backoff delay.
+ *
+ * The very next backoffAndRunBlock: will have no delay. If it is called again (i.e. due to an
+ * error), initialDelay (plus jitter) will be used, and subsequent ones will increase according
+ * to the backoffFactor.
+ */
+- (void)reset;
+
+/**
+ * Resets the backoff to the maximum delay (e.g. for use after a RESOURCE_EXHAUSTED error).
+ */
+- (void)resetToMax;
+
+/**
+ * Waits for currentDelay seconds, increases the delay and runs the specified block.
+ *
+ * @param block The block to run.
+ */
+- (void)backoffAndRunBlock:(void (^)())block;
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTExponentialBackoff.m b/Firestore/Source/Remote/FSTExponentialBackoff.m
new file mode 100644
index 0000000..ec21282
--- /dev/null
+++ b/Firestore/Source/Remote/FSTExponentialBackoff.m
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FSTExponentialBackoff.h"
+
+#import "FSTDispatchQueue.h"
+#import "FSTLogger.h"
+#import "FSTUtil.h"
+
+@interface FSTExponentialBackoff ()
+- (instancetype)initWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue
+ initialDelay:(NSTimeInterval)initialDelay
+ backoffFactor:(double)backoffFactor
+ maxDelay:(NSTimeInterval)maxDelay NS_DESIGNATED_INITIALIZER;
+
+@property(nonatomic, strong) FSTDispatchQueue *dispatchQueue;
+@property(nonatomic) double backoffFactor;
+@property(nonatomic) NSTimeInterval initialDelay;
+@property(nonatomic) NSTimeInterval maxDelay;
+@property(nonatomic) NSTimeInterval currentBase;
+@end
+
+@implementation FSTExponentialBackoff
+
+- (instancetype)initWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue
+ initialDelay:(NSTimeInterval)initialDelay
+ backoffFactor:(double)backoffFactor
+ maxDelay:(NSTimeInterval)maxDelay {
+ if (self = [super init]) {
+ _dispatchQueue = dispatchQueue;
+ _initialDelay = initialDelay;
+ _backoffFactor = backoffFactor;
+ _maxDelay = maxDelay;
+
+ [self reset];
+ }
+ return self;
+}
+
++ (instancetype)exponentialBackoffWithDispatchQueue:(FSTDispatchQueue *)dispatchQueue
+ initialDelay:(NSTimeInterval)initialDelay
+ backoffFactor:(double)backoffFactor
+ maxDelay:(NSTimeInterval)maxDelay {
+ return [[FSTExponentialBackoff alloc] initWithDispatchQueue:dispatchQueue
+ initialDelay:initialDelay
+ backoffFactor:backoffFactor
+ maxDelay:maxDelay];
+}
+
+- (void)reset {
+ _currentBase = 0;
+}
+
+- (void)resetToMax {
+ _currentBase = _maxDelay;
+}
+
+- (void)backoffAndRunBlock:(void (^)())block {
+ // First schedule the block using the current base (which may be 0 and should be honored as such).
+ NSTimeInterval delayWithJitter = _currentBase + [self jitterDelay];
+ if (_currentBase > 0) {
+ FSTLog(@"Backing off for %.2f seconds (base delay: %.2f seconds)", delayWithJitter,
+ _currentBase);
+ }
+ dispatch_time_t delay =
+ dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delayWithJitter * NSEC_PER_SEC));
+ dispatch_after(delay, self.dispatchQueue.queue, block);
+
+ // Apply backoff factor to determine next delay and ensure it is within bounds.
+ _currentBase *= _backoffFactor;
+ if (_currentBase < _initialDelay) {
+ _currentBase = _initialDelay;
+ }
+ if (_currentBase > _maxDelay) {
+ _currentBase = _maxDelay;
+ }
+}
+
+/** Returns a random value in the range [-currentBase/2, currentBase/2] */
+- (NSTimeInterval)jitterDelay {
+ return ([FSTUtil randomDouble] - 0.5) * _currentBase;
+}
+
+@end
diff --git a/Firestore/Source/Remote/FSTRemoteEvent.h b/Firestore/Source/Remote/FSTRemoteEvent.h
new file mode 100644
index 0000000..939a027
--- /dev/null
+++ b/Firestore/Source/Remote/FSTRemoteEvent.h
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import <Foundation/Foundation.h>
+
+#import "FSTDocumentDictionary.h"
+#import "FSTDocumentKeySet.h"
+#import "FSTTypes.h"
+
+@class FSTDocument;
+@class FSTDocumentKey;
+@class FSTExistenceFilter;
+@class FSTMaybeDocument;
+@class FSTSnapshotVersion;
+@class FSTWatchChange;
+@class FSTQueryData;
+
+NS_ASSUME_NONNULL_BEGIN
+
+#pragma mark - FSTTargetMapping
+
+/**
+ * TargetMapping represents a change to the documents in a query from the server. This can either
+ * be an incremental Update or a full Reset.
+ *
+ * <p>This is an empty abstract class so that all the different kinds of changes can have a common
+ * base class.
+ */
+@interface FSTTargetMapping : NSObject
+@end
+
+#pragma mark - FSTResetMapping
+
+/** The new set of documents to replace the current documents for a target. */
+@interface FSTResetMapping : FSTTargetMapping
+
+/**
+ * Creates a new mapping with the keys for the given documents added. This is intended primarily
+ * for testing.
+ */
++ (FSTResetMapping *)mappingWithDocuments:(NSArray<FSTDocument *> *)documents;
+
+/** The new set of documents for the target. */
+@property(nonatomic, strong, readonly) FSTDocumentKeySet *documents;
+@end
+
+#pragma mark - FSTUpdateMapping
+
+/**
+ * A target should update its set of documents with the given added/removed set of documents.
+ */
+@interface FSTUpdateMapping : FSTTargetMapping
+
+/**
+ * Creates a new mapping with the keys for the given documents added. This is intended primarily
+ * for testing.
+ */
++ (FSTUpdateMapping *)mappingWithAddedDocuments:(NSArray<FSTDocument *> *)added
+ removedDocuments:(NSArray<FSTDocument *> *)removed;
+
+- (FSTDocumentKeySet *)applyTo:(FSTDocumentKeySet *)keys;
+
+/** The documents added to the target. */
+@property(nonatomic, strong, readonly) FSTDocumentKeySet *addedDocuments;
+/** The documents removed from the target. */
+@property(nonatomic, strong, readonly) FSTDocumentKeySet *removedDocuments;
+@end
+
+#pragma mark - FSTTargetChange
+
+/**
+ * Represents an update to the current status of a target, either explicitly having no new state, or
+ * the new value to set. Note "current" has special meaning in the RPC protocol that implies that a
+ * target is both up-to-date and consistent with the rest of the watch stream.
+ */
+typedef NS_ENUM(NSUInteger, FSTCurrentStatusUpdate) {
+ /** The current status is not affected and should not be modified */
+ FSTCurrentStatusUpdateNone,
+ /** The target must be marked as no longer "current" */
+ FSTCurrentStatusUpdateMarkNotCurrent,
+ /** The target must be marked as "current" */
+ FSTCurrentStatusUpdateMarkCurrent,
+};
+
+/**
+ * A part of an FSTRemoteEvent specifying set of changes to a specific target. These changes track
+ * what documents are currently included in the target as well as the current snapshot version and
+ * resume token but the actual changes *to* documents are not part of the FSTTargetChange since
+ * documents may be part of multiple targets.
+ */
+@interface FSTTargetChange : NSObject
+
+/**
+ * Creates a new target change with the given documents. Instances of FSTDocument are considered
+ * added. Instance of FSTDeletedDocument are considered removed. This is intended primarily for
+ * testing.
+ */
++ (instancetype)changeWithDocuments:(NSArray<FSTMaybeDocument *> *)docs
+ currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate;
+
+/**
+ * The new "current" (synced) status of this target. Set to CurrentStatusUpdateNone if the status
+ * should not be updated. Note "current" has special meaning for in the RPC protocol that implies
+ * that a target is both up-to-date and consistent with the rest of the watch stream.
+ */
+@property(nonatomic, assign, readonly) FSTCurrentStatusUpdate currentStatusUpdate;
+
+/** A set of changes to documents in this target. */
+@property(nonatomic, strong, readonly) FSTTargetMapping *mapping;
+
+/**
+ * The snapshot version representing the last state at which this target received a consistent
+ * snapshot from the backend.
+ */
+@property(nonatomic, strong, readonly) FSTSnapshotVersion *snapshotVersion;
+
+/**
+ * An opaque, server-assigned token that allows watching a query to be resumed after disconnecting
+ * without retransmitting all the data that matches the query. The resume token essentially
+ * identifies a point in time from which the server should resume sending results.
+ */
+@property(nonatomic, strong, readonly) NSData *resumeToken;
+
+@end
+
+#pragma mark - FSTRemoteEvent
+
+/**
+ * An event from the RemoteStore. It is split into targetChanges (changes to the state or the set
+ * of documents in our watched targets) and documentUpdates (changes to the actual documents).
+ */
+@interface FSTRemoteEvent : NSObject
+
++ (instancetype)
+eventWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion
+ targetChanges:(NSMutableDictionary<NSNumber *, FSTTargetChange *> *)targetChanges
+ documentUpdates:
+ (NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *)documentUpdates;
+
+/** The snapshot version this event brings us up to. */
+@property(nonatomic, strong, readonly) FSTSnapshotVersion *snapshotVersion;
+
+/** A map from target to changes to the target. See TargetChange. */
+@property(nonatomic, strong, readonly)
+ NSDictionary<FSTBoxedTargetID *, FSTTargetChange *> *targetChanges;
+
+/**
+ * A set of which documents have changed or been deleted, along with the doc's new values
+ * (if not deleted).
+ */
+@property(nonatomic, strong, readonly)
+ NSDictionary<FSTDocumentKey *, FSTMaybeDocument *> *documentUpdates;
+
+/** Adds a document update to this remote event */
+- (void)addDocumentUpdate:(FSTMaybeDocument *)document;
+
+/** Handles an existence filter mismatch */
+- (void)handleExistenceFilterMismatchForTargetID:(FSTBoxedTargetID *)targetID;
+
+@end
+
+#pragma mark - FSTWatchChangeAggregator
+
+/**
+ * A helper class to accumulate watch changes into a FSTRemoteEvent and other target
+ * information.
+ */
+@interface FSTWatchChangeAggregator : NSObject
+
+- (instancetype)
+initWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion
+ listenTargets:(NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *)listenTargets
+ pendingTargetResponses:(NSDictionary<FSTBoxedTargetID *, NSNumber *> *)pendingTargetResponses
+ NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)init NS_UNAVAILABLE;
+
+/** The number of pending responses that are being waited on from watch */
+@property(nonatomic, strong, readonly)
+ NSMutableDictionary<FSTBoxedTargetID *, NSNumber *> *pendingTargetResponses;
+
+/** Aggregates a watch change into the current state */
+- (void)addWatchChange:(FSTWatchChange *)watchChange;
+
+/** Aggregates all provided watch changes to the current state in order */
+- (void)addWatchChanges:(NSArray<FSTWatchChange *> *)watchChanges;
+
+/**
+ * Converts the current state into a remote event with the snapshot version taken from the
+ * initializer.
+ */
+- (FSTRemoteEvent *)remoteEvent;
+
+/** The existence filters - if any - for the given target IDs. */
+@property(nonatomic, strong, readonly)
+ NSDictionary<FSTBoxedTargetID *, FSTExistenceFilter *> *existenceFilters;
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTRemoteEvent.m b/Firestore/Source/Remote/FSTRemoteEvent.m
new file mode 100644
index 0000000..5c75998
--- /dev/null
+++ b/Firestore/Source/Remote/FSTRemoteEvent.m
@@ -0,0 +1,516 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FSTRemoteEvent.h"
+
+#import "FSTAssert.h"
+#import "FSTClasses.h"
+#import "FSTDocument.h"
+#import "FSTDocumentKey.h"
+#import "FSTLogger.h"
+#import "FSTSnapshotVersion.h"
+#import "FSTWatchChange.h"
+
+NS_ASSUME_NONNULL_BEGIN
+
+#pragma mark - FSTTargetMapping
+
+@interface FSTTargetMapping ()
+
+/** Private mutator method to add a document key to the mapping */
+- (void)addDocumentKey:(FSTDocumentKey *)documentKey;
+
+/** Private mutator method to remove a document key from the mapping */
+- (void)removeDocumentKey:(FSTDocumentKey *)documentKey;
+
+@end
+
+@implementation FSTTargetMapping
+
+- (void)addDocumentKey:(FSTDocumentKey *)documentKey {
+ @throw FSTAbstractMethodException(); // NOLINT
+}
+
+- (void)removeDocumentKey:(FSTDocumentKey *)documentKey {
+ @throw FSTAbstractMethodException(); // NOLINT
+}
+
+@end
+
+#pragma mark - FSTResetMapping
+
+@interface FSTResetMapping ()
+@property(nonatomic, strong) FSTDocumentKeySet *documents;
+@end
+
+@implementation FSTResetMapping
+
++ (instancetype)mappingWithDocuments:(NSArray<FSTDocument *> *)documents {
+ FSTResetMapping *mapping = [[FSTResetMapping alloc] init];
+ for (FSTDocument *doc in documents) {
+ mapping.documents = [mapping.documents setByAddingObject:doc.key];
+ }
+ return mapping;
+}
+
+- (instancetype)init {
+ self = [super init];
+ if (self) {
+ _documents = [FSTDocumentKeySet keySet];
+ }
+ return self;
+}
+
+- (BOOL)isEqual:(id)other {
+ if (other == self) {
+ return YES;
+ }
+ if (![other isMemberOfClass:[FSTResetMapping class]]) {
+ return NO;
+ }
+
+ FSTResetMapping *otherMapping = (FSTResetMapping *)other;
+ return [self.documents isEqual:otherMapping.documents];
+}
+
+- (NSUInteger)hash {
+ return self.documents.hash;
+}
+
+- (void)addDocumentKey:(FSTDocumentKey *)documentKey {
+ self.documents = [self.documents setByAddingObject:documentKey];
+}
+
+- (void)removeDocumentKey:(FSTDocumentKey *)documentKey {
+ self.documents = [self.documents setByRemovingObject:documentKey];
+}
+
+@end
+
+#pragma mark - FSTUpdateMapping
+
+@interface FSTUpdateMapping ()
+@property(nonatomic, strong) FSTDocumentKeySet *addedDocuments;
+@property(nonatomic, strong) FSTDocumentKeySet *removedDocuments;
+@end
+
+@implementation FSTUpdateMapping
+
++ (FSTUpdateMapping *)mappingWithAddedDocuments:(NSArray<FSTDocument *> *)added
+ removedDocuments:(NSArray<FSTDocument *> *)removed {
+ FSTUpdateMapping *mapping = [[FSTUpdateMapping alloc] init];
+ for (FSTDocument *doc in added) {
+ mapping.addedDocuments = [mapping.addedDocuments setByAddingObject:doc.key];
+ }
+ for (FSTDocument *doc in removed) {
+ mapping.removedDocuments = [mapping.removedDocuments setByAddingObject:doc.key];
+ }
+ return mapping;
+}
+
+- (instancetype)init {
+ self = [super init];
+ if (self) {
+ _addedDocuments = [FSTDocumentKeySet keySet];
+ _removedDocuments = [FSTDocumentKeySet keySet];
+ }
+ return self;
+}
+
+- (BOOL)isEqual:(id)other {
+ if (other == self) {
+ return YES;
+ }
+ if (![other isMemberOfClass:[FSTUpdateMapping class]]) {
+ return NO;
+ }
+
+ FSTUpdateMapping *otherMapping = (FSTUpdateMapping *)other;
+ return [self.addedDocuments isEqual:otherMapping.addedDocuments] &&
+ [self.removedDocuments isEqual:otherMapping.removedDocuments];
+}
+
+- (NSUInteger)hash {
+ return self.addedDocuments.hash * 31 + self.removedDocuments.hash;
+}
+
+- (FSTDocumentKeySet *)applyTo:(FSTDocumentKeySet *)keys {
+ __block FSTDocumentKeySet *result = keys;
+ [self.addedDocuments enumerateObjectsUsingBlock:^(FSTDocumentKey *key, BOOL *stop) {
+ result = [result setByAddingObject:key];
+ }];
+ [self.removedDocuments enumerateObjectsUsingBlock:^(FSTDocumentKey *key, BOOL *stop) {
+ result = [result setByRemovingObject:key];
+ }];
+ return result;
+}
+
+- (void)addDocumentKey:(FSTDocumentKey *)documentKey {
+ self.addedDocuments = [self.addedDocuments setByAddingObject:documentKey];
+ self.removedDocuments = [self.removedDocuments setByRemovingObject:documentKey];
+}
+
+- (void)removeDocumentKey:(FSTDocumentKey *)documentKey {
+ self.addedDocuments = [self.addedDocuments setByRemovingObject:documentKey];
+ self.removedDocuments = [self.removedDocuments setByAddingObject:documentKey];
+}
+
+@end
+
+#pragma mark - FSTTargetChange
+
+@interface FSTTargetChange ()
+@property(nonatomic, assign) FSTCurrentStatusUpdate currentStatusUpdate;
+@property(nonatomic, strong, nullable) FSTTargetMapping *mapping;
+@property(nonatomic, strong) FSTSnapshotVersion *snapshotVersion;
+@property(nonatomic, strong) NSData *resumeToken;
+@end
+
+@implementation FSTTargetChange
+
+- (instancetype)init {
+ if (self = [super init]) {
+ _currentStatusUpdate = FSTCurrentStatusUpdateNone;
+ _resumeToken = [NSData data];
+ }
+ return self;
+}
+
++ (instancetype)changeWithDocuments:(NSArray<FSTMaybeDocument *> *)docs
+ currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate {
+ FSTUpdateMapping *mapping = [[FSTUpdateMapping alloc] init];
+ for (FSTMaybeDocument *doc in docs) {
+ if ([doc isKindOfClass:[FSTDeletedDocument class]]) {
+ mapping.removedDocuments = [mapping.removedDocuments setByAddingObject:doc.key];
+ } else {
+ mapping.addedDocuments = [mapping.addedDocuments setByAddingObject:doc.key];
+ }
+ }
+ FSTTargetChange *change = [[FSTTargetChange alloc] init];
+ change.mapping = mapping;
+ change.currentStatusUpdate = currentStatusUpdate;
+ return change;
+}
+
++ (instancetype)changeWithMapping:(FSTTargetMapping *)mapping
+ snapshotVersion:(FSTSnapshotVersion *)snapshotVersion
+ currentStatusUpdate:(FSTCurrentStatusUpdate)currentStatusUpdate {
+ FSTTargetChange *change = [[FSTTargetChange alloc] init];
+ change.mapping = mapping;
+ change.snapshotVersion = snapshotVersion;
+ change.currentStatusUpdate = currentStatusUpdate;
+ return change;
+}
+
+- (FSTTargetMapping *)mapping {
+ if (!_mapping) {
+ // Create an FSTUpdateMapping by default, since resets are always explicit
+ _mapping = [[FSTUpdateMapping alloc] init];
+ }
+ return _mapping;
+}
+
+/**
+ * Sets the resume token but only when it has a new value. Empty resumeTokens are
+ * discarded.
+ */
+- (void)setResumeToken:(NSData *)resumeToken {
+ if (resumeToken.length > 0) {
+ _resumeToken = resumeToken;
+ }
+}
+
+@end
+
+#pragma mark - FSTRemoteEvent
+
+@interface FSTRemoteEvent () {
+ NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *_documentUpdates;
+ NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *_targetChanges;
+}
+
+- (instancetype)
+initWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion
+ targetChanges:(NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *)targetChanges
+ documentUpdates:
+ (NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *)documentUpdates;
+
+@property(nonatomic, strong) FSTSnapshotVersion *snapshotVersion;
+
+@end
+
+@implementation FSTRemoteEvent
+
++ (instancetype)
+eventWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion
+ targetChanges:(NSMutableDictionary<NSNumber *, FSTTargetChange *> *)targetChanges
+ documentUpdates:
+ (NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *)documentUpdates {
+ return [[FSTRemoteEvent alloc] initWithSnapshotVersion:snapshotVersion
+ targetChanges:targetChanges
+ documentUpdates:documentUpdates];
+}
+
+- (instancetype)
+initWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion
+ targetChanges:(NSMutableDictionary<NSNumber *, FSTTargetChange *> *)targetChanges
+ documentUpdates:
+ (NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *)documentUpdates {
+ self = [super init];
+ if (self) {
+ _snapshotVersion = snapshotVersion;
+ _targetChanges = targetChanges;
+ _documentUpdates = documentUpdates;
+ }
+ return self;
+}
+
+/** Adds a document update to this remote event */
+- (void)addDocumentUpdate:(FSTMaybeDocument *)document {
+ _documentUpdates[document.key] = document;
+}
+
+/** Handles an existence filter mismatch */
+- (void)handleExistenceFilterMismatchForTargetID:(FSTBoxedTargetID *)targetID {
+ // An existence filter mismatch will reset the query and we need to reset the mapping to contain
+ // no documents and an empty resume token.
+ //
+ // Note:
+ // * The reset mapping is empty, specifically forcing the consumer of the change to
+ // forget all keys for this targetID;
+ // * The resume snapshot for this target must be reset
+ // * The target must be unacked because unwatching and rewatching introduces a race for
+ // changes.
+ //
+ // TODO(dimond): keep track of reset targets not to raise.
+ FSTTargetChange *targetChange =
+ [FSTTargetChange changeWithMapping:[[FSTResetMapping alloc] init]
+ snapshotVersion:[FSTSnapshotVersion noVersion]
+ currentStatusUpdate:FSTCurrentStatusUpdateMarkNotCurrent];
+ _targetChanges[targetID] = targetChange;
+}
+
+@end
+
+#pragma mark - FSTWatchChangeAggregator
+
+@interface FSTWatchChangeAggregator ()
+
+/** The snapshot version for every target change this creates. */
+@property(nonatomic, strong, readonly) FSTSnapshotVersion *snapshotVersion;
+
+/** Keeps track of the current target mappings */
+@property(nonatomic, strong, readonly)
+ NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *targetChanges;
+
+/** Keeps track of document to update */
+@property(nonatomic, strong, readonly)
+ NSMutableDictionary<FSTDocumentKey *, FSTMaybeDocument *> *documentUpdates;
+
+/** The set of open listens on the client */
+@property(nonatomic, strong, readonly)
+ NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *listenTargets;
+
+/** Whether this aggregator was frozen and can no longer be modified */
+@property(nonatomic, assign) BOOL frozen;
+
+@end
+
+@implementation FSTWatchChangeAggregator {
+ NSMutableDictionary<FSTBoxedTargetID *, FSTExistenceFilter *> *_existenceFilters;
+}
+
+- (instancetype)
+initWithSnapshotVersion:(FSTSnapshotVersion *)snapshotVersion
+ listenTargets:(NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *)listenTargets
+ pendingTargetResponses:(NSDictionary<FSTBoxedTargetID *, NSNumber *> *)pendingTargetResponses {
+ self = [super init];
+ if (self) {
+ _snapshotVersion = snapshotVersion;
+
+ _frozen = NO;
+ _targetChanges = [NSMutableDictionary dictionary];
+ _listenTargets = listenTargets;
+ _pendingTargetResponses = [NSMutableDictionary dictionaryWithDictionary:pendingTargetResponses];
+
+ _existenceFilters = [NSMutableDictionary dictionary];
+ _documentUpdates = [NSMutableDictionary dictionary];
+ }
+ return self;
+}
+
+- (FSTTargetChange *)targetChangeForTargetID:(FSTBoxedTargetID *)targetID {
+ FSTTargetChange *change = self.targetChanges[targetID];
+ if (!change) {
+ change = [[FSTTargetChange alloc] init];
+ change.snapshotVersion = self.snapshotVersion;
+ self.targetChanges[targetID] = change;
+ }
+ return change;
+}
+
+- (void)addWatchChanges:(NSArray<FSTWatchChange *> *)watchChanges {
+ FSTAssert(!self.frozen, @"Trying to modify frozen FSTWatchChangeAggregator");
+ for (FSTWatchChange *watchChange in watchChanges) {
+ [self addWatchChange:watchChange];
+ }
+}
+
+- (void)addWatchChange:(FSTWatchChange *)watchChange {
+ FSTAssert(!self.frozen, @"Trying to modify frozen FSTWatchChangeAggregator");
+ if ([watchChange isKindOfClass:[FSTDocumentWatchChange class]]) {
+ [self addDocumentChange:(FSTDocumentWatchChange *)watchChange];
+ } else if ([watchChange isKindOfClass:[FSTWatchTargetChange class]]) {
+ [self addTargetChange:(FSTWatchTargetChange *)watchChange];
+ } else if ([watchChange isKindOfClass:[FSTExistenceFilterWatchChange class]]) {
+ [self addExistenceFilterChange:(FSTExistenceFilterWatchChange *)watchChange];
+ } else {
+ FSTFail(@"Unknown watch change: %@", watchChange);
+ }
+}
+
+- (void)addDocumentChange:(FSTDocumentWatchChange *)docChange {
+ BOOL relevant = NO;
+
+ for (FSTBoxedTargetID *targetID in docChange.updatedTargetIDs) {
+ if ([self isActiveTarget:targetID]) {
+ FSTTargetChange *change = [self targetChangeForTargetID:targetID];
+ [change.mapping addDocumentKey:docChange.documentKey];
+ relevant = YES;
+ }
+ }
+
+ for (FSTBoxedTargetID *targetID in docChange.removedTargetIDs) {
+ if ([self isActiveTarget:targetID]) {
+ FSTTargetChange *change = [self targetChangeForTargetID:targetID];
+ [change.mapping removeDocumentKey:docChange.documentKey];
+ relevant = YES;
+ }
+ }
+
+ // Only update the document if there is a new document to replace, this might be just a target
+ // update instead.
+ if (docChange.document && relevant) {
+ self.documentUpdates[docChange.documentKey] = docChange.document;
+ }
+}
+
+- (void)addTargetChange:(FSTWatchTargetChange *)targetChange {
+ for (FSTBoxedTargetID *targetID in targetChange.targetIDs) {
+ FSTTargetChange *change = [self targetChangeForTargetID:targetID];
+ switch (targetChange.state) {
+ case FSTWatchTargetChangeStateNoChange:
+ if ([self isActiveTarget:targetID]) {
+ // Creating the change above satisfies the semantics of no-change.
+ change.resumeToken = targetChange.resumeToken;
+ }
+ break;
+ case FSTWatchTargetChangeStateAdded:
+ [self recordResponseForTargetID:targetID];
+ if (![self.pendingTargetResponses objectForKey:targetID]) {
+ // We have a freshly added target, so we need to reset any state that we had previously
+ // This can happen e.g. when remove and add back a target for existence filter
+ // mismatches.
+ change.mapping = nil;
+ change.currentStatusUpdate = FSTCurrentStatusUpdateNone;
+ [_existenceFilters removeObjectForKey:targetID];
+ }
+ change.resumeToken = targetChange.resumeToken;
+ break;
+ case FSTWatchTargetChangeStateRemoved:
+ // We need to keep track of removed targets to we can post-filter and remove any target
+ // changes.
+ [self recordResponseForTargetID:targetID];
+ FSTAssert(!targetChange.cause, @"WatchChangeAggregator does not handle errored targets.");
+ break;
+ case FSTWatchTargetChangeStateCurrent:
+ if ([self isActiveTarget:targetID]) {
+ change.currentStatusUpdate = FSTCurrentStatusUpdateMarkCurrent;
+ change.resumeToken = targetChange.resumeToken;
+ }
+ break;
+ case FSTWatchTargetChangeStateReset:
+ if ([self isActiveTarget:targetID]) {
+ // Overwrite any existing target mapping with a reset mapping. Every subsequent update
+ // will modify the reset mapping, not an update mapping.
+ change.mapping = [[FSTResetMapping alloc] init];
+ change.resumeToken = targetChange.resumeToken;
+ }
+ break;
+ default:
+ FSTWarn(@"Unknown target watch change type: %ld", (long)targetChange.state);
+ }
+ }
+}
+
+/**
+ * Records that we got a watch target add/remove by decrementing the number of pending target
+ * responses that we have.
+ */
+- (void)recordResponseForTargetID:(FSTBoxedTargetID *)targetID {
+ NSNumber *count = [self.pendingTargetResponses objectForKey:targetID];
+ int newCount = count ? [count intValue] - 1 : -1;
+ if (newCount == 0) {
+ [self.pendingTargetResponses removeObjectForKey:targetID];
+ } else {
+ [self.pendingTargetResponses setObject:[NSNumber numberWithInt:newCount] forKey:targetID];
+ }
+}
+
+/**
+ * Returns true if the given targetId is active. Active targets are those for which there are no
+ * pending requests to add a listen and are in the current list of targets the client cares about.
+ *
+ * Clients can repeatedly listen and stop listening to targets, so this check is useful in
+ * preventing in preventing race conditions for a target where events arrive but the server hasn't
+ * yet acknowledged the intended change in state.
+ */
+- (BOOL)isActiveTarget:(FSTBoxedTargetID *)targetID {
+ return [self.listenTargets objectForKey:targetID] &&
+ ![self.pendingTargetResponses objectForKey:targetID];
+}
+
+- (void)addExistenceFilterChange:(FSTExistenceFilterWatchChange *)existenceFilterChange {
+ FSTBoxedTargetID *targetID = @(existenceFilterChange.targetID);
+ if ([self isActiveTarget:targetID]) {
+ _existenceFilters[targetID] = existenceFilterChange.filter;
+ }
+}
+
+- (FSTRemoteEvent *)remoteEvent {
+ NSMutableDictionary<FSTBoxedTargetID *, FSTTargetChange *> *targetChanges = self.targetChanges;
+
+ NSMutableArray *targetsToRemove = [NSMutableArray array];
+
+ // Apply any inactive targets.
+ for (FSTBoxedTargetID *targetID in [targetChanges keyEnumerator]) {
+ if (![self isActiveTarget:targetID]) {
+ [targetsToRemove addObject:targetID];
+ }
+ }
+
+ [targetChanges removeObjectsForKeys:targetsToRemove];
+
+ // Mark this aggregator as frozen so no further modifications are made.
+ self.frozen = YES;
+ return [FSTRemoteEvent eventWithSnapshotVersion:self.snapshotVersion
+ targetChanges:targetChanges
+ documentUpdates:self.documentUpdates];
+}
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTRemoteStore.h b/Firestore/Source/Remote/FSTRemoteStore.h
new file mode 100644
index 0000000..94208e1
--- /dev/null
+++ b/Firestore/Source/Remote/FSTRemoteStore.h
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import <Foundation/Foundation.h>
+
+#import "FSTDocumentVersionDictionary.h"
+#import "FSTTypes.h"
+
+@class FSTDatabaseInfo;
+@class FSTDatastore;
+@class FSTDocumentKey;
+@class FSTLocalStore;
+@class FSTMutationBatch;
+@class FSTMutationBatchResult;
+@class FSTQuery;
+@class FSTQueryData;
+@class FSTRemoteEvent;
+@class FSTTransaction;
+@class FSTUser;
+
+NS_ASSUME_NONNULL_BEGIN
+
+#pragma mark - FSTRemoteSyncer
+
+/**
+ * A protocol that describes the actions the FSTRemoteStore needs to perform on a cooperating
+ * synchronization engine.
+ */
+@protocol FSTRemoteSyncer
+
+/**
+ * Applies one remote event to the sync engine, notifying any views of the changes, and releasing
+ * any pending mutation batches that would become visible because of the snapshot version the
+ * remote event contains.
+ */
+- (void)applyRemoteEvent:(FSTRemoteEvent *)remoteEvent;
+
+/**
+ * Rejects the listen for the given targetID. This can be triggered by the backend for any active
+ * target.
+ *
+ * @param targetID The targetID corresponding to a listen initiated via
+ * -listenToTargetWithQueryData: on FSTRemoteStore.
+ * @param error A description of the condition that has forced the rejection. Nearly always this
+ * will be an indication that the user is no longer authorized to see the data matching the
+ * target.
+ */
+- (void)rejectListenWithTargetID:(FSTBoxedTargetID *)targetID error:(NSError *)error;
+
+/**
+ * Applies the result of a successful write of a mutation batch to the sync engine, emitting
+ * snapshots in any views that the mutation applies to, and removing the batch from the mutation
+ * queue.
+ */
+- (void)applySuccessfulWriteWithResult:(FSTMutationBatchResult *)batchResult;
+
+/**
+ * Rejects the batch, removing the batch from the mutation queue, recomputing the local view of
+ * any documents affected by the batch and then, emitting snapshots with the reverted value.
+ */
+- (void)rejectFailedWriteWithBatchID:(FSTBatchID)batchID error:(NSError *)error;
+
+@end
+
+/**
+ * A protocol for the FSTRemoteStore online state delegate, called whenever the state of the
+ * online streams of the FSTRemoteStore changes.
+ * Note that this protocol only supports the watch stream for now.
+ */
+@protocol FSTOnlineStateDelegate <NSObject>
+
+/** Called whenever the online state of the watch stream changes */
+- (void)watchStreamDidChangeOnlineState:(FSTOnlineState)onlineState;
+
+@end
+
+#pragma mark - FSTRemoteStore
+
+/**
+ * FSTRemoteStore handles all interaction with the backend through a simple, clean interface. This
+ * class is not thread safe and should be only called from the worker dispatch queue.
+ */
+@interface FSTRemoteStore : NSObject
+
++ (instancetype)remoteStoreWithLocalStore:(FSTLocalStore *)localStore
+ datastore:(FSTDatastore *)datastore;
+
+- (instancetype)init __attribute__((unavailable("Use static constructor method.")));
+
+@property(nonatomic, weak) id<FSTRemoteSyncer> syncEngine;
+
+@property(nonatomic, weak) id<FSTOnlineStateDelegate> onlineStateDelegate;
+
+/** Starts up the remote store, creating streams, restoring state from LocalStore, etc. */
+- (void)start;
+
+/** Shuts down the remote store, tearing down connections and otherwise cleaning up. */
+- (void)shutdown;
+
+/**
+ * Tells the FSTRemoteStore that the currently authenticated user has changed.
+ *
+ * In response the remote store tears down streams and clears up any tracked operations that should
+ * not persist across users. Restarts the streams if appropriate.
+ */
+- (void)userDidChange:(FSTUser *)user;
+
+/** Listens to the target identified by the given FSTQueryData. */
+- (void)listenToTargetWithQueryData:(FSTQueryData *)queryData;
+
+/** Stops listening to the target with the given target ID. */
+- (void)stopListeningToTargetID:(FSTTargetID)targetID;
+
+/**
+ * Tells the FSTRemoteStore that there are new mutations to process in the queue. This is typically
+ * called by FSTSyncEngine after it has sent mutations to FSTLocalStore.
+ *
+ * In response the remote store will pull mutations from the local store until the datastore
+ * instance reports that it cannot accept further in-progress writes. This mechanism serves to
+ * maintain a pipeline of in-flight requests between the FSTDatastore and the server that
+ * applies them.
+ */
+- (void)fillWritePipeline;
+
+/** Returns a new transaction backed by this remote store. */
+- (FSTTransaction *)transaction;
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTRemoteStore.m b/Firestore/Source/Remote/FSTRemoteStore.m
new file mode 100644
index 0000000..cea2ce8
--- /dev/null
+++ b/Firestore/Source/Remote/FSTRemoteStore.m
@@ -0,0 +1,599 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FSTRemoteStore.h"
+
+#import "FSTAssert.h"
+#import "FSTDatastore.h"
+#import "FSTDocument.h"
+#import "FSTDocumentKey.h"
+#import "FSTExistenceFilter.h"
+#import "FSTLocalStore.h"
+#import "FSTLogger.h"
+#import "FSTMutation.h"
+#import "FSTMutationBatch.h"
+#import "FSTQuery.h"
+#import "FSTQueryData.h"
+#import "FSTRemoteEvent.h"
+#import "FSTSnapshotVersion.h"
+#import "FSTTransaction.h"
+#import "FSTWatchChange.h"
+
+NS_ASSUME_NONNULL_BEGIN
+
+/**
+ * The maximum number of pending writes to allow.
+ * TODO(bjornick): Negotiate this value with the backend.
+ */
+static const NSUInteger kMaxPendingWrites = 10;
+
+#pragma mark - FSTRemoteStore
+
+@interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate>
+
+- (instancetype)initWithLocalStore:(FSTLocalStore *)localStore
+ datastore:(FSTDatastore *)datastore NS_DESIGNATED_INITIALIZER;
+
+/**
+ * The local store, used to fill the write pipeline with outbound mutations and resolve existence
+ * filter mismatches. Immutable after initialization.
+ */
+@property(nonatomic, strong, readonly) FSTLocalStore *localStore;
+
+/** The client-side proxy for interacting with the backend. Immutable after initialization. */
+@property(nonatomic, strong, readonly) FSTDatastore *datastore;
+
+#pragma mark Watch Stream
+@property(nonatomic, strong, nullable) FSTWatchStream *watchStream;
+
+/**
+ * A mapping of watched targets that the client cares about tracking and the
+ * user has explicitly called a 'listen' for this target.
+ *
+ * These targets may or may not have been sent to or acknowledged by the
+ * server. On re-establishing the listen stream, these targets should be sent
+ * to the server. The targets removed with unlistens are removed eagerly
+ * without waiting for confirmation from the listen stream. */
+@property(nonatomic, strong, readonly)
+ NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *listenTargets;
+
+/**
+ * A mapping of targetId to pending acks needed.
+ *
+ * If a targetId is present in this map, then we're waiting for watch to
+ * acknowledge a removal or addition of the target. If a target is not in this
+ * mapping, and it's in the listenTargets map, then we consider the target to
+ * be active.
+ *
+ * We increment the count here everytime we issue a request over the stream to
+ * watch or unwatch. We then decrement the count everytime we get a target
+ * added or target removed message from the server. Once the count is equal to
+ * 0 we know that the client and server are in the same state (once this state
+ * is reached the targetId is removed from the map to free the memory).
+ */
+@property(nonatomic, strong, readonly)
+ NSMutableDictionary<FSTBoxedTargetID *, NSNumber *> *pendingTargetResponses;
+
+@property(nonatomic, strong) NSMutableArray<FSTWatchChange *> *accumulatedChanges;
+@property(nonatomic, assign) FSTBatchID lastBatchSeen;
+
+/**
+ * The online state of the watch stream. The state is set to healthy if and only if there are
+ * messages received by the backend.
+ */
+@property(nonatomic, assign) FSTOnlineState watchStreamOnlineState;
+
+#pragma mark Write Stream
+@property(nonatomic, strong, nullable) FSTWriteStream *writeStream;
+
+/**
+ * The approximate time the StreamingWrite stream was opened. Used to estimate if stream was
+ * closed due to an auth expiration (a recoverable error) or some other more permanent error.
+ */
+@property(nonatomic, strong, nullable) NSDate *writeStreamOpenTime;
+
+/**
+ * A FIFO queue of in-flight writes. This is in-flight from the point of view of the caller of
+ * writeMutations, not from the point of view from the Datastore itself. In particular, these
+ * requests may not have been sent to the Datastore server if the write stream is not yet running.
+ */
+@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *pendingWrites;
+@end
+
+@implementation FSTRemoteStore
+
++ (instancetype)remoteStoreWithLocalStore:(FSTLocalStore *)localStore
+ datastore:(FSTDatastore *)datastore {
+ return [[FSTRemoteStore alloc] initWithLocalStore:localStore datastore:datastore];
+}
+
+- (instancetype)initWithLocalStore:(FSTLocalStore *)localStore datastore:(FSTDatastore *)datastore {
+ if (self = [super init]) {
+ _localStore = localStore;
+ _datastore = datastore;
+ _listenTargets = [NSMutableDictionary dictionary];
+ _pendingTargetResponses = [NSMutableDictionary dictionary];
+ _accumulatedChanges = [NSMutableArray array];
+
+ _lastBatchSeen = kFSTBatchIDUnknown;
+ _watchStreamOnlineState = FSTOnlineStateUnknown;
+ _pendingWrites = [NSMutableArray array];
+ }
+ return self;
+}
+
+- (void)start {
+ [self setupStreams];
+
+ // Resume any writes
+ [self fillWritePipeline];
+}
+
+- (void)updateAndNotifyAboutOnlineState:(FSTOnlineState)watchStreamOnlineState {
+ BOOL didChange = (watchStreamOnlineState != self.watchStreamOnlineState);
+ self.watchStreamOnlineState = watchStreamOnlineState;
+ if (didChange) {
+ [self.onlineStateDelegate watchStreamDidChangeOnlineState:watchStreamOnlineState];
+ }
+}
+
+- (void)setupStreams {
+ self.watchStream = [self.datastore createWatchStreamWithDelegate:self];
+ self.writeStream = [self.datastore createWriteStreamWithDelegate:self];
+
+ // Load any saved stream token from persistent storage
+ self.writeStream.lastStreamToken = [self.localStore lastStreamToken];
+}
+
+#pragma mark Shutdown
+
+- (void)shutdown {
+ FSTLog(@"FSTRemoteStore %p shutting down", (__bridge void *)self);
+
+ self.watchStreamOnlineState = FSTOnlineStateUnknown;
+ [self cleanupWatchStreamState];
+ [self.watchStream stop];
+ [self.writeStream stop];
+}
+
+- (void)userDidChange:(FSTUser *)user {
+ FSTLog(@"FSTRemoteStore %p changing users: %@", (__bridge void *)self, user);
+
+ // Clear pending writes because those are per-user. Watched targets persist across users so
+ // don't clear those.
+ _lastBatchSeen = kFSTBatchIDUnknown;
+ [self.pendingWrites removeAllObjects];
+
+ // Stop the streams. They promise not to call us back.
+ [self.watchStream stop];
+ [self.writeStream stop];
+
+ [self cleanupWatchStreamState];
+
+ // Create new streams (but note they're not started yet).
+ [self setupStreams];
+
+ // If there are any watchedTargets properly handle the stream restart now that FSTRemoteStore
+ // is ready to handle them.
+ if ([self shouldStartWatchStream]) {
+ [self.watchStream start];
+ }
+
+ // Resume any writes
+ [self fillWritePipeline];
+
+ // User change moves us back to the unknown state because we might not
+ // want to re-open the stream
+ [self updateAndNotifyAboutOnlineState:FSTOnlineStateUnknown];
+}
+
+#pragma mark Watch Stream
+
+- (void)listenToTargetWithQueryData:(FSTQueryData *)queryData {
+ NSNumber *targetKey = @(queryData.targetID);
+ FSTAssert(!self.listenTargets[targetKey], @"listenToQuery called with duplicate target id: %@",
+ targetKey);
+
+ self.listenTargets[targetKey] = queryData;
+ if ([self.watchStream isOpen]) {
+ [self sendWatchRequestWithQueryData:queryData];
+ } else if (![self.watchStream isStarted]) {
+ [self.watchStream start];
+ }
+}
+
+- (void)sendWatchRequestWithQueryData:(FSTQueryData *)queryData {
+ [self recordPendingRequestForTargetID:@(queryData.targetID)];
+ [self.watchStream watchQuery:queryData];
+}
+
+- (void)stopListeningToTargetID:(FSTTargetID)targetID {
+ FSTBoxedTargetID *targetKey = @(targetID);
+ FSTQueryData *queryData = self.listenTargets[targetKey];
+ FSTAssert(queryData, @"unlistenToTarget: target not currently watched: %@", targetKey);
+
+ [self.listenTargets removeObjectForKey:targetKey];
+ if ([self.watchStream isOpen]) {
+ [self sendUnwatchRequestForTargetID:targetKey];
+ }
+}
+
+- (void)sendUnwatchRequestForTargetID:(FSTBoxedTargetID *)targetID {
+ [self recordPendingRequestForTargetID:targetID];
+ [self.watchStream unwatchTargetID:[targetID intValue]];
+}
+
+- (void)recordPendingRequestForTargetID:(FSTBoxedTargetID *)targetID {
+ NSNumber *count = [self.pendingTargetResponses objectForKey:targetID];
+ count = @([count intValue] + 1);
+ [self.pendingTargetResponses setObject:count forKey:targetID];
+}
+
+/**
+ * Returns whether the watch stream should be started because there are active targets trying to
+ * be listened to.
+ */
+- (BOOL)shouldStartWatchStream {
+ return self.listenTargets.count > 0;
+}
+
+- (void)cleanupWatchStreamState {
+ // If the connection is closed then we'll never get a snapshot version for the accumulated
+ // changes and so we'll never be able to complete the batch. When we start up again the server
+ // is going to resend these changes anyway, so just toss the accumulated state.
+ [self.accumulatedChanges removeAllObjects];
+ [self.pendingTargetResponses removeAllObjects];
+}
+
+- (void)watchStreamDidOpen {
+ // Restore any existing watches.
+ for (FSTQueryData *queryData in [self.listenTargets objectEnumerator]) {
+ [self sendWatchRequestWithQueryData:queryData];
+ }
+}
+
+- (void)watchStreamDidChange:(FSTWatchChange *)change
+ snapshotVersion:(FSTSnapshotVersion *)snapshotVersion {
+ // Mark the connection as healthy because we got a message from the server.
+ [self updateAndNotifyAboutOnlineState:FSTOnlineStateHealthy];
+
+ FSTWatchTargetChange *watchTargetChange =
+ [change isKindOfClass:[FSTWatchTargetChange class]] ? (FSTWatchTargetChange *)change : nil;
+
+ if (watchTargetChange && watchTargetChange.state == FSTWatchTargetChangeStateRemoved &&
+ watchTargetChange.cause) {
+ // There was an error on a target, don't wait for a consistent snapshot to raise events
+ [self processTargetErrorForWatchChange:(FSTWatchTargetChange *)change];
+ } else {
+ // Accumulate watch changes but don't process them if there's no snapshotVersion or it's
+ // older than a previous snapshot we've processed (can happen after we resume a target
+ // using a resume token).
+ [self.accumulatedChanges addObject:change];
+ FSTAssert(snapshotVersion, @"snapshotVersion must not be nil.");
+ if ([snapshotVersion isEqual:[FSTSnapshotVersion noVersion]] ||
+ [snapshotVersion compare:[self.localStore lastRemoteSnapshotVersion]] ==
+ NSOrderedAscending) {
+ return;
+ }
+
+ // Create a batch, giving it the accumulatedChanges array.
+ NSArray<FSTWatchChange *> *changes = self.accumulatedChanges;
+ self.accumulatedChanges = [NSMutableArray array];
+
+ [self processBatchedWatchChanges:changes snapshotVersion:snapshotVersion];
+ }
+}
+
+- (void)watchStreamDidClose:(NSError *_Nullable)error {
+ [self cleanupWatchStreamState];
+
+ // If there was an error, retry the connection.
+ if ([self shouldStartWatchStream]) {
+ // If the connection fails before the stream has become healthy, consider the online state
+ // failed. Otherwise consider the online state unknown and the next connection attempt will
+ // resolve the online state. For example, if a healthy stream is closed due to an expired token
+ // we want to have one more try at reconnecting before we consider the connection unhealthy.
+ if (self.watchStreamOnlineState == FSTOnlineStateHealthy) {
+ [self updateAndNotifyAboutOnlineState:FSTOnlineStateUnknown];
+ } else {
+ [self updateAndNotifyAboutOnlineState:FSTOnlineStateFailed];
+ }
+ [self.watchStream start];
+ } else {
+ // No need to restart watch stream because there are no active targets. The online state is set
+ // to unknown because there is no active attempt at establishing a connection.
+ [self updateAndNotifyAboutOnlineState:FSTOnlineStateUnknown];
+ }
+}
+
+/**
+ * Takes a batch of changes from the Datastore, repackages them as a RemoteEvent, and passes that
+ * on to the SyncEngine.
+ */
+- (void)processBatchedWatchChanges:(NSArray<FSTWatchChange *> *)changes
+ snapshotVersion:(FSTSnapshotVersion *)snapshotVersion {
+ FSTWatchChangeAggregator *aggregator =
+ [[FSTWatchChangeAggregator alloc] initWithSnapshotVersion:snapshotVersion
+ listenTargets:self.listenTargets
+ pendingTargetResponses:self.pendingTargetResponses];
+ [aggregator addWatchChanges:changes];
+ FSTRemoteEvent *remoteEvent = [aggregator remoteEvent];
+ [self.pendingTargetResponses removeAllObjects];
+ [self.pendingTargetResponses setDictionary:aggregator.pendingTargetResponses];
+
+ // Handle existence filters and existence filter mismatches
+ [aggregator.existenceFilters enumerateKeysAndObjectsUsingBlock:^(FSTBoxedTargetID *target,
+ FSTExistenceFilter *filter,
+ BOOL *stop) {
+ FSTTargetID targetID = target.intValue;
+
+ FSTQueryData *queryData = self.listenTargets[target];
+ FSTQuery *query = queryData.query;
+ if (!queryData) {
+ // A watched target might have been removed already.
+ return;
+
+ } else if ([query isDocumentQuery]) {
+ if (filter.count == 0) {
+ // The existence filter told us the document does not exist.
+ // We need to deduce that this document does not exist and apply a deleted document to our
+ // updates. Without applying a deleted document there might be another query that will
+ // raise this document as part of a snapshot until it is resolved, essentially exposing
+ // inconsistency between queries
+ FSTDocumentKey *key = [FSTDocumentKey keyWithPath:query.path];
+ FSTDeletedDocument *deletedDoc =
+ [FSTDeletedDocument documentWithKey:key version:snapshotVersion];
+ [remoteEvent addDocumentUpdate:deletedDoc];
+ } else {
+ FSTAssert(filter.count == 1, @"Single document existence filter with count: %" PRId32,
+ filter.count);
+ }
+
+ } else {
+ // Not a document query.
+ FSTDocumentKeySet *trackedRemote = [self.localStore remoteDocumentKeysForTarget:targetID];
+ FSTTargetMapping *mapping = remoteEvent.targetChanges[target].mapping;
+ if (mapping) {
+ if ([mapping isKindOfClass:[FSTUpdateMapping class]]) {
+ FSTUpdateMapping *update = (FSTUpdateMapping *)mapping;
+ trackedRemote = [update applyTo:trackedRemote];
+ } else {
+ FSTAssert([mapping isKindOfClass:[FSTResetMapping class]],
+ @"Expected either reset or update mapping but got something else %@", mapping);
+ trackedRemote = ((FSTResetMapping *)mapping).documents;
+ }
+ }
+
+ if (trackedRemote.count != (NSUInteger)filter.count) {
+ FSTLog(@"Existence filter mismatch, resetting mapping");
+
+ // Make sure the mismatch is exposed in the remote event
+ [remoteEvent handleExistenceFilterMismatchForTargetID:target];
+
+ // Clear the resume token for the query, since we're in a known mismatch state.
+ queryData =
+ [[FSTQueryData alloc] initWithQuery:query targetID:targetID purpose:queryData.purpose];
+ self.listenTargets[target] = queryData;
+
+ // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't
+ // send a resume token so that we get a full update.
+ [self sendUnwatchRequestForTargetID:@(targetID)];
+
+ // Mark the query we send as being on behalf of an existence filter mismatch, but don't
+ // actually retain that in listenTargets. This ensures that we flag the first re-listen
+ // this way without impacting future listens of this target (that might happen e.g. on
+ // reconnect).
+ FSTQueryData *requestQueryData =
+ [[FSTQueryData alloc] initWithQuery:query
+ targetID:targetID
+ purpose:FSTQueryPurposeExistenceFilterMismatch];
+ [self sendWatchRequestWithQueryData:requestQueryData];
+ }
+ }
+ }];
+
+ // Update in-memory resume tokens. FSTLocalStore will update the persistent view of these when
+ // applying the completed FSTRemoteEvent.
+ [remoteEvent.targetChanges enumerateKeysAndObjectsUsingBlock:^(
+ FSTBoxedTargetID *target, FSTTargetChange *change, BOOL *stop) {
+ NSData *resumeToken = change.resumeToken;
+ if (resumeToken.length > 0) {
+ FSTQueryData *queryData = _listenTargets[target];
+ // A watched target might have been removed already.
+ if (queryData) {
+ _listenTargets[target] =
+ [queryData queryDataByReplacingSnapshotVersion:change.snapshotVersion
+ resumeToken:resumeToken];
+ }
+ }
+ }];
+
+ // Finally handle remote event
+ [self.syncEngine applyRemoteEvent:remoteEvent];
+}
+
+/** Process a target error and passes the error along to SyncEngine. */
+- (void)processTargetErrorForWatchChange:(FSTWatchTargetChange *)change {
+ FSTAssert(change.cause, @"Handling target error without a cause");
+ // Ignore targets that have been removed already.
+ for (FSTBoxedTargetID *targetID in change.targetIDs) {
+ if (self.listenTargets[targetID]) {
+ [self.listenTargets removeObjectForKey:targetID];
+ [self.syncEngine rejectListenWithTargetID:targetID error:change.cause];
+ }
+ }
+}
+
+#pragma mark Write Stream
+
+- (void)fillWritePipeline {
+ while ([self canWriteMutations]) {
+ FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen];
+ if (!batch) {
+ break;
+ }
+ [self commitBatch:batch];
+ }
+}
+
+/**
+ * Returns YES if the backend can accept additional write requests.
+ *
+ * When sending mutations to the write stream (e.g. in -fillWritePipeline), call this method first
+ * to check if more mutations can be sent.
+ *
+ * Currently the only thing that can prevent the backend from accepting write requests is if
+ * there are too many requests already outstanding. As writes complete the backend will be able
+ * to accept more.
+ */
+- (BOOL)canWriteMutations {
+ return self.pendingWrites.count < kMaxPendingWrites;
+}
+
+/** Given mutations to commit, actually commits them to the backend. */
+- (void)commitBatch:(FSTMutationBatch *)batch {
+ FSTAssert([self canWriteMutations], @"commitBatch called when mutations can't be written");
+ self.lastBatchSeen = batch.batchID;
+
+ if (!self.writeStream.isStarted) {
+ [self.writeStream start];
+ }
+
+ [self.pendingWrites addObject:batch];
+
+ if (self.writeStream.handshakeComplete) {
+ [self.writeStream writeMutations:batch.mutations];
+ }
+}
+
+- (void)writeStreamDidOpen {
+ self.writeStreamOpenTime = [NSDate date];
+
+ [self.writeStream writeHandshake];
+}
+
+/**
+ * Handles a successful handshake response from the server, which is our cue to send any pending
+ * writes.
+ */
+- (void)writeStreamDidCompleteHandshake {
+ // Record the stream token.
+ [self.localStore setLastStreamToken:self.writeStream.lastStreamToken];
+
+ // Drain any pending writes.
+ //
+ // Note that at this point pendingWrites contains mutations that have already been accepted by
+ // fillWritePipeline/commitBatch. If the pipeline is full, canWriteMutations will be NO, despite
+ // the fact that we actually need to send mutations over.
+ //
+ // This also means that this method indirectly respects the limits imposed by canWriteMutations
+ // since writes can't be added to the pendingWrites array when canWriteMutations is NO. If the
+ // limits imposed by canWriteMutations actually protect us from DOSing ourselves then those limits
+ // won't be exceeded here and we'll continue to make progress.
+ for (FSTMutationBatch *write in self.pendingWrites) {
+ [self.writeStream writeMutations:write.mutations];
+ }
+}
+
+/** Handles a successful StreamingWriteResponse from the server that contains a mutation result. */
+- (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion
+ mutationResults:(NSArray<FSTMutationResult *> *)results {
+ // This is a response to a write containing mutations and should be correlated to the first
+ // pending write.
+ NSMutableArray *pendingWrites = self.pendingWrites;
+ FSTMutationBatch *batch = pendingWrites[0];
+ [pendingWrites removeObjectAtIndex:0];
+
+ FSTMutationBatchResult *batchResult =
+ [FSTMutationBatchResult resultWithBatch:batch
+ commitVersion:commitVersion
+ mutationResults:results
+ streamToken:self.writeStream.lastStreamToken];
+ [self.syncEngine applySuccessfulWriteWithResult:batchResult];
+
+ // It's possible that with the completion of this mutation another slot has freed up.
+ [self fillWritePipeline];
+}
+
+/**
+ * Handles the closing of the StreamingWrite RPC, either because of an error or because the RPC
+ * has been terminated by the client or the server.
+ */
+- (void)writeStreamDidClose:(NSError *_Nullable)error {
+ NSMutableArray *pendingWrites = self.pendingWrites;
+ // Ignore close if there are no pending writes.
+ if (pendingWrites.count == 0) {
+ return;
+ }
+
+ FSTAssert(error, @"There are pending writes, but the write stream closed without an error.");
+ if ([FSTDatastore isPermanentWriteError:error]) {
+ if (self.writeStream.handshakeComplete) {
+ // This error affects the actual writes.
+ [self handleWriteError:error];
+ } else {
+ // If there was an error before the handshake finished, it's possible that the server is
+ // unable to process the stream token we're sending. (Perhaps it's too old?)
+ [self handleHandshakeError:error];
+ }
+ }
+
+ // The write stream might have been started by refilling the write pipeline for failed writes
+ if (pendingWrites.count > 0 && !self.writeStream.isStarted) {
+ [self.writeStream start];
+ }
+}
+
+- (void)handleHandshakeError:(NSError *)error {
+ // Reset the token if it's a permanent error or the error code is ABORTED, signaling the write
+ // stream is no longer valid.
+ if ([FSTDatastore isPermanentWriteError:error] || [FSTDatastore isAbortedError:error]) {
+ NSString *token = [self.writeStream.lastStreamToken base64EncodedStringWithOptions:0];
+ FSTLog(@"FSTRemoteStore %p error before completed handshake; resetting stream token %@: %@",
+ (__bridge void *)self, token, error);
+ self.writeStream.lastStreamToken = nil;
+ [self.localStore setLastStreamToken:nil];
+ }
+}
+
+- (void)handleWriteError:(NSError *)error {
+ // Only handle permanent error. If it's transient, just let the retry logic kick in.
+ if (![FSTDatastore isPermanentWriteError:error]) {
+ return;
+ }
+
+ // If this was a permanent error, the request itself was the problem so it's not going to
+ // succeed if we resend it.
+ FSTMutationBatch *batch = self.pendingWrites[0];
+ [self.pendingWrites removeObjectAtIndex:0];
+
+ // In this case it's also unlikely that the server itself is melting down--this was just a
+ // bad request so inhibit backoff on the next restart.
+ [self.writeStream inhibitBackoff];
+
+ [self.syncEngine rejectFailedWriteWithBatchID:batch.batchID error:error];
+
+ // It's possible that with the completion of this mutation another slot has freed up.
+ [self fillWritePipeline];
+}
+
+- (FSTTransaction *)transaction {
+ return [FSTTransaction transactionWithDatastore:self.datastore];
+}
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTSerializerBeta.h b/Firestore/Source/Remote/FSTSerializerBeta.h
new file mode 100644
index 0000000..973f866
--- /dev/null
+++ b/Firestore/Source/Remote/FSTSerializerBeta.h
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import <Foundation/Foundation.h>
+
+@class FSTDatabaseID;
+@class FSTDocumentKey;
+@class FSTFieldValue;
+@class FSTMaybeDocument;
+@class FSTMutation;
+@class FSTMutationBatch;
+@class FSTMutationResult;
+@class FSTObjectValue;
+@class FSTQuery;
+@class FSTQueryData;
+@class FSTSnapshotVersion;
+@class FSTTimestamp;
+@class FSTWatchChange;
+
+@class GCFSBatchGetDocumentsResponse;
+@class GCFSDocument;
+@class GCFSDocumentMask;
+@class GCFSListenResponse;
+@class GCFSTarget;
+@class GCFSTarget_DocumentsTarget;
+@class GCFSTarget_QueryTarget;
+@class GCFSValue;
+@class GCFSWrite;
+@class GCFSWriteResult;
+
+@class GPBTimestamp;
+
+NS_ASSUME_NONNULL_BEGIN
+
+/**
+ * Converts internal model objects to their equivalent protocol buffer form. Methods starting with
+ * "encoded" convert to a protocol buffer and methods starting with "decoded" convert from a
+ * protocol buffer.
+ *
+ * Throws an exception if a protocol buffer is missing a critical field or has a value we can't
+ * interpret.
+ */
+@interface FSTSerializerBeta : NSObject
+
+- (instancetype)init NS_UNAVAILABLE;
+
+- (instancetype)initWithDatabaseID:(FSTDatabaseID *)databaseID NS_DESIGNATED_INITIALIZER;
+
+- (GPBTimestamp *)encodedTimestamp:(FSTTimestamp *)timestamp;
+- (FSTTimestamp *)decodedTimestamp:(GPBTimestamp *)timestamp;
+
+- (GPBTimestamp *)encodedVersion:(FSTSnapshotVersion *)version;
+- (FSTSnapshotVersion *)decodedVersion:(GPBTimestamp *)version;
+
+/** Returns the database ID, such as `projects/{project id}/databases/{database_id}`. */
+- (NSString *)encodedDatabaseID;
+
+- (NSString *)encodedDocumentKey:(FSTDocumentKey *)key;
+- (FSTDocumentKey *)decodedDocumentKey:(NSString *)key;
+
+- (GCFSValue *)encodedFieldValue:(FSTFieldValue *)fieldValue;
+- (FSTFieldValue *)decodedFieldValue:(GCFSValue *)valueProto;
+
+- (GCFSWrite *)encodedMutation:(FSTMutation *)mutation;
+- (FSTMutation *)decodedMutation:(GCFSWrite *)mutation;
+
+- (FSTMutationResult *)decodedMutationResult:(GCFSWriteResult *)mutation;
+
+- (nullable NSMutableDictionary<NSString *, NSString *> *)encodedListenRequestLabelsForQueryData:
+ (FSTQueryData *)queryData;
+
+- (GCFSTarget *)encodedTarget:(FSTQueryData *)queryData;
+
+- (GCFSTarget_DocumentsTarget *)encodedDocumentsTarget:(FSTQuery *)query;
+- (FSTQuery *)decodedQueryFromDocumentsTarget:(GCFSTarget_DocumentsTarget *)target;
+
+- (GCFSTarget_QueryTarget *)encodedQueryTarget:(FSTQuery *)query;
+- (FSTQuery *)decodedQueryFromQueryTarget:(GCFSTarget_QueryTarget *)target;
+
+- (FSTWatchChange *)decodedWatchChange:(GCFSListenResponse *)watchChange;
+- (FSTSnapshotVersion *)versionFromListenResponse:(GCFSListenResponse *)watchChange;
+
+- (GCFSDocument *)encodedDocumentWithFields:(FSTObjectValue *)objectValue key:(FSTDocumentKey *)key;
+
+/**
+ * Encodes an FSTObjectValue into a dictionary.
+ * @return a new dictionary that can be assigned to a field in another proto.
+ */
+- (NSMutableDictionary<NSString *, GCFSValue *> *)encodedFields:(FSTObjectValue *)value;
+
+- (FSTObjectValue *)decodedFields:(NSDictionary<NSString *, GCFSValue *> *)fields;
+
+- (FSTMaybeDocument *)decodedMaybeDocumentFromBatch:(GCFSBatchGetDocumentsResponse *)response;
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTSerializerBeta.m b/Firestore/Source/Remote/FSTSerializerBeta.m
new file mode 100644
index 0000000..418dabd
--- /dev/null
+++ b/Firestore/Source/Remote/FSTSerializerBeta.m
@@ -0,0 +1,1084 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FSTSerializerBeta.h"
+
+#import <GRPCClient/GRPCCall.h>
+
+#import "Common.pbobjc.h"
+#import "Document.pbobjc.h"
+#import "Firestore.pbobjc.h"
+#import "Latlng.pbobjc.h"
+#import "Query.pbobjc.h"
+#import "Status.pbobjc.h"
+#import "Write.pbobjc.h"
+
+#import "FIRFirestoreErrors.h"
+#import "FIRGeoPoint.h"
+#import "FSTAssert.h"
+#import "FSTDatabaseID.h"
+#import "FSTDocument.h"
+#import "FSTDocumentKey.h"
+#import "FSTExistenceFilter.h"
+#import "FSTFieldValue.h"
+#import "FSTMutation.h"
+#import "FSTMutationBatch.h"
+#import "FSTPath.h"
+#import "FSTQuery.h"
+#import "FSTQueryData.h"
+#import "FSTSnapshotVersion.h"
+#import "FSTTimestamp.h"
+#import "FSTWatchChange.h"
+
+NS_ASSUME_NONNULL_BEGIN
+
+@interface FSTSerializerBeta ()
+@property(nonatomic, strong, readonly) FSTDatabaseID *databaseID;
+@end
+
+@implementation FSTSerializerBeta
+
+- (instancetype)initWithDatabaseID:(FSTDatabaseID *)databaseID {
+ self = [super init];
+ if (self) {
+ _databaseID = databaseID;
+ }
+ return self;
+}
+
+#pragma mark - FSTSnapshotVersion <=> GPBTimestamp
+
+- (GPBTimestamp *)encodedTimestamp:(FSTTimestamp *)timestamp {
+ GPBTimestamp *result = [GPBTimestamp message];
+ result.seconds = timestamp.seconds;
+ result.nanos = timestamp.nanos;
+ return result;
+}
+
+- (FSTTimestamp *)decodedTimestamp:(GPBTimestamp *)timestamp {
+ return [[FSTTimestamp alloc] initWithSeconds:timestamp.seconds nanos:timestamp.nanos];
+}
+
+- (GPBTimestamp *)encodedVersion:(FSTSnapshotVersion *)version {
+ return [self encodedTimestamp:version.timestamp];
+}
+
+- (FSTSnapshotVersion *)decodedVersion:(GPBTimestamp *)version {
+ return [FSTSnapshotVersion versionWithTimestamp:[self decodedTimestamp:version]];
+}
+
+#pragma mark - FIRGeoPoint <=> GTPLatLng
+
+- (GTPLatLng *)encodedGeoPoint:(FIRGeoPoint *)geoPoint {
+ GTPLatLng *latLng = [GTPLatLng message];
+ latLng.latitude = geoPoint.latitude;
+ latLng.longitude = geoPoint.longitude;
+ return latLng;
+}
+
+- (FIRGeoPoint *)decodedGeoPoint:(GTPLatLng *)latLng {
+ return [[FIRGeoPoint alloc] initWithLatitude:latLng.latitude longitude:latLng.longitude];
+}
+
+#pragma mark - FSTDocumentKey <=> Key proto
+
+- (NSString *)encodedDocumentKey:(FSTDocumentKey *)key {
+ return [self encodedResourcePathForDatabaseID:self.databaseID path:key.path];
+}
+
+- (FSTDocumentKey *)decodedDocumentKey:(NSString *)name {
+ FSTResourcePath *path = [self decodedResourcePathWithDatabaseID:name];
+ FSTAssert([[path segmentAtIndex:1] isEqualToString:self.databaseID.projectID],
+ @"Tried to deserialize key from different project.");
+ FSTAssert([[path segmentAtIndex:3] isEqualToString:self.databaseID.databaseID],
+ @"Tried to deserialize key from different datbase.");
+ return [FSTDocumentKey keyWithPath:[self localResourcePathForQualifiedResourcePath:path]];
+}
+
+- (NSString *)encodedResourcePathForDatabaseID:(FSTDatabaseID *)databaseID
+ path:(FSTResourcePath *)path {
+ return [[[[self encodedResourcePathForDatabaseID:databaseID] pathByAppendingSegment:@"documents"]
+ pathByAppendingPath:path] canonicalString];
+}
+
+- (FSTResourcePath *)decodedResourcePathWithDatabaseID:(NSString *)name {
+ FSTResourcePath *path = [FSTResourcePath pathWithString:name];
+ FSTAssert([self validQualifiedResourcePath:path], @"Tried to deserialize invalid key %@", path);
+ return path;
+}
+
+- (NSString *)encodedQueryPath:(FSTResourcePath *)path {
+ if (path.length == 0) {
+ // If the path is empty, the backend requires we leave off the /documents at the end.
+ return [self encodedDatabaseID];
+ }
+ return [self encodedResourcePathForDatabaseID:self.databaseID path:path];
+}
+
+- (FSTResourcePath *)decodedQueryPath:(NSString *)name {
+ FSTResourcePath *resource = [self decodedResourcePathWithDatabaseID:name];
+ if (resource.length == 4) {
+ return [FSTResourcePath pathWithSegments:@[]];
+ } else {
+ return [self localResourcePathForQualifiedResourcePath:resource];
+ }
+}
+
+- (FSTResourcePath *)encodedResourcePathForDatabaseID:(FSTDatabaseID *)databaseID {
+ return [FSTResourcePath
+ pathWithSegments:@[ @"projects", databaseID.projectID, @"databases", databaseID.databaseID ]];
+}
+
+- (FSTResourcePath *)localResourcePathForQualifiedResourcePath:(FSTResourcePath *)resourceName {
+ FSTAssert(
+ resourceName.length > 4 && [[resourceName segmentAtIndex:4] isEqualToString:@"documents"],
+ @"Tried to deserialize invalid key %@", resourceName);
+ return [resourceName pathByRemovingFirstSegments:5];
+}
+
+- (BOOL)validQualifiedResourcePath:(FSTResourcePath *)path {
+ return path.length >= 4 && [[path segmentAtIndex:0] isEqualToString:@"projects"] &&
+ [[path segmentAtIndex:2] isEqualToString:@"databases"];
+}
+
+- (NSString *)encodedDatabaseID {
+ return [[self encodedResourcePathForDatabaseID:self.databaseID] canonicalString];
+}
+
+#pragma mark - FSTFieldValue <=> Value proto
+
+- (GCFSValue *)encodedFieldValue:(FSTFieldValue *)fieldValue {
+ Class class = [fieldValue class];
+ if (class == [FSTNullValue class]) {
+ return [self encodedNull];
+
+ } else if (class == [FSTBooleanValue class]) {
+ return [self encodedBool:[[fieldValue value] boolValue]];
+
+ } else if (class == [FSTIntegerValue class]) {
+ return [self encodedInteger:[[fieldValue value] longLongValue]];
+
+ } else if (class == [FSTDoubleValue class]) {
+ return [self encodedDouble:[[fieldValue value] doubleValue]];
+
+ } else if (class == [FSTStringValue class]) {
+ return [self encodedString:[fieldValue value]];
+
+ } else if (class == [FSTTimestampValue class]) {
+ return [self encodedTimestampValue:((FSTTimestampValue *)fieldValue).internalValue];
+
+ } else if (class == [FSTGeoPointValue class]) {
+ return [self encodedGeoPointValue:[fieldValue value]];
+
+ } else if (class == [FSTBlobValue class]) {
+ return [self encodedBlobValue:[fieldValue value]];
+
+ } else if (class == [FSTReferenceValue class]) {
+ FSTReferenceValue *ref = (FSTReferenceValue *)fieldValue;
+ return [self encodedReferenceValueForDatabaseID:[ref databaseID] key:[ref value]];
+
+ } else if (class == [FSTObjectValue class]) {
+ GCFSValue *result = [GCFSValue message];
+ result.mapValue = [self encodedMapValue:(FSTObjectValue *)fieldValue];
+ return result;
+
+ } else if (class == [FSTArrayValue class]) {
+ GCFSValue *result = [GCFSValue message];
+ result.arrayValue = [self encodedArrayValue:(FSTArrayValue *)fieldValue];
+ return result;
+
+ } else {
+ FSTFail(@"Unhandled type %@ on %@", NSStringFromClass([fieldValue class]), fieldValue);
+ }
+}
+
+- (FSTFieldValue *)decodedFieldValue:(GCFSValue *)valueProto {
+ switch (valueProto.valueTypeOneOfCase) {
+ case GCFSValue_ValueType_OneOfCase_NullValue:
+ return [FSTNullValue nullValue];
+
+ case GCFSValue_ValueType_OneOfCase_BooleanValue:
+ return [FSTBooleanValue booleanValue:valueProto.booleanValue];
+
+ case GCFSValue_ValueType_OneOfCase_IntegerValue:
+ return [FSTIntegerValue integerValue:valueProto.integerValue];
+
+ case GCFSValue_ValueType_OneOfCase_DoubleValue:
+ return [FSTDoubleValue doubleValue:valueProto.doubleValue];
+
+ case GCFSValue_ValueType_OneOfCase_StringValue:
+ return [FSTStringValue stringValue:valueProto.stringValue];
+
+ case GCFSValue_ValueType_OneOfCase_TimestampValue:
+ return [FSTTimestampValue timestampValue:[self decodedTimestamp:valueProto.timestampValue]];
+
+ case GCFSValue_ValueType_OneOfCase_GeoPointValue:
+ return [FSTGeoPointValue geoPointValue:[self decodedGeoPoint:valueProto.geoPointValue]];
+
+ case GCFSValue_ValueType_OneOfCase_BytesValue:
+ return [FSTBlobValue blobValue:valueProto.bytesValue];
+
+ case GCFSValue_ValueType_OneOfCase_ReferenceValue:
+ return [self decodedReferenceValue:valueProto.referenceValue];
+
+ case GCFSValue_ValueType_OneOfCase_ArrayValue:
+ return [self decodedArrayValue:valueProto.arrayValue];
+
+ case GCFSValue_ValueType_OneOfCase_MapValue:
+ return [self decodedMapValue:valueProto.mapValue];
+
+ default:
+ FSTFail(@"Unhandled type %d on %@", valueProto.valueTypeOneOfCase, valueProto);
+ }
+}
+
+- (GCFSValue *)encodedNull {
+ GCFSValue *result = [GCFSValue message];
+ result.nullValue = GPBNullValue_NullValue;
+ return result;
+}
+
+- (GCFSValue *)encodedBool:(BOOL)value {
+ GCFSValue *result = [GCFSValue message];
+ result.booleanValue = value;
+ return result;
+}
+
+- (GCFSValue *)encodedDouble:(double)value {
+ GCFSValue *result = [GCFSValue message];
+ result.doubleValue = value;
+ return result;
+}
+
+- (GCFSValue *)encodedInteger:(int64_t)value {
+ GCFSValue *result = [GCFSValue message];
+ result.integerValue = value;
+ return result;
+}
+
+- (GCFSValue *)encodedString:(NSString *)value {
+ GCFSValue *result = [GCFSValue message];
+ result.stringValue = value;
+ return result;
+}
+
+- (GCFSValue *)encodedTimestampValue:(FSTTimestamp *)value {
+ GCFSValue *result = [GCFSValue message];
+ result.timestampValue = [self encodedTimestamp:value];
+ return result;
+}
+
+- (GCFSValue *)encodedGeoPointValue:(FIRGeoPoint *)value {
+ GCFSValue *result = [GCFSValue message];
+ result.geoPointValue = [self encodedGeoPoint:value];
+ return result;
+}
+
+- (GCFSValue *)encodedBlobValue:(NSData *)value {
+ GCFSValue *result = [GCFSValue message];
+ result.bytesValue = value;
+ return result;
+}
+
+- (GCFSValue *)encodedReferenceValueForDatabaseID:(FSTDatabaseID *)databaseID
+ key:(FSTDocumentKey *)key {
+ GCFSValue *result = [GCFSValue message];
+ result.referenceValue = [self encodedResourcePathForDatabaseID:databaseID path:key.path];
+ return result;
+}
+
+- (FSTReferenceValue *)decodedReferenceValue:(NSString *)resourceName {
+ FSTResourcePath *path = [self decodedResourcePathWithDatabaseID:resourceName];
+ NSString *project = [path segmentAtIndex:1];
+ NSString *database = [path segmentAtIndex:3];
+ FSTDatabaseID *databaseID = [FSTDatabaseID databaseIDWithProject:project database:database];
+ FSTDocumentKey *key =
+ [FSTDocumentKey keyWithPath:[self localResourcePathForQualifiedResourcePath:path]];
+ return [FSTReferenceValue referenceValue:key databaseID:databaseID];
+}
+
+- (GCFSArrayValue *)encodedArrayValue:(FSTArrayValue *)arrayValue {
+ GCFSArrayValue *proto = [GCFSArrayValue message];
+ NSMutableArray<GCFSValue *> *protoContents = [proto valuesArray];
+
+ [[arrayValue internalValue]
+ enumerateObjectsUsingBlock:^(FSTFieldValue *value, NSUInteger idx, BOOL *stop) {
+ GCFSValue *converted = [self encodedFieldValue:value];
+ [protoContents addObject:converted];
+ }];
+ return proto;
+}
+
+- (FSTArrayValue *)decodedArrayValue:(GCFSArrayValue *)arrayValue {
+ NSMutableArray<FSTFieldValue *> *contents =
+ [NSMutableArray arrayWithCapacity:arrayValue.valuesArray_Count];
+
+ [arrayValue.valuesArray
+ enumerateObjectsUsingBlock:^(GCFSValue *value, NSUInteger idx, BOOL *stop) {
+ [contents addObject:[self decodedFieldValue:value]];
+ }];
+ return [[FSTArrayValue alloc] initWithValueNoCopy:contents];
+}
+
+- (GCFSMapValue *)encodedMapValue:(FSTObjectValue *)value {
+ GCFSMapValue *result = [GCFSMapValue message];
+ result.fields = [self encodedFields:value];
+ return result;
+}
+
+- (FSTObjectValue *)decodedMapValue:(GCFSMapValue *)map {
+ return [self decodedFields:map.fields];
+}
+
+/**
+ * Encodes an FSTObjectValue into a dictionary.
+ * @return a new dictionary that can be assigned to a field in another proto.
+ */
+- (NSMutableDictionary<NSString *, GCFSValue *> *)encodedFields:(FSTObjectValue *)value {
+ FSTImmutableSortedDictionary<NSString *, FSTFieldValue *> *fields = value.internalValue;
+ NSMutableDictionary<NSString *, GCFSValue *> *result = [NSMutableDictionary dictionary];
+ [fields enumerateKeysAndObjectsUsingBlock:^(NSString *key, FSTFieldValue *obj, BOOL *stop) {
+ GCFSValue *converted = [self encodedFieldValue:obj];
+ result[key] = converted;
+ }];
+ return result;
+}
+
+- (FSTObjectValue *)decodedFields:(NSDictionary<NSString *, GCFSValue *> *)fields {
+ __block FSTObjectValue *result = [FSTObjectValue objectValue];
+ [fields enumerateKeysAndObjectsUsingBlock:^(NSString *_Nonnull key, GCFSValue *_Nonnull obj,
+ BOOL *_Nonnull stop) {
+ FSTFieldPath *path = [FSTFieldPath pathWithSegments:@[ key ]];
+ FSTFieldValue *value = [self decodedFieldValue:obj];
+ result = [result objectBySettingValue:value forPath:path];
+ }];
+ return result;
+}
+
+#pragma mark - FSTObjectValue <=> Document proto
+
+- (GCFSDocument *)encodedDocumentWithFields:(FSTObjectValue *)objectValue
+ key:(FSTDocumentKey *)key {
+ GCFSDocument *proto = [GCFSDocument message];
+ proto.name = [self encodedDocumentKey:key];
+ proto.fields = [self encodedFields:objectValue];
+ return proto;
+}
+
+#pragma mark - FSTMaybeDocument <= BatchGetDocumentsResponse proto
+
+- (FSTMaybeDocument *)decodedMaybeDocumentFromBatch:(GCFSBatchGetDocumentsResponse *)response {
+ switch (response.resultOneOfCase) {
+ case GCFSBatchGetDocumentsResponse_Result_OneOfCase_Found:
+ return [self decodedFoundDocument:response];
+ case GCFSBatchGetDocumentsResponse_Result_OneOfCase_Missing:
+ return [self decodedDeletedDocument:response];
+ default:
+ FSTFail(@"Unknown document type: %@", response);
+ }
+}
+
+- (FSTDocument *)decodedFoundDocument:(GCFSBatchGetDocumentsResponse *)response {
+ FSTAssert(!!response.found, @"Tried to deserialize a found document from a deleted document.");
+ FSTDocumentKey *key = [self decodedDocumentKey:response.found.name];
+ FSTObjectValue *value = [self decodedFields:response.found.fields];
+ FSTSnapshotVersion *version = [self decodedVersion:response.found.updateTime];
+ FSTAssert(![version isEqual:[FSTSnapshotVersion noVersion]],
+ @"Got a document response with no snapshot version");
+
+ return [FSTDocument documentWithData:value key:key version:version hasLocalMutations:NO];
+}
+
+- (FSTDeletedDocument *)decodedDeletedDocument:(GCFSBatchGetDocumentsResponse *)response {
+ FSTAssert(!!response.missing, @"Tried to deserialize a deleted document from a found document.");
+ FSTDocumentKey *key = [self decodedDocumentKey:response.missing];
+ FSTSnapshotVersion *version = [self decodedVersion:response.readTime];
+ FSTAssert(![version isEqual:[FSTSnapshotVersion noVersion]],
+ @"Got a no document response with no snapshot version");
+ return [FSTDeletedDocument documentWithKey:key version:version];
+}
+
+#pragma mark - FSTMutation => GCFSWrite proto
+
+- (GCFSWrite *)encodedMutation:(FSTMutation *)mutation {
+ GCFSWrite *proto = [GCFSWrite message];
+
+ Class mutationClass = [mutation class];
+ if (mutationClass == [FSTSetMutation class]) {
+ FSTSetMutation *set = (FSTSetMutation *)mutation;
+ proto.update = [self encodedDocumentWithFields:set.value key:set.key];
+
+ } else if (mutationClass == [FSTPatchMutation class]) {
+ FSTPatchMutation *patch = (FSTPatchMutation *)mutation;
+ proto.update = [self encodedDocumentWithFields:patch.value key:patch.key];
+ proto.updateMask = [self encodedFieldMask:patch.fieldMask];
+
+ } else if (mutationClass == [FSTTransformMutation class]) {
+ FSTTransformMutation *transform = (FSTTransformMutation *)mutation;
+
+ proto.transform = [GCFSDocumentTransform message];
+ proto.transform.document = [self encodedDocumentKey:transform.key];
+ proto.transform.fieldTransformsArray = [self encodedFieldTransforms:transform.fieldTransforms];
+ // NOTE: We set a precondition of exists: true as a safety-check, since we always combine
+ // FSTTransformMutations with an FSTSetMutation or FSTPatchMutation which (if successful) should
+ // end up with an existing document.
+ proto.currentDocument.exists = YES;
+
+ } else if (mutationClass == [FSTDeleteMutation class]) {
+ FSTDeleteMutation *delete = (FSTDeleteMutation *)mutation;
+ proto.delete_p = [self encodedDocumentKey:delete.key];
+
+ } else {
+ FSTFail(@"Unknown mutation type %@", NSStringFromClass(mutationClass));
+ }
+
+ if (!mutation.precondition.isNone) {
+ proto.currentDocument = [self encodedPrecondition:mutation.precondition];
+ }
+
+ return proto;
+}
+
+- (FSTMutation *)decodedMutation:(GCFSWrite *)mutation {
+ FSTPrecondition *precondition = [mutation hasCurrentDocument]
+ ? [self decodedPrecondition:mutation.currentDocument]
+ : [FSTPrecondition none];
+
+ switch (mutation.operationOneOfCase) {
+ case GCFSWrite_Operation_OneOfCase_Update:
+ if (mutation.hasUpdateMask) {
+ return [[FSTPatchMutation alloc] initWithKey:[self decodedDocumentKey:mutation.update.name]
+ fieldMask:[self decodedFieldMask:mutation.updateMask]
+ value:[self decodedFields:mutation.update.fields]
+ precondition:precondition];
+ } else {
+ return [[FSTSetMutation alloc] initWithKey:[self decodedDocumentKey:mutation.update.name]
+ value:[self decodedFields:mutation.update.fields]
+ precondition:precondition];
+ }
+
+ case GCFSWrite_Operation_OneOfCase_Delete_p:
+ return [[FSTDeleteMutation alloc] initWithKey:[self decodedDocumentKey:mutation.delete_p]
+ precondition:precondition];
+
+ case GCFSWrite_Operation_OneOfCase_Transform: {
+ FSTPreconditionExists exists = precondition.exists;
+ FSTAssert(exists == FSTPreconditionExistsYes,
+ @"Transforms must have precondition \"exists == true\"");
+
+ return [[FSTTransformMutation alloc]
+ initWithKey:[self decodedDocumentKey:mutation.transform.document]
+ fieldTransforms:[self decodedFieldTransforms:mutation.transform.fieldTransformsArray]];
+ }
+
+ default:
+ // Note that insert is intentionally unhandled, since we don't ever deal in them.
+ FSTFail(@"Unknown mutation operation: %d", mutation.operationOneOfCase);
+ }
+}
+
+- (GCFSPrecondition *)encodedPrecondition:(FSTPrecondition *)precondition {
+ FSTAssert(!precondition.isNone, @"Can't serialize an empty precondition");
+ GCFSPrecondition *message = [GCFSPrecondition message];
+ if (precondition.updateTime) {
+ message.updateTime = [self encodedVersion:precondition.updateTime];
+ } else if (precondition.exists != FSTPreconditionExistsNotSet) {
+ message.exists = precondition.exists == FSTPreconditionExistsYes;
+ } else {
+ FSTFail(@"Unknown precondition: %@", precondition);
+ }
+ return message;
+}
+
+- (FSTPrecondition *)decodedPrecondition:(GCFSPrecondition *)precondition {
+ switch (precondition.conditionTypeOneOfCase) {
+ case GCFSPrecondition_ConditionType_OneOfCase_GPBUnsetOneOfCase:
+ return [FSTPrecondition none];
+
+ case GCFSPrecondition_ConditionType_OneOfCase_Exists:
+ return [FSTPrecondition preconditionWithExists:precondition.exists];
+
+ case GCFSPrecondition_ConditionType_OneOfCase_UpdateTime:
+ return [FSTPrecondition
+ preconditionWithUpdateTime:[self decodedVersion:precondition.updateTime]];
+
+ default:
+ FSTFail(@"Unrecognized Precondition one-of case %@", precondition);
+ }
+}
+
+- (GCFSDocumentMask *)encodedFieldMask:(FSTFieldMask *)fieldMask {
+ GCFSDocumentMask *mask = [GCFSDocumentMask message];
+ for (FSTFieldPath *field in fieldMask.fields) {
+ [mask.fieldPathsArray addObject:field.canonicalString];
+ }
+ return mask;
+}
+
+- (FSTFieldMask *)decodedFieldMask:(GCFSDocumentMask *)fieldMask {
+ NSMutableArray<FSTFieldPath *> *fields =
+ [NSMutableArray arrayWithCapacity:fieldMask.fieldPathsArray_Count];
+ for (NSString *path in fieldMask.fieldPathsArray) {
+ [fields addObject:[FSTFieldPath pathWithServerFormat:path]];
+ }
+ return [[FSTFieldMask alloc] initWithFields:fields];
+}
+
+- (NSMutableArray<GCFSDocumentTransform_FieldTransform *> *)encodedFieldTransforms:
+ (NSArray<FSTFieldTransform *> *)fieldTransforms {
+ NSMutableArray *protos = [NSMutableArray array];
+ for (FSTFieldTransform *fieldTransform in fieldTransforms) {
+ FSTAssert([fieldTransform.transform isKindOfClass:[FSTServerTimestampTransform class]],
+ @"Unknown transform: %@", fieldTransform.transform);
+ GCFSDocumentTransform_FieldTransform *proto = [GCFSDocumentTransform_FieldTransform message];
+ proto.fieldPath = fieldTransform.path.canonicalString;
+ proto.setToServerValue = GCFSDocumentTransform_FieldTransform_ServerValue_RequestTime;
+ [protos addObject:proto];
+ }
+ return protos;
+}
+
+- (NSArray<FSTFieldTransform *> *)decodedFieldTransforms:
+ (NSArray<GCFSDocumentTransform_FieldTransform *> *)protos {
+ NSMutableArray<FSTFieldTransform *> *fieldTransforms = [NSMutableArray array];
+ for (GCFSDocumentTransform_FieldTransform *proto in protos) {
+ FSTAssert(
+ proto.setToServerValue == GCFSDocumentTransform_FieldTransform_ServerValue_RequestTime,
+ @"Unknown transform setToServerValue: %d", proto.setToServerValue);
+ [fieldTransforms
+ addObject:[[FSTFieldTransform alloc]
+ initWithPath:[FSTFieldPath pathWithServerFormat:proto.fieldPath]
+ transform:[FSTServerTimestampTransform serverTimestampTransform]]];
+ }
+ return fieldTransforms;
+}
+
+#pragma mark - FSTMutationResult <= GCFSWriteResult proto
+
+- (FSTMutationResult *)decodedMutationResult:(GCFSWriteResult *)mutation {
+ // NOTE: Deletes don't have an updateTime.
+ FSTSnapshotVersion *_Nullable version =
+ mutation.updateTime ? [self decodedVersion:mutation.updateTime] : nil;
+ NSMutableArray *_Nullable transformResults = nil;
+ if (mutation.transformResultsArray.count > 0) {
+ transformResults = [NSMutableArray array];
+ for (GCFSValue *result in mutation.transformResultsArray) {
+ [transformResults addObject:[self decodedFieldValue:result]];
+ }
+ }
+ return [[FSTMutationResult alloc] initWithVersion:version transformResults:transformResults];
+}
+
+#pragma mark - FSTQueryData => GCFSTarget proto
+
+- (nullable NSMutableDictionary<NSString *, NSString *> *)encodedListenRequestLabelsForQueryData:
+ (FSTQueryData *)queryData {
+ NSString *value = [self encodedLabelForPurpose:queryData.purpose];
+ if (!value) {
+ return nil;
+ }
+
+ NSMutableDictionary<NSString *, NSString *> *result =
+ [NSMutableDictionary dictionaryWithCapacity:1];
+ [result setObject:value forKey:@"goog-listen-tags"];
+ return result;
+}
+
+- (nullable NSString *)encodedLabelForPurpose:(FSTQueryPurpose)purpose {
+ switch (purpose) {
+ case FSTQueryPurposeListen:
+ return nil;
+ case FSTQueryPurposeExistenceFilterMismatch:
+ return @"existence-filter-mismatch";
+ case FSTQueryPurposeLimboResolution:
+ return @"limbo-document";
+ default:
+ FSTFail(@"Unrecognized query purpose: %lu", (unsigned long)purpose);
+ }
+}
+
+- (GCFSTarget *)encodedTarget:(FSTQueryData *)queryData {
+ GCFSTarget *result = [GCFSTarget message];
+ FSTQuery *query = queryData.query;
+
+ if ([query isDocumentQuery]) {
+ result.documents = [self encodedDocumentsTarget:query];
+ } else {
+ result.query = [self encodedQueryTarget:query];
+ }
+
+ result.targetId = queryData.targetID;
+ if (queryData.resumeToken.length > 0) {
+ result.resumeToken = queryData.resumeToken;
+ }
+
+ return result;
+}
+
+- (GCFSTarget_DocumentsTarget *)encodedDocumentsTarget:(FSTQuery *)query {
+ GCFSTarget_DocumentsTarget *result = [GCFSTarget_DocumentsTarget message];
+ NSMutableArray<NSString *> *docs = result.documentsArray;
+ [docs addObject:[self encodedQueryPath:query.path]];
+ return result;
+}
+
+- (FSTQuery *)decodedQueryFromDocumentsTarget:(GCFSTarget_DocumentsTarget *)target {
+ NSArray<NSString *> *documents = target.documentsArray;
+ FSTAssert(documents.count == 1, @"DocumentsTarget contained other than 1 document %lu",
+ (unsigned long)documents.count);
+
+ NSString *name = documents[0];
+ return [FSTQuery queryWithPath:[self decodedQueryPath:name]];
+}
+
+- (GCFSTarget_QueryTarget *)encodedQueryTarget:(FSTQuery *)query {
+ // Dissect the path into parent, collectionId, and optional key filter.
+ GCFSTarget_QueryTarget *queryTarget = [GCFSTarget_QueryTarget message];
+ if (query.path.length == 0) {
+ queryTarget.parent = [self encodedQueryPath:query.path];
+ } else {
+ FSTResourcePath *path = query.path;
+ FSTAssert(path.length % 2 != 0, @"Document queries with filters are not supported.");
+ queryTarget.parent = [self encodedQueryPath:[path pathByRemovingLastSegment]];
+ GCFSStructuredQuery_CollectionSelector *from = [GCFSStructuredQuery_CollectionSelector message];
+ from.collectionId = path.lastSegment;
+ [queryTarget.structuredQuery.fromArray addObject:from];
+ }
+
+ // Encode the filters.
+ GCFSStructuredQuery_Filter *_Nullable where = [self encodedFilters:query.filters];
+ if (where) {
+ queryTarget.structuredQuery.where = where;
+ }
+
+ NSArray<GCFSStructuredQuery_Order *> *orders = [self encodedSortOrders:query.sortOrders];
+ if (orders.count) {
+ [queryTarget.structuredQuery.orderByArray addObjectsFromArray:orders];
+ }
+
+ if (query.limit != NSNotFound) {
+ queryTarget.structuredQuery.limit.value = (int32_t)query.limit;
+ }
+
+ if (query.startAt) {
+ queryTarget.structuredQuery.startAt = [self encodedBound:query.startAt];
+ }
+
+ if (query.endAt) {
+ queryTarget.structuredQuery.endAt = [self encodedBound:query.endAt];
+ }
+
+ return queryTarget;
+}
+
+- (FSTQuery *)decodedQueryFromQueryTarget:(GCFSTarget_QueryTarget *)target {
+ FSTResourcePath *path = [self decodedQueryPath:target.parent];
+
+ GCFSStructuredQuery *query = target.structuredQuery;
+ NSUInteger fromCount = query.fromArray_Count;
+ if (fromCount > 0) {
+ FSTAssert(fromCount == 1,
+ @"StructuredQuery.from with more than one collection is not supported.");
+
+ GCFSStructuredQuery_CollectionSelector *from = query.fromArray[0];
+ path = [path pathByAppendingSegment:from.collectionId];
+ }
+
+ NSArray<id<FSTFilter>> *filterBy;
+ if (query.hasWhere) {
+ filterBy = [self decodedFilters:query.where];
+ } else {
+ filterBy = @[];
+ }
+
+ NSArray<FSTSortOrder *> *orderBy;
+ if (query.orderByArray_Count > 0) {
+ orderBy = [self decodedSortOrders:query.orderByArray];
+ } else {
+ orderBy = @[];
+ }
+
+ NSInteger limit = NSNotFound;
+ if (query.hasLimit) {
+ limit = query.limit.value;
+ }
+
+ FSTBound *_Nullable startAt;
+ if (query.hasStartAt) {
+ startAt = [self decodedBound:query.startAt];
+ }
+
+ FSTBound *_Nullable endAt;
+ if (query.hasEndAt) {
+ endAt = [self decodedBound:query.endAt];
+ }
+
+ return [[FSTQuery alloc] initWithPath:path
+ filterBy:filterBy
+ orderBy:orderBy
+ limit:limit
+ startAt:startAt
+ endAt:endAt];
+}
+
+#pragma mark Filters
+
+- (GCFSStructuredQuery_Filter *_Nullable)encodedFilters:(NSArray<id<FSTFilter>> *)filters {
+ if (filters.count == 0) {
+ return nil;
+ }
+ NSMutableArray<GCFSStructuredQuery_Filter *> *protos = [NSMutableArray array];
+ for (id<FSTFilter> filter in filters) {
+ if ([filter isKindOfClass:[FSTRelationFilter class]]) {
+ [protos addObject:[self encodedRelationFilter:filter]];
+ } else {
+ [protos addObject:[self encodedUnaryFilter:filter]];
+ }
+ }
+ if (protos.count == 1) {
+ // Special case: no existing filters and we only need to add one filter. This can be made the
+ // single root filter without a composite filter.
+ return protos[0];
+ }
+ GCFSStructuredQuery_Filter *composite = [GCFSStructuredQuery_Filter message];
+ composite.compositeFilter.op = GCFSStructuredQuery_CompositeFilter_Operator_And;
+ composite.compositeFilter.filtersArray = protos;
+ return composite;
+}
+
+- (NSArray<id<FSTFilter>> *)decodedFilters:(GCFSStructuredQuery_Filter *)proto {
+ NSMutableArray<id<FSTFilter>> *result = [NSMutableArray array];
+
+ NSArray<GCFSStructuredQuery_Filter *> *filters;
+ if (proto.filterTypeOneOfCase ==
+ GCFSStructuredQuery_Filter_FilterType_OneOfCase_CompositeFilter) {
+ FSTAssert(proto.compositeFilter.op == GCFSStructuredQuery_CompositeFilter_Operator_And,
+ @"Only AND-type composite filters are supported, got %d", proto.compositeFilter.op);
+ filters = proto.compositeFilter.filtersArray;
+ } else {
+ filters = @[ proto ];
+ }
+
+ for (GCFSStructuredQuery_Filter *filter in filters) {
+ switch (filter.filterTypeOneOfCase) {
+ case GCFSStructuredQuery_Filter_FilterType_OneOfCase_CompositeFilter:
+ FSTFail(@"Nested composite filters are not supported");
+
+ case GCFSStructuredQuery_Filter_FilterType_OneOfCase_FieldFilter:
+ [result addObject:[self decodedRelationFilter:filter.fieldFilter]];
+ break;
+
+ case GCFSStructuredQuery_Filter_FilterType_OneOfCase_UnaryFilter:
+ [result addObject:[self decodedUnaryFilter:filter.unaryFilter]];
+ break;
+
+ default:
+ FSTFail(@"Unrecognized Filter.filterType %d", filter.filterTypeOneOfCase);
+ }
+ }
+ return result;
+}
+
+- (GCFSStructuredQuery_Filter *)encodedRelationFilter:(FSTRelationFilter *)filter {
+ GCFSStructuredQuery_Filter *proto = [GCFSStructuredQuery_Filter message];
+ GCFSStructuredQuery_FieldFilter *fieldFilter = proto.fieldFilter;
+ fieldFilter.field = [self encodedFieldPath:filter.field];
+ fieldFilter.op = [self encodedRelationFilterOperator:filter.filterOperator];
+ fieldFilter.value = [self encodedFieldValue:filter.value];
+ return proto;
+}
+
+- (FSTRelationFilter *)decodedRelationFilter:(GCFSStructuredQuery_FieldFilter *)proto {
+ FSTFieldPath *fieldPath = [FSTFieldPath pathWithServerFormat:proto.field.fieldPath];
+ FSTRelationFilterOperator filterOperator = [self decodedRelationFilterOperator:proto.op];
+ FSTFieldValue *value = [self decodedFieldValue:proto.value];
+ return [FSTRelationFilter filterWithField:fieldPath filterOperator:filterOperator value:value];
+}
+
+- (GCFSStructuredQuery_Filter *)encodedUnaryFilter:(id<FSTFilter>)filter {
+ GCFSStructuredQuery_Filter *proto = [GCFSStructuredQuery_Filter message];
+ proto.unaryFilter.field = [self encodedFieldPath:filter.field];
+ if ([filter isKindOfClass:[FSTNanFilter class]]) {
+ proto.unaryFilter.op = GCFSStructuredQuery_UnaryFilter_Operator_IsNan;
+ } else if ([filter isKindOfClass:[FSTNullFilter class]]) {
+ proto.unaryFilter.op = GCFSStructuredQuery_UnaryFilter_Operator_IsNull;
+ } else {
+ FSTFail(@"Unrecognized filter: %@", filter);
+ }
+ return proto;
+}
+
+- (id<FSTFilter>)decodedUnaryFilter:(GCFSStructuredQuery_UnaryFilter *)proto {
+ FSTFieldPath *field = [FSTFieldPath pathWithServerFormat:proto.field.fieldPath];
+ switch (proto.op) {
+ case GCFSStructuredQuery_UnaryFilter_Operator_IsNan:
+ return [[FSTNanFilter alloc] initWithField:field];
+
+ case GCFSStructuredQuery_UnaryFilter_Operator_IsNull:
+ return [[FSTNullFilter alloc] initWithField:field];
+
+ default:
+ FSTFail(@"Unrecognized UnaryFilter.operator %d", proto.op);
+ }
+}
+
+- (GCFSStructuredQuery_FieldReference *)encodedFieldPath:(FSTFieldPath *)fieldPath {
+ GCFSStructuredQuery_FieldReference *ref = [GCFSStructuredQuery_FieldReference message];
+ ref.fieldPath = fieldPath.canonicalString;
+ return ref;
+}
+
+- (GCFSStructuredQuery_FieldFilter_Operator)encodedRelationFilterOperator:
+ (FSTRelationFilterOperator)filterOperator {
+ switch (filterOperator) {
+ case FSTRelationFilterOperatorLessThan:
+ return GCFSStructuredQuery_FieldFilter_Operator_LessThan;
+ case FSTRelationFilterOperatorLessThanOrEqual:
+ return GCFSStructuredQuery_FieldFilter_Operator_LessThanOrEqual;
+ case FSTRelationFilterOperatorEqual:
+ return GCFSStructuredQuery_FieldFilter_Operator_Equal;
+ case FSTRelationFilterOperatorGreaterThanOrEqual:
+ return GCFSStructuredQuery_FieldFilter_Operator_GreaterThanOrEqual;
+ case FSTRelationFilterOperatorGreaterThan:
+ return GCFSStructuredQuery_FieldFilter_Operator_GreaterThan;
+ default:
+ FSTFail(@"Unhandled FSTRelationFilterOperator: %ld", (long)filterOperator);
+ }
+}
+
+- (FSTRelationFilterOperator)decodedRelationFilterOperator:
+ (GCFSStructuredQuery_FieldFilter_Operator)filterOperator {
+ switch (filterOperator) {
+ case GCFSStructuredQuery_FieldFilter_Operator_LessThan:
+ return FSTRelationFilterOperatorLessThan;
+ case GCFSStructuredQuery_FieldFilter_Operator_LessThanOrEqual:
+ return FSTRelationFilterOperatorLessThanOrEqual;
+ case GCFSStructuredQuery_FieldFilter_Operator_Equal:
+ return FSTRelationFilterOperatorEqual;
+ case GCFSStructuredQuery_FieldFilter_Operator_GreaterThanOrEqual:
+ return FSTRelationFilterOperatorGreaterThanOrEqual;
+ case GCFSStructuredQuery_FieldFilter_Operator_GreaterThan:
+ return FSTRelationFilterOperatorGreaterThan;
+ default:
+ FSTFail(@"Unhandled FieldFilter.operator: %d", filterOperator);
+ }
+}
+
+#pragma mark Property Orders
+
+- (NSArray<GCFSStructuredQuery_Order *> *)encodedSortOrders:(NSArray<FSTSortOrder *> *)orders {
+ NSMutableArray<GCFSStructuredQuery_Order *> *protos = [NSMutableArray array];
+ for (FSTSortOrder *order in orders) {
+ [protos addObject:[self encodedSortOrder:order]];
+ }
+ return protos;
+}
+
+- (NSArray<FSTSortOrder *> *)decodedSortOrders:(NSArray<GCFSStructuredQuery_Order *> *)protos {
+ NSMutableArray<FSTSortOrder *> *result = [NSMutableArray arrayWithCapacity:protos.count];
+ for (GCFSStructuredQuery_Order *orderProto in protos) {
+ [result addObject:[self decodedSortOrder:orderProto]];
+ }
+ return result;
+}
+
+- (GCFSStructuredQuery_Order *)encodedSortOrder:(FSTSortOrder *)sortOrder {
+ GCFSStructuredQuery_Order *proto = [GCFSStructuredQuery_Order message];
+ proto.field = [self encodedFieldPath:sortOrder.field];
+ if (sortOrder.ascending) {
+ proto.direction = GCFSStructuredQuery_Direction_Ascending;
+ } else {
+ proto.direction = GCFSStructuredQuery_Direction_Descending;
+ }
+ return proto;
+}
+
+- (FSTSortOrder *)decodedSortOrder:(GCFSStructuredQuery_Order *)proto {
+ FSTFieldPath *fieldPath = [FSTFieldPath pathWithServerFormat:proto.field.fieldPath];
+ BOOL ascending;
+ switch (proto.direction) {
+ case GCFSStructuredQuery_Direction_Ascending:
+ ascending = YES;
+ break;
+ case GCFSStructuredQuery_Direction_Descending:
+ ascending = NO;
+ break;
+ default:
+ FSTFail(@"Unrecognized GCFSStructuredQuery_Direction %d", proto.direction);
+ }
+ return [FSTSortOrder sortOrderWithFieldPath:fieldPath ascending:ascending];
+}
+
+#pragma mark - Bounds/Cursors
+
+- (GCFSCursor *)encodedBound:(FSTBound *)bound {
+ GCFSCursor *proto = [GCFSCursor message];
+ proto.before = bound.isBefore;
+ for (FSTFieldValue *fieldValue in bound.position) {
+ GCFSValue *value = [self encodedFieldValue:fieldValue];
+ [proto.valuesArray addObject:value];
+ }
+ return proto;
+}
+
+- (FSTBound *)decodedBound:(GCFSCursor *)proto {
+ NSMutableArray<FSTFieldValue *> *indexComponents = [NSMutableArray array];
+
+ for (GCFSValue *valueProto in proto.valuesArray) {
+ FSTFieldValue *value = [self decodedFieldValue:valueProto];
+ [indexComponents addObject:value];
+ }
+
+ return [FSTBound boundWithPosition:indexComponents isBefore:proto.before];
+}
+
+#pragma mark - FSTWatchChange <= GCFSListenResponse proto
+
+- (FSTWatchChange *)decodedWatchChange:(GCFSListenResponse *)watchChange {
+ switch (watchChange.responseTypeOneOfCase) {
+ case GCFSListenResponse_ResponseType_OneOfCase_TargetChange:
+ return [self decodedTargetChangeFromWatchChange:watchChange.targetChange];
+
+ case GCFSListenResponse_ResponseType_OneOfCase_DocumentChange:
+ return [self decodedDocumentChange:watchChange.documentChange];
+
+ case GCFSListenResponse_ResponseType_OneOfCase_DocumentDelete:
+ return [self decodedDocumentDelete:watchChange.documentDelete];
+
+ case GCFSListenResponse_ResponseType_OneOfCase_DocumentRemove:
+ return [self decodedDocumentRemove:watchChange.documentRemove];
+
+ case GCFSListenResponse_ResponseType_OneOfCase_Filter:
+ return [self decodedExistenceFilterWatchChange:watchChange.filter];
+
+ default:
+ FSTFail(@"Unknown WatchChange.changeType %" PRId32, watchChange.responseTypeOneOfCase);
+ }
+}
+
+- (FSTSnapshotVersion *)versionFromListenResponse:(GCFSListenResponse *)watchChange {
+ // We have only reached a consistent snapshot for the entire stream if there is a read_time set
+ // and it applies to all targets (i.e. the list of targets is empty). The backend is guaranteed to
+ // send such responses.
+ if (watchChange.responseTypeOneOfCase != GCFSListenResponse_ResponseType_OneOfCase_TargetChange) {
+ return [FSTSnapshotVersion noVersion];
+ }
+ if (watchChange.targetChange.targetIdsArray.count != 0) {
+ return [FSTSnapshotVersion noVersion];
+ }
+ return [self decodedVersion:watchChange.targetChange.readTime];
+}
+
+- (FSTWatchTargetChange *)decodedTargetChangeFromWatchChange:(GCFSTargetChange *)change {
+ FSTWatchTargetChangeState state = [self decodedWatchTargetChangeState:change.targetChangeType];
+ NSMutableArray<NSNumber *> *targetIDs =
+ [NSMutableArray arrayWithCapacity:change.targetIdsArray_Count];
+
+ [change.targetIdsArray enumerateValuesWithBlock:^(int32_t value, NSUInteger idx, BOOL *stop) {
+ [targetIDs addObject:@(value)];
+ }];
+
+ NSError *cause = nil;
+ if (change.hasCause) {
+ cause = [NSError errorWithDomain:FIRFirestoreErrorDomain
+ code:change.cause.code
+ userInfo:@{NSLocalizedDescriptionKey : change.cause.message}];
+ }
+
+ return [[FSTWatchTargetChange alloc] initWithState:state
+ targetIDs:targetIDs
+ resumeToken:change.resumeToken
+ cause:cause];
+}
+
+- (FSTWatchTargetChangeState)decodedWatchTargetChangeState:
+ (GCFSTargetChange_TargetChangeType)state {
+ switch (state) {
+ case GCFSTargetChange_TargetChangeType_NoChange:
+ return FSTWatchTargetChangeStateNoChange;
+ case GCFSTargetChange_TargetChangeType_Add:
+ return FSTWatchTargetChangeStateAdded;
+ case GCFSTargetChange_TargetChangeType_Remove:
+ return FSTWatchTargetChangeStateRemoved;
+ case GCFSTargetChange_TargetChangeType_Current:
+ return FSTWatchTargetChangeStateCurrent;
+ case GCFSTargetChange_TargetChangeType_Reset:
+ return FSTWatchTargetChangeStateReset;
+ default:
+ FSTFail(@"Unexpected TargetChange.state: %" PRId32, state);
+ }
+}
+
+- (NSArray<NSNumber *> *)decodedIntegerArray:(GPBInt32Array *)values {
+ NSMutableArray<NSNumber *> *result = [NSMutableArray arrayWithCapacity:values.count];
+ [values enumerateValuesWithBlock:^(int32_t value, NSUInteger idx, BOOL *stop) {
+ [result addObject:@(value)];
+ }];
+ return result;
+}
+
+- (FSTDocumentWatchChange *)decodedDocumentChange:(GCFSDocumentChange *)change {
+ FSTObjectValue *value = [self decodedFields:change.document.fields];
+ FSTDocumentKey *key = [self decodedDocumentKey:change.document.name];
+ FSTSnapshotVersion *version = [self decodedVersion:change.document.updateTime];
+ FSTAssert(![version isEqual:[FSTSnapshotVersion noVersion]],
+ @"Got a document change with no snapshot version");
+ FSTMaybeDocument *document =
+ [FSTDocument documentWithData:value key:key version:version hasLocalMutations:NO];
+
+ NSArray<NSNumber *> *updatedTargetIds = [self decodedIntegerArray:change.targetIdsArray];
+ NSArray<NSNumber *> *removedTargetIds = [self decodedIntegerArray:change.removedTargetIdsArray];
+
+ return [[FSTDocumentWatchChange alloc] initWithUpdatedTargetIDs:updatedTargetIds
+ removedTargetIDs:removedTargetIds
+ documentKey:document.key
+ document:document];
+}
+
+- (FSTDocumentWatchChange *)decodedDocumentDelete:(GCFSDocumentDelete *)change {
+ FSTDocumentKey *key = [self decodedDocumentKey:change.document];
+ // Note that version might be unset in which case we use [FSTSnapshotVersion noVersion]
+ FSTSnapshotVersion *version = [self decodedVersion:change.readTime];
+ FSTMaybeDocument *document = [FSTDeletedDocument documentWithKey:key version:version];
+
+ NSArray<NSNumber *> *removedTargetIds = [self decodedIntegerArray:change.removedTargetIdsArray];
+
+ return [[FSTDocumentWatchChange alloc] initWithUpdatedTargetIDs:@[]
+ removedTargetIDs:removedTargetIds
+ documentKey:document.key
+ document:document];
+}
+
+- (FSTDocumentWatchChange *)decodedDocumentRemove:(GCFSDocumentRemove *)change {
+ FSTDocumentKey *key = [self decodedDocumentKey:change.document];
+ NSArray<NSNumber *> *removedTargetIds = [self decodedIntegerArray:change.removedTargetIdsArray];
+
+ return [[FSTDocumentWatchChange alloc] initWithUpdatedTargetIDs:@[]
+ removedTargetIDs:removedTargetIds
+ documentKey:key
+ document:nil];
+}
+
+- (FSTExistenceFilterWatchChange *)decodedExistenceFilterWatchChange:(GCFSExistenceFilter *)filter {
+ // TODO(dimond): implement existence filter parsing
+ FSTExistenceFilter *existenceFilter = [FSTExistenceFilter filterWithCount:filter.count];
+ FSTTargetID targetID = filter.targetId;
+ return [FSTExistenceFilterWatchChange changeWithFilter:existenceFilter targetID:targetID];
+}
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTWatchChange.h b/Firestore/Source/Remote/FSTWatchChange.h
new file mode 100644
index 0000000..6b65279
--- /dev/null
+++ b/Firestore/Source/Remote/FSTWatchChange.h
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import <Foundation/Foundation.h>
+
+#import "FSTTypes.h"
+
+@class FSTDocumentKey;
+@class FSTExistenceFilter;
+@class FSTMaybeDocument;
+@class FSTSnapshotVersion;
+
+NS_ASSUME_NONNULL_BEGIN
+
+/**
+ * FSTWatchChange is the internal representation of the watcher API protocol buffers.
+ * This is an empty abstract class so that all the different kinds of changes can have a common
+ * base class.
+ */
+@interface FSTWatchChange : NSObject
+@end
+
+/**
+ * FSTDocumentWatchChange represents a changed document and a list of target ids to which this
+ * change applies. If the document has been deleted, the deleted document will be provided.
+ */
+@interface FSTDocumentWatchChange : FSTWatchChange
+
+- (instancetype)initWithUpdatedTargetIDs:(NSArray<NSNumber *> *)updatedTargetIDs
+ removedTargetIDs:(NSArray<NSNumber *> *)removedTargetIDs
+ documentKey:(FSTDocumentKey *)documentKey
+ document:(nullable FSTMaybeDocument *)document
+ NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)init NS_UNAVAILABLE;
+
+/** The new document applies to all of these targets. */
+@property(nonatomic, strong, readonly) NSArray<NSNumber *> *updatedTargetIDs;
+
+/** The new document is removed from all of these targets. */
+@property(nonatomic, strong, readonly) NSArray<NSNumber *> *removedTargetIDs;
+
+/** The key of the document for this change. */
+@property(nonatomic, strong, readonly) FSTDocumentKey *documentKey;
+
+/**
+ * The new document or DeletedDocument if it was deleted. Is null if the document went out of
+ * view without the server sending a new document.
+ */
+@property(nonatomic, strong, readonly, nullable) FSTMaybeDocument *document;
+
+@end
+
+/**
+ * An ExistenceFilterWatchChange applies to the targets and is required to verify the current client
+ * state against expected state sent from the server.
+ */
+@interface FSTExistenceFilterWatchChange : FSTWatchChange
+
++ (instancetype)changeWithFilter:(FSTExistenceFilter *)filter targetID:(FSTTargetID)targetID;
+
+- (instancetype)init NS_UNAVAILABLE;
+
+@property(nonatomic, strong, readonly) FSTExistenceFilter *filter;
+@property(nonatomic, assign, readonly) FSTTargetID targetID;
+@end
+
+/** FSTWatchTargetChangeState is the kind of change that happened to the watch target. */
+typedef NS_ENUM(NSInteger, FSTWatchTargetChangeState) {
+ FSTWatchTargetChangeStateNoChange,
+ FSTWatchTargetChangeStateAdded,
+ FSTWatchTargetChangeStateRemoved,
+ FSTWatchTargetChangeStateCurrent,
+ FSTWatchTargetChangeStateReset,
+};
+
+/** FSTWatchTargetChange is a change to a watch target. */
+@interface FSTWatchTargetChange : FSTWatchChange
+
+- (instancetype)initWithState:(FSTWatchTargetChangeState)state
+ targetIDs:(NSArray<NSNumber *> *)targetIDs
+ resumeToken:(NSData *)resumeToken
+ cause:(nullable NSError *)cause NS_DESIGNATED_INITIALIZER;
+
+- (instancetype)init NS_UNAVAILABLE;
+
+/** What kind of change occurred to the watch target. */
+@property(nonatomic, assign, readonly) FSTWatchTargetChangeState state;
+
+/** The target IDs that were added/removed/set. */
+@property(nonatomic, strong, readonly) NSArray<NSNumber *> *targetIDs;
+
+/**
+ * An opaque, server-assigned token that allows watching a query to be resumed after disconnecting
+ * without retransmitting all the data that matches the query. The resume token essentially
+ * identifies a point in time from which the server should resume sending results.
+ */
+@property(nonatomic, strong, readonly) NSData *resumeToken;
+
+/** An RPC error indicating why the watch failed. */
+@property(nonatomic, strong, readonly, nullable) NSError *cause;
+
+@end
+
+NS_ASSUME_NONNULL_END
diff --git a/Firestore/Source/Remote/FSTWatchChange.m b/Firestore/Source/Remote/FSTWatchChange.m
new file mode 100644
index 0000000..1ace26e
--- /dev/null
+++ b/Firestore/Source/Remote/FSTWatchChange.m
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FSTWatchChange.h"
+
+#import "FSTDocument.h"
+#import "FSTDocumentKey.h"
+#import "FSTExistenceFilter.h"
+
+NS_ASSUME_NONNULL_BEGIN
+
+@implementation FSTWatchChange
+@end
+
+@implementation FSTDocumentWatchChange
+
+- (instancetype)initWithUpdatedTargetIDs:(NSArray<NSNumber *> *)updatedTargetIDs
+ removedTargetIDs:(NSArray<NSNumber *> *)removedTargetIDs
+ documentKey:(FSTDocumentKey *)documentKey
+ document:(nullable FSTMaybeDocument *)document {
+ self = [super init];
+ if (self) {
+ _updatedTargetIDs = updatedTargetIDs;
+ _removedTargetIDs = removedTargetIDs;
+ _documentKey = documentKey;
+ _document = document;
+ }
+ return self;
+}
+
+- (BOOL)isEqual:(id)other {
+ if (other == self) {
+ return YES;
+ }
+ if (![other isMemberOfClass:[FSTDocumentWatchChange class]]) {
+ return NO;
+ }
+
+ FSTDocumentWatchChange *otherChange = (FSTDocumentWatchChange *)other;
+ return [_updatedTargetIDs isEqual:otherChange.updatedTargetIDs] &&
+ [_removedTargetIDs isEqual:otherChange.removedTargetIDs] &&
+ [_documentKey isEqual:otherChange.documentKey] &&
+ (_document == otherChange.document || [_document isEqual:otherChange.document]);
+}
+
+- (NSUInteger)hash {
+ NSUInteger hash = self.updatedTargetIDs.hash;
+ hash = hash * 31 + self.removedTargetIDs.hash;
+ hash = hash * 31 + self.documentKey.hash;
+ hash = hash * 31 + self.document.hash;
+ return hash;
+}
+
+@end
+
+@interface FSTExistenceFilterWatchChange ()
+
+- (instancetype)initWithFilter:(FSTExistenceFilter *)filter
+ targetID:(FSTTargetID)targetID NS_DESIGNATED_INITIALIZER;
+
+@end
+
+@implementation FSTExistenceFilterWatchChange
+
++ (instancetype)changeWithFilter:(FSTExistenceFilter *)filter targetID:(FSTTargetID)targetID {
+ return [[FSTExistenceFilterWatchChange alloc] initWithFilter:filter targetID:targetID];
+}
+
+- (instancetype)initWithFilter:(FSTExistenceFilter *)filter targetID:(FSTTargetID)targetID {
+ self = [super init];
+ if (self) {
+ _filter = filter;
+ _targetID = targetID;
+ }
+ return self;
+}
+
+- (BOOL)isEqual:(id)other {
+ if (other == self) {
+ return YES;
+ }
+ if (![other isMemberOfClass:[FSTExistenceFilterWatchChange class]]) {
+ return NO;
+ }
+
+ FSTExistenceFilterWatchChange *otherChange = (FSTExistenceFilterWatchChange *)other;
+ return [_filter isEqual:otherChange->_filter] && _targetID == otherChange->_targetID;
+}
+
+- (NSUInteger)hash {
+ return self.filter.hash;
+}
+
+@end
+
+@implementation FSTWatchTargetChange
+
+- (instancetype)initWithState:(FSTWatchTargetChangeState)state
+ targetIDs:(NSArray<NSNumber *> *)targetIDs
+ resumeToken:(NSData *)resumeToken
+ cause:(nullable NSError *)cause {
+ self = [super init];
+ if (self) {
+ _state = state;
+ _targetIDs = targetIDs;
+ _resumeToken = resumeToken;
+ _cause = cause;
+ }
+ return self;
+}
+
+- (BOOL)isEqual:(id)other {
+ if (other == self) {
+ return YES;
+ }
+ if (![other isMemberOfClass:[FSTWatchTargetChange class]]) {
+ return NO;
+ }
+
+ FSTWatchTargetChange *otherChange = (FSTWatchTargetChange *)other;
+ return _state == otherChange->_state && [_targetIDs isEqual:otherChange->_targetIDs] &&
+ [_resumeToken isEqual:otherChange->_resumeToken] &&
+ (_cause == otherChange->_cause || [_cause isEqual:otherChange->_cause]);
+}
+
+- (NSUInteger)hash {
+ NSUInteger hash = (NSUInteger)self.state;
+
+ hash = hash * 31 + self.targetIDs.hash;
+ hash = hash * 31 + self.resumeToken.hash;
+ hash = hash * 31 + self.cause.hash;
+ return hash;
+}
+
+@end
+
+NS_ASSUME_NONNULL_END