aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/GRPCClient/GRPCCall.m
diff options
context:
space:
mode:
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m58
1 files changed, 50 insertions, 8 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 44393f6b99..051138ea4d 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -36,6 +36,7 @@
#include <grpc/grpc.h>
#include <grpc/support/time.h>
#import <RxLibrary/GRXConcurrentWriteable.h>
+#import <RxLibrary/GRXImmediateSingleWriter.h>
#import "private/GRPCConnectivityMonitor.h"
#import "private/GRPCHost.h"
@@ -45,6 +46,11 @@
#import "private/NSDictionary+GRPC.h"
#import "private/NSError+GRPC.h"
+// At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
+// SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
+// and RECV_STATUS_ON_CLIENT.
+NSInteger kMaxClientBatch = 6;
+
NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey";
NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
static NSMutableDictionary *callFlags;
@@ -100,6 +106,13 @@ static NSMutableDictionary *callFlags;
GRPCCall *_retainSelf;
GRPCRequestHeaders *_requestHeaders;
+
+ // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type
+ // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core
+ // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when
+ // the SendClose op is added.
+ BOOL _unaryCall;
+ NSMutableArray *_unaryOpBatch;
}
@synthesize state = _state;
@@ -157,6 +170,11 @@ static NSMutableDictionary *callFlags;
_requestWriter = requestWriter;
_requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
+
+ if ([requestWriter isKindOfClass:[GRXImmediateSingleWriter class]]) {
+ _unaryCall = YES;
+ _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
+ }
}
return self;
}
@@ -165,6 +183,9 @@ static NSMutableDictionary *callFlags;
- (void)finishWithError:(NSError *)errorOrNil {
@synchronized(self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
_state = GRXWriterStateFinished;
}
@@ -254,15 +275,22 @@ static NSMutableDictionary *callFlags;
- (void)sendHeaders:(NSDictionary *)headers {
// TODO(jcanizales): Add error handlers for async failures
- [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers
- flags:[GRPCCall callFlagsForHost:_host path:_path]
- handler:nil]]];
+ GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers
+ flags:[GRPCCall callFlagsForHost:_host path:_path]
+ handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA
+ if (!_unaryCall) {
+ [_wrappedCall startBatchWithOperations:@[op]];
+ } else {
+ [_unaryOpBatch addObject:op];
+ }
}
#pragma mark GRXWriteable implementation
// Only called from the call queue. The error handler will be called from the
// network queue if the write didn't succeed.
+// If the call is a unary call, parameter \a errorHandler will be ignored and
+// the error handler of GRPCOpSendClose will be executed in case of error.
- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
__weak GRPCCall *weakSelf = self;
@@ -275,9 +303,17 @@ static NSMutableDictionary *callFlags;
}
}
};
- [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
- handler:resumingHandler]]
- errorHandler:errorHandler];
+
+ GRPCOpSendMessage *op = [[GRPCOpSendMessage alloc] initWithMessage:message
+ handler:resumingHandler];
+ if (!_unaryCall) {
+ [_wrappedCall startBatchWithOperations:@[op]
+ errorHandler:errorHandler];
+ } else {
+ // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
+ // TODO (mxyan): unify the error handlers of all Ops into a single closure.
+ [_unaryOpBatch addObject:op];
+ }
}
- (void)writeValue:(id)value {
@@ -302,8 +338,14 @@ static NSMutableDictionary *callFlags;
// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
- [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
- errorHandler:errorHandler];
+ if (!_unaryCall) {
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
+ errorHandler:errorHandler];
+ } else {
+ [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]];
+ [_wrappedCall startBatchWithOperations:_unaryOpBatch
+ errorHandler:errorHandler];
+ }
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {