diff options
author | Muxi Yan <mxyan@google.com> | 2018-12-21 13:39:21 -0800 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2019-01-07 10:27:20 -0800 |
commit | 5e10a3b037bcd20ab17428ccb765ba9464eb3644 (patch) | |
tree | 187a4a277055a8d969cabb77d605477599895731 /src/objective-c/RxLibrary/GRXConcurrentWriteable.m | |
parent | 4ff3543465e0a8d26a5bdf934f1dc09d0703970b (diff) |
Make gRPC ObjC thread safety right
Diffstat (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.m')
-rw-r--r-- | src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 76 |
1 files changed, 21 insertions, 55 deletions
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 81ccc3fbce..229d592f48 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -41,6 +41,7 @@ if (self = [super init]) { _writeableQueue = queue; _writeable = writeable; + _alreadyFinished = NO; } return self; } @@ -51,78 +52,43 @@ - (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler { dispatch_async(_writeableQueue, ^{ - // We're racing a possible cancellation performed by another thread. To turn all already- - // enqueued messages into noops, cancellation nillifies the writeable property. If we get it - // before it's nil, we won the race. - id<GRXWriteable> writeable = self.writeable; - if (writeable) { - [writeable writeValue:value]; - handler(); + if (self->_alreadyFinished) { + return; } + + [self.writeable writeValue:value]; + handler(); }); } - (void)enqueueSuccessfulCompletion { - __weak typeof(self) weakSelf = self; dispatch_async(_writeableQueue, ^{ - typeof(self) strongSelf = weakSelf; - if (strongSelf) { - BOOL finished = NO; - @synchronized(strongSelf) { - if (!strongSelf->_alreadyFinished) { - strongSelf->_alreadyFinished = YES; - } else { - finished = YES; - } - } - if (!finished) { - // Cancellation is now impossible. None of the other three blocks can run concurrently with - // this one. - [strongSelf.writeable writesFinishedWithError:nil]; - // Skip any possible message to the wrapped writeable enqueued after this one. - strongSelf.writeable = nil; - } + if (self->_alreadyFinished) { + return; } + [self.writeable writesFinishedWithError:nil]; + // Skip any possible message to the wrapped writeable enqueued after this one. + self.writeable = nil; }); } - (void)cancelWithError:(NSError *)error { - NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion."); - BOOL finished = NO; - @synchronized(self) { - if (!_alreadyFinished) { - _alreadyFinished = YES; - } else { - finished = YES; + NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); + dispatch_async(_writeableQueue, ^{ + if (self->_alreadyFinished) { + return; } - } - if (!finished) { - // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to - // nillify writeable because we might be running concurrently with the blocks in - // _writeableQueue, and assignment with ARC isn't atomic. - id<GRXWriteable> writeable = self.writeable; + [self.writeable writesFinishedWithError:error]; self.writeable = nil; - - dispatch_async(_writeableQueue, ^{ - [writeable writesFinishedWithError:error]; - }); - } + }); } - (void)cancelSilently { - BOOL finished = NO; - @synchronized(self) { - if (!_alreadyFinished) { - _alreadyFinished = YES; - } else { - finished = YES; + dispatch_async(_writeableQueue, ^{ + if (self->_alreadyFinished) { + return; } - } - if (!finished) { - // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to - // nillify writeable because we might be running concurrently with the blocks in - // _writeableQueue, and assignment with ARC isn't atomic. self.writeable = nil; - } + }); } @end |