aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/GRPCClient/private/GRPCChannelPool.m
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2018-10-08 15:52:27 -0700
committerGravatar Muxi Yan <mxyan@google.com>2018-10-08 15:52:27 -0700
commit309ba191525a96c5314e5762ecaa2fffbc9eccbb (patch)
treeff431202f113bf6b11bac5a48282fa8394faccf3 /src/objective-c/GRPCClient/private/GRPCChannelPool.m
parent0fd4727defda5f8bef106a1f3b59263886b9b6c6 (diff)
Channel pool
Diffstat (limited to 'src/objective-c/GRPCClient/private/GRPCChannelPool.m')
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannelPool.m387
1 files changed, 387 insertions, 0 deletions
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m
new file mode 100644
index 0000000000..b5f0949ef7
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m
@@ -0,0 +1,387 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#import <Foundation/Foundation.h>
+
+#import "GRPCChannelFactory.h"
+#import "GRPCChannelPool.h"
+#import "GRPCConnectivityMonitor.h"
+#import "GRPCCronetChannelFactory.h"
+#import "GRPCInsecureChannelFactory.h"
+#import "GRPCSecureChannelFactory.h"
+#import "version.h"
+
+#import <GRPCClient/GRPCCall+Cronet.h>
+#include <grpc/support/log.h>
+
+extern const char *kCFStreamVarName;
+
+// When all calls of a channel are destroyed, destroy the channel after this much seconds.
+const NSTimeInterval kChannelDestroyDelay = 30;
+
+@implementation GRPCChannelConfiguration
+
+- (nullable instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions {
+ if ((self = [super init])) {
+ _host = host;
+ _callOptions = callOptions;
+ }
+ return self;
+}
+
+- (id<GRPCChannelFactory>)channelFactory {
+ NSError *error;
+ id<GRPCChannelFactory> factory;
+ GRPCTransportType type = _callOptions.transportType;
+ switch (type) {
+ case GRPCTransportTypeDefault:
+ // TODO (mxyan): Remove when the API is deprecated
+#ifdef GRPC_COMPILE_WITH_CRONET
+ if (![GRPCCall isUsingCronet]) {
+#endif
+ factory = [GRPCSecureChannelFactory factoryWithPEMRootCerts:_callOptions.pemRootCert
+ privateKey:_callOptions.pemPrivateKey
+ certChain:_callOptions.pemCertChain
+ error:&error];
+ if (error) {
+ NSLog(@"Error creating secure channel factory: %@", error);
+ return nil;
+ }
+ return factory;
+#ifdef GRPC_COMPILE_WITH_CRONET
+ }
+#endif
+ // fallthrough
+ case GRPCTransportTypeCronet:
+ return [GRPCCronetChannelFactory sharedInstance];
+ case GRPCTransportTypeInsecure:
+ return [GRPCInsecureChannelFactory sharedInstance];
+ default:
+ GPR_UNREACHABLE_CODE(return nil);
+ }
+}
+
+- (NSMutableDictionary *)channelArgs {
+ NSMutableDictionary *args = [NSMutableDictionary new];
+
+ NSString *userAgent = @"grpc-objc/" GRPC_OBJC_VERSION_STRING;
+ NSString *userAgentPrefix = _callOptions.userAgentPrefix;
+ if (userAgentPrefix) {
+ args[@GRPC_ARG_PRIMARY_USER_AGENT_STRING] =
+ [_callOptions.userAgentPrefix stringByAppendingFormat:@" %@", userAgent];
+ } else {
+ args[@GRPC_ARG_PRIMARY_USER_AGENT_STRING] = userAgent;
+ }
+
+ NSString *hostNameOverride = _callOptions.hostNameOverride;
+ if (hostNameOverride) {
+ args[@GRPC_SSL_TARGET_NAME_OVERRIDE_ARG] = hostNameOverride;
+ }
+
+ if (_callOptions.responseSizeLimit) {
+ args[@GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH] =
+ [NSNumber numberWithUnsignedInteger:_callOptions.responseSizeLimit];
+ }
+
+ if (_callOptions.compressAlgorithm != GRPC_COMPRESS_NONE) {
+ args[@GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM] =
+ [NSNumber numberWithInt:_callOptions.compressAlgorithm];
+ }
+
+ if (_callOptions.keepaliveInterval != 0) {
+ args[@GRPC_ARG_KEEPALIVE_TIME_MS] =
+ [NSNumber numberWithUnsignedInteger:(unsigned int)(_callOptions.keepaliveInterval * 1000)];
+ args[@GRPC_ARG_KEEPALIVE_TIMEOUT_MS] =
+ [NSNumber numberWithUnsignedInteger:(unsigned int)(_callOptions.keepaliveTimeout * 1000)];
+ }
+
+ if (_callOptions.enableRetry == NO) {
+ args[@GRPC_ARG_ENABLE_RETRIES] = [NSNumber numberWithInt:_callOptions.enableRetry];
+ }
+
+ if (_callOptions.connectMinTimeout > 0) {
+ args[@GRPC_ARG_MIN_RECONNECT_BACKOFF_MS] =
+ [NSNumber numberWithUnsignedInteger:(unsigned int)(_callOptions.connectMinTimeout * 1000)];
+ }
+ if (_callOptions.connectInitialBackoff > 0) {
+ args[@GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS] = [NSNumber
+ numberWithUnsignedInteger:(unsigned int)(_callOptions.connectInitialBackoff * 1000)];
+ }
+ if (_callOptions.connectMaxBackoff > 0) {
+ args[@GRPC_ARG_MAX_RECONNECT_BACKOFF_MS] =
+ [NSNumber numberWithUnsignedInteger:(unsigned int)(_callOptions.connectMaxBackoff * 1000)];
+ }
+
+ if (_callOptions.logContext != nil) {
+ args[@GRPC_ARG_MOBILE_LOG_CONTEXT] = _callOptions.logContext;
+ }
+
+ if (_callOptions.channelPoolDomain != nil) {
+ args[@GRPC_ARG_CHANNEL_POOL_DOMAIN] = _callOptions.channelPoolDomain;
+ }
+
+ [args addEntriesFromDictionary:_callOptions.additionalChannelArgs];
+
+ return args;
+}
+
+- (nonnull id)copyWithZone:(nullable NSZone *)zone {
+ GRPCChannelConfiguration *newConfig = [[GRPCChannelConfiguration alloc] init];
+ newConfig.host = _host;
+ newConfig.callOptions = _callOptions;
+
+ return newConfig;
+}
+
+- (BOOL)isEqual:(id)object {
+ NSAssert([object isKindOfClass:[GRPCChannelConfiguration class]], @"Illegal :isEqual");
+ GRPCChannelConfiguration *obj = (GRPCChannelConfiguration *)object;
+ if (!(obj.host == _host || [obj.host isEqualToString:_host])) return NO;
+ if (!(obj.callOptions.userAgentPrefix == _callOptions.userAgentPrefix ||
+ [obj.callOptions.userAgentPrefix isEqualToString:_callOptions.userAgentPrefix]))
+ return NO;
+ if (!(obj.callOptions.responseSizeLimit == _callOptions.responseSizeLimit)) return NO;
+ if (!(obj.callOptions.compressAlgorithm == _callOptions.compressAlgorithm)) return NO;
+ if (!(obj.callOptions.enableRetry == _callOptions.enableRetry)) return NO;
+ if (!(obj.callOptions.keepaliveInterval == _callOptions.keepaliveInterval)) return NO;
+ if (!(obj.callOptions.keepaliveTimeout == _callOptions.keepaliveTimeout)) return NO;
+ if (!(obj.callOptions.connectMinTimeout == _callOptions.connectMinTimeout)) return NO;
+ if (!(obj.callOptions.connectInitialBackoff == _callOptions.connectInitialBackoff)) return NO;
+ if (!(obj.callOptions.connectMaxBackoff == _callOptions.connectMaxBackoff)) return NO;
+ if (!(obj.callOptions.additionalChannelArgs == _callOptions.additionalChannelArgs ||
+ [obj.callOptions.additionalChannelArgs
+ isEqualToDictionary:_callOptions.additionalChannelArgs]))
+ return NO;
+ if (!(obj.callOptions.pemRootCert == _callOptions.pemRootCert ||
+ [obj.callOptions.pemRootCert isEqualToString:_callOptions.pemRootCert]))
+ return NO;
+ if (!(obj.callOptions.pemPrivateKey == _callOptions.pemPrivateKey ||
+ [obj.callOptions.pemPrivateKey isEqualToString:_callOptions.pemPrivateKey]))
+ return NO;
+ if (!(obj.callOptions.pemCertChain == _callOptions.pemCertChain ||
+ [obj.callOptions.pemCertChain isEqualToString:_callOptions.pemCertChain]))
+ return NO;
+ if (!(obj.callOptions.hostNameOverride == _callOptions.hostNameOverride ||
+ [obj.callOptions.hostNameOverride isEqualToString:_callOptions.hostNameOverride]))
+ return NO;
+ if (!(obj.callOptions.transportType == _callOptions.transportType)) return NO;
+ if (!(obj.callOptions.logContext == _callOptions.logContext ||
+ [obj.callOptions.logContext isEqual:_callOptions.logContext]))
+ return NO;
+ if (!(obj.callOptions.channelPoolDomain == _callOptions.channelPoolDomain ||
+ [obj.callOptions.channelPoolDomain isEqualToString:_callOptions.channelPoolDomain]))
+ return NO;
+ if (!(obj.callOptions.channelId == _callOptions.channelId)) return NO;
+
+ return YES;
+}
+
+- (NSUInteger)hash {
+ NSUInteger result = 0;
+ result ^= _host.hash;
+ result ^= _callOptions.userAgentPrefix.hash;
+ result ^= _callOptions.responseSizeLimit;
+ result ^= _callOptions.compressAlgorithm;
+ result ^= _callOptions.enableRetry;
+ result ^= (unsigned int)(_callOptions.keepaliveInterval * 1000);
+ result ^= (unsigned int)(_callOptions.keepaliveTimeout * 1000);
+ result ^= (unsigned int)(_callOptions.connectMinTimeout * 1000);
+ result ^= (unsigned int)(_callOptions.connectInitialBackoff * 1000);
+ result ^= (unsigned int)(_callOptions.connectMaxBackoff * 1000);
+ result ^= _callOptions.additionalChannelArgs.hash;
+ result ^= _callOptions.pemRootCert.hash;
+ result ^= _callOptions.pemPrivateKey.hash;
+ result ^= _callOptions.pemCertChain.hash;
+ result ^= _callOptions.hostNameOverride.hash;
+ result ^= _callOptions.transportType;
+ result ^= [_callOptions.logContext hash];
+ result ^= _callOptions.channelPoolDomain.hash;
+ result ^= _callOptions.channelId;
+
+ return result;
+}
+
+@end
+
+/**
+ * Time the channel destroy when the channel's calls are unreffed. If there's new call, reset the
+ * timer.
+ */
+@interface GRPCChannelCallRef : NSObject
+
+- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)configuration
+ destroyDelay:(NSTimeInterval)destroyDelay
+ dispatchQueue:(dispatch_queue_t)dispatchQueue
+ destroyChannel:(void (^)())destroyChannel;
+
+/** Add call ref count to the channel and maybe reset the timer. */
+- (void)refChannel;
+
+/** Reduce call ref count to the channel and maybe set the timer. */
+- (void)unrefChannel;
+
+@end
+
+@implementation GRPCChannelCallRef {
+ GRPCChannelConfiguration *_configuration;
+ NSTimeInterval _destroyDelay;
+ // We use dispatch queue for this purpose since timer invalidation must happen on the same
+ // thread which issued the timer.
+ dispatch_queue_t _dispatchQueue;
+ void (^_destroyChannel)();
+
+ NSUInteger _refCount;
+ NSTimer *_timer;
+}
+
+- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)configuration
+ destroyDelay:(NSTimeInterval)destroyDelay
+ dispatchQueue:(dispatch_queue_t)dispatchQueue
+ destroyChannel:(void (^)())destroyChannel {
+ if ((self = [super init])) {
+ _configuration = configuration;
+ _destroyDelay = destroyDelay;
+ _dispatchQueue = dispatchQueue;
+ _destroyChannel = destroyChannel;
+
+ _refCount = 0;
+ _timer = nil;
+ }
+ return self;
+}
+
+// This function is protected by channel pool dispatch queue.
+- (void)refChannel {
+ _refCount++;
+ if (_timer) {
+ [_timer invalidate];
+ }
+ _timer = nil;
+}
+
+// This function is protected by channel spool dispatch queue.
+- (void)unrefChannel {
+ self->_refCount--;
+ if (self->_refCount == 0) {
+ if (self->_timer) {
+ [self->_timer invalidate];
+ }
+ self->_timer = [NSTimer scheduledTimerWithTimeInterval:self->_destroyDelay
+ target:self
+ selector:@selector(timerFire:)
+ userInfo:nil
+ repeats:NO];
+ }
+}
+
+- (void)timerFire:(NSTimer *)timer {
+ dispatch_sync(_dispatchQueue, ^{
+ if (self->_timer == nil || self->_timer != timer) {
+ return;
+ }
+ self->_timer = nil;
+ self->_destroyChannel(self->_configuration);
+ });
+}
+
+@end
+
+#pragma mark GRPCChannelPool
+
+@implementation GRPCChannelPool {
+ NSTimeInterval _channelDestroyDelay;
+ NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannel *> *_channelPool;
+ NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelCallRef *> *_callRefs;
+ // Dedicated queue for timer
+ dispatch_queue_t _dispatchQueue;
+}
+
+- (instancetype)init {
+ return [self initWithChannelDestroyDelay:kChannelDestroyDelay];
+}
+
+- (instancetype)initWithChannelDestroyDelay:(NSTimeInterval)channelDestroyDelay {
+ if ((self = [super init])) {
+ _channelDestroyDelay = channelDestroyDelay;
+ _channelPool = [NSMutableDictionary dictionary];
+ _callRefs = [NSMutableDictionary dictionary];
+ _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+
+ // Connectivity monitor is not required for CFStream
+ char *enableCFStream = getenv(kCFStreamVarName);
+ if (enableCFStream == nil || enableCFStream[0] != '1') {
+ [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChange:)];
+ }
+ }
+ return self;
+}
+
+- (void)dealloc {
+ // Connectivity monitor is not required for CFStream
+ char *enableCFStream = getenv(kCFStreamVarName);
+ if (enableCFStream == nil || enableCFStream[0] != '1') {
+ [GRPCConnectivityMonitor unregisterObserver:self];
+ }
+}
+
+- (GRPCChannel *)channelWithConfiguration:(GRPCChannelConfiguration *)configuration
+ createChannel:(GRPCChannel * (^)(void))createChannel {
+ __block GRPCChannel *channel;
+ dispatch_sync(_dispatchQueue, ^{
+ if ([self->_channelPool objectForKey:configuration]) {
+ [self->_callRefs[configuration] refChannel];
+ channel = self->_channelPool[configuration];
+ } else {
+ channel = createChannel();
+ self->_channelPool[configuration] = channel;
+
+ GRPCChannelCallRef *callRef = [[GRPCChannelCallRef alloc]
+ initWithChannelConfiguration:configuration
+ destroyDelay:self->_channelDestroyDelay
+ dispatchQueue:self->_dispatchQueue
+ destroyChannel:^(GRPCChannelConfiguration *configuration) {
+ [self->_channelPool removeObjectForKey:configuration];
+ [self->_callRefs removeObjectForKey:configuration];
+ }];
+ [callRef refChannel];
+ self->_callRefs[configuration] = callRef;
+ }
+ });
+ return channel;
+}
+
+- (void)unrefChannelWithConfiguration:configuration {
+ dispatch_sync(_dispatchQueue, ^{
+ if ([self->_channelPool objectForKey:configuration]) {
+ [self->_callRefs[configuration] unrefChannel];
+ }
+ });
+}
+
+- (void)clear {
+ dispatch_sync(_dispatchQueue, ^{
+ self->_channelPool = [NSMutableDictionary dictionary];
+ self->_callRefs = [NSMutableDictionary dictionary];
+ });
+}
+
+- (void)connectivityChange:(NSNotification *)note {
+ [self clear];
+}
+
+@end