diff options
author | Muxi Yan <muxi@users.noreply.github.com> | 2018-02-27 17:00:37 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-27 17:00:37 -0800 |
commit | 0a03beea7a0c1bced34c778d70e8937ddce5a6ea (patch) | |
tree | 697926c6a4262ceec6730f22f00d395edc56e6c5 | |
parent | 0fc97adc9ee41d517ee49ec8e3a8338b793fba7e (diff) | |
parent | 822d175ac29b8098697d476af04282f94de486a3 (diff) |
Merge pull request #14529 from muxi/polish-connectivity-monitor
Refactor connectivity monitor on iOS
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 136 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h | 58 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m | 199 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCHost.m | 25 |
4 files changed, 165 insertions, 253 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index ac4596da25..02492607cd 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -108,6 +108,9 @@ static NSString * const kBearerPrefix = @"Bearer "; // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch // queue dispatch_queue_t _responseQueue; + + // Whether the call is finished. If it is, should not call finishWithError again. + BOOL _finished; } @synthesize state = _state; @@ -206,6 +209,8 @@ static NSString * const kBearerPrefix = @"Bearer "; } else { [_responseWriteable enqueueSuccessfulCompletion]; } + + [GRPCConnectivityMonitor unregisterObserver:self]; } - (void)cancelCall { @@ -214,9 +219,10 @@ static NSString * const kBearerPrefix = @"Bearer "; } - (void)cancel { - [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeCancelled - userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]]; + [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]]; + if (!self.isWaitingForToken) { [self cancelCall]; } else { @@ -224,6 +230,19 @@ static NSString * const kBearerPrefix = @"Bearer "; } } +- (void)maybeFinishWithError:(NSError *)errorOrNil { + BOOL toFinish = NO; + @synchronized(self) { + if (_finished == NO) { + _finished = YES; + toFinish = YES; + } + } + if (toFinish == YES) { + [self finishWithError:errorOrNil]; + } +} + - (void)dealloc { __block GRPCWrappedCall *wrappedCall = _wrappedCall; dispatch_async(_callQueue, ^{ @@ -250,11 +269,13 @@ static NSString * const kBearerPrefix = @"Bearer "; if (self.state == GRXWriterStatePaused) { return; } - __weak GRPCCall *weakSelf = self; - __weak GRXConcurrentWriteable *weakWriteable = _responseWriteable; dispatch_async(_callQueue, ^{ - [weakSelf startReadWithHandler:^(grpc_byte_buffer *message) { + __weak GRPCCall *weakSelf = self; + __weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable; + [self startReadWithHandler:^(grpc_byte_buffer *message) { + __strong GRPCCall *strongSelf = weakSelf; + __strong GRXConcurrentWriteable *strongWriteable = weakWriteable; if (message == NULL) { // No more messages from the server return; @@ -266,14 +287,14 @@ static NSString * const kBearerPrefix = @"Bearer "; // don't want to throw, because the app shouldn't crash for a behavior // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. - [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeResourceExhausted - userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]]; - [weakSelf cancelCall]; + [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeResourceExhausted + userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]]; + [strongSelf cancelCall]; return; } - [weakWriteable enqueueValue:data completionHandler:^{ - [weakSelf startNextRead]; + [strongWriteable enqueueValue:data completionHandler:^{ + [strongSelf startNextRead]; }]; }]; }); @@ -333,12 +354,17 @@ static NSString * const kBearerPrefix = @"Bearer "; _requestWriter.state = GRXWriterStatePaused; } - __weak GRPCCall *weakSelf = self; dispatch_async(_callQueue, ^{ - [weakSelf writeMessage:value withErrorHandler:^{ - [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeInternal - userInfo:nil]]; + __weak GRPCCall *weakSelf = self; + [self writeMessage:value withErrorHandler:^{ + __strong GRPCCall *strongSelf = weakSelf; + if (strongSelf != nil) { + [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeInternal + userInfo:nil]]; + // Wrapped call must be canceled when error is reported to upper layers + [strongSelf cancelCall]; + } }]; }); } @@ -360,12 +386,15 @@ static NSString * const kBearerPrefix = @"Bearer "; if (errorOrNil) { [self cancel]; } else { - __weak GRPCCall *weakSelf = self; dispatch_async(_callQueue, ^{ - [weakSelf finishRequestWithErrorHandler:^{ - [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeInternal - userInfo:nil]]; + __weak GRPCCall *weakSelf = self; + [self finishRequestWithErrorHandler:^{ + __strong GRPCCall *strongSelf = weakSelf; + [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeInternal + userInfo:nil]]; + // Wrapped call must be canceled when error is reported to upper layers + [strongSelf cancelCall]; }]; }); } @@ -387,30 +416,37 @@ static NSString * const kBearerPrefix = @"Bearer "; } - (void)invokeCall { + __weak GRPCCall *weakSelf = self; [self invokeCallWithHeadersHandler:^(NSDictionary *headers) { // Response headers received. - self.responseHeaders = headers; - [self startNextRead]; + __strong GRPCCall *strongSelf = weakSelf; + if (strongSelf) { + strongSelf.responseHeaders = headers; + [strongSelf startNextRead]; + } } completionHandler:^(NSError *error, NSDictionary *trailers) { - self.responseTrailers = trailers; + __strong GRPCCall *strongSelf = weakSelf; + if (strongSelf) { + strongSelf.responseTrailers = trailers; - if (error) { - NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; - if (error.userInfo) { - [userInfo addEntriesFromDictionary:error.userInfo]; - } - userInfo[kGRPCTrailersKey] = self.responseTrailers; - // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be - // called before this one, so an error might end up with trailers but no headers. We - // shouldn't call finishWithError until ater both blocks are called. It is also when this is - // done that we can provide a merged view of response headers and trailers in a thread-safe - // way. - if (self.responseHeaders) { - userInfo[kGRPCHeadersKey] = self.responseHeaders; + if (error) { + NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; + if (error.userInfo) { + [userInfo addEntriesFromDictionary:error.userInfo]; + } + userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers; + // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be + // called before this one, so an error might end up with trailers but no headers. We + // shouldn't call finishWithError until ater both blocks are called. It is also when this is + // done that we can provide a merged view of response headers and trailers in a thread-safe + // way. + if (strongSelf.responseHeaders) { + userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; + } + error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; } - error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; + [strongSelf maybeFinishWithError:error]; } - [self finishWithError:error]; }]; // Now that the RPC has been initiated, request writes can start. @synchronized(_requestWriter) { @@ -439,16 +475,8 @@ static NSString * const kBearerPrefix = @"Bearer "; // TODO(jcanizales): Check this on init. [NSException raise:NSInvalidArgumentException format:@"host of %@ is nil", _host]; } - _connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:host]; - __weak typeof(self) weakSelf = self; - void (^handler)(void) = ^{ - typeof(self) strongSelf = weakSelf; - [strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]]; - }; - [_connectivityMonitor handleLossWithHandler:handler - wifiStatusChangeHandler:nil]; + [GRPCConnectivityMonitor registerObserver:self + selector:@selector(connectivityChanged:)]; } - (void)startWithWriteable:(id<GRXWriteable>)writeable { @@ -512,4 +540,12 @@ static NSString * const kBearerPrefix = @"Bearer "; } } +- (void)connectivityChanged:(NSNotification *)note { + [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]]; + // Cancel underlying call upon this notification + [self cancelCall]; +} + @end diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h index cb55e46d70..394d21792d 100644 --- a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h +++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h @@ -19,44 +19,30 @@ #import <Foundation/Foundation.h> #import <SystemConfiguration/SystemConfiguration.h> -@interface GRPCReachabilityFlags : NSObject - -+ (nonnull instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags; - -/** - * One accessor method to query each of the different flags. Example: - -@property(nonatomic, readonly) BOOL isCell; - - */ -#define GRPC_XMACRO_ITEM(methodName, FlagName) \ -@property(nonatomic, readonly) BOOL methodName; - -#include "GRPCReachabilityFlagNames.xmacro.h" -#undef GRPC_XMACRO_ITEM - -@property(nonatomic, readonly) BOOL isHostReachable; -@end - +typedef NS_ENUM(NSInteger, GRPCConnectivityStatus) { + GRPCConnectivityUnknown = 0, + GRPCConnectivityNoNetwork = 1, + GRPCConnectivityCellular = 2, + GRPCConnectivityWiFi = 3, +}; + +extern NSString * _Nonnull kGRPCConnectivityNotification; + +// This interface monitors OS reachability interface for any network status +// change. Parties interested in these events should register themselves as +// observer. @interface GRPCConnectivityMonitor : NSObject -+ (nullable instancetype)monitorWithHost:(nonnull NSString *)hostName; - - (nonnull instancetype)init NS_UNAVAILABLE; -/** - * Queue on which callbacks will be dispatched. Default is the main queue. Set it before calling - * handleLossWithHandler:. - */ -// TODO(jcanizales): Default to a serial background queue instead. -@property(nonatomic, strong, null_resettable) dispatch_queue_t queue; - -/** - * Calls handler every time the connectivity to this instance's host is lost. If this instance is - * released before that happens, the handler won't be called. - * Only one handler is active at a time, so if this method is called again before the previous - * handler has been called, it might never be called at all (or yes, if it has already been queued). - */ -- (void)handleLossWithHandler:(nullable void (^)(void))lossHandler - wifiStatusChangeHandler:(nullable void (^)(void))wifiStatusChangeHandler; +// Register an object as observer of network status change. \a observer +// must have a notification method with one parameter of type +// (NSNotification *) and should pass it to parameter \a selector. The +// parameter of this notification method is not used for now. ++ (void)registerObserver:(_Nonnull id)observer + selector:(_Nonnull SEL)selector; + +// Ungegister an object from observers of network status change. ++ (void)unregisterObserver:(_Nonnull id)observer; + @end diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m index c8e10dd75f..7f31c7e23e 100644 --- a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m +++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m @@ -18,175 +18,74 @@ #import "GRPCConnectivityMonitor.h" -#pragma mark Flags +#include <netinet/in.h> -@implementation GRPCReachabilityFlags { - SCNetworkReachabilityFlags _flags; -} +NSString *kGRPCConnectivityNotification = @"kGRPCConnectivityNotification"; -+ (instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags { - return [[self alloc] initWithFlags:flags]; -} +static SCNetworkReachabilityRef reachability; +static GRPCConnectivityStatus currentStatus; -- (instancetype)initWithFlags:(SCNetworkReachabilityFlags)flags { - if ((self = [super init])) { - _flags = flags; +// Aggregate information in flags into network status. +GRPCConnectivityStatus CalculateConnectivityStatus(SCNetworkReachabilityFlags flags) { + GRPCConnectivityStatus result = GRPCConnectivityUnknown; + if (((flags & kSCNetworkReachabilityFlagsReachable) == 0) || + ((flags & kSCNetworkReachabilityFlagsConnectionRequired) != 0)) { + return GRPCConnectivityNoNetwork; } - return self; -} - -/* - * One accessor method implementation per flag. Example: - -- (BOOL)isCell { \ - return !!(_flags & kSCNetworkReachabilityFlagsIsWWAN); \ -} - - */ -#define GRPC_XMACRO_ITEM(methodName, FlagName) \ -- (BOOL)methodName { \ - return !!(_flags & kSCNetworkReachabilityFlags ## FlagName); \ -} -#include "GRPCReachabilityFlagNames.xmacro.h" -#undef GRPC_XMACRO_ITEM - -- (BOOL)isHostReachable { - // Note: connectionOnDemand means it'll be reachable only if using the CFSocketStream API or APIs - // on top of it. - // connectionRequired means we can't tell until a connection is attempted (e.g. for VPN on - // demand). - return self.reachable && !self.interventionRequired && !self.connectionOnDemand; -} - -- (NSString *)description { - NSMutableArray *activeOptions = [NSMutableArray arrayWithCapacity:9]; - - /* - * For each flag, add its name to the array if it's ON. Example: - - if (self.isCell) { - [activeOptions addObject:@"isCell"]; + result = GRPCConnectivityWiFi; +#if TARGET_OS_IPHONE + if (flags & kSCNetworkReachabilityFlagsIsWWAN) { + return result = GRPCConnectivityCellular; } - - */ - #define GRPC_XMACRO_ITEM(methodName, FlagName) \ - if (self.methodName) { \ - [activeOptions addObject:@ #methodName]; \ - } - #include "GRPCReachabilityFlagNames.xmacro.h" - #undef GRPC_XMACRO_ITEM - - return activeOptions.count == 0 ? @"(none)" : [activeOptions componentsJoinedByString:@", "]; -} - -- (BOOL)isEqual:(id)object { - return [object isKindOfClass:[GRPCReachabilityFlags class]] && - _flags == ((GRPCReachabilityFlags *)object)->_flags; -} - -- (NSUInteger)hash { - return _flags; -} -@end - -#pragma mark Connectivity Monitor - -// Assumes the third argument is a block that accepts a GRPCReachabilityFlags object, and passes the -// received ones to it. -static void PassFlagsToContextInfoBlock(SCNetworkReachabilityRef target, - SCNetworkReachabilityFlags flags, - void *info) { - #pragma unused (target) - // This can be called many times with the same info. The info is retained by SCNetworkReachability - // while this function is being executed. - void (^handler)(GRPCReachabilityFlags *) = (__bridge void (^)(GRPCReachabilityFlags *))info; - handler([[GRPCReachabilityFlags alloc] initWithFlags:flags]); +#endif + return result; } -@implementation GRPCConnectivityMonitor { - SCNetworkReachabilityRef _reachabilityRef; - GRPCReachabilityFlags *_previousReachabilityFlags; -} +static void ReachabilityCallback( + SCNetworkReachabilityRef target, SCNetworkReachabilityFlags flags, void* info) { + GRPCConnectivityStatus newStatus = CalculateConnectivityStatus(flags); -- (nullable instancetype)initWithReachability:(nullable SCNetworkReachabilityRef)reachability { - if (!reachability) { - return nil; + if (newStatus != currentStatus) { + [[NSNotificationCenter defaultCenter] postNotificationName:kGRPCConnectivityNotification + object:nil]; + currentStatus = newStatus; } - if ((self = [super init])) { - _reachabilityRef = CFRetain(reachability); - _queue = dispatch_get_main_queue(); - _previousReachabilityFlags = nil; - } - return self; } -+ (nullable instancetype)monitorWithHost:(nonnull NSString *)host { - const char *hostName = host.UTF8String; - if (!hostName) { - [NSException raise:NSInvalidArgumentException - format:@"host.UTF8String returns NULL for %@", host]; - } - SCNetworkReachabilityRef reachability = - SCNetworkReachabilityCreateWithName(NULL, hostName); +@implementation GRPCConnectivityMonitor - GRPCConnectivityMonitor *returnValue = [[self alloc] initWithReachability:reachability]; - if (reachability) { - CFRelease(reachability); - } - return returnValue; -} ++ (void)initialize { + if (self == [GRPCConnectivityMonitor self]) { + struct sockaddr_in addr = {0}; + addr.sin_len = sizeof(addr); + addr.sin_family = AF_INET; + reachability = SCNetworkReachabilityCreateWithAddress(NULL, (struct sockaddr *)&addr); + currentStatus = GRPCConnectivityUnknown; -- (void)handleLossWithHandler:(nullable void (^)(void))lossHandler - wifiStatusChangeHandler:(nullable void (^)(void))wifiStatusChangeHandler { - __weak typeof(self) weakSelf = self; - [self startListeningWithHandler:^(GRPCReachabilityFlags *flags) { - typeof(self) strongSelf = weakSelf; - if (strongSelf) { - if (lossHandler && !flags.reachable) { - lossHandler(); -#if TARGET_OS_IPHONE - } else if (wifiStatusChangeHandler && - strongSelf->_previousReachabilityFlags && - (flags.isWWAN ^ - strongSelf->_previousReachabilityFlags.isWWAN)) { - wifiStatusChangeHandler(); -#endif - } - strongSelf->_previousReachabilityFlags = flags; + SCNetworkConnectionFlags flags; + if (SCNetworkReachabilityGetFlags(reachability, &flags)) { + currentStatus = CalculateConnectivityStatus(flags); } - }]; -} -- (void)startListeningWithHandler:(void (^)(GRPCReachabilityFlags *))handler { - // Copy to ensure the handler block is in the heap (and so can't be deallocated when this method - // returns). - void (^copiedHandler)(GRPCReachabilityFlags *) = [handler copy]; - SCNetworkReachabilityContext context = { - .version = 0, - .info = (__bridge void *)copiedHandler, - .retain = CFRetain, - .release = CFRelease, - }; - // The following will retain context.info, and release it when the callback is set to NULL. - SCNetworkReachabilitySetCallback(_reachabilityRef, PassFlagsToContextInfoBlock, &context); - SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, _queue); -} - -- (void)stopListening { - // This releases the block on context.info. - SCNetworkReachabilitySetCallback(_reachabilityRef, NULL, NULL); - SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, NULL); + SCNetworkReachabilityContext context = {0, (__bridge void *)(self), NULL, NULL, NULL}; + if (!SCNetworkReachabilitySetCallback(reachability, ReachabilityCallback, &context) || + !SCNetworkReachabilityScheduleWithRunLoop( + reachability, CFRunLoopGetMain(), kCFRunLoopCommonModes)) { + NSLog(@"gRPC connectivity monitor fail to set"); + } + } } -- (void)setQueue:(dispatch_queue_t)queue { - _queue = queue ?: dispatch_get_main_queue(); ++ (void)registerObserver:(_Nonnull id)observer + selector:(SEL)selector { + [[NSNotificationCenter defaultCenter] addObserver:observer + selector:selector + name:kGRPCConnectivityNotification + object:nil]; } -- (void)dealloc { - if (_reachabilityRef) { - [self stopListening]; - CFRelease(_reachabilityRef); - } ++ (void)unregisterObserver:(_Nonnull id)observer { + [[NSNotificationCenter defaultCenter] removeObserver:observer]; } @end diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m index 71b57cf1f6..8568e334dd 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.m +++ b/src/objective-c/GRPCClient/private/GRPCHost.m @@ -37,12 +37,6 @@ NS_ASSUME_NONNULL_BEGIN static NSMutableDictionary *kHostCache; -// This connectivity monitor flushes the host cache when connectivity status -// changes or when connection switch between Wifi and Cellular data, so that a -// new call will use a new channel. Otherwise, a new call will still use the -// cached channel which is no longer available and will cause gRPC to hang. -static GRPCConnectivityMonitor *connectivityMonitor = nil; - @implementation GRPCHost { // TODO(mlumish): Investigate whether caching channels with strong links is a good idea. GRPCChannel *_channel; @@ -90,17 +84,7 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil; kHostCache[address] = self; _compressAlgorithm = GRPC_COMPRESS_NONE; } - // Keep a single monitor to flush the cache if the connectivity status changes - // Thread safety guarded by @synchronized(kHostCache) - if (!connectivityMonitor) { - connectivityMonitor = - [GRPCConnectivityMonitor monitorWithHost:hostURL.host]; - void (^handler)(void) = ^{ - [GRPCHost flushChannelCache]; - }; - [connectivityMonitor handleLossWithHandler:handler - wifiStatusChangeHandler:handler]; - } + [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChange:)]; } return self; } @@ -281,6 +265,13 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil; } } +// Flushes the host cache when connectivity status changes or when connection switch between Wifi +// and Cellular data, so that a new call will use a new channel. Otherwise, a new call will still +// use the cached channel which is no longer available and will cause gRPC to hang. +- (void)connectivityChange:(NSNotification *)note { + [GRPCHost flushChannelCache]; +} + @end NS_ASSUME_NONNULL_END |