diff options
author | Muxi Yan <mxyan@google.com> | 2017-05-10 14:26:40 -0700 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2017-05-10 14:26:40 -0700 |
commit | 91d7bb0c2eb38523c44c8de3e85ae63fa69b1f15 (patch) | |
tree | 2badd2d4076fe1404f91f6c3fa823f09ee996e2b /src/objective-c/RxLibrary/GRXBufferedPipe.m | |
parent | 20f2c431749ffd732c14519bf694e0adb25c9a73 (diff) |
Sync writes with queue
Diffstat (limited to 'src/objective-c/RxLibrary/GRXBufferedPipe.m')
-rw-r--r-- | src/objective-c/RxLibrary/GRXBufferedPipe.m | 56 |
1 files changed, 37 insertions, 19 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 4820c84af0..763f107678 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -38,6 +38,7 @@ NSMutableArray *_queue; BOOL _inputIsFinished; NSError *_errorOrNil; + dispatch_queue_t _writeQueue; } @synthesize state = _state; @@ -50,6 +51,7 @@ if (self = [super init]) { _queue = [NSMutableArray array]; _state = GRXWriterStateNotStarted; + _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); } return self; } @@ -61,35 +63,51 @@ } - (void)writeBufferUntilPausedOrStopped { - while (_state == GRXWriterStateStarted && _queue.count > 0) { - [_writeable writeValue:[self popValue]]; - } - if (_inputIsFinished && _queue.count == 0) { - // Our writer finished normally while we were paused or not-started-yet. - [self finishWithError:_errorOrNil]; - } + dispatch_async(_writeQueue, ^(void) { + while (_queue.count > 0) { + BOOL started; + @synchronized (self) { + started = (_state == GRXWriterStateStarted); + } + if (started) { + [_writeable writeValue:[self popValue]]; + } else { + break; + } + } + if (_inputIsFinished && _queue.count == 0) { + // Our writer finished normally while we were paused or not-started-yet. + [self finishWithError:_errorOrNil]; + } + }); } #pragma mark GRXWriteable implementation // Returns whether events can be simply propagated to the other end of the pipe. - (BOOL)shouldFastForward { - return _state == GRXWriterStateStarted && _queue.count == 0; + BOOL started; + @synchronized (self) { + started = (_state == GRXWriterStateStarted); + } + return _state == started && _queue.count == 0; } - (void)writeValue:(id)value { - if (self.shouldFastForward) { - // Skip the queue. - [_writeable writeValue:value]; - } else { - // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. - // So just buffer the new value. - // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. - if ([value respondsToSelector:@selector(copy)]) { - value = [value copy]; - } - [_queue addObject:value]; + if ([value respondsToSelector:@selector(copy)]) { + value = [value copy]; } + dispatch_async(_writeQueue, ^(void) { + if (self.shouldFastForward) { + // Skip the queue. + [_writeable writeValue:value]; + } else { + // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. + // So just buffer the new value. + // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. + [_queue addObject:value]; + } + }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { |