aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/Source/Remote/FSTRemoteStore.mm
blob: 4309c74c211ddb253dc66465caab73e080212942 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
/*
 * Copyright 2017 Google
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#import "Firestore/Source/Remote/FSTRemoteStore.h"

#include <cinttypes>

#import "Firestore/Source/Core/FSTQuery.h"
#import "Firestore/Source/Core/FSTTransaction.h"
#import "Firestore/Source/Local/FSTLocalStore.h"
#import "Firestore/Source/Local/FSTQueryData.h"
#import "Firestore/Source/Model/FSTDocument.h"
#import "Firestore/Source/Model/FSTMutation.h"
#import "Firestore/Source/Model/FSTMutationBatch.h"
#import "Firestore/Source/Remote/FSTDatastore.h"
#import "Firestore/Source/Remote/FSTExistenceFilter.h"
#import "Firestore/Source/Remote/FSTOnlineStateTracker.h"
#import "Firestore/Source/Remote/FSTRemoteEvent.h"
#import "Firestore/Source/Remote/FSTStream.h"
#import "Firestore/Source/Remote/FSTWatchChange.h"

#include "Firestore/core/src/firebase/firestore/auth/user.h"
#include "Firestore/core/src/firebase/firestore/model/document_key.h"
#include "Firestore/core/src/firebase/firestore/model/snapshot_version.h"
#include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
#include "Firestore/core/src/firebase/firestore/util/log.h"
#include "Firestore/core/src/firebase/firestore/util/string_apple.h"

namespace util = firebase::firestore::util;
using firebase::firestore::auth::User;
using firebase::firestore::model::DocumentKey;
using firebase::firestore::model::SnapshotVersion;
using firebase::firestore::model::DocumentKeySet;

NS_ASSUME_NONNULL_BEGIN

/**
 * The maximum number of pending writes to allow.
 * TODO(bjornick): Negotiate this value with the backend.
 */
static const int kMaxPendingWrites = 10;

#pragma mark - FSTRemoteStore

@interface FSTRemoteStore () <FSTWatchStreamDelegate, FSTWriteStreamDelegate>

/**
 * The local store, used to fill the write pipeline with outbound mutations and resolve existence
 * filter mismatches. Immutable after initialization.
 */
@property(nonatomic, strong, readonly) FSTLocalStore *localStore;

/** The client-side proxy for interacting with the backend. Immutable after initialization. */
@property(nonatomic, strong, readonly) FSTDatastore *datastore;

#pragma mark Watch Stream
// The watchStream is null when the network is disabled. The non-null check is performed by
// isNetworkEnabled.
@property(nonatomic, strong, nullable) FSTWatchStream *watchStream;

/**
 * A mapping of watched targets that the client cares about tracking and the
 * user has explicitly called a 'listen' for this target.
 *
 * These targets may or may not have been sent to or acknowledged by the
 * server. On re-establishing the listen stream, these targets should be sent
 * to the server. The targets removed with unlistens are removed eagerly
 * without waiting for confirmation from the listen stream. */
@property(nonatomic, strong, readonly)
    NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *listenTargets;

/**
 * A mapping of targetId to pending acks needed.
 *
 * If a targetId is present in this map, then we're waiting for watch to
 * acknowledge a removal or addition of the target. If a target is not in this
 * mapping, and it's in the listenTargets map, then we consider the target to
 * be active.
 *
 * We increment the count here everytime we issue a request over the stream to
 * watch or unwatch. We then decrement the count everytime we get a target
 * added or target removed message from the server. Once the count is equal to
 * 0 we know that the client and server are in the same state (once this state
 * is reached the targetId is removed from the map to free the memory).
 */

@property(nonatomic, assign) FSTBatchID lastBatchSeen;

@property(nonatomic, strong, readonly) FSTOnlineStateTracker *onlineStateTracker;

@property(nonatomic, strong, nullable) FSTWatchChangeAggregator *watchChangeAggregator;

