diff options
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 140 |
1 files changed, 74 insertions, 66 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 501114dea0..75dda9df85 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -20,10 +20,10 @@ #import "GRPCCall+OAuth2.h" -#include <grpc/grpc.h> -#include <grpc/support/time.h> #import <RxLibrary/GRXConcurrentWriteable.h> #import <RxLibrary/GRXImmediateSingleWriter.h> +#include <grpc/grpc.h> +#include <grpc/support/time.h> #import "private/GRPCConnectivityMonitor.h" #import "private/GRPCHost.h" @@ -38,14 +38,14 @@ // and RECV_STATUS_ON_CLIENT. NSInteger kMaxClientBatch = 6; -NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey"; -NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; +NSString *const kGRPCHeadersKey = @"io.grpc.HeadersKey"; +NSString *const kGRPCTrailersKey = @"io.grpc.TrailersKey"; static NSMutableDictionary *callFlags; -static NSString * const kAuthorizationHeader = @"authorization"; -static NSString * const kBearerPrefix = @"Bearer "; +static NSString *const kAuthorizationHeader = @"authorization"; +static NSString *const kBearerPrefix = @"Bearer "; -@interface GRPCCall () <GRXWriteable> +@interface GRPCCall ()<GRXWriteable> // Make them read-write. @property(atomic, strong) NSDictionary *responseHeaders; @property(atomic, strong) NSDictionary *responseTrailers; @@ -219,9 +219,11 @@ static NSString * const kBearerPrefix = @"Bearer "; } - (void)cancel { - [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeCancelled - userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]]; + [self + maybeFinishWithError:[NSError + errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; if (!self.isWaitingForToken) { [self cancelCall]; @@ -254,9 +256,9 @@ static NSString * const kBearerPrefix = @"Bearer "; // Only called from the call queue. // The handler will be called from the network queue. -- (void)startReadWithHandler:(void(^)(grpc_byte_buffer *))handler { +- (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler { // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMessage alloc] initWithHandler:handler]]]; + [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMessage alloc] initWithHandler:handler] ]]; } // Called initially from the network queue once response headers are received, @@ -287,15 +289,21 @@ static NSString * const kBearerPrefix = @"Bearer "; // don't want to throw, because the app shouldn't crash for a behavior // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. - [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeResourceExhausted - userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]]; + [strongSelf + maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeResourceExhausted + userInfo:@{ + NSLocalizedDescriptionKey : + @"Client does not have enough memory to " + @"hold the server response." + }]]; [strongSelf cancelCall]; return; } - [strongWriteable enqueueValue:data completionHandler:^{ - [strongSelf startNextRead]; - }]; + [strongWriteable enqueueValue:data + completionHandler:^{ + [strongSelf startNextRead]; + }]; }]; }); } @@ -304,11 +312,12 @@ static NSString * const kBearerPrefix = @"Bearer "; - (void)sendHeaders:(NSDictionary *)headers { // TODO(jcanizales): Add error handlers for async failures - GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers - flags:[GRPCCall callFlagsForHost:_host path:_path] - handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA + GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] + initWithMetadata:headers + flags:[GRPCCall callFlagsForHost:_host path:_path] + handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA if (!_unaryCall) { - [_wrappedCall startBatchWithOperations:@[op]]; + [_wrappedCall startBatchWithOperations:@[ op ]]; } else { [_unaryOpBatch addObject:op]; } @@ -321,9 +330,8 @@ static NSString * const kBearerPrefix = @"Bearer "; // If the call is a unary call, parameter \a errorHandler will be ignored and // the error handler of GRPCOpSendClose will be executed in case of error. - (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler { - __weak GRPCCall *weakSelf = self; - void(^resumingHandler)(void) = ^{ + void (^resumingHandler)(void) = ^{ // Resume the request writer. GRPCCall *strongSelf = weakSelf; if (strongSelf) { @@ -333,11 +341,10 @@ static NSString * const kBearerPrefix = @"Bearer "; } }; - GRPCOpSendMessage *op = [[GRPCOpSendMessage alloc] initWithMessage:message - handler:resumingHandler]; + GRPCOpSendMessage *op = + [[GRPCOpSendMessage alloc] initWithMessage:message handler:resumingHandler]; if (!_unaryCall) { - [_wrappedCall startBatchWithOperations:@[op] - errorHandler:errorHandler]; + [_wrappedCall startBatchWithOperations:@[ op ] errorHandler:errorHandler]; } else { // Ignored errorHandler since it is the same as the one for GRPCOpSendClose. // TODO (mxyan): unify the error handlers of all Ops into a single closure. @@ -364,12 +371,11 @@ static NSString * const kBearerPrefix = @"Bearer "; // network queue if the requests stream couldn't be closed successfully. - (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler { if (!_unaryCall) { - [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]] + [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendClose alloc] init] ] errorHandler:errorHandler]; } else { [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]]; - [_wrappedCall startBatchWithOperations:_unaryOpBatch - errorHandler:errorHandler]; + [_wrappedCall startBatchWithOperations:_unaryOpBatch errorHandler:errorHandler]; } } @@ -390,13 +396,13 @@ static NSString * const kBearerPrefix = @"Bearer "; // after this. // The first one (headersHandler), when the response headers are received. // The second one (completionHandler), whenever the RPC finishes for any reason. -- (void)invokeCallWithHeadersHandler:(void(^)(NSDictionary *))headersHandler - completionHandler:(void(^)(NSError *, NSDictionary *))completionHandler { +- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler + completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler { // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc] - initWithHandler:headersHandler]]]; - [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc] - initWithHandler:completionHandler]]]; + [_wrappedCall + startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; + [_wrappedCall + startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; } - (void)invokeCall { @@ -408,30 +414,31 @@ static NSString * const kBearerPrefix = @"Bearer "; strongSelf.responseHeaders = headers; [strongSelf startNextRead]; } - } completionHandler:^(NSError *error, NSDictionary *trailers) { - __strong GRPCCall *strongSelf = weakSelf; - if (strongSelf) { - strongSelf.responseTrailers = trailers; - - if (error) { - NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; - if (error.userInfo) { - [userInfo addEntriesFromDictionary:error.userInfo]; - } - userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers; - // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be - // called before this one, so an error might end up with trailers but no headers. We - // shouldn't call finishWithError until ater both blocks are called. It is also when this is - // done that we can provide a merged view of response headers and trailers in a thread-safe - // way. - if (strongSelf.responseHeaders) { - userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; + } + completionHandler:^(NSError *error, NSDictionary *trailers) { + __strong GRPCCall *strongSelf = weakSelf; + if (strongSelf) { + strongSelf.responseTrailers = trailers; + + if (error) { + NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; + if (error.userInfo) { + [userInfo addEntriesFromDictionary:error.userInfo]; + } + userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers; + // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be + // called before this one, so an error might end up with trailers but no headers. We + // shouldn't call finishWithError until ater both blocks are called. It is also when + // this is done that we can provide a merged view of response headers and trailers in a + // thread-safe way. + if (strongSelf.responseHeaders) { + userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; + } + error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; + } + [strongSelf maybeFinishWithError:error]; } - error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; - } - [strongSelf maybeFinishWithError:error]; - } - }]; + }]; // Now that the RPC has been initiated, request writes can start. @synchronized(_requestWriter) { [_requestWriter startWithWriteable:self]; @@ -441,8 +448,8 @@ static NSString * const kBearerPrefix = @"Bearer "; #pragma mark GRXWriter implementation - (void)startCallWithWriteable:(id<GRXWriteable>)writeable { - _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable - dispatchQueue:_responseQueue]; + _responseWriteable = + [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host serverName:_serverName @@ -453,8 +460,7 @@ static NSString * const kBearerPrefix = @"Bearer "; [self sendHeaders:_requestHeaders]; [self invokeCall]; - [GRPCConnectivityMonitor registerObserver:self - selector:@selector(connectivityChanged:)]; + [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)]; } - (void)startWithWriteable:(id<GRXWriteable>)writeable { @@ -472,7 +478,7 @@ static NSString * const kBearerPrefix = @"Bearer "; if (self.tokenProvider != nil) { self.isWaitingForToken = YES; __weak typeof(self) weakSelf = self; - [self.tokenProvider getTokenWithHandler:^(NSString *token){ + [self.tokenProvider getTokenWithHandler:^(NSString *token) { typeof(self) strongSelf = weakSelf; if (strongSelf && strongSelf.isWaitingForToken) { if (token) { @@ -521,7 +527,9 @@ static NSString * const kBearerPrefix = @"Bearer "; - (void)connectivityChanged:(NSNotification *)note { [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeUnavailable - userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]]; + userInfo:@{ + NSLocalizedDescriptionKey : @"Connectivity lost." + }]]; // Cancel underlying call upon this notification [self cancelCall]; } |