aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
diff options
context:
space:
mode:
Diffstat (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.m')
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.m102
1 files changed, 48 insertions, 54 deletions
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
index 81ccc3fbce..115195463d 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
@@ -27,8 +27,15 @@
@implementation GRXConcurrentWriteable {
dispatch_queue_t _writeableQueue;
- // This ensures that writesFinishedWithError: is only sent once to the writeable.
+
+ // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected
+ // by _writeableQueue.
BOOL _alreadyFinished;
+
+ // This ivar ensures that a cancelWithError: call prevents further values to be sent to
+ // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be
+ // protected by self lock.
+ BOOL _cancelled;
}
- (instancetype)init {
@@ -41,6 +48,8 @@
if (self = [super init]) {
_writeableQueue = queue;
_writeable = writeable;
+ _alreadyFinished = NO;
+ _cancelled = NO;
}
return self;
}
@@ -51,78 +60,63 @@
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
dispatch_async(_writeableQueue, ^{
- // We're racing a possible cancellation performed by another thread. To turn all already-
- // enqueued messages into noops, cancellation nillifies the writeable property. If we get it
- // before it's nil, we won the race.
- id<GRXWriteable> writeable = self.writeable;
- if (writeable) {
- [writeable writeValue:value];
- handler();
+ if (self->_alreadyFinished) {
+ return;
+ }
+
+ @synchronized(self) {
+ if (self->_cancelled) {
+ return;
+ }
}
+
+ [self.writeable writeValue:value];
+ handler();
});
}
- (void)enqueueSuccessfulCompletion {
- __weak typeof(self) weakSelf = self;
dispatch_async(_writeableQueue, ^{
- typeof(self) strongSelf = weakSelf;
- if (strongSelf) {
- BOOL finished = NO;
- @synchronized(strongSelf) {
- if (!strongSelf->_alreadyFinished) {
- strongSelf->_alreadyFinished = YES;
- } else {
- finished = YES;
- }
- }
- if (!finished) {
- // Cancellation is now impossible. None of the other three blocks can run concurrently with
- // this one.
- [strongSelf.writeable writesFinishedWithError:nil];
- // Skip any possible message to the wrapped writeable enqueued after this one.
- strongSelf.writeable = nil;
+ if (self->_alreadyFinished) {
+ return;
+ }
+ @synchronized(self) {
+ if (self->_cancelled) {
+ return;
}
}
+ [self.writeable writesFinishedWithError:nil];
+
+ // Skip any possible message to the wrapped writeable enqueued after this one.
+ self->_alreadyFinished = YES;
+ self.writeable = nil;
});
}
- (void)cancelWithError:(NSError *)error {
- NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
- BOOL finished = NO;
+ NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion.");
@synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
- }
+ self->_cancelled = YES;
}
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
- id<GRXWriteable> writeable = self.writeable;
- self.writeable = nil;
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ // a cancel or a successful completion is already issued
+ return;
+ }
+ [self.writeable writesFinishedWithError:error];
- dispatch_async(_writeableQueue, ^{
- [writeable writesFinishedWithError:error];
- });
- }
+ // Skip any possible message to the wrapped writeable enqueued after this one.
+ self->_alreadyFinished = YES;
+ self.writeable = nil;
+ });
}
- (void)cancelSilently {
- BOOL finished = NO;
- @synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ return;
}
- }
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
self.writeable = nil;
- }
+ });
}
@end