diff options
-rw-r--r-- | src/compiler/objective_c_generator.cc | 104 | ||||
-rw-r--r-- | src/compiler/objective_c_generator.h | 10 | ||||
-rw-r--r-- | src/compiler/objective_c_generator_helpers.h | 40 | ||||
-rw-r--r-- | src/compiler/objective_c_plugin.cc | 145 | ||||
-rw-r--r-- | src/core/lib/security/credentials/fake/fake_credentials.cc | 3 | ||||
-rw-r--r-- | src/core/lib/security/credentials/fake/fake_credentials.h | 3 | ||||
-rw-r--r-- | src/core/lib/security/security_connector/security_connector.cc | 9 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 136 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h | 58 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m | 199 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/private/GRPCHost.m | 25 | ||||
-rw-r--r-- | test/cpp/end2end/grpclb_end2end_test.cc | 66 |
12 files changed, 419 insertions, 379 deletions
diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc index ab7d869758..ffdeb8f6b0 100644 --- a/src/compiler/objective_c_generator.cc +++ b/src/compiler/objective_c_generator.cc @@ -212,37 +212,49 @@ void PrintMethodImplementations(Printer* printer, return output; } -::grpc::string GetHeader(const ServiceDescriptor* service) { +::grpc::string GetProtocol(const ServiceDescriptor* service) { ::grpc::string output; - { - // Scope the output stream so it closes and finalizes output to the string. - grpc::protobuf::io::StringOutputStream output_stream(&output); - Printer printer(&output_stream, '$'); - - map< ::grpc::string, ::grpc::string> vars = { - {"service_class", ServiceClassName(service)}}; - printer.Print(vars, "@protocol $service_class$ <NSObject>\n\n"); + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + Printer printer(&output_stream, '$'); - for (int i = 0; i < service->method_count(); i++) { - PrintMethodDeclarations(&printer, service->method(i)); - } - printer.Print("@end\n\n"); + map< ::grpc::string, ::grpc::string> vars = { + {"service_class", ServiceClassName(service)}}; - printer.Print( - "/**\n" - " * Basic service implementation, over gRPC, that only does\n" - " * marshalling and parsing.\n" - " */\n"); - printer.Print(vars, - "@interface $service_class$ :" - " GRPCProtoService<$service_class$>\n"); - printer.Print( - "- (instancetype)initWithHost:(NSString *)host" - " NS_DESIGNATED_INITIALIZER;\n"); - printer.Print("+ (instancetype)serviceWithHost:(NSString *)host;\n"); - printer.Print("@end\n"); + printer.Print(vars, "@protocol $service_class$ <NSObject>\n\n"); + for (int i = 0; i < service->method_count(); i++) { + PrintMethodDeclarations(&printer, service->method(i)); } + printer.Print("@end\n\n"); + + return output; +} + +::grpc::string GetInterface(const ServiceDescriptor* service) { + ::grpc::string output; + + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + Printer printer(&output_stream, '$'); + + map< ::grpc::string, ::grpc::string> vars = { + {"service_class", ServiceClassName(service)}}; + + printer.Print(vars, + "/**\n" + " * Basic service implementation, over gRPC, that only does\n" + " * marshalling and parsing.\n" + " */\n"); + printer.Print(vars, + "@interface $service_class$ :" + " GRPCProtoService<$service_class$>\n"); + printer.Print( + "- (instancetype)initWithHost:(NSString *)host" + " NS_DESIGNATED_INITIALIZER;\n"); + printer.Print("+ (instancetype)serviceWithHost:(NSString *)host;\n"); + printer.Print("@end\n"); + return output; } @@ -258,26 +270,32 @@ void PrintMethodImplementations(Printer* printer, {"service_class", ServiceClassName(service)}, {"package", service->file()->package()}}; - printer.Print(vars, "@implementation $service_class$\n\n"); + printer.Print(vars, + "@implementation $service_class$\n\n" + "// Designated initializer\n" + "- (instancetype)initWithHost:(NSString *)host {\n" + " self = [super initWithHost:host\n" + " packageName:@\"$package$\"\n" + " serviceName:@\"$service_name$\"];\n" + " return self;\n" + "}\n\n"); - printer.Print("// Designated initializer\n"); - printer.Print("- (instancetype)initWithHost:(NSString *)host {\n"); - printer.Print( - vars, - " return (self = [super initWithHost:host" - " packageName:@\"$package$\" serviceName:@\"$service_name$\"]);\n"); - printer.Print("}\n\n"); printer.Print( "// Override superclass initializer to disallow different" - " package and service names.\n"); - printer.Print("- (instancetype)initWithHost:(NSString *)host\n"); - printer.Print(" packageName:(NSString *)packageName\n"); - printer.Print(" serviceName:(NSString *)serviceName {\n"); - printer.Print(" return [self initWithHost:host];\n"); - printer.Print("}\n\n"); - printer.Print("+ (instancetype)serviceWithHost:(NSString *)host {\n"); - printer.Print(" return [[self alloc] initWithHost:host];\n"); - printer.Print("}\n\n\n"); + " package and service names.\n" + "- (instancetype)initWithHost:(NSString *)host\n" + " packageName:(NSString *)packageName\n" + " serviceName:(NSString *)serviceName {\n" + " return [self initWithHost:host];\n" + "}\n\n"); + + printer.Print( + "#pragma mark - Class Methods\n\n" + "+ (instancetype)serviceWithHost:(NSString *)host {\n" + " return [[self alloc] initWithHost:host];\n" + "}\n\n"); + + printer.Print("#pragma mark - Method Implementations\n\n"); for (int i = 0; i < service->method_count(); i++) { PrintMethodImplementations(&printer, service->method(i)); diff --git a/src/compiler/objective_c_generator.h b/src/compiler/objective_c_generator.h index d3aed76c4f..eb1c7ff005 100644 --- a/src/compiler/objective_c_generator.h +++ b/src/compiler/objective_c_generator.h @@ -31,9 +31,13 @@ using ::grpc::string; // Returns forward declaration of classes in the generated header file. string GetAllMessageClasses(const FileDescriptor* file); -// Returns the content to be included in the "global_scope" insertion point of -// the generated header file. -string GetHeader(const ServiceDescriptor* service); +// Returns the content to be included defining the @protocol segment at the +// insertion point of the generated implementation file. +string GetProtocol(const ServiceDescriptor* service); + +// Returns the content to be included defining the @interface segment at the +// insertion point of the generated implementation file. +string GetInterface(const ServiceDescriptor* service); // Returns the content to be included in the "global_scope" insertion point of // the generated implementation file. diff --git a/src/compiler/objective_c_generator_helpers.h b/src/compiler/objective_c_generator_helpers.h index 4004e6aef8..a284da97f4 100644 --- a/src/compiler/objective_c_generator_helpers.h +++ b/src/compiler/objective_c_generator_helpers.h @@ -40,5 +40,45 @@ inline string ServiceClassName(const ServiceDescriptor* service) { string prefix = file->options().objc_class_prefix(); return prefix + service->name(); } + +inline ::grpc::string LocalImport(const ::grpc::string& import) { + return ::grpc::string("#import \"" + import + "\"\n"); +} + +inline ::grpc::string SystemImport(const ::grpc::string& import) { + return ::grpc::string("#import <" + import + ">\n"); +} + +inline ::grpc::string PreprocConditional(::grpc::string symbol, bool invert) { + return invert ? "!defined(" + symbol + ") || !" + symbol + : "defined(" + symbol + ") && " + symbol; +} + +inline ::grpc::string PreprocIf(const ::grpc::string& symbol, + const ::grpc::string& if_true) { + return ::grpc::string("#if " + PreprocConditional(symbol, false) + "\n" + + if_true + "#endif\n"); +} + +inline ::grpc::string PreprocIfNot(const ::grpc::string& symbol, + const ::grpc::string& if_true) { + return ::grpc::string("#if " + PreprocConditional(symbol, true) + "\n" + + if_true + "#endif\n"); +} + +inline ::grpc::string PreprocIfElse(const ::grpc::string& symbol, + const ::grpc::string& if_true, + const ::grpc::string& if_false) { + return ::grpc::string("#if " + PreprocConditional(symbol, false) + "\n" + + if_true + "#else\n" + if_false + "#endif\n"); +} + +inline ::grpc::string PreprocIfNotElse(const ::grpc::string& symbol, + const ::grpc::string& if_true, + const ::grpc::string& if_false) { + return ::grpc::string("#if " + PreprocConditional(symbol, true) + "\n" + + if_true + "#else\n" + if_false + "#endif\n"); +} + } // namespace grpc_objective_c_generator #endif // GRPC_INTERNAL_COMPILER_OBJECTIVE_C_GENERATOR_HELPERS_H diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc index d5d488e84d..76703d79cd 100644 --- a/src/compiler/objective_c_plugin.cc +++ b/src/compiler/objective_c_plugin.cc @@ -29,12 +29,42 @@ using ::google::protobuf::compiler::objectivec:: IsProtobufLibraryBundledProtoFile; using ::google::protobuf::compiler::objectivec::ProtobufLibraryFrameworkName; +using ::grpc_objective_c_generator::LocalImport; +using ::grpc_objective_c_generator::PreprocIfElse; +using ::grpc_objective_c_generator::PreprocIfNot; +using ::grpc_objective_c_generator::SystemImport; + +namespace { + +inline ::grpc::string ImportProtoHeaders( + const grpc::protobuf::FileDescriptor* dep, const char* indent) { + ::grpc::string header = grpc_objective_c_generator::MessageHeaderName(dep); + + if (!IsProtobufLibraryBundledProtoFile(dep)) { + return indent + LocalImport(header); + } + + ::grpc::string base_name = header; + grpc_generator::StripPrefix(&base_name, "google/protobuf/"); + // create the import code snippet + ::grpc::string framework_header = + ::grpc::string(ProtobufLibraryFrameworkName) + "/" + base_name; + + static const ::grpc::string kFrameworkImportsCondition = + "GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS"; + return PreprocIfElse(kFrameworkImportsCondition, + indent + SystemImport(framework_header), + indent + LocalImport(header)); +} + +} // namespace class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { public: ObjectiveCGrpcGenerator() {} virtual ~ObjectiveCGrpcGenerator() {} + public: virtual bool Generate(const grpc::protobuf::FileDescriptor* file, const ::grpc::string& parameter, grpc::protobuf::compiler::GeneratorContext* context, @@ -44,97 +74,68 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { return true; } + static const ::grpc::string kNonNullBegin = "NS_ASSUME_NONNULL_BEGIN\n"; + static const ::grpc::string kNonNullEnd = "NS_ASSUME_NONNULL_END\n"; + static const ::grpc::string kProtocolOnly = "GPB_GRPC_PROTOCOL_ONLY"; + static const ::grpc::string kForwardDeclare = + "GPB_GRPC_FORWARD_DECLARE_MESSAGE_PROTO"; + ::grpc::string file_name = google::protobuf::compiler::objectivec::FilePath(file); - ::grpc::string prefix = file->options().objc_class_prefix(); { // Generate .pbrpc.h - ::grpc::string imports = - ::grpc::string("#if !GPB_GRPC_FORWARD_DECLARE_MESSAGE_PROTO\n") + - "#import \"" + file_name + - ".pbobjc.h\"\n" - "#endif\n\n" - "#import <ProtoRPC/ProtoService.h>\n" - "#import <ProtoRPC/ProtoRPC.h>\n" - "#import <RxLibrary/GRXWriteable.h>\n" - "#import <RxLibrary/GRXWriter.h>\n"; - - ::grpc::string proto_imports; - proto_imports += "#if GPB_GRPC_FORWARD_DECLARE_MESSAGE_PROTO\n" + - grpc_objective_c_generator::GetAllMessageClasses(file) + - "#else\n"; + ::grpc::string imports = LocalImport(file_name + ".pbobjc.h"); + + ::grpc::string system_imports = SystemImport("ProtoRPC/ProtoService.h") + + SystemImport("ProtoRPC/ProtoRPC.h") + + SystemImport("RxLibrary/GRXWriteable.h") + + SystemImport("RxLibrary/GRXWriter.h"); + + ::grpc::string forward_declarations = "@class GRPCProtoCall;\n\n"; + + ::grpc::string class_declarations = + grpc_objective_c_generator::GetAllMessageClasses(file); + + ::grpc::string class_imports; for (int i = 0; i < file->dependency_count(); i++) { - ::grpc::string header = - grpc_objective_c_generator::MessageHeaderName(file->dependency(i)); - const grpc::protobuf::FileDescriptor* dependency = file->dependency(i); - if (IsProtobufLibraryBundledProtoFile(dependency)) { - ::grpc::string base_name = header; - grpc_generator::StripPrefix(&base_name, "google/protobuf/"); - // create the import code snippet - proto_imports += - " #if GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS\n" - " #import <" + - ::grpc::string(ProtobufLibraryFrameworkName) + "/" + base_name + - ">\n" - " #else\n" - " #import \"" + - header + - "\"\n" - " #endif\n"; - } else { - proto_imports += ::grpc::string(" #import \"") + header + "\"\n"; - } + class_imports += ImportProtoHeaders(file->dependency(i), " "); } - proto_imports += "#endif\n"; - ::grpc::string declarations; + ::grpc::string protocols; for (int i = 0; i < file->service_count(); i++) { const grpc::protobuf::ServiceDescriptor* service = file->service(i); - declarations += grpc_objective_c_generator::GetHeader(service); + protocols += grpc_objective_c_generator::GetProtocol(service); } - static const ::grpc::string kNonNullBegin = - "\nNS_ASSUME_NONNULL_BEGIN\n\n"; - static const ::grpc::string kNonNullEnd = "\nNS_ASSUME_NONNULL_END\n"; + ::grpc::string interfaces; + for (int i = 0; i < file->service_count(); i++) { + const grpc::protobuf::ServiceDescriptor* service = file->service(i); + interfaces += grpc_objective_c_generator::GetInterface(service); + } Write(context, file_name + ".pbrpc.h", - imports + '\n' + proto_imports + '\n' + kNonNullBegin + - declarations + kNonNullEnd); + PreprocIfNot(kForwardDeclare, imports) + "\n" + + PreprocIfNot(kProtocolOnly, system_imports) + "\n" + + PreprocIfElse(kForwardDeclare, class_declarations, + class_imports) + + "\n" + forward_declarations + "\n" + kNonNullBegin + "\n" + + protocols + "\n" + PreprocIfNot(kProtocolOnly, interfaces) + + "\n" + kNonNullEnd + "\n"); } { // Generate .pbrpc.m - ::grpc::string imports = ::grpc::string("#import \"") + file_name + - ".pbrpc.h\"\n" - "#import \"" + - file_name + - ".pbobjc.h\"\n\n" - "#import <ProtoRPC/ProtoRPC.h>\n" - "#import <RxLibrary/GRXWriter+Immediate.h>\n"; + ::grpc::string imports = LocalImport(file_name + ".pbrpc.h") + + LocalImport(file_name + ".pbobjc.h") + + SystemImport("ProtoRPC/ProtoRPC.h") + + SystemImport("RxLibrary/GRXWriter+Immediate.h"); + + ::grpc::string class_imports; for (int i = 0; i < file->dependency_count(); i++) { - ::grpc::string header = - grpc_objective_c_generator::MessageHeaderName(file->dependency(i)); - const grpc::protobuf::FileDescriptor* dependency = file->dependency(i); - if (IsProtobufLibraryBundledProtoFile(dependency)) { - ::grpc::string base_name = header; - grpc_generator::StripPrefix(&base_name, "google/protobuf/"); - // create the import code snippet - imports += - "#if GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS\n" - " #import <" + - ::grpc::string(ProtobufLibraryFrameworkName) + "/" + base_name + - ">\n" - "#else\n" - " #import \"" + - header + - "\"\n" - "#endif\n"; - } else { - imports += ::grpc::string("#import \"") + header + "\"\n"; - } + class_imports += ImportProtoHeaders(file->dependency(i), ""); } ::grpc::string definitions; @@ -143,7 +144,9 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { definitions += grpc_objective_c_generator::GetSource(service); } - Write(context, file_name + ".pbrpc.m", imports + '\n' + definitions); + Write(context, file_name + ".pbrpc.m", + PreprocIfNot(kProtocolOnly, + imports + "\n" + class_imports + "\n" + definitions)); } return true; diff --git a/src/core/lib/security/credentials/fake/fake_credentials.cc b/src/core/lib/security/credentials/fake/fake_credentials.cc index 46311fa122..858ab6b41b 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.cc +++ b/src/core/lib/security/credentials/fake/fake_credentials.cc @@ -32,9 +32,6 @@ /* -- Fake transport security credentials. -- */ -#define GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS \ - "grpc.fake_security.expected_targets" - static grpc_security_status fake_transport_security_create_security_connector( grpc_channel_credentials* c, grpc_call_credentials* call_creds, const char* target, const grpc_channel_args* args, diff --git a/src/core/lib/security/credentials/fake/fake_credentials.h b/src/core/lib/security/credentials/fake/fake_credentials.h index 5166e43167..e89e6e24cc 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.h +++ b/src/core/lib/security/credentials/fake/fake_credentials.h @@ -23,6 +23,9 @@ #include "src/core/lib/security/credentials/credentials.h" +#define GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS \ + "grpc.fake_security.expected_targets" + /* -- Fake transport security credentials. -- */ /* Creates a fake transport security credentials object for testing. */ diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc index a57c895374..3cc151bec7 100644 --- a/src/core/lib/security/security_connector/security_connector.cc +++ b/src/core/lib/security/security_connector/security_connector.cc @@ -463,6 +463,15 @@ static bool fake_channel_check_call_host(grpc_channel_security_connector* sc, grpc_auth_context* auth_context, grpc_closure* on_call_host_checked, grpc_error** error) { + grpc_fake_channel_security_connector* c = + reinterpret_cast<grpc_fake_channel_security_connector*>(sc); + if (c->is_lb_channel) { + // TODO(dgq): verify that the host (ie, authority header) matches that of + // the LB, as opposed to that of the backends. + } else { + // TODO(dgq): verify that the host (ie, authority header) matches that of + // the backend, not the LB's. + } return true; } diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index ac4596da25..02492607cd 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -108,6 +108,9 @@ static NSString * const kBearerPrefix = @"Bearer "; // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch // queue dispatch_queue_t _responseQueue; + + // Whether the call is finished. If it is, should not call finishWithError again. + BOOL _finished; } @synthesize state = _state; @@ -206,6 +209,8 @@ static NSString * const kBearerPrefix = @"Bearer "; } else { [_responseWriteable enqueueSuccessfulCompletion]; } + + [GRPCConnectivityMonitor unregisterObserver:self]; } - (void)cancelCall { @@ -214,9 +219,10 @@ static NSString * const kBearerPrefix = @"Bearer "; } - (void)cancel { - [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeCancelled - userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]]; + [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]]; + if (!self.isWaitingForToken) { [self cancelCall]; } else { @@ -224,6 +230,19 @@ static NSString * const kBearerPrefix = @"Bearer "; } } +- (void)maybeFinishWithError:(NSError *)errorOrNil { + BOOL toFinish = NO; + @synchronized(self) { + if (_finished == NO) { + _finished = YES; + toFinish = YES; + } + } + if (toFinish == YES) { + [self finishWithError:errorOrNil]; + } +} + - (void)dealloc { __block GRPCWrappedCall *wrappedCall = _wrappedCall; dispatch_async(_callQueue, ^{ @@ -250,11 +269,13 @@ static NSString * const kBearerPrefix = @"Bearer "; if (self.state == GRXWriterStatePaused) { return; } - __weak GRPCCall *weakSelf = self; - __weak GRXConcurrentWriteable *weakWriteable = _responseWriteable; dispatch_async(_callQueue, ^{ - [weakSelf startReadWithHandler:^(grpc_byte_buffer *message) { + __weak GRPCCall *weakSelf = self; + __weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable; + [self startReadWithHandler:^(grpc_byte_buffer *message) { + __strong GRPCCall *strongSelf = weakSelf; + __strong GRXConcurrentWriteable *strongWriteable = weakWriteable; if (message == NULL) { // No more messages from the server return; @@ -266,14 +287,14 @@ static NSString * const kBearerPrefix = @"Bearer "; // don't want to throw, because the app shouldn't crash for a behavior // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. - [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeResourceExhausted - userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]]; - [weakSelf cancelCall]; + [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeResourceExhausted + userInfo:@{NSLocalizedDescriptionKey: @"Client does not have enough memory to hold the server response."}]]; + [strongSelf cancelCall]; return; } - [weakWriteable enqueueValue:data completionHandler:^{ - [weakSelf startNextRead]; + [strongWriteable enqueueValue:data completionHandler:^{ + [strongSelf startNextRead]; }]; }]; }); @@ -333,12 +354,17 @@ static NSString * const kBearerPrefix = @"Bearer "; _requestWriter.state = GRXWriterStatePaused; } - __weak GRPCCall *weakSelf = self; dispatch_async(_callQueue, ^{ - [weakSelf writeMessage:value withErrorHandler:^{ - [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeInternal - userInfo:nil]]; + __weak GRPCCall *weakSelf = self; + [self writeMessage:value withErrorHandler:^{ + __strong GRPCCall *strongSelf = weakSelf; + if (strongSelf != nil) { + [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeInternal + userInfo:nil]]; + // Wrapped call must be canceled when error is reported to upper layers + [strongSelf cancelCall]; + } }]; }); } @@ -360,12 +386,15 @@ static NSString * const kBearerPrefix = @"Bearer "; if (errorOrNil) { [self cancel]; } else { - __weak GRPCCall *weakSelf = self; dispatch_async(_callQueue, ^{ - [weakSelf finishRequestWithErrorHandler:^{ - [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeInternal - userInfo:nil]]; + __weak GRPCCall *weakSelf = self; + [self finishRequestWithErrorHandler:^{ + __strong GRPCCall *strongSelf = weakSelf; + [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeInternal + userInfo:nil]]; + // Wrapped call must be canceled when error is reported to upper layers + [strongSelf cancelCall]; }]; }); } @@ -387,30 +416,37 @@ static NSString * const kBearerPrefix = @"Bearer "; } - (void)invokeCall { + __weak GRPCCall *weakSelf = self; [self invokeCallWithHeadersHandler:^(NSDictionary *headers) { // Response headers received. - self.responseHeaders = headers; - [self startNextRead]; + __strong GRPCCall *strongSelf = weakSelf; + if (strongSelf) { + strongSelf.responseHeaders = headers; + [strongSelf startNextRead]; + } } completionHandler:^(NSError *error, NSDictionary *trailers) { - self.responseTrailers = trailers; + __strong GRPCCall *strongSelf = weakSelf; + if (strongSelf) { + strongSelf.responseTrailers = trailers; - if (error) { - NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; - if (error.userInfo) { - [userInfo addEntriesFromDictionary:error.userInfo]; - } - userInfo[kGRPCTrailersKey] = self.responseTrailers; - // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be - // called before this one, so an error might end up with trailers but no headers. We - // shouldn't call finishWithError until ater both blocks are called. It is also when this is - // done that we can provide a merged view of response headers and trailers in a thread-safe - // way. - if (self.responseHeaders) { - userInfo[kGRPCHeadersKey] = self.responseHeaders; + if (error) { + NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; + if (error.userInfo) { + [userInfo addEntriesFromDictionary:error.userInfo]; + } + userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers; + // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be + // called before this one, so an error might end up with trailers but no headers. We + // shouldn't call finishWithError until ater both blocks are called. It is also when this is + // done that we can provide a merged view of response headers and trailers in a thread-safe + // way. + if (strongSelf.responseHeaders) { + userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; + } + error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; } - error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; + [strongSelf maybeFinishWithError:error]; } - [self finishWithError:error]; }]; // Now that the RPC has been initiated, request writes can start. @synchronized(_requestWriter) { @@ -439,16 +475,8 @@ static NSString * const kBearerPrefix = @"Bearer "; // TODO(jcanizales): Check this on init. [NSException raise:NSInvalidArgumentException format:@"host of %@ is nil", _host]; } - _connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:host]; - __weak typeof(self) weakSelf = self; - void (^handler)(void) = ^{ - typeof(self) strongSelf = weakSelf; - [strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]]; - }; - [_connectivityMonitor handleLossWithHandler:handler - wifiStatusChangeHandler:nil]; + [GRPCConnectivityMonitor registerObserver:self + selector:@selector(connectivityChanged:)]; } - (void)startWithWriteable:(id<GRXWriteable>)writeable { @@ -512,4 +540,12 @@ static NSString * const kBearerPrefix = @"Bearer "; } } +- (void)connectivityChanged:(NSNotification *)note { + [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ NSLocalizedDescriptionKey : @"Connectivity lost." }]]; + // Cancel underlying call upon this notification + [self cancelCall]; +} + @end diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h index cb55e46d70..394d21792d 100644 --- a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h +++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h @@ -19,44 +19,30 @@ #import <Foundation/Foundation.h> #import <SystemConfiguration/SystemConfiguration.h> -@interface GRPCReachabilityFlags : NSObject - -+ (nonnull instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags; - -/** - * One accessor method to query each of the different flags. Example: - -@property(nonatomic, readonly) BOOL isCell; - - */ -#define GRPC_XMACRO_ITEM(methodName, FlagName) \ -@property(nonatomic, readonly) BOOL methodName; - -#include "GRPCReachabilityFlagNames.xmacro.h" -#undef GRPC_XMACRO_ITEM - -@property(nonatomic, readonly) BOOL isHostReachable; -@end - +typedef NS_ENUM(NSInteger, GRPCConnectivityStatus) { + GRPCConnectivityUnknown = 0, + GRPCConnectivityNoNetwork = 1, + GRPCConnectivityCellular = 2, + GRPCConnectivityWiFi = 3, +}; + +extern NSString * _Nonnull kGRPCConnectivityNotification; + +// This interface monitors OS reachability interface for any network status +// change. Parties interested in these events should register themselves as +// observer. @interface GRPCConnectivityMonitor : NSObject -+ (nullable instancetype)monitorWithHost:(nonnull NSString *)hostName; - - (nonnull instancetype)init NS_UNAVAILABLE; -/** - * Queue on which callbacks will be dispatched. Default is the main queue. Set it before calling - * handleLossWithHandler:. - */ -// TODO(jcanizales): Default to a serial background queue instead. -@property(nonatomic, strong, null_resettable) dispatch_queue_t queue; - -/** - * Calls handler every time the connectivity to this instance's host is lost. If this instance is - * released before that happens, the handler won't be called. - * Only one handler is active at a time, so if this method is called again before the previous - * handler has been called, it might never be called at all (or yes, if it has already been queued). - */ -- (void)handleLossWithHandler:(nullable void (^)(void))lossHandler - wifiStatusChangeHandler:(nullable void (^)(void))wifiStatusChangeHandler; +// Register an object as observer of network status change. \a observer +// must have a notification method with one parameter of type +// (NSNotification *) and should pass it to parameter \a selector. The +// parameter of this notification method is not used for now. ++ (void)registerObserver:(_Nonnull id)observer + selector:(_Nonnull SEL)selector; + +// Ungegister an object from observers of network status change. ++ (void)unregisterObserver:(_Nonnull id)observer; + @end diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m index c8e10dd75f..7f31c7e23e 100644 --- a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m +++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m @@ -18,175 +18,74 @@ #import "GRPCConnectivityMonitor.h" -#pragma mark Flags +#include <netinet/in.h> -@implementation GRPCReachabilityFlags { - SCNetworkReachabilityFlags _flags; -} +NSString *kGRPCConnectivityNotification = @"kGRPCConnectivityNotification"; -+ (instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags { - return [[self alloc] initWithFlags:flags]; -} +static SCNetworkReachabilityRef reachability; +static GRPCConnectivityStatus currentStatus; -- (instancetype)initWithFlags:(SCNetworkReachabilityFlags)flags { - if ((self = [super init])) { - _flags = flags; +// Aggregate information in flags into network status. +GRPCConnectivityStatus CalculateConnectivityStatus(SCNetworkReachabilityFlags flags) { + GRPCConnectivityStatus result = GRPCConnectivityUnknown; + if (((flags & kSCNetworkReachabilityFlagsReachable) == 0) || + ((flags & kSCNetworkReachabilityFlagsConnectionRequired) != 0)) { + return GRPCConnectivityNoNetwork; } - return self; -} - -/* - * One accessor method implementation per flag. Example: - -- (BOOL)isCell { \ - return !!(_flags & kSCNetworkReachabilityFlagsIsWWAN); \ -} - - */ -#define GRPC_XMACRO_ITEM(methodName, FlagName) \ -- (BOOL)methodName { \ - return !!(_flags & kSCNetworkReachabilityFlags ## FlagName); \ -} -#include "GRPCReachabilityFlagNames.xmacro.h" -#undef GRPC_XMACRO_ITEM - -- (BOOL)isHostReachable { - // Note: connectionOnDemand means it'll be reachable only if using the CFSocketStream API or APIs - // on top of it. - // connectionRequired means we can't tell until a connection is attempted (e.g. for VPN on - // demand). - return self.reachable && !self.interventionRequired && !self.connectionOnDemand; -} - -- (NSString *)description { - NSMutableArray *activeOptions = [NSMutableArray arrayWithCapacity:9]; - - /* - * For each flag, add its name to the array if it's ON. Example: - - if (self.isCell) { - [activeOptions addObject:@"isCell"]; + result = GRPCConnectivityWiFi; +#if TARGET_OS_IPHONE + if (flags & kSCNetworkReachabilityFlagsIsWWAN) { + return result = GRPCConnectivityCellular; } - - */ - #define GRPC_XMACRO_ITEM(methodName, FlagName) \ - if (self.methodName) { \ - [activeOptions addObject:@ #methodName]; \ - } - #include "GRPCReachabilityFlagNames.xmacro.h" - #undef GRPC_XMACRO_ITEM - - return activeOptions.count == 0 ? @"(none)" : [activeOptions componentsJoinedByString:@", "]; -} - -- (BOOL)isEqual:(id)object { - return [object isKindOfClass:[GRPCReachabilityFlags class]] && - _flags == ((GRPCReachabilityFlags *)object)->_flags; -} - -- (NSUInteger)hash { - return _flags; -} -@end - -#pragma mark Connectivity Monitor - -// Assumes the third argument is a block that accepts a GRPCReachabilityFlags object, and passes the -// received ones to it. -static void PassFlagsToContextInfoBlock(SCNetworkReachabilityRef target, - SCNetworkReachabilityFlags flags, - void *info) { - #pragma unused (target) - // This can be called many times with the same info. The info is retained by SCNetworkReachability - // while this function is being executed. - void (^handler)(GRPCReachabilityFlags *) = (__bridge void (^)(GRPCReachabilityFlags *))info; - handler([[GRPCReachabilityFlags alloc] initWithFlags:flags]); +#endif + return result; } -@implementation GRPCConnectivityMonitor { - SCNetworkReachabilityRef _reachabilityRef; - GRPCReachabilityFlags *_previousReachabilityFlags; -} +static void ReachabilityCallback( + SCNetworkReachabilityRef target, SCNetworkReachabilityFlags flags, void* info) { + GRPCConnectivityStatus newStatus = CalculateConnectivityStatus(flags); -- (nullable instancetype)initWithReachability:(nullable SCNetworkReachabilityRef)reachability { - if (!reachability) { - return nil; + if (newStatus != currentStatus) { + [[NSNotificationCenter defaultCenter] postNotificationName:kGRPCConnectivityNotification + object:nil]; + currentStatus = newStatus; } - if ((self = [super init])) { - _reachabilityRef = CFRetain(reachability); - _queue = dispatch_get_main_queue(); - _previousReachabilityFlags = nil; - } - return self; } -+ (nullable instancetype)monitorWithHost:(nonnull NSString *)host { - const char *hostName = host.UTF8String; - if (!hostName) { - [NSException raise:NSInvalidArgumentException - format:@"host.UTF8String returns NULL for %@", host]; - } - SCNetworkReachabilityRef reachability = - SCNetworkReachabilityCreateWithName(NULL, hostName); +@implementation GRPCConnectivityMonitor - GRPCConnectivityMonitor *returnValue = [[self alloc] initWithReachability:reachability]; - if (reachability) { - CFRelease(reachability); - } - return returnValue; -} ++ (void)initialize { + if (self == [GRPCConnectivityMonitor self]) { + struct sockaddr_in addr = {0}; + addr.sin_len = sizeof(addr); + addr.sin_family = AF_INET; + reachability = SCNetworkReachabilityCreateWithAddress(NULL, (struct sockaddr *)&addr); + currentStatus = GRPCConnectivityUnknown; -- (void)handleLossWithHandler:(nullable void (^)(void))lossHandler - wifiStatusChangeHandler:(nullable void (^)(void))wifiStatusChangeHandler { - __weak typeof(self) weakSelf = self; - [self startListeningWithHandler:^(GRPCReachabilityFlags *flags) { - typeof(self) strongSelf = weakSelf; - if (strongSelf) { - if (lossHandler && !flags.reachable) { - lossHandler(); -#if TARGET_OS_IPHONE - } else if (wifiStatusChangeHandler && - strongSelf->_previousReachabilityFlags && - (flags.isWWAN ^ - strongSelf->_previousReachabilityFlags.isWWAN)) { - wifiStatusChangeHandler(); -#endif - } - strongSelf->_previousReachabilityFlags = flags; + SCNetworkConnectionFlags flags; + if (SCNetworkReachabilityGetFlags(reachability, &flags)) { + currentStatus = CalculateConnectivityStatus(flags); } - }]; -} -- (void)startListeningWithHandler:(void (^)(GRPCReachabilityFlags *))handler { - // Copy to ensure the handler block is in the heap (and so can't be deallocated when this method - // returns). - void (^copiedHandler)(GRPCReachabilityFlags *) = [handler copy]; - SCNetworkReachabilityContext context = { - .version = 0, - .info = (__bridge void *)copiedHandler, - .retain = CFRetain, - .release = CFRelease, - }; - // The following will retain context.info, and release it when the callback is set to NULL. - SCNetworkReachabilitySetCallback(_reachabilityRef, PassFlagsToContextInfoBlock, &context); - SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, _queue); -} - -- (void)stopListening { - // This releases the block on context.info. - SCNetworkReachabilitySetCallback(_reachabilityRef, NULL, NULL); - SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, NULL); + SCNetworkReachabilityContext context = {0, (__bridge void *)(self), NULL, NULL, NULL}; + if (!SCNetworkReachabilitySetCallback(reachability, ReachabilityCallback, &context) || + !SCNetworkReachabilityScheduleWithRunLoop( + reachability, CFRunLoopGetMain(), kCFRunLoopCommonModes)) { + NSLog(@"gRPC connectivity monitor fail to set"); + } + } } -- (void)setQueue:(dispatch_queue_t)queue { - _queue = queue ?: dispatch_get_main_queue(); ++ (void)registerObserver:(_Nonnull id)observer + selector:(SEL)selector { + [[NSNotificationCenter defaultCenter] addObserver:observer + selector:selector + name:kGRPCConnectivityNotification + object:nil]; } -- (void)dealloc { - if (_reachabilityRef) { - [self stopListening]; - CFRelease(_reachabilityRef); - } ++ (void)unregisterObserver:(_Nonnull id)observer { + [[NSNotificationCenter defaultCenter] removeObserver:observer]; } @end diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m index 71b57cf1f6..8568e334dd 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.m +++ b/src/objective-c/GRPCClient/private/GRPCHost.m @@ -37,12 +37,6 @@ NS_ASSUME_NONNULL_BEGIN static NSMutableDictionary *kHostCache; -// This connectivity monitor flushes the host cache when connectivity status -// changes or when connection switch between Wifi and Cellular data, so that a -// new call will use a new channel. Otherwise, a new call will still use the -// cached channel which is no longer available and will cause gRPC to hang. -static GRPCConnectivityMonitor *connectivityMonitor = nil; - @implementation GRPCHost { // TODO(mlumish): Investigate whether caching channels with strong links is a good idea. GRPCChannel *_channel; @@ -90,17 +84,7 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil; kHostCache[address] = self; _compressAlgorithm = GRPC_COMPRESS_NONE; } - // Keep a single monitor to flush the cache if the connectivity status changes - // Thread safety guarded by @synchronized(kHostCache) - if (!connectivityMonitor) { - connectivityMonitor = - [GRPCConnectivityMonitor monitorWithHost:hostURL.host]; - void (^handler)(void) = ^{ - [GRPCHost flushChannelCache]; - }; - [connectivityMonitor handleLossWithHandler:handler - wifiStatusChangeHandler:handler]; - } + [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChange:)]; } return self; } @@ -281,6 +265,13 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil; } } +// Flushes the host cache when connectivity status changes or when connection switch between Wifi +// and Cellular data, so that a new call will use a new channel. Otherwise, a new call will still +// use the cached channel which is no longer available and will cause gRPC to hang. +- (void)connectivityChange:(NSNotification *)note { + [GRPCHost flushChannelCache]; +} + @end NS_ASSUME_NONNULL_END diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index eb354907f6..fcfe860b1c 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -37,6 +37,10 @@ #include "src/core/lib/gpr/thd.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/security/credentials/fake/fake_credentials.h" +#include "src/cpp/server/secure_server_credentials.h" + +#include "src/cpp/client/secure_credentials.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -380,15 +384,21 @@ class GrpclbEnd2endTest : public ::testing::Test { SetNextResolution(addresses); } - void ResetStub(int fallback_timeout = 0) { + void ResetStub(int fallback_timeout = 0, grpc::string expected_targets = "") { ChannelArguments args; args.SetGrpclbFallbackTimeout(fallback_timeout); args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_.get()); + if (!expected_targets.empty()) { + args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets); + } std::ostringstream uri; - uri << "fake:///servername_not_used"; - channel_ = - CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args); + uri << "fake:///" << kApplicationTargetName_; + // TODO(dgq): templatize tests to run everything using both secure and + // insecure channel credentials. + std::shared_ptr<ChannelCredentials> creds(new SecureChannelCredentials( + grpc_fake_transport_security_credentials_create())); + channel_ = CreateCustomChannel(uri.str(), creds, args); stub_ = grpc::testing::EchoTestService::NewStub(channel_); } @@ -566,8 +576,9 @@ class GrpclbEnd2endTest : public ::testing::Test { std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; - builder.AddListeningPort(server_address.str(), - InsecureServerCredentials()); + std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials( + grpc_fake_transport_security_server_credentials_create())); + builder.AddListeningPort(server_address.str(), creds); builder.RegisterService(service_); server_ = builder.BuildAndStart(); cond->notify_one(); @@ -600,6 +611,7 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator> response_generator_; const grpc::string kRequestMessage_ = "Live long and prosper."; + const grpc::string kApplicationTargetName_ = "application_target_name"; }; class SingleBalancerTest : public GrpclbEnd2endTest { @@ -635,6 +647,48 @@ TEST_F(SingleBalancerTest, Vanilla) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +TEST_F(SingleBalancerTest, SecureNaming) { + ResetStub(0, kApplicationTargetName_ + ";lb"); + SetNextResolution({AddressData{balancer_servers_[0].port_, true, "lb"}}); + const size_t kNumRpcsPerAddress = 100; + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), + 0); + // Make sure that trying to connect works without a call. + channel_->GetState(true /* try_to_connect */); + // We need to wait for all backends to come online. + WaitForAllBackends(); + // Send kNumRpcsPerAddress RPCs per server. + CheckRpcSendOk(kNumRpcsPerAddress * num_backends_); + + // Each backend should have gotten 100 requests. + for (size_t i = 0; i < backends_.size(); ++i) { + EXPECT_EQ(kNumRpcsPerAddress, + backend_servers_[i].service_->request_count()); + } + balancers_[0]->NotifyDoneWithServerlists(); + // The balancer got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + // Check LB policy name for the channel. + EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); +} + +TEST_F(SingleBalancerTest, SecureNamingDeathTest) { + ::testing::FLAGS_gtest_death_test_style = "threadsafe"; + // Make sure that we blow up (via abort() from the security connector) when + // the name from the balancer doesn't match expectations. + ASSERT_DEATH( + { + ResetStub(0, kApplicationTargetName_ + ";lb"); + SetNextResolution( + {AddressData{balancer_servers_[0].port_, true, "woops"}}); + channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(1)); + }, + ""); +} + TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { SetNextResolutionAllBalancers(); const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor(); |