aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/RxLibrary/GRXBufferedPipe.m
diff options
context:
space:
mode:
Diffstat (limited to 'src/objective-c/RxLibrary/GRXBufferedPipe.m')
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m112
1 files changed, 47 insertions, 65 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index e4a7cc40f9..99cb0ad971 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -18,11 +18,13 @@
#import "GRXBufferedPipe.h"
+@interface GRXBufferedPipe ()
+@property(atomic) id<GRXWriteable> writeable;
+@end
+
@implementation GRXBufferedPipe {
- id<GRXWriteable> _writeable;
- NSMutableArray *_queue;
- BOOL _inputIsFinished;
NSError *_errorOrNil;
+ dispatch_queue_t _writeQueue;
}
@synthesize state = _state;
@@ -33,99 +35,79 @@
- (instancetype)init {
if (self = [super init]) {
- _queue = [NSMutableArray array];
_state = GRXWriterStateNotStarted;
+ _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ dispatch_suspend(_writeQueue);
}
return self;
}
-- (id)popValue {
- id value = _queue[0];
- [_queue removeObjectAtIndex:0];
- return value;
-}
-
-- (void)writeBufferUntilPausedOrStopped {
- while (_state == GRXWriterStateStarted && _queue.count > 0) {
- [_writeable writeValue:[self popValue]];
- }
- if (_inputIsFinished && _queue.count == 0) {
- // Our writer finished normally while we were paused or not-started-yet.
- [self finishWithError:_errorOrNil];
- }
-}
-
#pragma mark GRXWriteable implementation
-// Returns whether events can be simply propagated to the other end of the pipe.
-- (BOOL)shouldFastForward {
- return _state == GRXWriterStateStarted && _queue.count == 0;
-}
-
- (void)writeValue:(id)value {
- if (self.shouldFastForward) {
- // Skip the queue.
- [_writeable writeValue:value];
- } else {
+ if ([value respondsToSelector:@selector(copy)]) {
// Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
// So just buffer the new value.
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
- if ([value respondsToSelector:@selector(copy)]) {
- value = [value copy];
- }
- [_queue addObject:value];
+ value = [value copy];
}
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^(void) {
+ [weakSelf.writeable writeValue:value];
+ });
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _inputIsFinished = YES;
- _errorOrNil = errorOrNil;
- if (errorOrNil || self.shouldFastForward) {
- // No need to write pending values.
- [self finishWithError:_errorOrNil];
- }
+ __weak GRXBufferedPipe *weakSelf = self;
+ dispatch_async(_writeQueue, ^{
+ [weakSelf finishWithError:errorOrNil];
+ });
}
#pragma mark GRXWriter implementation
- (void)setState:(GRXWriterState)newState {
- // Manual transitions are only allowed from the started or paused states.
- if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
- return;
- }
-
- switch (newState) {
- case GRXWriterStateFinished:
- _state = newState;
- _queue = nil;
- // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
- // writeable to be messaged anymore.
- _writeable = nil;
- return;
- case GRXWriterStatePaused:
- _state = newState;
+ @synchronized (self) {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
- case GRXWriterStateStarted:
- if (_state == GRXWriterStatePaused) {
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
+ self.writeable = nil;
+ if (_state == GRXWriterStatePaused) {
+ dispatch_resume(_writeQueue);
+ }
_state = newState;
- [self writeBufferUntilPausedOrStopped];
- }
- return;
- case GRXWriterStateNotStarted:
- return;
+ return;
+ case GRXWriterStatePaused:
+ if (_state == GRXWriterStateStarted) {
+ _state = newState;
+ dispatch_suspend(_writeQueue);
+ }
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ dispatch_resume(_writeQueue);
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ self.writeable = writeable;
_state = GRXWriterStateStarted;
- _writeable = writeable;
- [self writeBufferUntilPausedOrStopped];
+ dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
- id<GRXWriteable> writeable = _writeable;
+ [self.writeable writesFinishedWithError:errorOrNil];
self.state = GRXWriterStateFinished;
- [writeable writesFinishedWithError:errorOrNil];
}
@end