diff options
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 721 |
1 files changed, 564 insertions, 157 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 084fbdeb49..e8fae09a1f 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -20,11 +20,16 @@ #import "GRPCCall+OAuth2.h" +#import <RxLibrary/GRXBufferedPipe.h> #import <RxLibrary/GRXConcurrentWriteable.h> #import <RxLibrary/GRXImmediateSingleWriter.h> +#import <RxLibrary/GRXWriter+Immediate.h> #include <grpc/grpc.h> #include <grpc/support/time.h> +#import "GRPCCallOptions.h" +#import "private/GRPCChannelPool.h" +#import "private/GRPCCompletionQueue.h" #import "private/GRPCConnectivityMonitor.h" #import "private/GRPCHost.h" #import "private/GRPCRequestHeaders.h" @@ -51,7 +56,313 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Make them read-write. @property(atomic, strong) NSDictionary *responseHeaders; @property(atomic, strong) NSDictionary *responseTrailers; -@property(atomic) BOOL isWaitingForToken; + +- (instancetype)initWithHost:(NSString *)host + path:(NSString *)path + callSafety:(GRPCCallSafety)safety + requestsWriter:(GRXWriter *)requestsWriter + callOptions:(GRPCCallOptions *)callOptions; + +@end + +@implementation GRPCRequestOptions + +- (instancetype)initWithHost:(NSString *)host path:(NSString *)path safety:(GRPCCallSafety)safety { + NSAssert(host.length != 0 && path.length != 0, @"host and path cannot be empty"); + if (host.length == 0 || path.length == 0) { + return nil; + } + if ((self = [super init])) { + _host = [host copy]; + _path = [path copy]; + _safety = safety; + } + return self; +} + +- (id)copyWithZone:(NSZone *)zone { + GRPCRequestOptions *request = + [[GRPCRequestOptions alloc] initWithHost:_host path:_path safety:_safety]; + + return request; +} + +@end + +@implementation GRPCCall2 { + /** Options for the call. */ + GRPCCallOptions *_callOptions; + /** The handler of responses. */ + id<GRPCResponseHandler> _handler; + + // Thread safety of ivars below are protected by _dispatchQueue. + + /** + * Make use of legacy GRPCCall to make calls. Nullified when call is finished. + */ + GRPCCall *_call; + /** Flags whether initial metadata has been published to response handler. */ + BOOL _initialMetadataPublished; + /** Streaming call writeable to the underlying call. */ + GRXBufferedPipe *_pipe; + /** Serial dispatch queue for tasks inside the call. */ + dispatch_queue_t _dispatchQueue; + /** Flags whether call has started. */ + BOOL _started; + /** Flags whether call has been canceled. */ + BOOL _canceled; + /** Flags whether call has been finished. */ + BOOL _finished; +} + +- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions + responseHandler:(id<GRPCResponseHandler>)responseHandler + callOptions:(GRPCCallOptions *)callOptions { + NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0, + @"Neither host nor path can be nil."); + NSAssert(requestOptions.safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value."); + NSAssert(responseHandler != nil, @"Response handler required."); + if (requestOptions.host.length == 0 || requestOptions.path.length == 0) { + return nil; + } + if (requestOptions.safety > GRPCCallSafetyCacheableRequest) { + return nil; + } + if (responseHandler == nil) { + return nil; + } + + if ((self = [super init])) { + _requestOptions = [requestOptions copy]; + if (callOptions == nil) { + _callOptions = [[GRPCCallOptions alloc] init]; + } else { + _callOptions = [callOptions copy]; + } + _handler = responseHandler; + _initialMetadataPublished = NO; + _pipe = [GRXBufferedPipe pipe]; + // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above +#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300 + if (@available(iOS 8.0, macOS 10.10, *)) { + _dispatchQueue = dispatch_queue_create( + NULL, + dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); + } else { +#else + { +#endif + _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + } + dispatch_set_target_queue(_dispatchQueue, responseHandler.dispatchQueue); + _started = NO; + _canceled = NO; + _finished = NO; + } + + return self; +} + +- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions + responseHandler:(id<GRPCResponseHandler>)responseHandler { + return + [self initWithRequestOptions:requestOptions responseHandler:responseHandler callOptions:nil]; +} + +- (void)start { + GRPCCall *copiedCall = nil; + @synchronized(self) { + NSAssert(!_started, @"Call already started."); + NSAssert(!_canceled, @"Call already canceled."); + if (_started) { + return; + } + if (_canceled) { + return; + } + + _started = YES; + if (!_callOptions) { + _callOptions = [[GRPCCallOptions alloc] init]; + } + + _call = [[GRPCCall alloc] initWithHost:_requestOptions.host + path:_requestOptions.path + callSafety:_requestOptions.safety + requestsWriter:_pipe + callOptions:_callOptions]; + if (_callOptions.initialMetadata) { + [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata]; + } + copiedCall = _call; + } + + void (^valueHandler)(id value) = ^(id value) { + @synchronized(self) { + if (self->_handler) { + if (!self->_initialMetadataPublished) { + self->_initialMetadataPublished = YES; + [self issueInitialMetadata:self->_call.responseHeaders]; + } + if (value) { + [self issueMessage:value]; + } + } + } + }; + void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) { + @synchronized(self) { + if (self->_handler) { + if (!self->_initialMetadataPublished) { + self->_initialMetadataPublished = YES; + [self issueInitialMetadata:self->_call.responseHeaders]; + } + [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil]; + } + // Clearing _call must happen *after* dispatching close in order to get trailing + // metadata from _call. + if (self->_call) { + // Clean up the request writers. This should have no effect to _call since its + // response writeable is already nullified. + [self->_pipe writesFinishedWithError:nil]; + self->_call = nil; + self->_pipe = nil; + } + } + }; + id<GRXWriteable> responseWriteable = + [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler]; + [copiedCall startWithWriteable:responseWriteable]; +} + +- (void)cancel { + GRPCCall *copiedCall = nil; + @synchronized(self) { + if (_canceled) { + return; + } + + _canceled = YES; + + copiedCall = _call; + _call = nil; + _pipe = nil; + + if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) { + dispatch_async(_dispatchQueue, ^{ + // Copy to local so that block is freed after cancellation completes. + id<GRPCResponseHandler> copiedHandler = nil; + @synchronized(self) { + copiedHandler = self->_handler; + self->_handler = nil; + } + + [copiedHandler didCloseWithTrailingMetadata:nil + error:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{ + NSLocalizedDescriptionKey : + @"Canceled by app" + }]]; + }); + } else { + _handler = nil; + } + } + [copiedCall cancel]; +} + +- (void)writeData:(NSData *)data { + GRXBufferedPipe *copiedPipe = nil; + @synchronized(self) { + NSAssert(!_canceled, @"Call already canceled."); + NSAssert(!_finished, @"Call is half-closed before sending data."); + if (_canceled) { + return; + } + if (_finished) { + return; + } + + if (_pipe) { + copiedPipe = _pipe; + } + } + [copiedPipe writeValue:data]; +} + +- (void)finish { + GRXBufferedPipe *copiedPipe = nil; + @synchronized(self) { + NSAssert(_started, @"Call not started."); + NSAssert(!_canceled, @"Call already canceled."); + NSAssert(!_finished, @"Call already half-closed."); + if (!_started) { + return; + } + if (_canceled) { + return; + } + if (_finished) { + return; + } + + if (_pipe) { + copiedPipe = _pipe; + _pipe = nil; + } + _finished = YES; + } + [copiedPipe writesFinishedWithError:nil]; +} + +- (void)issueInitialMetadata:(NSDictionary *)initialMetadata { + @synchronized(self) { + if (initialMetadata != nil && + [_handler respondsToSelector:@selector(didReceiveInitialMetadata:)]) { + dispatch_async(_dispatchQueue, ^{ + id<GRPCResponseHandler> copiedHandler = nil; + @synchronized(self) { + copiedHandler = self->_handler; + } + [copiedHandler didReceiveInitialMetadata:initialMetadata]; + }); + } + } +} + +- (void)issueMessage:(id)message { + @synchronized(self) { + if (message != nil && [_handler respondsToSelector:@selector(didReceiveRawMessage:)]) { + dispatch_async(_dispatchQueue, ^{ + id<GRPCResponseHandler> copiedHandler = nil; + @synchronized(self) { + copiedHandler = self->_handler; + } + [copiedHandler didReceiveRawMessage:message]; + }); + } + } +} + +- (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error { + @synchronized(self) { + if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) { + dispatch_async(_dispatchQueue, ^{ + id<GRPCResponseHandler> copiedHandler = nil; + @synchronized(self) { + copiedHandler = self->_handler; + // Clean up _handler so that no more responses are reported to the handler. + self->_handler = nil; + } + [copiedHandler didCloseWithTrailingMetadata:trailingMetadata error:error]; + }); + } else { + _handler = nil; + } + } +} + @end // The following methods of a C gRPC call object aren't reentrant, and thus @@ -75,6 +386,8 @@ const char *kCFStreamVarName = "grpc_cfstream"; NSString *_host; NSString *_path; + GRPCCallSafety _callSafety; + GRPCCallOptions *_callOptions; GRPCWrappedCall *_wrappedCall; GRPCConnectivityMonitor *_connectivityMonitor; @@ -111,8 +424,8 @@ const char *kCFStreamVarName = "grpc_cfstream"; // queue dispatch_queue_t _responseQueue; - // Whether the call is finished. If it is, should not call finishWithError again. - BOOL _finished; + // The OAuth2 token fetched from a token provider. + NSString *_fetchedOauth2AccessToken; } @synthesize state = _state; @@ -127,48 +440,73 @@ const char *kCFStreamVarName = "grpc_cfstream"; } + (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path { + if (host.length == 0 || path.length == 0) { + return; + } NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; - switch (callSafety) { - case GRPCCallSafetyDefault: - callFlags[hostAndPath] = @0; - break; - case GRPCCallSafetyIdempotentRequest: - callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; - break; - case GRPCCallSafetyCacheableRequest: - callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; - break; - default: - break; + @synchronized(callFlags) { + switch (callSafety) { + case GRPCCallSafetyDefault: + callFlags[hostAndPath] = @0; + break; + case GRPCCallSafetyIdempotentRequest: + callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; + break; + case GRPCCallSafetyCacheableRequest: + callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; + break; + default: + break; + } } } + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path { NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; - return [callFlags[hostAndPath] intValue]; -} - -- (instancetype)init { - return [self initWithHost:nil path:nil requestsWriter:nil]; + @synchronized(callFlags) { + return [callFlags[hostAndPath] intValue]; + } } // Designated initializer - (instancetype)initWithHost:(NSString *)host path:(NSString *)path requestsWriter:(GRXWriter *)requestWriter { + return [self initWithHost:host + path:path + callSafety:GRPCCallSafetyDefault + requestsWriter:requestWriter + callOptions:nil]; +} + +- (instancetype)initWithHost:(NSString *)host + path:(NSString *)path + callSafety:(GRPCCallSafety)safety + requestsWriter:(GRXWriter *)requestWriter + callOptions:(GRPCCallOptions *)callOptions { + // Purposely using pointer rather than length (host.length == 0) for backwards compatibility. + NSAssert(host != nil && path != nil, @"Neither host nor path can be nil."); + NSAssert(safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value."); + NSAssert(requestWriter.state == GRXWriterStateNotStarted, + @"The requests writer can't be already started."); if (!host || !path) { - [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."]; + return nil; + } + if (safety > GRPCCallSafetyCacheableRequest) { + return nil; } if (requestWriter.state != GRXWriterStateNotStarted) { - [NSException raise:NSInvalidArgumentException - format:@"The requests writer can't be already started."]; + return nil; } + if ((self = [super init])) { _host = [host copy]; _path = [path copy]; + _callSafety = safety; + _callOptions = [callOptions copy]; // Serial queue to invoke the non-reentrant methods of the grpc_call object. - _callQueue = dispatch_queue_create("io.grpc.call", NULL); + _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL); _requestWriter = requestWriter; @@ -185,69 +523,48 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)setResponseDispatchQueue:(dispatch_queue_t)queue { - if (_state != GRXWriterStateNotStarted) { - return; + @synchronized(self) { + if (_state != GRXWriterStateNotStarted) { + return; + } + _responseQueue = queue; } - _responseQueue = queue; } #pragma mark Finish +// This function should support being called within a @synchronized(self) block in another function +// Should not manipulate _requestWriter for deadlock prevention. - (void)finishWithError:(NSError *)errorOrNil { @synchronized(self) { + if (_state == GRXWriterStateFinished) { + return; + } _state = GRXWriterStateFinished; - } - // If there were still request messages coming, stop them. - @synchronized(_requestWriter) { - _requestWriter.state = GRXWriterStateFinished; - } - - if (errorOrNil) { - [_responseWriteable cancelWithError:errorOrNil]; - } else { - [_responseWriteable enqueueSuccessfulCompletion]; - } + if (errorOrNil) { + [_responseWriteable cancelWithError:errorOrNil]; + } else { + [_responseWriteable enqueueSuccessfulCompletion]; + } - // Connectivity monitor is not required for CFStream - char *enableCFStream = getenv(kCFStreamVarName); - if (enableCFStream == nil || enableCFStream[0] != '1') { - [GRPCConnectivityMonitor unregisterObserver:self]; + // If the call isn't retained anywhere else, it can be deallocated now. + _retainSelf = nil; } - - // If the call isn't retained anywhere else, it can be deallocated now. - _retainSelf = nil; -} - -- (void)cancelCall { - // Can be called from any thread, any number of times. - [_wrappedCall cancel]; } - (void)cancel { - if (!self.isWaitingForToken) { - [self cancelCall]; - } else { - self.isWaitingForToken = NO; - } - [self - maybeFinishWithError:[NSError - errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeCancelled - userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; -} - -- (void)maybeFinishWithError:(NSError *)errorOrNil { - BOOL toFinish = NO; @synchronized(self) { - if (_finished == NO) { - _finished = YES; - toFinish = YES; + if (_state == GRXWriterStateFinished) { + return; } + [self finishWithError:[NSError + errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; + [_wrappedCall cancel]; } - if (toFinish == YES) { - [self finishWithError:errorOrNil]; - } + _requestWriter.state = GRXWriterStateFinished; } - (void)dealloc { @@ -274,21 +591,24 @@ const char *kCFStreamVarName = "grpc_cfstream"; // TODO(jcanizales): Rename to readResponseIfNotPaused. - (void)startNextRead { @synchronized(self) { - if (self.state == GRXWriterStatePaused) { + if (_state != GRXWriterStateStarted) { return; } } dispatch_async(_callQueue, ^{ __weak GRPCCall *weakSelf = self; - __weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable; [self startReadWithHandler:^(grpc_byte_buffer *message) { - __strong GRPCCall *strongSelf = weakSelf; - __strong GRXConcurrentWriteable *strongWriteable = weakWriteable; + NSLog(@"message received"); if (message == NULL) { // No more messages from the server return; } + __strong GRPCCall *strongSelf = weakSelf; + if (strongSelf == nil) { + grpc_byte_buffer_destroy(message); + return; + } NSData *data = [NSData grpc_dataWithByteBuffer:message]; grpc_byte_buffer_destroy(message); if (!data) { @@ -296,38 +616,71 @@ const char *kCFStreamVarName = "grpc_cfstream"; // don't want to throw, because the app shouldn't crash for a behavior // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. - [strongSelf cancelCall]; - [strongSelf - maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeResourceExhausted - userInfo:@{ - NSLocalizedDescriptionKey : - @"Client does not have enough memory to " - @"hold the server response." - }]]; - return; + @synchronized(strongSelf) { + [strongSelf + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeResourceExhausted + userInfo:@{ + NSLocalizedDescriptionKey : + @"Client does not have enough memory to " + @"hold the server response." + }]]; + [strongSelf->_wrappedCall cancel]; + } + strongSelf->_requestWriter.state = GRXWriterStateFinished; + } else { + @synchronized(strongSelf) { + [strongSelf->_responseWriteable enqueueValue:data + completionHandler:^{ + [strongSelf startNextRead]; + }]; + } } - [strongWriteable enqueueValue:data - completionHandler:^{ - [strongSelf startNextRead]; - }]; }]; }); } #pragma mark Send headers -- (void)sendHeaders:(NSDictionary *)headers { +- (void)sendHeaders { + // TODO (mxyan): Remove after deprecated methods are removed + uint32_t callSafetyFlags = 0; + switch (_callSafety) { + case GRPCCallSafetyDefault: + callSafetyFlags = 0; + break; + case GRPCCallSafetyIdempotentRequest: + callSafetyFlags = GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; + break; + case GRPCCallSafetyCacheableRequest: + callSafetyFlags = GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; + break; + } + + NSMutableDictionary *headers = [_requestHeaders mutableCopy]; + NSString *fetchedOauth2AccessToken; + @synchronized(self) { + fetchedOauth2AccessToken = _fetchedOauth2AccessToken; + } + if (fetchedOauth2AccessToken != nil) { + headers[@"authorization"] = [kBearerPrefix stringByAppendingString:fetchedOauth2AccessToken]; + } else if (_callOptions.oauth2AccessToken != nil) { + headers[@"authorization"] = + [kBearerPrefix stringByAppendingString:_callOptions.oauth2AccessToken]; + } + // TODO(jcanizales): Add error handlers for async failures GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers - flags:[GRPCCall callFlagsForHost:_host path:_path] + flags:callSafetyFlags handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA - if (!_unaryCall) { - [_wrappedCall startBatchWithOperations:@[ op ]]; - } else { - [_unaryOpBatch addObject:op]; - } + dispatch_async(_callQueue, ^{ + if (!self->_unaryCall) { + [self->_wrappedCall startBatchWithOperations:@[ op ]]; + } else { + [self->_unaryOpBatch addObject:op]; + } + }); } #pragma mark GRXWriteable implementation @@ -342,9 +695,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Resume the request writer. GRPCCall *strongSelf = weakSelf; if (strongSelf) { - @synchronized(strongSelf->_requestWriter) { - strongSelf->_requestWriter.state = GRXWriterStateStarted; - } + strongSelf->_requestWriter.state = GRXWriterStateStarted; } }; @@ -360,13 +711,17 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)writeValue:(id)value { - // TODO(jcanizales): Throw/assert if value isn't NSData. + NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData"); + + @synchronized(self) { + if (_state == GRXWriterStateFinished) { + return; + } + } // Pause the input and only resume it when the C layer notifies us that writes // can proceed. - @synchronized(_requestWriter) { - _requestWriter.state = GRXWriterStatePaused; - } + _requestWriter.state = GRXWriterStatePaused; dispatch_async(_callQueue, ^{ // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT @@ -405,17 +760,20 @@ const char *kCFStreamVarName = "grpc_cfstream"; // The second one (completionHandler), whenever the RPC finishes for any reason. - (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler { - // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; - [_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; + dispatch_async(_callQueue, ^{ + // TODO(jcanizales): Add error handlers for async failures + [self->_wrappedCall + startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; + [self->_wrappedCall + startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; + }); } - (void)invokeCall { __weak GRPCCall *weakSelf = self; [self invokeCallWithHeadersHandler:^(NSDictionary *headers) { // Response headers received. + NSLog(@"response received"); __strong GRPCCall *strongSelf = weakSelf; if (strongSelf) { strongSelf.responseHeaders = headers; @@ -423,6 +781,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; } } completionHandler:^(NSError *error, NSDictionary *trailers) { + NSLog(@"completion received"); __strong GRPCCall *strongSelf = weakSelf; if (strongSelf) { strongSelf.responseTrailers = trailers; @@ -433,71 +792,114 @@ const char *kCFStreamVarName = "grpc_cfstream"; [userInfo addEntriesFromDictionary:error.userInfo]; } userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers; - // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be - // called before this one, so an error might end up with trailers but no headers. We - // shouldn't call finishWithError until ater both blocks are called. It is also when - // this is done that we can provide a merged view of response headers and trailers in a - // thread-safe way. - if (strongSelf.responseHeaders) { - userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; - } + // Since gRPC core does not guarantee the headers block being called before this block, + // responseHeaders might be nil. + userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; } - [strongSelf maybeFinishWithError:error]; + [strongSelf finishWithError:error]; + strongSelf->_requestWriter.state = GRXWriterStateFinished; } }]; - // Now that the RPC has been initiated, request writes can start. - @synchronized(_requestWriter) { - [_requestWriter startWithWriteable:self]; - } } #pragma mark GRXWriter implementation +// Lock acquired inside startWithWriteable: - (void)startCallWithWriteable:(id<GRXWriteable>)writeable { - _responseWriteable = - [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; + @synchronized(self) { + if (_state == GRXWriterStateFinished) { + return; + } - _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host - serverName:_serverName - path:_path - timeout:_timeout]; - NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?"); + _responseWriteable = + [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; + + GRPCPooledChannel *channel = + [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; + _wrappedCall = [channel wrappedCallWithPath:_path + completionQueue:[GRPCCompletionQueue completionQueue] + callOptions:_callOptions]; + + if (_wrappedCall == nil) { + [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ + NSLocalizedDescriptionKey : + @"Failed to create call or channel." + }]]; + return; + } - [self sendHeaders:_requestHeaders]; - [self invokeCall]; + [self sendHeaders]; + [self invokeCall]; - // Connectivity monitor is not required for CFStream - char *enableCFStream = getenv(kCFStreamVarName); - if (enableCFStream == nil || enableCFStream[0] != '1') { - [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)]; + // Connectivity monitor is not required for CFStream + char *enableCFStream = getenv(kCFStreamVarName); + if (enableCFStream == nil || enableCFStream[0] != '1') { + [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)]; + } } + + // Now that the RPC has been initiated, request writes can start. + [_requestWriter startWithWriteable:self]; } - (void)startWithWriteable:(id<GRXWriteable>)writeable { + id<GRPCAuthorizationProtocol> tokenProvider = nil; @synchronized(self) { _state = GRXWriterStateStarted; - } - // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). - // This makes RPCs in which the call isn't externally retained possible (as long as it is started - // before being autoreleased). - // Care is taken not to retain self strongly in any of the blocks used in this implementation, so - // that the life of the instance is determined by this retain cycle. - _retainSelf = self; + // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). + // This makes RPCs in which the call isn't externally retained possible (as long as it is + // started before being autoreleased). Care is taken not to retain self strongly in any of the + // blocks used in this implementation, so that the life of the instance is determined by this + // retain cycle. + _retainSelf = self; + + if (_callOptions == nil) { + GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy]; + if (_serverName.length != 0) { + callOptions.serverAuthority = _serverName; + } + if (_timeout > 0) { + callOptions.timeout = _timeout; + } + uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path]; + if (callFlags != 0) { + if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { + _callSafety = GRPCCallSafetyIdempotentRequest; + } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) { + _callSafety = GRPCCallSafetyCacheableRequest; + } + } + + id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider; + if (tokenProvider != nil) { + callOptions.authTokenProvider = tokenProvider; + } + _callOptions = callOptions; + } + + NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil, + @"authTokenProvider and oauth2AccessToken cannot be set at the same time"); - if (self.tokenProvider != nil) { - self.isWaitingForToken = YES; + tokenProvider = _callOptions.authTokenProvider; + } + + if (tokenProvider != nil) { __weak typeof(self) weakSelf = self; - [self.tokenProvider getTokenWithHandler:^(NSString *token) { - typeof(self) strongSelf = weakSelf; - if (strongSelf && strongSelf.isWaitingForToken) { - if (token) { - NSString *t = [kBearerPrefix stringByAppendingString:token]; - strongSelf.requestHeaders[kAuthorizationHeader] = t; + [tokenProvider getTokenWithHandler:^(NSString *token) { + __strong typeof(self) strongSelf = weakSelf; + if (strongSelf) { + @synchronized(strongSelf) { + if (strongSelf->_state == GRXWriterStateNotStarted) { + if (token) { + strongSelf->_fetchedOauth2AccessToken = [token copy]; + } + } } [strongSelf startCallWithWriteable:writeable]; - strongSelf.isWaitingForToken = NO; } }]; } else { @@ -536,16 +938,21 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)connectivityChanged:(NSNotification *)note { - // Cancel underlying call upon this notification + // Cancel underlying call upon this notification. + + // Retain because connectivity manager only keeps weak reference to GRPCCall. __strong GRPCCall *strongSelf = self; if (strongSelf) { - [self cancelCall]; - [self - maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ - NSLocalizedDescriptionKey : @"Connectivity lost." - }]]; + @synchronized(strongSelf) { + [_wrappedCall cancel]; + [strongSelf + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ + NSLocalizedDescriptionKey : @"Connectivity lost." + }]]; + } + strongSelf->_requestWriter.state = GRXWriterStateFinished; } } |