diff options
author | 2018-11-08 22:01:10 -0800 | |
---|---|---|
committer | 2018-11-08 22:01:10 -0800 | |
commit | 37dbad80d5254f9bf17076d12b22b7a081e6e9dc (patch) | |
tree | 367a86331b789917812b15ec4dde636b32560023 /src/objective-c/GRPCClient/private/GRPCChannel.m | |
parent | d72d5b2c8eaa8a434a7db4624fe6a45bc0d6bde4 (diff) |
Refactor channel pool
Diffstat (limited to 'src/objective-c/GRPCClient/private/GRPCChannel.m')
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCChannel.m | 397 |
1 files changed, 214 insertions, 183 deletions
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m index 773f4261d7..298b6605d1 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.m +++ b/src/objective-c/GRPCClient/private/GRPCChannel.m @@ -28,141 +28,222 @@ #import "GRPCInsecureChannelFactory.h" #import "GRPCSecureChannelFactory.h" #import "version.h" +#import "../internal/GRPCCallOptions+Internal.h" #import <GRPCClient/GRPCCall+Cronet.h> #import <GRPCClient/GRPCCallOptions.h> /** When all calls of a channel are destroyed, destroy the channel after this much seconds. */ -NSTimeInterval kChannelDestroyDelay = 30; +NSTimeInterval kDefaultChannelDestroyDelay = 30; -/** Global instance of channel pool. */ -static GRPCChannelPool *gChannelPool; +@implementation GRPCChannelConfiguration -/** - * Time the channel destroy when the channel's calls are unreffed. If there's new call, reset the - * timer. - */ -@interface GRPCChannelRef : NSObject +- (nullable instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions { + NSAssert(host.length, @"Host must not be empty."); + NSAssert(callOptions, @"callOptions must not be empty."); + if ((self = [super init])) { + _host = [host copy]; + _callOptions = [callOptions copy]; + } + return self; +} -- (instancetype)initWithDestroyDelay:(NSTimeInterval)destroyDelay - destroyChannelCallback:(void (^)())destroyChannelCallback; +- (id<GRPCChannelFactory>)channelFactory { + NSError *error; + id<GRPCChannelFactory> factory; + GRPCTransportType type = _callOptions.transportType; + switch (type) { + case GRPCTransportTypeChttp2BoringSSL: + // TODO (mxyan): Remove when the API is deprecated +#ifdef GRPC_COMPILE_WITH_CRONET + if (![GRPCCall isUsingCronet]) { +#endif + factory = [GRPCSecureChannelFactory + factoryWithPEMRootCertificates:_callOptions.PEMRootCertificates + privateKey:_callOptions.PEMPrivateKey + certChain:_callOptions.PEMCertChain + error:&error]; + if (factory == nil) { + NSLog(@"Error creating secure channel factory: %@", error); + } + return factory; +#ifdef GRPC_COMPILE_WITH_CRONET + } +#endif + // fallthrough + case GRPCTransportTypeCronet: + return [GRPCCronetChannelFactory sharedInstance]; + case GRPCTransportTypeInsecure: + return [GRPCInsecureChannelFactory sharedInstance]; + } +} -/** Add call ref count to the channel and maybe reset the timer. */ -- (void)refChannel; +- (NSDictionary *)channelArgs { + NSMutableDictionary *args = [NSMutableDictionary new]; -/** Reduce call ref count to the channel and maybe set the timer. */ -- (void)unrefChannel; + NSString *userAgent = @"grpc-objc/" GRPC_OBJC_VERSION_STRING; + NSString *userAgentPrefix = _callOptions.userAgentPrefix; + if (userAgentPrefix) { + args[@GRPC_ARG_PRIMARY_USER_AGENT_STRING] = + [_callOptions.userAgentPrefix stringByAppendingFormat:@" %@", userAgent]; + } else { + args[@GRPC_ARG_PRIMARY_USER_AGENT_STRING] = userAgent; + } -/** Disconnect the channel. Any further ref/unref are discarded. */ -- (void)disconnect; + NSString *hostNameOverride = _callOptions.hostNameOverride; + if (hostNameOverride) { + args[@GRPC_SSL_TARGET_NAME_OVERRIDE_ARG] = hostNameOverride; + } -@end + if (_callOptions.responseSizeLimit) { + args[@GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH] = + [NSNumber numberWithUnsignedInteger:_callOptions.responseSizeLimit]; + } -@implementation GRPCChannelRef { - NSTimeInterval _destroyDelay; - void (^_destroyChannelCallback)(); + if (_callOptions.compressionAlgorithm != GRPC_COMPRESS_NONE) { + args[@GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM] = + [NSNumber numberWithInt:_callOptions.compressionAlgorithm]; + } - NSUInteger _refCount; - BOOL _disconnected; - dispatch_queue_t _dispatchQueue; + if (_callOptions.keepaliveInterval != 0) { + args[@GRPC_ARG_KEEPALIVE_TIME_MS] = + [NSNumber numberWithUnsignedInteger:(NSUInteger)(_callOptions.keepaliveInterval * 1000)]; + args[@GRPC_ARG_KEEPALIVE_TIMEOUT_MS] = + [NSNumber numberWithUnsignedInteger:(NSUInteger)(_callOptions.keepaliveTimeout * 1000)]; + } - /** - * Date and time when last timer is scheduled. If a firing timer's scheduled date is different - * from this, it is discarded. - */ - NSDate *_lastDispatch; -} + if (_callOptions.retryEnabled == NO) { + args[@GRPC_ARG_ENABLE_RETRIES] = [NSNumber numberWithInt:_callOptions.retryEnabled]; + } -- (instancetype)initWithDestroyDelay:(NSTimeInterval)destroyDelay - destroyChannelCallback:(void (^)())destroyChannelCallback { - if ((self = [super init])) { - _destroyDelay = destroyDelay; - _destroyChannelCallback = destroyChannelCallback; + if (_callOptions.connectMinTimeout > 0) { + args[@GRPC_ARG_MIN_RECONNECT_BACKOFF_MS] = + [NSNumber numberWithUnsignedInteger:(NSUInteger)(_callOptions.connectMinTimeout * 1000)]; + } + if (_callOptions.connectInitialBackoff > 0) { + args[@GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS] = [NSNumber + numberWithUnsignedInteger:(NSUInteger)(_callOptions.connectInitialBackoff * 1000)]; + } + if (_callOptions.connectMaxBackoff > 0) { + args[@GRPC_ARG_MAX_RECONNECT_BACKOFF_MS] = + [NSNumber numberWithUnsignedInteger:(NSUInteger)(_callOptions.connectMaxBackoff * 1000)]; + } - _refCount = 1; - _disconnected = NO; - if (@available(iOS 8.0, *)) { - _dispatchQueue = dispatch_queue_create( - NULL, - dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, -1)); - } else { - _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); - } - _lastDispatch = nil; + if (_callOptions.logContext != nil) { + args[@GRPC_ARG_MOBILE_LOG_CONTEXT] = _callOptions.logContext; } - return self; -} -- (void)refChannel { - dispatch_async(_dispatchQueue, ^{ - if (!self->_disconnected) { - self->_refCount++; - self->_lastDispatch = nil; - } - }); + if (_callOptions.channelPoolDomain.length != 0) { + args[@GRPC_ARG_CHANNEL_POOL_DOMAIN] = _callOptions.channelPoolDomain; + } + + [args addEntriesFromDictionary:_callOptions.additionalChannelArgs]; + + return args; } -- (void)unrefChannel { - dispatch_async(_dispatchQueue, ^{ - if (!self->_disconnected) { - self->_refCount--; - if (self->_refCount == 0) { - NSDate *now = [NSDate date]; - self->_lastDispatch = now; - dispatch_time_t delay = - dispatch_time(DISPATCH_TIME_NOW, (int64_t)self->_destroyDelay * NSEC_PER_SEC); - dispatch_after(delay, self->_dispatchQueue, ^{ - [self timedDisconnectWithScheduleDate:now]; - }); - } - } - }); +- (nonnull id)copyWithZone:(nullable NSZone *)zone { + GRPCChannelConfiguration *newConfig = + [[GRPCChannelConfiguration alloc] initWithHost:_host callOptions:_callOptions]; + + return newConfig; } -- (void)disconnect { - dispatch_async(_dispatchQueue, ^{ - if (!self->_disconnected) { - self->_lastDispatch = nil; - self->_disconnected = YES; - // Break retain loop - self->_destroyChannelCallback = nil; - } - }); +- (BOOL)isEqual:(id)object { + if (![object isKindOfClass:[GRPCChannelConfiguration class]]) { + return NO; + } + GRPCChannelConfiguration *obj = (GRPCChannelConfiguration *)object; + if (!(obj.host == _host || (_host != nil && [obj.host isEqualToString:_host]))) return NO; + if (!(obj.callOptions == _callOptions || [obj.callOptions hasChannelOptionsEqualTo:_callOptions])) + return NO; + + return YES; } -- (void)timedDisconnectWithScheduleDate:(NSDate *)scheduleDate { - dispatch_async(_dispatchQueue, ^{ - if (self->_disconnected || self->_lastDispatch != scheduleDate) { - return; - } - self->_lastDispatch = nil; - self->_disconnected = YES; - self->_destroyChannelCallback(); - // Break retain loop - self->_destroyChannelCallback = nil; - }); +- (NSUInteger)hash { + NSUInteger result = 0; + result ^= _host.hash; + result ^= _callOptions.channelOptionsHash; + + return result; } @end + + @implementation GRPCChannel { GRPCChannelConfiguration *_configuration; - grpc_channel *_unmanagedChannel; - GRPCChannelRef *_channelRef; + dispatch_queue_t _dispatchQueue; + grpc_channel *_unmanagedChannel; + NSTimeInterval _destroyDelay; + + NSUInteger _refcount; + NSDate *_lastDispatch; +} +@synthesize disconnected = _disconnected; + +- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration { + return [self initWithChannelConfiguration:channelConfiguration + destroyDelay:kDefaultChannelDestroyDelay]; +} + +- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration + destroyDelay:(NSTimeInterval)destroyDelay { + NSAssert(channelConfiguration, @"channelConfiguration must not be empty."); + NSAssert(destroyDelay > 0, @"destroyDelay must be greater than 0."); + if ((self = [super init])) { + _configuration = [channelConfiguration copy]; + if (@available(iOS 8.0, *)) { + _dispatchQueue = dispatch_queue_create( + NULL, + dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, -1)); + } else { + _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + } + + // Create gRPC core channel object. + NSString *host = channelConfiguration.host; + NSAssert(host.length != 0, @"host cannot be nil"); + NSDictionary *channelArgs; + if (channelConfiguration.callOptions.additionalChannelArgs.count != 0) { + NSMutableDictionary *args = [channelConfiguration.channelArgs mutableCopy]; + [args addEntriesFromDictionary:channelConfiguration.callOptions.additionalChannelArgs]; + channelArgs = args; + } else { + channelArgs = channelConfiguration.channelArgs; + } + id<GRPCChannelFactory> factory = channelConfiguration.channelFactory; + _unmanagedChannel = [factory createChannelWithHost:host channelArgs:channelArgs]; + if (_unmanagedChannel == NULL) { + NSLog(@"Unable to create channel."); + return nil; + } + _destroyDelay = destroyDelay; + _disconnected = NO; + } + return self; } - (grpc_call *)unmanagedCallWithPath:(NSString *)path completionQueue:(GRPCCompletionQueue *)queue - callOptions:(GRPCCallOptions *)callOptions { + callOptions:(GRPCCallOptions *)callOptions + disconnected:(BOOL *)disconnected { NSAssert(path.length, @"path must not be empty."); NSAssert(queue, @"completionQueue must not be empty."); NSAssert(callOptions, @"callOptions must not be empty."); - __block grpc_call *call = nil; + __block BOOL isDisconnected = NO; + __block grpc_call *call = NULL; dispatch_sync(_dispatchQueue, ^{ - if (self->_unmanagedChannel) { + if (self->_disconnected) { + isDisconnected = YES; + } else { + NSAssert(self->_unmanagedChannel != NULL, @"Invalid channel."); + NSString *serverAuthority = - callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority; + callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority; NSTimeInterval timeout = callOptions.timeout; NSAssert(timeout >= 0, @"Invalid timeout"); grpc_slice host_slice = grpc_empty_slice(); @@ -171,10 +252,10 @@ static GRPCChannelPool *gChannelPool; } grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String); gpr_timespec deadline_ms = - timeout == 0 - ? gpr_inf_future(GPR_CLOCK_REALTIME) - : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN)); + timeout == 0 + ? gpr_inf_future(GPR_CLOCK_REALTIME) + : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN)); call = grpc_channel_create_call(self->_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS, queue.unmanagedQueue, path_slice, serverAuthority ? &host_slice : NULL, deadline_ms, NULL); @@ -182,71 +263,64 @@ static GRPCChannelPool *gChannelPool; grpc_slice_unref(host_slice); } grpc_slice_unref(path_slice); - } else { - NSAssert(self->_unmanagedChannel != nil, @"Invalid channeg."); + if (call == NULL) { + NSLog(@"Unable to create call."); + } else { + // Ref the channel; + [self ref]; + } } }); + if (disconnected != nil) { + *disconnected = isDisconnected; + } return call; } +// This function should be called on _dispatchQueue. - (void)ref { - dispatch_async(_dispatchQueue, ^{ - if (self->_unmanagedChannel) { - [self->_channelRef refChannel]; - } - }); + _refcount++; + if (_refcount == 1 && _lastDispatch != nil) { + _lastDispatch = nil; + } } - (void)unref { dispatch_async(_dispatchQueue, ^{ - if (self->_unmanagedChannel) { - [self->_channelRef unrefChannel]; + self->_refcount--; + if (self->_refcount == 0 && !self->_disconnected) { + // Start timer. + dispatch_time_t delay = + dispatch_time(DISPATCH_TIME_NOW, (int64_t)self->_destroyDelay * NSEC_PER_SEC); + NSDate *now = [NSDate date]; + self->_lastDispatch = now; + dispatch_after(delay, self->_dispatchQueue, ^{ + if (self->_lastDispatch == now) { + grpc_channel_destroy(self->_unmanagedChannel); + self->_unmanagedChannel = NULL; + self->_disconnected = YES; + } + }); } }); } - (void)disconnect { dispatch_async(_dispatchQueue, ^{ - if (self->_unmanagedChannel) { + if (!self->_disconnected) { grpc_channel_destroy(self->_unmanagedChannel); self->_unmanagedChannel = nil; - [self->_channelRef disconnect]; + self->_disconnected = YES; } }); } -- (void)destroyChannel { - dispatch_async(_dispatchQueue, ^{ - if (self->_unmanagedChannel) { - grpc_channel_destroy(self->_unmanagedChannel); - self->_unmanagedChannel = nil; - [gChannelPool removeChannel:self]; - } +- (BOOL)disconnected { + __block BOOL disconnected; + dispatch_sync(_dispatchQueue, ^{ + disconnected = self->_disconnected; }); -} - -- (nullable instancetype)initWithUnmanagedChannel:(grpc_channel *_Nullable)unmanagedChannel - configuration:(GRPCChannelConfiguration *)configuration { - NSAssert(configuration, @"Configuration must not be empty."); - if (!unmanagedChannel) { - return nil; - } - if ((self = [super init])) { - _unmanagedChannel = unmanagedChannel; - _configuration = [configuration copy]; - _channelRef = [[GRPCChannelRef alloc] initWithDestroyDelay:kChannelDestroyDelay - destroyChannelCallback:^{ - [self destroyChannel]; - }]; - if (@available(iOS 8.0, *)) { - _dispatchQueue = dispatch_queue_create( - NULL, - dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, -1)); - } else { - _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); - } - } - return self; + return disconnected; } - (void)dealloc { @@ -255,47 +329,4 @@ static GRPCChannelPool *gChannelPool; } } -+ (nullable instancetype)createChannelWithConfiguration:(GRPCChannelConfiguration *)config { - NSAssert(config != nil, @"configuration cannot be empty"); - NSString *host = config.host; - NSAssert(host.length != 0, @"host cannot be nil"); - - NSDictionary *channelArgs; - if (config.callOptions.additionalChannelArgs.count != 0) { - NSMutableDictionary *args = [config.channelArgs mutableCopy]; - [args addEntriesFromDictionary:config.callOptions.additionalChannelArgs]; - channelArgs = args; - } else { - channelArgs = config.channelArgs; - } - id<GRPCChannelFactory> factory = config.channelFactory; - grpc_channel *unmanaged_channel = [factory createChannelWithHost:host channelArgs:channelArgs]; - return [[GRPCChannel alloc] initWithUnmanagedChannel:unmanaged_channel configuration:config]; -} - -+ (nullable instancetype)channelWithHost:(NSString *)host - callOptions:(GRPCCallOptions *)callOptions { - static dispatch_once_t initChannelPool; - dispatch_once(&initChannelPool, ^{ - gChannelPool = [[GRPCChannelPool alloc] init]; - }); - - NSURL *hostURL = [NSURL URLWithString:[@"https://" stringByAppendingString:host]]; - if (hostURL.host && !hostURL.port) { - host = [hostURL.host stringByAppendingString:@":443"]; - } - - GRPCChannelConfiguration *channelConfig = - [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions]; - if (channelConfig == nil) { - return nil; - } - - return [gChannelPool channelWithConfiguration:channelConfig]; -} - -+ (void)closeOpenConnections { - [gChannelPool removeAndCloseAllChannels]; -} - @end |