diff options
author | 2017-06-06 14:39:15 -0700 | |
---|---|---|
committer | 2017-06-06 14:39:15 -0700 | |
commit | d7d6a2e12bb4ae0057f45b964555018245865cb2 (patch) | |
tree | 1d731c45048db751d2a3d70f06e1ec587185cde0 /src/objective-c | |
parent | e09e47ccb83e701c183bee0a0ad3468a8b246be7 (diff) |
Better concurrency handling
Diffstat (limited to 'src/objective-c')
-rw-r--r-- | src/objective-c/RxLibrary/GRXBufferedPipe.m | 45 | ||||
-rw-r--r-- | src/objective-c/tests/RxLibraryUnitTests.m | 7 |
2 files changed, 33 insertions, 19 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index eee41cc847..3bb9e2a0cf 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -34,13 +34,14 @@ #import "GRXBufferedPipe.h" @interface GRXBufferedPipe () -@property(atomic) NSError *errorOrNil; +@property(atomic) id<GRXWriteable> writeable; +@property(atomic) BOOL inputIsFinished; @end @implementation GRXBufferedPipe { - id<GRXWriteable> _writeable; - BOOL _inputIsFinished; + NSError *_errorOrNil; dispatch_queue_t _writeQueue; + dispatch_once_t _finishQueue; } @synthesize state = _state; @@ -53,6 +54,7 @@ if (self = [super init]) { _state = GRXWriterStateNotStarted; _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + self.inputIsFinished = NO; dispatch_suspend(_writeQueue); } return self; @@ -61,7 +63,7 @@ #pragma mark GRXWriteable implementation - (void)writeValue:(id)value { - if (_inputIsFinished) { + if (self.inputIsFinished) { return; } if ([value respondsToSelector:@selector(copy)]) { @@ -73,28 +75,32 @@ __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf && !strongSelf.errorOrNil) { - [strongSelf->_writeable writeValue:value]; + if (strongSelf && strongSelf.writeable) { + [strongSelf.writeable writeValue:value]; } }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { - if (_inputIsFinished) { + if (self.inputIsFinished) { return; } - _inputIsFinished = YES; - self.errorOrNil = errorOrNil; + self.inputIsFinished = YES; if (errorOrNil) { // No need to write pending values. - [self finishWithError:_errorOrNil]; + dispatch_once(&_finishQueue, ^{ + _errorOrNil = errorOrNil; + [self finishWithError:_errorOrNil]; + }); } else { - __weak GRXBufferedPipe *weakSelf = self; - dispatch_async(_writeQueue, ^{ - GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf) { - [strongSelf finishWithError:nil]; - } + dispatch_once(&_finishQueue, ^{ + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^{ + GRXBufferedPipe *strongSelf = weakSelf; + if (strongSelf) { + [strongSelf finishWithError:nil]; + } + }); }); } } @@ -143,9 +149,12 @@ } - (void)finishWithError:(NSError *)errorOrNil { - id<GRXWriteable> writeable = _writeable; + id<GRXWriteable> writeable = self.writeable; + self.writeable = nil; self.state = GRXWriterStateFinished; - [writeable writesFinishedWithError:errorOrNil]; + dispatch_async(_writeQueue, ^{ + [writeable writesFinishedWithError:errorOrNil]; + }); } @end diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m index 770c034220..500b8a64ef 100644 --- a/src/objective-c/tests/RxLibraryUnitTests.m +++ b/src/objective-c/tests/RxLibraryUnitTests.m @@ -182,9 +182,13 @@ } - (void)testBufferedPipePropagatesError { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"]; // Given: CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; - id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + handler.block(value, errorOrNil); + [expectation fulfill]; + }]; NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil]; // If: @@ -193,6 +197,7 @@ [pipe writesFinishedWithError:anyError]; // Then: + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, nil); XCTAssertEqualObjects(handler.errorOrNil, anyError); |