aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m72
-rw-r--r--src/objective-c/tests/RxLibraryUnitTests.m1
2 files changed, 24 insertions, 49 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index 763f107678..9dfe332eef 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -35,7 +35,6 @@
@implementation GRXBufferedPipe {
id<GRXWriteable> _writeable;
- NSMutableArray *_queue;
BOOL _inputIsFinished;
NSError *_errorOrNil;
dispatch_queue_t _writeQueue;
@@ -49,63 +48,30 @@
- (instancetype)init {
if (self = [super init]) {
- _queue = [NSMutableArray array];
_state = GRXWriterStateNotStarted;
_writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ dispatch_suspend(_writeQueue);
}
return self;
}
-- (id)popValue {
- id value = _queue[0];
- [_queue removeObjectAtIndex:0];
- return value;
-}
-
-- (void)writeBufferUntilPausedOrStopped {
- dispatch_async(_writeQueue, ^(void) {
- while (_queue.count > 0) {
- BOOL started;
- @synchronized (self) {
- started = (_state == GRXWriterStateStarted);
- }
- if (started) {
- [_writeable writeValue:[self popValue]];
- } else {
- break;
- }
- }
- if (_inputIsFinished && _queue.count == 0) {
- // Our writer finished normally while we were paused or not-started-yet.
- [self finishWithError:_errorOrNil];
- }
- });
-}
-
#pragma mark GRXWriteable implementation
-// Returns whether events can be simply propagated to the other end of the pipe.
-- (BOOL)shouldFastForward {
- BOOL started;
- @synchronized (self) {
- started = (_state == GRXWriterStateStarted);
- }
- return _state == started && _queue.count == 0;
-}
-
- (void)writeValue:(id)value {
+ if (_inputIsFinished) {
+ return;
+ }
if ([value respondsToSelector:@selector(copy)]) {
+ // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
+ // So just buffer the new value.
+ // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
value = [value copy];
}
+ __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
- if (self.shouldFastForward) {
- // Skip the queue.
- [_writeable writeValue:value];
- } else {
- // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
- // So just buffer the new value.
- // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
- [_queue addObject:value];
+ GRXBufferedPipe *strongSelf = weakSelf;
+ if (strongSelf) {
+ [strongSelf->_writeable writeValue:value];
}
});
}
@@ -113,9 +79,14 @@
- (void)writesFinishedWithError:(NSError *)errorOrNil {
_inputIsFinished = YES;
_errorOrNil = errorOrNil;
- if (errorOrNil || self.shouldFastForward) {
+ if (errorOrNil) {
// No need to write pending values.
[self finishWithError:_errorOrNil];
+ } else {
+ // Wait until all the pending writes to be finished.
+ dispatch_sync(_writeQueue, ^{
+ return;
+ });
}
}
@@ -130,18 +101,18 @@
switch (newState) {
case GRXWriterStateFinished:
_state = newState;
- _queue = nil;
// Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
// writeable to be messaged anymore.
_writeable = nil;
return;
case GRXWriterStatePaused:
_state = newState;
+ dispatch_suspend(_writeQueue);
return;
case GRXWriterStateStarted:
if (_state == GRXWriterStatePaused) {
_state = newState;
- [self writeBufferUntilPausedOrStopped];
+ dispatch_resume(_writeQueue);
}
return;
case GRXWriterStateNotStarted:
@@ -150,9 +121,12 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
_state = GRXWriterStateStarted;
_writeable = writeable;
- [self writeBufferUntilPausedOrStopped];
+ dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m
index 62fbdfcdf6..feade14eea 100644
--- a/src/objective-c/tests/RxLibraryUnitTests.m
+++ b/src/objective-c/tests/RxLibraryUnitTests.m
@@ -164,6 +164,7 @@
GRXBufferedPipe *pipe = [GRXBufferedPipe pipe];
[pipe startWithWriteable:writeable];
[pipe writeValue:anyValue];
+ [pipe writesFinishedWithError:nil];
// Then:
XCTAssertEqual(handler.timesCalled, 1);