aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/GRPCClient/GRPCCall.m
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2018-11-15 16:59:31 -0800
committerGravatar Muxi Yan <mxyan@google.com>2018-11-15 16:59:31 -0800
commit512c01bc574ab09129608bc501d80f06503c79a3 (patch)
tree13a4d410f1142acaae036770dad00dc86101c7f9 /src/objective-c/GRPCClient/GRPCCall.m
parentffeb0e682393e5b76d732e0b034c71f3837b9e57 (diff)
Polish threading + something else
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m241
1 files changed, 137 insertions, 104 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index f340e2bd8c..ede16b42e8 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -155,7 +155,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Fallback on earlier versions
_dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
}
- dispatch_set_target_queue(responseHandler.dispatchQueue, _dispatchQueue);
+ dispatch_set_target_queue(_dispatchQueue ,responseHandler.dispatchQueue);
_started = NO;
_canceled = NO;
_finished = NO;
@@ -171,161 +171,194 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)start {
- dispatch_async(_dispatchQueue, ^{
- NSAssert(!self->_started, @"Call already started.");
- NSAssert(!self->_canceled, @"Call already canceled.");
- if (self->_started) {
+ GRPCCall *call = nil;
+ @synchronized (self) {
+ NSAssert(!_started, @"Call already started.");
+ NSAssert(!_canceled, @"Call already canceled.");
+ if (_started) {
return;
}
- if (self->_canceled) {
+ if (_canceled) {
return;
}
- self->_started = YES;
- if (!self->_callOptions) {
- self->_callOptions = [[GRPCCallOptions alloc] init];
+ _started = YES;
+ if (!_callOptions) {
+ _callOptions = [[GRPCCallOptions alloc] init];
}
- self->_call = [[GRPCCall alloc] initWithHost:self->_requestOptions.host
- path:self->_requestOptions.path
- callSafety:self->_requestOptions.safety
- requestsWriter:self->_pipe
- callOptions:self->_callOptions];
- if (self->_callOptions.initialMetadata) {
- [self->_call.requestHeaders addEntriesFromDictionary:self->_callOptions.initialMetadata];
+ _call = [[GRPCCall alloc] initWithHost:_requestOptions.host
+ path:_requestOptions.path
+ callSafety:_requestOptions.safety
+ requestsWriter:_pipe
+ callOptions:_callOptions];
+ if (_callOptions.initialMetadata) {
+ [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
}
+ call = _call;
+ }
- void (^valueHandler)(id value) = ^(id value) {
- dispatch_async(self->_dispatchQueue, ^{
- if (self->_handler) {
- if (!self->_initialMetadataPublished) {
- self->_initialMetadataPublished = YES;
- [self issueInitialMetadata:self->_call.responseHeaders];
- }
- if (value) {
- [self issueMessage:value];
- }
+ void (^valueHandler)(id value) = ^(id value) {
+ @synchronized (self) {
+ if (self->_handler) {
+ if (!self->_initialMetadataPublished) {
+ self->_initialMetadataPublished = YES;
+ [self issueInitialMetadata:self->_call.responseHeaders];
}
- });
- };
- void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
- dispatch_async(self->_dispatchQueue, ^{
- if (self->_handler) {
- if (!self->_initialMetadataPublished) {
- self->_initialMetadataPublished = YES;
- [self issueInitialMetadata:self->_call.responseHeaders];
- }
- [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
-
- // Clean up _handler so that no more responses are reported to the handler.
- self->_handler = nil;
+ if (value) {
+ [self issueMessage:value];
}
- // Clearing _call must happen *after* dispatching close in order to get trailing
- // metadata from _call.
- if (self->_call) {
- // Clean up the request writers. This should have no effect to _call since its
- // response writeable is already nullified.
- [self->_pipe writesFinishedWithError:nil];
- self->_call = nil;
- self->_pipe = nil;
+ }
+ }
+ };
+ void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
+ @synchronized(self) {
+ if (self->_handler) {
+ if (!self->_initialMetadataPublished) {
+ self->_initialMetadataPublished = YES;
+ [self issueInitialMetadata:self->_call.responseHeaders];
}
- });
- };
- id<GRXWriteable> responseWriteable =
- [[GRXWriteable alloc] initWithValueHandler:valueHandler
- completionHandler:completionHandler];
- [self->_call startWithWriteable:responseWriteable];
- });
+ [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
+
+ }
+ // Clearing _call must happen *after* dispatching close in order to get trailing
+ // metadata from _call.
+ if (self->_call) {
+ // Clean up the request writers. This should have no effect to _call since its
+ // response writeable is already nullified.
+ [self->_pipe writesFinishedWithError:nil];
+ self->_call = nil;
+ self->_pipe = nil;
+ }
+ }
+ };
+ id<GRXWriteable> responseWriteable =
+ [[GRXWriteable alloc] initWithValueHandler:valueHandler
+ completionHandler:completionHandler];
+ [call startWithWriteable:responseWriteable];
}
- (void)cancel {
- dispatch_async(_dispatchQueue, ^{
- NSAssert(!self->_canceled, @"Call already canceled.");
- if (self->_canceled) {
+ GRPCCall *call = nil;
+ @synchronized (self) {
+ if (_canceled) {
return;
}
- self->_canceled = YES;
- if (self->_call) {
- [self->_call cancel];
- self->_call = nil;
- self->_pipe = nil;
- }
- if (self->_handler) {
- id<GRPCResponseHandler> handler = self->_handler;
- dispatch_async(handler.dispatchQueue, ^{
- if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
- [handler closedWithTrailingMetadata:nil
- error:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeCancelled
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Canceled by app"
- }]];
+ _canceled = YES;
+
+ call = _call;
+ _call = nil;
+ _pipe = nil;
+
+ if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ // Copy to local so that block is freed after cancellation completes.
+ id<GRPCResponseHandler> copiedHandler = nil;
+ @synchronized (self) {
+ copiedHandler = self->_handler;
+ self->_handler = nil;
}
- });
- // Clean up _handler so that no more responses are reported to the handler.
- self->_handler = nil;
+ [copiedHandler closedWithTrailingMetadata:nil
+ error:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeCancelled
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Canceled by app"
+ }]];
+ });
}
- });
+ }
+ [call cancel];
}
- (void)writeData:(NSData *)data {
- dispatch_async(_dispatchQueue, ^{
- NSAssert(!self->_canceled, @"Call arleady canceled.");
- NSAssert(!self->_finished, @"Call is half-closed before sending data.");
- if (self->_canceled) {
+ GRXBufferedPipe *pipe = nil;
+ @synchronized(self) {
+ NSAssert(!_canceled, @"Call arleady canceled.");
+ NSAssert(!_finished, @"Call is half-closed before sending data.");
+ if (_canceled) {
return;
}
- if (self->_finished) {
+ if (_finished) {
return;
}
- if (self->_pipe) {
- [self->_pipe writeValue:data];
+ if (_pipe) {
+ pipe = _pipe;
}
- });
+ }
+ [pipe writeValue:data];
}
- (void)finish {
- dispatch_async(_dispatchQueue, ^{
- NSAssert(self->_started, @"Call not started.");
- NSAssert(!self->_canceled, @"Call arleady canceled.");
- NSAssert(!self->_finished, @"Call already half-closed.");
- if (!self->_started) {
+ GRXBufferedPipe *pipe = nil;
+ @synchronized(self) {
+ NSAssert(_started, @"Call not started.");
+ NSAssert(!_canceled, @"Call arleady canceled.");
+ NSAssert(!_finished, @"Call already half-closed.");
+ if (!_started) {
return;
}
- if (self->_canceled) {
+ if (_canceled) {
return;
}
- if (self->_finished) {
+ if (_finished) {
return;
}
- if (self->_pipe) {
- [self->_pipe writesFinishedWithError:nil];
+ if (_pipe) {
+ pipe = _pipe;
+ _pipe = nil;
}
- self->_pipe = nil;
- self->_finished = YES;
- });
+ _finished = YES;
+ }
+ [pipe writesFinishedWithError:nil];
}
- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
- if (initialMetadata != nil && [_handler respondsToSelector:@selector(receivedInitialMetadata:)]) {
- [_handler receivedInitialMetadata:initialMetadata];
+ @synchronized (self) {
+ if (initialMetadata != nil && [_handler respondsToSelector:@selector(receivedInitialMetadata:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ id<GRPCResponseHandler> handler = nil;
+ @synchronized (self) {
+ handler = self->_handler;
+ }
+ [handler receivedInitialMetadata:initialMetadata];
+ });
+ }
}
}
- (void)issueMessage:(id)message {
- if (message != nil && [_handler respondsToSelector:@selector(receivedRawMessage:)]) {
- [_handler receivedRawMessage:message];
+ @synchronized (self) {
+ if (message != nil && [_handler respondsToSelector:@selector(receivedRawMessage:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ id<GRPCResponseHandler> handler = nil;
+ @synchronized (self) {
+ handler = self->_handler;
+ }
+ [handler receivedRawMessage:message];
+ });
+ }
}
}
- (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
- if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
- [_handler closedWithTrailingMetadata:trailingMetadata error:error];
+ @synchronized (self) {
+ if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
+ dispatch_async(_dispatchQueue, ^{
+ id<GRPCResponseHandler> handler = nil;
+ @synchronized (self) {
+ handler = self->_handler;
+ // Clean up _handler so that no more responses are reported to the handler.
+ self->_handler = nil;
+ }
+ [handler closedWithTrailingMetadata:trailingMetadata
+ error:error];
+ });
+ }
}
}