diff options
Diffstat (limited to 'src/objective-c/GRPCClient/private/GRPCChannelPool.m')
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCChannelPool.m | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m new file mode 100644 index 0000000000..a323f0490c --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m @@ -0,0 +1,276 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#import <Foundation/Foundation.h> + +#import "../internal/GRPCCallOptions+Internal.h" +#import "GRPCChannel.h" +#import "GRPCChannelFactory.h" +#import "GRPCChannelPool+Test.h" +#import "GRPCChannelPool.h" +#import "GRPCCompletionQueue.h" +#import "GRPCConnectivityMonitor.h" +#import "GRPCCronetChannelFactory.h" +#import "GRPCInsecureChannelFactory.h" +#import "GRPCSecureChannelFactory.h" +#import "GRPCWrappedCall.h" +#import "version.h" + +#import <GRPCClient/GRPCCall+Cronet.h> +#include <grpc/support/log.h> + +extern const char *kCFStreamVarName; + +static GRPCChannelPool *gChannelPool; +static dispatch_once_t gInitChannelPool; + +/** When all calls of a channel are destroyed, destroy the channel after this much seconds. */ +static const NSTimeInterval kDefaultChannelDestroyDelay = 30; + +@implementation GRPCPooledChannel { + GRPCChannelConfiguration *_channelConfiguration; + NSTimeInterval _destroyDelay; + + NSHashTable<GRPCWrappedCall *> *_wrappedCalls; + GRPCChannel *_wrappedChannel; + NSDate *_lastTimedDestroy; + dispatch_queue_t _timerQueue; +} + +- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration { + return [self initWithChannelConfiguration:channelConfiguration + destroyDelay:kDefaultChannelDestroyDelay]; +} + +- (nullable instancetype)initWithChannelConfiguration: + (GRPCChannelConfiguration *)channelConfiguration + destroyDelay:(NSTimeInterval)destroyDelay { + NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty."); + if (channelConfiguration == nil) { + return nil; + } + + if ((self = [super init])) { + _channelConfiguration = [channelConfiguration copy]; + _destroyDelay = destroyDelay; + _wrappedCalls = [NSHashTable weakObjectsHashTable]; + _wrappedChannel = nil; + _lastTimedDestroy = nil; +#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300 + if (@available(iOS 8.0, macOS 10.10, *)) { + _timerQueue = dispatch_queue_create(NULL, dispatch_queue_attr_make_with_qos_class( + DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); + } else { +#else + { +#endif + _timerQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + } + } + + return self; +} + +- (void)dealloc { + // Disconnect GRPCWrappedCall objects created but not yet removed + if (_wrappedCalls.allObjects.count != 0) { + for (GRPCWrappedCall *wrappedCall in _wrappedCalls.allObjects) { + [wrappedCall channelDisconnected]; + }; + } +} + +- (GRPCWrappedCall *)wrappedCallWithPath:(NSString *)path + completionQueue:(GRPCCompletionQueue *)queue + callOptions:(GRPCCallOptions *)callOptions { + NSAssert(path.length > 0, @"path must not be empty."); + NSAssert(queue != nil, @"completionQueue must not be empty."); + NSAssert(callOptions, @"callOptions must not be empty."); + if (path.length == 0 || queue == nil || callOptions == nil) { + return nil; + } + + GRPCWrappedCall *call = nil; + + @synchronized(self) { + if (_wrappedChannel == nil) { + _wrappedChannel = [[GRPCChannel alloc] initWithChannelConfiguration:_channelConfiguration]; + if (_wrappedChannel == nil) { + NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy."); + return nil; + } + } + _lastTimedDestroy = nil; + + grpc_call *unmanagedCall = + [_wrappedChannel unmanagedCallWithPath:path + completionQueue:[GRPCCompletionQueue completionQueue] + callOptions:callOptions]; + if (unmanagedCall == NULL) { + NSAssert(unmanagedCall != NULL, @"Unable to create grpc_call object"); + return nil; + } + + call = [[GRPCWrappedCall alloc] initWithUnmanagedCall:unmanagedCall pooledChannel:self]; + if (call == nil) { + NSAssert(call != nil, @"Unable to create GRPCWrappedCall object"); + grpc_call_unref(unmanagedCall); + return nil; + } + + [_wrappedCalls addObject:call]; + } + return call; +} + +- (void)notifyWrappedCallDealloc:(GRPCWrappedCall *)wrappedCall { + NSAssert(wrappedCall != nil, @"wrappedCall cannot be empty."); + if (wrappedCall == nil) { + return; + } + @synchronized(self) { + // Detect if all objects weakly referenced in _wrappedCalls are (implicitly) removed. + // _wrappedCalls.count does not work here since the hash table may include deallocated weak + // references. _wrappedCalls.allObjects forces removal of those objects. + if (_wrappedCalls.allObjects.count == 0) { + // No more call has reference to this channel. We may start the timer for destroying the + // channel now. + NSDate *now = [NSDate date]; + NSAssert(now != nil, @"Unable to create NSDate object 'now'."); + _lastTimedDestroy = now; + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)_destroyDelay * NSEC_PER_SEC), + _timerQueue, ^{ + @synchronized(self) { + // Check _lastTimedDestroy against now in case more calls are created (and + // maybe destroyed) after this dispatch_async. In that case the current + // dispatch_after block should be discarded; the channel should be + // destroyed in a later dispatch_after block. + if (now != nil && self->_lastTimedDestroy == now) { + self->_wrappedChannel = nil; + self->_lastTimedDestroy = nil; + } + } + }); + } + } +} + +- (void)disconnect { + NSArray<GRPCWrappedCall *> *copiedWrappedCalls = nil; + @synchronized(self) { + if (_wrappedChannel != nil) { + _wrappedChannel = nil; + copiedWrappedCalls = _wrappedCalls.allObjects; + [_wrappedCalls removeAllObjects]; + } + } + for (GRPCWrappedCall *wrappedCall in copiedWrappedCalls) { + [wrappedCall channelDisconnected]; + } +} + +- (GRPCChannel *)wrappedChannel { + GRPCChannel *channel = nil; + @synchronized(self) { + channel = _wrappedChannel; + } + return channel; +} + +@end + +@interface GRPCChannelPool () + +- (instancetype)initPrivate NS_DESIGNATED_INITIALIZER; + +@end + +@implementation GRPCChannelPool { + NSMutableDictionary<GRPCChannelConfiguration *, GRPCPooledChannel *> *_channelPool; +} + ++ (instancetype)sharedInstance { + dispatch_once(&gInitChannelPool, ^{ + gChannelPool = [[GRPCChannelPool alloc] initPrivate]; + NSAssert(gChannelPool != nil, @"Cannot initialize global channel pool."); + }); + return gChannelPool; +} + +- (instancetype)initPrivate { + if ((self = [super init])) { + _channelPool = [NSMutableDictionary dictionary]; + + // Connectivity monitor is not required for CFStream + char *enableCFStream = getenv(kCFStreamVarName); + if (enableCFStream == nil || enableCFStream[0] != '1') { + [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChange:)]; + } + } + return self; +} + +- (void)dealloc { + [GRPCConnectivityMonitor unregisterObserver:self]; +} + +- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions { + NSAssert(host.length > 0, @"Host must not be empty."); + NSAssert(callOptions != nil, @"callOptions must not be empty."); + if (host.length == 0 || callOptions == nil) { + return nil; + } + + GRPCPooledChannel *pooledChannel = nil; + GRPCChannelConfiguration *configuration = + [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions]; + @synchronized(self) { + pooledChannel = _channelPool[configuration]; + if (pooledChannel == nil) { + pooledChannel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration]; + _channelPool[configuration] = pooledChannel; + } + } + return pooledChannel; +} + +- (void)disconnectAllChannels { + NSArray<GRPCPooledChannel *> *copiedPooledChannels; + @synchronized(self) { + copiedPooledChannels = _channelPool.allValues; + } + + // Disconnect pooled channels. + for (GRPCPooledChannel *pooledChannel in copiedPooledChannels) { + [pooledChannel disconnect]; + } +} + +- (void)connectivityChange:(NSNotification *)note { + [self disconnectAllChannels]; +} + +@end + +@implementation GRPCChannelPool (Test) + +- (instancetype)initTestPool { + return [self initPrivate]; +} + +@end |