aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/RxLibrary/GRXBufferedPipe.m
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-06-17 15:20:09 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-06-17 15:20:09 -0700
commit9ae2a7dc8653f1f9e7a83dfb249b4e93ea697a7c (patch)
tree33aa06eb352cd19e80432887e240119d4b15543f /src/objective-c/RxLibrary/GRXBufferedPipe.m
parentd4792e9d8dc9cdfe0514305eb4e14a6ec4d322da (diff)
Remove conditionals covered by invoke agreement
Diffstat (limited to 'src/objective-c/RxLibrary/GRXBufferedPipe.m')
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m29
1 files changed, 9 insertions, 20 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index fbdaf9d338..b69305b41f 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -35,13 +35,11 @@
@interface GRXBufferedPipe ()
@property(atomic) id<GRXWriteable> writeable;
-@property(atomic) BOOL inputIsFinished;
@end
@implementation GRXBufferedPipe {
NSError *_errorOrNil;
dispatch_queue_t _writeQueue;
- dispatch_once_t _finishAction;
}
@synthesize state = _state;
@@ -62,9 +60,6 @@
#pragma mark GRXWriteable implementation
- (void)writeValue:(id)value {
- if (self.inputIsFinished) {
- return;
- }
if ([value respondsToSelector:@selector(copy)]) {
// Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
// So just buffer the new value.
@@ -78,22 +73,16 @@
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- if (self.inputIsFinished) {
- return;
+ if (errorOrNil) {
+ // No need to write pending values.
+ _errorOrNil = errorOrNil;
+ [self finishWithError:_errorOrNil];
+ } else {
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^{
+ [weakSelf finishWithError:nil];
+ });
}
- self.inputIsFinished = YES;
- dispatch_once(&_finishAction, ^{
- if (errorOrNil) {
- // No need to write pending values.
- _errorOrNil = errorOrNil;
- [self finishWithError:_errorOrNil];
- } else {
- __weak GRXBufferedPipe *weakSelf = self;
- dispatch_async(_writeQueue, ^{
- [weakSelf finishWithError:nil];
- });
- }
- });
}
#pragma mark GRXWriter implementation