diff options
-rw-r--r-- | src/objective-c/RxLibrary/GRXBufferedPipe.m | 72 | ||||
-rw-r--r-- | src/objective-c/tests/RxLibraryUnitTests.m | 1 |
2 files changed, 24 insertions, 49 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 763f107678..9dfe332eef 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -35,7 +35,6 @@ @implementation GRXBufferedPipe { id<GRXWriteable> _writeable; - NSMutableArray *_queue; BOOL _inputIsFinished; NSError *_errorOrNil; dispatch_queue_t _writeQueue; @@ -49,63 +48,30 @@ - (instancetype)init { if (self = [super init]) { - _queue = [NSMutableArray array]; _state = GRXWriterStateNotStarted; _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + dispatch_suspend(_writeQueue); } return self; } -- (id)popValue { - id value = _queue[0]; - [_queue removeObjectAtIndex:0]; - return value; -} - -- (void)writeBufferUntilPausedOrStopped { - dispatch_async(_writeQueue, ^(void) { - while (_queue.count > 0) { - BOOL started; - @synchronized (self) { - started = (_state == GRXWriterStateStarted); - } - if (started) { - [_writeable writeValue:[self popValue]]; - } else { - break; - } - } - if (_inputIsFinished && _queue.count == 0) { - // Our writer finished normally while we were paused or not-started-yet. - [self finishWithError:_errorOrNil]; - } - }); -} - #pragma mark GRXWriteable implementation -// Returns whether events can be simply propagated to the other end of the pipe. -- (BOOL)shouldFastForward { - BOOL started; - @synchronized (self) { - started = (_state == GRXWriterStateStarted); - } - return _state == started && _queue.count == 0; -} - - (void)writeValue:(id)value { + if (_inputIsFinished) { + return; + } if ([value respondsToSelector:@selector(copy)]) { + // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. + // So just buffer the new value. + // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. value = [value copy]; } + __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { - if (self.shouldFastForward) { - // Skip the queue. - [_writeable writeValue:value]; - } else { - // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. - // So just buffer the new value. - // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. - [_queue addObject:value]; + GRXBufferedPipe *strongSelf = weakSelf; + if (strongSelf) { + [strongSelf->_writeable writeValue:value]; } }); } @@ -113,9 +79,14 @@ - (void)writesFinishedWithError:(NSError *)errorOrNil { _inputIsFinished = YES; _errorOrNil = errorOrNil; - if (errorOrNil || self.shouldFastForward) { + if (errorOrNil) { // No need to write pending values. [self finishWithError:_errorOrNil]; + } else { + // Wait until all the pending writes to be finished. + dispatch_sync(_writeQueue, ^{ + return; + }); } } @@ -130,18 +101,18 @@ switch (newState) { case GRXWriterStateFinished: _state = newState; - _queue = nil; // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the // writeable to be messaged anymore. _writeable = nil; return; case GRXWriterStatePaused: _state = newState; + dispatch_suspend(_writeQueue); return; case GRXWriterStateStarted: if (_state == GRXWriterStatePaused) { _state = newState; - [self writeBufferUntilPausedOrStopped]; + dispatch_resume(_writeQueue); } return; case GRXWriterStateNotStarted: @@ -150,9 +121,12 @@ } - (void)startWithWriteable:(id<GRXWriteable>)writeable { + if (_state != GRXWriterStateNotStarted) { + return; + } _state = GRXWriterStateStarted; _writeable = writeable; - [self writeBufferUntilPausedOrStopped]; + dispatch_resume(_writeQueue); } - (void)finishWithError:(NSError *)errorOrNil { diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m index 62fbdfcdf6..feade14eea 100644 --- a/src/objective-c/tests/RxLibraryUnitTests.m +++ b/src/objective-c/tests/RxLibraryUnitTests.m @@ -164,6 +164,7 @@ GRXBufferedPipe *pipe = [GRXBufferedPipe pipe]; [pipe startWithWriteable:writeable]; [pipe writeValue:anyValue]; + [pipe writesFinishedWithError:nil]; // Then: XCTAssertEqual(handler.timesCalled, 1); |