aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jorge Canizales <jcanizales@google.com>2015-08-07 23:11:29 -0700
committerGravatar Jorge Canizales <jcanizales@google.com>2015-08-07 23:11:29 -0700
commit238ad7819ffcd650692baff57d15b577503fd448 (patch)
treec0e7cadc82cf9a54ddffda49973cae4d368989a5
parent67ce098ccf6c7d5b64a85523af1c96e04e46312a (diff)
Eliminate race in GRPCCall’s operation of the requests writer
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m35
1 files changed, 26 insertions, 9 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 5f7d74bca8..16abd0fadf 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -74,6 +74,13 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// all. This wrapper over our actual writeable ensures thread-safety and
// correct ordering.
GRXConcurrentWriteable *_responseWriteable;
+
+ // The network thread wants the requestWriter to resume (when the server is ready for more input),
+ // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
+ // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
+ // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
+ // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
+ // pause the writer immediately on writeValue:, so we need our locking to be recursive.
GRXWriter *_requestWriter;
// To create a retain cycle when a call is started, up until it finishes. See
@@ -139,8 +146,10 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
_self = nil;
// If there were still request messages coming, stop them.
- _requestWriter.state = GRXWriterStateFinished;
- _requestWriter = nil;
+ @synchronized(_requestWriter) {
+ _requestWriter.state = GRXWriterStateFinished;
+ _requestWriter = nil;
+ }
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
@@ -240,12 +249,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
- strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ @synchronized(_requestWriter) {
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ }
}
};
- [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc]
- initWithMessage:message
- handler:resumingHandler]] errorHandler:errorHandler];
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
+ handler:resumingHandler]]
+ errorHandler:errorHandler];
}
- (void)writeValue:(id)value {
@@ -253,7 +264,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
- _requestWriter.state = GRXWriterStatePaused;
+ @synchronized(_requestWriter) {
+ _requestWriter.state = GRXWriterStatePaused;
+ }
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
@@ -273,7 +286,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _requestWriter = nil;
+ @synchronized(_requestWriter) {
+ _requestWriter = nil;
+ }
if (errorOrNil) {
[self cancel];
} else {
@@ -327,7 +342,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
}];
// Now that the RPC has been initiated, request writes can start.
- [_requestWriter startWithWriteable:self];
+ @synchronized(_requestWriter) {
+ [_requestWriter startWithWriteable:self];
+ }
}
#pragma mark GRXWriter implementation