aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firestore/Source/Remote/FSTBufferedWriter.mm
blob: 47dbb21c0fc82a0d4064afafd4e83831e9482ff0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*
 * Copyright 2017 Google
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#import <Protobuf/GPBProtocolBuffers.h>

#import "Firestore/Source/Remote/FSTBufferedWriter.h"

NS_ASSUME_NONNULL_BEGIN

@implementation FSTBufferedWriter {
  GRXWriterState _state;
  NSMutableArray<NSData *> *_queue;

  id<GRXWriteable> _writeable;
}

- (instancetype)init {
  if (self = [super init]) {
    _state = GRXWriterStateNotStarted;
    _queue = [[NSMutableArray alloc] init];
  }
  return self;
}

#pragma mark - GRXWriteable implementation

/** Push the next value of the sequence to the receiving object. */
- (void)writeValue:(id)value {
  if (_state == GRXWriterStateStarted && _queue.count == 0) {
    // Skip the queue.
    [_writeable writeValue:value];
  } else {
    // Buffer the new value. Note that the value is assumed to be transient and doesn't need to
    // be copied.
    [_queue addObject:value];
  }
}

/**
 * Signal that the sequence is completed, or that an error ocurred. After this message is sent to
 * the receiver, neither it nor writeValue: may be called again.
 */
- (void)writesFinishedWithError:(nullable NSError *)error {
  // Unimplemented. If we ever wanted to implement sender-side initiated half close we could do so
  // by buffering (or sending) and error.
  [self doesNotRecognizeSelector:_cmd];
}

#pragma mark GRXWriter implementation
// The GRXWriter implementation defines the send side of the RPC stream. Once the RPC is ready it
// will call startWithWriteable passing a GRXWriteable into which requests can be written but only
// when the GRXWriter is in the started state.

/**
 * Called by GRPCCall when it is ready to accept for the first request. Requests should be written
 * to the passed writeable.
 *
 * GRPCCall will synchronize on the receiver around this call.
 */
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
  _state = GRXWriterStateStarted;
  _writeable = writeable;
}

/**
 * Called by GRPCCall to implement flow control on the sending side of the stream. After each
 * writeValue: on the requestsWriteable, GRPCCall will call setState:GRXWriterStatePaused to apply
 * backpressure. Once the stream is ready to accept another message, GRPCCall will call
 * setState:GRXWriterStateStarted.
 *
 * GRPCCall will synchronize on the receiver around this call.
 */
- (void)setState:(GRXWriterState)newState {
  // Manual transitions are only allowed from the started or paused states.
  if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
    return;
  }

  switch (newState) {
    case GRXWriterStateFinished:
      _state = newState;
      // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
      // writeable to be messaged anymore.
      _queue = nil;
      _writeable = nil;
      return;
    case GRXWriterStatePaused:
      _state = newState;
      return;
    case GRXWriterStateStarted:
      if (_state == GRXWriterStatePaused) {
        _state = newState;
        [self writeBufferedMessages];
      }
      return;
    case GRXWriterStateNotStarted:
      return;
  }
}

- (void)finishWithError:(nullable NSError *)error {
  [_writeable writesFinishedWithError:error];
  self.state = GRXWriterStateFinished;
}

- (void)writeBufferedMessages {
  while (_state == GRXWriterStateStarted && _queue.count > 0) {
    id value = _queue[0];
    [_queue removeObjectAtIndex:0];

    // In addition to writing the value here GRPC will apply backpressure by pausing the GRXWriter
    // wrapping this buffer. That writer must call -pauseMessages which will cause this loop to
    // exit. Synchronization is not required since the callback happens within the body of the
    // writeValue implementation.
    [_writeable writeValue:value];
  }
}

@end

NS_ASSUME_NONNULL_END