diff options
8 files changed, 306 insertions, 453 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index bf9441c27e..dad8594a26 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -36,6 +36,8 @@ #import "private/NSDictionary+GRPC.h" #import "private/NSError+GRPC.h" #import "private/utilities.h" +#import "private/GRPCChannelPool.h" +#import "private/GRPCCompletionQueue.h" // At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA, // SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE, @@ -819,8 +821,11 @@ const char *kCFStreamVarName = "grpc_cfstream"; _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; - GRPCWrappedCall *wrappedCall = - [[GRPCWrappedCall alloc] initWithHost:_host path:_path callOptions:_callOptions]; + GRPCPooledChannel *channel = [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; + GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:_path + completionQueue:[GRPCCompletionQueue completionQueue] + callOptions:_callOptions]; + if (wrappedCall == nil) { [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeUnavailable @@ -837,12 +842,6 @@ const char *kCFStreamVarName = "grpc_cfstream"; [self sendHeaders]; [self invokeCall]; - - // Connectivity monitor is not required for CFStream - char *enableCFStream = getenv(kCFStreamVarName); - if (enableCFStream == nil || enableCFStream[0] != '1') { - [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)]; - } } - (void)startWithWriteable:(id<GRXWriteable>)writeable { diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.h b/src/objective-c/GRPCClient/private/GRPCChannelPool.h index 7f8ee19fe5..338b6e440f 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannelPool.h +++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.h @@ -32,10 +32,13 @@ NS_ASSUME_NONNULL_BEGIN @class GRPCChannelPool; @class GRPCCompletionQueue; @class GRPCChannelConfiguration; +@class GRPCWrappedCall; /** - * Channel proxy that can be retained and automatically reestablish connection when the channel is - * disconnected. + * A proxied channel object that can be retained and creates GRPCWrappedCall object from. If a + * raw channel is not present (i.e. no tcp connection to the server) when a GRPCWrappedCall object + * is requested, it issues a connection/reconnection. The behavior of this object is to mimic that + * of gRPC core's channel object. */ @interface GRPCPooledChannel : NSObject @@ -47,24 +50,21 @@ NS_ASSUME_NONNULL_BEGIN * Initialize with an actual channel object \a channel and a reference to the channel pool. */ - (nullable instancetype)initWithChannelConfiguration: - (GRPCChannelConfiguration *)channelConfiguration - channelPool:(GRPCChannelPool *)channelPool - NS_DESIGNATED_INITIALIZER; + (GRPCChannelConfiguration *)channelConfiguration; /** - * Create a grpc core call object (grpc_call) from this channel. If channel is disconnected, get a + * Create a GRPCWrappedCall object (grpc_call) from this channel. If channel is disconnected, get a * new channel object from the channel pool. */ -- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path - completionQueue:(GRPCCompletionQueue *)queue - callOptions:(GRPCCallOptions *)callOptions; +- (nullable GRPCWrappedCall *)wrappedCallWithPath:(NSString *)path + completionQueue:(GRPCCompletionQueue *)queue + callOptions:(GRPCCallOptions *)callOptions; /** - * Return ownership and destroy the grpc_call object created by - * \a unmanagedCallWithPath:completionQueue:callOptions: and decrease channel refcount. If refcount - * of the channel becomes 0, return the channel object to channel pool. + * Notify the pooled channel that a wrapped call object is no longer referenced and will be + * dealloc'ed. */ -- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall; +- (void)notifyWrappedCallDealloc:(GRPCWrappedCall *)wrappedCall; /** * Force the channel to disconnect immediately. Subsequent calls to unmanagedCallWithPath: will @@ -78,6 +78,13 @@ NS_ASSUME_NONNULL_BEGIN @interface GRPCPooledChannel (Test) /** + * Initialize a pooled channel with non-default destroy delay for testing purpose. + */ +- (nullable instancetype)initWithChannelConfiguration: +(GRPCChannelConfiguration *)channelConfiguration + destroyDelay:(NSTimeInterval)destroyDelay; + +/** * Return the pointer to the raw channel wrapped. */ @property(atomic, readonly) GRPCChannel *wrappedChannel; @@ -118,7 +125,7 @@ NS_ASSUME_NONNULL_BEGIN * Get an instance of pool isolated from the global shared pool with channels' destroy delay being * \a destroyDelay. */ -- (nullable instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay; +- (nullable instancetype)initTestPool; @end diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m index 646f1e4b86..488766c0ed 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannelPool.m +++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m @@ -28,6 +28,8 @@ #import "GRPCSecureChannelFactory.h" #import "utilities.h" #import "version.h" +#import "GRPCWrappedCall.h" +#import "GRPCCompletionQueue.h" #import <GRPCClient/GRPCCall+Cronet.h> #include <grpc/support/log.h> @@ -40,103 +42,139 @@ static dispatch_once_t gInitChannelPool; /** When all calls of a channel are destroyed, destroy the channel after this much seconds. */ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; -@interface GRPCChannelPool () - -- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration; - -- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration; - -@end - @implementation GRPCPooledChannel { - __weak GRPCChannelPool *_channelPool; GRPCChannelConfiguration *_channelConfiguration; - NSMutableSet *_unmanagedCalls; + NSTimeInterval _destroyDelay; + + NSHashTable<GRPCWrappedCall *> *_wrappedCalls; GRPCChannel *_wrappedChannel; + NSDate *_lastTimedDestroy; + dispatch_queue_t _timerQueue; } -- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration - channelPool:(GRPCChannelPool *)channelPool { - NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty."); - NSAssert(channelPool != nil, @"channelPool cannot be empty."); - if (channelPool == nil || channelConfiguration == nil) { - return nil; - } - - if ((self = [super init])) { - _channelPool = channelPool; - _channelConfiguration = channelConfiguration; - _unmanagedCalls = [NSMutableSet set]; - _wrappedChannel = nil; - } - - return self; +- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration { + return [self initWithChannelConfiguration:channelConfiguration destroyDelay:kDefaultChannelDestroyDelay]; } - (void)dealloc { - NSAssert([_unmanagedCalls count] == 0 && _wrappedChannel == nil, - @"Pooled channel should only be" - "destroyed after the wrapped channel is destroyed"); + if ([_wrappedCalls objectEnumerator].allObjects.count != 0) { + NSEnumerator *enumerator = [_wrappedCalls objectEnumerator]; + GRPCWrappedCall *wrappedCall; + while ((wrappedCall = [enumerator nextObject])) { + [wrappedCall channelDisconnected]; + }; + } } -- (grpc_call *)unmanagedCallWithPath:(NSString *)path - completionQueue:(GRPCCompletionQueue *)queue - callOptions:(GRPCCallOptions *)callOptions { +- (GRPCWrappedCall *)wrappedCallWithPath:(NSString *)path +completionQueue:(GRPCCompletionQueue *)queue +callOptions:(GRPCCallOptions *)callOptions { NSAssert(path.length > 0, @"path must not be empty."); NSAssert(queue != nil, @"completionQueue must not be empty."); NSAssert(callOptions, @"callOptions must not be empty."); if (path.length == 0 || queue == nil || callOptions == nil) return NULL; - grpc_call *call = NULL; + GRPCWrappedCall *call = nil; + @synchronized(self) { if (_wrappedChannel == nil) { - __strong GRPCChannelPool *strongPool = _channelPool; - if (strongPool) { - _wrappedChannel = [strongPool refChannelWithConfiguration:_channelConfiguration]; + _wrappedChannel = [[GRPCChannel alloc] initWithChannelConfiguration:_channelConfiguration]; + if (_wrappedChannel == nil) { + NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy."); + return nil; } - NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy."); } - call = - [_wrappedChannel unmanagedCallWithPath:path completionQueue:queue callOptions:callOptions]; - if (call != NULL) { - [_unmanagedCalls addObject:[NSValue valueWithPointer:call]]; + _lastTimedDestroy = nil; + + grpc_call *unmanagedCall = [_wrappedChannel unmanagedCallWithPath:path + completionQueue:[GRPCCompletionQueue completionQueue] + callOptions:callOptions]; + if (unmanagedCall == NULL) { + NSAssert(unmanagedCall != NULL, @"Unable to create grpc_call object"); + return nil; + } + + call = [[GRPCWrappedCall alloc] initWithUnmanagedCall:unmanagedCall pooledChannel:self]; + if (call == nil) { + NSAssert(call != nil, @"Unable to create GRPCWrappedCall object"); + return nil; } + + [_wrappedCalls addObject:call]; } return call; } -- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall { - if (unmanagedCall == NULL) { +- (void)notifyWrappedCallDealloc:(GRPCWrappedCall *)wrappedCall { + NSAssert(wrappedCall != nil, @"wrappedCall cannot be empty."); + if (wrappedCall == nil) { return; } - - grpc_call_unref(unmanagedCall); @synchronized(self) { - NSValue *removedCall = [NSValue valueWithPointer:unmanagedCall]; - [_unmanagedCalls removeObject:removedCall]; - if ([_unmanagedCalls count] == 0) { - _wrappedChannel = nil; - GRPCChannelPool *strongPool = _channelPool; - [strongPool unrefChannelWithConfiguration:_channelConfiguration]; + if ([_wrappedCalls objectEnumerator].allObjects.count == 0) { + NSDate *now = [NSDate date]; + _lastTimedDestroy = now; + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)_destroyDelay * NSEC_PER_SEC), + _timerQueue, ^{ + @synchronized(self) { + if (self->_lastTimedDestroy == now) { + self->_wrappedChannel = nil; + self->_lastTimedDestroy = nil; + } + } + }); } } } - (void)disconnect { + NSHashTable<GRPCWrappedCall *> *copiedWrappedCalls = nil; @synchronized(self) { if (_wrappedChannel != nil) { _wrappedChannel = nil; - [_unmanagedCalls removeAllObjects]; - GRPCChannelPool *strongPool = _channelPool; - [strongPool unrefChannelWithConfiguration:_channelConfiguration]; + copiedWrappedCalls = [_wrappedCalls copy]; + [_wrappedCalls removeAllObjects]; } } + NSEnumerator *enumerator = [copiedWrappedCalls objectEnumerator]; + GRPCWrappedCall *wrappedCall; + while ((wrappedCall = [enumerator nextObject])) { + [wrappedCall channelDisconnected]; + } } @end @implementation GRPCPooledChannel (Test) +- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration + destroyDelay:(NSTimeInterval)destroyDelay { + NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty."); + if (channelConfiguration == nil) { + return nil; + } + + if ((self = [super init])) { + _channelConfiguration = channelConfiguration; + _destroyDelay = destroyDelay; + _wrappedCalls = [[NSHashTable alloc] initWithOptions:NSHashTableWeakMemory capacity:1]; + _wrappedChannel = nil; + _lastTimedDestroy = nil; +#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300 + if (@available(iOS 8.0, macOS 10.10, *)) { + _timerQueue = dispatch_queue_create(NULL, + dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); + } else { +#else + { +#endif + _timerQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + } + } + + return self; +} + - (GRPCChannel *)wrappedChannel { GRPCChannel *channel = nil; @synchronized(self) { @@ -147,65 +185,28 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; @end -/** - * A convenience value type for cached channel. - */ -@interface GRPCChannelRecord : NSObject - -/** Pointer to the raw channel. May be nil when the channel has been destroyed. */ -@property GRPCChannel *channel; - -/** Channel proxy corresponding to this channel configuration. */ -@property GRPCPooledChannel *pooledChannel; - -/** Last time when a timed destroy is initiated on the channel. */ -@property NSDate *timedDestroyDate; - -/** Reference count of the proxy to the channel. */ -@property NSUInteger refCount; - -@end - -@implementation GRPCChannelRecord - -@end - @interface GRPCChannelPool () -- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay NS_DESIGNATED_INITIALIZER; +- (instancetype)initInstance NS_DESIGNATED_INITIALIZER; @end @implementation GRPCChannelPool { - NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelRecord *> *_channelPool; - dispatch_queue_t _dispatchQueue; - NSTimeInterval _destroyDelay; + NSMutableDictionary<GRPCChannelConfiguration *, GRPCPooledChannel *> *_channelPool; } + (instancetype)sharedInstance { dispatch_once(&gInitChannelPool, ^{ gChannelPool = - [[GRPCChannelPool alloc] initInstanceWithDestroyDelay:kDefaultChannelDestroyDelay]; + [[GRPCChannelPool alloc] initInstance]; NSAssert(gChannelPool != nil, @"Cannot initialize global channel pool."); }); return gChannelPool; } -- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay { +- (instancetype)initInstance { if ((self = [super init])) { _channelPool = [NSMutableDictionary dictionary]; -#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300 - if (@available(iOS 8.0, macOS 10.10, *)) { - _dispatchQueue = dispatch_queue_create( - NULL, - dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); - } else { -#else - { -#endif - _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); - } - _destroyDelay = destroyDelay; // Connectivity monitor is not required for CFStream char *enableCFStream = getenv(kCFStreamVarName); @@ -231,86 +232,23 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; GRPCChannelConfiguration *configuration = [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions]; @synchronized(self) { - GRPCChannelRecord *record = _channelPool[configuration]; - if (record == nil) { - record = [[GRPCChannelRecord alloc] init]; - record.pooledChannel = - [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration channelPool:self]; - _channelPool[configuration] = record; - pooledChannel = record.pooledChannel; - } else { - pooledChannel = record.pooledChannel; + pooledChannel = _channelPool[configuration]; + if (pooledChannel == nil) { + pooledChannel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration]; + _channelPool[configuration] = pooledChannel; } } return pooledChannel; } -- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration { - GRPCChannel *ret = nil; - @synchronized(self) { - NSAssert(configuration != nil, @"configuration cannot be empty."); - if (configuration == nil) { - return nil; - } - - GRPCChannelRecord *record = _channelPool[configuration]; - NSAssert(record != nil, @"No record corresponding to a proxy."); - if (record == nil) { - return nil; - } - - record.refCount++; - record.timedDestroyDate = nil; - if (record.channel == nil) { - // Channel is already destroyed; - record.channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration]; - } - ret = record.channel; - } - return ret; -} - -- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration { - @synchronized(self) { - GRPCChannelRecord *record = _channelPool[configuration]; - NSAssert(record != nil, @"No record corresponding to a proxy."); - if (record == nil) { - return; - } - NSAssert(record.refCount > 0, @"Inconsistent channel refcount."); - if (record.refCount > 0) { - record.refCount--; - if (record.refCount == 0) { - NSDate *now = [NSDate date]; - record.timedDestroyDate = now; - dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_destroyDelay * NSEC_PER_SEC)), - _dispatchQueue, ^{ - @synchronized(self) { - if (now == record.timedDestroyDate) { - // Destroy the raw channel and reset related records. - record.timedDestroyDate = nil; - record.channel = nil; - } - } - }); - } - } - } -} - - (void)disconnectAllChannels { - NSMutableSet<GRPCPooledChannel *> *proxySet = [NSMutableSet set]; + NSDictionary *copiedPooledChannels; @synchronized(self) { - [_channelPool - enumerateKeysAndObjectsUsingBlock:^(GRPCChannelConfiguration *_Nonnull key, - GRPCChannelRecord *_Nonnull obj, BOOL *_Nonnull stop) { - obj.channel = nil; - obj.timedDestroyDate = nil; - [proxySet addObject:obj.pooledChannel]; - }]; + copiedPooledChannels = [NSDictionary dictionaryWithDictionary:_channelPool]; } - // Disconnect proxies - [proxySet enumerateObjectsUsingBlock:^(GRPCPooledChannel *_Nonnull obj, BOOL *_Nonnull stop) { + + // Disconnect pooled channels. + [copiedPooledChannels enumerateKeysAndObjectsUsingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { [obj disconnect]; }]; } @@ -323,8 +261,8 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; @implementation GRPCChannelPool (Test) -- (instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay { - return [self initInstanceWithDestroyDelay:destroyDelay]; +- (instancetype)initTestPool { + return [self initInstance]; } @end diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h index 19aa5367c7..0432190528 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h @@ -71,11 +71,16 @@ #pragma mark GRPCWrappedCall +@class GRPCPooledChannel; + @interface GRPCWrappedCall : NSObject -- (instancetype)initWithHost:(NSString *)host - path:(NSString *)path - callOptions:(GRPCCallOptions *)callOptions NS_DESIGNATED_INITIALIZER; +- (instancetype)init NS_UNAVAILABLE; + ++ (instancetype)new NS_UNAVAILABLE; + +- (instancetype)initWithUnmanagedCall:(grpc_call *)unmanagedCall + pooledChannel:(GRPCPooledChannel *)pooledChannel NS_DESIGNATED_INITIALIZER; - (void)startBatchWithOperations:(NSArray *)ops errorHandler:(void (^)(void))errorHandler; @@ -83,4 +88,6 @@ - (void)cancel; +- (void)channelDisconnected; + @end diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index 5788d0a003..7e2d9d3c6d 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -237,37 +237,21 @@ #pragma mark GRPCWrappedCall @implementation GRPCWrappedCall { - GRPCCompletionQueue *_queue; - GRPCPooledChannel *_channel; + __weak GRPCPooledChannel *_channel; grpc_call *_call; } -- (instancetype)init { - return [self initWithHost:nil path:nil callOptions:[[GRPCCallOptions alloc] init]]; -} - -- (instancetype)initWithHost:(NSString *)host - path:(NSString *)path - callOptions:(GRPCCallOptions *)callOptions { - NSAssert(host.length != 0 && path.length != 0, @"path and host cannot be nil."); +- (instancetype)initWithUnmanagedCall:(grpc_call *)unmanagedCall + pooledChannel:(GRPCPooledChannel *)pooledChannel { + NSAssert(unmanagedCall != NULL, @"unmanagedCall cannot be empty."); + NSAssert(pooledChannel != nil, @"pooledChannel cannot be empty."); + if (unmanagedCall == NULL || pooledChannel == nil) { + return nil; + } if ((self = [super init])) { - // Each completion queue consumes one thread. There's a trade to be made between creating and - // consuming too many threads and having contention of multiple calls in a single completion - // queue. Currently we use a singleton queue. - _queue = [GRPCCompletionQueue completionQueue]; - _channel = [[GRPCChannelPool sharedInstance] channelWithHost:host callOptions:callOptions]; - if (_channel == nil) { - NSAssert(_channel != nil, @"Failed to get a channel for the host."); - NSLog(@"Failed to get a channel for the host."); - return nil; - } - _call = [_channel unmanagedCallWithPath:path completionQueue:_queue callOptions:callOptions]; - if (_call == nil) { - NSAssert(_channel != nil, @"Failed to get a channel for the host."); - NSLog(@"Failed to create a call."); - return nil; - } + _call = unmanagedCall; + _channel = pooledChannel; } return self; } @@ -283,42 +267,67 @@ [GRPCOpBatchLog addOpBatchToLog:operations]; #endif - size_t nops = operations.count; - grpc_op *ops_array = gpr_malloc(nops * sizeof(grpc_op)); - size_t i = 0; - for (GRPCOperation *operation in operations) { - ops_array[i++] = operation.op; - } - grpc_call_error error = - grpc_call_start_batch(_call, ops_array, nops, (__bridge_retained void *)(^(bool success) { - if (!success) { - if (errorHandler) { - errorHandler(); - } else { - return; - } - } - for (GRPCOperation *operation in operations) { - [operation finish]; - } - }), - NULL); - gpr_free(ops_array); - - if (error != GRPC_CALL_OK) { - [NSException + @synchronized (self) { + if (_call != NULL) { + size_t nops = operations.count; + grpc_op *ops_array = gpr_malloc(nops * sizeof(grpc_op)); + size_t i = 0; + for (GRPCOperation *operation in operations) { + ops_array[i++] = operation.op; + } + grpc_call_error error; + error = grpc_call_start_batch(_call, ops_array, nops, (__bridge_retained void *)(^(bool success) { + if (!success) { + if (errorHandler) { + errorHandler(); + } else { + return; + } + } + for (GRPCOperation *operation in operations) { + [operation finish]; + } + }), + NULL); + gpr_free(ops_array); + + if (error != GRPC_CALL_OK) { + [NSException raise:NSInternalInconsistencyException - format:@"A precondition for calling grpc_call_start_batch wasn't met. Error %i", error]; + format:@"A precondition for calling grpc_call_start_batch wasn't met. Error %i", error]; + } + } } } - (void)cancel { - grpc_call_cancel(_call, NULL); + @synchronized (self) { + if (_call != NULL) { + grpc_call_cancel(_call, NULL); + } + } +} + +- (void)channelDisconnected { + @synchronized (self) { + if (_call != NULL) { + grpc_call_unref(_call); + _call = NULL; + } + } } - (void)dealloc { - [_channel destroyUnmanagedCall:_call]; - _channel = nil; + @synchronized (self) { + if (_call != NULL) { + grpc_call_unref(_call); + _call = NULL; + } + } + __strong GRPCPooledChannel *channel = _channel; + if (channel != nil) { + [channel notifyWrappedCallDealloc:self]; + } } @end diff --git a/src/objective-c/tests/ChannelTests/ChannelPoolTest.m b/src/objective-c/tests/ChannelTests/ChannelPoolTest.m index 9461945560..dc42d7c341 100644 --- a/src/objective-c/tests/ChannelTests/ChannelPoolTest.m +++ b/src/objective-c/tests/ChannelTests/ChannelPoolTest.m @@ -24,11 +24,9 @@ #define TEST_TIMEOUT 32 -NSString *kDummyHost = @"dummy.host"; -NSString *kDummyHost2 = @"dummy.host.2"; -NSString *kDummyPath = @"/dummy/path"; - -const NSTimeInterval kDestroyDelay = 1.0; +static NSString *kDummyHost = @"dummy.host"; +static NSString *kDummyHost2 = @"dummy.host.2"; +static NSString *kDummyPath = @"/dummy/path"; @interface ChannelPoolTest : XCTestCase @@ -40,134 +38,26 @@ const NSTimeInterval kDestroyDelay = 1.0; grpc_init(); } -- (void)testCreateChannelAndCall { - GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay]; - GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; - GRPCPooledChannel *channel = - (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options]; - XCTAssertNil(channel.wrappedChannel); - GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; - grpc_call *call = - [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; - XCTAssert(call != NULL); - XCTAssertNotNil(channel.wrappedChannel); - [channel destroyUnmanagedCall:call]; - XCTAssertNil(channel.wrappedChannel); -} - -- (void)testCacheChannel { - GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay]; +- (void)testCreateAndCacheChannel { + GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPool]; GRPCCallOptions *options1 = [[GRPCCallOptions alloc] init]; GRPCCallOptions *options2 = [options1 copy]; GRPCMutableCallOptions *options3 = [options1 mutableCopy]; options3.transportType = GRPCTransportTypeInsecure; - GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; - GRPCPooledChannel *channel1 = - (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options1]; - grpc_call *call1 = - [channel1 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options1]; - GRPCPooledChannel *channel2 = - (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options2]; - grpc_call *call2 = - [channel2 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options2]; - GRPCPooledChannel *channel3 = - (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options3]; - grpc_call *call3 = - [channel3 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options3]; - GRPCPooledChannel *channel4 = - (GRPCPooledChannel *)[pool channelWithHost:kDummyHost2 callOptions:options1]; - grpc_call *call4 = - [channel4 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options1]; - XCTAssertEqual(channel1.wrappedChannel, channel2.wrappedChannel); - XCTAssertNotEqual(channel1.wrappedChannel, channel3.wrappedChannel); - XCTAssertNotEqual(channel1.wrappedChannel, channel4.wrappedChannel); - XCTAssertNotEqual(channel3.wrappedChannel, channel4.wrappedChannel); - [channel1 destroyUnmanagedCall:call1]; - [channel2 destroyUnmanagedCall:call2]; - [channel3 destroyUnmanagedCall:call3]; - [channel4 destroyUnmanagedCall:call4]; -} - -- (void)testTimedDestroyChannel { - GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay]; - GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; - GRPCPooledChannel *channel = - (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options]; - GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; - grpc_call *call = - [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; - GRPCChannel *wrappedChannel = channel.wrappedChannel; - - [channel destroyUnmanagedCall:call]; - // Confirm channel is not destroyed at this time - call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; - XCTAssertEqual(wrappedChannel, channel.wrappedChannel); - - [channel destroyUnmanagedCall:call]; - sleep(kDestroyDelay + 1); - // Confirm channel is new at this time - call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; - XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel); - - // Confirm the new channel can create call - XCTAssert(call != NULL); - [channel destroyUnmanagedCall:call]; -} - -- (void)testPoolDisconnection { - GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay]; - GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; - GRPCPooledChannel *channel = - (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options]; - GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; - grpc_call *call = - [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; - XCTAssertNotNil(channel.wrappedChannel); - GRPCChannel *wrappedChannel = channel.wrappedChannel; - - // Test a new channel is created by requesting a channel from pool - [pool disconnectAllChannels]; - channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options]; - call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; - XCTAssertNotNil(channel.wrappedChannel); - XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel); - wrappedChannel = channel.wrappedChannel; - - // Test a new channel is created by requesting a new call from the previous proxy - [pool disconnectAllChannels]; - grpc_call *call2 = - [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; - XCTAssertNotNil(channel.wrappedChannel); - XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel); - [channel destroyUnmanagedCall:call]; - [channel destroyUnmanagedCall:call2]; -} - -- (void)testUnrefCallFromStaleChannel { - GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay]; - GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; - GRPCPooledChannel *channel = - (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options]; - GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; - grpc_call *call = - [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; - - [pool disconnectAllChannels]; - channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options]; - grpc_call *call2 = - [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options]; - // Test unref the call of a stale channel will not cause the current channel going into timed - // destroy state - XCTAssertNotNil(channel.wrappedChannel); - GRPCChannel *wrappedChannel = channel.wrappedChannel; - [channel destroyUnmanagedCall:call]; - XCTAssertNotNil(channel.wrappedChannel); - XCTAssertEqual(wrappedChannel, channel.wrappedChannel); - // Test unref the call of the current channel will cause the channel going into timed destroy - // state - [channel destroyUnmanagedCall:call2]; - XCTAssertNil(channel.wrappedChannel); + GRPCPooledChannel *channel1 = [pool channelWithHost:kDummyHost callOptions:options1]; + GRPCPooledChannel *channel2 = [pool channelWithHost:kDummyHost callOptions:options2]; + GRPCPooledChannel *channel3 = [pool channelWithHost:kDummyHost callOptions:options3]; + GRPCPooledChannel *channel4 = [pool channelWithHost:kDummyHost2 callOptions:options1]; + + XCTAssertNotNil(channel1); + XCTAssertNotNil(channel2); + XCTAssertNotNil(channel3); + XCTAssertNotNil(channel4); + XCTAssertEqual(channel1, channel2); + XCTAssertNotEqual(channel1, channel3); + XCTAssertNotEqual(channel1, channel4); + XCTAssertNotEqual(channel3, channel4); } @end diff --git a/src/objective-c/tests/ChannelTests/ChannelTests.m b/src/objective-c/tests/ChannelTests/ChannelTests.m index 5547449092..ee7f8b6fdd 100644 --- a/src/objective-c/tests/ChannelTests/ChannelTests.m +++ b/src/objective-c/tests/ChannelTests/ChannelTests.m @@ -22,91 +22,99 @@ #import "../../GRPCClient/private/GRPCChannel.h" #import "../../GRPCClient/private/GRPCChannelPool.h" #import "../../GRPCClient/private/GRPCCompletionQueue.h" +#import "../../GRPCClient/private/GRPCWrappedCall.h" -/* -#define TEST_TIMEOUT 8 - -@interface GRPCChannelFake : NSObject - -- (instancetype)initWithCreateExpectation:(XCTestExpectation *)createExpectation - unrefExpectation:(XCTestExpectation *)unrefExpectation; +static NSString *kDummyHost = @"dummy.host"; +static NSString *kDummyPath = @"/dummy/path"; -- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path - completionQueue:(GRPCCompletionQueue *)queue - callOptions:(GRPCCallOptions *)callOptions; - -- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall; +@interface ChannelTests : XCTestCase @end -@implementation GRPCChannelFake { - __weak XCTestExpectation *_createExpectation; - __weak XCTestExpectation *_unrefExpectation; - long _grpcCallCounter; -} - -- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration -*)channelConfiguration { return nil; -} - -- (instancetype)initWithCreateExpectation:(XCTestExpectation *)createExpectation - unrefExpectation:(XCTestExpectation *)unrefExpectation { - if ((self = [super init])) { - _createExpectation = createExpectation; - _unrefExpectation = unrefExpectation; - _grpcCallCounter = 0; - } - return self; -} +@implementation ChannelTests -- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path - completionQueue:(GRPCCompletionQueue *)queue - callOptions:(GRPCCallOptions *)callOptions { - if (_createExpectation) [_createExpectation fulfill]; - return (grpc_call *)(++_grpcCallCounter); ++ (void)setUp { + grpc_init(); } -- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall { - if (_unrefExpectation) [_unrefExpectation fulfill]; +- (void)testPooledChannelCreatingChannel { + GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; + GRPCChannelConfiguration *config = [[GRPCChannelConfiguration alloc] initWithHost:kDummyHost + callOptions:options]; + GRPCPooledChannel *channel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:config]; + GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; + GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertNotNil(channel.wrappedChannel); + (void)wrappedCall; } -@end - -@interface GRPCChannelPoolFake : NSObject - -- (instancetype)initWithDelayedDestroyExpectation:(XCTestExpectation *)delayedDestroyExpectation; - -- (GRPCChannel *)rawChannelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions; - -- (void)delayedDestroyChannel; - -@end - -@implementation GRPCChannelPoolFake { - __weak XCTestExpectation *_delayedDestroyExpectation; -} - -- (instancetype)initWithDelayedDestroyExpectation:(XCTestExpectation *)delayedDestroyExpectation { - if ((self = [super init])) { - _delayedDestroyExpectation = delayedDestroyExpectation; +- (void)testTimedDestroyChannel { + const NSTimeInterval kDestroyDelay = 1.0; + GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; + GRPCChannelConfiguration *config = [[GRPCChannelConfiguration alloc] initWithHost:kDummyHost + callOptions:options]; + GRPCPooledChannel *channel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:config + destroyDelay:kDestroyDelay]; + GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; + GRPCWrappedCall *wrappedCall; + GRPCChannel *wrappedChannel; + @autoreleasepool { + wrappedCall = [channel wrappedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertNotNil(channel.wrappedChannel); + + // Unref and ref channel immediately; expect using the same raw channel. + wrappedChannel = channel.wrappedChannel; + + wrappedCall = nil; + wrappedCall = [channel wrappedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertEqual(channel.wrappedChannel, wrappedChannel); + + // Unref and ref channel after destroy delay; expect a new raw channel. + wrappedCall = nil; } - return self; -} - -- (void)delayedDestroyChannel { - if (_delayedDestroyExpectation) [_delayedDestroyExpectation fulfill]; + sleep(kDestroyDelay + 1); + XCTAssertNil(channel.wrappedChannel); + wrappedCall = [channel wrappedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel); } -@end */ - -@interface ChannelTests : XCTestCase - -@end - -@implementation ChannelTests - -+ (void)setUp { - grpc_init(); +- (void)testDisconnect { + const NSTimeInterval kDestroyDelay = 1.0; + GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; + GRPCChannelConfiguration *config = [[GRPCChannelConfiguration alloc] initWithHost:kDummyHost + callOptions:options]; + GRPCPooledChannel *channel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:config + destroyDelay:kDestroyDelay]; + GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; + GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertNotNil(channel.wrappedChannel); + + // Disconnect; expect wrapped channel to be dropped + [channel disconnect]; + XCTAssertNil(channel.wrappedChannel); + + // Create a new call and unref the old call; confirm that destroy of the old call does not make + // the channel disconnect, even after the destroy delay. + GRPCWrappedCall *wrappedCall2 = [channel wrappedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertNotNil(channel.wrappedChannel); + GRPCChannel *wrappedChannel = channel.wrappedChannel; + wrappedCall = nil; + sleep(kDestroyDelay + 1); + XCTAssertNotNil(channel.wrappedChannel); + XCTAssertEqual(wrappedChannel, channel.wrappedChannel); + (void)wrappedCall2; } @end diff --git a/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/ChannelTests.xcscheme b/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/ChannelTests.xcscheme index 8c8623d7b2..acae965bed 100644 --- a/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/ChannelTests.xcscheme +++ b/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/ChannelTests.xcscheme @@ -37,11 +37,6 @@ BlueprintName = "ChannelTests" ReferencedContainer = "container:Tests.xcodeproj"> </BuildableReference> - <SkippedTests> - <Test - Identifier = "ChannelTests"> - </Test> - </SkippedTests> </TestableReference> </Testables> <AdditionalOptions> |