diff options
author | Muxi Yan <mxyan@google.com> | 2017-05-23 17:17:20 -0700 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2017-05-23 17:17:20 -0700 |
commit | 033db460bab9276e8c775d4eff7853160ab23b12 (patch) | |
tree | 1e3637d06971b6c111814b2f73eb8e3af6b61c89 /src | |
parent | a8d40b5672edbcf74c005e367f6ffd4954b35baa (diff) |
Fix pipeline finishing bug
Diffstat (limited to 'src')
-rw-r--r-- | src/objective-c/RxLibrary/GRXBufferedPipe.m | 64 |
1 files changed, 41 insertions, 23 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index bffabc53f2..90d51163dd 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -70,48 +70,66 @@ __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf) { + if (strongSelf && !strongSelf->_errorOrNil) { [strongSelf->_writeable writeValue:value]; } }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { + if (_inputIsFinished) { + return; + } _inputIsFinished = YES; _errorOrNil = errorOrNil; if (errorOrNil) { // No need to write pending values. [self finishWithError:_errorOrNil]; + } else { + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^{ + GRXBufferedPipe *strongSelf = weakSelf; + if (strongSelf) { + [strongSelf finishWithError:_errorOrNil]; + } + }); } } #pragma mark GRXWriter implementation - (void)setState:(GRXWriterState)newState { - // Manual transitions are only allowed from the started or paused states. - if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { - return; - } - - switch (newState) { - case GRXWriterStateFinished: - _state = newState; - // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the - // writeable to be messaged anymore. - _writeable = nil; - return; - case GRXWriterStatePaused: - _state = newState; - dispatch_suspend(_writeQueue); + @synchronized (self) { + // Manual transitions are only allowed from the started or paused states. + if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { return; - case GRXWriterStateStarted: - if (_state == GRXWriterStatePaused) { + } + + switch (newState) { + case GRXWriterStateFinished: + if (_state == GRXWriterStatePaused) { + dispatch_resume(_writeQueue); + } _state = newState; - dispatch_resume(_writeQueue); - } - return; - case GRXWriterStateNotStarted: - return; + // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the + // writeable to be messaged anymore. + _writeable = nil; + return; + case GRXWriterStatePaused: + if (_state == GRXWriterStateStarted) { + _state = newState; + dispatch_suspend(_writeQueue); + } + return; + case GRXWriterStateStarted: + if (_state == GRXWriterStatePaused) { + _state = newState; + dispatch_resume(_writeQueue); + } + return; + case GRXWriterStateNotStarted: + return; + } } } |