diff options
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 58 |
1 files changed, 50 insertions, 8 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 44393f6b99..051138ea4d 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -36,6 +36,7 @@ #include <grpc/grpc.h> #include <grpc/support/time.h> #import <RxLibrary/GRXConcurrentWriteable.h> +#import <RxLibrary/GRXImmediateSingleWriter.h> #import "private/GRPCConnectivityMonitor.h" #import "private/GRPCHost.h" @@ -45,6 +46,11 @@ #import "private/NSDictionary+GRPC.h" #import "private/NSError+GRPC.h" +// At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA, +// SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE, +// and RECV_STATUS_ON_CLIENT. +NSInteger kMaxClientBatch = 6; + NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey"; NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; static NSMutableDictionary *callFlags; @@ -100,6 +106,13 @@ static NSMutableDictionary *callFlags; GRPCCall *_retainSelf; GRPCRequestHeaders *_requestHeaders; + + // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type + // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core + // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when + // the SendClose op is added. + BOOL _unaryCall; + NSMutableArray *_unaryOpBatch; } @synthesize state = _state; @@ -157,6 +170,11 @@ static NSMutableDictionary *callFlags; _requestWriter = requestWriter; _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self]; + + if ([requestWriter isKindOfClass:[GRXImmediateSingleWriter class]]) { + _unaryCall = YES; + _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch]; + } } return self; } @@ -165,6 +183,9 @@ static NSMutableDictionary *callFlags; - (void)finishWithError:(NSError *)errorOrNil { @synchronized(self) { + if (_state == GRXWriterStateFinished) { + return; + } _state = GRXWriterStateFinished; } @@ -254,15 +275,22 @@ static NSMutableDictionary *callFlags; - (void)sendHeaders:(NSDictionary *)headers { // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers - flags:[GRPCCall callFlagsForHost:_host path:_path] - handler:nil]]]; + GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers + flags:[GRPCCall callFlagsForHost:_host path:_path] + handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA + if (!_unaryCall) { + [_wrappedCall startBatchWithOperations:@[op]]; + } else { + [_unaryOpBatch addObject:op]; + } } #pragma mark GRXWriteable implementation // Only called from the call queue. The error handler will be called from the // network queue if the write didn't succeed. +// If the call is a unary call, parameter \a errorHandler will be ignored and +// the error handler of GRPCOpSendClose will be executed in case of error. - (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler { __weak GRPCCall *weakSelf = self; @@ -275,9 +303,17 @@ static NSMutableDictionary *callFlags; } } }; - [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message - handler:resumingHandler]] - errorHandler:errorHandler]; + + GRPCOpSendMessage *op = [[GRPCOpSendMessage alloc] initWithMessage:message + handler:resumingHandler]; + if (!_unaryCall) { + [_wrappedCall startBatchWithOperations:@[op] + errorHandler:errorHandler]; + } else { + // Ignored errorHandler since it is the same as the one for GRPCOpSendClose. + // TODO (mxyan): unify the error handlers of all Ops into a single closure. + [_unaryOpBatch addObject:op]; + } } - (void)writeValue:(id)value { @@ -302,8 +338,14 @@ static NSMutableDictionary *callFlags; // Only called from the call queue. The error handler will be called from the // network queue if the requests stream couldn't be closed successfully. - (void)finishRequestWithErrorHandler:(void (^)())errorHandler { - [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]] - errorHandler:errorHandler]; + if (!_unaryCall) { + [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]] + errorHandler:errorHandler]; + } else { + [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]]; + [_wrappedCall startBatchWithOperations:_unaryOpBatch + errorHandler:errorHandler]; + } } - (void)writesFinishedWithError:(NSError *)errorOrNil { |