aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/GRPCClient/GRPCCall.m
diff options
context:
space:
mode:
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m721
1 files changed, 564 insertions, 157 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 084fbdeb49..e8fae09a1f 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -20,11 +20,16 @@
#import "GRPCCall+OAuth2.h"
+#import <RxLibrary/GRXBufferedPipe.h>
#import <RxLibrary/GRXConcurrentWriteable.h>
#import <RxLibrary/GRXImmediateSingleWriter.h>
+#import <RxLibrary/GRXWriter+Immediate.h>
#include <grpc/grpc.h>
#include <grpc/support/time.h>
+#import "GRPCCallOptions.h"
+#import "private/GRPCChannelPool.h"
+#import "private/GRPCCompletionQueue.h"
#import "private/GRPCConnectivityMonitor.h"
#import "private/GRPCHost.h"
#import "private/GRPCRequestHeaders.h"
@@ -51,7 +56,313 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Make them read-write.
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;
-@property(atomic) BOOL isWaitingForToken;
+
+- (instancetype)initWithHost:(NSString *)host
+ path:(NSString *)path
+ callSafety:(GRPCCallSafety)safety
+ requestsWriter:(GRXWriter *)requestsWriter
+ callOptions:(GRPCCallOptions *)callOptions;
+
+@end
+
+@implementation GRPCRequestOptions
+
+- (instancetype)initWithHost:(NSString *)host path:(NSString *)path safety:(GRPCCallSafety)safety {
+ NSAssert(host.length != 0 && path.length != 0, @"host and path cannot be empty");
+ if (host.length == 0 || path.length == 0) {
+ return nil;
+ }
+ if ((self = [super init])) {
+ _host = [host copy];
+ _path = [path copy];
+ _safety = safety;
+ }
+ return self;
+}
+
+- (id)copyWithZone:(NSZone *)zone {
+ GRPCRequestOptions *request =
+ [[GRPCRequestOptions alloc] initWithHost:_host path:_path safety:_safety];
+
+ return request;
+}
+
+@end
+
+@implementation GRPCCall2 {
+ /** Options for the call. */
+ GRPCCallOptions *_callOptions;
+ /** The handler of responses. */
+ id<GRPCResponseHandler> _handler;
+
+ // Thread safety of ivars below are protected by _dispatchQueue.
+
+ /**
+ * Make use of legacy GRPCCall to make calls. Nullified when call is finished.
+ */
+ GRPCCall *_call;
+ /** Flags whether initial metadata has been published to response handler. */
+ BOOL _initialMetadataPublished;
+ /** Streaming call writeable to the underlying call. */
+ GRXBufferedPipe *_pipe;
+ /** Serial dispatch queue for tasks inside the call. */
+ dispatch_queue_t _dispatchQueue;
+ /** Flags whether call has started. */
+ BOOL _started;
+ /** Flags whether call has been canceled. */
+ BOOL _canceled;
+ /** Flags whether call has been finished. */
+ BOOL _finished;
+}
+
+- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
+ responseHandler:(id<GRPCResponseHandler>)responseHandler
+ callOptions:(GRPCCallOptions *)callOptions {
+ NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0,
+ @"Neither host nor path can be nil.");
+ NSAssert(requestOptions.safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
+ NSAssert(responseHandler != nil, @"Response handler required.");
+ if (requestOptions.host.length == 0 || requestOptions.path.length == 0) {
+ return nil;
+ }
+ if (requestOptions.safety > GRPCCallSafetyCacheableRequest) {
+ return nil;
+ }
+ if (responseHandler == nil) {
+ return nil;
+ }
+
+ if ((self = [super init])) {
+ _requestOptions = [requestOptions copy];
+ if (callOptions == nil) {
+ _callOptions = [[GRPCCallOptions alloc] init];
+ } else {
+ _callOptions = [callOptions copy];
+ }
+ _handler = responseHandler;
+ _initialMetadataPublished = NO;
+ _pipe = [GRXBufferedPipe pipe];
+ // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above
+#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
+ if (@available(iOS 8.0, macOS 10.10, *)) {
+ _dispatchQueue = dispatch_queue_create(
+ NULL,
+ dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
+ } else {
+#else
+ {
+#endif
+ _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ }
+ dispatch_set_target_queue(_dispatchQueue, responseHandler.dispatchQueue);
+ _started = NO;
+ _canceled = NO;
+ _finished = NO;
+ }
+
+ return self;
+}
+
+- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
+ responseHandler:(id<GRPCResponseHandler>)responseHandler {
+ return
+ [self initWithRequestOptions:requestOptions responseHandler:responseHandler callOptions:nil];
+}
+
+- (void)start {
+ GRPCCall *copiedCall = nil;
+ @synchronized(self) {
+ NSAssert(!_started, @"Call already started.");
+ NSAssert(!_canceled, @"Call already canceled.");
+ if (_started) {
+ return;
+ }
+ if (_canceled) {
+ return;
+ }
+
+ _started = YES;
+ if (!_callOptions) {
+ _callOptions = [[GRPCCallOptions alloc] init];
+ }
+
+ _call = [[GRPCCall alloc] initWithHost:_requestOptions.host
+ path:_requestOptions.path
+ callSafety:_requestOptions.safety
+ requestsWriter:_pipe
+ callOptions:_callOptions];
+ if (_callOptions.initialMetadata) {
+ [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
+ }
+ copiedCall = _call;
+ }
+
+ void (^valueHandler)(id value) = ^(id value) {
+ @synchronized(self) {
+ if (self->_handler) {
+ if (!self->_initialMetadataPublished) {
+ self->_initialMetadataPublished = YES;
+ [self issueInitialMetadata:self->_call.responseHeaders];
+ }
+ if (value) {
+ [self issueMessage:value];
+ }
+ }
+ }
+ };
+ void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
+ @synchronized(self) {
+ if (self->_handler) {
+ if (!self->_initialMetadataPublished) {
+ self->_initialMetadataPublished = YES;
+ [self issueInitialMetadata:self->_call.responseHeaders];
+ }
+ [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
+ }
+ // Clearing _call must happen *after* dispatching close in order to get trailing
+ // metadata from _call.
+ if (self->_call) {
+ // Clean up the request writers. This should have no effect to _call since its
+ // response writeable is already nullified.
+ [self->_pipe writesFinishedWithError:nil];
+ self->_call = nil;
+ self->_pipe = nil;
+ }
+ }
+ };
+ id<GRXWriteable> responseWriteable =
+ [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler];
+ [copiedCall startWithWriteable:responseWriteable];
+}
+
+- (void)cancel {
+ GRPCCall *copiedCall = nil;
+ @synchronized(self) {
+ if (_canceled) {
+ return;
+ }
+
+ _canceled = YES;
+
+ copiedCall = _call;
+ _call = nil;
+ _pipe = nil;
+
+ if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ // Copy to local so that block is freed after cancellation completes.
+ id<GRPCResponseHandler> copiedHandler = nil;
+ @synchronized(self) {
+ copiedHandler = self->_handler;
+ self->_handler = nil;
+ }
+
+ [copiedHandler didCloseWithTrailingMetadata:nil
+ error:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeCancelled
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Canceled by app"
+ }]];
+ });
+ } else {
+ _handler = nil;
+ }
+ }
+ [copiedCall cancel];
+}
+
+- (void)writeData:(NSData *)data {
+ GRXBufferedPipe *copiedPipe = nil;
+ @synchronized(self) {
+ NSAssert(!_canceled, @"Call already canceled.");
+ NSAssert(!_finished, @"Call is half-closed before sending data.");
+ if (_canceled) {
+ return;
+ }
+ if (_finished) {
+ return;
+ }
+
+ if (_pipe) {
+ copiedPipe = _pipe;
+ }
+ }
+ [copiedPipe writeValue:data];
+}
+
+- (void)finish {
+ GRXBufferedPipe *copiedPipe = nil;
+ @synchronized(self) {
+ NSAssert(_started, @"Call not started.");
+ NSAssert(!_canceled, @"Call already canceled.");
+ NSAssert(!_finished, @"Call already half-closed.");
+ if (!_started) {
+ return;
+ }
+ if (_canceled) {
+ return;
+ }
+ if (_finished) {
+ return;
+ }
+
+ if (_pipe) {
+ copiedPipe = _pipe;
+ _pipe = nil;
+ }
+ _finished = YES;
+ }
+ [copiedPipe writesFinishedWithError:nil];
+}
+
+- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
+ @synchronized(self) {
+ if (initialMetadata != nil &&
+ [_handler respondsToSelector:@selector(didReceiveInitialMetadata:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ id<GRPCResponseHandler> copiedHandler = nil;
+ @synchronized(self) {
+ copiedHandler = self->_handler;
+ }
+ [copiedHandler didReceiveInitialMetadata:initialMetadata];
+ });
+ }
+ }
+}
+
+- (void)issueMessage:(id)message {
+ @synchronized(self) {
+ if (message != nil && [_handler respondsToSelector:@selector(didReceiveRawMessage:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ id<GRPCResponseHandler> copiedHandler = nil;
+ @synchronized(self) {
+ copiedHandler = self->_handler;
+ }
+ [copiedHandler didReceiveRawMessage:message];
+ });
+ }
+ }
+}
+
+- (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
+ @synchronized(self) {
+ if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ id<GRPCResponseHandler> copiedHandler = nil;
+ @synchronized(self) {
+ copiedHandler = self->_handler;
+ // Clean up _handler so that no more responses are reported to the handler.
+ self->_handler = nil;
+ }
+ [copiedHandler didCloseWithTrailingMetadata:trailingMetadata error:error];
+ });
+ } else {
+ _handler = nil;
+ }
+ }
+}
+
@end
// The following methods of a C gRPC call object aren't reentrant, and thus
@@ -75,6 +386,8 @@ const char *kCFStreamVarName = "grpc_cfstream";
NSString *_host;
NSString *_path;
+ GRPCCallSafety _callSafety;
+ GRPCCallOptions *_callOptions;
GRPCWrappedCall *_wrappedCall;
GRPCConnectivityMonitor *_connectivityMonitor;
@@ -111,8 +424,8 @@ const char *kCFStreamVarName = "grpc_cfstream";
// queue
dispatch_queue_t _responseQueue;
- // Whether the call is finished. If it is, should not call finishWithError again.
- BOOL _finished;
+ // The OAuth2 token fetched from a token provider.
+ NSString *_fetchedOauth2AccessToken;
}
@synthesize state = _state;
@@ -127,48 +440,73 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
+ if (host.length == 0 || path.length == 0) {
+ return;
+ }
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
- switch (callSafety) {
- case GRPCCallSafetyDefault:
- callFlags[hostAndPath] = @0;
- break;
- case GRPCCallSafetyIdempotentRequest:
- callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
- break;
- case GRPCCallSafetyCacheableRequest:
- callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
- break;
- default:
- break;
+ @synchronized(callFlags) {
+ switch (callSafety) {
+ case GRPCCallSafetyDefault:
+ callFlags[hostAndPath] = @0;
+ break;
+ case GRPCCallSafetyIdempotentRequest:
+ callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
+ break;
+ case GRPCCallSafetyCacheableRequest:
+ callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ break;
+ default:
+ break;
+ }
}
}
+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
- return [callFlags[hostAndPath] intValue];
-}
-
-- (instancetype)init {
- return [self initWithHost:nil path:nil requestsWriter:nil];
+ @synchronized(callFlags) {
+ return [callFlags[hostAndPath] intValue];
+ }
}
// Designated initializer
- (instancetype)initWithHost:(NSString *)host
path:(NSString *)path
requestsWriter:(GRXWriter *)requestWriter {
+ return [self initWithHost:host
+ path:path
+ callSafety:GRPCCallSafetyDefault
+ requestsWriter:requestWriter
+ callOptions:nil];
+}
+
+- (instancetype)initWithHost:(NSString *)host
+ path:(NSString *)path
+ callSafety:(GRPCCallSafety)safety
+ requestsWriter:(GRXWriter *)requestWriter
+ callOptions:(GRPCCallOptions *)callOptions {
+ // Purposely using pointer rather than length (host.length == 0) for backwards compatibility.
+ NSAssert(host != nil && path != nil, @"Neither host nor path can be nil.");
+ NSAssert(safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
+ NSAssert(requestWriter.state == GRXWriterStateNotStarted,
+ @"The requests writer can't be already started.");
if (!host || !path) {
- [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."];
+ return nil;
+ }
+ if (safety > GRPCCallSafetyCacheableRequest) {
+ return nil;
}
if (requestWriter.state != GRXWriterStateNotStarted) {
- [NSException raise:NSInvalidArgumentException
- format:@"The requests writer can't be already started."];
+ return nil;
}
+
if ((self = [super init])) {
_host = [host copy];
_path = [path copy];
+ _callSafety = safety;
+ _callOptions = [callOptions copy];
// Serial queue to invoke the non-reentrant methods of the grpc_call object.
- _callQueue = dispatch_queue_create("io.grpc.call", NULL);
+ _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);
_requestWriter = requestWriter;
@@ -185,69 +523,48 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
- if (_state != GRXWriterStateNotStarted) {
- return;
+ @synchronized(self) {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ _responseQueue = queue;
}
- _responseQueue = queue;
}
#pragma mark Finish
+// This function should support being called within a @synchronized(self) block in another function
+// Should not manipulate _requestWriter for deadlock prevention.
- (void)finishWithError:(NSError *)errorOrNil {
@synchronized(self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
_state = GRXWriterStateFinished;
- }
- // If there were still request messages coming, stop them.
- @synchronized(_requestWriter) {
- _requestWriter.state = GRXWriterStateFinished;
- }
-
- if (errorOrNil) {
- [_responseWriteable cancelWithError:errorOrNil];
- } else {
- [_responseWriteable enqueueSuccessfulCompletion];
- }
+ if (errorOrNil) {
+ [_responseWriteable cancelWithError:errorOrNil];
+ } else {
+ [_responseWriteable enqueueSuccessfulCompletion];
+ }
- // Connectivity monitor is not required for CFStream
- char *enableCFStream = getenv(kCFStreamVarName);
- if (enableCFStream == nil || enableCFStream[0] != '1') {
- [GRPCConnectivityMonitor unregisterObserver:self];
+ // If the call isn't retained anywhere else, it can be deallocated now.
+ _retainSelf = nil;
}
-
- // If the call isn't retained anywhere else, it can be deallocated now.
- _retainSelf = nil;
-}
-
-- (void)cancelCall {
- // Can be called from any thread, any number of times.
- [_wrappedCall cancel];
}
- (void)cancel {
- if (!self.isWaitingForToken) {
- [self cancelCall];
- } else {
- self.isWaitingForToken = NO;
- }
- [self
- maybeFinishWithError:[NSError
- errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeCancelled
- userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
-}
-
-- (void)maybeFinishWithError:(NSError *)errorOrNil {
- BOOL toFinish = NO;
@synchronized(self) {
- if (_finished == NO) {
- _finished = YES;
- toFinish = YES;
+ if (_state == GRXWriterStateFinished) {
+ return;
}
+ [self finishWithError:[NSError
+ errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeCancelled
+ userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
+ [_wrappedCall cancel];
}
- if (toFinish == YES) {
- [self finishWithError:errorOrNil];
- }
+ _requestWriter.state = GRXWriterStateFinished;
}
- (void)dealloc {
@@ -274,21 +591,24 @@ const char *kCFStreamVarName = "grpc_cfstream";
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
@synchronized(self) {
- if (self.state == GRXWriterStatePaused) {
+ if (_state != GRXWriterStateStarted) {
return;
}
}
dispatch_async(_callQueue, ^{
__weak GRPCCall *weakSelf = self;
- __weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable;
[self startReadWithHandler:^(grpc_byte_buffer *message) {
- __strong GRPCCall *strongSelf = weakSelf;
- __strong GRXConcurrentWriteable *strongWriteable = weakWriteable;
+ NSLog(@"message received");
if (message == NULL) {
// No more messages from the server
return;
}
+ __strong GRPCCall *strongSelf = weakSelf;
+ if (strongSelf == nil) {
+ grpc_byte_buffer_destroy(message);
+ return;
+ }
NSData *data = [NSData grpc_dataWithByteBuffer:message];
grpc_byte_buffer_destroy(message);
if (!data) {
@@ -296,38 +616,71 @@ const char *kCFStreamVarName = "grpc_cfstream";
// don't want to throw, because the app shouldn't crash for a behavior
// that's on the hands of any server to have. Instead we finish and ask
// the server to cancel.
- [strongSelf cancelCall];
- [strongSelf
- maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeResourceExhausted
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Client does not have enough memory to "
- @"hold the server response."
- }]];
- return;
+ @synchronized(strongSelf) {
+ [strongSelf
+ finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeResourceExhausted
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Client does not have enough memory to "
+ @"hold the server response."
+ }]];
+ [strongSelf->_wrappedCall cancel];
+ }
+ strongSelf->_requestWriter.state = GRXWriterStateFinished;
+ } else {
+ @synchronized(strongSelf) {
+ [strongSelf->_responseWriteable enqueueValue:data
+ completionHandler:^{
+ [strongSelf startNextRead];
+ }];
+ }
}
- [strongWriteable enqueueValue:data
- completionHandler:^{
- [strongSelf startNextRead];
- }];
}];
});
}
#pragma mark Send headers
-- (void)sendHeaders:(NSDictionary *)headers {
+- (void)sendHeaders {
+ // TODO (mxyan): Remove after deprecated methods are removed
+ uint32_t callSafetyFlags = 0;
+ switch (_callSafety) {
+ case GRPCCallSafetyDefault:
+ callSafetyFlags = 0;
+ break;
+ case GRPCCallSafetyIdempotentRequest:
+ callSafetyFlags = GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
+ break;
+ case GRPCCallSafetyCacheableRequest:
+ callSafetyFlags = GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ break;
+ }
+
+ NSMutableDictionary *headers = [_requestHeaders mutableCopy];
+ NSString *fetchedOauth2AccessToken;
+ @synchronized(self) {
+ fetchedOauth2AccessToken = _fetchedOauth2AccessToken;
+ }
+ if (fetchedOauth2AccessToken != nil) {
+ headers[@"authorization"] = [kBearerPrefix stringByAppendingString:fetchedOauth2AccessToken];
+ } else if (_callOptions.oauth2AccessToken != nil) {
+ headers[@"authorization"] =
+ [kBearerPrefix stringByAppendingString:_callOptions.oauth2AccessToken];
+ }
+
// TODO(jcanizales): Add error handlers for async failures
GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc]
initWithMetadata:headers
- flags:[GRPCCall callFlagsForHost:_host path:_path]
+ flags:callSafetyFlags
handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA
- if (!_unaryCall) {
- [_wrappedCall startBatchWithOperations:@[ op ]];
- } else {
- [_unaryOpBatch addObject:op];
- }
+ dispatch_async(_callQueue, ^{
+ if (!self->_unaryCall) {
+ [self->_wrappedCall startBatchWithOperations:@[ op ]];
+ } else {
+ [self->_unaryOpBatch addObject:op];
+ }
+ });
}
#pragma mark GRXWriteable implementation
@@ -342,9 +695,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
- @synchronized(strongSelf->_requestWriter) {
- strongSelf->_requestWriter.state = GRXWriterStateStarted;
- }
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
};
@@ -360,13 +711,17 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)writeValue:(id)value {
- // TODO(jcanizales): Throw/assert if value isn't NSData.
+ NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");
+
+ @synchronized(self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
+ }
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
- @synchronized(_requestWriter) {
- _requestWriter.state = GRXWriterStatePaused;
- }
+ _requestWriter.state = GRXWriterStatePaused;
dispatch_async(_callQueue, ^{
// Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
@@ -405,17 +760,20 @@ const char *kCFStreamVarName = "grpc_cfstream";
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
- // TODO(jcanizales): Add error handlers for async failures
- [_wrappedCall
- startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
- [_wrappedCall
- startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
+ dispatch_async(_callQueue, ^{
+ // TODO(jcanizales): Add error handlers for async failures
+ [self->_wrappedCall
+ startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
+ [self->_wrappedCall
+ startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
+ });
}
- (void)invokeCall {
__weak GRPCCall *weakSelf = self;
[self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
// Response headers received.
+ NSLog(@"response received");
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseHeaders = headers;
@@ -423,6 +781,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
}
completionHandler:^(NSError *error, NSDictionary *trailers) {
+ NSLog(@"completion received");
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseTrailers = trailers;
@@ -433,71 +792,114 @@ const char *kCFStreamVarName = "grpc_cfstream";
[userInfo addEntriesFromDictionary:error.userInfo];
}
userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
- // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
- // called before this one, so an error might end up with trailers but no headers. We
- // shouldn't call finishWithError until ater both blocks are called. It is also when
- // this is done that we can provide a merged view of response headers and trailers in a
- // thread-safe way.
- if (strongSelf.responseHeaders) {
- userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
- }
+ // Since gRPC core does not guarantee the headers block being called before this block,
+ // responseHeaders might be nil.
+ userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
}
- [strongSelf maybeFinishWithError:error];
+ [strongSelf finishWithError:error];
+ strongSelf->_requestWriter.state = GRXWriterStateFinished;
}
}];
- // Now that the RPC has been initiated, request writes can start.
- @synchronized(_requestWriter) {
- [_requestWriter startWithWriteable:self];
- }
}
#pragma mark GRXWriter implementation
+// Lock acquired inside startWithWriteable:
- (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
- _responseWriteable =
- [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
+ @synchronized(self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
- _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host
- serverName:_serverName
- path:_path
- timeout:_timeout];
- NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");
+ _responseWriteable =
+ [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
+
+ GRPCPooledChannel *channel =
+ [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
+ _wrappedCall = [channel wrappedCallWithPath:_path
+ completionQueue:[GRPCCompletionQueue completionQueue]
+ callOptions:_callOptions];
+
+ if (_wrappedCall == nil) {
+ [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Failed to create call or channel."
+ }]];
+ return;
+ }
- [self sendHeaders:_requestHeaders];
- [self invokeCall];
+ [self sendHeaders];
+ [self invokeCall];
- // Connectivity monitor is not required for CFStream
- char *enableCFStream = getenv(kCFStreamVarName);
- if (enableCFStream == nil || enableCFStream[0] != '1') {
- [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
+ // Connectivity monitor is not required for CFStream
+ char *enableCFStream = getenv(kCFStreamVarName);
+ if (enableCFStream == nil || enableCFStream[0] != '1') {
+ [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
+ }
}
+
+ // Now that the RPC has been initiated, request writes can start.
+ [_requestWriter startWithWriteable:self];
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ id<GRPCAuthorizationProtocol> tokenProvider = nil;
@synchronized(self) {
_state = GRXWriterStateStarted;
- }
- // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
- // This makes RPCs in which the call isn't externally retained possible (as long as it is started
- // before being autoreleased).
- // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
- // that the life of the instance is determined by this retain cycle.
- _retainSelf = self;
+ // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
+ // This makes RPCs in which the call isn't externally retained possible (as long as it is
+ // started before being autoreleased). Care is taken not to retain self strongly in any of the
+ // blocks used in this implementation, so that the life of the instance is determined by this
+ // retain cycle.
+ _retainSelf = self;
+
+ if (_callOptions == nil) {
+ GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
+ if (_serverName.length != 0) {
+ callOptions.serverAuthority = _serverName;
+ }
+ if (_timeout > 0) {
+ callOptions.timeout = _timeout;
+ }
+ uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
+ if (callFlags != 0) {
+ if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
+ _callSafety = GRPCCallSafetyIdempotentRequest;
+ } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
+ _callSafety = GRPCCallSafetyCacheableRequest;
+ }
+ }
+
+ id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
+ if (tokenProvider != nil) {
+ callOptions.authTokenProvider = tokenProvider;
+ }
+ _callOptions = callOptions;
+ }
+
+ NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
+ @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
- if (self.tokenProvider != nil) {
- self.isWaitingForToken = YES;
+ tokenProvider = _callOptions.authTokenProvider;
+ }
+
+ if (tokenProvider != nil) {
__weak typeof(self) weakSelf = self;
- [self.tokenProvider getTokenWithHandler:^(NSString *token) {
- typeof(self) strongSelf = weakSelf;
- if (strongSelf && strongSelf.isWaitingForToken) {
- if (token) {
- NSString *t = [kBearerPrefix stringByAppendingString:token];
- strongSelf.requestHeaders[kAuthorizationHeader] = t;
+ [tokenProvider getTokenWithHandler:^(NSString *token) {
+ __strong typeof(self) strongSelf = weakSelf;
+ if (strongSelf) {
+ @synchronized(strongSelf) {
+ if (strongSelf->_state == GRXWriterStateNotStarted) {
+ if (token) {
+ strongSelf->_fetchedOauth2AccessToken = [token copy];
+ }
+ }
}
[strongSelf startCallWithWriteable:writeable];
- strongSelf.isWaitingForToken = NO;
}
}];
} else {
@@ -536,16 +938,21 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)connectivityChanged:(NSNotification *)note {
- // Cancel underlying call upon this notification
+ // Cancel underlying call upon this notification.
+
+ // Retain because connectivity manager only keeps weak reference to GRPCCall.
__strong GRPCCall *strongSelf = self;
if (strongSelf) {
- [self cancelCall];
- [self
- maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeUnavailable
- userInfo:@{
- NSLocalizedDescriptionKey : @"Connectivity lost."
- }]];
+ @synchronized(strongSelf) {
+ [_wrappedCall cancel];
+ [strongSelf
+ finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{
+ NSLocalizedDescriptionKey : @"Connectivity lost."
+ }]];
+ }
+ strongSelf->_requestWriter.state = GRXWriterStateFinished;
}
}