#pragma mark Write Stream
// The writeStream is null when the network is disabled. The non-null check is performed by
// isNetworkEnabled.
@property(nonatomic, strong, nullable) FSTWriteStream *writeStream;

/**
 * A FIFO queue of in-flight writes. This is in-flight from the point of view of the caller of
 * writeMutations, not from the point of view from the Datastore itself. In particular, these
 * requests may not have been sent to the Datastore server if the write stream is not yet running.
 */
@property(nonatomic, strong, readonly) NSMutableArray<FSTMutationBatch *> *pendingWrites;
@end

@implementation FSTRemoteStore

- (instancetype)initWithLocalStore:(FSTLocalStore *)localStore
                         datastore:(FSTDatastore *)datastore
               workerDispatchQueue:(FSTDispatchQueue *)queue {
  if (self = [super init]) {
    _localStore = localStore;
    _datastore = datastore;
    _listenTargets = [NSMutableDictionary dictionary];

    _lastBatchSeen = kFSTBatchIDUnknown;
    _pendingWrites = [NSMutableArray array];
    _onlineStateTracker = [[FSTOnlineStateTracker alloc] initWithWorkerDispatchQueue:queue];
  }
  return self;
}

- (void)start {
  // For now, all setup is handled by enableNetwork(). We might expand on this in the future.
  [self enableNetwork];
}

@dynamic onlineStateDelegate;

- (nullable id<FSTOnlineStateDelegate>)onlineStateDelegate {
  return self.onlineStateTracker.onlineStateDelegate;
}

- (void)setOnlineStateDelegate:(nullable id<FSTOnlineStateDelegate>)delegate {
  self.onlineStateTracker.onlineStateDelegate = delegate;
}

#pragma mark Online/Offline state

- (BOOL)isNetworkEnabled {
  HARD_ASSERT((self.watchStream == nil) == (self.writeStream == nil),
              "WatchStream and WriteStream should both be null or non-null");
  return self.watchStream != nil;
}

- (void)enableNetwork {
  if ([self isNetworkEnabled]) {
    return;
  }

  // Create new streams (but note they're not started yet).
  self.watchStream = [self.datastore createWatchStream];
  self.writeStream = [self.datastore createWriteStream];

  // Load any saved stream token from persistent storage
  self.writeStream.lastStreamToken = [self.localStore lastStreamToken];

  if ([self shouldStartWatchStream]) {
    [self startWatchStream];
  } else {
    [self.onlineStateTracker updateState:FSTOnlineStateUnknown];
  }

  [self fillWritePipeline];  // This may start the writeStream.
}

- (void)disableNetwork {
  [self disableNetworkInternal];
  // Set the FSTOnlineState to Offline so get()s return from cache, etc.
  [self.onlineStateTracker updateState:FSTOnlineStateOffline];
}

/** Disables the network, setting the FSTOnlineState to the specified targetOnlineState. */
- (void)disableNetworkInternal {
  if ([self isNetworkEnabled]) {
    // NOTE: We're guaranteed not to get any further events from these streams (not even a close
    // event).
    [self.watchStream stop];
    [self.writeStream stop];

    [self cleanUpWatchStreamState];
    [self cleanUpWriteStreamState];

    self.writeStream = nil;
    self.watchStream = nil;
  }
}

#pragma mark Shutdown

- (void)shutdown {
  LOG_DEBUG("FSTRemoteStore %s shutting down", (__bridge void *)self);
  [self disableNetworkInternal];
  // Set the FSTOnlineState to Unknown (rather than Offline) to avoid potentially triggering
  // spurious listener events with cached data, etc.
  [self.onlineStateTracker updateState:FSTOnlineStateUnknown];
}

