From d7650a841b8acac747b5a8d12502a46ec0e4a54c Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 7 Jan 2019 16:51:08 -0800 Subject: Bug fix for GRXConcurrentWriter --- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 36 +++++++++++++++++----- 1 file changed, 29 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index e50cdf240d..7cc2101a55 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -27,8 +27,15 @@ @implementation GRXConcurrentWriteable { dispatch_queue_t _writeableQueue; - // This ensures that writesFinishedWithError: is only sent once to the writeable. + + // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected + // by _writeableQueue. BOOL _alreadyFinished; + + // This ivar ensures that a cancelWithError: call prevents further values to be sent to + // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be + // protected by self lock. + BOOL _cancelled; } - (instancetype)init { @@ -42,6 +49,7 @@ _writeableQueue = queue; _writeable = writeable; _alreadyFinished = NO; + _cancelled = NO; } return self; } @@ -56,6 +64,12 @@ return; } + @synchronized (self) { + if (self->_cancelled) { + return; + } + } + [self.writeable writeValue:value]; handler(); }); @@ -63,13 +77,18 @@ - (void)enqueueSuccessfulCompletion { dispatch_async(_writeableQueue, ^{ - @synchronized(self) { - if (self->_alreadyFinished) { + if (self->_alreadyFinished) { + return; + } + @synchronized (self) { + if (self->_cancelled) { return; } } [self.writeable writesFinishedWithError:nil]; + // Skip any possible message to the wrapped writeable enqueued after this one. + self->_alreadyFinished = YES; self.writeable = nil; }); } @@ -77,14 +96,17 @@ - (void)cancelWithError:(NSError *)error { NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); @synchronized(self) { + self->_cancelled = YES; + } + dispatch_async(_writeableQueue, ^{ if (self->_alreadyFinished) { + // a cancel or a successful completion is already issued return; } - } - dispatch_async(_writeableQueue, ^{ - // If enqueueSuccessfulCompletion is already issued, self.writeable is nil and the following - // line is no-op. [self.writeable writesFinishedWithError:error]; + + // Skip any possible message to the wrapped writeable enqueued after this one. + self->_alreadyFinished = YES; self.writeable = nil; }); } -- cgit v1.2.3