diff options
Diffstat (limited to 'src/objective-c')
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 32 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCCompletionQueue.m | 3 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCWrappedCall.m | 7 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXConcurrentWriteable.h (renamed from src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h) | 52 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXConcurrentWriteable.m (renamed from src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m) | 50 | ||||
-rw-r--r-- | src/objective-c/tests/Tests.xcodeproj/project.pbxproj | 2 |
6 files changed, 70 insertions, 76 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 53e5abe177..9435bf2b35 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -35,10 +35,10 @@ #include <grpc/grpc.h> #include <grpc/support/time.h> +#import <RxLibrary/GRXConcurrentWriteable.h> #import "private/GRPCChannel.h" #import "private/GRPCCompletionQueue.h" -#import "private/GRPCDelegateWrapper.h" #import "private/GRPCWrappedCall.h" #import "private/NSData+GRPC.h" #import "private/NSDictionary+GRPC.h" @@ -78,9 +78,13 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; // do. Particularly, in the face of errors, there's no ordering guarantee at // all. This wrapper over our actual writeable ensures thread-safety and // correct ordering. - GRPCDelegateWrapper *_responseWriteable; + GRXConcurrentWriteable *_responseWriteable; GRXWriter *_requestWriter; + // To create a retain cycle when a call is started, up until it finishes. See + // |startWithWriteable:| and |finishWithError:|. + GRPCCall *_self; + NSMutableDictionary *_requestMetadata; NSMutableDictionary *_responseMetadata; } @@ -143,8 +147,13 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; #pragma mark Finish - (void)finishWithError:(NSError *)errorOrNil { + // If the call isn't retained anywhere else, it can be deallocated now. + _self = nil; + + // If there were still request messages coming, stop them. _requestWriter.state = GRXWriterStateFinished; _requestWriter = nil; + if (errorOrNil) { [_responseWriteable cancelWithError:errorOrNil]; } else { @@ -191,7 +200,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; return; } __weak GRPCCall *weakSelf = self; - __weak GRPCDelegateWrapper *weakWriteable = _responseWriteable; + __weak GRXConcurrentWriteable *weakWriteable = _responseWriteable; dispatch_async(_callQueue, ^{ [weakSelf startReadWithHandler:^(grpc_byte_buffer *message) { @@ -216,7 +225,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; [weakSelf cancelCall]; return; } - [weakWriteable enqueueMessage:data completionHandler:^{ + [weakWriteable enqueueValue:data completionHandler:^{ [weakSelf startNextRead]; }]; }]; @@ -276,6 +285,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; } - (void)writesFinishedWithError:(NSError *)errorOrNil { + _requestWriter = nil; if (errorOrNil) { [self cancel]; } else { @@ -335,12 +345,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; #pragma mark GRXWriter implementation - (void)startWithWriteable:(id<GRXWriteable>)writeable { - // The following produces a retain cycle self:_responseWriteable:self, which is only - // broken when writesFinishedWithError: is sent to the wrapped writeable. - // Care is taken not to retain self strongly in any of the blocks used in - // the implementation of GRPCCall, so that the life of the instance is - // determined by this retain cycle. - _responseWriteable = [[GRPCDelegateWrapper alloc] initWithWriteable:writeable writer:self]; + // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). + // This makes RPCs in which the call isn't externally retained possible (as long as it is started + // before being autoreleased). + // Care is taken not to retain self strongly in any of the blocks used in this implementation, so + // that the life of the instance is determined by this retain cycle. + _self = self; + + _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; [self sendHeaders:_requestMetadata]; [self invokeCall]; } diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m index 40aade4f9a..12535c9616 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m @@ -65,7 +65,8 @@ dispatch_async(gDefaultConcurrentQueue, ^{ while (YES) { // The following call blocks until an event is available. - grpc_event event = grpc_completion_queue_next(unmanagedQueue, gpr_inf_future); + grpc_event event = grpc_completion_queue_next(unmanagedQueue, + gpr_inf_future(GPR_CLOCK_REALTIME)); GRPCQueueCompletionHandler handler; switch (event.type) { case GRPC_OP_COMPLETE: diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index 45f10f5d63..1db63df77f 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -246,8 +246,11 @@ if (!_queue) { return nil; } - _call = grpc_channel_create_call(channel.unmanagedChannel, _queue.unmanagedQueue, - path.UTF8String, host.UTF8String, gpr_inf_future); + _call = grpc_channel_create_call(channel.unmanagedChannel, + _queue.unmanagedQueue, + path.UTF8String, + host.UTF8String, + gpr_inf_future(GPR_CLOCK_REALTIME)); if (_call == NULL) { return nil; } diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index 9a30a2f966..1080001905 100644 --- a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -33,49 +33,39 @@ #import <Foundation/Foundation.h> -#import <RxLibrary/GRXWriter.h> +#import "GRXWriter.h" +#import "GRXWriteable.h" -@protocol GRXWriteable; - -// This is a thread-safe wrapper over a GRXWriteable instance. It lets one -// enqueue calls to a GRXWriteable instance for the main thread, guaranteeing -// that writesFinishedWithError: is the last message sent to it (no matter what -// messages are sent to the wrapper, in what order, nor from which thread). It -// also guarantees that, if cancelWithError: is called from the main thread -// (e.g. by the app cancelling the writes), no further messages are sent to the -// writeable except writesFinishedWithError:. +// This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a +// GRXWriteable instance for the main thread, guaranteeing that writesFinishedWithError: is the last +// message sent to it (no matter what messages are sent to the wrapper, in what order, nor from +// which thread). It also guarantees that, if cancelWithError: is called from the main thread (e.g. +// by the app cancelling the writes), no further messages are sent to the writeable except +// writesFinishedWithError:. // -// TODO(jcanizales): Let the user specify another queue for the writeable -// callbacks. -// TODO(jcanizales): Rename to GRXWriteableWrapper and move to the Rx library. -@interface GRPCDelegateWrapper : NSObject +// TODO(jcanizales): Let the user specify another queue for the writeable callbacks. +@interface GRXConcurrentWriteable : NSObject // The GRXWriteable passed is the wrapped writeable. -// Both the GRXWriter instance and the GRXWriteable instance are retained until -// writesFinishedWithError: is sent to the writeable, and released after that. -// This is used to create a retain cycle that keeps both objects alive until the -// writing is explicitly finished. -- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(GRXWriter *)writer - NS_DESIGNATED_INITIALIZER; +// The GRXWriteable instance is retained until writesFinishedWithError: is sent to it, and released +// after that. +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable NS_DESIGNATED_INITIALIZER; // Enqueues writeValue: to be sent to the writeable in the main thread. // The passed handler is invoked from the main thread after writeValue: returns. -- (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler; +- (void)enqueueValue:(id)value completionHandler:(void (^)())handler; -// Enqueues writesFinishedWithError:nil to be sent to the writeable in the main -// thread. After that message is sent to the writeable, all other methods of -// this object are effectively noops. +// Enqueues writesFinishedWithError:nil to be sent to the writeable in the main thread. After that +// message is sent to the writeable, all other methods of this object are effectively noops. - (void)enqueueSuccessfulCompletion; -// If the writeable has not yet received a writesFinishedWithError: message, this -// will enqueue one to be sent to it in the main thread, and cancel all other -// pending messages to the writeable enqueued by this object (both past and -// future). +// If the writeable has not yet received a writesFinishedWithError: message, this will enqueue one +// to be sent to it in the main thread, and cancel all other pending messages to the writeable +// enqueued by this object (both past and future). // The error argument cannot be nil. - (void)cancelWithError:(NSError *)error; -// Cancels all pending messages to the writeable enqueued by this object (both -// past and future). Because the writeable won't receive writesFinishedWithError:, -// this also releases the writeable and the writer. +// Cancels all pending messages to the writeable enqueued by this object (both past and future). +// Because the writeable won't receive writesFinishedWithError:, this also releases the writeable. - (void)cancelSilently; @end diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 294cfb7e23..08bd079aea 100644 --- a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -31,45 +31,42 @@ * */ -#import "GRPCDelegateWrapper.h" +#import "GRXConcurrentWriteable.h" #import <RxLibrary/GRXWriteable.h> -@interface GRPCDelegateWrapper () -// These are atomic so that cancellation can nillify them from any thread. +@interface GRXConcurrentWriteable () +// This is atomic so that cancellation can nillify it from any thread. @property(atomic, strong) id<GRXWriteable> writeable; -@property(atomic, strong) GRXWriter *writer; @end -@implementation GRPCDelegateWrapper { +@implementation GRXConcurrentWriteable { dispatch_queue_t _writeableQueue; // This ensures that writesFinishedWithError: is only sent once to the writeable. dispatch_once_t _alreadyFinished; } - (instancetype)init { - return [self initWithWriteable:nil writer:nil]; + return [self initWithWriteable:nil]; } // Designated initializer -- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(GRXWriter *)writer { +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable { if (self = [super init]) { _writeableQueue = dispatch_get_main_queue(); _writeable = writeable; - _writer = writer; } return self; } -- (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler { +- (void)enqueueValue:(id)value completionHandler:(void (^)())handler { dispatch_async(_writeableQueue, ^{ - // We're racing a possible cancellation performed by another thread. To turn - // all already-enqueued messages into noops, cancellation nillifies the - // writeable property. If we get it before it's nil, we won - // the race. + // We're racing a possible cancellation performed by another thread. To turn all already- + // enqueued messages into noops, cancellation nillifies the writeable property. If we get it + // before it's nil, we won the race. id<GRXWriteable> writeable = self.writeable; if (writeable) { - [writeable writeValue:message]; + [writeable writeValue:value]; handler(); } }); @@ -78,13 +75,11 @@ - (void)enqueueSuccessfulCompletion { dispatch_async(_writeableQueue, ^{ dispatch_once(&_alreadyFinished, ^{ - // Cancellation is now impossible. None of the other three blocks can run - // concurrently with this one. + // Cancellation is now impossible. None of the other three blocks can run concurrently with + // this one. [self.writeable writesFinishedWithError:nil]; - // Break the retain cycle with writer, and skip any possible message to the - // wrapped writeable enqueued after this one. + // Skip any possible message to the wrapped writeable enqueued after this one. self.writeable = nil; - self.writer = nil; }); }); } @@ -92,29 +87,24 @@ - (void)cancelWithError:(NSError *)error { NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion."); dispatch_once(&_alreadyFinished, ^{ - // Skip any of the still-enqueued messages to the wrapped writeable. We use - // the atomic setter to nillify writer and writeable because we might be - // running concurrently with the blocks in _writeableQueue, and assignment - // with ARC isn't atomic. + // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to + // nillify writeable because we might be running concurrently with the blocks in + // _writeableQueue, and assignment with ARC isn't atomic. id<GRXWriteable> writeable = self.writeable; self.writeable = nil; dispatch_async(_writeableQueue, ^{ [writeable writesFinishedWithError:error]; - // Break the retain cycle with writer. - self.writer = nil; }); }); } - (void)cancelSilently { dispatch_once(&_alreadyFinished, ^{ - // Skip any of the still-enqueued messages to the wrapped writeable. We use - // the atomic setter to nillify writer and writeable because we might be - // running concurrently with the blocks in _writeableQueue, and assignment - // with ARC isn't atomic. + // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to + // nillify writeable because we might be running concurrently with the blocks in + // _writeableQueue, and assignment with ARC isn't atomic. self.writeable = nil; - self.writer = nil; }); } @end diff --git a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj index 34be705db2..f13fb8288b 100644 --- a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj +++ b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj @@ -391,7 +391,6 @@ 635697DC1B14FC11007A7283 /* Debug */ = { isa = XCBuildConfiguration; buildSettings = { - OTHER_LDFLAGS = "-ObjC"; PRODUCT_NAME = "$(TARGET_NAME)"; SKIP_INSTALL = YES; }; @@ -400,7 +399,6 @@ 635697DD1B14FC11007A7283 /* Release */ = { isa = XCBuildConfiguration; buildSettings = { - OTHER_LDFLAGS = "-ObjC"; PRODUCT_NAME = "$(TARGET_NAME)"; SKIP_INSTALL = YES; }; |