- (void)userDidChange:(const User &)user {
  LOG_DEBUG("FSTRemoteStore %s changing users: %s", (__bridge void *)self, user.uid());
  if ([self isNetworkEnabled]) {
    // Tear down and re-create our network streams. This will ensure we get a fresh auth token
    // for the new user and re-fill the write pipeline with new mutations from the LocalStore
    // (since mutations are per-user).
    [self disableNetworkInternal];
    [self.onlineStateTracker updateState:FSTOnlineStateUnknown];
    [self enableNetwork];
  }
}

#pragma mark Watch Stream

- (void)startWatchStream {
  HARD_ASSERT([self shouldStartWatchStream],
              "startWatchStream: called when shouldStartWatchStream: is false.");
  _watchChangeAggregator = [[FSTWatchChangeAggregator alloc] initWithTargetMetadataProvider:self];
  [self.watchStream startWithDelegate:self];
  [self.onlineStateTracker handleWatchStreamStart];
}

- (void)listenToTargetWithQueryData:(FSTQueryData *)queryData {
  NSNumber *targetKey = @(queryData.targetID);
  HARD_ASSERT(!self.listenTargets[targetKey], "listenToQuery called with duplicate target id: %s",
              targetKey);

  self.listenTargets[targetKey] = queryData;

  if ([self shouldStartWatchStream]) {
    [self startWatchStream];
  } else if ([self isNetworkEnabled] && [self.watchStream isOpen]) {
    [self sendWatchRequestWithQueryData:queryData];
  }
}

- (void)sendWatchRequestWithQueryData:(FSTQueryData *)queryData {
  [self.watchChangeAggregator recordTargetRequest:@(queryData.targetID)];
  [self.watchStream watchQuery:queryData];
}

- (void)stopListeningToTargetID:(FSTTargetID)targetID {
  FSTBoxedTargetID *targetKey = @(targetID);
  FSTQueryData *queryData = self.listenTargets[targetKey];
  HARD_ASSERT(queryData, "unlistenToTarget: target not currently watched: %s", targetKey);

  [self.listenTargets removeObjectForKey:targetKey];
  if ([self isNetworkEnabled] && [self.watchStream isOpen]) {
    [self sendUnwatchRequestForTargetID:targetKey];
    if ([self.listenTargets count] == 0) {
      [self.watchStream markIdle];
    }
  }
}

- (void)sendUnwatchRequestForTargetID:(FSTBoxedTargetID *)targetID {
  [self.watchChangeAggregator recordTargetRequest:targetID];
  [self.watchStream unwatchTargetID:[targetID intValue]];
}

/**
 * Returns YES if the network is enabled, the watch stream has not yet been started and there are
 * active watch targets.
 */
- (BOOL)shouldStartWatchStream {
  return [self isNetworkEnabled] && ![self.watchStream isStarted] && self.listenTargets.count > 0;
}

- (void)cleanUpWatchStreamState {
  _watchChangeAggregator = nil;
}

- (void)watchStreamDidOpen {
  // Restore any existing watches.
  for (FSTQueryData *queryData in [self.listenTargets objectEnumerator]) {
    [self sendWatchRequestWithQueryData:queryData];
  }
}

- (void)watchStreamDidChange:(FSTWatchChange *)change
             snapshotVersion:(const SnapshotVersion &)snapshotVersion {
  // Mark the connection as Online because we got a message from the server.
  [self.onlineStateTracker updateState:FSTOnlineStateOnline];

  if ([change isKindOfClass:[FSTWatchTargetChange class]]) {
    FSTWatchTargetChange *watchTargetChange = (FSTWatchTargetChange *)change;
    if (watchTargetChange.state == FSTWatchTargetChangeStateRemoved && watchTargetChange.cause) {
      // There was an error on a target, don't wait for a consistent snapshot to raise events
      return [self processTargetErrorForWatchChange:watchTargetChange];
    } else {
      [self.watchChangeAggregator handleTargetChange:watchTargetChange];
    }
  } else if ([change isKindOfClass:[FSTDocumentWatchChange class]]) {
    [self.watchChangeAggregator handleDocumentChange:(FSTDocumentWatchChange *)change];
  } else {
    HARD_ASSERT([change isKindOfClass:[FSTExistenceFilterWatchChange class]],
                "Expected watchChange to be an instance of FSTExistenceFilterWatchChange");
    [self.watchChangeAggregator handleExistenceFilter:(FSTExistenceFilterWatchChange *)change];
  }

  if (snapshotVersion != SnapshotVersion::None() &&
      snapshotVersion >= [self.localStore lastRemoteSnapshotVersion]) {
    // We have received a target change with a global snapshot if the snapshot version is not equal
    // to SnapshotVersion.None().
    [self raiseWatchSnapshotWithSnapshotVersion:snapshotVersion];
  }
}

