aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m27
1 files changed, 9 insertions, 18 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index 3bb9e2a0cf..fbdaf9d338 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -41,7 +41,7 @@
@implementation GRXBufferedPipe {
NSError *_errorOrNil;
dispatch_queue_t _writeQueue;
- dispatch_once_t _finishQueue;
+ dispatch_once_t _finishAction;
}
@synthesize state = _state;
@@ -54,7 +54,6 @@
if (self = [super init]) {
_state = GRXWriterStateNotStarted;
_writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
- self.inputIsFinished = NO;
dispatch_suspend(_writeQueue);
}
return self;
@@ -74,10 +73,7 @@
}
__weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
- GRXBufferedPipe *strongSelf = weakSelf;
- if (strongSelf && strongSelf.writeable) {
- [strongSelf.writeable writeValue:value];
- }
+ [weakSelf.writeable writeValue:value];
});
}
@@ -86,23 +82,18 @@
return;
}
self.inputIsFinished = YES;
- if (errorOrNil) {
- // No need to write pending values.
- dispatch_once(&_finishQueue, ^{
+ dispatch_once(&_finishAction, ^{
+ if (errorOrNil) {
+ // No need to write pending values.
_errorOrNil = errorOrNil;
[self finishWithError:_errorOrNil];
- });
- } else {
- dispatch_once(&_finishQueue, ^{
+ } else {
__weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^{
- GRXBufferedPipe *strongSelf = weakSelf;
- if (strongSelf) {
- [strongSelf finishWithError:nil];
- }
+ [weakSelf finishWithError:nil];
});
- });
- }
+ }
+ });
}
#pragma mark GRXWriter implementation