aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/RxLibrary/GRXBufferedPipe.m
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-05-10 14:26:40 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-05-10 14:26:40 -0700
commit91d7bb0c2eb38523c44c8de3e85ae63fa69b1f15 (patch)
tree2badd2d4076fe1404f91f6c3fa823f09ee996e2b /src/objective-c/RxLibrary/GRXBufferedPipe.m
parent20f2c431749ffd732c14519bf694e0adb25c9a73 (diff)
Sync writes with queue
Diffstat (limited to 'src/objective-c/RxLibrary/GRXBufferedPipe.m')
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m56
1 files changed, 37 insertions, 19 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index 4820c84af0..763f107678 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -38,6 +38,7 @@
NSMutableArray *_queue;
BOOL _inputIsFinished;
NSError *_errorOrNil;
+ dispatch_queue_t _writeQueue;
}
@synthesize state = _state;
@@ -50,6 +51,7 @@
if (self = [super init]) {
_queue = [NSMutableArray array];
_state = GRXWriterStateNotStarted;
+ _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
}
return self;
}
@@ -61,35 +63,51 @@
}
- (void)writeBufferUntilPausedOrStopped {
- while (_state == GRXWriterStateStarted && _queue.count > 0) {
- [_writeable writeValue:[self popValue]];
- }
- if (_inputIsFinished && _queue.count == 0) {
- // Our writer finished normally while we were paused or not-started-yet.
- [self finishWithError:_errorOrNil];
- }
+ dispatch_async(_writeQueue, ^(void) {
+ while (_queue.count > 0) {
+ BOOL started;
+ @synchronized (self) {
+ started = (_state == GRXWriterStateStarted);
+ }
+ if (started) {
+ [_writeable writeValue:[self popValue]];
+ } else {
+ break;
+ }
+ }
+ if (_inputIsFinished && _queue.count == 0) {
+ // Our writer finished normally while we were paused or not-started-yet.
+ [self finishWithError:_errorOrNil];
+ }
+ });
}
#pragma mark GRXWriteable implementation
// Returns whether events can be simply propagated to the other end of the pipe.
- (BOOL)shouldFastForward {
- return _state == GRXWriterStateStarted && _queue.count == 0;
+ BOOL started;
+ @synchronized (self) {
+ started = (_state == GRXWriterStateStarted);
+ }
+ return _state == started && _queue.count == 0;
}
- (void)writeValue:(id)value {
- if (self.shouldFastForward) {
- // Skip the queue.
- [_writeable writeValue:value];
- } else {
- // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
- // So just buffer the new value.
- // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
- if ([value respondsToSelector:@selector(copy)]) {
- value = [value copy];
- }
- [_queue addObject:value];
+ if ([value respondsToSelector:@selector(copy)]) {
+ value = [value copy];
}
+ dispatch_async(_writeQueue, ^(void) {
+ if (self.shouldFastForward) {
+ // Skip the queue.
+ [_writeable writeValue:value];
+ } else {
+ // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
+ // So just buffer the new value.
+ // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
+ [_queue addObject:value];
+ }
+ });
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {