aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2018-11-26 17:17:09 -0800
committerGravatar Muxi Yan <mxyan@google.com>2018-11-26 17:38:52 -0800
commit5ae61f5a5a267f5975248d4262133a740e09a66b (patch)
tree17b9c4a8131c7b657865e5b65236798115bc541c
parent03c73e92f1dedb1de4bba0269e7c614b47cf8035 (diff)
Multiple fixes
-rw-r--r--src/objective-c/GRPCClient/GRPCCall+ChannelArg.m2
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m53
-rw-r--r--src/objective-c/GRPCClient/private/ChannelArgsUtil.m21
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannel.h6
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannel.m66
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannelPool.h49
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannelPool.m145
-rw-r--r--src/objective-c/GRPCClient/private/GRPCWrappedCall.m2
-rw-r--r--src/objective-c/ProtoRPC/ProtoRPC.m38
-rw-r--r--src/objective-c/tests/ChannelTests/ChannelPoolTest.m40
-rw-r--r--src/objective-c/tests/ChannelTests/ChannelTests.m4
-rw-r--r--src/objective-c/tests/GRPCClientTests.m7
12 files changed, 221 insertions, 212 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m b/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m
index 703cff63bb..ae60d6208e 100644
--- a/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m
+++ b/src/objective-c/GRPCClient/GRPCCall+ChannelArg.m
@@ -36,7 +36,7 @@
}
+ (void)closeOpenConnections {
- [[GRPCChannelPool sharedInstance] closeOpenConnections];
+ [[GRPCChannelPool sharedInstance] disconnectAllChannels];
}
+ (void)setDefaultCompressMethod:(GRPCCompressAlgorithm)algorithm forhost:(nonnull NSString *)host {
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 94e470d4ed..bf9441c27e 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -69,11 +69,8 @@ const char *kCFStreamVarName = "grpc_cfstream";
- (instancetype)initWithHost:(NSString *)host path:(NSString *)path safety:(GRPCCallSafety)safety {
NSAssert(host.length != 0 && path.length != 0, @"Host and Path cannot be empty");
- if (host.length == 0) {
- host = [NSString string];
- }
- if (path.length == 0) {
- path = [NSString string];
+ if (host.length == 0 || path.length == 0) {
+ return nil;
}
if ((self = [super init])) {
_host = [host copy];
@@ -173,7 +170,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
- (void)start {
- GRPCCall *call = nil;
+ GRPCCall *copiedCall = nil;
@synchronized(self) {
NSAssert(!_started, @"Call already started.");
NSAssert(!_canceled, @"Call already canceled.");
@@ -197,7 +194,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
if (_callOptions.initialMetadata) {
[_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
}
- call = _call;
+ copiedCall = _call;
}
void (^valueHandler)(id value) = ^(id value) {
@@ -235,11 +232,11 @@ const char *kCFStreamVarName = "grpc_cfstream";
};
id<GRXWriteable> responseWriteable =
[[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler];
- [call startWithWriteable:responseWriteable];
+ [copiedCall startWithWriteable:responseWriteable];
}
- (void)cancel {
- GRPCCall *call = nil;
+ GRPCCall *copiedCall = nil;
@synchronized(self) {
if (_canceled) {
return;
@@ -247,7 +244,7 @@ const char *kCFStreamVarName = "grpc_cfstream";
_canceled = YES;
- call = _call;
+ copiedCall = _call;
_call = nil;
_pipe = nil;
@@ -268,13 +265,15 @@ const char *kCFStreamVarName = "grpc_cfstream";
@"Canceled by app"
}]];
});
+ } else {
+ _handler = nil;
}
}
- [call cancel];
+ [copiedCall cancel];
}
- (void)writeData:(NSData *)data {
- GRXBufferedPipe *pipe = nil;
+ GRXBufferedPipe *copiedPipe = nil;
@synchronized(self) {
NSAssert(!_canceled, @"Call arleady canceled.");
NSAssert(!_finished, @"Call is half-closed before sending data.");
@@ -286,14 +285,14 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
if (_pipe) {
- pipe = _pipe;
+ copiedPipe = _pipe;
}
}
- [pipe writeValue:data];
+ [copiedPipe writeValue:data];
}
- (void)finish {
- GRXBufferedPipe *pipe = nil;
+ GRXBufferedPipe *copiedPipe = nil;
@synchronized(self) {
NSAssert(_started, @"Call not started.");
NSAssert(!_canceled, @"Call arleady canceled.");
@@ -309,12 +308,12 @@ const char *kCFStreamVarName = "grpc_cfstream";
}
if (_pipe) {
- pipe = _pipe;
+ copiedPipe = _pipe;
_pipe = nil;
}
_finished = YES;
}
- [pipe writesFinishedWithError:nil];
+ [copiedPipe writesFinishedWithError:nil];
}
- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
@@ -322,11 +321,11 @@ const char *kCFStreamVarName = "grpc_cfstream";
if (initialMetadata != nil &&
[_handler respondsToSelector:@selector(receivedInitialMetadata:)]) {
dispatch_async(_dispatchQueue, ^{
- id<GRPCResponseHandler> handler = nil;
+ id<GRPCResponseHandler> copiedHandler = nil;
@synchronized(self) {
- handler = self->_handler;
+ copiedHandler = self->_handler;
}
- [handler receivedInitialMetadata:initialMetadata];
+ [copiedHandler receivedInitialMetadata:initialMetadata];
});
}
}
@@ -336,11 +335,11 @@ const char *kCFStreamVarName = "grpc_cfstream";
@synchronized(self) {
if (message != nil && [_handler respondsToSelector:@selector(receivedRawMessage:)]) {
dispatch_async(_dispatchQueue, ^{
- id<GRPCResponseHandler> handler = nil;
+ id<GRPCResponseHandler> copiedHandler = nil;
@synchronized(self) {
- handler = self->_handler;
+ copiedHandler = self->_handler;
}
- [handler receivedRawMessage:message];
+ [copiedHandler receivedRawMessage:message];
});
}
}
@@ -350,14 +349,16 @@ const char *kCFStreamVarName = "grpc_cfstream";
@synchronized(self) {
if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(_dispatchQueue, ^{
- id<GRPCResponseHandler> handler = nil;
+ id<GRPCResponseHandler> copiedHandler = nil;
@synchronized(self) {
- handler = self->_handler;
+ copiedHandler = self->_handler;
// Clean up _handler so that no more responses are reported to the handler.
self->_handler = nil;
}
- [handler closedWithTrailingMetadata:trailingMetadata error:error];
+ [copiedHandler closedWithTrailingMetadata:trailingMetadata error:error];
});
+ } else {
+ _handler = nil;
}
}
}
diff --git a/src/objective-c/GRPCClient/private/ChannelArgsUtil.m b/src/objective-c/GRPCClient/private/ChannelArgsUtil.m
index d9e44b557d..3135fcff02 100644
--- a/src/objective-c/GRPCClient/private/ChannelArgsUtil.m
+++ b/src/objective-c/GRPCClient/private/ChannelArgsUtil.m
@@ -60,36 +60,35 @@ grpc_channel_args *GRPCBuildChannelArgs(NSDictionary *dictionary) {
NSUInteger argCount = [keys count];
grpc_channel_args *channelArgs = gpr_malloc(sizeof(grpc_channel_args));
- channelArgs->num_args = argCount;
channelArgs->args = gpr_malloc(argCount * sizeof(grpc_arg));
// TODO(kriswuollett) Check that keys adhere to GRPC core library requirements
+ NSUInteger j = 0;
for (NSUInteger i = 0; i < argCount; ++i) {
- grpc_arg *arg = &channelArgs->args[i];
+ grpc_arg *arg = &channelArgs->args[j];
arg->key = gpr_strdup([keys[i] UTF8String]);
id value = dictionary[keys[i]];
if ([value respondsToSelector:@selector(UTF8String)]) {
arg->type = GRPC_ARG_STRING;
arg->value.string = gpr_strdup([value UTF8String]);
+ j++;
} else if ([value respondsToSelector:@selector(intValue)]) {
- if ([value compare:[NSNumber numberWithInteger:INT_MAX]] == NSOrderedDescending ||
- [value compare:[NSNumber numberWithInteger:INT_MIN]] == NSOrderedAscending) {
- [NSException raise:NSInvalidArgumentException
- format:@"Out of range for a value-typed channel argument: %@", value];
+ int64_t value64 = [value longLongValue];
+ if (value64 <= INT_MAX || value64 >= INT_MIN) {
+ arg->type = GRPC_ARG_INTEGER;
+ arg->value.integer = [value intValue];
+ j++;
}
- arg->type = GRPC_ARG_INTEGER;
- arg->value.integer = [value intValue];
} else if (value != nil) {
arg->type = GRPC_ARG_POINTER;
arg->value.pointer.p = (__bridge_retained void *)value;
arg->value.pointer.vtable = &objc_arg_vtable;
- } else {
- [NSException raise:NSInvalidArgumentException
- format:@"Invalid channel argument type: %@", [value class]];
+ j++;
}
}
+ channelArgs->num_args = j;
return channelArgs;
}
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h
index 5426b28d75..147015bed1 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.h
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.h
@@ -32,6 +32,10 @@ NS_ASSUME_NONNULL_BEGIN
/** Caching signature of a channel. */
@interface GRPCChannelConfiguration : NSObject<NSCopying>
+- (instancetype)init NS_UNAVAILABLE;
+
++ (instancetype) new NS_UNAVAILABLE;
+
/** The host that this channel is connected to. */
@property(copy, readonly) NSString *host;
@@ -47,7 +51,7 @@ NS_ASSUME_NONNULL_BEGIN
/** Acquire the dictionary of channel args with current configurations. */
@property(copy, readonly) NSDictionary *channelArgs;
-- (nullable instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions;
+- (nullable instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions NS_DESIGNATED_INITIALIZER;
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m
index e4cefc338c..24cf670d1b 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.m
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.m
@@ -39,8 +39,9 @@
- (instancetype)initWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions {
NSAssert(host.length > 0, @"Host must not be empty.");
NSAssert(callOptions != nil, @"callOptions must not be empty.");
- if (host.length == 0) return nil;
- if (callOptions == nil) return nil;
+ if (host.length == 0 || callOptions == nil) {
+ return nil;
+ }
if ((self = [super init])) {
_host = [host copy];
@@ -180,7 +181,9 @@
- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration {
NSAssert(channelConfiguration != nil, @"channelConfiguration must not be empty.");
- if (channelConfiguration == nil) return nil;
+ if (channelConfiguration == nil) {
+ return nil;
+ }
if ((self = [super init])) {
_configuration = [channelConfiguration copy];
@@ -218,35 +221,34 @@
if (callOptions == nil) return NULL;
grpc_call *call = NULL;
- @synchronized(self) {
- NSAssert(_unmanagedChannel != NULL, @"Channel should have valid unmanaged channel.");
- if (_unmanagedChannel == NULL) return NULL;
-
- NSString *serverAuthority =
- callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority;
- NSTimeInterval timeout = callOptions.timeout;
- NSAssert(timeout >= 0, @"Invalid timeout");
- if (timeout < 0) return NULL;
- grpc_slice host_slice = grpc_empty_slice();
- if (serverAuthority) {
- host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String);
- }
- grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String);
- gpr_timespec deadline_ms =
- timeout == 0 ? gpr_inf_future(GPR_CLOCK_REALTIME)
- : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN));
- call = grpc_channel_create_call(_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS,
- queue.unmanagedQueue, path_slice,
- serverAuthority ? &host_slice : NULL, deadline_ms, NULL);
- if (serverAuthority) {
- grpc_slice_unref(host_slice);
- }
- grpc_slice_unref(path_slice);
- NSAssert(call != nil, @"Unable to create call.");
- if (call == NULL) {
- NSLog(@"Unable to create call.");
- }
+ // No need to lock here since _unmanagedChannel is only changed in _dealloc
+ NSAssert(_unmanagedChannel != NULL, @"Channel should have valid unmanaged channel.");
+ if (_unmanagedChannel == NULL) return NULL;
+
+ NSString *serverAuthority =
+ callOptions.transportType == GRPCTransportTypeCronet ? nil : callOptions.serverAuthority;
+ NSTimeInterval timeout = callOptions.timeout;
+ NSAssert(timeout >= 0, @"Invalid timeout");
+ if (timeout < 0) return NULL;
+ grpc_slice host_slice = grpc_empty_slice();
+ if (serverAuthority) {
+ host_slice = grpc_slice_from_copied_string(serverAuthority.UTF8String);
+ }
+ grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String);
+ gpr_timespec deadline_ms =
+ timeout == 0 ? gpr_inf_future(GPR_CLOCK_REALTIME)
+ : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN));
+ call = grpc_channel_create_call(_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS,
+ queue.unmanagedQueue, path_slice,
+ serverAuthority ? &host_slice : NULL, deadline_ms, NULL);
+ if (serverAuthority) {
+ grpc_slice_unref(host_slice);
+ }
+ grpc_slice_unref(path_slice);
+ NSAssert(call != nil, @"Unable to create call.");
+ if (call == NULL) {
+ NSLog(@"Unable to create call.");
}
return call;
}
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.h b/src/objective-c/GRPCClient/private/GRPCChannelPool.h
index 1e3c1d7d97..26600ef3a9 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannelPool.h
+++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.h
@@ -39,12 +39,16 @@ NS_ASSUME_NONNULL_BEGIN
*/
@interface GRPCPooledChannel : NSObject
+- (nullable instancetype)init NS_UNAVAILABLE;
+
++ (nullable instancetype)new NS_UNAVAILABLE;
+
/**
* Initialize with an actual channel object \a channel and a reference to the channel pool.
*/
- (nullable instancetype)initWithChannelConfiguration:
(GRPCChannelConfiguration *)channelConfiguration
- channelPool:(GRPCChannelPool *)channelPool;
+ channelPool:(GRPCChannelPool *)channelPool NS_DESIGNATED_INITIALIZER;
/**
* Create a grpc core call object (grpc_call) from this channel. If channel is disconnected, get a
@@ -59,17 +63,21 @@ NS_ASSUME_NONNULL_BEGIN
* \a unmanagedCallWithPath:completionQueue:callOptions: and decrease channel refcount. If refcount
* of the channel becomes 0, return the channel object to channel pool.
*/
-- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall;
+- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall;
/**
- * Force the channel to disconnect immediately.
+ * Force the channel to disconnect immediately. Subsequent calls to unmanagedCallWithPath: will
+ * attempt to reconnect to the remote channel.
*/
- (void)disconnect;
-// The following methods and properties are for test only
+@end
+
+/** Test-only interface for \a GRPCPooledChannel. */
+@interface GRPCPooledChannel (Test)
/**
- * Return the pointer to the real channel wrapped by the proxy.
+ * Return the pointer to the raw channel wrapped.
*/
@property(atomic, readonly) GRPCChannel *wrappedChannel;
@@ -81,6 +89,10 @@ NS_ASSUME_NONNULL_BEGIN
*/
@interface GRPCChannelPool : NSObject
+- (nullable instancetype)init NS_UNAVAILABLE;
+
++ (nullable instancetype)new NS_UNAVAILABLE;
+
/**
* Get the global channel pool.
*/
@@ -92,31 +104,20 @@ NS_ASSUME_NONNULL_BEGIN
- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions;
/**
- * This method is deprecated.
- *
- * Destroy all open channels and close their connections.
+ * Disconnect all channels in this pool.
*/
-- (void)closeOpenConnections;
-
-// Test-only methods below
+- (void)disconnectAllChannels;
-/**
- * Get an instance of pool isolated from the global shared pool. This method is for test only.
- * Global pool should be used in production.
- */
-- (nullable instancetype)init;
+@end
-/**
- * Simulate a network transition event and destroy all channels. This method is for internal and
- * test only.
- */
-- (void)disconnectAllChannels;
+/** Test-only interface for \a GRPCChannelPool. */
+@interface GRPCChannelPool (Test)
/**
- * Set the destroy delay of channels. A channel should be destroyed if it stayed idle (no active
- * call on it) for this period of time. This property is for test only.
+ * Get an instance of pool isolated from the global shared pool with channels' destroy delay being
+ * \a destroyDelay.
*/
-@property(atomic) NSTimeInterval destroyDelay;
+- (nullable instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay;
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m
index 7c139b3717..f6615b5840 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannelPool.m
+++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m
@@ -52,10 +52,9 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
__weak GRPCChannelPool *_channelPool;
GRPCChannelConfiguration *_channelConfiguration;
NSMutableSet *_unmanagedCalls;
+ GRPCChannel *_wrappedChannel;
}
-@synthesize wrappedChannel = _wrappedChannel;
-
- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration
channelPool:(GRPCChannelPool *)channelPool {
NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty.");
@@ -68,11 +67,17 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
_channelPool = channelPool;
_channelConfiguration = channelConfiguration;
_unmanagedCalls = [NSMutableSet set];
+ _wrappedChannel = nil;
}
return self;
}
+- (void)dealloc {
+ NSAssert([_unmanagedCalls count] == 0 && _wrappedChannel == nil, @"Pooled channel should only be"
+ "destroyed after the wrapped channel is destroyed");
+}
+
- (grpc_call *)unmanagedCallWithPath:(NSString *)path
completionQueue:(GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions {
@@ -99,31 +104,38 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
return call;
}
-- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall {
- if (unmanagedCall == nil) return;
+- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall {
+ if (unmanagedCall == NULL) {
+ return;
+ }
grpc_call_unref(unmanagedCall);
- BOOL timedDestroy = NO;
@synchronized(self) {
- if ([_unmanagedCalls containsObject:[NSValue valueWithPointer:unmanagedCall]]) {
- [_unmanagedCalls removeObject:[NSValue valueWithPointer:unmanagedCall]];
- if ([_unmanagedCalls count] == 0) {
- timedDestroy = YES;
- }
+ NSValue *removedCall = [NSValue valueWithPointer:unmanagedCall];
+ [_unmanagedCalls removeObject:removedCall];
+ if ([_unmanagedCalls count] == 0) {
+ _wrappedChannel = nil;
+ GRPCChannelPool *strongPool = _channelPool;
+ [strongPool unrefChannelWithConfiguration:_channelConfiguration];
}
}
- if (timedDestroy) {
- [self timedDestroy];
- }
}
- (void)disconnect {
@synchronized(self) {
- _wrappedChannel = nil;
- [_unmanagedCalls removeAllObjects];
+ if (_wrappedChannel != nil) {
+ _wrappedChannel = nil;
+ [_unmanagedCalls removeAllObjects];
+ GRPCChannelPool *strongPool = _channelPool;
+ [strongPool unrefChannelWithConfiguration:_channelConfiguration];
+ }
}
}
+@end
+
+@implementation GRPCPooledChannel (Test)
+
- (GRPCChannel *)wrappedChannel {
GRPCChannel *channel = nil;
@synchronized(self) {
@@ -132,67 +144,52 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
return channel;
}
-- (void)timedDestroy {
- __strong GRPCChannelPool *pool = nil;
- @synchronized(self) {
- // Check if we still want to destroy the channel.
- if ([_unmanagedCalls count] == 0) {
- pool = _channelPool;
- _wrappedChannel = nil;
- }
- }
- [pool unrefChannelWithConfiguration:_channelConfiguration];
-}
-
@end
/**
* A convenience value type for cached channel.
*/
-@interface GRPCChannelRecord : NSObject<NSCopying>
+@interface GRPCChannelRecord : NSObject
/** Pointer to the raw channel. May be nil when the channel has been destroyed. */
@property GRPCChannel *channel;
/** Channel proxy corresponding to this channel configuration. */
-@property GRPCPooledChannel *proxy;
+@property GRPCPooledChannel *pooledChannel;
/** Last time when a timed destroy is initiated on the channel. */
@property NSDate *timedDestroyDate;
/** Reference count of the proxy to the channel. */
-@property NSUInteger refcount;
+@property NSUInteger refCount;
@end
@implementation GRPCChannelRecord
-- (id)copyWithZone:(NSZone *)zone {
- GRPCChannelRecord *newRecord = [[GRPCChannelRecord allocWithZone:zone] init];
- newRecord.channel = _channel;
- newRecord.proxy = _proxy;
- newRecord.timedDestroyDate = _timedDestroyDate;
- newRecord.refcount = _refcount;
+@end
- return newRecord;
-}
+@interface GRPCChannelPool ()
+
+- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay NS_DESIGNATED_INITIALIZER;
@end
@implementation GRPCChannelPool {
NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelRecord *> *_channelPool;
dispatch_queue_t _dispatchQueue;
+ NSTimeInterval _destroyDelay;
}
+ (instancetype)sharedInstance {
dispatch_once(&gInitChannelPool, ^{
- gChannelPool = [[GRPCChannelPool alloc] init];
+ gChannelPool = [[GRPCChannelPool alloc] initInstanceWithDestroyDelay:kDefaultChannelDestroyDelay];
NSAssert(gChannelPool != nil, @"Cannot initialize global channel pool.");
});
return gChannelPool;
}
-- (instancetype)init {
+- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay {
if ((self = [super init])) {
_channelPool = [NSMutableDictionary dictionary];
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
@@ -206,7 +203,7 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
#endif
_dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
}
- _destroyDelay = kDefaultChannelDestroyDelay;
+ _destroyDelay = destroyDelay;
// Connectivity monitor is not required for CFStream
char *enableCFStream = getenv(kCFStreamVarName);
@@ -217,56 +214,56 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
return self;
}
+- (void)dealloc {
+ [GRPCConnectivityMonitor unregisterObserver:self];
+}
+
- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions {
NSAssert(host.length > 0, @"Host must not be empty.");
NSAssert(callOptions != nil, @"callOptions must not be empty.");
- if (host.length == 0) return nil;
- if (callOptions == nil) return nil;
+ if (host.length == 0 || callOptions == nil) {
+ return nil;
+ }
- GRPCPooledChannel *channelProxy = nil;
+ GRPCPooledChannel *pooledChannel = nil;
GRPCChannelConfiguration *configuration =
[[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions];
@synchronized(self) {
GRPCChannelRecord *record = _channelPool[configuration];
if (record == nil) {
record = [[GRPCChannelRecord alloc] init];
- record.proxy =
+ record.pooledChannel =
[[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration channelPool:self];
- record.timedDestroyDate = nil;
_channelPool[configuration] = record;
- channelProxy = record.proxy;
+ pooledChannel = record.pooledChannel;
} else {
- channelProxy = record.proxy;
+ pooledChannel = record.pooledChannel;
}
}
- return channelProxy;
-}
-
-- (void)closeOpenConnections {
- [self disconnectAllChannels];
+ return pooledChannel;
}
- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration {
GRPCChannel *ret = nil;
@synchronized(self) {
NSAssert(configuration != nil, @"configuration cannot be empty.");
- if (configuration == nil) return nil;
+ if (configuration == nil) {
+ return nil;
+ }
GRPCChannelRecord *record = _channelPool[configuration];
NSAssert(record != nil, @"No record corresponding to a proxy.");
- if (record == nil) return nil;
+ if (record == nil) {
+ return nil;
+ }
+ record.refCount++;
+ record.timedDestroyDate = nil;
if (record.channel == nil) {
// Channel is already destroyed;
record.channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration];
- record.timedDestroyDate = nil;
- record.refcount = 1;
- ret = record.channel;
- } else {
- ret = record.channel;
- record.timedDestroyDate = nil;
- record.refcount++;
}
+ ret = record.channel;
}
return ret;
}
@@ -275,11 +272,13 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
@synchronized(self) {
GRPCChannelRecord *record = _channelPool[configuration];
NSAssert(record != nil, @"No record corresponding to a proxy.");
- if (record == nil) return;
- NSAssert(record.refcount > 0, @"Inconsistent channel refcount.");
- if (record.refcount > 0) {
- record.refcount--;
- if (record.refcount == 0) {
+ if (record == nil) {
+ return;
+ }
+ NSAssert(record.refCount > 0, @"Inconsistent channel refcount.");
+ if (record.refCount > 0) {
+ record.refCount--;
+ if (record.refCount == 0) {
NSDate *now = [NSDate date];
record.timedDestroyDate = now;
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_destroyDelay * NSEC_PER_SEC)),
@@ -288,7 +287,6 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
if (now == record.timedDestroyDate) {
// Destroy the raw channel and reset related records.
record.timedDestroyDate = nil;
- record.refcount = 0;
record.channel = nil;
}
}
@@ -306,8 +304,7 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
GRPCChannelRecord *_Nonnull obj, BOOL *_Nonnull stop) {
obj.channel = nil;
obj.timedDestroyDate = nil;
- obj.refcount = 0;
- [proxySet addObject:obj.proxy];
+ [proxySet addObject:obj.pooledChannel];
}];
}
// Disconnect proxies
@@ -320,8 +317,12 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
[self disconnectAllChannels];
}
-- (void)dealloc {
- [GRPCConnectivityMonitor unregisterObserver:self];
+@end
+
+@implementation GRPCChannelPool (Test)
+
+- (instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay {
+ return [self initInstanceWithDestroyDelay:destroyDelay];
}
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
index 615dfc8556..5788d0a003 100644
--- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
+++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
@@ -317,7 +317,7 @@
}
- (void)dealloc {
- [_channel unrefUnmanagedCall:_call];
+ [_channel destroyUnmanagedCall:_call];
_channel = nil;
}
diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m
index 2de2932072..dff88b8591 100644
--- a/src/objective-c/ProtoRPC/ProtoRPC.m
+++ b/src/objective-c/ProtoRPC/ProtoRPC.m
@@ -133,18 +133,18 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
}
- (void)cancel {
- GRPCCall2 *call;
+ GRPCCall2 *copiedCall;
@synchronized(self) {
- call = _call;
+ copiedCall = _call;
_call = nil;
if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(_handler.dispatchQueue, ^{
- id<GRPCProtoResponseHandler> handler = nil;
+ id<GRPCProtoResponseHandler> copiedHandler = nil;
@synchronized(self) {
- handler = self->_handler;
+ copiedHandler = self->_handler;
self->_handler = nil;
}
- [handler closedWithTrailingMetadata:nil
+ [copiedHandler closedWithTrailingMetadata:nil
error:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeCancelled
userInfo:@{
@@ -152,9 +152,11 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
@"Canceled by app"
}]];
});
+ } else {
+ _handler = nil;
}
}
- [call cancel];
+ [copiedCall cancel];
}
- (void)writeMessage:(GPBMessage *)message {
@@ -182,11 +184,11 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
@synchronized(self) {
if (initialMetadata != nil && [_handler respondsToSelector:@selector(initialMetadata:)]) {
dispatch_async(_dispatchQueue, ^{
- id<GRPCProtoResponseHandler> handler = nil;
+ id<GRPCProtoResponseHandler> copiedHandler = nil;
@synchronized(self) {
- handler = self->_handler;
+ copiedHandler = self->_handler;
}
- [handler receivedInitialMetadata:initialMetadata];
+ [copiedHandler receivedInitialMetadata:initialMetadata];
});
}
}
@@ -200,21 +202,21 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
@synchronized(self) {
if (parsed && [_handler respondsToSelector:@selector(receivedProtoMessage:)]) {
dispatch_async(_dispatchQueue, ^{
- id<GRPCProtoResponseHandler> handler = nil;
+ id<GRPCProtoResponseHandler> copiedHandler = nil;
@synchronized(self) {
- handler = self->_handler;
+ copiedHandler = self->_handler;
}
- [handler receivedProtoMessage:parsed];
+ [copiedHandler receivedProtoMessage:parsed];
});
} else if (!parsed &&
[_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(_dispatchQueue, ^{
- id<GRPCProtoResponseHandler> handler = nil;
+ id<GRPCProtoResponseHandler> copiedHandler = nil;
@synchronized(self) {
- handler = self->_handler;
+ copiedHandler = self->_handler;
self->_handler = nil;
}
- [handler closedWithTrailingMetadata:nil
+ [copiedHandler closedWithTrailingMetadata:nil
error:ErrorForBadProto(message, _responseClass, error)];
});
[_call cancel];
@@ -227,12 +229,12 @@ static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsing
@synchronized(self) {
if ([_handler respondsToSelector:@selector(closedWithTrailingMetadata:error:)]) {
dispatch_async(_dispatchQueue, ^{
- id<GRPCProtoResponseHandler> handler = nil;
+ id<GRPCProtoResponseHandler> copiedHandler = nil;
@synchronized(self) {
- handler = self->_handler;
+ copiedHandler = self->_handler;
self->_handler = nil;
}
- [handler closedWithTrailingMetadata:trailingMetadata error:error];
+ [copiedHandler closedWithTrailingMetadata:trailingMetadata error:error];
});
}
_call = nil;
diff --git a/src/objective-c/tests/ChannelTests/ChannelPoolTest.m b/src/objective-c/tests/ChannelTests/ChannelPoolTest.m
index 4424801c11..9461945560 100644
--- a/src/objective-c/tests/ChannelTests/ChannelPoolTest.m
+++ b/src/objective-c/tests/ChannelTests/ChannelPoolTest.m
@@ -28,6 +28,8 @@ NSString *kDummyHost = @"dummy.host";
NSString *kDummyHost2 = @"dummy.host.2";
NSString *kDummyPath = @"/dummy/path";
+const NSTimeInterval kDestroyDelay = 1.0;
+
@interface ChannelPoolTest : XCTestCase
@end
@@ -39,7 +41,7 @@ NSString *kDummyPath = @"/dummy/path";
}
- (void)testCreateChannelAndCall {
- GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
+ GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel =
(GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
@@ -49,12 +51,12 @@ NSString *kDummyPath = @"/dummy/path";
[channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
XCTAssert(call != NULL);
XCTAssertNotNil(channel.wrappedChannel);
- [channel unrefUnmanagedCall:call];
+ [channel destroyUnmanagedCall:call];
XCTAssertNil(channel.wrappedChannel);
}
- (void)testCacheChannel {
- GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
+ GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
GRPCCallOptions *options1 = [[GRPCCallOptions alloc] init];
GRPCCallOptions *options2 = [options1 copy];
GRPCMutableCallOptions *options3 = [options1 mutableCopy];
@@ -80,17 +82,14 @@ NSString *kDummyPath = @"/dummy/path";
XCTAssertNotEqual(channel1.wrappedChannel, channel3.wrappedChannel);
XCTAssertNotEqual(channel1.wrappedChannel, channel4.wrappedChannel);
XCTAssertNotEqual(channel3.wrappedChannel, channel4.wrappedChannel);
- [channel1 unrefUnmanagedCall:call1];
- [channel2 unrefUnmanagedCall:call2];
- [channel3 unrefUnmanagedCall:call3];
- [channel4 unrefUnmanagedCall:call4];
+ [channel1 destroyUnmanagedCall:call1];
+ [channel2 destroyUnmanagedCall:call2];
+ [channel3 destroyUnmanagedCall:call3];
+ [channel4 destroyUnmanagedCall:call4];
}
- (void)testTimedDestroyChannel {
- const NSTimeInterval kDestroyDelay = 1.0;
-
- GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
- pool.destroyDelay = kDestroyDelay;
+ GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel =
(GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
@@ -99,25 +98,24 @@ NSString *kDummyPath = @"/dummy/path";
[channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
GRPCChannel *wrappedChannel = channel.wrappedChannel;
- [channel unrefUnmanagedCall:call];
+ [channel destroyUnmanagedCall:call];
// Confirm channel is not destroyed at this time
call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
- [channel unrefUnmanagedCall:call];
+ [channel destroyUnmanagedCall:call];
sleep(kDestroyDelay + 1);
// Confirm channel is new at this time
call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel);
// Confirm the new channel can create call
- call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
XCTAssert(call != NULL);
- [channel unrefUnmanagedCall:call];
+ [channel destroyUnmanagedCall:call];
}
- (void)testPoolDisconnection {
- GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
+ GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel =
(GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
@@ -141,12 +139,12 @@ NSString *kDummyPath = @"/dummy/path";
[channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
XCTAssertNotNil(channel.wrappedChannel);
XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel);
- [channel unrefUnmanagedCall:call];
- [channel unrefUnmanagedCall:call2];
+ [channel destroyUnmanagedCall:call];
+ [channel destroyUnmanagedCall:call2];
}
- (void)testUnrefCallFromStaleChannel {
- GRPCChannelPool *pool = [[GRPCChannelPool alloc] init];
+ GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
GRPCPooledChannel *channel =
(GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
@@ -163,12 +161,12 @@ NSString *kDummyPath = @"/dummy/path";
// destroy state
XCTAssertNotNil(channel.wrappedChannel);
GRPCChannel *wrappedChannel = channel.wrappedChannel;
- [channel unrefUnmanagedCall:call];
+ [channel destroyUnmanagedCall:call];
XCTAssertNotNil(channel.wrappedChannel);
XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
// Test unref the call of the current channel will cause the channel going into timed destroy
// state
- [channel unrefUnmanagedCall:call2];
+ [channel destroyUnmanagedCall:call2];
XCTAssertNil(channel.wrappedChannel);
}
diff --git a/src/objective-c/tests/ChannelTests/ChannelTests.m b/src/objective-c/tests/ChannelTests/ChannelTests.m
index c07c8e6983..5547449092 100644
--- a/src/objective-c/tests/ChannelTests/ChannelTests.m
+++ b/src/objective-c/tests/ChannelTests/ChannelTests.m
@@ -35,7 +35,7 @@
completionQueue:(GRPCCompletionQueue *)queue
callOptions:(GRPCCallOptions *)callOptions;
-- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall;
+- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall;
@end
@@ -66,7 +66,7 @@
return (grpc_call *)(++_grpcCallCounter);
}
-- (void)unrefUnmanagedCall:(grpc_call *)unmanagedCall {
+- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall {
if (_unrefExpectation) [_unrefExpectation fulfill];
}
diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m
index 834bf6d661..2cfdd1a003 100644
--- a/src/objective-c/tests/GRPCClientTests.m
+++ b/src/objective-c/tests/GRPCClientTests.m
@@ -554,13 +554,14 @@ static GRPCProtoMethod *kFullDuplexCallMethod;
__weak XCTestExpectation *completion = [self expectationWithDescription:@"Timeout in a second."];
NSString *const kDummyAddress = [NSString stringWithFormat:@"8.8.8.8:1"];
- GRPCCall *call = [[GRPCCall alloc] initWithHost:kDummyAddress
- path:@"/dummyPath"
- requestsWriter:[GRXWriter writerWithValue:[NSData data]]];
+ [GRPCCall useInsecureConnectionsForHost:kDummyAddress];
[GRPCCall setMinConnectTimeout:timeout * 1000
initialBackoff:backoff * 1000
maxBackoff:0
forHost:kDummyAddress];
+ GRPCCall *call = [[GRPCCall alloc] initWithHost:kDummyAddress
+ path:@"/dummyPath"
+ requestsWriter:[GRXWriter writerWithValue:[NSData data]]];
NSDate *startTime = [NSDate date];
id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(id value) {
XCTAssert(NO, @"Received message. Should not reach here");