aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/RxLibrary/GRXBufferedPipe.m
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-06-06 14:39:15 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-06-06 14:39:15 -0700
commitd7d6a2e12bb4ae0057f45b964555018245865cb2 (patch)
tree1d731c45048db751d2a3d70f06e1ec587185cde0 /src/objective-c/RxLibrary/GRXBufferedPipe.m
parente09e47ccb83e701c183bee0a0ad3468a8b246be7 (diff)
Better concurrency handling
Diffstat (limited to 'src/objective-c/RxLibrary/GRXBufferedPipe.m')
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m45
1 files changed, 27 insertions, 18 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index eee41cc847..3bb9e2a0cf 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -34,13 +34,14 @@
#import "GRXBufferedPipe.h"
@interface GRXBufferedPipe ()
-@property(atomic) NSError *errorOrNil;
+@property(atomic) id<GRXWriteable> writeable;
+@property(atomic) BOOL inputIsFinished;
@end
@implementation GRXBufferedPipe {
- id<GRXWriteable> _writeable;
- BOOL _inputIsFinished;
+ NSError *_errorOrNil;
dispatch_queue_t _writeQueue;
+ dispatch_once_t _finishQueue;
}
@synthesize state = _state;
@@ -53,6 +54,7 @@
if (self = [super init]) {
_state = GRXWriterStateNotStarted;
_writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ self.inputIsFinished = NO;
dispatch_suspend(_writeQueue);
}
return self;
@@ -61,7 +63,7 @@
#pragma mark GRXWriteable implementation
- (void)writeValue:(id)value {
- if (_inputIsFinished) {
+ if (self.inputIsFinished) {
return;
}
if ([value respondsToSelector:@selector(copy)]) {
@@ -73,28 +75,32 @@
__weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
GRXBufferedPipe *strongSelf = weakSelf;
- if (strongSelf && !strongSelf.errorOrNil) {
- [strongSelf->_writeable writeValue:value];
+ if (strongSelf && strongSelf.writeable) {
+ [strongSelf.writeable writeValue:value];
}
});
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- if (_inputIsFinished) {
+ if (self.inputIsFinished) {
return;
}
- _inputIsFinished = YES;
- self.errorOrNil = errorOrNil;
+ self.inputIsFinished = YES;
if (errorOrNil) {
// No need to write pending values.
- [self finishWithError:_errorOrNil];
+ dispatch_once(&_finishQueue, ^{
+ _errorOrNil = errorOrNil;
+ [self finishWithError:_errorOrNil];
+ });
} else {
- __weak GRXBufferedPipe *weakSelf = self;
- dispatch_async(_writeQueue, ^{
- GRXBufferedPipe *strongSelf = weakSelf;
- if (strongSelf) {
- [strongSelf finishWithError:nil];
- }
+ dispatch_once(&_finishQueue, ^{
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^{
+ GRXBufferedPipe *strongSelf = weakSelf;
+ if (strongSelf) {
+ [strongSelf finishWithError:nil];
+ }
+ });
});
}
}
@@ -143,9 +149,12 @@
}
- (void)finishWithError:(NSError *)errorOrNil {
- id<GRXWriteable> writeable = _writeable;
+ id<GRXWriteable> writeable = self.writeable;
+ self.writeable = nil;
self.state = GRXWriterStateFinished;
- [writeable writesFinishedWithError:errorOrNil];
+ dispatch_async(_writeQueue, ^{
+ [writeable writesFinishedWithError:errorOrNil];
+ });
}
@end