From 5e10a3b037bcd20ab17428ccb765ba9464eb3644 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Fri, 21 Dec 2018 13:39:21 -0800 Subject: Make gRPC ObjC thread safety right --- src/objective-c/GRPCClient/GRPCCall.m | 360 ++++++++++++++++++---------------- 1 file changed, 191 insertions(+), 169 deletions(-) (limited to 'src/objective-c/GRPCClient/GRPCCall.m') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 83c6edc6e3..5ea1755c58 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -448,24 +448,30 @@ const char *kCFStreamVarName = "grpc_cfstream"; return; } NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; - switch (callSafety) { - case GRPCCallSafetyDefault: - callFlags[hostAndPath] = @0; - break; - case GRPCCallSafetyIdempotentRequest: - callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; - break; - case GRPCCallSafetyCacheableRequest: - callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; - break; - default: - break; + @synchronized (callFlags) { + switch (callSafety) { + case GRPCCallSafetyDefault: + callFlags[hostAndPath] = @0; + break; + case GRPCCallSafetyIdempotentRequest: + callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; + break; + case GRPCCallSafetyCacheableRequest: + callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; + break; + default: + break; + } } } + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path { NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; - return [callFlags[hostAndPath] intValue]; + uint32_t flags = 0; + @synchronized (callFlags) { + flags = [callFlags[hostAndPath] intValue]; + } + return flags; } // Designated initializer @@ -506,7 +512,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; _callOptions = [callOptions copy]; // Serial queue to invoke the non-reentrant methods of the grpc_call object. - _callQueue = dispatch_queue_create("io.grpc.call", NULL); + _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL); _requestWriter = requestWriter; @@ -523,53 +529,49 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)setResponseDispatchQueue:(dispatch_queue_t)queue { - if (_state != GRXWriterStateNotStarted) { - return; + @synchronized (self) { + if (_state != GRXWriterStateNotStarted) { + return; + } + _responseQueue = queue; } - _responseQueue = queue; } #pragma mark Finish - (void)finishWithError:(NSError *)errorOrNil { + GRXConcurrentWriteable *copiedResponseWriteable = nil; + @synchronized(self) { + if (_state == GRXWriterStateFinished) { + return; + } _state = GRXWriterStateFinished; - } + copiedResponseWriteable = _responseWriteable; - // If there were still request messages coming, stop them. - @synchronized(_requestWriter) { - _requestWriter.state = GRXWriterStateFinished; + // If the call isn't retained anywhere else, it can be deallocated now. + _retainSelf = nil; } if (errorOrNil) { - [_responseWriteable cancelWithError:errorOrNil]; + [copiedResponseWriteable cancelWithError:errorOrNil]; } else { - [_responseWriteable enqueueSuccessfulCompletion]; - } - - [GRPCConnectivityMonitor unregisterObserver:self]; - - // If the call isn't retained anywhere else, it can be deallocated now. - _retainSelf = nil; -} - -- (void)cancelCall { - // Can be called from any thread, any number of times. - @synchronized(self) { - [_wrappedCall cancel]; + [copiedResponseWriteable enqueueSuccessfulCompletion]; } + _requestWriter.state = GRXWriterStateFinished; } - (void)cancel { - @synchronized(self) { - [self cancelCall]; - self.isWaitingForToken = NO; + @synchronized (self) { + if (_state == GRXWriterStateFinished) { + return; + } + [self finishWithError:[NSError + errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; + [_wrappedCall cancel]; } - [self - maybeFinishWithError:[NSError - errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeCancelled - userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; } - (void)maybeFinishWithError:(NSError *)errorOrNil { @@ -609,21 +611,24 @@ const char *kCFStreamVarName = "grpc_cfstream"; // TODO(jcanizales): Rename to readResponseIfNotPaused. - (void)startNextRead { @synchronized(self) { - if (self.state == GRXWriterStatePaused) { + if (_state != GRXWriterStateStarted) { return; } } dispatch_async(_callQueue, ^{ __weak GRPCCall *weakSelf = self; - __weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable; [self startReadWithHandler:^(grpc_byte_buffer *message) { - __strong GRPCCall *strongSelf = weakSelf; - __strong GRXConcurrentWriteable *strongWriteable = weakWriteable; + NSLog(@"message received"); if (message == NULL) { // No more messages from the server return; } + __strong GRPCCall *strongSelf = weakSelf; + if (strongSelf == nil) { + grpc_byte_buffer_destroy(message); + return; + } NSData *data = [NSData grpc_dataWithByteBuffer:message]; grpc_byte_buffer_destroy(message); if (!data) { @@ -631,21 +636,25 @@ const char *kCFStreamVarName = "grpc_cfstream"; // don't want to throw, because the app shouldn't crash for a behavior // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. - [strongSelf cancelCall]; - [strongSelf - maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeResourceExhausted - userInfo:@{ - NSLocalizedDescriptionKey : - @"Client does not have enough memory to " - @"hold the server response." - }]]; - return; + @synchronized (strongSelf) { + [strongSelf + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeResourceExhausted + userInfo:@{ + NSLocalizedDescriptionKey : + @"Client does not have enough memory to " + @"hold the server response." + }]]; + [strongSelf->_wrappedCall cancel]; + } + } else { + @synchronized (strongSelf) { + [strongSelf->_responseWriteable enqueueValue:data + completionHandler:^{ + [strongSelf startNextRead]; + }]; + } } - [strongWriteable enqueueValue:data - completionHandler:^{ - [strongSelf startNextRead]; - }]; }]; }); } @@ -680,15 +689,16 @@ const char *kCFStreamVarName = "grpc_cfstream"; } // TODO(jcanizales): Add error handlers for async failures - GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] - initWithMetadata:headers - flags:callSafetyFlags - handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA - if (!_unaryCall) { - [_wrappedCall startBatchWithOperations:@[ op ]]; - } else { - [_unaryOpBatch addObject:op]; - } + GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers + flags:callSafetyFlags + handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA + dispatch_async(_callQueue, ^{ + if (!self->_unaryCall) { + [self->_wrappedCall startBatchWithOperations:@[ op ]]; + } else { + [self->_unaryOpBatch addObject:op]; + } + }); } #pragma mark GRXWriteable implementation @@ -703,9 +713,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Resume the request writer. GRPCCall *strongSelf = weakSelf; if (strongSelf) { - @synchronized(strongSelf->_requestWriter) { - strongSelf->_requestWriter.state = GRXWriterStateStarted; - } + strongSelf->_requestWriter.state = GRXWriterStateStarted; } }; @@ -721,13 +729,17 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)writeValue:(id)value { - // TODO(jcanizales): Throw/assert if value isn't NSData. + NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData"); + + @synchronized (self) { + if (_state == GRXWriterStateFinished) { + return; + } + } // Pause the input and only resume it when the C layer notifies us that writes // can proceed. - @synchronized(_requestWriter) { - _requestWriter.state = GRXWriterStatePaused; - } + _requestWriter.state = GRXWriterStatePaused; dispatch_async(_callQueue, ^{ // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT @@ -752,6 +764,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; [self cancel]; } else { dispatch_async(_callQueue, ^{ + // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT [self finishRequestWithErrorHandler:nil]; }); @@ -766,17 +779,20 @@ const char *kCFStreamVarName = "grpc_cfstream"; // The second one (completionHandler), whenever the RPC finishes for any reason. - (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler { - // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; - [_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; + dispatch_async(_callQueue, ^{ + // TODO(jcanizales): Add error handlers for async failures + [self->_wrappedCall + startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; + [self->_wrappedCall + startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; + }); } - (void)invokeCall { __weak GRPCCall *weakSelf = self; [self invokeCallWithHeadersHandler:^(NSDictionary *headers) { // Response headers received. + NSLog(@"response received"); __strong GRPCCall *strongSelf = weakSelf; if (strongSelf) { strongSelf.responseHeaders = headers; @@ -784,6 +800,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; } } completionHandler:^(NSError *error, NSDictionary *trailers) { + NSLog(@"completion received"); __strong GRPCCall *strongSelf = weakSelf; if (strongSelf) { strongSelf.responseTrailers = trailers; @@ -794,112 +811,113 @@ const char *kCFStreamVarName = "grpc_cfstream"; [userInfo addEntriesFromDictionary:error.userInfo]; } userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers; - // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be - // called before this one, so an error might end up with trailers but no headers. We - // shouldn't call finishWithError until ater both blocks are called. It is also when - // this is done that we can provide a merged view of response headers and trailers in a - // thread-safe way. - if (strongSelf.responseHeaders) { - userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; - } + // Since gRPC core does not guarantee the headers block being called before this block, + // responseHeaders might be nil. + userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; } - [strongSelf maybeFinishWithError:error]; + [strongSelf finishWithError:error]; } }]; - // Now that the RPC has been initiated, request writes can start. - @synchronized(_requestWriter) { - [_requestWriter startWithWriteable:self]; - } } #pragma mark GRXWriter implementation +// Lock acquired inside startWithWriteable: - (void)startCallWithWriteable:(id)writeable { - _responseWriteable = - [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; - - GRPCPooledChannel *channel = - [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; - GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:_path - completionQueue:[GRPCCompletionQueue completionQueue] - callOptions:_callOptions]; - - if (wrappedCall == nil) { - [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ - NSLocalizedDescriptionKey : - @"Failed to create call or channel." - }]]; - return; - } + @synchronized (self) { + if (_state == GRXWriterStateFinished) { + return; + } - @synchronized(self) { - _wrappedCall = wrappedCall; - } + _responseWriteable = + [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; + + GRPCPooledChannel *channel = + [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; + _wrappedCall = [channel wrappedCallWithPath:_path + completionQueue:[GRPCCompletionQueue completionQueue] + callOptions:_callOptions]; - [self sendHeaders]; - [self invokeCall]; + if (_wrappedCall == nil) { + [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ + NSLocalizedDescriptionKey : + @"Failed to create call or channel." + }]]; + return; + } - // Connectivity monitor is not required for CFStream - char *enableCFStream = getenv(kCFStreamVarName); - if (enableCFStream == nil || enableCFStream[0] != '1') { - [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)]; + [self sendHeaders]; + [self invokeCall]; + + // Connectivity monitor is not required for CFStream + char *enableCFStream = getenv(kCFStreamVarName); + if (enableCFStream == nil || enableCFStream[0] != '1') { + [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)]; + } } + + // Now that the RPC has been initiated, request writes can start. + [_requestWriter startWithWriteable:self]; } - (void)startWithWriteable:(id)writeable { + id tokenProvider = nil; @synchronized(self) { _state = GRXWriterStateStarted; - } - // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). - // This makes RPCs in which the call isn't externally retained possible (as long as it is started - // before being autoreleased). - // Care is taken not to retain self strongly in any of the blocks used in this implementation, so - // that the life of the instance is determined by this retain cycle. - _retainSelf = self; + // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). + // This makes RPCs in which the call isn't externally retained possible (as long as it is started + // before being autoreleased). + // Care is taken not to retain self strongly in any of the blocks used in this implementation, so + // that the life of the instance is determined by this retain cycle. + _retainSelf = self; + + if (_callOptions == nil) { + GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy]; + if (_serverName.length != 0) { + callOptions.serverAuthority = _serverName; + } + if (_timeout > 0) { + callOptions.timeout = _timeout; + } + uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path]; + if (callFlags != 0) { + if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { + _callSafety = GRPCCallSafetyIdempotentRequest; + } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) { + _callSafety = GRPCCallSafetyCacheableRequest; + } + } - if (_callOptions == nil) { - GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy]; - if (_serverName.length != 0) { - callOptions.serverAuthority = _serverName; - } - if (_timeout > 0) { - callOptions.timeout = _timeout; - } - uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path]; - if (callFlags != 0) { - if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { - _callSafety = GRPCCallSafetyIdempotentRequest; - } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) { - _callSafety = GRPCCallSafetyCacheableRequest; + id tokenProvider = self.tokenProvider; + if (tokenProvider != nil) { + callOptions.authTokenProvider = tokenProvider; } + _callOptions = callOptions; } - id tokenProvider = self.tokenProvider; - if (tokenProvider != nil) { - callOptions.authTokenProvider = tokenProvider; - } - _callOptions = callOptions; + NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil, + @"authTokenProvider and oauth2AccessToken cannot be set at the same time"); + + tokenProvider = _callOptions.authTokenProvider; } - NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil, - @"authTokenProvider and oauth2AccessToken cannot be set at the same time"); - if (_callOptions.authTokenProvider != nil) { - @synchronized(self) { - self.isWaitingForToken = YES; - } - [_callOptions.authTokenProvider getTokenWithHandler:^(NSString *token) { - @synchronized(self) { - if (self.isWaitingForToken) { - if (token) { - self->_fetchedOauth2AccessToken = [token copy]; + if (tokenProvider != nil) { + __weak typeof(self) weakSelf = self; + [tokenProvider getTokenWithHandler:^(NSString *token) { + __strong typeof(self) strongSelf = weakSelf; + if (strongSelf) { + @synchronized(strongSelf) { + if (strongSelf->_state == GRXWriterStateNotStarted) { + if (token) { + strongSelf->_fetchedOauth2AccessToken = [token copy]; + } } - [self startCallWithWriteable:writeable]; - self.isWaitingForToken = NO; } + [strongSelf startCallWithWriteable:writeable]; } }]; } else { @@ -938,16 +956,20 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)connectivityChanged:(NSNotification *)note { - // Cancel underlying call upon this notification + // Cancel underlying call upon this notification. + + // Retain because connectivity manager only keeps weak reference to GRPCCall. __strong GRPCCall *strongSelf = self; if (strongSelf) { - [self cancelCall]; - [self - maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ - NSLocalizedDescriptionKey : @"Connectivity lost." - }]]; + @synchronized (strongSelf) { + [_wrappedCall cancel]; + [strongSelf + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ + NSLocalizedDescriptionKey : @"Connectivity lost." + }]]; + } } } -- cgit v1.2.3 From 3cdc0db838626ab1d186a4980125bc3e6219c0e0 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 2 Jan 2019 15:59:19 -0800 Subject: clang-format --- src/objective-c/GRPCClient/GRPCCall.m | 77 +++++++++++----------- src/objective-c/RxLibrary/GRXBufferedPipe.m | 4 +- src/objective-c/RxLibrary/GRXConcurrentWriteable.h | 8 +-- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 6 +- src/objective-c/RxLibrary/GRXForwardingWriter.m | 12 ++-- .../RxLibrary/GRXImmediateSingleWriter.m | 4 +- 6 files changed, 56 insertions(+), 55 deletions(-) (limited to 'src/objective-c/GRPCClient/GRPCCall.m') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 5ea1755c58..c18dfae635 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -448,7 +448,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; return; } NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; - @synchronized (callFlags) { + @synchronized(callFlags) { switch (callSafety) { case GRPCCallSafetyDefault: callFlags[hostAndPath] = @0; @@ -468,7 +468,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path { NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; uint32_t flags = 0; - @synchronized (callFlags) { + @synchronized(callFlags) { flags = [callFlags[hostAndPath] intValue]; } return flags; @@ -529,7 +529,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)setResponseDispatchQueue:(dispatch_queue_t)queue { - @synchronized (self) { + @synchronized(self) { if (_state != GRXWriterStateNotStarted) { return; } @@ -562,14 +562,14 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)cancel { - @synchronized (self) { + @synchronized(self) { if (_state == GRXWriterStateFinished) { return; } [self finishWithError:[NSError - errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeCancelled - userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; + errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; [_wrappedCall cancel]; } } @@ -636,19 +636,19 @@ const char *kCFStreamVarName = "grpc_cfstream"; // don't want to throw, because the app shouldn't crash for a behavior // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. - @synchronized (strongSelf) { + @synchronized(strongSelf) { [strongSelf - finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeResourceExhausted - userInfo:@{ - NSLocalizedDescriptionKey : - @"Client does not have enough memory to " - @"hold the server response." - }]]; + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeResourceExhausted + userInfo:@{ + NSLocalizedDescriptionKey : + @"Client does not have enough memory to " + @"hold the server response." + }]]; [strongSelf->_wrappedCall cancel]; } } else { - @synchronized (strongSelf) { + @synchronized(strongSelf) { [strongSelf->_responseWriteable enqueueValue:data completionHandler:^{ [strongSelf startNextRead]; @@ -689,9 +689,10 @@ const char *kCFStreamVarName = "grpc_cfstream"; } // TODO(jcanizales): Add error handlers for async failures - GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers - flags:callSafetyFlags - handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA + GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] + initWithMetadata:headers + flags:callSafetyFlags + handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA dispatch_async(_callQueue, ^{ if (!self->_unaryCall) { [self->_wrappedCall startBatchWithOperations:@[ op ]]; @@ -731,7 +732,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; - (void)writeValue:(id)value { NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData"); - @synchronized (self) { + @synchronized(self) { if (_state == GRXWriterStateFinished) { return; } @@ -782,9 +783,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; dispatch_async(_callQueue, ^{ // TODO(jcanizales): Add error handlers for async failures [self->_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; + startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; [self->_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; + startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; }); } @@ -825,16 +826,16 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Lock acquired inside startWithWriteable: - (void)startCallWithWriteable:(id)writeable { - @synchronized (self) { + @synchronized(self) { if (_state == GRXWriterStateFinished) { return; } _responseWriteable = - [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; + [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; GRPCPooledChannel *channel = - [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; + [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; _wrappedCall = [channel wrappedCallWithPath:_path completionQueue:[GRPCCompletionQueue completionQueue] callOptions:_callOptions]; @@ -843,9 +844,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeUnavailable userInfo:@{ - NSLocalizedDescriptionKey : - @"Failed to create call or channel." - }]]; + NSLocalizedDescriptionKey : + @"Failed to create call or channel." + }]]; return; } @@ -869,10 +870,10 @@ const char *kCFStreamVarName = "grpc_cfstream"; _state = GRXWriterStateStarted; // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). - // This makes RPCs in which the call isn't externally retained possible (as long as it is started - // before being autoreleased). - // Care is taken not to retain self strongly in any of the blocks used in this implementation, so - // that the life of the instance is determined by this retain cycle. + // This makes RPCs in which the call isn't externally retained possible (as long as it is + // started before being autoreleased). Care is taken not to retain self strongly in any of the + // blocks used in this implementation, so that the life of the instance is determined by this + // retain cycle. _retainSelf = self; if (_callOptions == nil) { @@ -961,14 +962,14 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Retain because connectivity manager only keeps weak reference to GRPCCall. __strong GRPCCall *strongSelf = self; if (strongSelf) { - @synchronized (strongSelf) { + @synchronized(strongSelf) { [_wrappedCall cancel]; [strongSelf - finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ - NSLocalizedDescriptionKey : @"Connectivity lost." - }]]; + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ + NSLocalizedDescriptionKey : @"Connectivity lost." + }]]; } } } diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index d0064a5cfa..74e2f03da6 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -52,7 +52,7 @@ value = [value copy]; } dispatch_async(_writeQueue, ^(void) { - @synchronized (self) { + @synchronized(self) { if (self->_state == GRXWriterStateFinished) { return; } @@ -106,7 +106,7 @@ } - (void)startWithWriteable:(id)writeable { - @synchronized (self) { + @synchronized(self) { self.writeable = writeable; _state = GRXWriterStateStarted; } diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index 55468b3c07..5beca9d41e 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -23,10 +23,10 @@ /** * This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a - * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is the last - * message sent to it (no matter what messages are sent to the wrapper, in what order, nor from - * which thread). It also guarantees that, if cancelWithError: is called (e.g. - * by the app cancelling the writes), no further messages are sent to the writeable except + * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is + * the last message sent to it (no matter what messages are sent to the wrapper, in what order, nor + * from which thread). It also guarantees that, if cancelWithError: is called (e.g. by the app + * cancelling the writes), no further messages are sent to the writeable except * writesFinishedWithError:. * * TODO(jcanizales): Let the user specify another queue for the writeable callbacks. diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index d9d0e8c31e..e50cdf240d 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -63,7 +63,7 @@ - (void)enqueueSuccessfulCompletion { dispatch_async(_writeableQueue, ^{ - @synchronized (self) { + @synchronized(self) { if (self->_alreadyFinished) { return; } @@ -76,7 +76,7 @@ - (void)cancelWithError:(NSError *)error { NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); - @synchronized (self) { + @synchronized(self) { if (self->_alreadyFinished) { return; } @@ -91,7 +91,7 @@ - (void)cancelSilently { dispatch_async(_writeableQueue, ^{ - @synchronized (self) { + @synchronized(self) { if (self->_alreadyFinished) { return; } diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m index 376c196b4f..f5ed603698 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.m +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m @@ -57,13 +57,13 @@ #pragma mark GRXWriteable implementation - (void)writeValue:(id)value { - @synchronized (self) { + @synchronized(self) { [_writeable writeValue:value]; } } - (void)writesFinishedWithError:(NSError *)errorOrNil { - @synchronized (self) { + @synchronized(self) { _writer = nil; [self finishOutputWithError:errorOrNil]; } @@ -78,14 +78,14 @@ - (void)setState:(GRXWriterState)state { GRXWriter *copiedWriter = nil; if (state == GRXWriterStateFinished) { - @synchronized (self) { + @synchronized(self) { _writeable = nil; copiedWriter = _writer; _writer = nil; } copiedWriter.state = GRXWriterStateFinished; } else { - @synchronized (self) { + @synchronized(self) { copiedWriter = _writer; } copiedWriter.state = state; @@ -94,7 +94,7 @@ - (void)startWithWriteable:(id)writeable { GRXWriter *copiedWriter = nil; - @synchronized (self) { + @synchronized(self) { _writeable = writeable; copiedWriter = _writer; } @@ -103,7 +103,7 @@ - (void)finishWithError:(NSError *)errorOrNil { GRXWriter *copiedWriter = nil; - @synchronized (self) { + @synchronized(self) { [self finishOutputWithError:errorOrNil]; copiedWriter = _writer; } diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m index eadad6c3b6..079c11b94f 100644 --- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m +++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m @@ -38,7 +38,7 @@ - (void)startWithWriteable:(id)writeable { id copiedValue = nil; - @synchronized (self) { + @synchronized(self) { if (_state != GRXWriterStateNotStarted) { return; } @@ -63,7 +63,7 @@ // the original \a map function returns a new Writer of another type. So we // need to override this function here. - (GRXWriter *)map:(id (^)(id))map { - @synchronized (self) { + @synchronized(self) { // Since _value is available when creating the object, we can simply // apply the map and store the output. _value = map(_value); -- cgit v1.2.3 From cf303c3ee7d4b4364650ca6fdecbaafdca117497 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 7 Jan 2019 16:51:23 -0800 Subject: Deadlock fix for GRPCCall --- src/objective-c/GRPCClient/GRPCCall.m | 47 ++++++++++++++--------------------- 1 file changed, 18 insertions(+), 29 deletions(-) (limited to 'src/objective-c/GRPCClient/GRPCCall.m') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index c18dfae635..714c7dbbc2 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -539,26 +539,24 @@ const char *kCFStreamVarName = "grpc_cfstream"; #pragma mark Finish +// This function should support being called within a @synchronized(self) block in another function +// Should not manipulate _requestWriter for deadlock prevention. - (void)finishWithError:(NSError *)errorOrNil { - GRXConcurrentWriteable *copiedResponseWriteable = nil; - @synchronized(self) { if (_state == GRXWriterStateFinished) { return; } _state = GRXWriterStateFinished; - copiedResponseWriteable = _responseWriteable; + + if (errorOrNil) { + [_responseWriteable cancelWithError:errorOrNil]; + } else { + [_responseWriteable enqueueSuccessfulCompletion]; + } // If the call isn't retained anywhere else, it can be deallocated now. _retainSelf = nil; } - - if (errorOrNil) { - [copiedResponseWriteable cancelWithError:errorOrNil]; - } else { - [copiedResponseWriteable enqueueSuccessfulCompletion]; - } - _requestWriter.state = GRXWriterStateFinished; } - (void)cancel { @@ -572,19 +570,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; [_wrappedCall cancel]; } -} - -- (void)maybeFinishWithError:(NSError *)errorOrNil { - BOOL toFinish = NO; - @synchronized(self) { - if (_finished == NO) { - _finished = YES; - toFinish = YES; - } - } - if (toFinish == YES) { - [self finishWithError:errorOrNil]; - } + _requestWriter.state = GRXWriterStateFinished; } - (void)dealloc { @@ -647,6 +633,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; }]]; [strongSelf->_wrappedCall cancel]; } + strongSelf->_requestWriter.state = GRXWriterStateFinished; } else { @synchronized(strongSelf) { [strongSelf->_responseWriteable enqueueValue:data @@ -818,6 +805,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; } [strongSelf finishWithError:error]; + strongSelf->_requestWriter.state = GRXWriterStateFinished; } }]; } @@ -841,12 +829,12 @@ const char *kCFStreamVarName = "grpc_cfstream"; callOptions:_callOptions]; if (_wrappedCall == nil) { - [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ - NSLocalizedDescriptionKey : - @"Failed to create call or channel." - }]]; + [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ + NSLocalizedDescriptionKey : + @"Failed to create call or channel." + }]]; return; } @@ -971,6 +959,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; NSLocalizedDescriptionKey : @"Connectivity lost." }]]; } + strongSelf->_requestWriter.state = GRXWriterStateFinished; } } -- cgit v1.2.3 From 8b55c6a8714e7c4422adc8e7e105588397b707ba Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 7 Jan 2019 16:54:37 -0800 Subject: clang-format --- src/objective-c/GRPCClient/GRPCCall.m | 6 +++--- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'src/objective-c/GRPCClient/GRPCCall.m') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 714c7dbbc2..20703f548e 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -832,9 +832,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeUnavailable userInfo:@{ - NSLocalizedDescriptionKey : - @"Failed to create call or channel." - }]]; + NSLocalizedDescriptionKey : + @"Failed to create call or channel." + }]]; return; } diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 7cc2101a55..d8491d2aed 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -64,10 +64,10 @@ return; } - @synchronized (self) { - if (self->_cancelled) { - return; - } + @synchronized(self) { + if (self->_cancelled) { + return; + } } [self.writeable writeValue:value]; @@ -80,7 +80,7 @@ if (self->_alreadyFinished) { return; } - @synchronized (self) { + @synchronized(self) { if (self->_cancelled) { return; } -- cgit v1.2.3 From 0c4c2de427399fe260aeaa2140638825fa43f26b Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 8 Jan 2019 11:45:02 -0800 Subject: Clean up code --- src/objective-c/GRPCClient/GRPCCall.m | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'src/objective-c/GRPCClient/GRPCCall.m') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 20703f548e..bc48f1aa1b 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -467,11 +467,9 @@ const char *kCFStreamVarName = "grpc_cfstream"; + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path { NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; - uint32_t flags = 0; @synchronized(callFlags) { - flags = [callFlags[hostAndPath] intValue]; + return [callFlags[hostAndPath] intValue]; } - return flags; } // Designated initializer -- cgit v1.2.3 From d4fa274bf2994276341e47920aa46e7a009b4d10 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 9 Jan 2019 09:38:47 -0800 Subject: address comments --- src/objective-c/GRPCClient/GRPCCall.m | 4 ---- src/objective-c/RxLibrary/GRXConcurrentWriteable.m | 6 ++---- src/objective-c/RxLibrary/GRXWriter.h | 2 +- 3 files changed, 3 insertions(+), 9 deletions(-) (limited to 'src/objective-c/GRPCClient/GRPCCall.m') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index bc48f1aa1b..c0d10cacc1 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -425,9 +425,6 @@ const char *kCFStreamVarName = "grpc_cfstream"; // queue dispatch_queue_t _responseQueue; - // Whether the call is finished. If it is, should not call finishWithError again. - BOOL _finished; - // The OAuth2 token fetched from a token provider. NSString *_fetchedOauth2AccessToken; } @@ -750,7 +747,6 @@ const char *kCFStreamVarName = "grpc_cfstream"; [self cancel]; } else { dispatch_async(_callQueue, ^{ - // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT [self finishRequestWithErrorHandler:nil]; }); diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index d8491d2aed..115195463d 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -113,10 +113,8 @@ - (void)cancelSilently { dispatch_async(_writeableQueue, ^{ - @synchronized(self) { - if (self->_alreadyFinished) { - return; - } + if (self->_alreadyFinished) { + return; } self.writeable = nil; }); diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h index ac1f7b9c4c..df4c80c28d 100644 --- a/src/objective-c/RxLibrary/GRXWriter.h +++ b/src/objective-c/RxLibrary/GRXWriter.h @@ -82,7 +82,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) { * the corresponding value, and that's useful for advanced use cases like pausing an writer. For * more details, see the documentation of the enum further down. The property is thread safe. */ -@property(atomic) GRXWriterState state; +@property GRXWriterState state; /** * Transition to the Started state, and start sending messages to the writeable (a reference to it -- cgit v1.2.3 From 121e04bc1e44bb684d464be6788f1c0a065b1116 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 9 Jan 2019 15:22:05 -0800 Subject: address comments 2 --- src/objective-c/GRPCClient/GRPCCall.m | 1 - src/objective-c/RxLibrary/GRXForwardingWriter.m | 7 ++++++- 2 files changed, 6 insertions(+), 2 deletions(-) (limited to 'src/objective-c/GRPCClient/GRPCCall.m') diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index c0d10cacc1..e8fae09a1f 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -56,7 +56,6 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Make them read-write. @property(atomic, strong) NSDictionary *responseHeaders; @property(atomic, strong) NSDictionary *responseTrailers; -@property(atomic) BOOL isWaitingForToken; - (instancetype)initWithHost:(NSString *)host path:(NSString *)path diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m index f5ed603698..27ac0acdff 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.m +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m @@ -72,7 +72,11 @@ #pragma mark GRXWriter implementation - (GRXWriterState)state { - return _writer ? _writer.state : GRXWriterStateFinished; + GRXWriter *copiedWriter; + @synchronized(self) { + copiedWriter = _writer; + } + return copiedWriter ? copiedWriter.state : GRXWriterStateFinished; } - (void)setState:(GRXWriterState)state { @@ -106,6 +110,7 @@ @synchronized(self) { [self finishOutputWithError:errorOrNil]; copiedWriter = _writer; + _writer = nil; } copiedWriter.state = GRXWriterStateFinished; } -- cgit v1.2.3