diff options
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 204 |
1 files changed, 80 insertions, 124 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 85d141aa09..44393f6b99 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -33,9 +33,9 @@ #import "GRPCCall.h" -#import <RxLibrary/GRXConcurrentWriteable.h> #include <grpc/grpc.h> #include <grpc/support/time.h> +#import <RxLibrary/GRXConcurrentWriteable.h> #import "private/GRPCConnectivityMonitor.h" #import "private/GRPCHost.h" @@ -45,11 +45,11 @@ #import "private/NSDictionary+GRPC.h" #import "private/NSError+GRPC.h" -NSString *const kGRPCHeadersKey = @"io.grpc.HeadersKey"; -NSString *const kGRPCTrailersKey = @"io.grpc.TrailersKey"; +NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey"; +NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; static NSMutableDictionary *callFlags; -@interface GRPCCall ()<GRXWriteable> +@interface GRPCCall () <GRXWriteable> // Make them read-write. @property(atomic, strong) NSDictionary *responseHeaders; @property(atomic, strong) NSDictionary *responseTrailers; @@ -85,21 +85,17 @@ static NSMutableDictionary *callFlags; // correct ordering. GRXConcurrentWriteable *_responseWriteable; - // The network thread wants the requestWriter to resume (when the server is - // ready for more input), or to stop (on errors), concurrently with user - // threads that want to start it, pause it or stop it. Because a writer isn't - // thread-safe, we'll synchronize those operations on it. - // We don't use a dispatch queue for that purpose, because the writer can call - // writeValue: or writesFinishedWithError: on this GRPCCall as part of those - // operations. We want to be able to pause the writer immediately on - // writeValue:, so we need our locking to be recursive. + // The network thread wants the requestWriter to resume (when the server is ready for more input), + // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop + // it. Because a writer isn't thread-safe, we'll synchronize those operations on it. + // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or + // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to + // pause the writer immediately on writeValue:, so we need our locking to be recursive. GRXWriter *_requestWriter; // To create a retain cycle when a call is started, up until it finishes. See - // |startWithWriteable:| and |finishWithError:|. This saves users from having - // to retain a - // reference to the call object if all they're interested in is the handler - // being executed when + // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a + // reference to the call object if all they're interested in is the handler being executed when // the response arrives. GRPCCall *_retainSelf; @@ -108,16 +104,13 @@ static NSMutableDictionary *callFlags; @synthesize state = _state; -// TODO(jcanizales): If grpc_init is idempotent, this should be changed from -// load to initialize. +// TODO(jcanizales): If grpc_init is idempotent, this should be changed from load to initialize. + (void)load { grpc_init(); callFlags = [NSMutableDictionary dictionary]; } -+ (void)setCallSafety:(GRPCCallSafety)callSafety - host:(NSString *)host - path:(NSString *)path { ++ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path { NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; switch (callSafety) { case GRPCCallSafetyDefault: @@ -148,8 +141,7 @@ static NSMutableDictionary *callFlags; path:(NSString *)path requestsWriter:(GRXWriter *)requestWriter { if (!host || !path) { - [NSException raise:NSInvalidArgumentException - format:@"Neither host nor path can be nil."]; + [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."]; } if (requestWriter.state != GRXWriterStateNotStarted) { [NSException raise:NSInvalidArgumentException @@ -199,10 +191,7 @@ static NSMutableDictionary *callFlags; - (void)cancel { [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeCancelled - userInfo:@{ - NSLocalizedDescriptionKey : - @"Canceled by app" - }]]; + userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]]; [self cancelCall]; } @@ -217,18 +206,15 @@ static NSMutableDictionary *callFlags; // Only called from the call queue. // The handler will be called from the network queue. -- (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler { +- (void)startReadWithHandler:(void(^)(grpc_byte_buffer *))handler { // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMessage alloc] - initWithHandler:handler] ]]; + [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMessage alloc] initWithHandler:handler]]]; } // Called initially from the network queue once response headers are received, -// then "recursively" from the responseWriteable queue after each response from -// the +// then "recursively" from the responseWriteable queue after each response from the // server has been written. -// If the call is currently paused, this is a noop. Restarting the call will -// invoke this +// If the call is currently paused, this is a noop. Restarting the call will invoke this // method. // TODO(jcanizales): Rename to readResponseIfNotPaused. - (void)startNextRead { @@ -251,23 +237,15 @@ static NSMutableDictionary *callFlags; // don't want to throw, because the app shouldn't crash for a behavior // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. - [weakSelf - finishWithError:[NSError - errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeResourceExhausted - userInfo:@{ - NSLocalizedDescriptionKey : - @"Client does not have enough " - @"memory to hold the server " - @"response." - }]]; + [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeResourceExhausted + userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]]; [weakSelf cancelCall]; return; } - [weakWriteable enqueueValue:data - completionHandler:^{ - [weakSelf startNextRead]; - }]; + [weakWriteable enqueueValue:data completionHandler:^{ + [weakSelf startNextRead]; + }]; }]; }); } @@ -276,22 +254,19 @@ static NSMutableDictionary *callFlags; - (void)sendHeaders:(NSDictionary *)headers { // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall startBatchWithOperations:@[ - [[GRPCOpSendMetadata alloc] - initWithMetadata:headers - flags:[GRPCCall callFlagsForHost:_host path:_path] - handler:nil] - ]]; + [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers + flags:[GRPCCall callFlagsForHost:_host path:_path] + handler:nil]]]; } #pragma mark GRXWriteable implementation // Only called from the call queue. The error handler will be called from the // network queue if the write didn't succeed. -- (void)writeMessage:(NSData *)message - withErrorHandler:(void (^)())errorHandler { +- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler { + __weak GRPCCall *weakSelf = self; - void (^resumingHandler)(void) = ^{ + void(^resumingHandler)(void) = ^{ // Resume the request writer. GRPCCall *strongSelf = weakSelf; if (strongSelf) { @@ -300,9 +275,8 @@ static NSMutableDictionary *callFlags; } } }; - [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendMessage alloc] - initWithMessage:message - handler:resumingHandler] ] + [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message + handler:resumingHandler]] errorHandler:errorHandler]; } @@ -317,20 +291,18 @@ static NSMutableDictionary *callFlags; __weak GRPCCall *weakSelf = self; dispatch_async(_callQueue, ^{ - [weakSelf writeMessage:value - withErrorHandler:^{ - [weakSelf - finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + [weakSelf writeMessage:value withErrorHandler:^{ + [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeInternal userInfo:nil]]; - }]; + }]; }); } // Only called from the call queue. The error handler will be called from the // network queue if the requests stream couldn't be closed successfully. - (void)finishRequestWithErrorHandler:(void (^)())errorHandler { - [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendClose alloc] init] ] + [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]] errorHandler:errorHandler]; } @@ -351,19 +323,17 @@ static NSMutableDictionary *callFlags; #pragma mark Invoke -// Both handlers will eventually be called, from the network queue. Writes can -// start immediately after this. +// Both handlers will eventually be called, from the network queue. Writes can start immediately +// after this. // The first one (headersHandler), when the response headers are received. // The second one (completionHandler), whenever the RPC finishes for any reason. -- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler - completionHandler: - (void (^)(NSError *, NSDictionary *))completionHandler { +- (void)invokeCallWithHeadersHandler:(void(^)(NSDictionary *))headersHandler + completionHandler:(void(^)(NSError *, NSDictionary *))completionHandler { // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] - initWithHandler:headersHandler] ]]; - [_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] - initWithHandler:completionHandler] ]]; + [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc] + initWithHandler:headersHandler]]]; + [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc] + initWithHandler:completionHandler]]]; } - (void)invokeCall { @@ -371,31 +341,27 @@ static NSMutableDictionary *callFlags; // Response headers received. self.responseHeaders = headers; [self startNextRead]; - } - completionHandler:^(NSError *error, NSDictionary *trailers) { - self.responseTrailers = trailers; - - if (error) { - NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; - if (error.userInfo) { - [userInfo addEntriesFromDictionary:error.userInfo]; - } - userInfo[kGRPCTrailersKey] = self.responseTrailers; - // TODO(jcanizales): The C gRPC library doesn't guarantee that the - // headers block will be called before this one, so an error might end - // up with trailers but no headers. We shouldn't call finishWithError - // until ater both blocks are called. It is also when this is done - // that we can provide a merged view of response headers and trailers - // in a thread-safe way. - if (self.responseHeaders) { - userInfo[kGRPCHeadersKey] = self.responseHeaders; - } - error = [NSError errorWithDomain:error.domain - code:error.code - userInfo:userInfo]; - } - [self finishWithError:error]; - }]; + } completionHandler:^(NSError *error, NSDictionary *trailers) { + self.responseTrailers = trailers; + + if (error) { + NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; + if (error.userInfo) { + [userInfo addEntriesFromDictionary:error.userInfo]; + } + userInfo[kGRPCTrailersKey] = self.responseTrailers; + // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be + // called before this one, so an error might end up with trailers but no headers. We + // shouldn't call finishWithError until ater both blocks are called. It is also when this is + // done that we can provide a merged view of response headers and trailers in a thread-safe + // way. + if (self.responseHeaders) { + userInfo[kGRPCHeadersKey] = self.responseHeaders; + } + error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; + } + [self finishWithError:error]; + }]; // Now that the RPC has been initiated, request writes can start. @synchronized(_requestWriter) { [_requestWriter startWithWriteable:self]; @@ -409,16 +375,14 @@ static NSMutableDictionary *callFlags; _state = GRXWriterStateStarted; } - // Create a retain cycle so that this instance lives until the RPC finishes - // (or is cancelled). This makes RPCs in which the call isn't externally - // retained possible (as long as it is started before being autoreleased). - // Care is taken not to retain self strongly in any of the blocks used in this - // implementation, so that the life of the instance is determined by this - // retain cycle. + // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). + // This makes RPCs in which the call isn't externally retained possible (as long as it is started + // before being autoreleased). + // Care is taken not to retain self strongly in any of the blocks used in this implementation, so + // that the life of the instance is determined by this retain cycle. _retainSelf = self; - _responseWriteable = - [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; + _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host path:_path]; NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?"); @@ -427,37 +391,29 @@ static NSMutableDictionary *callFlags; [self invokeCall]; // TODO(jcanizales): Extract this logic somewhere common. - NSString *host = - [NSURL URLWithString:[@"https://" stringByAppendingString:_host]].host; + NSString *host = [NSURL URLWithString:[@"https://" stringByAppendingString:_host]].host; if (!host) { // TODO(jcanizales): Check this on init. - [NSException raise:NSInvalidArgumentException - format:@"host of %@ is nil", _host]; + [NSException raise:NSInvalidArgumentException format:@"host of %@ is nil", _host]; } __weak typeof(self) weakSelf = self; _connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:host]; void (^handler)() = ^{ typeof(self) strongSelf = weakSelf; if (strongSelf) { - [strongSelf - finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ - NSLocalizedDescriptionKey : - @"Connectivity lost." - }]]; + [strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]]; } }; [_connectivityMonitor handleLossWithHandler:handler - wifiStatusChangeHandler:^{ - }]; + wifiStatusChangeHandler:nil]; } - (void)setState:(GRXWriterState)newState { @synchronized(self) { // Manual transitions are only allowed from the started or paused states. - if (_state == GRXWriterStateNotStarted || - _state == GRXWriterStateFinished) { + if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { return; } |