diff options
Diffstat (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.m')
-rw-r--r-- | src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 38 |
1 files changed, 31 insertions, 7 deletions
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index cb5d0a63f0..bbfe491783 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -28,7 +28,7 @@ @implementation GRXConcurrentWriteable { dispatch_queue_t _writeableQueue; // This ensures that writesFinishedWithError: is only sent once to the writeable. - dispatch_once_t _alreadyFinished; + BOOL _alreadyFinished; } - (instancetype)init { @@ -65,19 +65,35 @@ - (void)enqueueSuccessfulCompletion { dispatch_async(_writeableQueue, ^{ - dispatch_once(&_alreadyFinished, ^{ + BOOL finished = NO; + @synchronized (self) { + if (!_alreadyFinished) { + _alreadyFinished = YES; + } else { + finished = YES; + } + } + if (!finished) { // Cancellation is now impossible. None of the other three blocks can run concurrently with // this one. [self.writeable writesFinishedWithError:nil]; // Skip any possible message to the wrapped writeable enqueued after this one. self.writeable = nil; - }); + } }); } - (void)cancelWithError:(NSError *)error { NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion."); - dispatch_once(&_alreadyFinished, ^{ + BOOL finished = NO; + @synchronized (self) { + if (!_alreadyFinished) { + _alreadyFinished = YES; + } else { + finished = YES; + } + } + if (!finished) { // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to // nillify writeable because we might be running concurrently with the blocks in // _writeableQueue, and assignment with ARC isn't atomic. @@ -87,15 +103,23 @@ dispatch_async(_writeableQueue, ^{ [writeable writesFinishedWithError:error]; }); - }); + } } - (void)cancelSilently { - dispatch_once(&_alreadyFinished, ^{ + BOOL finished = NO; + @synchronized (self) { + if (!_alreadyFinished) { + _alreadyFinished = YES; + } else { + finished = YES; + } + } + if (!finished) { // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to // nillify writeable because we might be running concurrently with the blocks in // _writeableQueue, and assignment with ARC isn't atomic. self.writeable = nil; - }); + } } @end |