aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m15
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannelPool.h35
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannelPool.m276
-rw-r--r--src/objective-c/GRPCClient/private/GRPCWrappedCall.h13
-rw-r--r--src/objective-c/GRPCClient/private/GRPCWrappedCall.m117
-rw-r--r--src/objective-c/tests/ChannelTests/ChannelPoolTest.m146
-rw-r--r--src/objective-c/tests/ChannelTests/ChannelTests.m152
-rw-r--r--src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/ChannelTests.xcscheme5
8 files changed, 306 insertions, 453 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index bf9441c27e..dad8594a26 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -36,6 +36,8 @@
#import "private/NSDictionary+GRPC.h"
#import "private/NSError+GRPC.h"
#import "private/utilities.h"
+#import "private/GRPCChannelPool.h"
+#import "private/GRPCCompletionQueue.h"
// At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
// SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
@@ -819,8 +821,11 @@ const char *kCFStreamVarName = "grpc_cfstream";
_responseWriteable =
[[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
- GRPCWrappedCall *wrappedCall =
- [[GRPCWrappedCall alloc] initWithHost:_host path:_path callOptions:_callOptions];
+ GRPCPooledChannel *channel = [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
+ GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:_path
+ completionQueue:[GRPCCompletionQueue completionQueue]
+ callOptions:_callOptions];
+
if (wrappedCall == nil) {
[self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeUnavailable
@@ -837,12 +842,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
[self sendHeaders];
[self invokeCall];
-
- // Connectivity monitor is not required for CFStream
- char *enableCFStream = getenv(kCFStreamVarName);
- if (enableCFStream == nil || enableCFStream[0] != '1') {
- [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
- }
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.h b/src/objective-c/GRPCClient/private/GRPCChannelPool.h
index 7f8ee19fe5..338b6e440f 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannelPool.h
+++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.h
@@ -32,10 +32,13 @@ NS_ASSUME_NONNULL_BEGIN
@class GRPCChannelPool;
@class GRPCCompletionQueue;
@class GRPCChannelConfiguration;
+@class GRPCWrappedCall;
/**
- * Channel proxy that can be retained and automatically reestablish connection when the channel is
- * disconnected.
+ * A proxied channel object that can be retained and creates GRPCWrappedCall object from. If a
+ * raw channel is not present (i.e. no tcp connection to the server) when a GRPCWrappedCall object
+ * is requested, it issues a connection/reconnection. The behavior of this object is to mimic that
+ * of gRPC core's channel object.
*/
@interface GRPCPooledChannel : NSObject
@@ -47,24 +50,21 @@ NS_ASSUME_NONNULL_BEGIN
* Initialize with an actual channel object \a channel and a reference to the channel pool.
*/
- (nullable instancetype)initWithChannelConfiguration:
- (GRPCChannelConfiguration *)channelConfiguration
- channelPool:(GRPCChannelPool *)channelPool
- NS_DESIGNATED_INITIALIZER;
+ (GRPCChannelConfiguration *)channelConfiguration;
/**
- * Create a grpc core call object (grpc_call) from this channel. If channel is disconnected, get a
+ * Create a GRPCWrappedCall object (grpc_call) from this channel. If channel is disconnected, get a
* new channel object from the channel pool.
*/
-- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
- completionQueue:(GRPCCompletionQueue *)queue
- callOptions:(GRPCCallOptions *)callOptions;
+- (nullable GRPCWrappedCall *)wrappedCallWithPath:(NSString *)path
+ completionQueue:(GRPCCompletionQueue *)queue
+ callOptions:(GRPCCallOptions *)callOptions;
/**
- * Return ownership and destroy the grpc_call object created by
- * \a unmanagedCallWithPath:completionQueue:callOptions: and decrease channel refcount. If refcount
- * of the channel becomes 0, return the channel object to channel pool.
+ * Notify the pooled channel that a wrapped call object is no longer referenced and will be
+ * dealloc'ed.
*/
-- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall;
+- (void)notifyWrappedCallDealloc:(GRPCWrappedCall *)wrappedCall;
/**
* Force the channel to disconnect immediately. Subsequent calls to unmanagedCallWithPath: will
@@ -78,6 +78,13 @@ NS_ASSUME_NONNULL_BEGIN
@interface GRPCPooledChannel (Test)
/**
+ * Initialize a pooled channel with non-default destroy delay for testing purpose.
+ */
+- (nullable instancetype)initWithChannelConfiguration:
+(GRPCChannelConfiguration *)channelConfiguration
+ destroyDelay:(NSTimeInterval)destroyDelay;
+
+/**
* Return the pointer to the raw channel wrapped.
*/
@property(atomic, readonly) GRPCChannel *wrappedChannel;
@@ -118,7 +125,7 @@ NS_ASSUME_NONNULL_BEGIN
* Get an instance of pool isolated from the global shared pool with channels' destroy delay being
* \a destroyDelay.
*/
-- (nullable instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay;
+- (nullable instancetype)initTestPool;
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m
index 646f1e4b86..488766c0ed 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannelPool.m
+++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m
@@ -28,6 +28,8 @@
#import "GRPCSecureChannelFactory.h"
#import "utilities.h"
#import "version.h"
+#import "GRPCWrappedCall.h"
+#import "GRPCCompletionQueue.h"
#import <GRPCClient/GRPCCall+Cronet.h>
#include <grpc/support/log.h>
@@ -40,103 +42,139 @@ static dispatch_once_t gInitChannelPool;
/** When all calls of a channel are destroyed, destroy the channel after this much seconds. */
static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
-@interface GRPCChannelPool ()
-
-- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration;
-
-- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration;
-
-@end
-
@implementation GRPCPooledChannel {
- __weak GRPCChannelPool *_channelPool;
GRPCChannelConfiguration *_channelConfiguration;
- NSMutableSet *_unmanagedCalls;
+ NSTimeInterval _destroyDelay;
+
+ NSHashTable<GRPCWrappedCall *> *_wrappedCalls;
GRPCChannel *_wrappedChannel;
+ NSDate *_lastTimedDestroy;
+ dispatch_queue_t _timerQueue;
}
-- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration
- channelPool:(GRPCChannelPool *)channelPool {
- NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty.");
- NSAssert(channelPool != nil, @"channelPool cannot be empty.");
- if (channelPool == nil || channelConfiguration == nil) {
- return nil;
- }
-
- if ((self = [super init])) {
- _channelPool = channelPool;
- _channelConfiguration = channelConfiguration;
- _unmanagedCalls = [NSMutableSet set];
- _wrappedChannel = nil;
- }
-
- return self;
+- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration {
+ return [self initWithChannelConfiguration:channelConfiguration destroyDelay:kDefaultChannelDestroyDelay];
}
- (void)dealloc {
- NSAssert([_unmanagedCalls count] == 0 && _wrappedChannel == nil,
- @"Pooled channel should only be"
- "destroyed after the wrapped channel is destroyed");
+ if ([_wrappedCalls objectEnumerator].allObjects.count != 0) {
+ NSEnumerator *enumerator = [_wrappedCalls objectEnumerator];
+ GRPCWrappedCall *wrappedCall;
+ while ((wrappedCall = [enumerator nextObject])) {
+ [wrappedCall channelDisconnected];
+ };
+ }
}
-- (grpc_call *)unmanagedCallWithPath:(NSString *)path
- completionQueue:(GRPCCompletionQueue *)queue
- callOptions:(GRPCCallOptions *)callOptions {
+- (GRPCWrappedCall *)wrappedCallWithPath:(NSString *)path
+completionQueue:(GRPCCompletionQueue *)queue
+callOptions:(GRPCCallOptions *)callOptions {
NSAssert(path.length > 0, @"path must not be empty.");
NSAssert(queue != nil, @"completionQueue must not be empty.");
NSAssert(callOptions, @"callOptions must not be empty.");
if (path.length == 0 || queue == nil || callOptions == nil) return NULL;
- grpc_call *call = NULL;
+ GRPCWrappedCall *call = nil;
+
@synchronized(self) {
if (_wrappedChannel == nil) {
- __strong GRPCChannelPool *strongPool = _channelPool;
- if (strongPool) {
- _wrappedChannel = [strongPool refChannelWithConfiguration:_channelConfiguration];
+ _wrappedChannel = [[GRPCChannel alloc] initWithChannelConfiguration:_channelConfiguration];
+ if (_wrappedChannel == nil) {
+ NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy.");
+ return nil;
}
- NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy.");
}
- call =
- [_wrappedChannel unmanagedCallWithPath:path completionQueue:queue callOptions:callOptions];
- if (call != NULL) {
- [_unmanagedCalls addObject:[NSValue valueWithPointer:call]];
+ _lastTimedDestroy = nil;
+
+ grpc_call *unmanagedCall = [_wrappedChannel unmanagedCallWithPath:path
+ completionQueue:[GRPCCompletionQueue completionQueue]
+ callOptions:callOptions];
+ if (unmanagedCall == NULL) {
+ NSAssert(unmanagedCall != NULL, @"Unable to create grpc_call object");
+ return nil;
+ }
+
+ call = [[GRPCWrappedCall alloc] initWithUnmanagedCall:unmanagedCall pooledChannel:self];
+ if (call == nil) {
+ NSAssert(call != nil, @"Unable to create GRPCWrappedCall object");
+ return nil;
}
+
+ [_wrappedCalls addObject:call];
}
return call;
}
-- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall {
- if (unmanagedCall == NULL) {
+- (void)notifyWrappedCallDealloc:(GRPCWrappedCall *)wrappedCall {
+ NSAssert(wrappedCall != nil, @"wrappedCall cannot be empty.");
+ if (wrappedCall == nil) {
return;
}
-
- grpc_call_unref(unmanagedCall);
@synchronized(self) {
- NSValue *removedCall = [NSValue valueWithPointer:unmanagedCall];
- [_unmanagedCalls removeObject:removedCall];
- if ([_unmanagedCalls count] == 0) {
- _wrappedChannel = nil;
- GRPCChannelPool *strongPool = _channelPool;
- [strongPool unrefChannelWithConfiguration:_channelConfiguration];
+ if ([_wrappedCalls objectEnumerator].allObjects.count == 0) {
+ NSDate *now = [NSDate date];
+ _lastTimedDestroy = now;
+ dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)_destroyDelay * NSEC_PER_SEC),
+ _timerQueue, ^{
+ @synchronized(self) {
+ if (self->_lastTimedDestroy == now) {
+ self->_wrappedChannel = nil;
+ self->_lastTimedDestroy = nil;
+ }
+ }
+ });
}
}
}
- (void)disconnect {
+ NSHashTable<GRPCWrappedCall *> *copiedWrappedCalls = nil;
@synchronized(self) {
if (_wrappedChannel != nil) {
_wrappedChannel = nil;
- [_unmanagedCalls removeAllObjects];
- GRPCChannelPool *strongPool = _channelPool;
- [strongPool unrefChannelWithConfiguration:_channelConfiguration];
+ copiedWrappedCalls = [_wrappedCalls copy];
+ [_wrappedCalls removeAllObjects];
}
}
+ NSEnumerator *enumerator = [copiedWrappedCalls objectEnumerator];
+ GRPCWrappedCall *wrappedCall;
+ while ((wrappedCall = [enumerator nextObject])) {
+ [wrappedCall channelDisconnected];
+ }
}
@end
@implementation GRPCPooledChannel (Test)
+- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration
+ destroyDelay:(NSTimeInterval)destroyDelay {
+ NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty.");
+ if (channelConfiguration == nil) {
+ return nil;
+ }
+
+ if ((self = [super init])) {
+ _channelConfiguration = channelConfiguration;
+ _destroyDelay = destroyDelay;
+ _wrappedCalls = [[NSHashTable alloc] initWithOptions:NSHashTableWeakMemory capacity:1];
+ _wrappedChannel = nil;
+ _lastTimedDestroy = nil;
+#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
+ if (@available(iOS 8.0, macOS 10.10, *)) {
+ _timerQueue = dispatch_queue_create(NULL,
+ dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
+ } else {
+#else
+ {
+#endif
+ _timerQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+ }
+ }
+
+ return self;
+}
+
- (GRPCChannel *)wrappedChannel {
GRPCChannel *channel = nil;
@synchronized(self) {
@@ -147,65 +185,28 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
@end
-/**
- * A convenience value type for cached channel.
- */
-@interface GRPCChannelRecord : NSObject
-
-/** Pointer to the raw channel. May be nil when the channel has been destroyed. */
-@property GRPCChannel *channel;
-
-/** Channel proxy corresponding to this channel configuration. */
-@property GRPCPooledChannel *pooledChannel;
-
-/** Last time when a timed destroy is initiated on the channel. */
-@property NSDate *timedDestroyDate;
-
-/** Reference count of the proxy to the channel. */
-@property NSUInteger refCount;
-
-@end
-
-@implementation GRPCChannelRecord
-
-@end
-
@interface GRPCChannelPool ()
-- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay NS_DESIGNATED_INITIALIZER;
+- (instancetype)initInstance NS_DESIGNATED_INITIALIZER;
@end
@implementation GRPCChannelPool {
- NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelRecord *> *_channelPool;
- dispatch_queue_t _dispatchQueue;
- NSTimeInterval _destroyDelay;
+ NSMutableDictionary<GRPCChannelConfiguration *, GRPCPooledChannel *> *_channelPool;
}
+ (instancetype)sharedInstance {
dispatch_once(&gInitChannelPool, ^{
gChannelPool =
- [[GRPCChannelPool alloc] initInstanceWithDestroyDelay:kDefaultChannelDestroyDelay];
+ [[GRPCChannelPool alloc] initInstance];
NSAssert(gChannelPool != nil, @"Cannot initialize global channel pool.");
});
return gChannelPool;
}
-- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay {
+- (instancetype)initInstance {
if ((self = [super init])) {
_channelPool = [NSMutableDictionary dictionary];
-#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
- if (@available(iOS 8.0, macOS 10.10, *)) {
- _dispatchQueue = dispatch_queue_create(
- NULL,
- dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
- } else {
-#else
- {
-#endif
- _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
- }
- _destroyDelay = destroyDelay;
// Connectivity monitor is not required for CFStream
char *enableCFStream = getenv(kCFStreamVarName);
@@ -231,86 +232,23 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
GRPCChannelConfiguration *configuration =
[[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions];
@synchronized(self) {
- GRPCChannelRecord *record = _channelPool[configuration];
- if (record == nil) {
- record = [[GRPCChannelRecord alloc] init];
- record.pooledChannel =
- [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration channelPool:self];
- _channelPool[configuration] = record;
- pooledChannel = record.pooledChannel;
- } else {
- pooledChannel = record.pooledChannel;
+ pooledChannel = _channelPool[configuration];
+ if (pooledChannel == nil) {
+ pooledChannel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration];
+ _channelPool[configuration] = pooledChannel;
}
}
return pooledChannel;
}
-- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration {
- GRPCChannel *ret = nil;
- @synchronized(self) {
- NSAssert(configuration != nil, @"configuration cannot be empty.");
- if (configuration == nil) {
- return nil;
- }
-
- GRPCChannelRecord *record = _channelPool[configuration];
- NSAssert(record != nil, @"No record corresponding to a proxy.");
- if (record == nil) {
- return nil;
- }
-
- record.refCount++;
- record.timedDestroyDate = nil;
- if (record.channel == nil) {
- // Channel is already destroyed;
- record.channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration];
- }
- ret = record.channel;
- }
- return ret;
-}
-
-- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration {
- @synchronized(self) {
- GRPCChannelRecord *record = _channelPool[configuration];
- NSAssert(record != nil, @"No record corresponding to a proxy.");
- if (record == nil) {
- return;
- }
- NSAssert(record.refCount > 0, @"Inconsistent channel refcount.");
- if (record.refCount > 0) {
- record.refCount--;
- if (record.refCount == 0) {
- NSDate *now = [NSDate date];
- record.timedDestroyDate = now;
- dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_destroyDelay * NSEC_PER_SEC)),
- _dispatchQueue, ^{
- @synchronized(self) {
- if (now == record.timedDestroyDate) {
- // Destroy the raw channel and reset related records.
- record.timedDestroyDate = nil;
- record.channel = nil;
- }
- }
- });
- }
- }
- }
-}
-
- (void)disconnectAllChannels {
- NSMutableSet<GRPCPooledChannel *> *proxySet = [NSMutableSet set];
+ NSDictionary *copiedPooledChannels;
@synchronized(self) {
- [_channelPool
- enumerateKeysAndObjectsUsingBlock:^(GRPCChannelConfiguration *_Nonnull key,
- GRPCChannelRecord *_Nonnull obj, BOOL *_Nonnull stop) {
- obj.channel = nil;
- obj.timedDestroyDate = nil;
- [proxySet addObject:obj.pooledChannel];
- }];
+ copiedPooledChannels = [NSDictionary dictionaryWithDictionary:_channelPool];
}
- // Disconnect proxies
- [proxySet enumerateObjectsUsingBlock:^(GRPCPooledChannel *_Nonnull obj, BOOL *_Nonnull stop) {
+
+ // Disconnect pooled channels.
+ [copiedPooledChannels enumerateKeysAndObjectsUsingBlock:^(id _Nonnull key, id _Nonnull obj, BOOL * _Nonnull stop) {
[obj disconnect];
}];
}
@@ -323,8 +261,8 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
@implementation GRPCChannelPool (Test)
-- (instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay {
- return [self initInstanceWithDestroyDelay:destroyDelay];
+- (instancetype)initTestPool {
+ return [self initInstance];
}
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
index 19aa5367c7..0432190528 100644
--- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
+++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
@@ -71,11 +71,16 @@
#pragma mark GRPCWrappedCall
+@class GRPCPooledChannel;
+
@interface GRPCWrappedCall : NSObject
-- (instancetype)initWithHost:(NSString *)host
- path:(NSString *)path
- callOptions:(GRPCCallOptions *)callOptions NS_DESIGNATED_INITIALIZER;
+- (instancetype)init NS_UNAVAILABLE;
+
++ (instancetype)new NS_UNAVAILABLE;
+
+- (instancetype)initWithUnmanagedCall:(grpc_call *)unmanagedCall
+ pooledChannel:(GRPCPooledChannel *)pooledChannel NS_DESIGNATED_INITIALIZER;
- (void)startBatchWithOperations:(NSArray *)ops errorHandler:(void (^)(void))errorHandler;
@@ -83,4 +88,6 @@
- (void)cancel;
+- (void)channelDisconnected;
+
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
index 5788d0a003..7e2d9d3c6d 100644
--- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
+++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
@@ -237,37 +237,21 @@
#pragma mark GRPCWrappedCall
@implementation GRPCWrappedCall {
- GRPCCompletionQueue *_queue;
- GRPCPooledChannel *_channel;
+ __weak GRPCPooledChannel *_channel;
grpc_call *_call;
}
-- (instancetype)init {
- return [self initWithHost:nil path:nil callOptions:[[GRPCCallOptions alloc] init]];
-}
-
-- (instancetype)initWithHost:(NSString *)host
- path:(NSString *)path
- callOptions:(GRPCCallOptions *)callOptions {
- NSAssert(host.length != 0 && path.length != 0, @"path and host cannot be nil.");
+- (instancetype)initWithUnmanagedCall:(grpc_call *)unmanagedCall
+ pooledChannel:(GRPCPooledChannel *)pooledChannel {
+ NSAssert(unmanagedCall != NULL, @"unmanagedCall cannot be empty.");
+ NSAssert(pooledChannel != nil, @"pooledChannel cannot be empty.");
+ if (unmanagedCall == NULL || pooledChannel == nil) {
+ return nil;
+ }
if ((self = [super init])) {
- // Each completion queue consumes one thread. There's a trade to be made between creating and
- // consuming too many threads and having contention of multiple calls in a single completion
- // queue. Currently we use a singleton queue.
- _queue = [GRPCCompletionQueue completionQueue];
- _channel = [[GRPCChannelPool sharedInstance] channelWithHost:host callOptions:callOptions];
- if (_channel == nil) {
- NSAssert(_channel != nil, @"Failed to get a channel for the host.");
- NSLog(@"Failed to get a channel for the host.");
- return nil;
- }
- _call = [_channel unmanagedCallWithPath:path completionQueue:_queue callOptions:callOptions];
- if (_call == nil) {
- NSAssert(_channel != nil, @"Failed to get a channel for the host.");
- NSLog(@"Failed to create a call.");
- return nil;
- }
+ _call = unmanagedCall;
+ _channel = pooledChannel;
}
return self;
}
@@ -283,42 +267,67 @@
[GRPCOpBatchLog addOpBatchToLog:operations];
#endif
- size_t nops = operations.count;
- grpc_op *ops_array = gpr_malloc(nops * sizeof(grpc_op));
- size_t i = 0;
- for (GRPCOperation *operation in operations) {
- ops_array[i++] = operation.op;
- }
- grpc_call_error error =
- grpc_call_start_batch(_call, ops_array, nops, (__bridge_retained void *)(^(bool success) {
- if (!success) {
- if (errorHandler) {
- errorHandler();
- } else {
- return;
- }
- }
- for (GRPCOperation *operation in operations) {
- [operation finish];
- }
- }),
- NULL);
- gpr_free(ops_array);
-
- if (error != GRPC_CALL_OK) {
- [NSException
+ @synchronized (self) {
+ if (_call != NULL) {
+ size_t nops = operations.count;
+ grpc_op *ops_array = gpr_malloc(nops * sizeof(grpc_op));
+ size_t i = 0;
+ for (GRPCOperation *operation in operations) {
+ ops_array[i++] = operation.op;
+ }
+ grpc_call_error error;
+ error = grpc_call_start_batch(_call, ops_array, nops, (__bridge_retained void *)(^(bool success) {
+ if (!success) {
+ if (errorHandler) {
+ errorHandler();
+ } else {
+ return;
+ }
+ }
+ for (GRPCOperation *operation in operations) {
+ [operation finish];
+ }
+ }),
+ NULL);
+ gpr_free(ops_array);
+
+ if (error != GRPC_CALL_OK) {
+ [NSException
raise:NSInternalInconsistencyException
- format:@"A precondition for calling grpc_call_start_batch wasn't met. Error %i", error];
+ format:@"A precondition for calling grpc_call_start_batch wasn't met. Error %i", error];
+ }
+ }
}
}
- (void)cancel {
- grpc_call_cancel(_call, NULL);
+ @synchronized (self) {
+ if (_call != NULL) {
+ grpc_call_cancel(_call, NULL);
+ }
+ }
+}
+
+- (void)channelDisconnected {
+ @synchronized (self) {
+ if (_call != NULL) {
+ grpc_call_unref(_call);
+ _call = NULL;
+ }
+ }
}
- (void)dealloc {
- [_channel destroyUnmanagedCall:_call];
- _channel = nil;
+ @synchronized (self) {
+ if (_call != NULL) {
+ grpc_call_unref(_call);
+ _call = NULL;
+ }
+ }
+ __strong GRPCPooledChannel *channel = _channel;
+ if (channel != nil) {
+ [channel notifyWrappedCallDealloc:self];
+ }
}
@end
diff --git a/src/objective-c/tests/ChannelTests/ChannelPoolTest.m b/src/objective-c/tests/ChannelTests/ChannelPoolTest.m
index 9461945560..dc42d7c341 100644
--- a/src/objective-c/tests/ChannelTests/ChannelPoolTest.m
+++ b/src/objective-c/tests/ChannelTests/ChannelPoolTest.m
@@ -24,11 +24,9 @@
#define TEST_TIMEOUT 32
-NSString *kDummyHost = @"dummy.host";
-NSString *kDummyHost2 = @"dummy.host.2";
-NSString *kDummyPath = @"/dummy/path";
-
-const NSTimeInterval kDestroyDelay = 1.0;
+static NSString *kDummyHost = @"dummy.host";
+static NSString *kDummyHost2 = @"dummy.host.2";
+static NSString *kDummyPath = @"/dummy/path";
@interface ChannelPoolTest : XCTestCase
@@ -40,134 +38,26 @@ const NSTimeInterval kDestroyDelay = 1.0;
grpc_init();
}
-- (void)testCreateChannelAndCall {
- GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
- GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
- GRPCPooledChannel *channel =
- (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
- XCTAssertNil(channel.wrappedChannel);
- GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
- grpc_call *call =
- [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
- XCTAssert(call != NULL);
- XCTAssertNotNil(channel.wrappedChannel);
- [channel destroyUnmanagedCall:call];
- XCTAssertNil(channel.wrappedChannel);
-}
-
-- (void)testCacheChannel {
- GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
+- (void)testCreateAndCacheChannel {
+ GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPool];
GRPCCallOptions *options1 = [[GRPCCallOptions alloc] init];
GRPCCallOptions *options2 = [options1 copy];
GRPCMutableCallOptions *options3 = [options1 mutableCopy];
options3.transportType = GRPCTransportTypeInsecure;
- GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
- GRPCPooledChannel *channel1 =
- (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options1];
- grpc_call *call1 =
- [channel1 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options1];
- GRPCPooledChannel *channel2 =
- (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options2];
- grpc_call *call2 =
- [channel2 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options2];
- GRPCPooledChannel *channel3 =
- (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options3];
- grpc_call *call3 =
- [channel3 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options3];
- GRPCPooledChannel *channel4 =
- (GRPCPooledChannel *)[pool channelWithHost:kDummyHost2 callOptions:options1];
- grpc_call *call4 =
- [channel4 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options1];
- XCTAssertEqual(channel1.wrappedChannel, channel2.wrappedChannel);
- XCTAssertNotEqual(channel1.wrappedChannel, channel3.wrappedChannel);
- XCTAssertNotEqual(channel1.wrappedChannel, channel4.wrappedChannel);
- XCTAssertNotEqual(channel3.wrappedChannel, channel4.wrappedChannel);
- [channel1 destroyUnmanagedCall:call1];
- [channel2 destroyUnmanagedCall:call2];
- [channel3 destroyUnmanagedCall:call3];
- [channel4 destroyUnmanagedCall:call4];
-}
-
-- (void)testTimedDestroyChannel {
- GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
- GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
- GRPCPooledChannel *channel =
- (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
- GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
- grpc_call *call =
- [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
- GRPCChannel *wrappedChannel = channel.wrappedChannel;
-
- [channel destroyUnmanagedCall:call];
- // Confirm channel is not destroyed at this time
- call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
- XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
-
- [channel destroyUnmanagedCall:call];
- sleep(kDestroyDelay + 1);
- // Confirm channel is new at this time
- call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
- XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel);
-
- // Confirm the new channel can create call
- XCTAssert(call != NULL);
- [channel destroyUnmanagedCall:call];
-}
-
-- (void)testPoolDisconnection {
- GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
- GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
- GRPCPooledChannel *channel =
- (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
- GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
- grpc_call *call =
- [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
- XCTAssertNotNil(channel.wrappedChannel);
- GRPCChannel *wrappedChannel = channel.wrappedChannel;
-
- // Test a new channel is created by requesting a channel from pool
- [pool disconnectAllChannels];
- channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
- call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
- XCTAssertNotNil(channel.wrappedChannel);
- XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel);
- wrappedChannel = channel.wrappedChannel;
-
- // Test a new channel is created by requesting a new call from the previous proxy
- [pool disconnectAllChannels];
- grpc_call *call2 =
- [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
- XCTAssertNotNil(channel.wrappedChannel);
- XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel);
- [channel destroyUnmanagedCall:call];
- [channel destroyUnmanagedCall:call2];
-}
-
-- (void)testUnrefCallFromStaleChannel {
- GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
- GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
- GRPCPooledChannel *channel =
- (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
- GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
- grpc_call *call =
- [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
-
- [pool disconnectAllChannels];
- channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
- grpc_call *call2 =
- [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
- // Test unref the call of a stale channel will not cause the current channel going into timed
- // destroy state
- XCTAssertNotNil(channel.wrappedChannel);
- GRPCChannel *wrappedChannel = channel.wrappedChannel;
- [channel destroyUnmanagedCall:call];
- XCTAssertNotNil(channel.wrappedChannel);
- XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
- // Test unref the call of the current channel will cause the channel going into timed destroy
- // state
- [channel destroyUnmanagedCall:call2];
- XCTAssertNil(channel.wrappedChannel);
+ GRPCPooledChannel *channel1 = [pool channelWithHost:kDummyHost callOptions:options1];
+ GRPCPooledChannel *channel2 = [pool channelWithHost:kDummyHost callOptions:options2];
+ GRPCPooledChannel *channel3 = [pool channelWithHost:kDummyHost callOptions:options3];
+ GRPCPooledChannel *channel4 = [pool channelWithHost:kDummyHost2 callOptions:options1];
+
+ XCTAssertNotNil(channel1);
+ XCTAssertNotNil(channel2);
+ XCTAssertNotNil(channel3);
+ XCTAssertNotNil(channel4);
+ XCTAssertEqual(channel1, channel2);
+ XCTAssertNotEqual(channel1, channel3);
+ XCTAssertNotEqual(channel1, channel4);
+ XCTAssertNotEqual(channel3, channel4);
}
@end
diff --git a/src/objective-c/tests/ChannelTests/ChannelTests.m b/src/objective-c/tests/ChannelTests/ChannelTests.m
index 5547449092..ee7f8b6fdd 100644
--- a/src/objective-c/tests/ChannelTests/ChannelTests.m
+++ b/src/objective-c/tests/ChannelTests/ChannelTests.m
@@ -22,91 +22,99 @@
#import "../../GRPCClient/private/GRPCChannel.h"
#import "../../GRPCClient/private/GRPCChannelPool.h"
#import "../../GRPCClient/private/GRPCCompletionQueue.h"
+#import "../../GRPCClient/private/GRPCWrappedCall.h"
-/*
-#define TEST_TIMEOUT 8
-
-@interface GRPCChannelFake : NSObject
-
-- (instancetype)initWithCreateExpectation:(XCTestExpectation *)createExpectation
- unrefExpectation:(XCTestExpectation *)unrefExpectation;
+static NSString *kDummyHost = @"dummy.host";
+static NSString *kDummyPath = @"/dummy/path";
-- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
- completionQueue:(GRPCCompletionQueue *)queue
- callOptions:(GRPCCallOptions *)callOptions;
-
-- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall;
+@interface ChannelTests : XCTestCase
@end
-@implementation GRPCChannelFake {
- __weak XCTestExpectation *_createExpectation;
- __weak XCTestExpectation *_unrefExpectation;
- long _grpcCallCounter;
-}
-
-- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration
-*)channelConfiguration { return nil;
-}
-
-- (instancetype)initWithCreateExpectation:(XCTestExpectation *)createExpectation
- unrefExpectation:(XCTestExpectation *)unrefExpectation {
- if ((self = [super init])) {
- _createExpectation = createExpectation;
- _unrefExpectation = unrefExpectation;
- _grpcCallCounter = 0;
- }
- return self;
-}
+@implementation ChannelTests
-- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
- completionQueue:(GRPCCompletionQueue *)queue
- callOptions:(GRPCCallOptions *)callOptions {
- if (_createExpectation) [_createExpectation fulfill];
- return (grpc_call *)(++_grpcCallCounter);
++ (void)setUp {
+ grpc_init();
}
-- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall {
- if (_unrefExpectation) [_unrefExpectation fulfill];
+- (void)testPooledChannelCreatingChannel {
+ GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
+ GRPCChannelConfiguration *config = [[GRPCChannelConfiguration alloc] initWithHost:kDummyHost
+ callOptions:options];
+ GRPCPooledChannel *channel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:config];
+ GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
+ GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:kDummyPath
+ completionQueue:cq
+ callOptions:options];
+ XCTAssertNotNil(channel.wrappedChannel);
+ (void)wrappedCall;
}
-@end
-
-@interface GRPCChannelPoolFake : NSObject
-
-- (instancetype)initWithDelayedDestroyExpectation:(XCTestExpectation *)delayedDestroyExpectation;
-
-- (GRPCChannel *)rawChannelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions;
-
-- (void)delayedDestroyChannel;
-
-@end
-
-@implementation GRPCChannelPoolFake {
- __weak XCTestExpectation *_delayedDestroyExpectation;
-}
-
-- (instancetype)initWithDelayedDestroyExpectation:(XCTestExpectation *)delayedDestroyExpectation {
- if ((self = [super init])) {
- _delayedDestroyExpectation = delayedDestroyExpectation;
+- (void)testTimedDestroyChannel {
+ const NSTimeInterval kDestroyDelay = 1.0;
+ GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
+ GRPCChannelConfiguration *config = [[GRPCChannelConfiguration alloc] initWithHost:kDummyHost
+ callOptions:options];
+ GRPCPooledChannel *channel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:config
+ destroyDelay:kDestroyDelay];
+ GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
+ GRPCWrappedCall *wrappedCall;
+ GRPCChannel *wrappedChannel;
+ @autoreleasepool {
+ wrappedCall = [channel wrappedCallWithPath:kDummyPath
+ completionQueue:cq
+ callOptions:options];
+ XCTAssertNotNil(channel.wrappedChannel);
+
+ // Unref and ref channel immediately; expect using the same raw channel.
+ wrappedChannel = channel.wrappedChannel;
+
+ wrappedCall = nil;
+ wrappedCall = [channel wrappedCallWithPath:kDummyPath
+ completionQueue:cq
+ callOptions:options];
+ XCTAssertEqual(channel.wrappedChannel, wrappedChannel);
+
+ // Unref and ref channel after destroy delay; expect a new raw channel.
+ wrappedCall = nil;
}
- return self;
-}
-
-- (void)delayedDestroyChannel {
- if (_delayedDestroyExpectation) [_delayedDestroyExpectation fulfill];
+ sleep(kDestroyDelay + 1);
+ XCTAssertNil(channel.wrappedChannel);
+ wrappedCall = [channel wrappedCallWithPath:kDummyPath
+ completionQueue:cq
+ callOptions:options];
+ XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel);
}
-@end */
-
-@interface ChannelTests : XCTestCase
-
-@end
-
-@implementation ChannelTests
-
-+ (void)setUp {
- grpc_init();
+- (void)testDisconnect {
+ const NSTimeInterval kDestroyDelay = 1.0;
+ GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
+ GRPCChannelConfiguration *config = [[GRPCChannelConfiguration alloc] initWithHost:kDummyHost
+ callOptions:options];
+ GRPCPooledChannel *channel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:config
+ destroyDelay:kDestroyDelay];
+ GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
+ GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:kDummyPath
+ completionQueue:cq
+ callOptions:options];
+ XCTAssertNotNil(channel.wrappedChannel);
+
+ // Disconnect; expect wrapped channel to be dropped
+ [channel disconnect];
+ XCTAssertNil(channel.wrappedChannel);
+
+ // Create a new call and unref the old call; confirm that destroy of the old call does not make
+ // the channel disconnect, even after the destroy delay.
+ GRPCWrappedCall *wrappedCall2 = [channel wrappedCallWithPath:kDummyPath
+ completionQueue:cq
+ callOptions:options];
+ XCTAssertNotNil(channel.wrappedChannel);
+ GRPCChannel *wrappedChannel = channel.wrappedChannel;
+ wrappedCall = nil;
+ sleep(kDestroyDelay + 1);
+ XCTAssertNotNil(channel.wrappedChannel);
+ XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
+ (void)wrappedCall2;
}
@end
diff --git a/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/ChannelTests.xcscheme b/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/ChannelTests.xcscheme
index 8c8623d7b2..acae965bed 100644
--- a/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/ChannelTests.xcscheme
+++ b/src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/ChannelTests.xcscheme
@@ -37,11 +37,6 @@
BlueprintName = "ChannelTests"
ReferencedContainer = "container:Tests.xcodeproj">
</BuildableReference>
- <SkippedTests>
- <Test
- Identifier = "ChannelTests">
- </Test>
- </SkippedTests>
</TestableReference>
</Testables>
<AdditionalOptions>