diff options
Diffstat (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.m')
-rw-r--r-- | src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 102 |
1 files changed, 48 insertions, 54 deletions
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 81ccc3fbce..115195463d 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -27,8 +27,15 @@ @implementation GRXConcurrentWriteable { dispatch_queue_t _writeableQueue; - // This ensures that writesFinishedWithError: is only sent once to the writeable. + + // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected + // by _writeableQueue. BOOL _alreadyFinished; + + // This ivar ensures that a cancelWithError: call prevents further values to be sent to + // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be + // protected by self lock. + BOOL _cancelled; } - (instancetype)init { @@ -41,6 +48,8 @@ if (self = [super init]) { _writeableQueue = queue; _writeable = writeable; + _alreadyFinished = NO; + _cancelled = NO; } return self; } @@ -51,78 +60,63 @@ - (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler { dispatch_async(_writeableQueue, ^{ - // We're racing a possible cancellation performed by another thread. To turn all already- - // enqueued messages into noops, cancellation nillifies the writeable property. If we get it - // before it's nil, we won the race. - id<GRXWriteable> writeable = self.writeable; - if (writeable) { - [writeable writeValue:value]; - handler(); + if (self->_alreadyFinished) { + return; + } + + @synchronized(self) { + if (self->_cancelled) { + return; + } } + + [self.writeable writeValue:value]; + handler(); }); } - (void)enqueueSuccessfulCompletion { - __weak typeof(self) weakSelf = self; dispatch_async(_writeableQueue, ^{ - typeof(self) strongSelf = weakSelf; - if (strongSelf) { - BOOL finished = NO; - @synchronized(strongSelf) { - if (!strongSelf->_alreadyFinished) { - strongSelf->_alreadyFinished = YES; - } else { - finished = YES; - } - } - if (!finished) { - // Cancellation is now impossible. None of the other three blocks can run concurrently with - // this one. - [strongSelf.writeable writesFinishedWithError:nil]; - // Skip any possible message to the wrapped writeable enqueued after this one. - strongSelf.writeable = nil; + if (self->_alreadyFinished) { + return; + } + @synchronized(self) { + if (self->_cancelled) { + return; } } + [self.writeable writesFinishedWithError:nil]; + + // Skip any possible message to the wrapped writeable enqueued after this one. + self->_alreadyFinished = YES; + self.writeable = nil; }); } - (void)cancelWithError:(NSError *)error { - NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion."); - BOOL finished = NO; + NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); @synchronized(self) { - if (!_alreadyFinished) { - _alreadyFinished = YES; - } else { - finished = YES; - } + self->_cancelled = YES; } - if (!finished) { - // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to - // nillify writeable because we might be running concurrently with the blocks in - // _writeableQueue, and assignment with ARC isn't atomic. - id<GRXWriteable> writeable = self.writeable; - self.writeable = nil; + dispatch_async(_writeableQueue, ^{ + if (self->_alreadyFinished) { + // a cancel or a successful completion is already issued + return; + } + [self.writeable writesFinishedWithError:error]; - dispatch_async(_writeableQueue, ^{ - [writeable writesFinishedWithError:error]; - }); - } + // Skip any possible message to the wrapped writeable enqueued after this one. + self->_alreadyFinished = YES; + self.writeable = nil; + }); } - (void)cancelSilently { - BOOL finished = NO; - @synchronized(self) { - if (!_alreadyFinished) { - _alreadyFinished = YES; - } else { - finished = YES; + dispatch_async(_writeableQueue, ^{ + if (self->_alreadyFinished) { + return; } - } - if (!finished) { - // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to - // nillify writeable because we might be running concurrently with the blocks in - // _writeableQueue, and assignment with ARC isn't atomic. self.writeable = nil; - } + }); } @end |