diff options
Diffstat (limited to 'src/objective-c/RxLibrary/GRXBufferedPipe.m')
-rw-r--r-- | src/objective-c/RxLibrary/GRXBufferedPipe.m | 45 |
1 files changed, 27 insertions, 18 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index eee41cc847..3bb9e2a0cf 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -34,13 +34,14 @@ #import "GRXBufferedPipe.h" @interface GRXBufferedPipe () -@property(atomic) NSError *errorOrNil; +@property(atomic) id<GRXWriteable> writeable; +@property(atomic) BOOL inputIsFinished; @end @implementation GRXBufferedPipe { - id<GRXWriteable> _writeable; - BOOL _inputIsFinished; + NSError *_errorOrNil; dispatch_queue_t _writeQueue; + dispatch_once_t _finishQueue; } @synthesize state = _state; @@ -53,6 +54,7 @@ if (self = [super init]) { _state = GRXWriterStateNotStarted; _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + self.inputIsFinished = NO; dispatch_suspend(_writeQueue); } return self; @@ -61,7 +63,7 @@ #pragma mark GRXWriteable implementation - (void)writeValue:(id)value { - if (_inputIsFinished) { + if (self.inputIsFinished) { return; } if ([value respondsToSelector:@selector(copy)]) { @@ -73,28 +75,32 @@ __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf && !strongSelf.errorOrNil) { - [strongSelf->_writeable writeValue:value]; + if (strongSelf && strongSelf.writeable) { + [strongSelf.writeable writeValue:value]; } }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { - if (_inputIsFinished) { + if (self.inputIsFinished) { return; } - _inputIsFinished = YES; - self.errorOrNil = errorOrNil; + self.inputIsFinished = YES; if (errorOrNil) { // No need to write pending values. - [self finishWithError:_errorOrNil]; + dispatch_once(&_finishQueue, ^{ + _errorOrNil = errorOrNil; + [self finishWithError:_errorOrNil]; + }); } else { - __weak GRXBufferedPipe *weakSelf = self; - dispatch_async(_writeQueue, ^{ - GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf) { - [strongSelf finishWithError:nil]; - } + dispatch_once(&_finishQueue, ^{ + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^{ + GRXBufferedPipe *strongSelf = weakSelf; + if (strongSelf) { + [strongSelf finishWithError:nil]; + } + }); }); } } @@ -143,9 +149,12 @@ } - (void)finishWithError:(NSError *)errorOrNil { - id<GRXWriteable> writeable = _writeable; + id<GRXWriteable> writeable = self.writeable; + self.writeable = nil; self.state = GRXWriterStateFinished; - [writeable writesFinishedWithError:errorOrNil]; + dispatch_async(_writeQueue, ^{ + [writeable writesFinishedWithError:errorOrNil]; + }); } @end |