aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2018-12-21 13:39:21 -0800
committerGravatar Muxi Yan <mxyan@google.com>2019-01-07 10:27:20 -0800
commit5e10a3b037bcd20ab17428ccb765ba9464eb3644 (patch)
tree187a4a277055a8d969cabb77d605477599895731
parent4ff3543465e0a8d26a5bdf934f1dc09d0703970b (diff)
Make gRPC ObjC thread safety right
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m360
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m21
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.h4
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.m76
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.m48
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateSingleWriter.m30
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.h4
7 files changed, 275 insertions, 268 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 83c6edc6e3..5ea1755c58 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -448,24 +448,30 @@ const char *kCFStreamVarName = "grpc_cfstream";
return;
}
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
- switch (callSafety) {
- case GRPCCallSafetyDefault:
- callFlags[hostAndPath] = @0;
- break;
- case GRPCCallSafetyIdempotentRequest:
- callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
- break;
- case GRPCCallSafetyCacheableRequest:
- callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
- break;
- default:
- break;
+ @synchronized (callFlags) {
+ switch (callSafety) {
+ case GRPCCallSafetyDefault:
+ callFlags[hostAndPath] = @0;
+ break;
+ case GRPCCallSafetyIdempotentRequest:
+ callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
+ break;
+ case GRPCCallSafetyCacheableRequest:
+ callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ break;
+ default:
+ break;
+ }
}
}
+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
- return [callFlags[hostAndPath] intValue];
+ uint32_t flags = 0;
+ @synchronized (callFlags) {
+ flags = [callFlags[hostAndPath] intValue];
+ }
+ return flags;
}
// Designated initializer
@@ -506,7 +512,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
_callOptions = [callOptions copy];
// Serial queue to invoke the non-reentrant methods of the grpc_call object.
- _callQueue = dispatch_queue_create("io.grpc.call", NULL);
+ _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);
_requestWriter = requestWriter;
@@ -523,53 +529,49 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
- if (_state != GRXWriterStateNotStarted) {
- return;
+ @synchronized (self) {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ _responseQueue = queue;
}
- _responseQueue = queue;
}
#pragma mark Finish
- (void)finishWithError:(NSError *)errorOrNil {
+ GRXConcurrentWriteable *copiedResponseWriteable = nil;
+
@synchronized(self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
_state = GRXWriterStateFinished;
- }
+ copiedResponseWriteable = _responseWriteable;
- // If there were still request messages coming, stop them.
- @synchronized(_requestWriter) {
- _requestWriter.state = GRXWriterStateFinished;
+ // If the call isn't retained anywhere else, it can be deallocated now.
+ _retainSelf = nil;
}
if (errorOrNil) {
- [_responseWriteable cancelWithError:errorOrNil];
+ [copiedResponseWriteable cancelWithError:errorOrNil];
} else {
- [_responseWriteable enqueueSuccessfulCompletion];
- }
-
- [GRPCConnectivityMonitor unregisterObserver:self];
-
- // If the call isn't retained anywhere else, it can be deallocated now.
- _retainSelf = nil;
-}
-
-- (void)cancelCall {
- // Can be called from any thread, any number of times.
- @synchronized(self) {
- [_wrappedCall cancel];
+ [copiedResponseWriteable enqueueSuccessfulCompletion];
}
+ _requestWriter.state = GRXWriterStateFinished;
}
- (void)cancel {
- @synchronized(self) {
- [self cancelCall];
- self.isWaitingForToken = NO;
+ @synchronized (self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self finishWithError:[NSError
+ errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeCancelled
+ userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
+ [_wrappedCall cancel];
}
- [self
- maybeFinishWithError:[NSError
- errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeCancelled
- userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
}
- (void)maybeFinishWithError:(NSError *)errorOrNil {
@@ -609,21 +611,24 @@ const char *kCFStreamVarName = "grpc_cfstream";
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
@synchronized(self) {
- if (self.state == GRXWriterStatePaused) {
+ if (_state != GRXWriterStateStarted) {
return;
}
}
dispatch_async(_callQueue, ^{
__weak GRPCCall *weakSelf = self;
- __weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable;
[self startReadWithHandler:^(grpc_byte_buffer *message) {
- __strong GRPCCall *strongSelf = weakSelf;
- __strong GRXConcurrentWriteable *strongWriteable = weakWriteable;
+ NSLog(@"message received");
if (message == NULL) {
// No more messages from the server
return;
}
+ __strong GRPCCall *strongSelf = weakSelf;
+ if (strongSelf == nil) {
+ grpc_byte_buffer_destroy(message);
+ return;
+ }
NSData *data = [NSData grpc_dataWithByteBuffer:message];
grpc_byte_buffer_destroy(message);
if (!data) {
@@ -631,21 +636,25 @@ const char *kCFStreamVarName = "grpc_cfstream";
// don't want to throw, because the app shouldn't crash for a behavior
// that's on the hands of any server to have. Instead we finish and ask
// the server to cancel.
- [strongSelf cancelCall];
- [strongSelf
- maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeResourceExhausted
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Client does not have enough memory to "
- @"hold the server response."
- }]];
- return;
+ @synchronized (strongSelf) {
+ [strongSelf
+ finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeResourceExhausted
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Client does not have enough memory to "
+ @"hold the server response."
+ }]];
+ [strongSelf->_wrappedCall cancel];
+ }
+ } else {
+ @synchronized (strongSelf) {
+ [strongSelf->_responseWriteable enqueueValue:data
+ completionHandler:^{
+ [strongSelf startNextRead];
+ }];
+ }
}
- [strongWriteable enqueueValue:data
- completionHandler:^{
- [strongSelf startNextRead];
- }];
}];
});
}
@@ -680,15 +689,16 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
// TODO(jcanizales): Add error handlers for async failures
- GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc]
- initWithMetadata:headers
- flags:callSafetyFlags
- handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA
- if (!_unaryCall) {
- [_wrappedCall startBatchWithOperations:@[ op ]];
- } else {
- [_unaryOpBatch addObject:op];
- }
+ GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc] initWithMetadata:headers
+ flags:callSafetyFlags
+ handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA
+ dispatch_async(_callQueue, ^{
+ if (!self->_unaryCall) {
+ [self->_wrappedCall startBatchWithOperations:@[ op ]];
+ } else {
+ [self->_unaryOpBatch addObject:op];
+ }
+ });
}
#pragma mark GRXWriteable implementation
@@ -703,9 +713,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
- @synchronized(strongSelf->_requestWriter) {
- strongSelf->_requestWriter.state = GRXWriterStateStarted;
- }
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
}
};
@@ -721,13 +729,17 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)writeValue:(id)value {
- // TODO(jcanizales): Throw/assert if value isn't NSData.
+ NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");
+
+ @synchronized (self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
+ }
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
- @synchronized(_requestWriter) {
- _requestWriter.state = GRXWriterStatePaused;
- }
+ _requestWriter.state = GRXWriterStatePaused;
dispatch_async(_callQueue, ^{
// Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
@@ -752,6 +764,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
[self cancel];
} else {
dispatch_async(_callQueue, ^{
+
// EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
[self finishRequestWithErrorHandler:nil];
});
@@ -766,17 +779,20 @@ const char *kCFStreamVarName = "grpc_cfstream";
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
- // TODO(jcanizales): Add error handlers for async failures
- [_wrappedCall
- startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
- [_wrappedCall
- startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
+ dispatch_async(_callQueue, ^{
+ // TODO(jcanizales): Add error handlers for async failures
+ [self->_wrappedCall
+ startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
+ [self->_wrappedCall
+ startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
+ });
}
- (void)invokeCall {
__weak GRPCCall *weakSelf = self;
[self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
// Response headers received.
+ NSLog(@"response received");
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseHeaders = headers;
@@ -784,6 +800,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
}
completionHandler:^(NSError *error, NSDictionary *trailers) {
+ NSLog(@"completion received");
__strong GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
strongSelf.responseTrailers = trailers;
@@ -794,112 +811,113 @@ const char *kCFStreamVarName = "grpc_cfstream";
[userInfo addEntriesFromDictionary:error.userInfo];
}
userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
- // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
- // called before this one, so an error might end up with trailers but no headers. We
- // shouldn't call finishWithError until ater both blocks are called. It is also when
- // this is done that we can provide a merged view of response headers and trailers in a
- // thread-safe way.
- if (strongSelf.responseHeaders) {
- userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
- }
+ // Since gRPC core does not guarantee the headers block being called before this block,
+ // responseHeaders might be nil.
+ userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
}
- [strongSelf maybeFinishWithError:error];
+ [strongSelf finishWithError:error];
}
}];
- // Now that the RPC has been initiated, request writes can start.
- @synchronized(_requestWriter) {
- [_requestWriter startWithWriteable:self];
- }
}
#pragma mark GRXWriter implementation
+// Lock acquired inside startWithWriteable:
- (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
- _responseWriteable =
- [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
-
- GRPCPooledChannel *channel =
- [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
- GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:_path
- completionQueue:[GRPCCompletionQueue completionQueue]
- callOptions:_callOptions];
-
- if (wrappedCall == nil) {
- [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeUnavailable
- userInfo:@{
- NSLocalizedDescriptionKey :
- @"Failed to create call or channel."
- }]];
- return;
- }
+ @synchronized (self) {
+ if (_state == GRXWriterStateFinished) {
+ return;
+ }
- @synchronized(self) {
- _wrappedCall = wrappedCall;
- }
+ _responseWriteable =
+ [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
+
+ GRPCPooledChannel *channel =
+ [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
+ _wrappedCall = [channel wrappedCallWithPath:_path
+ completionQueue:[GRPCCompletionQueue completionQueue]
+ callOptions:_callOptions];
- [self sendHeaders];
- [self invokeCall];
+ if (_wrappedCall == nil) {
+ [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{
+ NSLocalizedDescriptionKey :
+ @"Failed to create call or channel."
+ }]];
+ return;
+ }
- // Connectivity monitor is not required for CFStream
- char *enableCFStream = getenv(kCFStreamVarName);
- if (enableCFStream == nil || enableCFStream[0] != '1') {
- [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
+ [self sendHeaders];
+ [self invokeCall];
+
+ // Connectivity monitor is not required for CFStream
+ char *enableCFStream = getenv(kCFStreamVarName);
+ if (enableCFStream == nil || enableCFStream[0] != '1') {
+ [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
+ }
}
+
+ // Now that the RPC has been initiated, request writes can start.
+ [_requestWriter startWithWriteable:self];
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ id<GRPCAuthorizationProtocol> tokenProvider = nil;
@synchronized(self) {
_state = GRXWriterStateStarted;
- }
- // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
- // This makes RPCs in which the call isn't externally retained possible (as long as it is started
- // before being autoreleased).
- // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
- // that the life of the instance is determined by this retain cycle.
- _retainSelf = self;
+ // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
+ // This makes RPCs in which the call isn't externally retained possible (as long as it is started
+ // before being autoreleased).
+ // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
+ // that the life of the instance is determined by this retain cycle.
+ _retainSelf = self;
+
+ if (_callOptions == nil) {
+ GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
+ if (_serverName.length != 0) {
+ callOptions.serverAuthority = _serverName;
+ }
+ if (_timeout > 0) {
+ callOptions.timeout = _timeout;
+ }
+ uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
+ if (callFlags != 0) {
+ if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
+ _callSafety = GRPCCallSafetyIdempotentRequest;
+ } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
+ _callSafety = GRPCCallSafetyCacheableRequest;
+ }
+ }
- if (_callOptions == nil) {
- GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
- if (_serverName.length != 0) {
- callOptions.serverAuthority = _serverName;
- }
- if (_timeout > 0) {
- callOptions.timeout = _timeout;
- }
- uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
- if (callFlags != 0) {
- if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
- _callSafety = GRPCCallSafetyIdempotentRequest;
- } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
- _callSafety = GRPCCallSafetyCacheableRequest;
+ id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
+ if (tokenProvider != nil) {
+ callOptions.authTokenProvider = tokenProvider;
}
+ _callOptions = callOptions;
}
- id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
- if (tokenProvider != nil) {
- callOptions.authTokenProvider = tokenProvider;
- }
- _callOptions = callOptions;
+ NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
+ @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
+
+ tokenProvider = _callOptions.authTokenProvider;
}
- NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
- @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
- if (_callOptions.authTokenProvider != nil) {
- @synchronized(self) {
- self.isWaitingForToken = YES;
- }
- [_callOptions.authTokenProvider getTokenWithHandler:^(NSString *token) {
- @synchronized(self) {
- if (self.isWaitingForToken) {
- if (token) {
- self->_fetchedOauth2AccessToken = [token copy];
+ if (tokenProvider != nil) {
+ __weak typeof(self) weakSelf = self;
+ [tokenProvider getTokenWithHandler:^(NSString *token) {
+ __strong typeof(self) strongSelf = weakSelf;
+ if (strongSelf) {
+ @synchronized(strongSelf) {
+ if (strongSelf->_state == GRXWriterStateNotStarted) {
+ if (token) {
+ strongSelf->_fetchedOauth2AccessToken = [token copy];
+ }
}
- [self startCallWithWriteable:writeable];
- self.isWaitingForToken = NO;
}
+ [strongSelf startCallWithWriteable:writeable];
}
}];
} else {
@@ -938,16 +956,20 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)connectivityChanged:(NSNotification *)note {
- // Cancel underlying call upon this notification
+ // Cancel underlying call upon this notification.
+
+ // Retain because connectivity manager only keeps weak reference to GRPCCall.
__strong GRPCCall *strongSelf = self;
if (strongSelf) {
- [self cancelCall];
- [self
- maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
- code:GRPCErrorCodeUnavailable
- userInfo:@{
- NSLocalizedDescriptionKey : @"Connectivity lost."
- }]];
+ @synchronized (strongSelf) {
+ [_wrappedCall cancel];
+ [strongSelf
+ finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{
+ NSLocalizedDescriptionKey : @"Connectivity lost."
+ }]];
+ }
}
}
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index 546d46cba3..d0064a5cfa 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -51,16 +51,22 @@
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
value = [value copy];
}
- __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
- [weakSelf.writeable writeValue:value];
+ @synchronized (self) {
+ if (self->_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self.writeable writeValue:value];
+ }
});
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^{
- [weakSelf finishWithError:errorOrNil];
+ if (self->_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self finishWithError:errorOrNil];
});
}
@@ -100,14 +106,15 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- self.writeable = writeable;
- _state = GRXWriterStateStarted;
+ @synchronized (self) {
+ self.writeable = writeable;
+ _state = GRXWriterStateStarted;
+ }
dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
[self.writeable writesFinishedWithError:errorOrNil];
- self.state = GRXWriterStateFinished;
}
- (void)dealloc {
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
index abb831e6fb..606f5dea95 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
@@ -23,9 +23,9 @@
/**
* This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a
- * GRXWriteable instance for the main thread, guaranteeing that writesFinishedWithError: is the last
+ * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is the last
* message sent to it (no matter what messages are sent to the wrapper, in what order, nor from
- * which thread). It also guarantees that, if cancelWithError: is called from the main thread (e.g.
+ * which thread). It also guarantees that, if cancelWithError: is called (e.g.
* by the app cancelling the writes), no further messages are sent to the writeable except
* writesFinishedWithError:.
*
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
index 81ccc3fbce..229d592f48 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
@@ -41,6 +41,7 @@
if (self = [super init]) {
_writeableQueue = queue;
_writeable = writeable;
+ _alreadyFinished = NO;
}
return self;
}
@@ -51,78 +52,43 @@
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
dispatch_async(_writeableQueue, ^{
- // We're racing a possible cancellation performed by another thread. To turn all already-
- // enqueued messages into noops, cancellation nillifies the writeable property. If we get it
- // before it's nil, we won the race.
- id<GRXWriteable> writeable = self.writeable;
- if (writeable) {
- [writeable writeValue:value];
- handler();
+ if (self->_alreadyFinished) {
+ return;
}
+
+ [self.writeable writeValue:value];
+ handler();
});
}
- (void)enqueueSuccessfulCompletion {
- __weak typeof(self) weakSelf = self;
dispatch_async(_writeableQueue, ^{
- typeof(self) strongSelf = weakSelf;
- if (strongSelf) {
- BOOL finished = NO;
- @synchronized(strongSelf) {
- if (!strongSelf->_alreadyFinished) {
- strongSelf->_alreadyFinished = YES;
- } else {
- finished = YES;
- }
- }
- if (!finished) {
- // Cancellation is now impossible. None of the other three blocks can run concurrently with
- // this one.
- [strongSelf.writeable writesFinishedWithError:nil];
- // Skip any possible message to the wrapped writeable enqueued after this one.
- strongSelf.writeable = nil;
- }
+ if (self->_alreadyFinished) {
+ return;
}
+ [self.writeable writesFinishedWithError:nil];
+ // Skip any possible message to the wrapped writeable enqueued after this one.
+ self.writeable = nil;
});
}
- (void)cancelWithError:(NSError *)error {
- NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
- BOOL finished = NO;
- @synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
+ NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion.");
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ return;
}
- }
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
- id<GRXWriteable> writeable = self.writeable;
+ [self.writeable writesFinishedWithError:error];
self.writeable = nil;
-
- dispatch_async(_writeableQueue, ^{
- [writeable writesFinishedWithError:error];
- });
- }
+ });
}
- (void)cancelSilently {
- BOOL finished = NO;
- @synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ return;
}
- }
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
self.writeable = nil;
- }
+ });
}
@end
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m
index 3e522ef24e..376c196b4f 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.m
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m
@@ -54,23 +54,19 @@
[writeable writesFinishedWithError:errorOrNil];
}
-// This is used to stop the input writer. It nillifies our reference to it
-// to release it.
-- (void)finishInput {
- GRXWriter *writer = _writer;
- _writer = nil;
- writer.state = GRXWriterStateFinished;
-}
-
#pragma mark GRXWriteable implementation
- (void)writeValue:(id)value {
- [_writeable writeValue:value];
+ @synchronized (self) {
+ [_writeable writeValue:value];
+ }
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _writer = nil;
- [self finishOutputWithError:errorOrNil];
+ @synchronized (self) {
+ _writer = nil;
+ [self finishOutputWithError:errorOrNil];
+ }
}
#pragma mark GRXWriter implementation
@@ -80,22 +76,38 @@
}
- (void)setState:(GRXWriterState)state {
+ GRXWriter *copiedWriter = nil;
if (state == GRXWriterStateFinished) {
- _writeable = nil;
- [self finishInput];
+ @synchronized (self) {
+ _writeable = nil;
+ copiedWriter = _writer;
+ _writer = nil;
+ }
+ copiedWriter.state = GRXWriterStateFinished;
} else {
- _writer.state = state;
+ @synchronized (self) {
+ copiedWriter = _writer;
+ }
+ copiedWriter.state = state;
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _writeable = writeable;
- [_writer startWithWriteable:self];
+ GRXWriter *copiedWriter = nil;
+ @synchronized (self) {
+ _writeable = writeable;
+ copiedWriter = _writer;
+ }
+ [copiedWriter startWithWriteable:self];
}
- (void)finishWithError:(NSError *)errorOrNil {
- [self finishOutputWithError:errorOrNil];
- [self finishInput];
+ GRXWriter *copiedWriter = nil;
+ @synchronized (self) {
+ [self finishOutputWithError:errorOrNil];
+ copiedWriter = _writer;
+ }
+ copiedWriter.state = GRXWriterStateFinished;
}
@end
diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
index 3126ae4bd1..eadad6c3b6 100644
--- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
+++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
@@ -20,7 +20,6 @@
@implementation GRXImmediateSingleWriter {
id _value;
- id<GRXWriteable> _writeable;
}
@synthesize state = _state;
@@ -38,17 +37,16 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _state = GRXWriterStateStarted;
- _writeable = writeable;
- [writeable writeValue:_value];
- [self finish];
-}
-
-- (void)finish {
- _state = GRXWriterStateFinished;
- _value = nil;
- id<GRXWriteable> writeable = _writeable;
- _writeable = nil;
+ id copiedValue = nil;
+ @synchronized (self) {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ copiedValue = _value;
+ _value = nil;
+ _state = GRXWriterStateFinished;
+ }
+ [writeable writeValue:copiedValue];
[writeable writesFinishedWithError:nil];
}
@@ -65,9 +63,11 @@
// the original \a map function returns a new Writer of another type. So we
// need to override this function here.
- (GRXWriter *)map:(id (^)(id))map {
- // Since _value is available when creating the object, we can simply
- // apply the map and store the output.
- _value = map(_value);
+ @synchronized (self) {
+ // Since _value is available when creating the object, we can simply
+ // apply the map and store the output.
+ _value = map(_value);
+ }
return self;
}
diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h
index 5d99583a92..ac1f7b9c4c 100644
--- a/src/objective-c/RxLibrary/GRXWriter.h
+++ b/src/objective-c/RxLibrary/GRXWriter.h
@@ -80,9 +80,9 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
* This property can be used to query the current state of the writer, which determines how it might
* currently use its writeable. Some state transitions can be triggered by setting this property to
* the corresponding value, and that's useful for advanced use cases like pausing an writer. For
- * more details, see the documentation of the enum further down.
+ * more details, see the documentation of the enum further down. The property is thread safe.
*/
-@property(nonatomic) GRXWriterState state;
+@property(atomic) GRXWriterState state;
/**
* Transition to the Started state, and start sending messages to the writeable (a reference to it