From 5e10a3b037bcd20ab17428ccb765ba9464eb3644 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 21 Dec 2018 13:39:21 -0800 Subject: Make gRPC ObjC thread safety right --- src/objective-c/RxLibrary/GRXConcurrentWriteable.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.h') diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index abb831e6fb..606f5dea95 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -23,9 +23,9 @@ /** * This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a - * GRXWriteable instance for the main thread, guaranteeing that writesFinishedWithError: is the last + * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is the last * message sent to it (no matter what messages are sent to the wrapper, in what order, nor from - * which thread). It also guarantees that, if cancelWithError: is called from the main thread (e.g. + * which thread). It also guarantees that, if cancelWithError: is called (e.g. * by the app cancelling the writes), no further messages are sent to the writeable except * writesFinishedWithError:. * -- cgit v1.2.3 From 03232ba46fba25465341b5e9632344f102f9df83 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 21 Dec 2018 16:21:18 -0800 Subject: Polish comments and correct concurrent writeable behavior --- src/objective-c/RxLibrary/GRXBufferedPipe.h | 3 +-- src/objective-c/RxLibrary/GRXConcurrentWriteable.h | 13 +++++++------ src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 18 +++++++++++++----- src/objective-c/RxLibrary/GRXForwardingWriter.h | 6 +----- src/objective-c/RxLibrary/GRXImmediateSingleWriter.h | 2 ++ 5 files changed, 24 insertions(+), 18 deletions(-) (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.h') diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index a871ea895a..ae08cc315b 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -36,8 +36,7 @@ * crash. If you want to react to flow control signals to prevent that, instead of using this class * you can implement an object that conforms to GRXWriter. * - * Thread-safety: - * The methods of an object of this class should not be called concurrently from different threads. + * Thread-safety: the methods of this class are thread-safe. */ @interface GRXBufferedPipe : GRXWriter diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index 606f5dea95..55468b3c07 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -43,21 +43,22 @@ - (instancetype)initWithWriteable:(id)writeable; /** - * Enqueues writeValue: to be sent to the writeable in the main thread. - * The passed handler is invoked from the main thread after writeValue: returns. + * Enqueues writeValue: to be sent to the writeable from the designated dispatch queue. + * The passed handler is invoked from designated dispatch queue after writeValue: returns. */ - (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler; /** - * Enqueues writesFinishedWithError:nil to be sent to the writeable in the main thread. After that - * message is sent to the writeable, all other methods of this object are effectively noops. + * Enqueues writesFinishedWithError:nil to be sent to the writeable in the designated dispatch + * queue. After that message is sent to the writeable, all other methods of this object are + * effectively noops. */ - (void)enqueueSuccessfulCompletion; /** * If the writeable has not yet received a writesFinishedWithError: message, this will enqueue one - * to be sent to it in the main thread, and cancel all other pending messages to the writeable - * enqueued by this object (both past and future). + * to be sent to it in the designated dispatch queue, and cancel all other pending messages to the + * writeable enqueued by this object (both past and future). * The error argument cannot be nil. */ - (void)cancelWithError:(NSError *)error; diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 229d592f48..d9d0e8c31e 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -63,8 +63,10 @@ - (void)enqueueSuccessfulCompletion { dispatch_async(_writeableQueue, ^{ - if (self->_alreadyFinished) { - return; + @synchronized (self) { + if (self->_alreadyFinished) { + return; + } } [self.writeable writesFinishedWithError:nil]; // Skip any possible message to the wrapped writeable enqueued after this one. @@ -74,10 +76,14 @@ - (void)cancelWithError:(NSError *)error { NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); - dispatch_async(_writeableQueue, ^{ + @synchronized (self) { if (self->_alreadyFinished) { return; } + } + dispatch_async(_writeableQueue, ^{ + // If enqueueSuccessfulCompletion is already issued, self.writeable is nil and the following + // line is no-op. [self.writeable writesFinishedWithError:error]; self.writeable = nil; }); @@ -85,8 +91,10 @@ - (void)cancelSilently { dispatch_async(_writeableQueue, ^{ - if (self->_alreadyFinished) { - return; + @synchronized (self) { + if (self->_alreadyFinished) { + return; + } } self.writeable = nil; }); diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h index 3814ff8831..00366b6416 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.h +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h @@ -25,11 +25,7 @@ * input writer, and for classes that represent objects with input and * output sequences of values, like an RPC. * - * Thread-safety: - * All messages sent to this object need to be serialized. When it is started, the writer it wraps - * is started in the same thread. Manual state changes are propagated to the wrapped writer in the - * same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be - * serialized with any message sent to this object. + * Thread-safety: the methods of this class are thread safe. */ @interface GRXForwardingWriter : GRXWriter - (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER; diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h index 601abdc6b9..2fa38b3dce 100644 --- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h +++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h @@ -23,6 +23,8 @@ /** * Utility to construct GRXWriter instances from values that are immediately available when * required. + * + * Thread safety: the methods of this class are thread safe. */ @interface GRXImmediateSingleWriter : GRXImmediateWriter -- cgit v1.2.3 From 3cdc0db838626ab1d186a4980125bc3e6219c0e0 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 2 Jan 2019 15:59:19 -0800 Subject: clang-format --- src/objective-c/GRPCClient/GRPCCall.m | 77 +++++++++++----------- src/objective-c/RxLibrary/GRXBufferedPipe.m | 4 +- src/objective-c/RxLibrary/GRXConcurrentWriteable.h | 8 +-- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 6 +- src/objective-c/RxLibrary/GRXForwardingWriter.m | 12 ++-- .../RxLibrary/GRXImmediateSingleWriter.m | 4 +- 6 files changed, 56 insertions(+), 55 deletions(-) (limited to 'src/objective-c/RxLibrary/GRXConcurrentWriteable.h') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 5ea1755c58..c18dfae635 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -448,7 +448,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; return; } NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; - @synchronized (callFlags) { + @synchronized(callFlags) { switch (callSafety) { case GRPCCallSafetyDefault: callFlags[hostAndPath] = @0; @@ -468,7 +468,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path { NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; uint32_t flags = 0; - @synchronized (callFlags) { + @synchronized(callFlags) { flags = [callFlags[hostAndPath] intValue]; } return flags; @@ -529,7 +529,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)setResponseDispatchQueue:(dispatch_queue_t)queue { - @synchronized (self) { + @synchronized(self) { if (_state != GRXWriterStateNotStarted) { return; } @@ -562,14 +562,14 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)cancel { - @synchronized (self) { + @synchronized(self) { if (_state == GRXWriterStateFinished) { return; } [self finishWithError:[NSError - errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeCancelled - userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; + errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; [_wrappedCall cancel]; } } @@ -636,19 +636,19 @@ const char *kCFStreamVarName = "grpc_cfstream"; // don't want to throw, because the app shouldn't crash for a behavior // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. - @synchronized (strongSelf) { + @synchronized(strongSelf) { [strongSelf - finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeResourceExhausted - userInfo:@{ - NSLocalizedDescriptionKey : - @"Client does not have enough memory to " - @"hold the server response." - }]]; + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeResourceExhausted + userInfo:@{ + NSLocalizedDescriptionKey : + @"Client does not have enough memory to " + @"hold the server response." + }]]; [strongSelf->_wrappedCall cancel]; } } else { - @synchronized (strongSelf) { + @synchronized(strongSelf) { [strongSelf->_responseWriteable enqueueValue:data completionHandler:^{ [strongSelf startNextRead]; @@ -689,9 +689,10 @@ const char *kCFStreamVarName = "grpc_cfstream"; } // TODO(jcanizales): Add error handlers for async failures - GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers - flags:callSafetyFlags - handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA + GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] + initWithMetadata:headers + flags:callSafetyFlags + handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA dispatch_async(_callQueue, ^{ if (!self->_unaryCall) { [self->_wrappedCall startBatchWithOperations:@[ op ]]; @@ -731,7 +732,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; - (void)writeValue:(id)value { NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData"); - @synchronized (self) { + @synchronized(self) { if (_state == GRXWriterStateFinished) { return; } @@ -782,9 +783,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; dispatch_async(_callQueue, ^{ // TODO(jcanizales): Add error handlers for async failures [self->_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; + startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; [self->_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; + startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; }); } @@ -825,16 +826,16 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Lock acquired inside startWithWriteable: - (void)startCallWithWriteable:(id)writeable { - @synchronized (self) { + @synchronized(self) { if (_state == GRXWriterStateFinished) { return; } _responseWriteable = - [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; + [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; GRPCPooledChannel *channel = - [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; + [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; _wrappedCall = [channel wrappedCallWithPath:_path completionQueue:[GRPCCompletionQueue completionQueue] callOptions:_callOptions]; @@ -843,9 +844,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeUnavailable userInfo:@{ - NSLocalizedDescriptionKey : - @"Failed to create call or channel." - }]]; + NSLocalizedDescriptionKey : + @"Failed to create call or channel." + }]]; return; } @@ -869,10 +870,10 @@ const char *kCFStreamVarName = "grpc_cfstream"; _state = GRXWriterStateStarted; // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). - // This makes RPCs in which the call isn't externally retained possible (as long as it is started - // before being autoreleased). - // Care is taken not to retain self strongly in any of the blocks used in this implementation, so - // that the life of the instance is determined by this retain cycle. + // This makes RPCs in which the call isn't externally retained possible (as long as it is + // started before being autoreleased). Care is taken not to retain self strongly in any of the + // blocks used in this implementation, so that the life of the instance is determined by this + // retain cycle. _retainSelf = self; if (_callOptions == nil) { @@ -961,14 +962,14 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Retain because connectivity manager only keeps weak reference to GRPCCall. __strong GRPCCall *strongSelf = self; if (strongSelf) { - @synchronized (strongSelf) { + @synchronized(strongSelf) { [_wrappedCall cancel]; [strongSelf - finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ - NSLocalizedDescriptionKey : @"Connectivity lost." - }]]; + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ + NSLocalizedDescriptionKey : @"Connectivity lost." + }]]; } } } diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index d0064a5cfa..74e2f03da6 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -52,7 +52,7 @@ value = [value copy]; } dispatch_async(_writeQueue, ^(void) { - @synchronized (self) { + @synchronized(self) { if (self->_state == GRXWriterStateFinished) { return; } @@ -106,7 +106,7 @@ } - (void)startWithWriteable:(id)writeable { - @synchronized (self) { + @synchronized(self) { self.writeable = writeable; _state = GRXWriterStateStarted; } diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index 55468b3c07..5beca9d41e 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -23,10 +23,10 @@ /** * This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a - * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is the last - * message sent to it (no matter what messages are sent to the wrapper, in what order, nor from - * which thread). It also guarantees that, if cancelWithError: is called (e.g. - * by the app cancelling the writes), no further messages are sent to the writeable except + * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is + * the last message sent to it (no matter what messages are sent to the wrapper, in what order, nor + * from which thread). It also guarantees that, if cancelWithError: is called (e.g. by the app + * cancelling the writes), no further messages are sent to the writeable except * writesFinishedWithError:. * * TODO(jcanizales): Let the user specify another queue for the writeable callbacks. diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index d9d0e8c31e..e50cdf240d 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -63,7 +63,7 @@ - (void)enqueueSuccessfulCompletion { dispatch_async(_writeableQueue, ^{ - @synchronized (self) { + @synchronized(self) { if (self->_alreadyFinished) { return; } @@ -76,7 +76,7 @@ - (void)cancelWithError:(NSError *)error { NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); - @synchronized (self) { + @synchronized(self) { if (self->_alreadyFinished) { return; } @@ -91,7 +91,7 @@ - (void)cancelSilently { dispatch_async(_writeableQueue, ^{ - @synchronized (self) { + @synchronized(self) { if (self->_alreadyFinished) { return; } diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m index 376c196b4f..f5ed603698 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.m +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m @@ -57,13 +57,13 @@ #pragma mark GRXWriteable implementation - (void)writeValue:(id)value { - @synchronized (self) { + @synchronized(self) { [_writeable writeValue:value]; } } - (void)writesFinishedWithError:(NSError *)errorOrNil { - @synchronized (self) { + @synchronized(self) { _writer = nil; [self finishOutputWithError:errorOrNil]; } @@ -78,14 +78,14 @@ - (void)setState:(GRXWriterState)state { GRXWriter *copiedWriter = nil; if (state == GRXWriterStateFinished) { - @synchronized (self) { + @synchronized(self) { _writeable = nil; copiedWriter = _writer; _writer = nil; } copiedWriter.state = GRXWriterStateFinished; } else { - @synchronized (self) { + @synchronized(self) { copiedWriter = _writer; } copiedWriter.state = state; @@ -94,7 +94,7 @@ - (void)startWithWriteable:(id)writeable { GRXWriter *copiedWriter = nil; - @synchronized (self) { + @synchronized(self) { _writeable = writeable; copiedWriter = _writer; } @@ -103,7 +103,7 @@ - (void)finishWithError:(NSError *)errorOrNil { GRXWriter *copiedWriter = nil; - @synchronized (self) { + @synchronized(self) { [self finishOutputWithError:errorOrNil]; copiedWriter = _writer; } diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m index eadad6c3b6..079c11b94f 100644 --- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m +++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m @@ -38,7 +38,7 @@ - (void)startWithWriteable:(id)writeable { id copiedValue = nil; - @synchronized (self) { + @synchronized(self) { if (_state != GRXWriterStateNotStarted) { return; } @@ -63,7 +63,7 @@ // the original \a map function returns a new Writer of another type. So we // need to override this function here. - (GRXWriter *)map:(id (^)(id))map { - @synchronized (self) { + @synchronized(self) { // Since _value is available when creating the object, we can simply // apply the map and store the output. _value = map(_value); -- cgit v1.2.3