diff options
author | Muxi Yan <mxyan@google.com> | 2018-11-15 16:59:31 -0800 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2018-11-15 16:59:31 -0800 |
commit | 512c01bc574ab09129608bc501d80f06503c79a3 (patch) | |
tree | 13a4d410f1142acaae036770dad00dc86101c7f9 /src/objective-c/GRPCClient/GRPCCall.m | |
parent | ffeb0e682393e5b76d732e0b034c71f3837b9e57 (diff) |
Polish threading + something else
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 241 |
1 files changed, 137 insertions, 104 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index f340e2bd8c..ede16b42e8 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -155,7 +155,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Fallback on earlier versions _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); } - dispatch_set_target_queue(responseHandler.dispatchQueue, _dispatchQueue); + dispatch_set_target_queue(_dispatchQueue ,responseHandler.dispatchQueue); _started = NO; _canceled = NO; _finished = NO; @@ -171,161 +171,194 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)start { - dispatch_async(_dispatchQueue, ^{ - NSAssert(!self->_started, @"Call already started."); - NSAssert(!self->_canceled, @"Call already canceled."); - if (self->_started) { + GRPCCall *call = nil; + @synchronized (self) { + NSAssert(!_started, @"Call already started."); + NSAssert(!_canceled, @"Call already canceled."); + if (_started) { return; } - if (self->_canceled) { + if (_canceled) { return; } - self->_started = YES; - if (!self->_callOptions) { - self->_callOptions = [[GRPCCallOptions alloc] init]; + _started = YES; + if (!_callOptions) { + _callOptions = [[GRPCCallOptions alloc] init]; } - self->_call = [[GRPCCall alloc] initWithHost:self->_requestOptions.host - path:self->_requestOptions.path - callSafety:self->_requestOptions.safety - requestsWriter:self->_pipe - callOptions:self->_callOptions]; - if (self->_callOptions.initialMetadata) { - [self->_call.requestHeaders addEntriesFromDictionary:self->_callOptions.initialMetadata]; + _call = [[GRPCCall alloc] initWithHost:_requestOptions.host + path:_requestOptions.path + callSafety:_requestOptions.safety + requestsWriter:_pipe + callOptions:_callOptions]; + if (_callOptions.initialMetadata) { + [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata]; } + call = _call; + } - void (^valueHandler)(id value) = ^(id value) { - dispatch_async(self->_dispatchQueue, ^{ - if (self->_handler) { - if (!self->_initialMetadataPublished) { - self->_initialMetadataPublished = YES; - [self issueInitialMetadata:self->_call.responseHeaders]; - } - if (value) { - [self issueMessage:value]; - } + void (^valueHandler)(id value) = ^(id value) { + @synchronized (self) { + if (self->_handler) { + if (!self->_initialMetadataPublished) { + self->_initialMetadataPublished = YES; + [self issueInitialMetadata:self->_call.responseHeaders]; } - }); - }; - void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) { - dispatch_async(self->_dispatchQueue, ^{ - if (self->_handler) { - if (!self->_initialMetadataPublished) { - self->_initialMetadataPublished = YES; - [self issueInitialMetadata:self->_call.responseHeaders]; - } - [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil]; - - // Clean up _handler so that no more responses are reported to the handler. - self->_handler = nil; + if (value) { + [self issueMessage:value]; } - // Clearing _call must happen *after* dispatching close in order to get trailing - // metadata from _call. - if (self->_call) { - // Clean up the request writers. This should have no effect to _call since its - // response writeable is already nullified. - [self->_pipe writesFinishedWithError:nil]; - self->_call = nil; - self->_pipe = nil; + } + } + }; + void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) { + @synchronized(self) { + if (self->_handler) { + if (!self->_initialMetadataPublished) { + self->_initialMetadataPublished = YES; + [self issueInitialMetadata:self->_call.responseHeaders]; } - }); - }; - id<GRXWriteable> responseWriteable = - [[GRXWriteable alloc] initWithValueHandler:valueHandler - completionHandler:completionHandler]; - [self->_call startWithWriteable:responseWriteable]; - }); + [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil]; + + } + // Clearing _call must happen *after* dispatching close in order to get trailing + // metadata from _call. + if (self->_call) { + // Clean up the request writers. This should have no effect to _call since its + // response writeable is already nullified. + [self->_pipe writesFinishedWithError:nil]; + self->_call = nil; + self->_pipe = nil; + } + } + }; + id<GRXWriteable> responseWriteable = + [[GRXWriteable alloc] initWithValueHandler:valueHandler + completionHandler:completionHandler]; + [call startWithWriteable:responseWriteable]; } - (void)cancel { - dispatch_async(_dispatchQueue, ^{ - NSAssert(!self->_canceled, @"Call already canceled."); - if (self->_canceled) { + GRPCCall *call = nil; + @synchronized (self) { + if (_canceled) { return; } - self->_canceled = YES; - if (self->_call) { - [self->_call cancel]; - self->_call = nil; - self->_pipe = nil; - } - if (self->_handler) { - id<GRPCResponseHandler> handler = self->_handler; - dispatch_async(handler.dispatchQueue, ^{ - if ([handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { - [handler closedWithTrailingMetadata:nil - error:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeCancelled - userInfo:@{ - NSLocalizedDescriptionKey : - @"Canceled by app" - }]]; + _canceled = YES; + + call = _call; + _call = nil; + _pipe = nil; + + if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { + dispatch_async(_dispatchQueue, ^{ + // Copy to local so that block is freed after cancellation completes. + id<GRPCResponseHandler> copiedHandler = nil; + @synchronized (self) { + copiedHandler = self->_handler; + self->_handler = nil; } - }); - // Clean up _handler so that no more responses are reported to the handler. - self->_handler = nil; + [copiedHandler closedWithTrailingMetadata:nil + error:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{ + NSLocalizedDescriptionKey : + @"Canceled by app" + }]]; + }); } - }); + } + [call cancel]; } - (void)writeData:(NSData *)data { - dispatch_async(_dispatchQueue, ^{ - NSAssert(!self->_canceled, @"Call arleady canceled."); - NSAssert(!self->_finished, @"Call is half-closed before sending data."); - if (self->_canceled) { + GRXBufferedPipe *pipe = nil; + @synchronized(self) { + NSAssert(!_canceled, @"Call arleady canceled."); + NSAssert(!_finished, @"Call is half-closed before sending data."); + if (_canceled) { return; } - if (self->_finished) { + if (_finished) { return; } - if (self->_pipe) { - [self->_pipe writeValue:data]; + if (_pipe) { + pipe = _pipe; } - }); + } + [pipe writeValue:data]; } - (void)finish { - dispatch_async(_dispatchQueue, ^{ - NSAssert(self->_started, @"Call not started."); - NSAssert(!self->_canceled, @"Call arleady canceled."); - NSAssert(!self->_finished, @"Call already half-closed."); - if (!self->_started) { + GRXBufferedPipe *pipe = nil; + @synchronized(self) { + NSAssert(_started, @"Call not started."); + NSAssert(!_canceled, @"Call arleady canceled."); + NSAssert(!_finished, @"Call already half-closed."); + if (!_started) { return; } - if (self->_canceled) { + if (_canceled) { return; } - if (self->_finished) { + if (_finished) { return; } - if (self->_pipe) { - [self->_pipe writesFinishedWithError:nil]; + if (_pipe) { + pipe = _pipe; + _pipe = nil; } - self->_pipe = nil; - self->_finished = YES; - }); + _finished = YES; + } + [pipe writesFinishedWithError:nil]; } - (void)issueInitialMetadata:(NSDictionary *)initialMetadata { - if (initialMetadata != nil && [_handler respondsToSelector:@selector(receivedInitialMetadata:)]) { - [_handler receivedInitialMetadata:initialMetadata]; + @synchronized (self) { + if (initialMetadata != nil && [_handler respondsToSelector:@selector(receivedInitialMetadata:)]) { + dispatch_async(_dispatchQueue, ^{ + id<GRPCResponseHandler> handler = nil; + @synchronized (self) { + handler = self->_handler; + } + [handler receivedInitialMetadata:initialMetadata]; + }); + } } } - (void)issueMessage:(id)message { - if (message != nil && [_handler respondsToSelector:@selector(receivedRawMessage:)]) { - [_handler receivedRawMessage:message]; + @synchronized (self) { + if (message != nil && [_handler respondsToSelector:@selector(receivedRawMessage:)]) { + dispatch_async(_dispatchQueue, ^{ + id<GRPCResponseHandler> handler = nil; + @synchronized (self) { + handler = self->_handler; + } + [handler receivedRawMessage:message]; + }); + } } } - (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error { - if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { - [_handler closedWithTrailingMetadata:trailingMetadata error:error]; + @synchronized (self) { + if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) { + dispatch_async(_dispatchQueue, ^{ + id<GRPCResponseHandler> handler = nil; + @synchronized (self) { + handler = self->_handler; + // Clean up _handler so that no more responses are reported to the handler. + self->_handler = nil; + } + [handler closedWithTrailingMetadata:trailingMetadata + error:error]; + }); + } } } |