aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/GRPCClient/GRPCCall.m
diff options
context:
space:
mode:
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m470
1 files changed, 436 insertions, 34 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 084fbdeb49..83c6edc6e3 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -20,11 +20,16 @@
#import "GRPCCall+OAuth2.h"
+#import <RxLibrary/GRXBufferedPipe.h>
#import <RxLibrary/GRXConcurrentWriteable.h>
#import <RxLibrary/GRXImmediateSingleWriter.h>
+#import <RxLibrary/GRXWriter+Immediate.h>
#include <grpc/grpc.h>
#include <grpc/support/time.h>
+#import "GRPCCallOptions.h"
+#import "private/GRPCChannelPool.h"
+#import "private/GRPCCompletionQueue.h"
#import "private/GRPCConnectivityMonitor.h"
#import "private/GRPCHost.h"
#import "private/GRPCRequestHeaders.h"
@@ -52,6 +57,313 @@ const char *kCFStreamVarName = "grpc_cfstream";
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;
@property(atomic) BOOL isWaitingForToken;
+
+- (instancetype)initWithHost:(NSString *)host
+ path:(NSString *)path
+ callSafety:(GRPCCallSafety)safety
+ requestsWriter:(GRXWriter *)requestsWriter
+ callOptions:(GRPCCallOptions *)callOptions;
+
+@end
+
+@implementation GRPCRequestOptions
+
+- (instancetype)initWithHost:(NSString *)host path:(NSString *)path safety:(GRPCCallSafety)safety {
+ NSAssert(host.length != 0 && path.length != 0, @"host and path cannot be empty");
+ if (host.length == 0 || path.length == 0) {
+ return nil;
+ }
+ if ((self = [super init])) {
+ _host = [host copy];
+ _path = [path copy];
+ _safety = safety;
+ }
+ return self;
+}
+
+- (id)copyWithZone:(NSZone *)zone {
+ GRPCRequestOptions *request =
+ [[GRPCRequestOptions alloc] initWithHost:_host path:_path safety:_safety];
+
+ return request;
+}
+
+@end
+
+@implementation GRPCCall2 {
+ /** Options for the call. */
+ GRPCCallOptions *_callOptions;
+ /** The handler of responses. */
+ id<GRPCResponseHandler> _handler;
+
+ // Thread safety of ivars below are protected by _dispatchQueue.
+
+ /**
+ * Make use of legacy GRPCCall to make calls. Nullified when call is finished.
+ */
+ GRPCCall *_call;
+ /** Flags whether initial metadata has been published to response handler. */
+ BOOL _initialMetadataPublished;
+ /** Streaming call writeable to the underlying call. */
+ GRXBufferedPipe *_pipe;
+ /** Serial dispatch queue for tasks inside the call. */
+ dispatch_queue_t _dispatchQueue;
+ /** Flags whether call has started. */
+ BOOL _started;
+ /** Flags whether call has been canceled. */
+ BOOL _canceled;
+ /** Flags whether call has been finished. */
+ BOOL _finished;
+}
+
+- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
+ responseHandler:(id<GRPCResponseHandler>)responseHandler
+ callOptions:(GRPCCallOptions *)callOptions {
+ NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0,
+ @"Neither host nor path can be nil.");
+ NSAssert(requestOptions.safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
+ NSAssert(responseHandler != nil, @"Response handler required.");
+ if (requestOptions.host.length == 0 || requestOptions.path.length == 0) {
+ return nil;
+ }
+ if (requestOptions.safety > GRPCCallSafetyCacheableRequest) {
+ return nil;
+ }
+ if (responseHandler == nil) {
+ return nil;
+ }
+
+ if ((self = [super init])) {
+ _requestOptions = [requestOptions copy];
+ if (callOptions == nil) {
+ _callOptions = [[GRPCCallOptions alloc] init];
+ } else {
+ _callOptions = [callOptions copy];
+ }
+ _handler = responseHandler;
+ _initialMetadataPublished = NO;
+ _pipe = [GRXBufferedPipe pipe];
+ // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above
+#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
+ if (@available(iOS 8.0, macOS 10.10, *)) {
+ _dispatchQueue = dispatch_queue_create(
+ NULL,
+ dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
+ } else {
+#else
+ {
+#endif
+ _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ }
+ dispatch_set_target_queue(_dispatchQueue, responseHandler.dispatchQueue);
+ _started = NO;
+ _canceled = NO;
+ _finished = NO;
+ }
+
+ return self;
+}
+
+- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
+ responseHandler:(id<GRPCResponseHandler>)responseHandler {
+ return
+ [self initWithRequestOptions:requestOptions responseHandler:responseHandler callOptions:nil];
+}
+
+- (void)start {
+ GRPCCall *copiedCall = nil;
+ @synchronized(self) {
+ NSAssert(!_started, @"Call already started.");
+ NSAssert(!_canceled, @"Call already canceled.");
+ if (_started) {
+ return;
+ }
+ if (_canceled) {
+ return;
+ }
+
+ _started = YES;
+ if (!_callOptions) {
+ _callOptions = [[GRPCCallOptions alloc] init];
+ }
+
+ _call = [[GRPCCall alloc] initWithHost:_requestOptions.host
+ path:_requestOptions.path
+ callSafety:_requestOptions.safety
+ requestsWriter:_pipe
+ callOptions:_callOptions];
+ if (_callOptions.initialMetadata) {
+ [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
+ }
+ copiedCall = _call;
+ }
+
+ void (^valueHandler)(id value) = ^(id value) {
+ @synchronized(self) {
+ if (self->_handler) {
+ if (!self->_initialMetadataPublished) {
+ self->_initialMetadataPublished = YES;
+ [self issueInitialMetadata:self->_call.responseHeaders];
+ }
+ if (value) {
+ [self issueMessage:value];
+ }
+ }
+ }
+ };
+ void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
+ @synchronized(self) {
+ if (self->_handler) {
+ if (!self->_initialMetadataPublished) {
+ self->_initialMetadataPublished = YES;
+ [self issueInitialMetadata:self->_call.responseHeaders];
+ }
+ [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
+ }
+ // Clearing _call must happen *after* dispatching close in order to get trailing
+ // metadata from _call.
+ if (self->_call) {
+ // Clean up the request writers. This should have no effect to _call since its
+ // response writeable is already nullified.
+ [self->_pipe writesFinishedWithError:nil];
+ self->_call = nil;
+ self->_pipe = nil;
+ }
+ }
+ };
+ id<GRXWriteable> responseWriteable =
+ [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler];
+ [copiedCall startWithWriteable:responseWriteable];
+}
+
+- (void)cancel {
+ GRPCCall *copiedCall = nil;
+ @synchronized(self) {
+ if (_canceled) {
+ return;
+ }
+
+ _canceled = YES;
+
+ copiedCall = _call;
+ _call = nil;
+ _pipe = nil;
+
+ if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ // Copy to local so that block is freed after cancellation completes.
+ id<GRPCResponseHandler> copiedHandler = nil;
+ @synchronized(self) {
+ copiedHandler = self->_handler;
+ self->_handler = nil;
+ }
+
+ [copiedHandler didCloseWithTrailingMetadata:nil
+ error:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeCancelled
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Canceled by app"
+ }]];
+ });
+ } else {
+ _handler = nil;
+ }
+ }
+ [copiedCall cancel];
+}
+
+- (void)writeData:(NSData *)data {
+ GRXBufferedPipe *copiedPipe = nil;
+ @synchronized(self) {
+ NSAssert(!_canceled, @"Call already canceled.");
+ NSAssert(!_finished, @"Call is half-closed before sending data.");
+ if (_canceled) {
+ return;
+ }
+ if (_finished) {
+ return;
+ }
+
+ if (_pipe) {
+ copiedPipe = _pipe;
+ }
+ }
+ [copiedPipe writeValue:data];
+}
+
+- (void)finish {
+ GRXBufferedPipe *copiedPipe = nil;
+ @synchronized(self) {
+ NSAssert(_started, @"Call not started.");
+ NSAssert(!_canceled, @"Call already canceled.");
+ NSAssert(!_finished, @"Call already half-closed.");
+ if (!_started) {
+ return;
+ }
+ if (_canceled) {
+ return;
+ }
+ if (_finished) {
+ return;
+ }
+
+ if (_pipe) {
+ copiedPipe = _pipe;
+ _pipe = nil;
+ }
+ _finished = YES;
+ }
+ [copiedPipe writesFinishedWithError:nil];
+}
+
+- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
+ @synchronized(self) {
+ if (initialMetadata != nil &&
+ [_handler respondsToSelector:@selector(didReceiveInitialMetadata:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ id<GRPCResponseHandler> copiedHandler = nil;
+ @synchronized(self) {
+ copiedHandler = self->_handler;
+ }
+ [copiedHandler didReceiveInitialMetadata:initialMetadata];
+ });
+ }
+ }
+}
+
+- (void)issueMessage:(id)message {
+ @synchronized(self) {
+ if (message != nil && [_handler respondsToSelector:@selector(didReceiveRawMessage:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ id<GRPCResponseHandler> copiedHandler = nil;
+ @synchronized(self) {
+ copiedHandler = self->_handler;
+ }
+ [copiedHandler didReceiveRawMessage:message];
+ });
+ }
+ }
+}
+
+- (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
+ @synchronized(self) {
+ if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ id<GRPCResponseHandler> copiedHandler = nil;
+ @synchronized(self) {
+ copiedHandler = self->_handler;
+ // Clean up _handler so that no more responses are reported to the handler.
+ self->_handler = nil;
+ }
+ [copiedHandler didCloseWithTrailingMetadata:trailingMetadata error:error];
+ });
+ } else {
+ _handler = nil;
+ }
+ }
+}
+
@end
// The following methods of a C gRPC call object aren't reentrant, and thus
@@ -75,6 +387,8 @@ const char *kCFStreamVarName = "grpc_cfstream";
NSString *_host;
NSString *_path;
+ GRPCCallSafety _callSafety;
+ GRPCCallOptions *_callOptions;
GRPCWrappedCall *_wrappedCall;
GRPCConnectivityMonitor *_connectivityMonitor;
@@ -113,6 +427,9 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Whether the call is finished. If it is, should not call finishWithError again.
BOOL _finished;
+
+ // The OAuth2 token fetched from a token provider.
+ NSString *_fetchedOauth2AccessToken;
}
@synthesize state = _state;
@@ -127,6 +444,9 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
+ if (host.length == 0 || path.length == 0) {
+ return;
+ }
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
switch (callSafety) {
case GRPCCallSafetyDefault:
@@ -148,24 +468,42 @@ const char *kCFStreamVarName = "grpc_cfstream";
return [callFlags[hostAndPath] intValue];
}
-- (instancetype)init {
- return [self initWithHost:nil path:nil requestsWriter:nil];
-}
-
// Designated initializer
- (instancetype)initWithHost:(NSString *)host
path:(NSString *)path
requestsWriter:(GRXWriter *)requestWriter {
+ return [self initWithHost:host
+ path:path
+ callSafety:GRPCCallSafetyDefault
+ requestsWriter:requestWriter
+ callOptions:nil];
+}
+
+- (instancetype)initWithHost:(NSString *)host
+ path:(NSString *)path
+ callSafety:(GRPCCallSafety)safety
+ requestsWriter:(GRXWriter *)requestWriter
+ callOptions:(GRPCCallOptions *)callOptions {
+ // Purposely using pointer rather than length (host.length == 0) for backwards compatibility.
+ NSAssert(host != nil && path != nil, @"Neither host nor path can be nil.");
+ NSAssert(safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
+ NSAssert(requestWriter.state == GRXWriterStateNotStarted,
+ @"The requests writer can't be already started.");
if (!host || !path) {
- [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."];
+ return nil;
+ }
+ if (safety > GRPCCallSafetyCacheableRequest) {
+ return nil;
}
if (requestWriter.state != GRXWriterStateNotStarted) {
- [NSException raise:NSInvalidArgumentException
- format:@"The requests writer can't be already started."];
+ return nil;
}
+
if ((self = [super init])) {
_host = [host copy];
_path = [path copy];
+ _callSafety = safety;
+ _callOptions = [callOptions copy];
// Serial queue to invoke the non-reentrant methods of the grpc_call object.
_callQueue = dispatch_queue_create("io.grpc.call", NULL);
@@ -209,11 +547,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
[_responseWriteable enqueueSuccessfulCompletion];
}
- // Connectivity monitor is not required for CFStream
- char *enableCFStream = getenv(kCFStreamVarName);
- if (enableCFStream == nil || enableCFStream[0] != '1') {
- [GRPCConnectivityMonitor unregisterObserver:self];
- }
+ [GRPCConnectivityMonitor unregisterObserver:self];
// If the call isn't retained anywhere else, it can be deallocated now.
_retainSelf = nil;
@@ -221,13 +555,14 @@ const char *kCFStreamVarName = "grpc_cfstream";
- (void)cancelCall {
// Can be called from any thread, any number of times.
- [_wrappedCall cancel];
+ @synchronized(self) {
+ [_wrappedCall cancel];
+ }
}
- (void)cancel {
- if (!self.isWaitingForToken) {
+ @synchronized(self) {
[self cancelCall];
- } else {
self.isWaitingForToken = NO;
}
[self
@@ -317,11 +652,37 @@ const char *kCFStreamVarName = "grpc_cfstream";
#pragma mark Send headers
-- (void)sendHeaders:(NSDictionary *)headers {
+- (void)sendHeaders {
+ // TODO (mxyan): Remove after deprecated methods are removed
+ uint32_t callSafetyFlags = 0;
+ switch (_callSafety) {
+ case GRPCCallSafetyDefault:
+ callSafetyFlags = 0;
+ break;
+ case GRPCCallSafetyIdempotentRequest:
+ callSafetyFlags = GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
+ break;
+ case GRPCCallSafetyCacheableRequest:
+ callSafetyFlags = GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ break;
+ }
+
+ NSMutableDictionary *headers = [_requestHeaders mutableCopy];
+ NSString *fetchedOauth2AccessToken;
+ @synchronized(self) {
+ fetchedOauth2AccessToken = _fetchedOauth2AccessToken;
+ }
+ if (fetchedOauth2AccessToken != nil) {
+ headers[@"authorization"] = [kBearerPrefix stringByAppendingString:fetchedOauth2AccessToken];
+ } else if (_callOptions.oauth2AccessToken != nil) {
+ headers[@"authorization"] =
+ [kBearerPrefix stringByAppendingString:_callOptions.oauth2AccessToken];
+ }
+
// TODO(jcanizales): Add error handlers for async failures
GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc]
initWithMetadata:headers
- flags:[GRPCCall callFlagsForHost:_host path:_path]
+ flags:callSafetyFlags
handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA
if (!_unaryCall) {
[_wrappedCall startBatchWithOperations:@[ op ]];
@@ -458,13 +819,27 @@ const char *kCFStreamVarName = "grpc_cfstream";
_responseWriteable =
[[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
- _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host
- serverName:_serverName
- path:_path
- timeout:_timeout];
- NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");
+ GRPCPooledChannel *channel =
+ [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
+ GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:_path
+ completionQueue:[GRPCCompletionQueue completionQueue]
+ callOptions:_callOptions];
+
+ if (wrappedCall == nil) {
+ [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Failed to create call or channel."
+ }]];
+ return;
+ }
+
+ @synchronized(self) {
+ _wrappedCall = wrappedCall;
+ }
- [self sendHeaders:_requestHeaders];
+ [self sendHeaders];
[self invokeCall];
// Connectivity monitor is not required for CFStream
@@ -486,18 +861,45 @@ const char *kCFStreamVarName = "grpc_cfstream";
// that the life of the instance is determined by this retain cycle.
_retainSelf = self;
- if (self.tokenProvider != nil) {
- self.isWaitingForToken = YES;
- __weak typeof(self) weakSelf = self;
- [self.tokenProvider getTokenWithHandler:^(NSString *token) {
- typeof(self) strongSelf = weakSelf;
- if (strongSelf && strongSelf.isWaitingForToken) {
- if (token) {
- NSString *t = [kBearerPrefix stringByAppendingString:token];
- strongSelf.requestHeaders[kAuthorizationHeader] = t;
+ if (_callOptions == nil) {
+ GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
+ if (_serverName.length != 0) {
+ callOptions.serverAuthority = _serverName;
+ }
+ if (_timeout > 0) {
+ callOptions.timeout = _timeout;
+ }
+ uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
+ if (callFlags != 0) {
+ if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
+ _callSafety = GRPCCallSafetyIdempotentRequest;
+ } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
+ _callSafety = GRPCCallSafetyCacheableRequest;
+ }
+ }
+
+ id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
+ if (tokenProvider != nil) {
+ callOptions.authTokenProvider = tokenProvider;
+ }
+ _callOptions = callOptions;
+ }
+
+ NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
+ @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
+ if (_callOptions.authTokenProvider != nil) {
+ @synchronized(self) {
+ self.isWaitingForToken = YES;
+ }
+ [_callOptions.authTokenProvider getTokenWithHandler:^(NSString *token) {
+ @synchronized(self) {
+ if (self.isWaitingForToken) {
+ if (token) {
+ self->_fetchedOauth2AccessToken = [token copy];
+ }
+ [self startCallWithWriteable:writeable];
+ self.isWaitingForToken = NO;
}
- [strongSelf startCallWithWriteable:writeable];
- strongSelf.isWaitingForToken = NO;
}
}];
} else {