From 91d7bb0c2eb38523c44c8de3e85ae63fa69b1f15 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 10 May 2017 14:26:40 -0700 Subject: Sync writes with queue --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 56 +++++++++++++++++++---------- 1 file changed, 37 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 4820c84af0..763f107678 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -38,6 +38,7 @@ NSMutableArray *_queue; BOOL _inputIsFinished; NSError *_errorOrNil; + dispatch_queue_t _writeQueue; } @synthesize state = _state; @@ -50,6 +51,7 @@ if (self = [super init]) { _queue = [NSMutableArray array]; _state = GRXWriterStateNotStarted; + _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); } return self; } @@ -61,35 +63,51 @@ } - (void)writeBufferUntilPausedOrStopped { - while (_state == GRXWriterStateStarted && _queue.count > 0) { - [_writeable writeValue:[self popValue]]; - } - if (_inputIsFinished && _queue.count == 0) { - // Our writer finished normally while we were paused or not-started-yet. - [self finishWithError:_errorOrNil]; - } + dispatch_async(_writeQueue, ^(void) { + while (_queue.count > 0) { + BOOL started; + @synchronized (self) { + started = (_state == GRXWriterStateStarted); + } + if (started) { + [_writeable writeValue:[self popValue]]; + } else { + break; + } + } + if (_inputIsFinished && _queue.count == 0) { + // Our writer finished normally while we were paused or not-started-yet. + [self finishWithError:_errorOrNil]; + } + }); } #pragma mark GRXWriteable implementation // Returns whether events can be simply propagated to the other end of the pipe. - (BOOL)shouldFastForward { - return _state == GRXWriterStateStarted && _queue.count == 0; + BOOL started; + @synchronized (self) { + started = (_state == GRXWriterStateStarted); + } + return _state == started && _queue.count == 0; } - (void)writeValue:(id)value { - if (self.shouldFastForward) { - // Skip the queue. - [_writeable writeValue:value]; - } else { - // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. - // So just buffer the new value. - // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. - if ([value respondsToSelector:@selector(copy)]) { - value = [value copy]; - } - [_queue addObject:value]; + if ([value respondsToSelector:@selector(copy)]) { + value = [value copy]; } + dispatch_async(_writeQueue, ^(void) { + if (self.shouldFastForward) { + // Skip the queue. + [_writeable writeValue:value]; + } else { + // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. + // So just buffer the new value. + // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. + [_queue addObject:value]; + } + }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { -- cgit v1.2.3 From ec8e82507ebd9439d02d54937d0f4e8179e42fdc Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 15 May 2017 14:59:07 -0700 Subject: Use dispatch_queue to serialize writes --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 72 +++++++++-------------------- src/objective-c/tests/RxLibraryUnitTests.m | 1 + 2 files changed, 24 insertions(+), 49 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 763f107678..9dfe332eef 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -35,7 +35,6 @@ @implementation GRXBufferedPipe { id _writeable; - NSMutableArray *_queue; BOOL _inputIsFinished; NSError *_errorOrNil; dispatch_queue_t _writeQueue; @@ -49,63 +48,30 @@ - (instancetype)init { if (self = [super init]) { - _queue = [NSMutableArray array]; _state = GRXWriterStateNotStarted; _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + dispatch_suspend(_writeQueue); } return self; } -- (id)popValue { - id value = _queue[0]; - [_queue removeObjectAtIndex:0]; - return value; -} - -- (void)writeBufferUntilPausedOrStopped { - dispatch_async(_writeQueue, ^(void) { - while (_queue.count > 0) { - BOOL started; - @synchronized (self) { - started = (_state == GRXWriterStateStarted); - } - if (started) { - [_writeable writeValue:[self popValue]]; - } else { - break; - } - } - if (_inputIsFinished && _queue.count == 0) { - // Our writer finished normally while we were paused or not-started-yet. - [self finishWithError:_errorOrNil]; - } - }); -} - #pragma mark GRXWriteable implementation -// Returns whether events can be simply propagated to the other end of the pipe. -- (BOOL)shouldFastForward { - BOOL started; - @synchronized (self) { - started = (_state == GRXWriterStateStarted); - } - return _state == started && _queue.count == 0; -} - - (void)writeValue:(id)value { + if (_inputIsFinished) { + return; + } if ([value respondsToSelector:@selector(copy)]) { + // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. + // So just buffer the new value. + // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. value = [value copy]; } + __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { - if (self.shouldFastForward) { - // Skip the queue. - [_writeable writeValue:value]; - } else { - // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. - // So just buffer the new value. - // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. - [_queue addObject:value]; + GRXBufferedPipe *strongSelf = weakSelf; + if (strongSelf) { + [strongSelf->_writeable writeValue:value]; } }); } @@ -113,9 +79,14 @@ - (void)writesFinishedWithError:(NSError *)errorOrNil { _inputIsFinished = YES; _errorOrNil = errorOrNil; - if (errorOrNil || self.shouldFastForward) { + if (errorOrNil) { // No need to write pending values. [self finishWithError:_errorOrNil]; + } else { + // Wait until all the pending writes to be finished. + dispatch_sync(_writeQueue, ^{ + return; + }); } } @@ -130,18 +101,18 @@ switch (newState) { case GRXWriterStateFinished: _state = newState; - _queue = nil; // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the // writeable to be messaged anymore. _writeable = nil; return; case GRXWriterStatePaused: _state = newState; + dispatch_suspend(_writeQueue); return; case GRXWriterStateStarted: if (_state == GRXWriterStatePaused) { _state = newState; - [self writeBufferUntilPausedOrStopped]; + dispatch_resume(_writeQueue); } return; case GRXWriterStateNotStarted: @@ -150,9 +121,12 @@ } - (void)startWithWriteable:(id)writeable { + if (_state != GRXWriterStateNotStarted) { + return; + } _state = GRXWriterStateStarted; _writeable = writeable; - [self writeBufferUntilPausedOrStopped]; + dispatch_resume(_writeQueue); } - (void)finishWithError:(NSError *)errorOrNil { diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m index 62fbdfcdf6..feade14eea 100644 --- a/src/objective-c/tests/RxLibraryUnitTests.m +++ b/src/objective-c/tests/RxLibraryUnitTests.m @@ -164,6 +164,7 @@ GRXBufferedPipe *pipe = [GRXBufferedPipe pipe]; [pipe startWithWriteable:writeable]; [pipe writeValue:anyValue]; + [pipe writesFinishedWithError:nil]; // Then: XCTAssertEqual(handler.timesCalled, 1); -- cgit v1.2.3 From 49b7f8349be4c5ca9d180133d35e57f912f0a373 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 22 May 2017 10:39:48 -0700 Subject: Remove synchronization in GRXBufferedPipe:writesFinishedWithError and change tests correspondingly --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 5 ----- src/objective-c/tests/RxLibraryUnitTests.m | 4 ++++ 2 files changed, 4 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 9dfe332eef..bffabc53f2 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -82,11 +82,6 @@ if (errorOrNil) { // No need to write pending values. [self finishWithError:_errorOrNil]; - } else { - // Wait until all the pending writes to be finished. - dispatch_sync(_writeQueue, ^{ - return; - }); } } diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m index feade14eea..e2ff6e4598 100644 --- a/src/objective-c/tests/RxLibraryUnitTests.m +++ b/src/objective-c/tests/RxLibraryUnitTests.m @@ -166,6 +166,9 @@ [pipe writeValue:anyValue]; [pipe writesFinishedWithError:nil]; + // Wait buffered pipe to be flushed. + sleep(1); + // Then: XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, anyValue); @@ -202,6 +205,7 @@ [pipe writesFinishedWithError:nil]; // then start the writeable [pipe startWithWriteable:writeable]; + sleep(1); // Then: XCTAssertEqual(handler.timesCalled, 1); -- cgit v1.2.3 From a8d40b5672edbcf74c005e367f6ffd4954b35baa Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 23 May 2017 17:15:23 -0700 Subject: Asynchronously wait for tests --- src/objective-c/tests/RxLibraryUnitTests.m | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m index e2ff6e4598..770c034220 100644 --- a/src/objective-c/tests/RxLibraryUnitTests.m +++ b/src/objective-c/tests/RxLibraryUnitTests.m @@ -38,6 +38,8 @@ #import #import +#define TEST_TIMEOUT 1 + // A mock of a GRXSingleValueHandler block that can be queried for how many times it was called and // what were the last values passed to it. // @@ -155,9 +157,14 @@ #pragma mark BufferedPipe - (void)testBufferedPipePropagatesValue { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"]; // Given: CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; - id writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + id writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + handler.block(value, errorOrNil); + [expectation fulfill]; + }]; + id anyValue = @7; // If: @@ -166,13 +173,12 @@ [pipe writeValue:anyValue]; [pipe writesFinishedWithError:nil]; - // Wait buffered pipe to be flushed. - sleep(1); - // Then: + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, anyValue); XCTAssertEqualObjects(handler.errorOrNil, nil); + } - (void)testBufferedPipePropagatesError { @@ -193,9 +199,13 @@ } - (void)testBufferedPipeFinishWriteWhilePaused { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"]; // Given: CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; - id writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + id writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + handler.block(value, errorOrNil); + [expectation fulfill]; + }]; id anyValue = @7; // If: @@ -205,9 +215,9 @@ [pipe writesFinishedWithError:nil]; // then start the writeable [pipe startWithWriteable:writeable]; - sleep(1); // Then: + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, anyValue); XCTAssertEqualObjects(handler.errorOrNil, nil); -- cgit v1.2.3 From 033db460bab9276e8c775d4eff7853160ab23b12 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 23 May 2017 17:17:20 -0700 Subject: Fix pipeline finishing bug --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 64 ++++++++++++++++++----------- 1 file changed, 41 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index bffabc53f2..90d51163dd 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -70,48 +70,66 @@ __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf) { + if (strongSelf && !strongSelf->_errorOrNil) { [strongSelf->_writeable writeValue:value]; } }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { + if (_inputIsFinished) { + return; + } _inputIsFinished = YES; _errorOrNil = errorOrNil; if (errorOrNil) { // No need to write pending values. [self finishWithError:_errorOrNil]; + } else { + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^{ + GRXBufferedPipe *strongSelf = weakSelf; + if (strongSelf) { + [strongSelf finishWithError:_errorOrNil]; + } + }); } } #pragma mark GRXWriter implementation - (void)setState:(GRXWriterState)newState { - // Manual transitions are only allowed from the started or paused states. - if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { - return; - } - - switch (newState) { - case GRXWriterStateFinished: - _state = newState; - // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the - // writeable to be messaged anymore. - _writeable = nil; - return; - case GRXWriterStatePaused: - _state = newState; - dispatch_suspend(_writeQueue); + @synchronized (self) { + // Manual transitions are only allowed from the started or paused states. + if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { return; - case GRXWriterStateStarted: - if (_state == GRXWriterStatePaused) { + } + + switch (newState) { + case GRXWriterStateFinished: + if (_state == GRXWriterStatePaused) { + dispatch_resume(_writeQueue); + } _state = newState; - dispatch_resume(_writeQueue); - } - return; - case GRXWriterStateNotStarted: - return; + // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the + // writeable to be messaged anymore. + _writeable = nil; + return; + case GRXWriterStatePaused: + if (_state == GRXWriterStateStarted) { + _state = newState; + dispatch_suspend(_writeQueue); + } + return; + case GRXWriterStateStarted: + if (_state == GRXWriterStatePaused) { + _state = newState; + dispatch_resume(_writeQueue); + } + return; + case GRXWriterStateNotStarted: + return; + } } } -- cgit v1.2.3 From d6545bb3df44ee3543fa5209bb3d2da3061848cd Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 5 Jun 2017 09:22:15 -0700 Subject: Comments on BufferedPipe --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 90d51163dd..154d164f69 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -33,10 +33,13 @@ #import "GRXBufferedPipe.h" +@interface GRXBufferedPipe () +@property(atomic) NSError *errorOrNil; +@end + @implementation GRXBufferedPipe { id _writeable; BOOL _inputIsFinished; - NSError *_errorOrNil; dispatch_queue_t _writeQueue; } @@ -90,7 +93,7 @@ dispatch_async(_writeQueue, ^{ GRXBufferedPipe *strongSelf = weakSelf; if (strongSelf) { - [strongSelf finishWithError:_errorOrNil]; + [strongSelf finishWithError:nil]; } }); } @@ -123,7 +126,7 @@ return; case GRXWriterStateStarted: if (_state == GRXWriterStatePaused) { - _state = newState; + _state = newState; dispatch_resume(_writeQueue); } return; @@ -134,9 +137,6 @@ } - (void)startWithWriteable:(id)writeable { - if (_state != GRXWriterStateNotStarted) { - return; - } _state = GRXWriterStateStarted; _writeable = writeable; dispatch_resume(_writeQueue); -- cgit v1.2.3 From e09e47ccb83e701c183bee0a0ad3468a8b246be7 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 5 Jun 2017 15:13:20 -0700 Subject: Use getter/setter --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 154d164f69..eee41cc847 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -73,7 +73,7 @@ __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf && !strongSelf->_errorOrNil) { + if (strongSelf && !strongSelf.errorOrNil) { [strongSelf->_writeable writeValue:value]; } }); @@ -84,7 +84,7 @@ return; } _inputIsFinished = YES; - _errorOrNil = errorOrNil; + self.errorOrNil = errorOrNil; if (errorOrNil) { // No need to write pending values. [self finishWithError:_errorOrNil]; -- cgit v1.2.3 From d7d6a2e12bb4ae0057f45b964555018245865cb2 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 6 Jun 2017 14:39:15 -0700 Subject: Better concurrency handling --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 45 +++++++++++++++++------------ src/objective-c/tests/RxLibraryUnitTests.m | 7 ++++- 2 files changed, 33 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index eee41cc847..3bb9e2a0cf 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -34,13 +34,14 @@ #import "GRXBufferedPipe.h" @interface GRXBufferedPipe () -@property(atomic) NSError *errorOrNil; +@property(atomic) id writeable; +@property(atomic) BOOL inputIsFinished; @end @implementation GRXBufferedPipe { - id _writeable; - BOOL _inputIsFinished; + NSError *_errorOrNil; dispatch_queue_t _writeQueue; + dispatch_once_t _finishQueue; } @synthesize state = _state; @@ -53,6 +54,7 @@ if (self = [super init]) { _state = GRXWriterStateNotStarted; _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + self.inputIsFinished = NO; dispatch_suspend(_writeQueue); } return self; @@ -61,7 +63,7 @@ #pragma mark GRXWriteable implementation - (void)writeValue:(id)value { - if (_inputIsFinished) { + if (self.inputIsFinished) { return; } if ([value respondsToSelector:@selector(copy)]) { @@ -73,28 +75,32 @@ __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf && !strongSelf.errorOrNil) { - [strongSelf->_writeable writeValue:value]; + if (strongSelf && strongSelf.writeable) { + [strongSelf.writeable writeValue:value]; } }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { - if (_inputIsFinished) { + if (self.inputIsFinished) { return; } - _inputIsFinished = YES; - self.errorOrNil = errorOrNil; + self.inputIsFinished = YES; if (errorOrNil) { // No need to write pending values. - [self finishWithError:_errorOrNil]; + dispatch_once(&_finishQueue, ^{ + _errorOrNil = errorOrNil; + [self finishWithError:_errorOrNil]; + }); } else { - __weak GRXBufferedPipe *weakSelf = self; - dispatch_async(_writeQueue, ^{ - GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf) { - [strongSelf finishWithError:nil]; - } + dispatch_once(&_finishQueue, ^{ + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^{ + GRXBufferedPipe *strongSelf = weakSelf; + if (strongSelf) { + [strongSelf finishWithError:nil]; + } + }); }); } } @@ -143,9 +149,12 @@ } - (void)finishWithError:(NSError *)errorOrNil { - id writeable = _writeable; + id writeable = self.writeable; + self.writeable = nil; self.state = GRXWriterStateFinished; - [writeable writesFinishedWithError:errorOrNil]; + dispatch_async(_writeQueue, ^{ + [writeable writesFinishedWithError:errorOrNil]; + }); } @end diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m index 770c034220..500b8a64ef 100644 --- a/src/objective-c/tests/RxLibraryUnitTests.m +++ b/src/objective-c/tests/RxLibraryUnitTests.m @@ -182,9 +182,13 @@ } - (void)testBufferedPipePropagatesError { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Response received"]; // Given: CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; - id writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + id writeable = [GRXWriteable writeableWithSingleHandler:^(id value, NSError *errorOrNil) { + handler.block(value, errorOrNil); + [expectation fulfill]; + }]; NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil]; // If: @@ -193,6 +197,7 @@ [pipe writesFinishedWithError:anyError]; // Then: + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; XCTAssertEqual(handler.timesCalled, 1); XCTAssertEqualObjects(handler.value, nil); XCTAssertEqualObjects(handler.errorOrNil, anyError); -- cgit v1.2.3 From d4792e9d8dc9cdfe0514305eb4e14a6ec4d322da Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 13 Jun 2017 10:00:53 -0700 Subject: Polish GRXBufferedPipe --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 3bb9e2a0cf..fbdaf9d338 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -41,7 +41,7 @@ @implementation GRXBufferedPipe { NSError *_errorOrNil; dispatch_queue_t _writeQueue; - dispatch_once_t _finishQueue; + dispatch_once_t _finishAction; } @synthesize state = _state; @@ -54,7 +54,6 @@ if (self = [super init]) { _state = GRXWriterStateNotStarted; _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); - self.inputIsFinished = NO; dispatch_suspend(_writeQueue); } return self; @@ -74,10 +73,7 @@ } __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { - GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf && strongSelf.writeable) { - [strongSelf.writeable writeValue:value]; - } + [weakSelf.writeable writeValue:value]; }); } @@ -86,23 +82,18 @@ return; } self.inputIsFinished = YES; - if (errorOrNil) { - // No need to write pending values. - dispatch_once(&_finishQueue, ^{ + dispatch_once(&_finishAction, ^{ + if (errorOrNil) { + // No need to write pending values. _errorOrNil = errorOrNil; [self finishWithError:_errorOrNil]; - }); - } else { - dispatch_once(&_finishQueue, ^{ + } else { __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^{ - GRXBufferedPipe *strongSelf = weakSelf; - if (strongSelf) { - [strongSelf finishWithError:nil]; - } + [weakSelf finishWithError:nil]; }); - }); - } + } + }); } #pragma mark GRXWriter implementation -- cgit v1.2.3 From 9ae2a7dc8653f1f9e7a83dfb249b4e93ea697a7c Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Sat, 17 Jun 2017 15:20:09 -0700 Subject: Remove conditionals covered by invoke agreement --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index fbdaf9d338..b69305b41f 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -35,13 +35,11 @@ @interface GRXBufferedPipe () @property(atomic) id writeable; -@property(atomic) BOOL inputIsFinished; @end @implementation GRXBufferedPipe { NSError *_errorOrNil; dispatch_queue_t _writeQueue; - dispatch_once_t _finishAction; } @synthesize state = _state; @@ -62,9 +60,6 @@ #pragma mark GRXWriteable implementation - (void)writeValue:(id)value { - if (self.inputIsFinished) { - return; - } if ([value respondsToSelector:@selector(copy)]) { // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. // So just buffer the new value. @@ -78,22 +73,16 @@ } - (void)writesFinishedWithError:(NSError *)errorOrNil { - if (self.inputIsFinished) { - return; + if (errorOrNil) { + // No need to write pending values. + _errorOrNil = errorOrNil; + [self finishWithError:_errorOrNil]; + } else { + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^{ + [weakSelf finishWithError:nil]; + }); } - self.inputIsFinished = YES; - dispatch_once(&_finishAction, ^{ - if (errorOrNil) { - // No need to write pending values. - _errorOrNil = errorOrNil; - [self finishWithError:_errorOrNil]; - } else { - __weak GRXBufferedPipe *weakSelf = self; - dispatch_async(_writeQueue, ^{ - [weakSelf finishWithError:nil]; - }); - } - }); } #pragma mark GRXWriter implementation -- cgit v1.2.3 From 08fef09ed1b96f703b205fa98f5dd712ef3059e9 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 26 Jun 2017 12:19:27 -0700 Subject: Fix minor problems --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index b69305b41f..31c1448b55 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -96,13 +96,11 @@ switch (newState) { case GRXWriterStateFinished: + self.writeable = nil; if (_state == GRXWriterStatePaused) { - dispatch_resume(_writeQueue); + _writeQueue = nil; } _state = newState; - // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the - // writeable to be messaged anymore. - _writeable = nil; return; case GRXWriterStatePaused: if (_state == GRXWriterStateStarted) { @@ -124,13 +122,12 @@ - (void)startWithWriteable:(id)writeable { _state = GRXWriterStateStarted; - _writeable = writeable; + self.writeable = writeable; dispatch_resume(_writeQueue); } - (void)finishWithError:(NSError *)errorOrNil { id writeable = self.writeable; - self.writeable = nil; self.state = GRXWriterStateFinished; dispatch_async(_writeQueue, ^{ [writeable writesFinishedWithError:errorOrNil]; -- cgit v1.2.3 From d6cee1513530ce71040db26a7202465f919b9c51 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 26 Jun 2017 17:09:09 -0700 Subject: Simply queue up writesFinishedWithError without cancelling pending messages --- src/objective-c/RxLibrary/GRXBufferedPipe.m | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 31c1448b55..ee3c3d3031 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -73,16 +73,10 @@ } - (void)writesFinishedWithError:(NSError *)errorOrNil { - if (errorOrNil) { - // No need to write pending values. - _errorOrNil = errorOrNil; - [self finishWithError:_errorOrNil]; - } else { - __weak GRXBufferedPipe *weakSelf = self; - dispatch_async(_writeQueue, ^{ - [weakSelf finishWithError:nil]; - }); - } + __weak GRXBufferedPipe *weakSelf = self; + dispatch_async(_writeQueue, ^{ + [weakSelf finishWithError:nil]; + }); } #pragma mark GRXWriter implementation @@ -98,7 +92,7 @@ case GRXWriterStateFinished: self.writeable = nil; if (_state == GRXWriterStatePaused) { - _writeQueue = nil; + dispatch_resume(_writeQueue); } _state = newState; return; @@ -109,7 +103,8 @@ } return; case GRXWriterStateStarted: - if (_state == GRXWriterStatePaused) { + if (_state == GRXWriterStatePaused || + _state == GRXWriterStateNotStarted) { _state = newState; dispatch_resume(_writeQueue); } @@ -121,17 +116,13 @@ } - (void)startWithWriteable:(id)writeable { - _state = GRXWriterStateStarted; self.writeable = writeable; - dispatch_resume(_writeQueue); + self.state = GRXWriterStateStarted; } - (void)finishWithError:(NSError *)errorOrNil { - id writeable = self.writeable; + [self.writeable writesFinishedWithError:errorOrNil]; self.state = GRXWriterStateFinished; - dispatch_async(_writeQueue, ^{ - [writeable writesFinishedWithError:errorOrNil]; - }); } @end -- cgit v1.2.3 From c05d1b41c285c8e65e305c512323b596cd4a1c16 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 26 Jun 2017 17:25:01 -0700 Subject: Some fixes for tests and change contract --- src/objective-c/RxLibrary/GRXBufferedPipe.h | 4 ++-- src/objective-c/RxLibrary/GRXBufferedPipe.m | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index bd7d4ad691..a871ea895a 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -27,8 +27,8 @@ * immediately, unless flow control prevents it. * If it is throttled and keeps receiving values, as well as if it receives values before being * started, it will buffer them and propagate them in order as soon as its state becomes Started. - * If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and - * propagate the error immediately. + * If it receives an end of stream (via -writesFinishedWithError:), it will buffer the EOS after the + * last buffered value and issue it to the writeable after all buffered values are issued. * * Beware that a pipe of this type can't prevent receiving more values when it is paused (for * example if used to write data to a congested network connection). Because in such situations the diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 6433b62578..99cb0ad971 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -60,7 +60,7 @@ - (void)writesFinishedWithError:(NSError *)errorOrNil { __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^{ - [weakSelf finishWithError:nil]; + [weakSelf finishWithError:errorOrNil]; }); } @@ -88,8 +88,7 @@ } return; case GRXWriterStateStarted: - if (_state == GRXWriterStatePaused || - _state == GRXWriterStateNotStarted) { + if (_state == GRXWriterStatePaused) { _state = newState; dispatch_resume(_writeQueue); } @@ -102,7 +101,8 @@ - (void)startWithWriteable:(id)writeable { self.writeable = writeable; - self.state = GRXWriterStateStarted; + _state = GRXWriterStateStarted; + dispatch_resume(_writeQueue); } - (void)finishWithError:(NSError *)errorOrNil { -- cgit v1.2.3