From 8fef0c87893ceda1c5bb7aaa09ddb8dfaefacbdb Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 17 Oct 2018 16:59:00 -0700 Subject: Rewrite the channel pool --- src/objective-c/GRPCClient/private/GRPCChannel.m | 204 +++++++++++++++++++---- 1 file changed, 176 insertions(+), 28 deletions(-) (limited to 'src/objective-c/GRPCClient/private/GRPCChannel.m') diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m index bea75d25a9..9e7e1ea1fc 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.m +++ b/src/objective-c/GRPCClient/private/GRPCChannel.m @@ -24,7 +24,6 @@ #import "GRPCChannelFactory.h" #import "GRPCChannelPool.h" #import "GRPCCompletionQueue.h" -#import "GRPCConnectivityMonitor.h" #import "GRPCCronetChannelFactory.h" #import "GRPCInsecureChannelFactory.h" #import "GRPCSecureChannelFactory.h" @@ -33,38 +32,180 @@ #import #import +// When all calls of a channel are destroyed, destroy the channel after this much seconds. +NSTimeInterval kChannelDestroyDelay = 30; + +/** + * Time the channel destroy when the channel's calls are unreffed. If there's new call, reset the + * timer. + */ +@interface GRPCChannelRef : NSObject + +- (instancetype)initWithDestroyDelay:(NSTimeInterval)destroyDelay + destroyChannelCallback:(void (^)())destroyChannelCallback; + +/** Add call ref count to the channel and maybe reset the timer. */ +- (void)refChannel; + +/** Reduce call ref count to the channel and maybe set the timer. */ +- (void)unrefChannel; + +/** Disconnect the channel immediately. */ +- (void)disconnect; + +@end + +@implementation GRPCChannelRef { + NSTimeInterval _destroyDelay; + // We use dispatch queue for this purpose since timer invalidation must happen on the same + // thread which issued the timer. + dispatch_queue_t _dispatchQueue; + void (^_destroyChannelCallback)(); + + NSUInteger _refCount; + NSTimer *_timer; + BOOL _disconnected; +} + +- (instancetype)initWithDestroyDelay:(NSTimeInterval)destroyDelay + destroyChannelCallback:(void (^)())destroyChannelCallback { + if ((self = [super init])) { + _destroyDelay = destroyDelay; + _destroyChannelCallback = destroyChannelCallback; + + _refCount = 1; + _timer = nil; + _disconnected = NO; + } + return self; +} + +// This function is protected by channel dispatch queue. +- (void)refChannel { + if (!_disconnected) { + _refCount++; + if (_timer) { + [_timer invalidate]; + _timer = nil; + } + } +} + +// This function is protected by channel dispatch queue. +- (void)unrefChannel { + if (!_disconnected) { + _refCount--; + if (_refCount == 0) { + if (_timer) { + [_timer invalidate]; + } + _timer = [NSTimer scheduledTimerWithTimeInterval:self->_destroyDelay + target:self + selector:@selector(timerFire:) + userInfo:nil + repeats:NO]; + } + } +} + +// This function is protected by channel dispatch queue. +- (void)disconnect { + if (!_disconnected) { + if (self->_timer != nil) { + [self->_timer invalidate]; + self->_timer = nil; + } + _disconnected = YES; + // Break retain loop + _destroyChannelCallback = nil; + } +} + +// This function is protected by channel dispatch queue. +- (void)timerFire:(NSTimer *)timer { + if (_disconnected || _timer == nil || _timer != timer) { + return; + } + _timer = nil; + _destroyChannelCallback(); + // Break retain loop + _destroyChannelCallback = nil; + _disconnected = YES; +} + +@end + @implementation GRPCChannel { GRPCChannelConfiguration *_configuration; grpc_channel *_unmanagedChannel; + GRPCChannelRef *_channelRef; + dispatch_queue_t _dispatchQueue; } - (grpc_call *)unmanagedCallWithPath:(NSString *)path completionQueue:(nonnull GRPCCompletionQueue *)queue callOptions:(GRPCCallOptions *)callOptions { - NSString *serverAuthority = callOptions.serverAuthority; - NSTimeInterval timeout = callOptions.timeout; - GPR_ASSERT(timeout >= 0); - grpc_slice host_slice = grpc_empty_slice(); - if (serverAuthority) { - host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String); - } - grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String); - gpr_timespec deadline_ms = + __block grpc_call *call = nil; + dispatch_sync(_dispatchQueue, ^{ + if (self->_unmanagedChannel) { + NSString *serverAuthority = callOptions.serverAuthority; + NSTimeInterval timeout = callOptions.timeout; + GPR_ASSERT(timeout >= 0); + grpc_slice host_slice = grpc_empty_slice(); + if (serverAuthority) { + host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String); + } + grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String); + gpr_timespec deadline_ms = timeout == 0 ? gpr_inf_future(GPR_CLOCK_REALTIME) - : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN)); - grpc_call *call = grpc_channel_create_call( - _unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS, queue.unmanagedQueue, path_slice, - serverAuthority ? &host_slice : NULL, deadline_ms, NULL); - if (serverAuthority) { - grpc_slice_unref(host_slice); - } - grpc_slice_unref(path_slice); + : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN)); + call = grpc_channel_create_call( + self->_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS, queue.unmanagedQueue, path_slice, + serverAuthority ? &host_slice : NULL, deadline_ms, NULL); + if (serverAuthority) { + grpc_slice_unref(host_slice); + } + grpc_slice_unref(path_slice); + } + }); return call; } +- (void)unmanagedCallRef { + dispatch_async(_dispatchQueue, ^{ + if (self->_unmanagedChannel) { + [self->_channelRef refChannel]; + } + }); +} + - (void)unmanagedCallUnref { - [gChannelPool unrefChannelWithConfiguration:_configuration]; + dispatch_async(_dispatchQueue, ^{ + if (self->_unmanagedChannel) { + [self->_channelRef unrefChannel]; + } + }); +} + +- (void)disconnect { + dispatch_async(_dispatchQueue, ^{ + if (self->_unmanagedChannel) { + grpc_channel_destroy(self->_unmanagedChannel); + self->_unmanagedChannel = nil; + [self->_channelRef disconnect]; + } + }); +} + +- (void)destroyChannel { + dispatch_async(_dispatchQueue, ^{ + if (self->_unmanagedChannel) { + grpc_channel_destroy(self->_unmanagedChannel); + self->_unmanagedChannel = nil; + [gChannelPool removeChannelWithConfiguration:self->_configuration]; + } + }); } - (nullable instancetype)initWithUnmanagedChannel:(nullable grpc_channel *)unmanagedChannel @@ -72,12 +213,22 @@ if ((self = [super init])) { _unmanagedChannel = unmanagedChannel; _configuration = configuration; + _channelRef = [[GRPCChannelRef alloc] initWithDestroyDelay:kChannelDestroyDelay destroyChannelCallback:^{ + [self destroyChannel]; + }]; + if (@available(iOS 8.0, *)) { + _dispatchQueue = dispatch_queue_create(NULL, dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, -1)); + } else { + _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); + } } return self; } - (void)dealloc { - grpc_channel_destroy(_unmanagedChannel); + if (_unmanagedChannel) { + grpc_channel_destroy(_unmanagedChannel); + } } + (nullable instancetype)createChannelWithConfiguration:(GRPCChannelConfiguration *)config { @@ -88,7 +239,7 @@ NSDictionary *channelArgs; if (config.callOptions.additionalChannelArgs.count != 0) { - NSMutableDictionary *args = [config.channelArgs copy]; + NSMutableDictionary *args = [config.channelArgs mutableCopy]; [args addEntriesFromDictionary:config.callOptions.additionalChannelArgs]; channelArgs = args; } else { @@ -115,15 +266,12 @@ static GRPCChannelPool *gChannelPool; GRPCChannelConfiguration *channelConfig = [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions]; - return [gChannelPool channelWithConfiguration:channelConfig - createChannelCallback:^{ - return - [GRPCChannel createChannelWithConfiguration:channelConfig]; - }]; + + return [gChannelPool channelWithConfiguration:channelConfig]; } + (void)closeOpenConnections { - [gChannelPool clear]; + [gChannelPool removeAndCloseAllChannels]; } @end -- cgit v1.2.3