- (void)watchStreamWasInterruptedWithError:(nullable NSError *)error {
  HARD_ASSERT([self isNetworkEnabled],
              "watchStreamWasInterruptedWithError: should only be called when the network is "
              "enabled");

  [self cleanUpWatchStreamState];
  [self.onlineStateTracker handleWatchStreamFailure];

  // If the watch stream closed due to an error, retry the connection if there are any active
  // watch targets.
  if ([self shouldStartWatchStream]) {
    [self startWatchStream];
  } else {
    // We don't need to restart the watch stream because there are no active targets. The online
    // state is set to unknown because there is no active attempt at establishing a connection.
    [self.onlineStateTracker updateState:FSTOnlineStateUnknown];
  }
}

/**
 * Takes a batch of changes from the Datastore, repackages them as a RemoteEvent, and passes that
 * on to the SyncEngine.
 */
- (void)raiseWatchSnapshotWithSnapshotVersion:(const SnapshotVersion &)snapshotVersion {
  HARD_ASSERT(snapshotVersion != SnapshotVersion::None(),
              "Can't raise event for unknown SnapshotVersion");

  FSTRemoteEvent *remoteEvent =
      [self.watchChangeAggregator remoteEventAtSnapshotVersion:snapshotVersion];

  // Update in-memory resume tokens. FSTLocalStore will update the persistent view of these when
  // applying the completed FSTRemoteEvent.
  for (const auto &entry : remoteEvent.targetChanges) {
    NSData *resumeToken = entry.second.resumeToken;
    if (resumeToken.length > 0) {
      FSTBoxedTargetID *targetID = @(entry.first);
      FSTQueryData *queryData = _listenTargets[targetID];
      // A watched target might have been removed already.
      if (queryData) {
        _listenTargets[targetID] =
            [queryData queryDataByReplacingSnapshotVersion:snapshotVersion
                                               resumeToken:resumeToken
                                            sequenceNumber:queryData.sequenceNumber];
      }
    }
  }

  // Re-establish listens for the targets that have been invalidated by existence filter mismatches.
  for (FSTTargetID targetID : remoteEvent.targetMismatches) {
    FSTQueryData *queryData = self.listenTargets[@(targetID)];

    if (!queryData) {
      // A watched target might have been removed already.
      continue;
    }

    // Clear the resume token for the query, since we're in a known mismatch state.
    queryData = [[FSTQueryData alloc] initWithQuery:queryData.query
                                           targetID:targetID
                               listenSequenceNumber:queryData.sequenceNumber
                                            purpose:queryData.purpose];
    self.listenTargets[@(targetID)] = queryData;

    // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't send a
    // resume token so that we get a full update.
    [self sendUnwatchRequestForTargetID:@(targetID)];

    // Mark the query we send as being on behalf of an existence filter mismatch, but don't actually
    // retain that in listenTargets. This ensures that we flag the first re-listen this way without
    // impacting future listens of this target (that might happen e.g. on reconnect).
    FSTQueryData *requestQueryData =
        [[FSTQueryData alloc] initWithQuery:queryData.query
                                   targetID:targetID
                       listenSequenceNumber:queryData.sequenceNumber
                                    purpose:FSTQueryPurposeExistenceFilterMismatch];
    [self sendWatchRequestWithQueryData:requestQueryData];
  }

  // Finally handle remote event
  [self.syncEngine applyRemoteEvent:remoteEvent];
}

