aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/RxLibrary/GRXBufferedPipe.m
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-05-23 17:17:20 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-05-23 17:17:20 -0700
commit033db460bab9276e8c775d4eff7853160ab23b12 (patch)
tree1e3637d06971b6c111814b2f73eb8e3af6b61c89 /src/objective-c/RxLibrary/GRXBufferedPipe.m
parenta8d40b5672edbcf74c005e367f6ffd4954b35baa (diff)
Fix pipeline finishing bug
Diffstat (limited to 'src/objective-c/RxLibrary/GRXBufferedPipe.m')
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m64
1 files changed, 41 insertions, 23 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index bffabc53f2..90d51163dd 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -70,48 +70,66 @@
__weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
GRXBufferedPipe *strongSelf = weakSelf;
- if (strongSelf) {
+ if (strongSelf && !strongSelf->_errorOrNil) {
[strongSelf->_writeable writeValue:value];
}
});
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
+ if (_inputIsFinished) {
+ return;
+ }
_inputIsFinished = YES;
_errorOrNil = errorOrNil;
if (errorOrNil) {
// No need to write pending values.
[self finishWithError:_errorOrNil];
+ } else {
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^{
+ GRXBufferedPipe *strongSelf = weakSelf;
+ if (strongSelf) {
+ [strongSelf finishWithError:_errorOrNil];
+ }
+ });
}
}
#pragma mark GRXWriter implementation
- (void)setState:(GRXWriterState)newState {
- // Manual transitions are only allowed from the started or paused states.
- if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
- return;
- }
-
- switch (newState) {
- case GRXWriterStateFinished:
- _state = newState;
- // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
- // writeable to be messaged anymore.
- _writeable = nil;
- return;
- case GRXWriterStatePaused:
- _state = newState;
- dispatch_suspend(_writeQueue);
+ @synchronized (self) {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
- case GRXWriterStateStarted:
- if (_state == GRXWriterStatePaused) {
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
+ if (_state == GRXWriterStatePaused) {
+ dispatch_resume(_writeQueue);
+ }
_state = newState;
- dispatch_resume(_writeQueue);
- }
- return;
- case GRXWriterStateNotStarted:
- return;
+ // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
+ // writeable to be messaged anymore.
+ _writeable = nil;
+ return;
+ case GRXWriterStatePaused:
+ if (_state == GRXWriterStateStarted) {
+ _state = newState;
+ dispatch_suspend(_writeQueue);
+ }
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ dispatch_resume(_writeQueue);
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
}
}