diff options
author | 2018-11-30 13:43:56 -0800 | |
---|---|---|
committer | 2018-11-30 13:45:40 -0800 | |
commit | 459da578db5ae9bd95f91be2888236c4870a7314 (patch) | |
tree | 0269f17e1edb63374d3fffb011f4a0e7d2a4ad8d /src/objective-c/GRPCClient/private/GRPCChannelPool.m | |
parent | a7c41346d8470e7eb5f10234daa08d09a48fa779 (diff) |
Refactor channel pool
Diffstat (limited to 'src/objective-c/GRPCClient/private/GRPCChannelPool.m')
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCChannelPool.m | 276 |
1 files changed, 107 insertions, 169 deletions
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m index 646f1e4b86..488766c0ed 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannelPool.m +++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m @@ -28,6 +28,8 @@ #import "GRPCSecureChannelFactory.h" #import "utilities.h" #import "version.h" +#import "GRPCWrappedCall.h" +#import "GRPCCompletionQueue.h" #import <GRPCClient/GRPCCall+Cronet.h> #include <grpc/support/log.h> @@ -40,103 +42,139 @@ static dispatch_once_t gInitChannelPool; /** When all calls of a channel are destroyed, destroy the channel after this much seconds. */ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; -@interface GRPCChannelPool () - -- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration; - -- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration; - -@end - @implementation GRPCPooledChannel { - __weak GRPCChannelPool *_channelPool; GRPCChannelConfiguration *_channelConfiguration; - NSMutableSet *_unmanagedCalls; + NSTimeInterval _destroyDelay; + + NSHashTable<GRPCWrappedCall *> *_wrappedCalls; GRPCChannel *_wrappedChannel; + NSDate *_lastTimedDestroy; + dispatch_queue_t _timerQueue; } -- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration - channelPool:(GRPCChannelPool *)channelPool { - NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty."); - NSAssert(channelPool != nil, @"channelPool cannot be empty."); - if (channelPool == nil || channelConfiguration == nil) { - return nil; - } - - if ((self = [super init])) { - _channelPool = channelPool; - _channelConfiguration = channelConfiguration; - _unmanagedCalls = [NSMutableSet set]; - _wrappedChannel = nil; - } - - return self; +- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration { + return [self initWithChannelConfiguration:channelConfiguration destroyDelay:kDefaultChannelDestroyDelay]; } - (void)dealloc { - NSAssert([_unmanagedCalls count] == 0 && _wrappedChannel == nil, - @"Pooled channel should only be" - "destroyed after the wrapped channel is destroyed"); + if ([_wrappedCalls objectEnumerator].allObjects.count != 0) { + NSEnumerator *enumerator = [_wrappedCalls objectEnumerator]; + GRPCWrappedCall *wrappedCall; + while ((wrappedCall = [enumerator nextObject])) { + [wrappedCall channelDisconnected]; + }; + } } -- (grpc_call *)unmanagedCallWithPath:(NSString *)path - completionQueue:(GRPCCompletionQueue *)queue - callOptions:(GRPCCallOptions *)callOptions { +- (GRPCWrappedCall *)wrappedCallWithPath:(NSString *)path +completionQueue:(GRPCCompletionQueue *)queue +callOptions:(GRPCCallOptions *)callOptions { NSAssert(path.length > 0, @"path must not be empty."); NSAssert(queue != nil, @"completionQueue must not be empty."); NSAssert(callOptions, @"callOptions must not be empty."); if (path.length == 0 || queue == nil || callOptions == nil) return NULL; - grpc_call *call = NULL; + GRPCWrappedCall *call = nil; + @synchronized(self) { if (_wrappedChannel == nil) { - __strong GRPCChannelPool *strongPool = _channelPool; - if (strongPool) { - _wrappedChannel = [strongPool refChannelWithConfiguration:_channelConfiguration]; + _wrappedChannel = [[GRPCChannel alloc] initWithChannelConfiguration:_channelConfiguration]; + if (_wrappedChannel == nil) { + NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy."); + return nil; } - NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy."); } - call = - [_wrappedChannel unmanagedCallWithPath:path completionQueue:queue callOptions:callOptions]; - if (call != NULL) { - [_unmanagedCalls addObject:[NSValue valueWithPointer:call]]; + _lastTimedDestroy = nil; + + grpc_call *unmanagedCall = [_wrappedChannel unmanagedCallWithPath:path + completionQueue:[GRPCCompletionQueue completionQueue] + callOptions:callOptions]; + if (unmanagedCall == NULL) { + NSAssert(unmanagedCall != NULL, @"Unable to create grpc_call object"); + return nil; + } + + call = [[GRPCWrappedCall alloc] initWithUnmanagedCall:unmanagedCall pooledChannel:self]; + if (call == nil) { + NSAssert(call != nil, @"Unable to create GRPCWrappedCall object"); + return nil; } + + [_wrappedCalls addObject:call]; } return call; } -- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall { - if (unmanagedCall == NULL) { +- (void)notifyWrappedCallDealloc:(GRPCWrappedCall *)wrappedCall { + NSAssert(wrappedCall != nil, @"wrappedCall cannot be empty."); + if (wrappedCall == nil) { return; } - - grpc_call_unref(unmanagedCall); @synchronized(self) { - NSValue *removedCall = [NSValue valueWithPointer:unmanagedCall]; - [_unmanagedCalls removeObject:removedCall]; - if ([_unmanagedCalls count] == 0) { - _wrappedChannel = nil; - GRPCChannelPool *strongPool = _channelPool; - [strongPool unrefChannelWithConfiguration:_channelConfiguration]; + if ([_wrappedCalls objectEnumerator].allObjects.count == 0) { + NSDate *now = [NSDate date]; + _lastTimedDestroy = now; + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)_destroyDelay * NSEC_PER_SEC), + _timerQueue, ^{ + @synchronized(self) { + if (self->_lastTimedDestroy == now) { + self->_wrappedChannel = nil; + self->_lastTimedDestroy = nil; + } + } + }); } } } - (void)disconnect { + NSHashTable<GRPCWrappedCall *> *copiedWrappedCalls = nil; @synchronized(self) { if (_wrappedChannel != nil) { _wrappedChannel = nil; - [_unmanagedCalls removeAllObjects]; - GRPCChannelPool *strongPool = _channelPool; - [strongPool unrefChannelWithConfiguration:_channelConfiguration]; + copiedWrappedCalls = [_wrappedCalls copy]; + [_wrappedCalls removeAllObjects]; } } + NSEnumerator *enumerator = [copiedWrappedCalls objectEnumerator]; + GRPCWrappedCall *wrappedCall; + while ((wrappedCall = [enumerator nextObject])) { + [wrappedCall channelDisconnected]; + } } @end @implementation GRPCPooledChannel (Test) +- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration + destroyDelay:(NSTimeInterval)destroyDelay { + NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty."); + if (channelConfiguration == nil) { + return nil; + } + + if ((self = [super init])) { + _channelConfiguration = channelConfiguration; + _destroyDelay = destroyDelay; + _wrappedCalls = [[NSHashTable alloc] initWithOptions:NSHashTableWeakMemory capacity:1]; + _wrappedChannel = nil; + _lastTimedDestroy = nil; +#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300 + if (@available(iOS 8.0, macOS 10.10, *)) { + _timerQueue = dispatch_queue_create(NULL, + dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); + } else { +#else + { +#endif + _timerQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + } + } + + return self; +} + - (GRPCChannel *)wrappedChannel { GRPCChannel *channel = nil; @synchronized(self) { @@ -147,65 +185,28 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; @end -/** - * A convenience value type for cached channel. - */ -@interface GRPCChannelRecord : NSObject - -/** Pointer to the raw channel. May be nil when the channel has been destroyed. */ -@property GRPCChannel *channel; - -/** Channel proxy corresponding to this channel configuration. */ -@property GRPCPooledChannel *pooledChannel; - -/** Last time when a timed destroy is initiated on the channel. */ -@property NSDate *timedDestroyDate; - -/** Reference count of the proxy to the channel. */ -@property NSUInteger refCount; - -@end - -@implementation GRPCChannelRecord - -@end - @interface GRPCChannelPool () -- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay NS_DESIGNATED_INITIALIZER; +- (instancetype)initInstance NS_DESIGNATED_INITIALIZER; @end @implementation GRPCChannelPool { - NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelRecord *> *_channelPool; - dispatch_queue_t _dispatchQueue; - NSTimeInterval _destroyDelay; + NSMutableDictionary<GRPCChannelConfiguration *, GRPCPooledChannel *> *_channelPool; } + (instancetype)sharedInstance { dispatch_once(&gInitChannelPool, ^{ gChannelPool = - [[GRPCChannelPool alloc] initInstanceWithDestroyDelay:kDefaultChannelDestroyDelay]; + [[GRPCChannelPool alloc] initInstance]; NSAssert(gChannelPool != nil, @"Cannot initialize global channel pool."); }); return gChannelPool; } -- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay { +- (instancetype)initInstance { if ((self = [super init])) { _channelPool = [NSMutableDictionary dictionary]; -#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300 - if (@available(iOS 8.0, macOS 10.10, *)) { - _dispatchQueue = dispatch_queue_create( - NULL, - dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); - } else { -#else - { -#endif - _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); - } - _destroyDelay = destroyDelay; // Connectivity monitor is not required for CFStream char *enableCFStream = getenv(kCFStreamVarName); @@ -231,86 +232,23 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; GRPCChannelConfiguration *configuration = [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions]; @synchronized(self) { - GRPCChannelRecord *record = _channelPool[configuration]; - if (record == nil) { - record = [[GRPCChannelRecord alloc] init]; - record.pooledChannel = - [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration channelPool:self]; - _channelPool[configuration] = record; - pooledChannel = record.pooledChannel; - } else { - pooledChannel = record.pooledChannel; + pooledChannel = _channelPool[configuration]; + if (pooledChannel == nil) { + pooledChannel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration]; + _channelPool[configuration] = pooledChannel; } } return pooledChannel; } -- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration { - GRPCChannel *ret = nil; - @synchronized(self) { - NSAssert(configuration != nil, @"configuration cannot be empty."); - if (configuration == nil) { - return nil; - } - - GRPCChannelRecord *record = _channelPool[configuration]; - NSAssert(record != nil, @"No record corresponding to a proxy."); - if (record == nil) { - return nil; - } - - record.refCount++; - record.timedDestroyDate = nil; - if (record.channel == nil) { - // Channel is already destroyed; - record.channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration]; - } - ret = record.channel; - } - return ret; -} - -- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration { - @synchronized(self) { - GRPCChannelRecord *record = _channelPool[configuration]; - NSAssert(record != nil, @"No record corresponding to a proxy."); - if (record == nil) { - return; - } - NSAssert(record.refCount > 0, @"Inconsistent channel refcount."); - if (record.refCount > 0) { - record.refCount--; - if (record.refCount == 0) { - NSDate *now = [NSDate date]; - record.timedDestroyDate = now; - dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_destroyDelay * NSEC_PER_SEC)), - _dispatchQueue, ^{ - @synchronized(self) { - if (now == record.timedDestroyDate) { - // Destroy the raw channel and reset related records. - record.timedDestroyDate = nil; - record.channel = nil; - } - } - }); - } - } - } -} - - (void)disconnectAllChannels { - NSMutableSet<GRPCPooledChannel *> *proxySet = [NSMutableSet set]; + NSDictionary *copiedPooledChannels; @synchronized(self) { - [_channelPool - enumerateKeysAndObjectsUsingBlock:^(GRPCChannelConfiguration *_Nonnull key, - GRPCChannelRecord *_Nonnull obj, BOOL *_Nonnull stop) { - obj.channel = nil; - obj.timedDestroyDate = nil; - [proxySet addObject:obj.pooledChannel]; - }]; + copiedPooledChannels = [NSDictionary dictionaryWithDictionary:_channelPool]; } - // Disconnect proxies - [proxySet enumerateObjectsUsingBlock:^(GRPCPooledChannel *_Nonnull obj, BOOL *_Nonnull stop) { + + // Disconnect pooled channels. + [copiedPooledChannels enumerateKeysAndObjectsUsingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) { [obj disconnect]; }]; } @@ -323,8 +261,8 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; @implementation GRPCChannelPool (Test) -- (instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay { - return [self initInstanceWithDestroyDelay:destroyDelay]; +- (instancetype)initTestPool { + return [self initInstance]; } @end |