aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-06-06 14:39:15 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-06-06 14:39:15 -0700
commitd7d6a2e12bb4ae0057f45b964555018245865cb2 (patch)
tree1d731c45048db751d2a3d70f06e1ec587185cde0 /src/objective-c
parente09e47ccb83e701c183bee0a0ad3468a8b246be7 (diff)
Better concurrency handling
Diffstat (limited to 'src/objective-c')
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m45
-rw-r--r--src/objective-c/tests/RxLibraryUnitTests.m7
2 files changed, 33 insertions, 19 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index eee41cc847..3bb9e2a0cf 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -34,13 +34,14 @@
#import "GRXBufferedPipe.h"
@interface GRXBufferedPipe ()
-@property(atomic) NSError *errorOrNil;
+@property(atomic) id<GRXWriteable> writeable;
+@property(atomic) BOOL inputIsFinished;
@end
@implementation GRXBufferedPipe {
- id<GRXWriteable> _writeable;
- BOOL _inputIsFinished;
+ NSError *_errorOrNil;
dispatch_queue_t _writeQueue;
+ dispatch_once_t _finishQueue;
}
@synthesize state = _state;
@@ -53,6 +54,7 @@
if (self = [super init]) {
_state = GRXWriterStateNotStarted;
_writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ self.inputIsFinished = NO;
dispatch_suspend(_writeQueue);
}
return self;
@@ -61,7 +63,7 @@
#pragma mark GRXWriteable implementation
- (void)writeValue:(id)value {
- if (_inputIsFinished) {
+ if (self.inputIsFinished) {
return;
}
if ([value respondsToSelector:@selector(copy)]) {
@@ -73,28 +75,32 @@
__weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
GRXBufferedPipe *strongSelf = weakSelf;
- if (strongSelf && !strongSelf.errorOrNil) {
- [strongSelf->_writeable writeValue:value];
+ if (strongSelf && strongSelf.writeable) {
+ [strongSelf.writeable writeValue:value];
}
});
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- if (_inputIsFinished) {
+ if (self.inputIsFinished) {
return;
}
- _inputIsFinished = YES;
- self.errorOrNil = errorOrNil;
+ self.inputIsFinished = YES;
if (errorOrNil) {
// No need to write pending values.
- [self finishWithError:_errorOrNil];
+ dispatch_once(&_finishQueue, ^{
+ _errorOrNil = errorOrNil;
+ [self finishWithError:_errorOrNil];
+ });
} else {
- __weak GRXBufferedPipe *weakSelf = self;
- dispatch_async(_writeQueue, ^{
- GRXBufferedPipe *strongSelf = weakSelf;
- if (strongSelf) {
- [strongSelf finishWithError:nil];
- }
+ dispatch_once(&_finishQueue, ^{
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^{
+ GRXBufferedPipe *strongSelf = weakSelf;
+ if (strongSelf) {
+ [strongSelf finishWithError:nil];
+ }
+ });
});
}
}
@@ -143,9 +149,12 @@
}
- (void)finishWithError:(NSError *)errorOrNil {
- id<GRXWriteable> writeable = _writeable;
+ id<GRXWriteable> writeable = self.writeable;
+ self.writeable = nil;
self.state = GRXWriterStateFinished;
- [writeable writesFinishedWithError:errorOrNil];
+ dispatch_async(_writeQueue, ^{
+ [writeable writesFinishedWithError:errorOrNil];
+ });
}
@end
diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m
index 770c034220..500b8a64ef 100644
--- a/src/objective-c/tests/RxLibraryUnitTests.m
+++ b/src/objective-c/tests/RxLibraryUnitTests.m
@@ -182,9 +182,13 @@
}
- (void)testBufferedPipePropagatesError {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"];
// Given:
CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
- id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) {
+ handler.block(value, errorOrNil);
+ [expectation fulfill];
+ }];
NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil];
// If:
@@ -193,6 +197,7 @@
[pipe writesFinishedWithError:anyError];
// Then:
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
XCTAssertEqual(handler.timesCalled, 1);
XCTAssertEqualObjects(handler.value, nil);
XCTAssertEqualObjects(handler.errorOrNil, anyError);