aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/GRPCClient/GRPCCall.m
diff options
context:
space:
mode:
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m204
1 files changed, 80 insertions, 124 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 85d141aa09..44393f6b99 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -33,9 +33,9 @@
#import "GRPCCall.h"
-#import <RxLibrary/GRXConcurrentWriteable.h>
#include <grpc/grpc.h>
#include <grpc/support/time.h>
+#import <RxLibrary/GRXConcurrentWriteable.h>
#import "private/GRPCConnectivityMonitor.h"
#import "private/GRPCHost.h"
@@ -45,11 +45,11 @@
#import "private/NSDictionary+GRPC.h"
#import "private/NSError+GRPC.h"
-NSString *const kGRPCHeadersKey = @"io.grpc.HeadersKey";
-NSString *const kGRPCTrailersKey = @"io.grpc.TrailersKey";
+NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey";
+NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
static NSMutableDictionary *callFlags;
-@interface GRPCCall ()<GRXWriteable>
+@interface GRPCCall () <GRXWriteable>
// Make them read-write.
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;
@@ -85,21 +85,17 @@ static NSMutableDictionary *callFlags;
// correct ordering.
GRXConcurrentWriteable *_responseWriteable;
- // The network thread wants the requestWriter to resume (when the server is
- // ready for more input), or to stop (on errors), concurrently with user
- // threads that want to start it, pause it or stop it. Because a writer isn't
- // thread-safe, we'll synchronize those operations on it.
- // We don't use a dispatch queue for that purpose, because the writer can call
- // writeValue: or writesFinishedWithError: on this GRPCCall as part of those
- // operations. We want to be able to pause the writer immediately on
- // writeValue:, so we need our locking to be recursive.
+ // The network thread wants the requestWriter to resume (when the server is ready for more input),
+ // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
+ // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
+ // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
+ // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
+ // pause the writer immediately on writeValue:, so we need our locking to be recursive.
GRXWriter *_requestWriter;
// To create a retain cycle when a call is started, up until it finishes. See
- // |startWithWriteable:| and |finishWithError:|. This saves users from having
- // to retain a
- // reference to the call object if all they're interested in is the handler
- // being executed when
+ // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
+ // reference to the call object if all they're interested in is the handler being executed when
// the response arrives.
GRPCCall *_retainSelf;
@@ -108,16 +104,13 @@ static NSMutableDictionary *callFlags;
@synthesize state = _state;
-// TODO(jcanizales): If grpc_init is idempotent, this should be changed from
-// load to initialize.
+// TODO(jcanizales): If grpc_init is idempotent, this should be changed from load to initialize.
+ (void)load {
grpc_init();
callFlags = [NSMutableDictionary dictionary];
}
-+ (void)setCallSafety:(GRPCCallSafety)callSafety
- host:(NSString *)host
- path:(NSString *)path {
++ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
switch (callSafety) {
case GRPCCallSafetyDefault:
@@ -148,8 +141,7 @@ static NSMutableDictionary *callFlags;
path:(NSString *)path
requestsWriter:(GRXWriter *)requestWriter {
if (!host || !path) {
- [NSException raise:NSInvalidArgumentException
- format:@"Neither host nor path can be nil."];
+ [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."];
}
if (requestWriter.state != GRXWriterStateNotStarted) {
[NSException raise:NSInvalidArgumentException
@@ -199,10 +191,7 @@ static NSMutableDictionary *callFlags;
- (void)cancel {
[self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeCancelled
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Canceled by app"
- }]];
+ userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]];
[self cancelCall];
}
@@ -217,18 +206,15 @@ static NSMutableDictionary *callFlags;
// Only called from the call queue.
// The handler will be called from the network queue.
-- (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler {
+- (void)startReadWithHandler:(void(^)(grpc_byte_buffer *))handler {
// TODO(jcanizales): Add error handlers for async failures
- [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMessage alloc]
- initWithHandler:handler] ]];
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMessage alloc] initWithHandler:handler]]];
}
// Called initially from the network queue once response headers are received,
-// then "recursively" from the responseWriteable queue after each response from
-// the
+// then "recursively" from the responseWriteable queue after each response from the
// server has been written.
-// If the call is currently paused, this is a noop. Restarting the call will
-// invoke this
+// If the call is currently paused, this is a noop. Restarting the call will invoke this
// method.
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
@@ -251,23 +237,15 @@ static NSMutableDictionary *callFlags;
// don't want to throw, because the app shouldn't crash for a behavior
// that's on the hands of any server to have. Instead we finish and ask
// the server to cancel.
- [weakSelf
- finishWithError:[NSError
- errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeResourceExhausted
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Client does not have enough "
- @"memory to hold the server "
- @"response."
- }]];
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeResourceExhausted
+ userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]];
[weakSelf cancelCall];
return;
}
- [weakWriteable enqueueValue:data
- completionHandler:^{
- [weakSelf startNextRead];
- }];
+ [weakWriteable enqueueValue:data completionHandler:^{
+ [weakSelf startNextRead];
+ }];
}];
});
}
@@ -276,22 +254,19 @@ static NSMutableDictionary *callFlags;
- (void)sendHeaders:(NSDictionary *)headers {
// TODO(jcanizales): Add error handlers for async failures
- [_wrappedCall startBatchWithOperations:@[
- [[GRPCOpSendMetadata alloc]
- initWithMetadata:headers
- flags:[GRPCCall callFlagsForHost:_host path:_path]
- handler:nil]
- ]];
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers
+ flags:[GRPCCall callFlagsForHost:_host path:_path]
+ handler:nil]]];
}
#pragma mark GRXWriteable implementation
// Only called from the call queue. The error handler will be called from the
// network queue if the write didn't succeed.
-- (void)writeMessage:(NSData *)message
- withErrorHandler:(void (^)())errorHandler {
+- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
+
__weak GRPCCall *weakSelf = self;
- void (^resumingHandler)(void) = ^{
+ void(^resumingHandler)(void) = ^{
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
@@ -300,9 +275,8 @@ static NSMutableDictionary *callFlags;
}
}
};
- [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendMessage alloc]
- initWithMessage:message
- handler:resumingHandler] ]
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
+ handler:resumingHandler]]
errorHandler:errorHandler];
}
@@ -317,20 +291,18 @@ static NSMutableDictionary *callFlags;
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
- [weakSelf writeMessage:value
- withErrorHandler:^{
- [weakSelf
- finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ [weakSelf writeMessage:value withErrorHandler:^{
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeInternal
userInfo:nil]];
- }];
+ }];
});
}
// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
- [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendClose alloc] init] ]
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
errorHandler:errorHandler];
}
@@ -351,19 +323,17 @@ static NSMutableDictionary *callFlags;
#pragma mark Invoke
-// Both handlers will eventually be called, from the network queue. Writes can
-// start immediately after this.
+// Both handlers will eventually be called, from the network queue. Writes can start immediately
+// after this.
// The first one (headersHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason.
-- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
- completionHandler:
- (void (^)(NSError *, NSDictionary *))completionHandler {
+- (void)invokeCallWithHeadersHandler:(void(^)(NSDictionary *))headersHandler
+ completionHandler:(void(^)(NSError *, NSDictionary *))completionHandler {
// TODO(jcanizales): Add error handlers for async failures
- [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc]
- initWithHandler:headersHandler] ]];
- [_wrappedCall
- startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc]
- initWithHandler:completionHandler] ]];
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc]
+ initWithHandler:headersHandler]]];
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc]
+ initWithHandler:completionHandler]]];
}
- (void)invokeCall {
@@ -371,31 +341,27 @@ static NSMutableDictionary *callFlags;
// Response headers received.
self.responseHeaders = headers;
[self startNextRead];
- }
- completionHandler:^(NSError *error, NSDictionary *trailers) {
- self.responseTrailers = trailers;
-
- if (error) {
- NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
- if (error.userInfo) {
- [userInfo addEntriesFromDictionary:error.userInfo];
- }
- userInfo[kGRPCTrailersKey] = self.responseTrailers;
- // TODO(jcanizales): The C gRPC library doesn't guarantee that the
- // headers block will be called before this one, so an error might end
- // up with trailers but no headers. We shouldn't call finishWithError
- // until ater both blocks are called. It is also when this is done
- // that we can provide a merged view of response headers and trailers
- // in a thread-safe way.
- if (self.responseHeaders) {
- userInfo[kGRPCHeadersKey] = self.responseHeaders;
- }
- error = [NSError errorWithDomain:error.domain
- code:error.code
- userInfo:userInfo];
- }
- [self finishWithError:error];
- }];
+ } completionHandler:^(NSError *error, NSDictionary *trailers) {
+ self.responseTrailers = trailers;
+
+ if (error) {
+ NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
+ if (error.userInfo) {
+ [userInfo addEntriesFromDictionary:error.userInfo];
+ }
+ userInfo[kGRPCTrailersKey] = self.responseTrailers;
+ // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
+ // called before this one, so an error might end up with trailers but no headers. We
+ // shouldn't call finishWithError until ater both blocks are called. It is also when this is
+ // done that we can provide a merged view of response headers and trailers in a thread-safe
+ // way.
+ if (self.responseHeaders) {
+ userInfo[kGRPCHeadersKey] = self.responseHeaders;
+ }
+ error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
+ }
+ [self finishWithError:error];
+ }];
// Now that the RPC has been initiated, request writes can start.
@synchronized(_requestWriter) {
[_requestWriter startWithWriteable:self];
@@ -409,16 +375,14 @@ static NSMutableDictionary *callFlags;
_state = GRXWriterStateStarted;
}
- // Create a retain cycle so that this instance lives until the RPC finishes
- // (or is cancelled). This makes RPCs in which the call isn't externally
- // retained possible (as long as it is started before being autoreleased).
- // Care is taken not to retain self strongly in any of the blocks used in this
- // implementation, so that the life of the instance is determined by this
- // retain cycle.
+ // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
+ // This makes RPCs in which the call isn't externally retained possible (as long as it is started
+ // before being autoreleased).
+ // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
+ // that the life of the instance is determined by this retain cycle.
_retainSelf = self;
- _responseWriteable =
- [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
+ _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
_wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host path:_path];
NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");
@@ -427,37 +391,29 @@ static NSMutableDictionary *callFlags;
[self invokeCall];
// TODO(jcanizales): Extract this logic somewhere common.
- NSString *host =
- [NSURL URLWithString:[@"https://" stringByAppendingString:_host]].host;
+ NSString *host = [NSURL URLWithString:[@"https://" stringByAppendingString:_host]].host;
if (!host) {
// TODO(jcanizales): Check this on init.
- [NSException raise:NSInvalidArgumentException
- format:@"host of %@ is nil", _host];
+ [NSException raise:NSInvalidArgumentException format:@"host of %@ is nil", _host];
}
__weak typeof(self) weakSelf = self;
_connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:host];
void (^handler)() = ^{
typeof(self) strongSelf = weakSelf;
if (strongSelf) {
- [strongSelf
- finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeUnavailable
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Connectivity lost."
- }]];
+ [strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]];
}
};
[_connectivityMonitor handleLossWithHandler:handler
- wifiStatusChangeHandler:^{
- }];
+ wifiStatusChangeHandler:nil];
}
- (void)setState:(GRXWriterState)newState {
@synchronized(self) {
// Manual transitions are only allowed from the started or paused states.
- if (_state == GRXWriterStateNotStarted ||
- _state == GRXWriterStateFinished) {
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
}