aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/GRPCClient/private
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2018-10-17 16:59:00 -0700
committerGravatar Muxi Yan <mxyan@google.com>2018-10-17 16:59:00 -0700
commit8fef0c87893ceda1c5bb7aaa09ddb8dfaefacbdb (patch)
tree12c9bc7775d2c8544acafafd4671abd22efdc75d /src/objective-c/GRPCClient/private
parent86ff72bb4736cb9333505baeb324386cfa24bcb9 (diff)
Rewrite the channel pool
Diffstat (limited to 'src/objective-c/GRPCClient/private')
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannel.h13
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannel.m204
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannelPool.h14
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannelPool.m139
4 files changed, 213 insertions, 157 deletions
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h
index 7d5039a8a2..7a40638dc3 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.h
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.h
@@ -40,19 +40,22 @@ struct grpc_channel_credentials;
callOptions:(nullable GRPCCallOptions *)callOptions;
/**
+ * Create a channel object with the signature \a config.
+ */
++ (nullable instancetype)createChannelWithConfiguration:(nonnull GRPCChannelConfiguration *)config;
+
+/**
* Get a grpc core call object from this channel.
*/
- (nullable grpc_call *)unmanagedCallWithPath:(nonnull NSString *)path
completionQueue:(nonnull GRPCCompletionQueue *)queue
callOptions:(nonnull GRPCCallOptions *)callOptions;
+- (void)unmanagedCallRef;
+
- (void)unmanagedCallUnref;
-/**
- * Create a channel object with the signature \a config. This function is used for testing only. Use
- * channelWithHost:callOptions: in production.
- */
-+ (nullable instancetype)createChannelWithConfiguration:(nonnull GRPCChannelConfiguration *)config;
+- (void)disconnect;
// TODO (mxyan): deprecate with GRPCCall:closeOpenConnections
+ (void)closeOpenConnections;
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m
index bea75d25a9..9e7e1ea1fc 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.m
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.m
@@ -24,7 +24,6 @@
#import "GRPCChannelFactory.h"
#import "GRPCChannelPool.h"
#import "GRPCCompletionQueue.h"
-#import "GRPCConnectivityMonitor.h"
#import "GRPCCronetChannelFactory.h"
#import "GRPCInsecureChannelFactory.h"
#import "GRPCSecureChannelFactory.h"
@@ -33,38 +32,180 @@
#import <GRPCClient/GRPCCall+Cronet.h>
#import <GRPCClient/GRPCCallOptions.h>
+// When all calls of a channel are destroyed, destroy the channel after this much seconds.
+NSTimeInterval kChannelDestroyDelay = 30;
+
+/**
+ * Time the channel destroy when the channel's calls are unreffed. If there's new call, reset the
+ * timer.
+ */
+@interface GRPCChannelRef : NSObject
+
+- (instancetype)initWithDestroyDelay:(NSTimeInterval)destroyDelay
+ destroyChannelCallback:(void (^)())destroyChannelCallback;
+
+/** Add call ref count to the channel and maybe reset the timer. */
+- (void)refChannel;
+
+/** Reduce call ref count to the channel and maybe set the timer. */
+- (void)unrefChannel;
+
+/** Disconnect the channel immediately. */
+- (void)disconnect;
+
+@end
+
+@implementation GRPCChannelRef {
+ NSTimeInterval _destroyDelay;
+ // We use dispatch queue for this purpose since timer invalidation must happen on the same
+ // thread which issued the timer.
+ dispatch_queue_t _dispatchQueue;
+ void (^_destroyChannelCallback)();
+
+ NSUInteger _refCount;
+ NSTimer *_timer;
+ BOOL _disconnected;
+}
+
+- (instancetype)initWithDestroyDelay:(NSTimeInterval)destroyDelay
+ destroyChannelCallback:(void (^)())destroyChannelCallback {
+ if ((self = [super init])) {
+ _destroyDelay = destroyDelay;
+ _destroyChannelCallback = destroyChannelCallback;
+
+ _refCount = 1;
+ _timer = nil;
+ _disconnected = NO;
+ }
+ return self;
+}
+
+// This function is protected by channel dispatch queue.
+- (void)refChannel {
+ if (!_disconnected) {
+ _refCount++;
+ if (_timer) {
+ [_timer invalidate];
+ _timer = nil;
+ }
+ }
+}
+
+// This function is protected by channel dispatch queue.
+- (void)unrefChannel {
+ if (!_disconnected) {
+ _refCount--;
+ if (_refCount == 0) {
+ if (_timer) {
+ [_timer invalidate];
+ }
+ _timer = [NSTimer scheduledTimerWithTimeInterval:self->_destroyDelay
+ target:self
+ selector:@selector(timerFire:)
+ userInfo:nil
+ repeats:NO];
+ }
+ }
+}
+
+// This function is protected by channel dispatch queue.
+- (void)disconnect {
+ if (!_disconnected) {
+ if (self->_timer != nil) {
+ [self->_timer invalidate];
+ self->_timer = nil;
+ }
+ _disconnected = YES;
+ // Break retain loop
+ _destroyChannelCallback = nil;
+ }
+}
+
+// This function is protected by channel dispatch queue.
+- (void)timerFire:(NSTimer *)timer {
+ if (_disconnected || _timer == nil || _timer != timer) {
+ return;
+ }
+ _timer = nil;
+ _destroyChannelCallback();
+ // Break retain loop
+ _destroyChannelCallback = nil;
+ _disconnected = YES;
+}
+
+@end
+
@implementation GRPCChannel {
GRPCChannelConfiguration *_configuration;
grpc_channel *_unmanagedChannel;
+ GRPCChannelRef *_channelRef;
+ dispatch_queue_t _dispatchQueue;
}
- (grpc_call *)unmanagedCallWithPath:(NSString *)path
completionQueue:(nonnull GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions {
- NSString *serverAuthority = callOptions.serverAuthority;
- NSTimeInterval timeout = callOptions.timeout;
- GPR_ASSERT(timeout >= 0);
- grpc_slice host_slice = grpc_empty_slice();
- if (serverAuthority) {
- host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String);
- }
- grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String);
- gpr_timespec deadline_ms =
+ __block grpc_call *call = nil;
+ dispatch_sync(_dispatchQueue, ^{
+ if (self->_unmanagedChannel) {
+ NSString *serverAuthority = callOptions.serverAuthority;
+ NSTimeInterval timeout = callOptions.timeout;
+ GPR_ASSERT(timeout >= 0);
+ grpc_slice host_slice = grpc_empty_slice();
+ if (serverAuthority) {
+ host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String);
+ }
+ grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String);
+ gpr_timespec deadline_ms =
timeout == 0 ? gpr_inf_future(GPR_CLOCK_REALTIME)
- : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN));
- grpc_call *call = grpc_channel_create_call(
- _unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS, queue.unmanagedQueue, path_slice,
- serverAuthority ? &host_slice : NULL, deadline_ms, NULL);
- if (serverAuthority) {
- grpc_slice_unref(host_slice);
- }
- grpc_slice_unref(path_slice);
+ : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN));
+ call = grpc_channel_create_call(
+ self->_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS, queue.unmanagedQueue, path_slice,
+ serverAuthority ? &host_slice : NULL, deadline_ms, NULL);
+ if (serverAuthority) {
+ grpc_slice_unref(host_slice);
+ }
+ grpc_slice_unref(path_slice);
+ }
+ });
return call;
}
+- (void)unmanagedCallRef {
+ dispatch_async(_dispatchQueue, ^{
+ if (self->_unmanagedChannel) {
+ [self->_channelRef refChannel];
+ }
+ });
+}
+
- (void)unmanagedCallUnref {
- [gChannelPool unrefChannelWithConfiguration:_configuration];
+ dispatch_async(_dispatchQueue, ^{
+ if (self->_unmanagedChannel) {
+ [self->_channelRef unrefChannel];
+ }
+ });
+}
+
+- (void)disconnect {
+ dispatch_async(_dispatchQueue, ^{
+ if (self->_unmanagedChannel) {
+ grpc_channel_destroy(self->_unmanagedChannel);
+ self->_unmanagedChannel = nil;
+ [self->_channelRef disconnect];
+ }
+ });
+}
+
+- (void)destroyChannel {
+ dispatch_async(_dispatchQueue, ^{
+ if (self->_unmanagedChannel) {
+ grpc_channel_destroy(self->_unmanagedChannel);
+ self->_unmanagedChannel = nil;
+ [gChannelPool removeChannelWithConfiguration:self->_configuration];
+ }
+ });
}
- (nullable instancetype)initWithUnmanagedChannel:(nullable grpc_channel *)unmanagedChannel
@@ -72,12 +213,22 @@
if ((self = [super init])) {
_unmanagedChannel = unmanagedChannel;
_configuration = configuration;
+ _channelRef = [[GRPCChannelRef alloc] initWithDestroyDelay:kChannelDestroyDelay destroyChannelCallback:^{
+ [self destroyChannel];
+ }];
+ if (@available(iOS 8.0, *)) {
+ _dispatchQueue = dispatch_queue_create(NULL, dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, -1));
+ } else {
+ _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ }
}
return self;
}
- (void)dealloc {
- grpc_channel_destroy(_unmanagedChannel);
+ if (_unmanagedChannel) {
+ grpc_channel_destroy(_unmanagedChannel);
+ }
}
+ (nullable instancetype)createChannelWithConfiguration:(GRPCChannelConfiguration *)config {
@@ -88,7 +239,7 @@
NSDictionary *channelArgs;
if (config.callOptions.additionalChannelArgs.count != 0) {
- NSMutableDictionary *args = [config.channelArgs copy];
+ NSMutableDictionary *args = [config.channelArgs mutableCopy];
[args addEntriesFromDictionary:config.callOptions.additionalChannelArgs];
channelArgs = args;
} else {
@@ -115,15 +266,12 @@ static GRPCChannelPool *gChannelPool;
GRPCChannelConfiguration *channelConfig =
[[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions];
- return [gChannelPool channelWithConfiguration:channelConfig
- createChannelCallback:^{
- return
- [GRPCChannel createChannelWithConfiguration:channelConfig];
- }];
+
+ return [gChannelPool channelWithConfiguration:channelConfig];
}
+ (void)closeOpenConnections {
- [gChannelPool clear];
+ [gChannelPool removeAndCloseAllChannels];
}
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.h b/src/objective-c/GRPCClient/private/GRPCChannelPool.h
index 48c35eacb0..bd1350c15d 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannelPool.h
+++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.h
@@ -49,20 +49,20 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)init;
-- (instancetype)initWithChannelDestroyDelay:(NSTimeInterval)channelDestroyDelay NS_DESIGNATED_INITIALIZER;
-
/**
* Return a channel with a particular configuration. If the channel does not exist, execute \a
* createChannel then add it in the pool. If the channel exists, increase its reference count.
*/
-- (GRPCChannel *)channelWithConfiguration:(GRPCChannelConfiguration *)configuration
- createChannelCallback:(GRPCChannel * (^)(void))createChannelCallback;
+- (GRPCChannel *)channelWithConfiguration:(GRPCChannelConfiguration *)configuration;
-/** Decrease a channel's refcount. */
-- (void)unrefChannelWithConfiguration:configuration;
+/** Remove a channel with particular configuration. */
+- (void)removeChannelWithConfiguration:(GRPCChannelConfiguration *)configuration;
/** Clear all channels in the pool. */
-- (void)clear;
+- (void)removeAllChannels;
+
+/** Clear all channels in the pool and destroy the channels. */
+- (void)removeAndCloseAllChannels;
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m
index c46b9ddfc8..7c0a8a8621 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannelPool.m
+++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m
@@ -18,6 +18,7 @@
#import <Foundation/Foundation.h>
+#import "GRPCChannel.h"
#import "GRPCChannelFactory.h"
#import "GRPCChannelPool.h"
#import "GRPCConnectivityMonitor.h"
@@ -31,9 +32,6 @@
extern const char *kCFStreamVarName;
-// When all calls of a channel are destroyed, destroy the channel after this much seconds.
-const NSTimeInterval kChannelDestroyDelay = 30;
-
@implementation GRPCChannelConfiguration
- (nullable instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions {
@@ -216,109 +214,22 @@ const NSTimeInterval kChannelDestroyDelay = 30;
@end
-/**
- * Time the channel destroy when the channel's calls are unreffed. If there's new call, reset the
- * timer.
- */
-@interface GRPCChannelCallRef : NSObject
-
-- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)configuration
- destroyDelay:(NSTimeInterval)destroyDelay
- dispatchQueue:(dispatch_queue_t)dispatchQueue
- destroyChannel:(void (^)())destroyChannel;
-
-/** Add call ref count to the channel and maybe reset the timer. */
-- (void)refChannel;
-
-/** Reduce call ref count to the channel and maybe set the timer. */
-- (void)unrefChannel;
-
-@end
-
-@implementation GRPCChannelCallRef {
- GRPCChannelConfiguration *_configuration;
- NSTimeInterval _destroyDelay;
- // We use dispatch queue for this purpose since timer invalidation must happen on the same
- // thread which issued the timer.
- dispatch_queue_t _dispatchQueue;
- void (^_destroyChannel)();
-
- NSUInteger _refCount;
- NSTimer *_timer;
-}
-
-- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)configuration
- destroyDelay:(NSTimeInterval)destroyDelay
- dispatchQueue:(dispatch_queue_t)dispatchQueue
- destroyChannel:(void (^)())destroyChannel {
- if ((self = [super init])) {
- _configuration = configuration;
- _destroyDelay = destroyDelay;
- _dispatchQueue = dispatchQueue;
- _destroyChannel = destroyChannel;
-
- _refCount = 0;
- _timer = nil;
- }
- return self;
-}
-
-// This function is protected by channel pool dispatch queue.
-- (void)refChannel {
- _refCount++;
- if (_timer) {
- [_timer invalidate];
- }
- _timer = nil;
-}
-
-// This function is protected by channel spool dispatch queue.
-- (void)unrefChannel {
- self->_refCount--;
- if (self->_refCount == 0) {
- if (self->_timer) {
- [self->_timer invalidate];
- }
- self->_timer = [NSTimer scheduledTimerWithTimeInterval:self->_destroyDelay
- target:self
- selector:@selector(timerFire:)
- userInfo:nil
- repeats:NO];
- }
-}
-
-- (void)timerFire:(NSTimer *)timer {
- dispatch_sync(_dispatchQueue, ^{
- if (self->_timer == nil || self->_timer != timer) {
- return;
- }
- self->_timer = nil;
- self->_destroyChannel(self->_configuration);
- });
-}
-
-@end
-
#pragma mark GRPCChannelPool
@implementation GRPCChannelPool {
- NSTimeInterval _channelDestroyDelay;
NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannel *> *_channelPool;
- NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelCallRef *> *_callRefs;
// Dedicated queue for timer
dispatch_queue_t _dispatchQueue;
}
- (instancetype)init {
- return [self initWithChannelDestroyDelay:kChannelDestroyDelay];
-}
-
-- (instancetype)initWithChannelDestroyDelay:(NSTimeInterval)channelDestroyDelay {
if ((self = [super init])) {
- _channelDestroyDelay = channelDestroyDelay;
_channelPool = [NSMutableDictionary dictionary];
- _callRefs = [NSMutableDictionary dictionary];
- _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ if (@available(iOS 8.0, *)) {
+ _dispatchQueue = dispatch_queue_create(NULL, dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, -1));
+ } else {
+ _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ }
// Connectivity monitor is not required for CFStream
char *enableCFStream = getenv(kCFStreamVarName);
@@ -337,49 +248,43 @@ const NSTimeInterval kChannelDestroyDelay = 30;
}
}
-- (GRPCChannel *)channelWithConfiguration:(GRPCChannelConfiguration *)configuration
- createChannelCallback:(GRPCChannel * (^)(void))createChannelCallback {
+- (GRPCChannel *)channelWithConfiguration:(GRPCChannelConfiguration *)configuration {
__block GRPCChannel *channel;
dispatch_sync(_dispatchQueue, ^{
if ([self->_channelPool objectForKey:configuration]) {
- [self->_callRefs[configuration] refChannel];
channel = self->_channelPool[configuration];
+ [channel unmanagedCallRef];
} else {
- channel = createChannelCallback();
+ channel = [GRPCChannel createChannelWithConfiguration:configuration];
self->_channelPool[configuration] = channel;
-
- GRPCChannelCallRef *callRef = [[GRPCChannelCallRef alloc]
- initWithChannelConfiguration:configuration
- destroyDelay:self->_channelDestroyDelay
- dispatchQueue:self->_dispatchQueue
- destroyChannel:^(GRPCChannelConfiguration *configuration) {
- [self->_channelPool removeObjectForKey:configuration];
- [self->_callRefs removeObjectForKey:configuration];
- }];
- [callRef refChannel];
- self->_callRefs[configuration] = callRef;
}
});
return channel;
}
-- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration {
+- (void)removeChannelWithConfiguration:(GRPCChannelConfiguration *)configuration {
+ dispatch_async(_dispatchQueue, ^{
+ [self->_channelPool removeObjectForKey:configuration];
+ });
+}
+
+- (void)removeAllChannels {
dispatch_sync(_dispatchQueue, ^{
- if ([self->_channelPool objectForKey:configuration]) {
- [self->_callRefs[configuration] unrefChannel];
- }
+ self->_channelPool = [NSMutableDictionary dictionary];
});
}
-- (void)clear {
+- (void)removeAndCloseAllChannels {
dispatch_sync(_dispatchQueue, ^{
+ [self->_channelPool enumerateKeysAndObjectsUsingBlock:^(GRPCChannelConfiguration * _Nonnull key, GRPCChannel * _Nonnull obj, BOOL * _Nonnull stop) {
+ [obj disconnect];
+ }];
self->_channelPool = [NSMutableDictionary dictionary];
- self->_callRefs = [NSMutableDictionary dictionary];
});
}
- (void)connectivityChange:(NSNotification *)note {
- [self clear];
+ [self removeAndCloseAllChannels];
}
@end