diff options
author | Muxi Yan <mxyan@google.com> | 2018-11-18 22:47:35 -0800 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2018-11-18 22:47:35 -0800 |
commit | f0cbcde73195b8e17130538c3479d4c0e3bcd2a2 (patch) | |
tree | 06483748c28dc85c4927eec35e88b6001923999e | |
parent | 87abab45c99ab4b40718557cbc1c25dcd7f5a418 (diff) |
New channel pool design
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall+ChannelArg.m | 2 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 2 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCChannel.h | 36 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCChannel.m | 169 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCChannelPool.h | 74 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCChannelPool.m | 263 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCWrappedCall.m | 31 | ||||
-rw-r--r-- | src/objective-c/tests/ChannelTests/ChannelPoolTest.m | 222 | ||||
-rw-r--r-- | src/objective-c/tests/ChannelTests/ChannelTests.m | 126 |
9 files changed, 573 insertions, 352 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m b/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m index 971c2803e2..703cff63bb 100644 --- a/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m +++ b/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m @@ -36,7 +36,7 @@ } + (void)closeOpenConnections { - [GRPCChannelPool closeOpenConnections]; + [[GRPCChannelPool sharedInstance] closeOpenConnections]; } + (void)setDefaultCompressMethod:(GRPCCompressAlgorithm)algorithm forhost:(nonnull NSString *)host { diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 19de004cbc..6f4b1647a5 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -488,7 +488,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; requestsWriter:(GRXWriter *)requestWriter callOptions:(GRPCCallOptions *)callOptions { // Purposely using pointer rather than length ([host length] == 0) for backwards compatibility. - NSAssert(host && path, @"Neither host nor path can be nil."); + NSAssert(host != nil && path != nil, @"Neither host nor path can be nil."); NSAssert(safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value."); NSAssert(requestWriter.state == GRXWriterStateNotStarted, diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h index 4dbe0c276c..de6d1bf4a2 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.h +++ b/src/objective-c/GRPCClient/private/GRPCChannel.h @@ -61,47 +61,19 @@ NS_ASSUME_NONNULL_BEGIN + (nullable instancetype) new NS_UNAVAILABLE; /** - * Create a channel with remote \a host and signature \a channelConfigurations. Destroy delay is - * defaulted to 30 seconds. - */ -- (nullable instancetype)initWithChannelConfiguration: - (GRPCChannelConfiguration *)channelConfiguration; - -/** - * Create a channel with remote \a host, signature \a channelConfigurations, and destroy delay of - * \a destroyDelay. + * Create a channel with remote \a host and signature \a channelConfigurations. */ - (nullable instancetype)initWithChannelConfiguration: (GRPCChannelConfiguration *)channelConfiguration - destroyDelay:(NSTimeInterval)destroyDelay NS_DESIGNATED_INITIALIZER; /** - * Create a grpc core call object from this channel. The channel's refcount is added by 1. If no - * call is created, NULL is returned, and if the reason is because the channel is already - * disconnected, \a disconnected is set to YES. When the returned call is unreffed, the caller is - * obligated to call \a unref method once. \a disconnected may be null. + * Create a grpc core call object (grpc_call) from this channel. If no call is created, NULL is + * returned. */ - (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path completionQueue:(GRPCCompletionQueue *)queue - callOptions:(GRPCCallOptions *)callOptions - disconnected:(nullable BOOL *)disconnected; - -/** - * Unref the channel when a call is done. It also decreases the channel's refcount. If the refcount - * of the channel decreases to 0, the channel is destroyed after the destroy delay. - */ -- (void)unref; - -/** - * Force the channel to be disconnected and destroyed. - */ -- (void)disconnect; - -/** - * Return whether the channel is already disconnected. - */ -@property(readonly) BOOL disconnected; + callOptions:(GRPCCallOptions *)callOptions; @end diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m index 0220141db0..81e53d48e3 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.m +++ b/src/objective-c/GRPCClient/private/GRPCChannel.m @@ -34,9 +34,6 @@ #import <GRPCClient/GRPCCall+Cronet.h> #import <GRPCClient/GRPCCallOptions.h> -/** When all calls of a channel are destroyed, destroy the channel after this much seconds. */ -static const NSTimeInterval kDefaultChannelDestroyDelay = 30; - @implementation GRPCChannelConfiguration - (instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions { @@ -178,43 +175,18 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; @implementation GRPCChannel { GRPCChannelConfiguration *_configuration; - dispatch_queue_t _dispatchQueue; grpc_channel *_unmanagedChannel; - NSTimeInterval _destroyDelay; - - NSUInteger _refcount; - NSDate *_lastDispatch; -} -@synthesize disconnected = _disconnected; - -- (instancetype)initWithChannelConfiguration: - (GRPCChannelConfiguration *)channelConfiguration { - return [self initWithChannelConfiguration:channelConfiguration - destroyDelay:kDefaultChannelDestroyDelay]; } - (instancetype)initWithChannelConfiguration: - (GRPCChannelConfiguration *)channelConfiguration - destroyDelay:(NSTimeInterval)destroyDelay { + (GRPCChannelConfiguration *)channelConfiguration { NSAssert(channelConfiguration != nil, @"channelConfiguration must not be empty."); - NSAssert(destroyDelay > 0, @"destroyDelay must be greater than 0."); if (channelConfiguration == nil) return nil; - if (destroyDelay <= 0) return nil; if ((self = [super init])) { _configuration = [channelConfiguration copy]; -#if __IPHONE_OS_VERSION_MAX_ALLOWED < 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED < 101300 - if (@available(iOS 8.0, macOS 10.10, *)) { - _dispatchQueue = dispatch_queue_create( - NULL, - dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); - } else { -#else - { -#endif - _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); - } + // Create gRPC core channel object. NSString *host = channelConfiguration.host; NSAssert(host.length != 0, @"host cannot be nil"); @@ -233,119 +205,54 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; NSLog(@"Unable to create channel."); return nil; } - _destroyDelay = destroyDelay; - _disconnected = NO; } return self; } - (grpc_call *)unmanagedCallWithPath:(NSString *)path completionQueue:(GRPCCompletionQueue *)queue - callOptions:(GRPCCallOptions *)callOptions - disconnected:(BOOL *)disconnected { + callOptions:(GRPCCallOptions *)callOptions { NSAssert(path.length > 0, @"path must not be empty."); NSAssert(queue != nil, @"completionQueue must not be empty."); NSAssert(callOptions != nil, @"callOptions must not be empty."); - if (path.length == 0) return nil; - if (queue == nil) return nil; - if (callOptions == nil) return nil; - - __block BOOL isDisconnected = NO; - __block grpc_call *call = NULL; - dispatch_sync(_dispatchQueue, ^{ - if (self->_disconnected) { - isDisconnected = YES; - } else { - NSAssert(self->_unmanagedChannel != NULL, - @"Channel should have valid unmanaged channel."); - if (self->_unmanagedChannel == NULL) return; - - NSString *serverAuthority = - callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority; - NSTimeInterval timeout = callOptions.timeout; - NSAssert(timeout >= 0, @"Invalid timeout"); - if (timeout < 0) return; - grpc_slice host_slice = grpc_empty_slice(); - if (serverAuthority) { - host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String); - } - grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String); - gpr_timespec deadline_ms = - timeout == 0 - ? gpr_inf_future(GPR_CLOCK_REALTIME) - : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN)); - call = grpc_channel_create_call(self->_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS, - queue.unmanagedQueue, path_slice, - serverAuthority ? &host_slice : NULL, deadline_ms, NULL); - if (serverAuthority) { - grpc_slice_unref(host_slice); - } - grpc_slice_unref(path_slice); - if (call == NULL) { - NSLog(@"Unable to create call."); - } else { - // Ref the channel; - [self ref]; - } + if (path.length == 0) return NULL; + if (queue == nil) return NULL; + if (callOptions == nil) return NULL; + + grpc_call *call = NULL; + @synchronized(self) { + NSAssert(_unmanagedChannel != NULL, + @"Channel should have valid unmanaged channel."); + if (_unmanagedChannel == NULL) return NULL; + + NSString *serverAuthority = + callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority; + NSTimeInterval timeout = callOptions.timeout; + NSAssert(timeout >= 0, @"Invalid timeout"); + if (timeout < 0) return NULL; + grpc_slice host_slice = grpc_empty_slice(); + if (serverAuthority) { + host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String); } - }); - if (disconnected != nil) { - *disconnected = isDisconnected; - } - return call; -} - -// This function should be called on _dispatchQueue. -- (void)ref { - _refcount++; - if (_refcount == 1 && _lastDispatch != nil) { - _lastDispatch = nil; - } -} - -- (void)unref { - dispatch_async(_dispatchQueue, ^{ - NSAssert(self->_refcount > 0, @"Illegal reference count."); - if (self->_refcount == 0) { - NSLog(@"Illegal reference count."); - return; + grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String); + gpr_timespec deadline_ms = + timeout == 0 + ? gpr_inf_future(GPR_CLOCK_REALTIME) + : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN)); + call = grpc_channel_create_call(_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS, + queue.unmanagedQueue, path_slice, + serverAuthority ? &host_slice : NULL, deadline_ms, NULL); + if (serverAuthority) { + grpc_slice_unref(host_slice); } - self->_refcount--; - if (self->_refcount == 0 && !self->_disconnected) { - // Start timer. - dispatch_time_t delay = - dispatch_time(DISPATCH_TIME_NOW, (int64_t)self->_destroyDelay * NSEC_PER_SEC); - NSDate *now = [NSDate date]; - self->_lastDispatch = now; - dispatch_after(delay, self->_dispatchQueue, ^{ - // Timed disconnection. - if (!self->_disconnected && self->_lastDispatch == now) { - grpc_channel_destroy(self->_unmanagedChannel); - self->_unmanagedChannel = NULL; - self->_disconnected = YES; - } - }); + grpc_slice_unref(path_slice); + NSAssert(call != nil, @"Unable to create call."); + if (call == NULL) { + NSLog(@"Unable to create call."); } - }); -} - -- (void)disconnect { - dispatch_async(_dispatchQueue, ^{ - if (!self->_disconnected) { - grpc_channel_destroy(self->_unmanagedChannel); - self->_unmanagedChannel = nil; - self->_disconnected = YES; - } - }); -} - -- (BOOL)disconnected { - __block BOOL disconnected; - dispatch_sync(_dispatchQueue, ^{ - disconnected = self->_disconnected; - }); - return disconnected; + } + return call; } - (void)dealloc { diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.h b/src/objective-c/GRPCClient/private/GRPCChannelPool.h index 48779c4449..887bd5f89f 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannelPool.h +++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.h @@ -27,7 +27,53 @@ NS_ASSUME_NONNULL_BEGIN +@protocol GRPCChannel; @class GRPCChannel; +@class GRPCChannelPool; +@class GRPCCompletionQueue; +@class GRPCChannelConfiguration; + +/** + * Channel proxy that can be retained and automatically reestablish connection when the channel is + * disconnected. + */ +@interface GRPCPooledChannel : NSObject + +/** + * Initialize with an actual channel object \a channel and a reference to the channel pool. + */ +- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration + channelPool:(GRPCChannelPool *)channelPool; + + +/** + * Create a grpc core call object (grpc_call) from this channel. If channel is disconnected, get a + * new channel object from the channel pool. + */ +- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path + completionQueue:(GRPCCompletionQueue *)queue + callOptions:(GRPCCallOptions *)callOptions; + +/** + * Return ownership and destroy the grpc_call object created by + * \a unmanagedCallWithPath:completionQueue:callOptions: and decrease channel refcount. If refcount + * of the channel becomes 0, return the channel object to channel pool. + */ +- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall; + +/** + * Force the channel to disconnect immediately. + */ +- (void)disconnect; + +// The following methods and properties are for test only + +/** + * Return the pointer to the real channel wrapped by the proxy. + */ +@property(atomic, readonly) GRPCChannel *wrappedChannel; + +@end /** * Manage the pool of connected channels. When a channel is no longer referenced by any call, @@ -36,37 +82,41 @@ NS_ASSUME_NONNULL_BEGIN @interface GRPCChannelPool : NSObject /** - * Get the singleton instance + * Get the global channel pool. */ + (nullable instancetype)sharedInstance; /** - * Return a channel with a particular configuration. If the channel does not exist, execute \a - * createChannel then add it in the pool. If the channel exists, increase its reference count. + * Return a channel with a particular configuration. The channel may be a cached channel. */ -- (GRPCChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions; +- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions; /** * This method is deprecated. * * Destroy all open channels and close their connections. */ -+ (void)closeOpenConnections; +- (void)closeOpenConnections; // Test-only methods below /** - * Return a channel with a special destroy delay. If \a destroyDelay is 0, use the default destroy - * delay. + * Get an instance of pool isolated from the global shared pool. This method is for test only. + * Global pool should be used in production. + */ +- (nullable instancetype)init; + +/** + * Simulate a network transition event and destroy all channels. This method is for internal and + * test only. */ -- (GRPCChannel *)channelWithHost:(NSString *)host - callOptions:(GRPCCallOptions *)callOptions - destroyDelay:(NSTimeInterval)destroyDelay; +- (void)disconnectAllChannels; /** - * Simulate a network transition event and destroy all channels. + * Set the destroy delay of channels. A channel should be destroyed if it stayed idle (no active + * call on it) for this period of time. This property is for test only. */ -- (void)destroyAllChannels; +@property(atomic) NSTimeInterval destroyDelay; @end diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m index 5e2e9bcfeb..92dd4c86ae 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannelPool.m +++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m @@ -37,8 +37,150 @@ extern const char *kCFStreamVarName; static GRPCChannelPool *gChannelPool; static dispatch_once_t gInitChannelPool; +/** When all calls of a channel are destroyed, destroy the channel after this much seconds. */ +static const NSTimeInterval kDefaultChannelDestroyDelay = 30; + +@interface GRPCChannelPool() + +- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration; + +- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration; + +@end + +@implementation GRPCPooledChannel { + __weak GRPCChannelPool *_channelPool; + GRPCChannelConfiguration *_channelConfiguration; + NSMutableSet *_unmanagedCalls; +} + +@synthesize wrappedChannel = _wrappedChannel; + +- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration + channelPool:(GRPCChannelPool *)channelPool { + NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty."); + NSAssert(channelPool != nil, @"channelPool cannot be empty."); + if (channelPool == nil || channelConfiguration == nil) { + return nil; + } + + if ((self = [super init])) { + _channelPool = channelPool; + _channelConfiguration = channelConfiguration; + _unmanagedCalls = [NSMutableSet set]; + } + + return self; +} + +- (grpc_call *)unmanagedCallWithPath:(NSString *)path + completionQueue:(GRPCCompletionQueue *)queue + callOptions:(GRPCCallOptions *)callOptions { + NSAssert(path.length > 0, @"path must not be empty."); + NSAssert(queue != nil, @"completionQueue must not be empty."); + NSAssert(callOptions, @"callOptions must not be empty."); + if (path.length == 0 || queue == nil || callOptions == nil) return NULL; + + grpc_call *call = NULL; + @synchronized(self) { + if (_wrappedChannel == nil) { + __strong GRPCChannelPool *strongPool = _channelPool; + if (strongPool) { + _wrappedChannel = [strongPool refChannelWithConfiguration:_channelConfiguration]; + } + NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy."); + } + call = [_wrappedChannel unmanagedCallWithPath:path completionQueue:queue callOptions:callOptions]; + if (call != NULL) { + [_unmanagedCalls addObject:[NSValue valueWithPointer:call]]; + } + } + return call; +} + +- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall { + if (unmanagedCall == nil) return; + + grpc_call_unref(unmanagedCall); + BOOL timedDestroy = NO; + @synchronized(self) { + if ([_unmanagedCalls containsObject:[NSValue valueWithPointer:unmanagedCall]]) { + [_unmanagedCalls removeObject:[NSValue valueWithPointer:unmanagedCall]]; + if ([_unmanagedCalls count] == 0) { + timedDestroy = YES; + } + } + } + if (timedDestroy) { + [self timedDestroy]; + } +} + +- (void)disconnect { + @synchronized(self) { + _wrappedChannel = nil; + [_unmanagedCalls removeAllObjects]; + } +} + +- (GRPCChannel *)wrappedChannel { + GRPCChannel *channel = nil; + @synchronized (self) { + channel = _wrappedChannel; + } + return channel; +} + +- (void)timedDestroy { + __strong GRPCChannelPool *pool = nil; + @synchronized(self) { + // Check if we still want to destroy the channel. + if ([_unmanagedCalls count] == 0) { + pool = _channelPool; + _wrappedChannel = nil; + } + } + [pool unrefChannelWithConfiguration:_channelConfiguration]; +} + +@end + +/** + * A convenience value type for cached channel. + */ +@interface GRPCChannelRecord : NSObject <NSCopying> + +/** Pointer to the raw channel. May be nil when the channel has been destroyed. */ +@property GRPCChannel *channel; + +/** Channel proxy corresponding to this channel configuration. */ +@property GRPCPooledChannel *proxy; + +/** Last time when a timed destroy is initiated on the channel. */ +@property NSDate *timedDestroyDate; + +/** Reference count of the proxy to the channel. */ +@property NSUInteger refcount; + +@end + +@implementation GRPCChannelRecord + +- (id)copyWithZone:(NSZone *)zone { + GRPCChannelRecord *newRecord = [[GRPCChannelRecord allocWithZone:zone] init]; + newRecord.channel = _channel; + newRecord.proxy = _proxy; + newRecord.timedDestroyDate = _timedDestroyDate; + newRecord.refcount = _refcount; + + return newRecord; +} + +@end + @implementation GRPCChannelPool { - NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannel *> *_channelPool; + NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelRecord *> *_channelPool; + dispatch_queue_t _dispatchQueue; } + (instancetype)sharedInstance { @@ -52,6 +194,18 @@ static dispatch_once_t gInitChannelPool; - (instancetype)init { if ((self = [super init])) { _channelPool = [NSMutableDictionary dictionary]; +#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300 + if (@available(iOS 8.0, macOS 10.10, *)) { + _dispatchQueue = dispatch_queue_create( + NULL, + dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); + } else { +#else + { +#endif + _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + } + _destroyDelay = kDefaultChannelDestroyDelay; // Connectivity monitor is not required for CFStream char *enableCFStream = getenv(kCFStreamVarName); @@ -62,51 +216,106 @@ static dispatch_once_t gInitChannelPool; return self; } -- (GRPCChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions { - return [self channelWithHost:host callOptions:callOptions destroyDelay:0]; -} - -- (GRPCChannel *)channelWithHost:(NSString *)host - callOptions:(GRPCCallOptions *)callOptions - destroyDelay:(NSTimeInterval)destroyDelay { +- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions { NSAssert(host.length > 0, @"Host must not be empty."); NSAssert(callOptions != nil, @"callOptions must not be empty."); if (host.length == 0) return nil; if (callOptions == nil) return nil; - GRPCChannel *channel; + GRPCPooledChannel *channelProxy = nil; GRPCChannelConfiguration *configuration = [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions]; @synchronized(self) { - channel = _channelPool[configuration]; - if (channel == nil || channel.disconnected) { - if (destroyDelay == 0) { - channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration]; - } else { - channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration - destroyDelay:destroyDelay]; - } - _channelPool[configuration] = channel; + GRPCChannelRecord *record = _channelPool[configuration]; + if (record == nil) { + record = [[GRPCChannelRecord alloc] init]; + record.proxy = [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration + channelPool:self]; + record.timedDestroyDate = nil; + _channelPool[configuration] = record; + channelProxy = record.proxy; + } else { + channelProxy = record.proxy; } } - return channel; + return channelProxy; } -+ (void)closeOpenConnections { - [[GRPCChannelPool sharedInstance] destroyAllChannels]; +- (void)closeOpenConnections { + [self disconnectAllChannels]; } -- (void)destroyAllChannels { - @synchronized(self) { - for (id key in _channelPool) { - [_channelPool[key] disconnect]; +- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration { + GRPCChannel *ret = nil; + @synchronized (self) { + NSAssert(configuration != nil, @"configuration cannot be empty."); + if (configuration == nil) return nil; + + GRPCChannelRecord *record = _channelPool[configuration]; + NSAssert(record != nil, @"No record corresponding to a proxy."); + if (record == nil) return nil; + + if (record.channel == nil) { + // Channel is already destroyed; + record.channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration]; + record.timedDestroyDate = nil; + record.refcount = 1; + ret = record.channel; + } else { + ret = record.channel; + record.timedDestroyDate = nil; + record.refcount++; } - _channelPool = [NSMutableDictionary dictionary]; } + return ret; +} + +- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration { + @synchronized (self) { + GRPCChannelRecord *record = _channelPool[configuration]; + NSAssert(record != nil, @"No record corresponding to a proxy."); + if (record == nil) return; + NSAssert(record.refcount > 0, @"Inconsistent channel refcount."); + if (record.refcount > 0) { + record.refcount--; + if (record.refcount == 0) { + NSDate *now = [NSDate date]; + record.timedDestroyDate = now; + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_destroyDelay * NSEC_PER_SEC)), + _dispatchQueue, + ^{ + @synchronized (self) { + if (now == record.timedDestroyDate) { + // Destroy the raw channel and reset related records. + record.timedDestroyDate = nil; + record.refcount = 0; + record.channel = nil; + } + } + }); + } + } + } +} + +- (void)disconnectAllChannels { + NSMutableSet<GRPCPooledChannel *> *proxySet = [NSMutableSet set]; + @synchronized (self) { + [_channelPool enumerateKeysAndObjectsUsingBlock:^(GRPCChannelConfiguration * _Nonnull key, GRPCChannelRecord * _Nonnull obj, BOOL * _Nonnull stop) { + obj.channel = nil; + obj.timedDestroyDate = nil; + obj.refcount = 0; + [proxySet addObject:obj.proxy]; + }]; + } + // Disconnect proxies + [proxySet enumerateObjectsUsingBlock:^(GRPCPooledChannel * _Nonnull obj, BOOL * _Nonnull stop) { + [obj disconnect]; + }]; } - (void)connectivityChange:(NSNotification *)note { - [self destroyAllChannels]; + [self disconnectAllChannels]; } - (void)dealloc { diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index ae7f07f119..5c402250cc 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -238,7 +238,7 @@ @implementation GRPCWrappedCall { GRPCCompletionQueue *_queue; - GRPCChannel *_channel; + GRPCPooledChannel *_channel; grpc_call *_call; } @@ -257,21 +257,15 @@ // consuming too many threads and having contention of multiple calls in a single completion // queue. Currently we use a singleton queue. _queue = [GRPCCompletionQueue completionQueue]; - BOOL disconnected = NO; - do { - _channel = [[GRPCChannelPool sharedInstance] channelWithHost:host callOptions:callOptions]; - if (_channel == nil) { - NSAssert(_channel != nil, @"Failed to get a channel for the host."); - NSLog(@"Failed to get a channel for the host."); - return nil; - } - _call = [_channel unmanagedCallWithPath:path - completionQueue:_queue - callOptions:callOptions - disconnected:&disconnected]; - // Try create another channel if the current channel is disconnected (due to idleness or - // connectivity monitor disconnection). - } while (_call == NULL && disconnected); + _channel = [[GRPCChannelPool sharedInstance] channelWithHost:host callOptions:callOptions]; + if (_channel == nil) { + NSAssert(_channel != nil, @"Failed to get a channel for the host."); + NSLog(@"Failed to get a channel for the host."); + return nil; + } + _call = [_channel unmanagedCallWithPath:path + completionQueue:_queue + callOptions:callOptions]; if (_call == nil) { NSAssert(_channel != nil, @"Failed to get a channel for the host."); NSLog(@"Failed to create a call."); @@ -326,10 +320,7 @@ } - (void)dealloc { - if (_call) { - grpc_call_unref(_call); - } - [_channel unref]; + [_channel unrefUnmanagedCall:_call]; _channel = nil; } diff --git a/src/objective-c/tests/ChannelTests/ChannelPoolTest.m b/src/objective-c/tests/ChannelTests/ChannelPoolTest.m index b85e62feb5..51819b12c2 100644 --- a/src/objective-c/tests/ChannelTests/ChannelPoolTest.m +++ b/src/objective-c/tests/ChannelTests/ChannelPoolTest.m @@ -25,6 +25,8 @@ #define TEST_TIMEOUT 32 NSString *kDummyHost = @"dummy.host"; +NSString *kDummyHost2 = @"dummy.host.2"; +NSString *kDummyPath = @"/dummy/path"; @interface ChannelPoolTest : XCTestCase @@ -36,94 +38,156 @@ NSString *kDummyHost = @"dummy.host"; grpc_init(); } -- (void)testChannelPooling { - NSString *kDummyHost = @"dummy.host"; - NSString *kDummyHost2 = @"dummy.host2"; +- (void)testCreateChannelAndCall { + GRPCChannelPool *pool = [[GRPCChannelPool alloc] init]; + GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; + GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost + callOptions:options]; + XCTAssertNil(channel.wrappedChannel); + GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; + grpc_call *call = [channel unmanagedCallWithPath:kDummyPath + completionQueue:cq callOptions:options]; + XCTAssert(call != NULL); + XCTAssertNotNil(channel.wrappedChannel); + [channel unrefUnmanagedCall:call]; + XCTAssertNil(channel.wrappedChannel); +} - GRPCMutableCallOptions *options1 = [[GRPCMutableCallOptions alloc] init]; +- (void)testCacheChannel { + GRPCChannelPool *pool = [[GRPCChannelPool alloc] init]; + GRPCCallOptions *options1 = [[GRPCCallOptions alloc] init]; GRPCCallOptions *options2 = [options1 copy]; - GRPCMutableCallOptions *options3 = [options2 mutableCopy]; + GRPCMutableCallOptions *options3 = [options1 mutableCopy]; options3.transportType = GRPCTransportTypeInsecure; - - GRPCChannelPool *pool = [GRPCChannelPool sharedInstance]; - - GRPCChannel *channel1 = [pool channelWithHost:kDummyHost callOptions:options1]; - GRPCChannel *channel2 = [pool channelWithHost:kDummyHost callOptions:options2]; - GRPCChannel *channel3 = [pool channelWithHost:kDummyHost2 callOptions:options1]; - GRPCChannel *channel4 = [pool channelWithHost:kDummyHost callOptions:options3]; - XCTAssertEqual(channel1, channel2); - XCTAssertNotEqual(channel1, channel3); - XCTAssertNotEqual(channel1, channel4); - XCTAssertNotEqual(channel3, channel4); -} - -- (void)testDestroyAllChannels { - NSString *kDummyHost = @"dummy.host"; - - GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; - GRPCChannelPool *pool = [GRPCChannelPool sharedInstance]; - GRPCChannel *channel = [pool channelWithHost:kDummyHost callOptions:options]; - grpc_call *call = [channel unmanagedCallWithPath:@"dummy.path" - completionQueue:[GRPCCompletionQueue completionQueue] - callOptions:options - disconnected:nil]; - [pool destroyAllChannels]; - XCTAssertTrue(channel.disconnected); - GRPCChannel *channel2 = [pool channelWithHost:kDummyHost callOptions:options]; - XCTAssertNotEqual(channel, channel2); - grpc_call_unref(call); + GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; + GRPCPooledChannel *channel1 = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost + callOptions:options1]; + grpc_call *call1 = [channel1 unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options1]; + GRPCPooledChannel *channel2 = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost + callOptions:options2]; + grpc_call *call2 = [channel2 unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options2]; + GRPCPooledChannel *channel3 = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost + callOptions:options3]; + grpc_call *call3 = [channel3 unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options3]; + GRPCPooledChannel *channel4 = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost2 + callOptions:options1]; + grpc_call *call4 = [channel4 unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options1]; + XCTAssertEqual(channel1.wrappedChannel, channel2.wrappedChannel); + XCTAssertNotEqual(channel1.wrappedChannel, channel3.wrappedChannel); + XCTAssertNotEqual(channel1.wrappedChannel, channel4.wrappedChannel); + XCTAssertNotEqual(channel3.wrappedChannel, channel4.wrappedChannel); + [channel1 unrefUnmanagedCall:call1]; + [channel2 unrefUnmanagedCall:call2]; + [channel3 unrefUnmanagedCall:call3]; + [channel4 unrefUnmanagedCall:call4]; } -- (void)testGetChannelBeforeChannelTimedDisconnection { - NSString *kDummyHost = @"dummy.host"; - const NSTimeInterval kDestroyDelay = 1; - - GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; - GRPCChannelPool *pool = [GRPCChannelPool sharedInstance]; - GRPCChannel *channel = - [pool channelWithHost:kDummyHost callOptions:options destroyDelay:kDestroyDelay]; - grpc_call *call = [channel unmanagedCallWithPath:@"dummy.path" - completionQueue:[GRPCCompletionQueue completionQueue] - callOptions:options - disconnected:nil]; - grpc_call_unref(call); - [channel unref]; - - // Test that we can still get the channel at this time - GRPCChannel *channel2 = - [pool channelWithHost:kDummyHost callOptions:options destroyDelay:kDestroyDelay]; - XCTAssertEqual(channel, channel2); - call = [channel2 unmanagedCallWithPath:@"dummy.path" - completionQueue:[GRPCCompletionQueue completionQueue] - callOptions:options - disconnected:nil]; - - // Test that after the destroy delay, the channel is still alive +- (void)testTimedDestroyChannel { + const NSTimeInterval kDestroyDelay = 1.0; + + GRPCChannelPool *pool = [[GRPCChannelPool alloc] init]; + pool.destroyDelay = kDestroyDelay; + GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; + GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost + callOptions:options]; + GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; + grpc_call *call = [channel unmanagedCallWithPath:kDummyPath + completionQueue:cq callOptions:options]; + GRPCChannel *wrappedChannel = channel.wrappedChannel; + + [channel unrefUnmanagedCall:call]; + // Confirm channel is not destroyed at this time + call = [channel unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertEqual(wrappedChannel, channel.wrappedChannel); + + [channel unrefUnmanagedCall:call]; sleep(kDestroyDelay + 1); - XCTAssertFalse(channel.disconnected); + // Confirm channel is new at this time + call = [channel unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel); + + // Confirm the new channel can create call + call = [channel unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssert(call != NULL); + [channel unrefUnmanagedCall:call]; } -- (void)testGetChannelAfterChannelTimedDisconnection { - NSString *kDummyHost = @"dummy.host"; - const NSTimeInterval kDestroyDelay = 1; - - GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init]; - GRPCChannelPool *pool = [GRPCChannelPool sharedInstance]; - GRPCChannel *channel = - [pool channelWithHost:kDummyHost callOptions:options destroyDelay:kDestroyDelay]; - grpc_call *call = [channel unmanagedCallWithPath:@"dummy.path" - completionQueue:[GRPCCompletionQueue completionQueue] - callOptions:options - disconnected:nil]; - grpc_call_unref(call); - [channel unref]; - - sleep(kDestroyDelay + 1); +- (void)testPoolDisconnection { + GRPCChannelPool *pool = [[GRPCChannelPool alloc] init]; + GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; + GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost + callOptions:options]; + GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; + grpc_call *call = [channel unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertNotNil(channel.wrappedChannel); + GRPCChannel *wrappedChannel = channel.wrappedChannel; + + // Test a new channel is created by requesting a channel from pool + [pool disconnectAllChannels]; + channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost + callOptions:options]; + call = [channel unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertNotNil(channel.wrappedChannel); + XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel); + wrappedChannel = channel.wrappedChannel; + + // Test a new channel is created by requesting a new call from the previous proxy + [pool disconnectAllChannels]; + grpc_call *call2 = [channel unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + XCTAssertNotNil(channel.wrappedChannel); + XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel); + [channel unrefUnmanagedCall:call]; + [channel unrefUnmanagedCall:call2]; +} - // Test that we get new channel to the same host and with the same callOptions - GRPCChannel *channel2 = - [pool channelWithHost:kDummyHost callOptions:options destroyDelay:kDestroyDelay]; - XCTAssertNotEqual(channel, channel2); +- (void)testUnrefCallFromStaleChannel { + GRPCChannelPool *pool = [[GRPCChannelPool alloc] init]; + GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; + GRPCPooledChannel *channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost + callOptions:options]; + GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue]; + grpc_call *call = [channel unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + + [pool disconnectAllChannels]; + channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost + callOptions:options]; + + grpc_call *call2 = [channel unmanagedCallWithPath:kDummyPath + completionQueue:cq + callOptions:options]; + // Test unref the call of a stale channel will not cause the current channel going into timed + // destroy state + XCTAssertNotNil(channel.wrappedChannel); + GRPCChannel *wrappedChannel = channel.wrappedChannel; + [channel unrefUnmanagedCall:call]; + XCTAssertNotNil(channel.wrappedChannel); + XCTAssertEqual(wrappedChannel, channel.wrappedChannel); + // Test unref the call of the current channel will cause the channel going into timed destroy + // state + [channel unrefUnmanagedCall:call2]; + XCTAssertNil(channel.wrappedChannel); } @end diff --git a/src/objective-c/tests/ChannelTests/ChannelTests.m b/src/objective-c/tests/ChannelTests/ChannelTests.m index 5daafcdf3f..212db2f653 100644 --- a/src/objective-c/tests/ChannelTests/ChannelTests.m +++ b/src/objective-c/tests/ChannelTests/ChannelTests.m @@ -20,65 +20,93 @@ #import "../../GRPCClient/GRPCCallOptions.h" #import "../../GRPCClient/private/GRPCChannel.h" +#import "../../GRPCClient/private/GRPCChannelPool.h" #import "../../GRPCClient/private/GRPCCompletionQueue.h" -@interface ChannelTests : XCTestCase +/* +#define TEST_TIMEOUT 8 + +@interface GRPCChannelFake : NSObject + +- (instancetype)initWithCreateExpectation:(XCTestExpectation *)createExpectation + unrefExpectation:(XCTestExpectation *)unrefExpectation; + +- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path + completionQueue:(GRPCCompletionQueue *)queue + callOptions:(GRPCCallOptions *)callOptions; + +- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall; @end -@implementation ChannelTests +@implementation GRPCChannelFake { + __weak XCTestExpectation *_createExpectation; + __weak XCTestExpectation *_unrefExpectation; + long _grpcCallCounter; +} -+ (void)setUp { - grpc_init(); +- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration { + return nil; } -- (void)testTimedDisconnection { - NSString *const kHost = @"grpc-test.sandbox.googleapis.com"; - const NSTimeInterval kDestroyDelay = 1; - GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; - GRPCChannelConfiguration *configuration = - [[GRPCChannelConfiguration alloc] initWithHost:kHost callOptions:options]; - GRPCChannel *channel = - [[GRPCChannel alloc] initWithChannelConfiguration:configuration destroyDelay:kDestroyDelay]; - BOOL disconnected; - grpc_call *call = [channel unmanagedCallWithPath:@"dummy.path" - completionQueue:[GRPCCompletionQueue completionQueue] - callOptions:options - disconnected:&disconnected]; - XCTAssertFalse(disconnected); - grpc_call_unref(call); - [channel unref]; - XCTAssertFalse(channel.disconnected, @"Channel is pre-maturely disconnected."); - sleep(kDestroyDelay + 1); - XCTAssertTrue(channel.disconnected, @"Channel is not disconnected after delay."); - - // Check another call creation returns null and indicates disconnected. - call = [channel unmanagedCallWithPath:@"dummy.path" - completionQueue:[GRPCCompletionQueue completionQueue] - callOptions:options - disconnected:&disconnected]; - XCTAssert(call == NULL); - XCTAssertTrue(disconnected); +- (instancetype)initWithCreateExpectation:(XCTestExpectation *)createExpectation + unrefExpectation:(XCTestExpectation *)unrefExpectation { + if ((self = [super init])) { + _createExpectation = createExpectation; + _unrefExpectation = unrefExpectation; + _grpcCallCounter = 0; + } + return self; } -- (void)testForceDisconnection { - NSString *const kHost = @"grpc-test.sandbox.googleapis.com"; - const NSTimeInterval kDestroyDelay = 1; - GRPCCallOptions *options = [[GRPCCallOptions alloc] init]; - GRPCChannelConfiguration *configuration = - [[GRPCChannelConfiguration alloc] initWithHost:kHost callOptions:options]; - GRPCChannel *channel = - [[GRPCChannel alloc] initWithChannelConfiguration:configuration destroyDelay:kDestroyDelay]; - grpc_call *call = [channel unmanagedCallWithPath:@"dummy.path" - completionQueue:[GRPCCompletionQueue completionQueue] - callOptions:options - disconnected:nil]; - grpc_call_unref(call); - [channel disconnect]; - XCTAssertTrue(channel.disconnected, @"Channel is not disconnected."); - - // Test calling another unref here will not crash - [channel unref]; +- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path + completionQueue:(GRPCCompletionQueue *)queue + callOptions:(GRPCCallOptions *)callOptions { + if (_createExpectation) [_createExpectation fulfill]; + return (grpc_call *)(++_grpcCallCounter); +} + +- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall { + if (_unrefExpectation) [_unrefExpectation fulfill]; +} + +@end + +@interface GRPCChannelPoolFake : NSObject + +- (instancetype)initWithDelayedDestroyExpectation:(XCTestExpectation *)delayedDestroyExpectation; + +- (GRPCChannel *)rawChannelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions; + +- (void)delayedDestroyChannel; + +@end + +@implementation GRPCChannelPoolFake { + __weak XCTestExpectation *_delayedDestroyExpectation; +} + +- (instancetype)initWithDelayedDestroyExpectation:(XCTestExpectation *)delayedDestroyExpectation { + if ((self = [super init])) { + _delayedDestroyExpectation = delayedDestroyExpectation; + } + return self; +} + +- (void)delayedDestroyChannel { + if (_delayedDestroyExpectation) [_delayedDestroyExpectation fulfill]; +} + +@end */ + +@interface ChannelTests : XCTestCase + +@end + +@implementation ChannelTests + ++ (void)setUp { + grpc_init(); } @end |