diff options
author | Muxi Yan <mxyan@google.com> | 2018-11-18 22:47:35 -0800 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2018-11-18 22:47:35 -0800 |
commit | f0cbcde73195b8e17130538c3479d4c0e3bcd2a2 (patch) | |
tree | 06483748c28dc85c4927eec35e88b6001923999e /src/objective-c/GRPCClient/private/GRPCChannelPool.m | |
parent | 87abab45c99ab4b40718557cbc1c25dcd7f5a418 (diff) |
New channel pool design
Diffstat (limited to 'src/objective-c/GRPCClient/private/GRPCChannelPool.m')
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCChannelPool.m | 263 |
1 files changed, 236 insertions, 27 deletions
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m index 5e2e9bcfeb..92dd4c86ae 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannelPool.m +++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m @@ -37,8 +37,150 @@ extern const char *kCFStreamVarName; static GRPCChannelPool *gChannelPool; static dispatch_once_t gInitChannelPool; +/** When all calls of a channel are destroyed, destroy the channel after this much seconds. */ +static const NSTimeInterval kDefaultChannelDestroyDelay = 30; + +@interface GRPCChannelPool() + +- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration; + +- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration; + +@end + +@implementation GRPCPooledChannel { + __weak GRPCChannelPool *_channelPool; + GRPCChannelConfiguration *_channelConfiguration; + NSMutableSet *_unmanagedCalls; +} + +@synthesize wrappedChannel = _wrappedChannel; + +- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration + channelPool:(GRPCChannelPool *)channelPool { + NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty."); + NSAssert(channelPool != nil, @"channelPool cannot be empty."); + if (channelPool == nil || channelConfiguration == nil) { + return nil; + } + + if ((self = [super init])) { + _channelPool = channelPool; + _channelConfiguration = channelConfiguration; + _unmanagedCalls = [NSMutableSet set]; + } + + return self; +} + +- (grpc_call *)unmanagedCallWithPath:(NSString *)path + completionQueue:(GRPCCompletionQueue *)queue + callOptions:(GRPCCallOptions *)callOptions { + NSAssert(path.length > 0, @"path must not be empty."); + NSAssert(queue != nil, @"completionQueue must not be empty."); + NSAssert(callOptions, @"callOptions must not be empty."); + if (path.length == 0 || queue == nil || callOptions == nil) return NULL; + + grpc_call *call = NULL; + @synchronized(self) { + if (_wrappedChannel == nil) { + __strong GRPCChannelPool *strongPool = _channelPool; + if (strongPool) { + _wrappedChannel = [strongPool refChannelWithConfiguration:_channelConfiguration]; + } + NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy."); + } + call = [_wrappedChannel unmanagedCallWithPath:path completionQueue:queue callOptions:callOptions]; + if (call != NULL) { + [_unmanagedCalls addObject:[NSValue valueWithPointer:call]]; + } + } + return call; +} + +- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall { + if (unmanagedCall == nil) return; + + grpc_call_unref(unmanagedCall); + BOOL timedDestroy = NO; + @synchronized(self) { + if ([_unmanagedCalls containsObject:[NSValue valueWithPointer:unmanagedCall]]) { + [_unmanagedCalls removeObject:[NSValue valueWithPointer:unmanagedCall]]; + if ([_unmanagedCalls count] == 0) { + timedDestroy = YES; + } + } + } + if (timedDestroy) { + [self timedDestroy]; + } +} + +- (void)disconnect { + @synchronized(self) { + _wrappedChannel = nil; + [_unmanagedCalls removeAllObjects]; + } +} + +- (GRPCChannel *)wrappedChannel { + GRPCChannel *channel = nil; + @synchronized (self) { + channel = _wrappedChannel; + } + return channel; +} + +- (void)timedDestroy { + __strong GRPCChannelPool *pool = nil; + @synchronized(self) { + // Check if we still want to destroy the channel. + if ([_unmanagedCalls count] == 0) { + pool = _channelPool; + _wrappedChannel = nil; + } + } + [pool unrefChannelWithConfiguration:_channelConfiguration]; +} + +@end + +/** + * A convenience value type for cached channel. + */ +@interface GRPCChannelRecord : NSObject <NSCopying> + +/** Pointer to the raw channel. May be nil when the channel has been destroyed. */ +@property GRPCChannel *channel; + +/** Channel proxy corresponding to this channel configuration. */ +@property GRPCPooledChannel *proxy; + +/** Last time when a timed destroy is initiated on the channel. */ +@property NSDate *timedDestroyDate; + +/** Reference count of the proxy to the channel. */ +@property NSUInteger refcount; + +@end + +@implementation GRPCChannelRecord + +- (id)copyWithZone:(NSZone *)zone { + GRPCChannelRecord *newRecord = [[GRPCChannelRecord allocWithZone:zone] init]; + newRecord.channel = _channel; + newRecord.proxy = _proxy; + newRecord.timedDestroyDate = _timedDestroyDate; + newRecord.refcount = _refcount; + + return newRecord; +} + +@end + @implementation GRPCChannelPool { - NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannel *> *_channelPool; + NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelRecord *> *_channelPool; + dispatch_queue_t _dispatchQueue; } + (instancetype)sharedInstance { @@ -52,6 +194,18 @@ static dispatch_once_t gInitChannelPool; - (instancetype)init { if ((self = [super init])) { _channelPool = [NSMutableDictionary dictionary]; +#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300 + if (@available(iOS 8.0, macOS 10.10, *)) { + _dispatchQueue = dispatch_queue_create( + NULL, + dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); + } else { +#else + { +#endif + _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + } + _destroyDelay = kDefaultChannelDestroyDelay; // Connectivity monitor is not required for CFStream char *enableCFStream = getenv(kCFStreamVarName); @@ -62,51 +216,106 @@ static dispatch_once_t gInitChannelPool; return self; } -- (GRPCChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions { - return [self channelWithHost:host callOptions:callOptions destroyDelay:0]; -} - -- (GRPCChannel *)channelWithHost:(NSString *)host - callOptions:(GRPCCallOptions *)callOptions - destroyDelay:(NSTimeInterval)destroyDelay { +- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions { NSAssert(host.length > 0, @"Host must not be empty."); NSAssert(callOptions != nil, @"callOptions must not be empty."); if (host.length == 0) return nil; if (callOptions == nil) return nil; - GRPCChannel *channel; + GRPCPooledChannel *channelProxy = nil; GRPCChannelConfiguration *configuration = [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions]; @synchronized(self) { - channel = _channelPool[configuration]; - if (channel == nil || channel.disconnected) { - if (destroyDelay == 0) { - channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration]; - } else { - channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration - destroyDelay:destroyDelay]; - } - _channelPool[configuration] = channel; + GRPCChannelRecord *record = _channelPool[configuration]; + if (record == nil) { + record = [[GRPCChannelRecord alloc] init]; + record.proxy = [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration + channelPool:self]; + record.timedDestroyDate = nil; + _channelPool[configuration] = record; + channelProxy = record.proxy; + } else { + channelProxy = record.proxy; } } - return channel; + return channelProxy; } -+ (void)closeOpenConnections { - [[GRPCChannelPool sharedInstance] destroyAllChannels]; +- (void)closeOpenConnections { + [self disconnectAllChannels]; } -- (void)destroyAllChannels { - @synchronized(self) { - for (id key in _channelPool) { - [_channelPool[key] disconnect]; +- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration { + GRPCChannel *ret = nil; + @synchronized (self) { + NSAssert(configuration != nil, @"configuration cannot be empty."); + if (configuration == nil) return nil; + + GRPCChannelRecord *record = _channelPool[configuration]; + NSAssert(record != nil, @"No record corresponding to a proxy."); + if (record == nil) return nil; + + if (record.channel == nil) { + // Channel is already destroyed; + record.channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration]; + record.timedDestroyDate = nil; + record.refcount = 1; + ret = record.channel; + } else { + ret = record.channel; + record.timedDestroyDate = nil; + record.refcount++; } - _channelPool = [NSMutableDictionary dictionary]; } + return ret; +} + +- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration { + @synchronized (self) { + GRPCChannelRecord *record = _channelPool[configuration]; + NSAssert(record != nil, @"No record corresponding to a proxy."); + if (record == nil) return; + NSAssert(record.refcount > 0, @"Inconsistent channel refcount."); + if (record.refcount > 0) { + record.refcount--; + if (record.refcount == 0) { + NSDate *now = [NSDate date]; + record.timedDestroyDate = now; + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_destroyDelay * NSEC_PER_SEC)), + _dispatchQueue, + ^{ + @synchronized (self) { + if (now == record.timedDestroyDate) { + // Destroy the raw channel and reset related records. + record.timedDestroyDate = nil; + record.refcount = 0; + record.channel = nil; + } + } + }); + } + } + } +} + +- (void)disconnectAllChannels { + NSMutableSet<GRPCPooledChannel *> *proxySet = [NSMutableSet set]; + @synchronized (self) { + [_channelPool enumerateKeysAndObjectsUsingBlock:^(GRPCChannelConfiguration * _Nonnull key, GRPCChannelRecord * _Nonnull obj, BOOL * _Nonnull stop) { + obj.channel = nil; + obj.timedDestroyDate = nil; + obj.refcount = 0; + [proxySet addObject:obj.proxy]; + }]; + } + // Disconnect proxies + [proxySet enumerateObjectsUsingBlock:^(GRPCPooledChannel * _Nonnull obj, BOOL * _Nonnull stop) { + [obj disconnect]; + }]; } - (void)connectivityChange:(NSNotification *)note { - [self destroyAllChannels]; + [self disconnectAllChannels]; } - (void)dealloc { |