From 033db460bab9276e8c775d4eff7853160ab23b12 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 23 May 2017 17:17:20 -0700 Subject: Fix pipeline finishing bug --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 64 ++++++++++++++++++----------- 1 file changed, 41 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index bffabc53f2..90d51163dd 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -70,48 +70,66 @@ __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf) { + if (strongSelf && !strongSelf->_errorOrNil) { [strongSelf->_writeable writeValue:value]; } }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { + if (_inputIsFinished) { + return; + } _inputIsFinished = YES; _errorOrNil = errorOrNil; if (errorOrNil) { // No need to write pending values. [self finishWithError:_errorOrNil]; + } else { + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^{ + GRXBufferedPipe *strongSelf = weakSelf; + if (strongSelf) { + [strongSelf finishWithError:_errorOrNil]; + } + }); } } #pragma mark GRXWriter implementation - (void)setState:(GRXWriterState)newState { - // Manual transitions are only allowed from the started or paused states. - if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { - return; - } - - switch (newState) { - case GRXWriterStateFinished: - _state = newState; - // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the - // writeable to be messaged anymore. - _writeable = nil; - return; - case GRXWriterStatePaused: - _state = newState; - dispatch_suspend(_writeQueue); + @synchronized (self) { + // Manual transitions are only allowed from the started or paused states. + if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { return; - case GRXWriterStateStarted: - if (_state == GRXWriterStatePaused) { + } + + switch (newState) { + case GRXWriterStateFinished: + if (_state == GRXWriterStatePaused) { + dispatch_resume(_writeQueue); + } _state = newState; - dispatch_resume(_writeQueue); - } - return; - case GRXWriterStateNotStarted: - return; + // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the + // writeable to be messaged anymore. + _writeable = nil; + return; + case GRXWriterStatePaused: + if (_state == GRXWriterStateStarted) { + _state = newState; + dispatch_suspend(_writeQueue); + } + return; + case GRXWriterStateStarted: + if (_state == GRXWriterStatePaused) { + _state = newState; + dispatch_resume(_writeQueue); + } + return; + case GRXWriterStateNotStarted: + return; + } } } -- cgit v1.2.3