aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Muxi Yan <muxi@users.noreply.github.com>2017-04-21 11:06:34 -0700
committerGravatar GitHub <noreply@github.com>2017-04-21 11:06:34 -0700
commit42a95a21e31e3c604e195a9d3ac50af4941436bb (patch)
tree4831505ce23d5d15d2dd4c6edd5c0192f8c34e36 /src
parent7748faf251b255d4e6f2dcd6e40c092136abfebd (diff)
parentfd00263ae91d93e7dea28c6b7f229201e46463b5 (diff)
Merge pull request #10505 from muxi/move-parsing-queue
Move response message processing to a user-specified queue
Diffstat (limited to 'src')
-rw-r--r--src/compiler/objective_c_plugin.cc1
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.h7
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m16
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.h4
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.m10
-rw-r--r--src/objective-c/tests/GRPCClientTests.m55
6 files changed, 89 insertions, 4 deletions
diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc
index 8de0997ebe..5178115e44 100644
--- a/src/compiler/objective_c_plugin.cc
+++ b/src/compiler/objective_c_plugin.cc
@@ -68,6 +68,7 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
::grpc::string imports = ::grpc::string("#import \"") + file_name +
".pbobjc.h\"\n\n"
"#import <ProtoRPC/ProtoService.h>\n"
+ "#import <ProtoRPC/ProtoRPC.h>\n"
"#import <RxLibrary/GRXWriteable.h>\n"
"#import <RxLibrary/GRXWriter.h>\n";
diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h
index 7645bb1d34..5e9324c445 100644
--- a/src/objective-c/GRPCClient/GRPCCall.h
+++ b/src/objective-c/GRPCClient/GRPCCall.h
@@ -253,6 +253,13 @@ extern id const kGRPCTrailersKey;
*/
+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path;
+/**
+ * Set the dispatch queue to be used for callbacks.
+ *
+ * This configuration is only effective before the call starts.
+ */
+- (void)setResponseDispatchQueue:(dispatch_queue_t)queue;
+
// TODO(jcanizales): Let specify a deadline. As a category of GRXWriter?
@end
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 051138ea4d..f9d13fea57 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -113,6 +113,10 @@ static NSMutableDictionary *callFlags;
// the SendClose op is added.
BOOL _unaryCall;
NSMutableArray *_unaryOpBatch;
+
+ // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
+ // queue
+ dispatch_queue_t _responseQueue;
}
@synthesize state = _state;
@@ -175,10 +179,19 @@ static NSMutableDictionary *callFlags;
_unaryCall = YES;
_unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
}
+
+ _responseQueue = dispatch_get_main_queue();
}
return self;
}
+- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ _responseQueue = queue;
+}
+
#pragma mark Finish
- (void)finishWithError:(NSError *)errorOrNil {
@@ -424,7 +437,8 @@ static NSMutableDictionary *callFlags;
// that the life of the instance is determined by this retain cycle.
_retainSelf = self;
- _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
+ _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable
+ dispatchQueue:_responseQueue];
_wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host path:_path];
NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
index b2775f98b5..07004f6d4d 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
@@ -53,7 +53,9 @@
* The GRXWriteable instance is retained until writesFinishedWithError: is sent to it, and released
* after that.
*/
-- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable NS_DESIGNATED_INITIALIZER;
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
+ dispatchQueue:(dispatch_queue_t)queue NS_DESIGNATED_INITIALIZER;
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable;
/**
* Enqueues writeValue: to be sent to the writeable in the main thread.
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
index 08bd079aea..88aa7a7282 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
@@ -51,14 +51,20 @@
}
// Designated initializer
-- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
+ dispatchQueue:(dispatch_queue_t)queue {
if (self = [super init]) {
- _writeableQueue = dispatch_get_main_queue();
+ _writeableQueue = queue;
_writeable = writeable;
}
return self;
}
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
+ return [self initWithWriteable:writeable
+ dispatchQueue:dispatch_get_main_queue()];
+}
+
- (void)enqueueValue:(id)value completionHandler:(void (^)())handler {
dispatch_async(_writeableQueue, ^{
// We're racing a possible cancellation performed by another thread. To turn all already-
diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m
index 76c15003f6..e36f5c3ee9 100644
--- a/src/objective-c/tests/GRPCClientTests.m
+++ b/src/objective-c/tests/GRPCClientTests.m
@@ -353,4 +353,59 @@ static GRPCProtoMethod *kUnaryCallMethod;
[self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
+- (void)testAlternateDispatchQueue {
+ const int32_t kPayloadSize = 100;
+ RMTSimpleRequest *request = [RMTSimpleRequest message];
+ request.responseSize = kPayloadSize;
+
+ __weak XCTestExpectation *expectation1 = [self expectationWithDescription:@"AlternateDispatchQueue1"];
+
+ // Use default (main) dispatch queue
+ NSString *main_queue_label = [NSString stringWithUTF8String:dispatch_queue_get_label(dispatch_get_main_queue())];
+
+ GRXWriter *requestsWriter1 = [GRXWriter writerWithValue:[request data]];
+
+ GRPCCall *call1 = [[GRPCCall alloc] initWithHost:kHostAddress
+ path:kUnaryCallMethod.HTTPPath
+ requestsWriter:requestsWriter1];
+
+ id<GRXWriteable> responsesWriteable1 = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
+ NSString *label = [NSString stringWithUTF8String:dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL)];
+ XCTAssert([label isEqualToString:main_queue_label]);
+
+ [expectation1 fulfill];
+ } completionHandler:^(NSError *errorOrNil) {
+ }];
+
+ [call1 startWithWriteable:responsesWriteable1];
+
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
+
+ // Use a custom queue
+ __weak XCTestExpectation *expectation2 = [self expectationWithDescription:@"AlternateDispatchQueue2"];
+
+ NSString *queue_label = @"test.queue1";
+ dispatch_queue_t queue = dispatch_queue_create([queue_label UTF8String], DISPATCH_QUEUE_SERIAL);
+
+ GRXWriter *requestsWriter2 = [GRXWriter writerWithValue:[request data]];
+
+ GRPCCall *call2 = [[GRPCCall alloc] initWithHost:kHostAddress
+ path:kUnaryCallMethod.HTTPPath
+ requestsWriter:requestsWriter2];
+
+ [call2 setResponseDispatchQueue:queue];
+
+ id<GRXWriteable> responsesWriteable2 = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
+ NSString *label = [NSString stringWithUTF8String:dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL)];
+ XCTAssert([label isEqualToString:queue_label]);
+
+ [expectation2 fulfill];
+ } completionHandler:^(NSError *errorOrNil) {
+ }];
+
+ [call2 startWithWriteable:responsesWriteable2];
+
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
+}
+
@end