/** Process a target error and passes the error along to SyncEngine. */
- (void)processTargetErrorForWatchChange:(FSTWatchTargetChange *)change {
  HARD_ASSERT(change.cause, "Handling target error without a cause");
  // Ignore targets that have been removed already.
  for (FSTBoxedTargetID *targetID in change.targetIDs) {
    if (self.listenTargets[targetID]) {
      int unboxedTargetId = targetID.intValue;
      [self.listenTargets removeObjectForKey:targetID];
      [self.watchChangeAggregator removeTarget:unboxedTargetId];
      [self.syncEngine rejectListenWithTargetID:unboxedTargetId error:change.cause];
    }
  }
}

- (firebase::firestore::model::DocumentKeySet)remoteKeysForTarget:(FSTBoxedTargetID *)targetID {
  return [self.syncEngine remoteKeysForTarget:targetID];
}

- (nullable FSTQueryData *)queryDataForTarget:(FSTBoxedTargetID *)targetID {
  return self.listenTargets[targetID];
}

#pragma mark Write Stream

/**
 * Returns YES if the network is enabled, the write stream has not yet been started and there are
 * pending writes.
 */
- (BOOL)shouldStartWriteStream {
  return [self isNetworkEnabled] && ![self.writeStream isStarted] && self.pendingWrites.count > 0;
}

- (void)startWriteStream {
  HARD_ASSERT([self shouldStartWriteStream],
              "startWriteStream: called when shouldStartWriteStream: is false.");

  [self.writeStream startWithDelegate:self];
}

- (void)cleanUpWriteStreamState {
  self.lastBatchSeen = kFSTBatchIDUnknown;
  LOG_DEBUG("Stopping write stream with %s pending writes", [self.pendingWrites count]);
  [self.pendingWrites removeAllObjects];
}

- (void)fillWritePipeline {
  if ([self isNetworkEnabled]) {
    while ([self canWriteMutations]) {
      FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:self.lastBatchSeen];
      if (!batch) {
        break;
      }
      [self commitBatch:batch];
    }

    if ([self.pendingWrites count] == 0) {
      [self.writeStream markIdle];
    }
  }
}

/**
 * Returns YES if the backend can accept additional write requests.
 *
 * When sending mutations to the write stream (e.g. in -fillWritePipeline), call this method first
 * to check if more mutations can be sent.
 *
 * Currently the only thing that can prevent the backend from accepting write requests is if
 * there are too many requests already outstanding. As writes complete the backend will be able
 * to accept more.
 */
- (BOOL)canWriteMutations {
  return [self isNetworkEnabled] && self.pendingWrites.count < kMaxPendingWrites;
}

/** Given mutations to commit, actually commits them to the backend. */
- (void)commitBatch:(FSTMutationBatch *)batch {
  HARD_ASSERT([self canWriteMutations], "commitBatch called when mutations can't be written");
  self.lastBatchSeen = batch.batchID;

  [self.pendingWrites addObject:batch];

  if ([self shouldStartWriteStream]) {
    [self startWriteStream];
  } else if ([self isNetworkEnabled] && self.writeStream.handshakeComplete) {
    [self.writeStream writeMutations:batch.mutations];
  }
}

- (void)writeStreamDidOpen {
  [self.writeStream writeHandshake];
}

/**
 * Handles a successful handshake response from the server, which is our cue to send any pending
 * writes.
 */
