From 5e10a3b037bcd20ab17428ccb765ba9464eb3644 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 21 Dec 2018 13:39:21 -0800 Subject: Make gRPC ObjC thread safety right --- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 76 ++++++---------------- 1 file changed, 21 insertions(+), 55 deletions(-) (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.m') diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 81ccc3fbce..229d592f48 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -41,6 +41,7 @@ if (self = [super init]) { _writeableQueue = queue; _writeable = writeable; + _alreadyFinished = NO; } return self; } @@ -51,78 +52,43 @@ - (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler { dispatch_async(_writeableQueue, ^{ - // We're racing a possible cancellation performed by another thread. To turn all already- - // enqueued messages into noops, cancellation nillifies the writeable property. If we get it - // before it's nil, we won the race. - id writeable = self.writeable; - if (writeable) { - [writeable writeValue:value]; - handler(); + if (self->_alreadyFinished) { + return; } + + [self.writeable writeValue:value]; + handler(); }); } - (void)enqueueSuccessfulCompletion { - __weak typeof(self) weakSelf = self; dispatch_async(_writeableQueue, ^{ - typeof(self) strongSelf = weakSelf; - if (strongSelf) { - BOOL finished = NO; - @synchronized(strongSelf) { - if (!strongSelf->_alreadyFinished) { - strongSelf->_alreadyFinished = YES; - } else { - finished = YES; - } - } - if (!finished) { - // Cancellation is now impossible. None of the other three blocks can run concurrently with - // this one. - [strongSelf.writeable writesFinishedWithError:nil]; - // Skip any possible message to the wrapped writeable enqueued after this one. - strongSelf.writeable = nil; - } + if (self->_alreadyFinished) { + return; } + [self.writeable writesFinishedWithError:nil]; + // Skip any possible message to the wrapped writeable enqueued after this one. + self.writeable = nil; }); } - (void)cancelWithError:(NSError *)error { - NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion."); - BOOL finished = NO; - @synchronized(self) { - if (!_alreadyFinished) { - _alreadyFinished = YES; - } else { - finished = YES; + NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); + dispatch_async(_writeableQueue, ^{ + if (self->_alreadyFinished) { + return; } - } - if (!finished) { - // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to - // nillify writeable because we might be running concurrently with the blocks in - // _writeableQueue, and assignment with ARC isn't atomic. - id writeable = self.writeable; + [self.writeable writesFinishedWithError:error]; self.writeable = nil; - - dispatch_async(_writeableQueue, ^{ - [writeable writesFinishedWithError:error]; - }); - } + }); } - (void)cancelSilently { - BOOL finished = NO; - @synchronized(self) { - if (!_alreadyFinished) { - _alreadyFinished = YES; - } else { - finished = YES; + dispatch_async(_writeableQueue, ^{ + if (self->_alreadyFinished) { + return; } - } - if (!finished) { - // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to - // nillify writeable because we might be running concurrently with the blocks in - // _writeableQueue, and assignment with ARC isn't atomic. self.writeable = nil; - } + }); } @end -- cgit v1.2.3 From 03232ba46fba25465341b5e9632344f102f9df83 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 21 Dec 2018 16:21:18 -0800 Subject: Polish comments and correct concurrent writeable behavior --- src/objective-c/RxLibrary/GRXBufferedPipe.h | 3 +-- src/objective-c/RxLibrary/GRXConcurrentWriteable.h | 13 +++++++------ src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 18 +++++++++++++----- src/objective-c/RxLibrary/GRXForwardingWriter.h | 6 +----- src/objective-c/RxLibrary/GRXImmediateSingleWriter.h | 2 ++ 5 files changed, 24 insertions(+), 18 deletions(-) (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.m') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index a871ea895a..ae08cc315b 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -36,8 +36,7 @@ * crash. If you want to react to flow control signals to prevent that, instead of using this class * you can implement an object that conforms to GRXWriter. * - * Thread-safety: - * The methods of an object of this class should not be called concurrently from different threads. + * Thread-safety: the methods of this class are thread-safe. */ @interface GRXBufferedPipe : GRXWriter diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index 606f5dea95..55468b3c07 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -43,21 +43,22 @@ - (instancetype)initWithWriteable:(id)writeable; /** - * Enqueues writeValue: to be sent to the writeable in the main thread. - * The passed handler is invoked from the main thread after writeValue: returns. + * Enqueues writeValue: to be sent to the writeable from the designated dispatch queue. + * The passed handler is invoked from designated dispatch queue after writeValue: returns. */ - (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler; /** - * Enqueues writesFinishedWithError:nil to be sent to the writeable in the main thread. After that - * message is sent to the writeable, all other methods of this object are effectively noops. + * Enqueues writesFinishedWithError:nil to be sent to the writeable in the designated dispatch + * queue. After that message is sent to the writeable, all other methods of this object are + * effectively noops. */ - (void)enqueueSuccessfulCompletion; /** * If the writeable has not yet received a writesFinishedWithError: message, this will enqueue one - * to be sent to it in the main thread, and cancel all other pending messages to the writeable - * enqueued by this object (both past and future). + * to be sent to it in the designated dispatch queue, and cancel all other pending messages to the + * writeable enqueued by this object (both past and future). * The error argument cannot be nil. */ - (void)cancelWithError:(NSError *)error; diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 229d592f48..d9d0e8c31e 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -63,8 +63,10 @@ - (void)enqueueSuccessfulCompletion { dispatch_async(_writeableQueue, ^{ - if (self->_alreadyFinished) { - return; + @synchronized (self) { + if (self->_alreadyFinished) { + return; + } } [self.writeable writesFinishedWithError:nil]; // Skip any possible message to the wrapped writeable enqueued after this one. @@ -74,10 +76,14 @@ - (void)cancelWithError:(NSError *)error { NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); - dispatch_async(_writeableQueue, ^{ + @synchronized (self) { if (self->_alreadyFinished) { return; } + } + dispatch_async(_writeableQueue, ^{ + // If enqueueSuccessfulCompletion is already issued, self.writeable is nil and the following + // line is no-op. [self.writeable writesFinishedWithError:error]; self.writeable = nil; }); @@ -85,8 +91,10 @@ - (void)cancelSilently { dispatch_async(_writeableQueue, ^{ - if (self->_alreadyFinished) { - return; + @synchronized (self) { + if (self->_alreadyFinished) { + return; + } } self.writeable = nil; }); diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h index 3814ff8831..00366b6416 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.h +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h @@ -25,11 +25,7 @@ * input writer, and for classes that represent objects with input and * output sequences of values, like an RPC. * - * Thread-safety: - * All messages sent to this object need to be serialized. When it is started, the writer it wraps - * is started in the same thread. Manual state changes are propagated to the wrapped writer in the - * same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be - * serialized with any message sent to this object. + * Thread-safety: the methods of this class are thread safe. */ @interface GRXForwardingWriter : GRXWriter - (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER; diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h index 601abdc6b9..2fa38b3dce 100644 --- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h +++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h @@ -23,6 +23,8 @@ /** * Utility to construct GRXWriter instances from values that are immediately available when * required. + * + * Thread safety: the methods of this class are thread safe. */ @interface GRXImmediateSingleWriter : GRXImmediateWriter -- cgit v1.2.3 From 3cdc0db838626ab1d186a4980125bc3e6219c0e0 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 2 Jan 2019 15:59:19 -0800 Subject: clang-format --- src/objective-c/GRPCClient/GRPCCall.m | 77 +++++++++++----------- src/objective-c/RxLibrary/GRXBufferedPipe.m | 4 +- src/objective-c/RxLibrary/GRXConcurrentWriteable.h | 8 +-- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 6 +- src/objective-c/RxLibrary/GRXForwardingWriter.m | 12 ++-- .../RxLibrary/GRXImmediateSingleWriter.m | 4 +- 6 files changed, 56 insertions(+), 55 deletions(-) (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.m') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 5ea1755c58..c18dfae635 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -448,7 +448,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; return; } NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; - @synchronized (callFlags) { + @synchronized(callFlags) { switch (callSafety) { case GRPCCallSafetyDefault: callFlags[hostAndPath] = @0; @@ -468,7 +468,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path { NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; uint32_t flags = 0; - @synchronized (callFlags) { + @synchronized(callFlags) { flags = [callFlags[hostAndPath] intValue]; } return flags; @@ -529,7 +529,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)setResponseDispatchQueue:(dispatch_queue_t)queue { - @synchronized (self) { + @synchronized(self) { if (_state != GRXWriterStateNotStarted) { return; } @@ -562,14 +562,14 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)cancel { - @synchronized (self) { + @synchronized(self) { if (_state == GRXWriterStateFinished) { return; } [self finishWithError:[NSError - errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeCancelled - userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; + errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; [_wrappedCall cancel]; } } @@ -636,19 +636,19 @@ const char *kCFStreamVarName = "grpc_cfstream"; // don't want to throw, because the app shouldn't crash for a behavior // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. - @synchronized (strongSelf) { + @synchronized(strongSelf) { [strongSelf - finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeResourceExhausted - userInfo:@{ - NSLocalizedDescriptionKey : - @"Client does not have enough memory to " - @"hold the server response." - }]]; + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeResourceExhausted + userInfo:@{ + NSLocalizedDescriptionKey : + @"Client does not have enough memory to " + @"hold the server response." + }]]; [strongSelf->_wrappedCall cancel]; } } else { - @synchronized (strongSelf) { + @synchronized(strongSelf) { [strongSelf->_responseWriteable enqueueValue:data completionHandler:^{ [strongSelf startNextRead]; @@ -689,9 +689,10 @@ const char *kCFStreamVarName = "grpc_cfstream"; } // TODO(jcanizales): Add error handlers for async failures - GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers - flags:callSafetyFlags - handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA + GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] + initWithMetadata:headers + flags:callSafetyFlags + handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA dispatch_async(_callQueue, ^{ if (!self->_unaryCall) { [self->_wrappedCall startBatchWithOperations:@[ op ]]; @@ -731,7 +732,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; - (void)writeValue:(id)value { NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData"); - @synchronized (self) { + @synchronized(self) { if (_state == GRXWriterStateFinished) { return; } @@ -782,9 +783,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; dispatch_async(_callQueue, ^{ // TODO(jcanizales): Add error handlers for async failures [self->_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; + startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; [self->_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; + startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; }); } @@ -825,16 +826,16 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Lock acquired inside startWithWriteable: - (void)startCallWithWriteable:(id)writeable { - @synchronized (self) { + @synchronized(self) { if (_state == GRXWriterStateFinished) { return; } _responseWriteable = - [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; + [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; GRPCPooledChannel *channel = - [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; + [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; _wrappedCall = [channel wrappedCallWithPath:_path completionQueue:[GRPCCompletionQueue completionQueue] callOptions:_callOptions]; @@ -843,9 +844,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeUnavailable userInfo:@{ - NSLocalizedDescriptionKey : - @"Failed to create call or channel." - }]]; + NSLocalizedDescriptionKey : + @"Failed to create call or channel." + }]]; return; } @@ -869,10 +870,10 @@ const char *kCFStreamVarName = "grpc_cfstream"; _state = GRXWriterStateStarted; // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). - // This makes RPCs in which the call isn't externally retained possible (as long as it is started - // before being autoreleased). - // Care is taken not to retain self strongly in any of the blocks used in this implementation, so - // that the life of the instance is determined by this retain cycle. + // This makes RPCs in which the call isn't externally retained possible (as long as it is + // started before being autoreleased). Care is taken not to retain self strongly in any of the + // blocks used in this implementation, so that the life of the instance is determined by this + // retain cycle. _retainSelf = self; if (_callOptions == nil) { @@ -961,14 +962,14 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Retain because connectivity manager only keeps weak reference to GRPCCall. __strong GRPCCall *strongSelf = self; if (strongSelf) { - @synchronized (strongSelf) { + @synchronized(strongSelf) { [_wrappedCall cancel]; [strongSelf - finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ - NSLocalizedDescriptionKey : @"Connectivity lost." - }]]; + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ + NSLocalizedDescriptionKey : @"Connectivity lost." + }]]; } } } diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index d0064a5cfa..74e2f03da6 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -52,7 +52,7 @@ value = [value copy]; } dispatch_async(_writeQueue, ^(void) { - @synchronized (self) { + @synchronized(self) { if (self->_state == GRXWriterStateFinished) { return; } @@ -106,7 +106,7 @@ } - (void)startWithWriteable:(id)writeable { - @synchronized (self) { + @synchronized(self) { self.writeable = writeable; _state = GRXWriterStateStarted; } diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index 55468b3c07..5beca9d41e 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -23,10 +23,10 @@ /** * This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a - * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is the last - * message sent to it (no matter what messages are sent to the wrapper, in what order, nor from - * which thread). It also guarantees that, if cancelWithError: is called (e.g. - * by the app cancelling the writes), no further messages are sent to the writeable except + * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is + * the last message sent to it (no matter what messages are sent to the wrapper, in what order, nor + * from which thread). It also guarantees that, if cancelWithError: is called (e.g. by the app + * cancelling the writes), no further messages are sent to the writeable except * writesFinishedWithError:. * * TODO(jcanizales): Let the user specify another queue for the writeable callbacks. diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index d9d0e8c31e..e50cdf240d 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -63,7 +63,7 @@ - (void)enqueueSuccessfulCompletion { dispatch_async(_writeableQueue, ^{ - @synchronized (self) { + @synchronized(self) { if (self->_alreadyFinished) { return; } @@ -76,7 +76,7 @@ - (void)cancelWithError:(NSError *)error { NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); - @synchronized (self) { + @synchronized(self) { if (self->_alreadyFinished) { return; } @@ -91,7 +91,7 @@ - (void)cancelSilently { dispatch_async(_writeableQueue, ^{ - @synchronized (self) { + @synchronized(self) { if (self->_alreadyFinished) { return; } diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m index 376c196b4f..f5ed603698 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.m +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m @@ -57,13 +57,13 @@ #pragma mark GRXWriteable implementation - (void)writeValue:(id)value { - @synchronized (self) { + @synchronized(self) { [_writeable writeValue:value]; } } - (void)writesFinishedWithError:(NSError *)errorOrNil { - @synchronized (self) { + @synchronized(self) { _writer = nil; [self finishOutputWithError:errorOrNil]; } @@ -78,14 +78,14 @@ - (void)setState:(GRXWriterState)state { GRXWriter *copiedWriter = nil; if (state == GRXWriterStateFinished) { - @synchronized (self) { + @synchronized(self) { _writeable = nil; copiedWriter = _writer; _writer = nil; } copiedWriter.state = GRXWriterStateFinished; } else { - @synchronized (self) { + @synchronized(self) { copiedWriter = _writer; } copiedWriter.state = state; @@ -94,7 +94,7 @@ - (void)startWithWriteable:(id)writeable { GRXWriter *copiedWriter = nil; - @synchronized (self) { + @synchronized(self) { _writeable = writeable; copiedWriter = _writer; } @@ -103,7 +103,7 @@ - (void)finishWithError:(NSError *)errorOrNil { GRXWriter *copiedWriter = nil; - @synchronized (self) { + @synchronized(self) { [self finishOutputWithError:errorOrNil]; copiedWriter = _writer; } diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m index eadad6c3b6..079c11b94f 100644 --- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m +++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m @@ -38,7 +38,7 @@ - (void)startWithWriteable:(id)writeable { id copiedValue = nil; - @synchronized (self) { + @synchronized(self) { if (_state != GRXWriterStateNotStarted) { return; } @@ -63,7 +63,7 @@ // the original \a map function returns a new Writer of another type. So we // need to override this function here. - (GRXWriter *)map:(id (^)(id))map { - @synchronized (self) { + @synchronized(self) { // Since _value is available when creating the object, we can simply // apply the map and store the output. _value = map(_value); -- cgit v1.2.3 From d7650a841b8acac747b5a8d12502a46ec0e4a54c Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 7 Jan 2019 16:51:08 -0800 Subject: Bug fix for GRXConcurrentWriter --- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 36 +++++++++++++++++----- 1 file changed, 29 insertions(+), 7 deletions(-) (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.m') diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index e50cdf240d..7cc2101a55 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -27,8 +27,15 @@ @implementation GRXConcurrentWriteable { dispatch_queue_t _writeableQueue; - // This ensures that writesFinishedWithError: is only sent once to the writeable. + + // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected + // by _writeableQueue. BOOL _alreadyFinished; + + // This ivar ensures that a cancelWithError: call prevents further values to be sent to + // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be + // protected by self lock. + BOOL _cancelled; } - (instancetype)init { @@ -42,6 +49,7 @@ _writeableQueue = queue; _writeable = writeable; _alreadyFinished = NO; + _cancelled = NO; } return self; } @@ -56,6 +64,12 @@ return; } + @synchronized (self) { + if (self->_cancelled) { + return; + } + } + [self.writeable writeValue:value]; handler(); }); @@ -63,13 +77,18 @@ - (void)enqueueSuccessfulCompletion { dispatch_async(_writeableQueue, ^{ - @synchronized(self) { - if (self->_alreadyFinished) { + if (self->_alreadyFinished) { + return; + } + @synchronized (self) { + if (self->_cancelled) { return; } } [self.writeable writesFinishedWithError:nil]; + // Skip any possible message to the wrapped writeable enqueued after this one. + self->_alreadyFinished = YES; self.writeable = nil; }); } @@ -77,14 +96,17 @@ - (void)cancelWithError:(NSError *)error { NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); @synchronized(self) { + self->_cancelled = YES; + } + dispatch_async(_writeableQueue, ^{ if (self->_alreadyFinished) { + // a cancel or a successful completion is already issued return; } - } - dispatch_async(_writeableQueue, ^{ - // If enqueueSuccessfulCompletion is already issued, self.writeable is nil and the following - // line is no-op. [self.writeable writesFinishedWithError:error]; + + // Skip any possible message to the wrapped writeable enqueued after this one. + self->_alreadyFinished = YES; self.writeable = nil; }); } -- cgit v1.2.3 From 8b55c6a8714e7c4422adc8e7e105588397b707ba Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 7 Jan 2019 16:54:37 -0800 Subject: clang-format --- src/objective-c/GRPCClient/GRPCCall.m | 6 +++--- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.m') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 714c7dbbc2..20703f548e 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -832,9 +832,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeUnavailable userInfo:@{ - NSLocalizedDescriptionKey : - @"Failed to create call or channel." - }]]; + NSLocalizedDescriptionKey : + @"Failed to create call or channel." + }]]; return; } diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 7cc2101a55..d8491d2aed 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -64,10 +64,10 @@ return; } - @synchronized (self) { - if (self->_cancelled) { - return; - } + @synchronized(self) { + if (self->_cancelled) { + return; + } } [self.writeable writeValue:value]; @@ -80,7 +80,7 @@ if (self->_alreadyFinished) { return; } - @synchronized (self) { + @synchronized(self) { if (self->_cancelled) { return; } -- cgit v1.2.3 From d4fa274bf2994276341e47920aa46e7a009b4d10 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 9 Jan 2019 09:38:47 -0800 Subject: address comments --- src/objective-c/GRPCClient/GRPCCall.m | 4 ---- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 6 ++---- src/objective-c/RxLibrary/GRXWriter.h | 2 +- 3 files changed, 3 insertions(+), 9 deletions(-) (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.m') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index bc48f1aa1b..c0d10cacc1 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -425,9 +425,6 @@ const char *kCFStreamVarName = "grpc_cfstream"; // queue dispatch_queue_t _responseQueue; - // Whether the call is finished. If it is, should not call finishWithError again. - BOOL _finished; - // The OAuth2 token fetched from a token provider. NSString *_fetchedOauth2AccessToken; } @@ -750,7 +747,6 @@ const char *kCFStreamVarName = "grpc_cfstream"; [self cancel]; } else { dispatch_async(_callQueue, ^{ - // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT [self finishRequestWithErrorHandler:nil]; }); diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index d8491d2aed..115195463d 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -113,10 +113,8 @@ - (void)cancelSilently { dispatch_async(_writeableQueue, ^{ - @synchronized(self) { - if (self->_alreadyFinished) { - return; - } + if (self->_alreadyFinished) { + return; } self.writeable = nil; }); diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h index ac1f7b9c4c..df4c80c28d 100644 --- a/src/objective-c/RxLibrary/GRXWriter.h +++ b/src/objective-c/RxLibrary/GRXWriter.h @@ -82,7 +82,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) { * the corresponding value, and that's useful for advanced use cases like pausing an writer. For * more details, see the documentation of the enum further down. The property is thread safe. */ -@property(atomic) GRXWriterState state; +@property GRXWriterState state; /** * Transition to the Started state, and start sending messages to the writeable (a reference to it -- cgit v1.2.3