aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Muxi Yan <muxi@users.noreply.github.com>2019-01-10 09:43:54 -0800
committerGravatar GitHub <noreply@github.com>2019-01-10 09:43:54 -0800
commit412c44992b187e9d749bb040b78fd74d32513d61 (patch)
tree4c2eaf030d389e00eaa31b256b1c6479dc62d54f
parent12bca9fa665d49df7b3c11e00126e94f03278ae6 (diff)
parent121e04bc1e44bb684d464be6788f1c0a065b1116 (diff)
Merge pull request #17578 from muxi/grpccall-safety
Make gRPC ObjC thread safety right
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m371
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.h3
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m21
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.h21
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.m102
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.h6
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.m55
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateSingleWriter.h2
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateSingleWriter.m30
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.h4
10 files changed, 318 insertions, 297 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 83c6edc6e3..e8fae09a1f 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -56,7 +56,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Make them read-write.
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;
-@property(atomic) BOOL isWaitingForToken;
- (instancetype)initWithHost:(NSString *)host
path:(NSString *)path
@@ -425,9 +424,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
// queue
dispatch_queue_t _responseQueue;
- // Whether the call is finished. If it is, should not call finishWithError again.
- BOOL _finished;
-
// The OAuth2 token fetched from a token provider.
NSString *_fetchedOauth2AccessToken;
}
@@ -448,24 +444,28 @@ const char *kCFStreamVarName = "grpc_cfstream";
return;
}
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
- switch (callSafety) {
- case GRPCCallSafetyDefault:
- callFlags[hostAndPath] = @0;
- break;
- case GRPCCallSafetyIdempotentRequest:
- callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
- break;
- case GRPCCallSafetyCacheableRequest:
- callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
- break;
- default:
- break;
+ @synchronized(callFlags) {
+ switch (callSafety) {
+ case GRPCCallSafetyDefault:
+ callFlags[hostAndPath] = @0;
+ break;
+ case GRPCCallSafetyIdempotentRequest:
+ callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
+ break;
+ case GRPCCallSafetyCacheableRequest:
+ callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ break;
+ default:
+ break;
+ }
}
}
+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
- return [callFlags[hostAndPath] intValue];
+ @synchronized(callFlags) {
+ return [callFlags[hostAndPath] intValue];
+ }
}
// Designated initializer
@@ -506,7 +506,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
_callOptions = [callOptions copy];
// Serial queue to invoke the non-reentrant methods of the grpc_call object.
- _callQueue = dispatch_queue_create("io.grpc.call", NULL);
+ _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);
_requestWriter = requestWriter;
@@ -523,66 +523,48 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
- if (_state != GRXWriterStateNotStarted) {
- return;
+ @synchronized(self) {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ _responseQueue = queue;
}
- _responseQueue = queue;
}
#pragma mark Finish
+// This function should support being called within a @synchronized(self) block in another function
+// Should not manipulate _requestWriter for deadlock prevention.
- (void)finishWithError:(NSError *)errorOrNil {
@synchronized(self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
_state = GRXWriterStateFinished;
- }
-
- // If there were still request messages coming, stop them.
- @synchronized(_requestWriter) {
- _requestWriter.state = GRXWriterStateFinished;
- }
-
- if (errorOrNil) {
- [_responseWriteable cancelWithError:errorOrNil];
- } else {
- [_responseWriteable enqueueSuccessfulCompletion];
- }
-
- [GRPCConnectivityMonitor unregisterObserver:self];
- // If the call isn't retained anywhere else, it can be deallocated now.
- _retainSelf = nil;
-}
+ if (errorOrNil) {
+ [_responseWriteable cancelWithError:errorOrNil];
+ } else {
+ [_responseWriteable enqueueSuccessfulCompletion];
+ }
-- (void)cancelCall {
- // Can be called from any thread, any number of times.
- @synchronized(self) {
- [_wrappedCall cancel];
+ // If the call isn't retained anywhere else, it can be deallocated now.
+ _retainSelf = nil;
}
}
- (void)cancel {
@synchronized(self) {
- [self cancelCall];
- self.isWaitingForToken = NO;
- }
- [self
- maybeFinishWithError:[NSError
- errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeCancelled
- userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
-}
-
-- (void)maybeFinishWithError:(NSError *)errorOrNil {
- BOOL toFinish = NO;
- @synchronized(self) {
- if (_finished == NO) {
- _finished = YES;
- toFinish = YES;
+ if (_state == GRXWriterStateFinished) {
+ return;
}
+ [self finishWithError:[NSError
+ errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeCancelled
+ userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
+ [_wrappedCall cancel];
}
- if (toFinish == YES) {
- [self finishWithError:errorOrNil];
- }
+ _requestWriter.state = GRXWriterStateFinished;
}
- (void)dealloc {
@@ -609,21 +591,24 @@ const char *kCFStreamVarName = "grpc_cfstream";
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
@synchronized(self) {
- if (self.state == GRXWriterStatePaused) {
+ if (_state != GRXWriterStateStarted) {
return;
}
}
dispatch_async(_callQueue, ^{
__weak GRPCCall *weakSelf = self;
- __weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable;
[self startReadWithHandler:^(grpc_byte_buffer *message) {
- __strong GRPCCall *strongSelf = weakSelf;
- __strong GRXConcurrentWriteable *strongWriteable = weakWriteable;
+ NSLog(@"message received");
if (message == NULL) {
// No more messages from the server
return;
}
+ __strong GRPCCall *strongSelf = weakSelf;
+ if (strongSelf == nil) {
+ grpc_byte_buffer_destroy(message);
+ return;
+ }
NSData *data = [NSData grpc_dataWithByteBuffer:message];
grpc_byte_buffer_destroy(message);
if (!data) {
@@ -631,21 +616,26 @@ const char *kCFStreamVarName = "grpc_cfstream";
// don't want to throw, because the app shouldn't crash for a behavior
// that's on the hands of any server to have. Instead we finish and ask
// the server to cancel.
- [strongSelf cancelCall];
- [strongSelf
- maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeResourceExhausted
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Client does not have enough memory to "
- @"hold the server response."
- }]];
- return;
+ @synchronized(strongSelf) {
+ [strongSelf
+ finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeResourceExhausted
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Client does not have enough memory to "
+ @"hold the server response."
+ }]];
+ [strongSelf->_wrappedCall cancel];
+ }
+ strongSelf->_requestWriter.state = GRXWriterStateFinished;
+ } else {
+ @synchronized(strongSelf) {
+ [strongSelf->_responseWriteable enqueueValue:data
+ completionHandler:^{
+ [strongSelf startNextRead];
+ }];
+ }
}
- [strongWriteable enqueueValue:data
- completionHandler:^{
- [strongSelf startNextRead];
- }];
}];
});
}
@@ -684,11 +674,13 @@ const char *kCFStreamVarName = "grpc_cfstream";
initWithMetadata:headers
flags:callSafetyFlags
handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA
- if (!_unaryCall) {
- [_wrappedCall startBatchWithOperations:@[ op ]];
- } else {
- [_unaryOpBatch addObject:op];
- }
+ dispatch_async(_callQueue, ^{
+ if (!self->_unaryCall) {
+ [self->_wrappedCall startBatchWithOperations:@[ op ]];
+ } else {
+ [self->_unaryOpBatch addObject:op];
+ }
+ });
}
#pragma mark GRXWriteable implementation
@@ -703,9 +695,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
- @synchronized(strongSelf->_requestWriter) {
- strongSelf->_requestWriter.state = GRXWriterStateStarted;
- }
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
};
@@ -721,13 +711,17 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)writeValue:(id)value {
- // TODO(jcanizales): Throw/assert if value isn't NSData.
+ NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");
+
+ @synchronized(self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
+ }
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
- @synchronized(_requestWriter) {
- _requestWriter.state = GRXWriterStatePaused;
- }
+ _requestWriter.state = GRXWriterStatePaused;
dispatch_async(_callQueue, ^{
// Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
@@ -766,17 +760,20 @@ const char *kCFStreamVarName = "grpc_cfstream";
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
- // TODO(jcanizales): Add error handlers for async failures
- [_wrappedCall
- startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
- [_wrappedCall
- startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
+ dispatch_async(_callQueue, ^{
+ // TODO(jcanizales): Add error handlers for async failures
+ [self->_wrappedCall
+ startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
+ [self->_wrappedCall
+ startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
+ });
}
- (void)invokeCall {
__weak GRPCCall *weakSelf = self;
[self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
// Response headers received.
+ NSLog(@"response received");
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseHeaders = headers;
@@ -784,6 +781,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
}
completionHandler:^(NSError *error, NSDictionary *trailers) {
+ NSLog(@"completion received");
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseTrailers = trailers;
@@ -794,112 +792,114 @@ const char *kCFStreamVarName = "grpc_cfstream";
[userInfo addEntriesFromDictionary:error.userInfo];
}
userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
- // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
- // called before this one, so an error might end up with trailers but no headers. We
- // shouldn't call finishWithError until ater both blocks are called. It is also when
- // this is done that we can provide a merged view of response headers and trailers in a
- // thread-safe way.
- if (strongSelf.responseHeaders) {
- userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
- }
+ // Since gRPC core does not guarantee the headers block being called before this block,
+ // responseHeaders might be nil.
+ userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
}
- [strongSelf maybeFinishWithError:error];
+ [strongSelf finishWithError:error];
+ strongSelf->_requestWriter.state = GRXWriterStateFinished;
}
}];
- // Now that the RPC has been initiated, request writes can start.
- @synchronized(_requestWriter) {
- [_requestWriter startWithWriteable:self];
- }
}
#pragma mark GRXWriter implementation
+// Lock acquired inside startWithWriteable:
- (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
- _responseWriteable =
- [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
-
- GRPCPooledChannel *channel =
- [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
- GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:_path
- completionQueue:[GRPCCompletionQueue completionQueue]
- callOptions:_callOptions];
-
- if (wrappedCall == nil) {
- [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeUnavailable
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Failed to create call or channel."
- }]];
- return;
- }
-
@synchronized(self) {
- _wrappedCall = wrappedCall;
- }
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
+
+ _responseWriteable =
+ [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
+
+ GRPCPooledChannel *channel =
+ [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
+ _wrappedCall = [channel wrappedCallWithPath:_path
+ completionQueue:[GRPCCompletionQueue completionQueue]
+ callOptions:_callOptions];
+
+ if (_wrappedCall == nil) {
+ [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Failed to create call or channel."
+ }]];
+ return;
+ }
- [self sendHeaders];
- [self invokeCall];
+ [self sendHeaders];
+ [self invokeCall];
- // Connectivity monitor is not required for CFStream
- char *enableCFStream = getenv(kCFStreamVarName);
- if (enableCFStream == nil || enableCFStream[0] != '1') {
- [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
+ // Connectivity monitor is not required for CFStream
+ char *enableCFStream = getenv(kCFStreamVarName);
+ if (enableCFStream == nil || enableCFStream[0] != '1') {
+ [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
+ }
}
+
+ // Now that the RPC has been initiated, request writes can start.
+ [_requestWriter startWithWriteable:self];
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ id<GRPCAuthorizationProtocol> tokenProvider = nil;
@synchronized(self) {
_state = GRXWriterStateStarted;
- }
- // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
- // This makes RPCs in which the call isn't externally retained possible (as long as it is started
- // before being autoreleased).
- // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
- // that the life of the instance is determined by this retain cycle.
- _retainSelf = self;
+ // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
+ // This makes RPCs in which the call isn't externally retained possible (as long as it is
+ // started before being autoreleased). Care is taken not to retain self strongly in any of the
+ // blocks used in this implementation, so that the life of the instance is determined by this
+ // retain cycle.
+ _retainSelf = self;
+
+ if (_callOptions == nil) {
+ GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
+ if (_serverName.length != 0) {
+ callOptions.serverAuthority = _serverName;
+ }
+ if (_timeout > 0) {
+ callOptions.timeout = _timeout;
+ }
+ uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
+ if (callFlags != 0) {
+ if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
+ _callSafety = GRPCCallSafetyIdempotentRequest;
+ } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
+ _callSafety = GRPCCallSafetyCacheableRequest;
+ }
+ }
- if (_callOptions == nil) {
- GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
- if (_serverName.length != 0) {
- callOptions.serverAuthority = _serverName;
- }
- if (_timeout > 0) {
- callOptions.timeout = _timeout;
- }
- uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
- if (callFlags != 0) {
- if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
- _callSafety = GRPCCallSafetyIdempotentRequest;
- } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
- _callSafety = GRPCCallSafetyCacheableRequest;
+ id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
+ if (tokenProvider != nil) {
+ callOptions.authTokenProvider = tokenProvider;
}
+ _callOptions = callOptions;
}
- id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
- if (tokenProvider != nil) {
- callOptions.authTokenProvider = tokenProvider;
- }
- _callOptions = callOptions;
+ NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
+ @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
+
+ tokenProvider = _callOptions.authTokenProvider;
}
- NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
- @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
- if (_callOptions.authTokenProvider != nil) {
- @synchronized(self) {
- self.isWaitingForToken = YES;
- }
- [_callOptions.authTokenProvider getTokenWithHandler:^(NSString *token) {
- @synchronized(self) {
- if (self.isWaitingForToken) {
- if (token) {
- self->_fetchedOauth2AccessToken = [token copy];
+ if (tokenProvider != nil) {
+ __weak typeof(self) weakSelf = self;
+ [tokenProvider getTokenWithHandler:^(NSString *token) {
+ __strong typeof(self) strongSelf = weakSelf;
+ if (strongSelf) {
+ @synchronized(strongSelf) {
+ if (strongSelf->_state == GRXWriterStateNotStarted) {
+ if (token) {
+ strongSelf->_fetchedOauth2AccessToken = [token copy];
+ }
}
- [self startCallWithWriteable:writeable];
- self.isWaitingForToken = NO;
}
+ [strongSelf startCallWithWriteable:writeable];
}
}];
} else {
@@ -938,16 +938,21 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)connectivityChanged:(NSNotification *)note {
- // Cancel underlying call upon this notification
+ // Cancel underlying call upon this notification.
+
+ // Retain because connectivity manager only keeps weak reference to GRPCCall.
__strong GRPCCall *strongSelf = self;
if (strongSelf) {
- [self cancelCall];
- [self
- maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeUnavailable
- userInfo:@{
- NSLocalizedDescriptionKey : @"Connectivity lost."
- }]];
+ @synchronized(strongSelf) {
+ [_wrappedCall cancel];
+ [strongSelf
+ finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{
+ NSLocalizedDescriptionKey : @"Connectivity lost."
+ }]];
+ }
+ strongSelf->_requestWriter.state = GRXWriterStateFinished;
}
}
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index a871ea895a..ae08cc315b 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -36,8 +36,7 @@
* crash. If you want to react to flow control signals to prevent that, instead of using this class
* you can implement an object that conforms to GRXWriter.
*
- * Thread-safety:
- * The methods of an object of this class should not be called concurrently from different threads.
+ * Thread-safety: the methods of this class are thread-safe.
*/
@interface GRXBufferedPipe : GRXWriter<GRXWriteable>
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index 546d46cba3..74e2f03da6 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -51,16 +51,22 @@
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
value = [value copy];
}
- __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
- [weakSelf.writeable writeValue:value];
+ @synchronized(self) {
+ if (self->_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self.writeable writeValue:value];
+ }
});
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^{
- [weakSelf finishWithError:errorOrNil];
+ if (self->_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self finishWithError:errorOrNil];
});
}
@@ -100,14 +106,15 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- self.writeable = writeable;
- _state = GRXWriterStateStarted;
+ @synchronized(self) {
+ self.writeable = writeable;
+ _state = GRXWriterStateStarted;
+ }
dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
[self.writeable writesFinishedWithError:errorOrNil];
- self.state = GRXWriterStateFinished;
}
- (void)dealloc {
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
index abb831e6fb..5beca9d41e 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
@@ -23,10 +23,10 @@
/**
* This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a
- * GRXWriteable instance for the main thread, guaranteeing that writesFinishedWithError: is the last
- * message sent to it (no matter what messages are sent to the wrapper, in what order, nor from
- * which thread). It also guarantees that, if cancelWithError: is called from the main thread (e.g.
- * by the app cancelling the writes), no further messages are sent to the writeable except
+ * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is
+ * the last message sent to it (no matter what messages are sent to the wrapper, in what order, nor
+ * from which thread). It also guarantees that, if cancelWithError: is called (e.g. by the app
+ * cancelling the writes), no further messages are sent to the writeable except
* writesFinishedWithError:.
*
* TODO(jcanizales): Let the user specify another queue for the writeable callbacks.
@@ -43,21 +43,22 @@
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable;
/**
- * Enqueues writeValue: to be sent to the writeable in the main thread.
- * The passed handler is invoked from the main thread after writeValue: returns.
+ * Enqueues writeValue: to be sent to the writeable from the designated dispatch queue.
+ * The passed handler is invoked from designated dispatch queue after writeValue: returns.
*/
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler;
/**
- * Enqueues writesFinishedWithError:nil to be sent to the writeable in the main thread. After that
- * message is sent to the writeable, all other methods of this object are effectively noops.
+ * Enqueues writesFinishedWithError:nil to be sent to the writeable in the designated dispatch
+ * queue. After that message is sent to the writeable, all other methods of this object are
+ * effectively noops.
*/
- (void)enqueueSuccessfulCompletion;
/**
* If the writeable has not yet received a writesFinishedWithError: message, this will enqueue one
- * to be sent to it in the main thread, and cancel all other pending messages to the writeable
- * enqueued by this object (both past and future).
+ * to be sent to it in the designated dispatch queue, and cancel all other pending messages to the
+ * writeable enqueued by this object (both past and future).
* The error argument cannot be nil.
*/
- (void)cancelWithError:(NSError *)error;
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
index 81ccc3fbce..115195463d 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
@@ -27,8 +27,15 @@
@implementation GRXConcurrentWriteable {
dispatch_queue_t _writeableQueue;
- // This ensures that writesFinishedWithError: is only sent once to the writeable.
+
+ // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected
+ // by _writeableQueue.
BOOL _alreadyFinished;
+
+ // This ivar ensures that a cancelWithError: call prevents further values to be sent to
+ // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be
+ // protected by self lock.
+ BOOL _cancelled;
}
- (instancetype)init {
@@ -41,6 +48,8 @@
if (self = [super init]) {
_writeableQueue = queue;
_writeable = writeable;
+ _alreadyFinished = NO;
+ _cancelled = NO;
}
return self;
}
@@ -51,78 +60,63 @@
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
dispatch_async(_writeableQueue, ^{
- // We're racing a possible cancellation performed by another thread. To turn all already-
- // enqueued messages into noops, cancellation nillifies the writeable property. If we get it
- // before it's nil, we won the race.
- id<GRXWriteable> writeable = self.writeable;
- if (writeable) {
- [writeable writeValue:value];
- handler();
+ if (self->_alreadyFinished) {
+ return;
+ }
+
+ @synchronized(self) {
+ if (self->_cancelled) {
+ return;
+ }
}
+
+ [self.writeable writeValue:value];
+ handler();
});
}
- (void)enqueueSuccessfulCompletion {
- __weak typeof(self) weakSelf = self;
dispatch_async(_writeableQueue, ^{
- typeof(self) strongSelf = weakSelf;
- if (strongSelf) {
- BOOL finished = NO;
- @synchronized(strongSelf) {
- if (!strongSelf->_alreadyFinished) {
- strongSelf->_alreadyFinished = YES;
- } else {
- finished = YES;
- }
- }
- if (!finished) {
- // Cancellation is now impossible. None of the other three blocks can run concurrently with
- // this one.
- [strongSelf.writeable writesFinishedWithError:nil];
- // Skip any possible message to the wrapped writeable enqueued after this one.
- strongSelf.writeable = nil;
+ if (self->_alreadyFinished) {
+ return;
+ }
+ @synchronized(self) {
+ if (self->_cancelled) {
+ return;
}
}
+ [self.writeable writesFinishedWithError:nil];
+
+ // Skip any possible message to the wrapped writeable enqueued after this one.
+ self->_alreadyFinished = YES;
+ self.writeable = nil;
});
}
- (void)cancelWithError:(NSError *)error {
- NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
- BOOL finished = NO;
+ NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion.");
@synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
- }
+ self->_cancelled = YES;
}
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
- id<GRXWriteable> writeable = self.writeable;
- self.writeable = nil;
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ // a cancel or a successful completion is already issued
+ return;
+ }
+ [self.writeable writesFinishedWithError:error];
- dispatch_async(_writeableQueue, ^{
- [writeable writesFinishedWithError:error];
- });
- }
+ // Skip any possible message to the wrapped writeable enqueued after this one.
+ self->_alreadyFinished = YES;
+ self.writeable = nil;
+ });
}
- (void)cancelSilently {
- BOOL finished = NO;
- @synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ return;
}
- }
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
self.writeable = nil;
- }
+ });
}
@end
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h
index 3814ff8831..00366b6416 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.h
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h
@@ -25,11 +25,7 @@
* input writer, and for classes that represent objects with input and
* output sequences of values, like an RPC.
*
- * Thread-safety:
- * All messages sent to this object need to be serialized. When it is started, the writer it wraps
- * is started in the same thread. Manual state changes are propagated to the wrapped writer in the
- * same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be
- * serialized with any message sent to this object.
+ * Thread-safety: the methods of this class are thread safe.
*/
@interface GRXForwardingWriter : GRXWriter
- (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER;
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m
index 3e522ef24e..27ac0acdff 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.m
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m
@@ -54,48 +54,65 @@
[writeable writesFinishedWithError:errorOrNil];
}
-// This is used to stop the input writer. It nillifies our reference to it
-// to release it.
-- (void)finishInput {
- GRXWriter *writer = _writer;
- _writer = nil;
- writer.state = GRXWriterStateFinished;
-}
-
#pragma mark GRXWriteable implementation
- (void)writeValue:(id)value {
- [_writeable writeValue:value];
+ @synchronized(self) {
+ [_writeable writeValue:value];
+ }
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _writer = nil;
- [self finishOutputWithError:errorOrNil];
+ @synchronized(self) {
+ _writer = nil;
+ [self finishOutputWithError:errorOrNil];
+ }
}
#pragma mark GRXWriter implementation
- (GRXWriterState)state {
- return _writer ? _writer.state : GRXWriterStateFinished;
+ GRXWriter *copiedWriter;
+ @synchronized(self) {
+ copiedWriter = _writer;
+ }
+ return copiedWriter ? copiedWriter.state : GRXWriterStateFinished;
}
- (void)setState:(GRXWriterState)state {
+ GRXWriter *copiedWriter = nil;
if (state == GRXWriterStateFinished) {
- _writeable = nil;
- [self finishInput];
+ @synchronized(self) {
+ _writeable = nil;
+ copiedWriter = _writer;
+ _writer = nil;
+ }
+ copiedWriter.state = GRXWriterStateFinished;
} else {
- _writer.state = state;
+ @synchronized(self) {
+ copiedWriter = _writer;
+ }
+ copiedWriter.state = state;
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _writeable = writeable;
- [_writer startWithWriteable:self];
+ GRXWriter *copiedWriter = nil;
+ @synchronized(self) {
+ _writeable = writeable;
+ copiedWriter = _writer;
+ }
+ [copiedWriter startWithWriteable:self];
}
- (void)finishWithError:(NSError *)errorOrNil {
- [self finishOutputWithError:errorOrNil];
- [self finishInput];
+ GRXWriter *copiedWriter = nil;
+ @synchronized(self) {
+ [self finishOutputWithError:errorOrNil];
+ copiedWriter = _writer;
+ _writer = nil;
+ }
+ copiedWriter.state = GRXWriterStateFinished;
}
@end
diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
index 601abdc6b9..2fa38b3dce 100644
--- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
+++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
@@ -23,6 +23,8 @@
/**
* Utility to construct GRXWriter instances from values that are immediately available when
* required.
+ *
+ * Thread safety: the methods of this class are thread safe.
*/
@interface GRXImmediateSingleWriter : GRXImmediateWriter
diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
index 3126ae4bd1..079c11b94f 100644
--- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
+++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
@@ -20,7 +20,6 @@
@implementation GRXImmediateSingleWriter {
id _value;
- id<GRXWriteable> _writeable;
}
@synthesize state = _state;
@@ -38,17 +37,16 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _state = GRXWriterStateStarted;
- _writeable = writeable;
- [writeable writeValue:_value];
- [self finish];
-}
-
-- (void)finish {
- _state = GRXWriterStateFinished;
- _value = nil;
- id<GRXWriteable> writeable = _writeable;
- _writeable = nil;
+ id copiedValue = nil;
+ @synchronized(self) {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ copiedValue = _value;
+ _value = nil;
+ _state = GRXWriterStateFinished;
+ }
+ [writeable writeValue:copiedValue];
[writeable writesFinishedWithError:nil];
}
@@ -65,9 +63,11 @@
// the original \a map function returns a new Writer of another type. So we
// need to override this function here.
- (GRXWriter *)map:(id (^)(id))map {
- // Since _value is available when creating the object, we can simply
- // apply the map and store the output.
- _value = map(_value);
+ @synchronized(self) {
+ // Since _value is available when creating the object, we can simply
+ // apply the map and store the output.
+ _value = map(_value);
+ }
return self;
}
diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h
index 5d99583a92..df4c80c28d 100644
--- a/src/objective-c/RxLibrary/GRXWriter.h
+++ b/src/objective-c/RxLibrary/GRXWriter.h
@@ -80,9 +80,9 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
* This property can be used to query the current state of the writer, which determines how it might
* currently use its writeable. Some state transitions can be triggered by setting this property to
* the corresponding value, and that's useful for advanced use cases like pausing an writer. For
- * more details, see the documentation of the enum further down.
+ * more details, see the documentation of the enum further down. The property is thread safe.
*/
-@property(nonatomic) GRXWriterState state;
+@property GRXWriterState state;
/**
* Transition to the Started state, and start sending messages to the writeable (a reference to it