- (void)writeStreamDidCompleteHandshake {
  // Record the stream token.
  [self.localStore setLastStreamToken:self.writeStream.lastStreamToken];

  // Drain any pending writes.
  //
  // Note that at this point pendingWrites contains mutations that have already been accepted by
  // fillWritePipeline/commitBatch. If the pipeline is full, canWriteMutations will be NO, despite
  // the fact that we actually need to send mutations over.
  //
  // This also means that this method indirectly respects the limits imposed by canWriteMutations
  // since writes can't be added to the pendingWrites array when canWriteMutations is NO. If the
  // limits imposed by canWriteMutations actually protect us from DOSing ourselves then those limits
  // won't be exceeded here and we'll continue to make progress.
  for (FSTMutationBatch *write in self.pendingWrites) {
    [self.writeStream writeMutations:write.mutations];
  }
}

/** Handles a successful StreamingWriteResponse from the server that contains a mutation result. */
- (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commitVersion
                                 mutationResults:(NSArray<FSTMutationResult *> *)results {
  // This is a response to a write containing mutations and should be correlated to the first
  // pending write.
  NSMutableArray *pendingWrites = self.pendingWrites;
  FSTMutationBatch *batch = pendingWrites[0];
  [pendingWrites removeObjectAtIndex:0];

  FSTMutationBatchResult *batchResult =
      [FSTMutationBatchResult resultWithBatch:batch
                                commitVersion:commitVersion
                              mutationResults:results
                                  streamToken:self.writeStream.lastStreamToken];
  [self.syncEngine applySuccessfulWriteWithResult:batchResult];

  // It's possible that with the completion of this mutation another slot has freed up.
  [self fillWritePipeline];
}

/**
 * Handles the closing of the StreamingWrite RPC, either because of an error or because the RPC
 * has been terminated by the client or the server.
 */
- (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
  HARD_ASSERT([self isNetworkEnabled],
              "writeStreamDidClose: should only be called when the network is enabled");

  // If the write stream closed due to an error, invoke the error callbacks if there are pending
  // writes.
  if (error != nil && self.pendingWrites.count > 0) {
    if (self.writeStream.handshakeComplete) {
      // This error affects the actual writes.
      [self handleWriteError:error];
    } else {
      // If there was an error before the handshake finished, it's possible that the server is
      // unable to process the stream token we're sending. (Perhaps it's too old?)
      [self handleHandshakeError:error];
    }
  }

  // The write stream might have been started by refilling the write pipeline for failed writes
  if ([self shouldStartWriteStream]) {
    [self startWriteStream];
  }
}

- (void)handleHandshakeError:(NSError *)error {
  // Reset the token if it's a permanent error or the error code is ABORTED, signaling the write
  // stream is no longer valid.
  if ([FSTDatastore isPermanentWriteError:error] || [FSTDatastore isAbortedError:error]) {
    NSString *token = [self.writeStream.lastStreamToken base64EncodedStringWithOptions:0];
    LOG_DEBUG("FSTRemoteStore %s error before completed handshake; resetting stream token %s: %s",
              (__bridge void *)self, token, error);
    self.writeStream.lastStreamToken = nil;
    [self.localStore setLastStreamToken:nil];
  }
}

- (void)handleWriteError:(NSError *)error {
  // Only handle permanent error. If it's transient, just let the retry logic kick in.
  if (![FSTDatastore isPermanentWriteError:error]) {
    return;
  }

  // If this was a permanent error, the request itself was the problem so it's not going to
  // succeed if we resend it.
  FSTMutationBatch *batch = self.pendingWrites[0];
  [self.pendingWrites removeObjectAtIndex:0];

  // In this case it's also unlikely that the server itself is melting down--this was just a
  // bad request so inhibit backoff on the next restart.
  [self.writeStream inhibitBackoff];

  [self.syncEngine rejectFailedWriteWithBatchID:batch.batchID error:error];

  // It's possible that with the completion of this mutation another slot has freed up.
  [self fillWritePipeline];
}

- (FSTTransaction *)transaction {
  return [FSTTransaction transactionWithDatastore:self.datastore];
}

@end

NS_ASSUME_NONNULL_END