From 820c1f3fda95853e791ffe396b15b32d1bda356b Mon Sep 17 00:00:00 2001 From: Robbie Shade Date: Thu, 23 Jun 2016 14:31:28 -0400 Subject: ASSERT vector size before directly accessing first element --- test/cpp/end2end/end2end_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test') diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 8de9d339f6..ea04ab97db 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -1407,7 +1407,7 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) { std::shared_ptr auth_ctx = context.auth_context(); std::vector tst = auth_ctx->FindPropertyValues("transport_security_type"); - EXPECT_EQ(1u, tst.size()); + ASSERT_EQ(1u, tst.size()); EXPECT_EQ(GetParam().credentials_type, ToString(tst[0])); if (GetParam().credentials_type == kTlsCredentialsType) { EXPECT_EQ("x509_subject_alternative_name", -- cgit v1.2.3 From 256cc7aa034f038f8e82f3e278ca61b64252693b Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 7 Jul 2016 11:09:49 -0700 Subject: Support server reflection in CLI --- Makefile | 8 ++-- build.yaml | 4 ++ test/cpp/util/grpc_cli.cc | 44 ++++++++++++---------- test/cpp/util/proto_file_parser.cc | 42 +++++++++++++++++++-- test/cpp/util/proto_file_parser.h | 11 ++++++ .../util/proto_reflection_descriptor_database.cc | 12 +++++- tools/run_tests/sources_and_headers.json | 8 +++- .../vcxproj/grpc_cli_libs/grpc_cli_libs.vcxproj | 3 ++ .../grpc_cli_libs/grpc_cli_libs.vcxproj.filters | 6 +++ vsprojects/vcxproj/test/grpc_cli/grpc_cli.vcxproj | 3 ++ 10 files changed, 111 insertions(+), 30 deletions(-) (limited to 'test') diff --git a/Makefile b/Makefile index 7011996371..6ba584cbdc 100644 --- a/Makefile +++ b/Makefile @@ -4140,6 +4140,7 @@ endif LIBGRPC_CLI_LIBS_SRC = \ test/cpp/util/cli_call.cc \ test/cpp/util/proto_file_parser.cc \ + test/cpp/util/proto_reflection_descriptor_database.cc \ PUBLIC_HEADERS_CXX += \ @@ -11079,16 +11080,16 @@ $(BINDIR)/$(CONFIG)/grpc_cli: protobuf_dep_error else -$(BINDIR)/$(CONFIG)/grpc_cli: $(PROTOBUF_DEP) $(GRPC_CLI_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_cli_libs.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a +$(BINDIR)/$(CONFIG)/grpc_cli: $(PROTOBUF_DEP) $(GRPC_CLI_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_cli_libs.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++_reflection.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(E) "[LD] Linking $@" $(Q) mkdir -p `dirname $@` - $(Q) $(LDXX) $(LDFLAGS) $(GRPC_CLI_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_cli_libs.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/grpc_cli + $(Q) $(LDXX) $(LDFLAGS) $(GRPC_CLI_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_cli_libs.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++_reflection.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/grpc_cli endif endif -$(OBJDIR)/$(CONFIG)/test/cpp/util/grpc_cli.o: $(LIBDIR)/$(CONFIG)/libgrpc_cli_libs.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a +$(OBJDIR)/$(CONFIG)/test/cpp/util/grpc_cli.o: $(LIBDIR)/$(CONFIG)/libgrpc_cli_libs.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++_reflection.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a deps_grpc_cli: $(GRPC_CLI_OBJS:.o=.dep) @@ -15002,6 +15003,7 @@ test/cpp/util/byte_buffer_proto_helper.cc: $(OPENSSL_DEP) test/cpp/util/cli_call.cc: $(OPENSSL_DEP) test/cpp/util/create_test_channel.cc: $(OPENSSL_DEP) test/cpp/util/proto_file_parser.cc: $(OPENSSL_DEP) +test/cpp/util/proto_reflection_descriptor_database.cc: $(OPENSSL_DEP) test/cpp/util/string_ref_helper.cc: $(OPENSSL_DEP) test/cpp/util/subprocess.cc: $(OPENSSL_DEP) test/cpp/util/test_config.cc: $(OPENSSL_DEP) diff --git a/build.yaml b/build.yaml index 1c485fd5c9..8fabaad774 100644 --- a/build.yaml +++ b/build.yaml @@ -1028,10 +1028,13 @@ libs: headers: - test/cpp/util/cli_call.h - test/cpp/util/proto_file_parser.h + - test/cpp/util/proto_reflection_descriptor_database.h src: - test/cpp/util/cli_call.cc - test/cpp/util/proto_file_parser.cc + - test/cpp/util/proto_reflection_descriptor_database.cc deps: + - grpc++_reflection - grpc++ - grpc_plugin_support - name: grpc_plugin_support @@ -2669,6 +2672,7 @@ targets: - grpc_cli_libs - grpc++_test_util - grpc_test_util + - grpc++_reflection - grpc++ - grpc - gpr_test_util diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index c52e48bae6..71470e5214 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -86,6 +86,7 @@ DEFINE_string(output_binary_file, "", DEFINE_string(metadata, "", "Metadata to send to server, in the form of key1:val1:key2:val2"); DEFINE_string(proto_path, ".", "Path to look for the proto file."); +DEFINE_string(proto_file, "", "Name of the proto file."); void ParseMetadataFlag( std::multimap* client_metadata) { @@ -129,31 +130,47 @@ void PrintMetadata(const T& m, const grpc::string& message) { int main(int argc, char** argv) { grpc::testing::InitTest(&argc, &argv, true); - if (argc < 4 || argc == 5 || grpc::string(argv[1]) != "call") { + if (argc < 4 || grpc::string(argv[1]) != "call") { std::cout << "Usage: grpc_cli call server_host:port method_name " << "[proto file] [text format request] []" << std::endl; + return 1; } - grpc::string file_name; grpc::string request_text; grpc::string server_address(argv[2]); grpc::string method_name(argv[3]); std::unique_ptr parser; grpc::string serialized_request_proto; - if (argc == 6) { - file_name = argv[4]; + if (argc == 5) { // TODO(yangg) read from stdin as well? - request_text = argv[5]; + request_text = argv[4]; } + std::shared_ptr creds; + if (!FLAGS_enable_ssl) { + creds = grpc::InsecureChannelCredentials(); + } else { + if (FLAGS_use_auth) { + creds = grpc::GoogleDefaultCredentials(); + } else { + creds = grpc::SslCredentials(grpc::SslCredentialsOptions()); + } + } + std::shared_ptr channel = + grpc::CreateChannel(server_address, creds); + if (request_text.empty() && FLAGS_input_binary_file.empty()) { std::cout << "Missing input. Use text format input or " << "--input_binary_file for serialized request" << std::endl; return 1; } else if (!request_text.empty()) { - parser.reset(new grpc::testing::ProtoFileParser(FLAGS_proto_path, file_name, - method_name)); + if (!FLAGS_proto_file.empty()) { + parser.reset(new grpc::testing::ProtoFileParser( + FLAGS_proto_path, FLAGS_proto_file, method_name)); + } else { + parser.reset(new grpc::testing::ProtoFileParser(channel, method_name)); + } method_name = parser->GetFullMethodName(); if (parser->HasError()) { return 1; @@ -175,19 +192,6 @@ int main(int argc, char** argv) { } std::cout << "connecting to " << server_address << std::endl; - std::shared_ptr creds; - if (!FLAGS_enable_ssl) { - creds = grpc::InsecureChannelCredentials(); - } else { - if (FLAGS_use_auth) { - creds = grpc::GoogleDefaultCredentials(); - } else { - creds = grpc::SslCredentials(grpc::SslCredentialsOptions()); - } - } - std::shared_ptr channel = - grpc::CreateChannel(server_address, creds); - grpc::string serialized_response_proto; std::multimap client_metadata; std::multimap server_initial_metadata, diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc index 25aec329eb..2ac19858bf 100644 --- a/test/cpp/util/proto_file_parser.cc +++ b/test/cpp/util/proto_file_parser.cc @@ -95,9 +95,45 @@ ProtoFileParser::ProtoFileParser(const grpc::string& proto_path, dynamic_factory_.reset( new google::protobuf::DynamicMessageFactory(importer_->pool())); + std::vector service_desc_list; + for (int i = 0; i < file_desc->service_count(); i++) { + service_desc_list.push_back(file_desc->service(i)); + } + InitProtoFileParser(method, service_desc_list); +} + +ProtoFileParser::ProtoFileParser(std::shared_ptr channel, + const grpc::string& method) + : has_error_(false), + desc_db_(new grpc::ProtoReflectionDescriptorDatabase(channel)), + desc_pool_(new google::protobuf::DescriptorPool(desc_db_.get())) { + std::vector service_list; + if (!desc_db_->GetServices(&service_list)) { + LogError("Failed to get services"); + } + if (has_error_) { + return; + } + dynamic_factory_.reset( + new google::protobuf::DynamicMessageFactory(desc_pool_.get())); + + std::vector service_desc_list; + for (auto it = service_list.begin(); it != service_list.end(); it++) { + service_desc_list.push_back(desc_pool_->FindServiceByName(*it)); + } + InitProtoFileParser(method, service_desc_list); +} + +ProtoFileParser::~ProtoFileParser() {} + +void ProtoFileParser::InitProtoFileParser( + const grpc::string& method, + const std::vector + service_desc_list) { const google::protobuf::MethodDescriptor* method_descriptor = nullptr; - for (int i = 0; !method_descriptor && i < file_desc->service_count(); i++) { - const auto* service_desc = file_desc->service(i); + for (auto it = service_desc_list.begin(); it != service_desc_list.end(); + it++) { + const auto* service_desc = *it; for (int j = 0; j < service_desc->method_count(); j++) { const auto* method_desc = service_desc->method(j); if (MethodNameMatch(method_desc->full_name(), method)) { @@ -130,8 +166,6 @@ ProtoFileParser::ProtoFileParser(const grpc::string& proto_path, dynamic_factory_->GetPrototype(method_descriptor->output_type())->New()); } -ProtoFileParser::~ProtoFileParser() {} - grpc::string ProtoFileParser::GetSerializedProto( const grpc::string& text_format_proto, bool is_request) { grpc::string serialized; diff --git a/test/cpp/util/proto_file_parser.h b/test/cpp/util/proto_file_parser.h index 46cdd66503..b442d77db9 100644 --- a/test/cpp/util/proto_file_parser.h +++ b/test/cpp/util/proto_file_parser.h @@ -38,8 +38,10 @@ #include #include +#include #include "src/compiler/config.h" +#include "test/cpp/util/proto_reflection_descriptor_database.h" namespace grpc { namespace testing { @@ -53,6 +55,9 @@ class ProtoFileParser { // even just Method. It will log an error if there is ambiguity. ProtoFileParser(const grpc::string& proto_path, const grpc::string& file_name, const grpc::string& method); + + ProtoFileParser(std::shared_ptr channel, + const grpc::string& method); ~ProtoFileParser(); grpc::string GetFullMethodName() const { return full_method_name_; } @@ -68,12 +73,18 @@ class ProtoFileParser { void LogError(const grpc::string& error_msg); private: + void InitProtoFileParser( + const grpc::string& method, + const std::vector services); + bool has_error_; grpc::string request_text_; grpc::string full_method_name_; google::protobuf::compiler::DiskSourceTree source_tree_; std::unique_ptr error_printer_; std::unique_ptr importer_; + std::unique_ptr desc_db_; + std::unique_ptr desc_pool_; std::unique_ptr dynamic_factory_; std::unique_ptr request_prototype_; std::unique_ptr response_prototype_; diff --git a/test/cpp/util/proto_reflection_descriptor_database.cc b/test/cpp/util/proto_reflection_descriptor_database.cc index 25b720aee0..48998551a5 100644 --- a/test/cpp/util/proto_reflection_descriptor_database.cc +++ b/test/cpp/util/proto_reflection_descriptor_database.cc @@ -53,7 +53,17 @@ ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase( std::shared_ptr channel) : stub_(ServerReflection::NewStub(channel)) {} -ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {} +ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() { + if (!stream_) { + GetStream()->WritesDone(); + Status status = stream_->Finish(); + if (!status.ok()) { + gpr_log(GPR_ERROR, + "ServerReflectionInfo rpc failed. Error code: %d, details: %s", + (int)status.error_code(), status.error_message().c_str()); + } + } +} bool ProtoReflectionDescriptorDatabase::FindFileByName( const string& filename, google::protobuf::FileDescriptorProto* output) { diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 6d7cfdaf23..86383fa0cc 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -2149,6 +2149,7 @@ "gpr_test_util", "grpc", "grpc++", + "grpc++_reflection", "grpc++_test_config", "grpc++_test_util", "grpc_cli_libs", @@ -4479,7 +4480,8 @@ ], "headers": [ "test/cpp/util/cli_call.h", - "test/cpp/util/proto_file_parser.h" + "test/cpp/util/proto_file_parser.h", + "test/cpp/util/proto_reflection_descriptor_database.h" ], "language": "c++", "name": "grpc_cli_libs", @@ -4487,7 +4489,9 @@ "test/cpp/util/cli_call.cc", "test/cpp/util/cli_call.h", "test/cpp/util/proto_file_parser.cc", - "test/cpp/util/proto_file_parser.h" + "test/cpp/util/proto_file_parser.h", + "test/cpp/util/proto_reflection_descriptor_database.cc", + "test/cpp/util/proto_reflection_descriptor_database.h" ], "third_party": false, "type": "lib" diff --git a/vsprojects/vcxproj/grpc_cli_libs/grpc_cli_libs.vcxproj b/vsprojects/vcxproj/grpc_cli_libs/grpc_cli_libs.vcxproj index 39cb1e0cb5..03c82f686c 100644 --- a/vsprojects/vcxproj/grpc_cli_libs/grpc_cli_libs.vcxproj +++ b/vsprojects/vcxproj/grpc_cli_libs/grpc_cli_libs.vcxproj @@ -149,12 +149,15 @@ + + + diff --git a/vsprojects/vcxproj/grpc_cli_libs/grpc_cli_libs.vcxproj.filters b/vsprojects/vcxproj/grpc_cli_libs/grpc_cli_libs.vcxproj.filters index 55ef18bf30..4add8ed5e1 100644 --- a/vsprojects/vcxproj/grpc_cli_libs/grpc_cli_libs.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_cli_libs/grpc_cli_libs.vcxproj.filters @@ -7,6 +7,9 @@ test\cpp\util + + test\cpp\util + @@ -15,6 +18,9 @@ test\cpp\util + + test\cpp\util + diff --git a/vsprojects/vcxproj/test/grpc_cli/grpc_cli.vcxproj b/vsprojects/vcxproj/test/grpc_cli/grpc_cli.vcxproj index cd844d1579..9c8cdc54c2 100644 --- a/vsprojects/vcxproj/test/grpc_cli/grpc_cli.vcxproj +++ b/vsprojects/vcxproj/test/grpc_cli/grpc_cli.vcxproj @@ -173,6 +173,9 @@ {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} + + {5F575402-3F89-5D1A-6910-9DB8BF5D2BAB} + {C187A093-A0FE-489D-A40A-6E33DE0F9FEB} -- cgit v1.2.3 From c24e0ee4f00a348a2f9948c820cf08be813c5f8a Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 7 Jul 2016 11:32:25 -0700 Subject: Update docs --- test/cpp/util/grpc_cli.cc | 21 ++++++++++++--------- test/cpp/util/proto_file_parser.cc | 7 +++++-- 2 files changed, 17 insertions(+), 11 deletions(-) (limited to 'test') diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index 71470e5214..1a9662694a 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -34,18 +34,21 @@ /* A command line tool to talk to a grpc server. Example of talking to grpc interop server: - grpc_cli call localhost:50051 UnaryCall src/proto/grpc/testing/test.proto \ - "response_size:10" --enable_ssl=false + grpc_cli call localhost:50051 UnaryCall "response_size:10" \ + --proto_file=src/proto/grpc/testing/test.prot --enable_ssl=false Options: - 1. --proto_path, if your proto file is not under current working directory, + 1. --proto_file, use this flag to provide a proto file if the server does + does not have the reflection service. + 2. --proto_path, if your proto file is not under current working directory, use this flag to provide a search root. It should work similar to the - counterpart in protoc. - 2. --metadata specifies metadata to be sent to the server, such as: + counterpart in protoc. This option is valid only when proto_file is + provided. + 3. --metadata specifies metadata to be sent to the server, such as: --metadata="MyHeaderKey1:Value1:MyHeaderKey2:Value2" - 3. --enable_ssl, whether to use tls. - 4. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call - 3. --input_binary_file, a file containing the serialized request. The file + 4. --enable_ssl, whether to use tls. + 5. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call + 6. --input_binary_file, a file containing the serialized request. The file can be generated by calling something like: protoc --proto_path=src/proto/grpc/testing/ \ --encode=grpc.testing.SimpleRequest \ @@ -53,7 +56,7 @@ < input.txt > input.bin If this is used and no proto file is provided in the argument list, the method string has to be exact in the form of /package.service/method. - 4. --output_binary_file, a file to write binary format response into, it can + 7. --output_binary_file, a file to write binary format response into, it can be later decoded using protoc: protoc --proto_path=src/proto/grpc/testing/ \ --decode=grpc.testing.SimpleResponse \ diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc index 2ac19858bf..b1bf0471e1 100644 --- a/test/cpp/util/proto_file_parser.cc +++ b/test/cpp/util/proto_file_parser.cc @@ -109,7 +109,10 @@ ProtoFileParser::ProtoFileParser(std::shared_ptr channel, desc_pool_(new google::protobuf::DescriptorPool(desc_db_.get())) { std::vector service_list; if (!desc_db_->GetServices(&service_list)) { - LogError("Failed to get services"); + LogError( + "Failed to get services from the server, " + "it may not have the reflection service.\n" + "Please try to use the --proto_file option to provide a proto file."); } if (has_error_) { return; @@ -177,7 +180,7 @@ grpc::string ProtoFileParser::GetSerializedProto( LogError("Failed to parse text format to proto."); return ""; } - ok = request_prototype_->SerializeToString(&serialized); + ok = msg->SerializeToString(&serialized); if (!ok) { LogError("Failed to serialize proto."); return ""; -- cgit v1.2.3 From a17c8d993d3bc4867fcc9d6fcf42e6bfff9c9cd8 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 7 Jul 2016 11:40:05 -0700 Subject: Fix typos --- test/cpp/util/grpc_cli.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test') diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index 1a9662694a..7c8e2cdc63 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -35,7 +35,7 @@ A command line tool to talk to a grpc server. Example of talking to grpc interop server: grpc_cli call localhost:50051 UnaryCall "response_size:10" \ - --proto_file=src/proto/grpc/testing/test.prot --enable_ssl=false + --proto_file=src/proto/grpc/testing/test.proto --enable_ssl=false Options: 1. --proto_file, use this flag to provide a proto file if the server does -- cgit v1.2.3 From c68640f05cea2cf79bf2703d83da6b76fa6dc5e6 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 7 Jul 2016 14:13:35 -0700 Subject: Read from stdin Read from stdin if the request text and binary file are not provided --- test/cpp/util/grpc_cli.cc | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) (limited to 'test') diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index 7c8e2cdc63..fdb1a7c2a0 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -64,6 +64,7 @@ < output.bin > output.txt */ +#include #include #include #include @@ -146,7 +147,6 @@ int main(int argc, char** argv) { grpc::string serialized_request_proto; if (argc == 5) { - // TODO(yangg) read from stdin as well? request_text = argv[4]; } @@ -164,10 +164,15 @@ int main(int argc, char** argv) { grpc::CreateChannel(server_address, creds); if (request_text.empty() && FLAGS_input_binary_file.empty()) { - std::cout << "Missing input. Use text format input or " - << "--input_binary_file for serialized request" << std::endl; - return 1; - } else if (!request_text.empty()) { + if (isatty(STDIN_FILENO)) { + std::cout << "reading request message from stdin..." << std::endl; + } + std::stringstream input_stream; + input_stream << std::cin.rdbuf(); + request_text = input_stream.str(); + } + + if (!request_text.empty()) { if (!FLAGS_proto_file.empty()) { parser.reset(new grpc::testing::ProtoFileParser( FLAGS_proto_path, FLAGS_proto_file, method_name)); @@ -178,6 +183,12 @@ int main(int argc, char** argv) { if (parser->HasError()) { return 1; } + + if (!FLAGS_input_binary_file.empty()) { + std::cout + << "warning: request given in argv, ignoring --input_binary_file" + << std::endl; + } } if (parser) { @@ -226,7 +237,7 @@ int main(int argc, char** argv) { } } else { std::cout << "Rpc failed with status code " << s.error_code() - << " error message " << s.error_message() << std::endl; + << ", error message: " << s.error_message() << std::endl; } return 0; -- cgit v1.2.3 From 9f340958f6d131ae7d510270f1f60df7b277e994 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 11 Jul 2016 15:15:40 -0700 Subject: Remove unnecessary parsing detail --- src/core/ext/transport/chttp2/transport/chttp2_transport.c | 13 ++++++------- test/core/end2end/bad_server_response_test.c | 4 +++- 2 files changed, 9 insertions(+), 8 deletions(-) (limited to 'test') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 38e782b9b4..01a19eca05 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1764,6 +1764,7 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = arg; + grpc_error *err = GRPC_ERROR_NONE; GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, @@ -1772,15 +1773,13 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing, t->read_buffer.slices[i]); }; - if (i != t->read_buffer.count) { + if (errors[1] == GRPC_ERROR_NONE) { + err = GRPC_ERROR_REF(error); + } else { errors[2] = try_http_parsing(exec_ctx, t); + err = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, + GPR_ARRAY_SIZE(errors)); } - grpc_error *err = - errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE && - errors[2] == GRPC_ERROR_NONE - ? GRPC_ERROR_NONE - : GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, - GPR_ARRAY_SIZE(errors)); for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) { GRPC_ERROR_UNREF(errors[i]); } diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c index cca75f54a5..ab80adf0e0 100644 --- a/test/core/end2end/bad_server_response_test.c +++ b/test/core/end2end/bad_server_response_test.c @@ -71,6 +71,8 @@ #define UNPARSEABLE_DETAIL_MSG "Failed parsing HTTP/2" +#define HTTP1_DETAIL_MSG "Trying to connect an http1.x server" + /* TODO(zyc) Check the content of incomming data instead of using this length */ #define EXPECTED_INCOMING_DATA_LENGTH (size_t)310 @@ -334,7 +336,7 @@ int main(int argc, char **argv) { /* http1 response */ run_test(HTTP1_RESP, sizeof(HTTP1_RESP) - 1, GRPC_STATUS_UNAVAILABLE, - UNPARSEABLE_DETAIL_MSG); + HTTP1_DETAIL_MSG); return 0; } -- cgit v1.2.3 From 05573f1692647ca30a10839b61731486700587ac Mon Sep 17 00:00:00 2001 From: yang-g Date: Mon, 11 Jul 2016 15:48:01 -0700 Subject: Make Server::Wait work for async only server. --- include/grpc++/server.h | 2 ++ src/cpp/server/server.cc | 5 ++--- test/cpp/end2end/async_end2end_test.cc | 18 ++++++++++++++++++ 3 files changed, 22 insertions(+), 3 deletions(-) (limited to 'test') diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 7a8858ef19..9a2f8f11c4 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -183,6 +183,8 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { int num_running_cb_; grpc::condition_variable callback_cv_; + grpc::condition_variable shutdown_cv_; + std::shared_ptr global_callbacks_; std::list* sync_methods_; diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index fb4c68ebe4..374c9cbc04 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -462,14 +462,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) { while (num_running_cb_ != 0) { callback_cv_.wait(lock); } + shutdown_cv_.notify_all(); } } void Server::Wait() { grpc::unique_lock lock(mu_); - while (num_running_cb_ != 0) { - callback_cv_.wait(lock); - } + shutdown_cv_.wait(lock); } void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 6c7eae53a4..01ba8962d7 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -345,6 +345,24 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) { SendRpc(10); } +// We do not need to protect notify because the use is synchronized. +void ServerWait(Server* server, int* notify) { + server->Wait(); + *notify = 1; +} +TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { + int notify = 0; + std::thread* wait_thread = + new std::thread(&ServerWait, server_.get(), ¬ify); + ResetStub(); + SendRpc(1); + EXPECT_EQ(0, notify); + server_->Shutdown(); + wait_thread->join(); + EXPECT_EQ(1, notify); + delete wait_thread; +} + // Test a simple RPC using the async version of Next TEST_P(AsyncEnd2endTest, AsyncNextRpc) { ResetStub(); -- cgit v1.2.3 From 807387641f8a8acceb3a8e58adabf3080afb871d Mon Sep 17 00:00:00 2001 From: yang-g Date: Thu, 14 Jul 2016 14:53:35 -0700 Subject: prevent spurious wake up and unstarted/already shutdown server --- include/grpc++/server.h | 1 + src/cpp/server/server.cc | 7 ++++++- test/cpp/end2end/async_end2end_test.cc | 7 +++++++ 3 files changed, 14 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 9a2f8f11c4..6876961e21 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -179,6 +179,7 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { grpc::mutex mu_; bool started_; bool shutdown_; + bool shutdown_notified_; // The number of threads which are running callbacks. int num_running_cb_; grpc::condition_variable callback_cv_; diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 374c9cbc04..af04fd4ca6 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -281,6 +281,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, : max_message_size_(max_message_size), started_(false), shutdown_(false), + shutdown_notified_(false), num_running_cb_(0), sync_methods_(new std::list), has_generic_service_(false), @@ -462,13 +463,17 @@ void Server::ShutdownInternal(gpr_timespec deadline) { while (num_running_cb_ != 0) { callback_cv_.wait(lock); } + + shutdown_notified_ = true; shutdown_cv_.notify_all(); } } void Server::Wait() { grpc::unique_lock lock(mu_); - shutdown_cv_.wait(lock); + while (started_ && !shutdown_notified_) { + shutdown_cv_.wait(lock); + } } void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 01ba8962d7..4a8936d281 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -363,6 +363,13 @@ TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { delete wait_thread; } +TEST_P(AsyncEnd2endTest, ShutdownThenWait) { + ResetStub(); + SendRpc(1); + server_->Shutdown(); + server_->Wait(); +} + // Test a simple RPC using the async version of Next TEST_P(AsyncEnd2endTest, AsyncNextRpc) { ResetStub(); -- cgit v1.2.3 From 302bfd1a108c048a1db81236acc0c41fb84ae3b5 Mon Sep 17 00:00:00 2001 From: Masood Malekghassemi Date: Wed, 13 Jul 2016 16:53:09 -0700 Subject: Test Python source distribution installation --- test/distrib/python/run_distrib_test.sh | 55 ++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 21 deletions(-) (limited to 'test') diff --git a/test/distrib/python/run_distrib_test.sh b/test/distrib/python/run_distrib_test.sh index 8a983bc248..0c2154838f 100755 --- a/test/distrib/python/run_distrib_test.sh +++ b/test/distrib/python/run_distrib_test.sh @@ -33,34 +33,47 @@ set -ex cd $(dirname $0) # Pick up the source dist archive whatever its version is +SDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio-*.tar.gz BDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio-*.whl +TOOLS_SDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio_tools-*.tar.gz TOOLS_BDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio_tools-*.whl -if [ ! -f ${SDIST_ARCHIVE} ] -then - echo "Archive ${SDIST_ARCHIVE} does not exist." - exit 1 -fi +function make_virtualenv() { + virtualenv $1 + $1/bin/python -m pip install --upgrade six pip + $1/bin/python -m pip install cython +} -PYTHON=python2 -PIP=pip2 -which $PYTHON || PYTHON=python -which $PIP || PIP=pip +function at_least_one_installs() { + for file in "$@"; do + if python -m pip install $file; then + return 0 + fi + done + return -1 +} -# TODO(jtattermusch): this shouldn't be required -# TODO(jtattermusch): run the command twice to workaround docker-on-overlay -# issue https://github.com/docker/docker/issues/12327 -# (first attempt will fail when using docker with overlayFS) -${PIP} install --upgrade six pip || ${PIP} install --upgrade six pip +make_virtualenv bdist_test +make_virtualenv sdist_test -# At least one of the bdist packages has to succeed (whichever one matches the -# test machine, anyway). -for bdist in ${BDIST_ARCHIVES} ${TOOLS_BDIST_ARCHIVES}; do - ($PYTHON -m pip install $bdist) || true -done +# +# Install our distributions in order of dependencies +# + +(source bdist_test/bin/activate && at_least_one_installs ${BDIST_ARCHIVES}) +(source bdist_test/bin/activate && at_least_one_installs ${TOOLS_BDIST_ARCHIVES}) + +(source sdist_test/bin/activate && at_least_one_installs ${SDIST_ARCHIVES}) +(source sdist_test/bin/activate && at_least_one_installs ${TOOLS_SDIST_ARCHIVES}) + +# +# Test our distributions +# # TODO(jtattermusch): add a .proto file to the distribtest, generate python # code from it and then use the generated code from distribtest.py -$PYTHON -m grpc.tools.protoc +(source bdist_test/bin/activate && python -m grpc.tools.protoc --help) +(source sdist_test/bin/activate && python -m grpc.tools.protoc --help) -$PYTHON distribtest.py +(source bdist_test/bin/activate && python distribtest.py) +(source sdist_test/bin/activate && python distribtest.py) -- cgit v1.2.3 From 1318479011fa7d2e2859c20c89748b098618a5f8 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 18 Jul 2016 13:07:42 -0700 Subject: Separate generated file and protobuf dependency --- Makefile | 68 ++++++++- build.yaml | 18 ++- include/grpc++/impl/codegen/config_protobuf.h | 9 ++ src/cpp/ext/proto_server_reflection.h | 4 + test/cpp/end2end/async_end2end_test.cc | 40 +---- test/cpp/end2end/proto_server_reflection_test.cc | 24 ++- .../util/proto_reflection_descriptor_database.cc | 12 +- .../util/proto_reflection_descriptor_database.h | 22 ++- tools/run_tests/sources_and_headers.json | 40 ++++- .../grpc++_reflection_codegen.vcxproj | 168 +++++++++++++++++++++ .../grpc++_reflection_codegen.vcxproj.filters | 27 ++++ 11 files changed, 354 insertions(+), 78 deletions(-) create mode 100644 vsprojects/vcxproj/grpc++_reflection_codegen/grpc++_reflection_codegen.vcxproj create mode 100644 vsprojects/vcxproj/grpc++_reflection_codegen/grpc++_reflection_codegen.vcxproj.filters (limited to 'test') diff --git a/Makefile b/Makefile index 4ce22678d7..ab8da11a93 100644 --- a/Makefile +++ b/Makefile @@ -1184,9 +1184,9 @@ pc_cxx: $(LIBDIR)/$(CONFIG)/pkgconfig/grpc++.pc pc_cxx_unsecure: $(LIBDIR)/$(CONFIG)/pkgconfig/grpc++_unsecure.pc ifeq ($(EMBED_OPENSSL),true) -privatelibs_cxx: $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_cli_libs.a $(LIBDIR)/$(CONFIG)/libinterop_client_helper.a $(LIBDIR)/$(CONFIG)/libinterop_client_main.a $(LIBDIR)/$(CONFIG)/libinterop_server_helper.a $(LIBDIR)/$(CONFIG)/libinterop_server_main.a $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libboringssl_test_util.a $(LIBDIR)/$(CONFIG)/libboringssl_aes_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_asn1_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_base64_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_bio_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_bn_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_bytestring_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_aead_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_cipher_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_cmac_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_ed25519_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_x25519_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_dh_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_digest_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_ec_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_ecdsa_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_err_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_evp_extra_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_evp_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_pbkdf_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_hmac_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_pkcs12_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_pkcs8_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_poly1305_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_rsa_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_x509_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_ssl_test_lib.a +privatelibs_cxx: $(LIBDIR)/$(CONFIG)/libgrpc++_reflection_codegen.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_cli_libs.a $(LIBDIR)/$(CONFIG)/libinterop_client_helper.a $(LIBDIR)/$(CONFIG)/libinterop_client_main.a $(LIBDIR)/$(CONFIG)/libinterop_server_helper.a $(LIBDIR)/$(CONFIG)/libinterop_server_main.a $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libboringssl_test_util.a $(LIBDIR)/$(CONFIG)/libboringssl_aes_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_asn1_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_base64_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_bio_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_bn_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_bytestring_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_aead_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_cipher_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_cmac_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_ed25519_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_x25519_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_dh_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_digest_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_ec_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_ecdsa_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_err_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_evp_extra_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_evp_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_pbkdf_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_hmac_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_pkcs12_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_pkcs8_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_poly1305_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_rsa_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_x509_test_lib.a $(LIBDIR)/$(CONFIG)/libboringssl_ssl_test_lib.a else -privatelibs_cxx: $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_cli_libs.a $(LIBDIR)/$(CONFIG)/libinterop_client_helper.a $(LIBDIR)/$(CONFIG)/libinterop_client_main.a $(LIBDIR)/$(CONFIG)/libinterop_server_helper.a $(LIBDIR)/$(CONFIG)/libinterop_server_main.a $(LIBDIR)/$(CONFIG)/libqps.a +privatelibs_cxx: $(LIBDIR)/$(CONFIG)/libgrpc++_reflection_codegen.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_cli_libs.a $(LIBDIR)/$(CONFIG)/libinterop_client_helper.a $(LIBDIR)/$(CONFIG)/libinterop_client_main.a $(LIBDIR)/$(CONFIG)/libinterop_server_helper.a $(LIBDIR)/$(CONFIG)/libinterop_server_main.a $(LIBDIR)/$(CONFIG)/libqps.a endif @@ -1889,6 +1889,21 @@ $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc: src/proto/grpc/lb/v1/lo $(Q) $(PROTOC) -Ithird_party/protobuf/src -I. --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $< endif +ifeq ($(NO_PROTOC),true) +$(GENDIR)/src/proto/grpc/reflection/v1alpha/reflection.pb.cc: protoc_dep_error +$(GENDIR)/src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.cc: protoc_dep_error +else +$(GENDIR)/src/proto/grpc/reflection/v1alpha/reflection.pb.cc: src/proto/grpc/reflection/v1alpha/reflection.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS) + $(E) "[PROTOC] Generating protobuf CC file from $<" + $(Q) mkdir -p `dirname $@` + $(Q) $(PROTOC) -Ithird_party/protobuf/src -I. --cpp_out=$(GENDIR) $< + +$(GENDIR)/src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.cc: src/proto/grpc/reflection/v1alpha/reflection.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS) + $(E) "[GRPC] Generating gRPC's protobuf service CC file from $<" + $(Q) mkdir -p `dirname $@` + $(Q) $(PROTOC) -Ithird_party/protobuf/src -I. --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $< +endif + ifeq ($(NO_PROTOC),true) $(GENDIR)/src/proto/grpc/testing/compiler_test.pb.cc: protoc_dep_error $(GENDIR)/src/proto/grpc/testing/compiler_test.grpc.pb.cc: protoc_dep_error @@ -3709,6 +3724,55 @@ endif endif +LIBGRPC++_REFLECTION_CODEGEN_SRC = \ + $(GENDIR)/src/proto/grpc/reflection/v1alpha/reflection.pb.cc $(GENDIR)/src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.cc \ + +PUBLIC_HEADERS_CXX += \ + +LIBGRPC++_REFLECTION_CODEGEN_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_REFLECTION_CODEGEN_SRC)))) + + +ifeq ($(NO_SECURE),true) + +# You can't build secure libraries if you don't have OpenSSL. + +$(LIBDIR)/$(CONFIG)/libgrpc++_reflection_codegen.a: openssl_dep_error + + +else + +ifeq ($(NO_PROTOBUF),true) + +# You can't build a C++ library if you don't have protobuf - a bit overreached, but still okay. + +$(LIBDIR)/$(CONFIG)/libgrpc++_reflection_codegen.a: protobuf_dep_error + + +else + +$(LIBDIR)/$(CONFIG)/libgrpc++_reflection_codegen.a: $(ZLIB_DEP) $(OPENSSL_DEP) $(PROTOBUF_DEP) $(LIBGRPC++_REFLECTION_CODEGEN_OBJS) + $(E) "[AR] Creating $@" + $(Q) mkdir -p `dirname $@` + $(Q) rm -f $(LIBDIR)/$(CONFIG)/libgrpc++_reflection_codegen.a + $(Q) $(AR) $(LIBDIR)/$(CONFIG)/libgrpc++_reflection_codegen.a $(LIBGRPC++_REFLECTION_CODEGEN_OBJS) +ifeq ($(SYSTEM),Darwin) + $(Q) ranlib -no_warning_for_no_symbols $(LIBDIR)/$(CONFIG)/libgrpc++_reflection_codegen.a +endif + + + + +endif + +endif + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(LIBGRPC++_REFLECTION_CODEGEN_OBJS:.o=.dep) +endif +endif + + LIBGRPC++_TEST_CONFIG_SRC = \ test/cpp/util/test_config.cc \ diff --git a/build.yaml b/build.yaml index 57545839d4..4c9dbd046d 100644 --- a/build.yaml +++ b/build.yaml @@ -768,6 +768,14 @@ filegroups: language: c++ public_headers: - include/grpc++/impl/codegen/config_protobuf.h +- name: grpc++_reflection_proto + language: c++ + public_headers: + - include/grpc++/ext/reflection.grpc.pb.h + - include/grpc++/ext/reflection.pb.h + src: + - src/cpp/ext/reflection.grpc.pb.cc + - src/cpp/ext/reflection.pb.cc libs: - name: gpr build: all @@ -960,19 +968,21 @@ libs: language: c++ public_headers: - include/grpc++/ext/proto_server_reflection_plugin.h - - include/grpc++/ext/reflection.grpc.pb.h - - include/grpc++/ext/reflection.pb.h headers: - src/cpp/ext/proto_server_reflection.h src: - src/cpp/ext/proto_server_reflection.cc - src/cpp/ext/proto_server_reflection_plugin.cc - - src/cpp/ext/reflection.grpc.pb.cc - - src/cpp/ext/reflection.pb.cc deps: - grpc++ filegroups: + - grpc++_reflection_proto - grpc++_codegen_proto +- name: grpc++_reflection_codegen + build: private + language: c++ + src: + - src/proto/grpc/reflection/v1alpha/reflection.proto - name: grpc++_test_config build: private language: c++ diff --git a/include/grpc++/impl/codegen/config_protobuf.h b/include/grpc++/impl/codegen/config_protobuf.h index 318ba59683..ea7b977791 100644 --- a/include/grpc++/impl/codegen/config_protobuf.h +++ b/include/grpc++/impl/codegen/config_protobuf.h @@ -57,6 +57,13 @@ #define GRPC_CUSTOM_SOURCELOCATION ::google::protobuf::SourceLocation #endif +#ifndef GRPC_CUSTOM_DESCRIPTORDATABASE +#include +#define GRPC_CUSTOM_DESCRIPTORDATABASE ::google::protobuf::DescriptorDatabase +#define GRPC_CUSTOM_SIMPLEDESCRIPTORDATABASE \ + ::google::protobuf::SimpleDescriptorDatabase +#endif + #ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM #include #include @@ -75,11 +82,13 @@ typedef GRPC_CUSTOM_PROTOBUF_INT64 int64; typedef GRPC_CUSTOM_DESCRIPTOR Descriptor; typedef GRPC_CUSTOM_DESCRIPTORPOOL DescriptorPool; +typedef GRPC_CUSTOM_DESCRIPTORDATABASE DescriptorDatabase; typedef GRPC_CUSTOM_FIELDDESCRIPTOR FieldDescriptor; typedef GRPC_CUSTOM_FILEDESCRIPTOR FileDescriptor; typedef GRPC_CUSTOM_FILEDESCRIPTORPROTO FileDescriptorProto; typedef GRPC_CUSTOM_METHODDESCRIPTOR MethodDescriptor; typedef GRPC_CUSTOM_SERVICEDESCRIPTOR ServiceDescriptor; +typedef GRPC_CUSTOM_SIMPLEDESCRIPTORDATABASE SimpleDescriptorDatabase; typedef GRPC_CUSTOM_SOURCELOCATION SourceLocation; namespace io { diff --git a/src/cpp/ext/proto_server_reflection.h b/src/cpp/ext/proto_server_reflection.h index 23c130513d..1a9a0c95a8 100644 --- a/src/cpp/ext/proto_server_reflection.h +++ b/src/cpp/ext/proto_server_reflection.h @@ -36,7 +36,11 @@ #include #include +#ifdef GRPC_NO_GENERATED_CODE +#include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h" +#else #include +#endif // GRPC_NO_GENERATED_CODE #include namespace grpc { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 4a8936d281..084d6db500 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -200,6 +200,10 @@ class Verifier { bool spin_; }; +bool plugin_has_sync_methods(std::unique_ptr& plugin) { + return plugin->has_sync_methods(); +} + // This class disables the server builder plugins that may add sync services to // the server. If there are sync services, UnimplementedRpc test will triger // the sync unkown rpc routine on the server side, rather than the async one @@ -210,14 +214,9 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption { void UpdatePlugins(std::vector>* plugins) GRPC_OVERRIDE { - auto plugin = plugins->begin(); - while (plugin != plugins->end()) { - if ((*plugin)->has_sync_methods()) { - plugins->erase(plugin++); - } else { - plugin++; - } - } + plugins->erase(std::remove_if(plugins->begin(), plugins->end(), + plugin_has_sync_methods), + plugins->end()); } }; @@ -345,31 +344,6 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) { SendRpc(10); } -// We do not need to protect notify because the use is synchronized. -void ServerWait(Server* server, int* notify) { - server->Wait(); - *notify = 1; -} -TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { - int notify = 0; - std::thread* wait_thread = - new std::thread(&ServerWait, server_.get(), ¬ify); - ResetStub(); - SendRpc(1); - EXPECT_EQ(0, notify); - server_->Shutdown(); - wait_thread->join(); - EXPECT_EQ(1, notify); - delete wait_thread; -} - -TEST_P(AsyncEnd2endTest, ShutdownThenWait) { - ResetStub(); - SendRpc(1); - server_->Shutdown(); - server_->Wait(); -} - // Test a simple RPC using the async version of Next TEST_P(AsyncEnd2endTest, AsyncNextRpc) { ResetStub(); diff --git a/test/cpp/end2end/proto_server_reflection_test.cc b/test/cpp/end2end/proto_server_reflection_test.cc index f8fc39b553..efbb0e1f8e 100644 --- a/test/cpp/end2end/proto_server_reflection_test.cc +++ b/test/cpp/end2end/proto_server_reflection_test.cc @@ -31,7 +31,6 @@ * */ -#include #include #include #include @@ -59,7 +58,7 @@ class ProtoServerReflectionTest : public ::testing::Test { void SetUp() GRPC_OVERRIDE { port_ = grpc_pick_unused_port_or_die(); - ref_desc_pool_ = google::protobuf::DescriptorPool::generated_pool(); + ref_desc_pool_ = protobuf::DescriptorPool::generated_pool(); ServerBuilder builder; grpc::string server_address = "localhost:" + to_string(port_); @@ -73,7 +72,7 @@ class ProtoServerReflectionTest : public ::testing::Test { CreateChannel(target, InsecureChannelCredentials()); stub_ = grpc::testing::EchoTestService::NewStub(channel); desc_db_.reset(new ProtoReflectionDescriptorDatabase(channel)); - desc_pool_.reset(new google::protobuf::DescriptorPool(desc_db_.get())); + desc_pool_.reset(new protobuf::DescriptorPool(desc_db_.get())); } string to_string(const int number) { @@ -83,15 +82,15 @@ class ProtoServerReflectionTest : public ::testing::Test { } void CompareService(const grpc::string& service) { - const google::protobuf::ServiceDescriptor* service_desc = + const protobuf::ServiceDescriptor* service_desc = desc_pool_->FindServiceByName(service); - const google::protobuf::ServiceDescriptor* ref_service_desc = + const protobuf::ServiceDescriptor* ref_service_desc = ref_desc_pool_->FindServiceByName(service); EXPECT_TRUE(service_desc != nullptr); EXPECT_TRUE(ref_service_desc != nullptr); EXPECT_EQ(service_desc->DebugString(), ref_service_desc->DebugString()); - const google::protobuf::FileDescriptor* file_desc = service_desc->file(); + const protobuf::FileDescriptor* file_desc = service_desc->file(); if (known_files_.find(file_desc->package() + "/" + file_desc->name()) != known_files_.end()) { EXPECT_EQ(file_desc->DebugString(), @@ -105,9 +104,9 @@ class ProtoServerReflectionTest : public ::testing::Test { } void CompareMethod(const grpc::string& method) { - const google::protobuf::MethodDescriptor* method_desc = + const protobuf::MethodDescriptor* method_desc = desc_pool_->FindMethodByName(method); - const google::protobuf::MethodDescriptor* ref_method_desc = + const protobuf::MethodDescriptor* ref_method_desc = ref_desc_pool_->FindMethodByName(method); EXPECT_TRUE(method_desc != nullptr); EXPECT_TRUE(ref_method_desc != nullptr); @@ -122,9 +121,8 @@ class ProtoServerReflectionTest : public ::testing::Test { return; } - const google::protobuf::Descriptor* desc = - desc_pool_->FindMessageTypeByName(type); - const google::protobuf::Descriptor* ref_desc = + const protobuf::Descriptor* desc = desc_pool_->FindMessageTypeByName(type); + const protobuf::Descriptor* ref_desc = ref_desc_pool_->FindMessageTypeByName(type); EXPECT_TRUE(desc != nullptr); EXPECT_TRUE(ref_desc != nullptr); @@ -135,10 +133,10 @@ class ProtoServerReflectionTest : public ::testing::Test { std::unique_ptr server_; std::unique_ptr stub_; std::unique_ptr desc_db_; - std::unique_ptr desc_pool_; + std::unique_ptr desc_pool_; std::unordered_set known_files_; std::unordered_set known_types_; - const google::protobuf::DescriptorPool* ref_desc_pool_; + const protobuf::DescriptorPool* ref_desc_pool_; int port_; reflection::ProtoServerReflectionPlugin plugin_; }; diff --git a/test/cpp/util/proto_reflection_descriptor_database.cc b/test/cpp/util/proto_reflection_descriptor_database.cc index 25b720aee0..ebfcc27e2f 100644 --- a/test/cpp/util/proto_reflection_descriptor_database.cc +++ b/test/cpp/util/proto_reflection_descriptor_database.cc @@ -56,7 +56,7 @@ ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase( ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {} bool ProtoReflectionDescriptorDatabase::FindFileByName( - const string& filename, google::protobuf::FileDescriptorProto* output) { + const string& filename, protobuf::FileDescriptorProto* output) { if (cached_db_.FindFileByName(filename, output)) { return true; } @@ -101,7 +101,7 @@ bool ProtoReflectionDescriptorDatabase::FindFileByName( } bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol( - const string& symbol_name, google::protobuf::FileDescriptorProto* output) { + const string& symbol_name, protobuf::FileDescriptorProto* output) { if (cached_db_.FindFileContainingSymbol(symbol_name, output)) { return true; } @@ -148,7 +148,7 @@ bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol( bool ProtoReflectionDescriptorDatabase::FindFileContainingExtension( const string& containing_type, int field_number, - google::protobuf::FileDescriptorProto* output) { + protobuf::FileDescriptorProto* output) { if (cached_db_.FindFileContainingExtension(containing_type, field_number, output)) { return true; @@ -276,10 +276,10 @@ bool ProtoReflectionDescriptorDatabase::GetServices( return false; } -const google::protobuf::FileDescriptorProto +const protobuf::FileDescriptorProto ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse( const std::string& byte_fd_proto) { - google::protobuf::FileDescriptorProto file_desc_proto; + protobuf::FileDescriptorProto file_desc_proto; file_desc_proto.ParseFromString(byte_fd_proto); return file_desc_proto; } @@ -287,7 +287,7 @@ ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse( void ProtoReflectionDescriptorDatabase::AddFileFromResponse( const grpc::reflection::v1alpha::FileDescriptorResponse& response) { for (int i = 0; i < response.file_descriptor_proto_size(); ++i) { - const google::protobuf::FileDescriptorProto file_proto = + const protobuf::FileDescriptorProto file_proto = ParseFileDescriptorProtoResponse(response.file_descriptor_proto(i)); if (known_files_.find(file_proto.name()) == known_files_.end()) { known_files_.insert(file_proto.name()); diff --git a/test/cpp/util/proto_reflection_descriptor_database.h b/test/cpp/util/proto_reflection_descriptor_database.h index 99c00675bb..182323b2ec 100644 --- a/test/cpp/util/proto_reflection_descriptor_database.h +++ b/test/cpp/util/proto_reflection_descriptor_database.h @@ -38,19 +38,18 @@ #include #include -#include -#include -#include +#ifdef GRPC_NO_GENERATED_CODE +#include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h" +#else #include +#endif // GRPC_NO_GENERATED_CODE #include - namespace grpc { // ProtoReflectionDescriptorDatabase takes a stub of ServerReflection and // provides the methods defined by DescriptorDatabase interfaces. It can be used // to feed a DescriptorPool instance. -class ProtoReflectionDescriptorDatabase - : public google::protobuf::DescriptorDatabase { +class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase { public: explicit ProtoReflectionDescriptorDatabase( std::unique_ptr stub); @@ -65,14 +64,13 @@ class ProtoReflectionDescriptorDatabase // Find a file by file name. Fills in in *output and returns true if found. // Otherwise, returns false, leaving the contents of *output undefined. bool FindFileByName(const string& filename, - google::protobuf::FileDescriptorProto* output) - GRPC_OVERRIDE; + protobuf::FileDescriptorProto* output) GRPC_OVERRIDE; // Find the file that declares the given fully-qualified symbol name. // If found, fills in *output and returns true, otherwise returns false // and leaves *output undefined. bool FindFileContainingSymbol(const string& symbol_name, - google::protobuf::FileDescriptorProto* output) + protobuf::FileDescriptorProto* output) GRPC_OVERRIDE; // Find the file which defines an extension extending the given message type @@ -81,7 +79,7 @@ class ProtoReflectionDescriptorDatabase // must be a fully-qualified type name. bool FindFileContainingExtension( const string& containing_type, int field_number, - google::protobuf::FileDescriptorProto* output) GRPC_OVERRIDE; + protobuf::FileDescriptorProto* output) GRPC_OVERRIDE; // Finds the tag numbers used by all known extensions of // extendee_type, and appends them to output in an undefined @@ -102,7 +100,7 @@ class ProtoReflectionDescriptorDatabase grpc::reflection::v1alpha::ServerReflectionResponse> ClientStream; - const google::protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse( + const protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse( const std::string& byte_fd_proto); void AddFileFromResponse( @@ -123,7 +121,7 @@ class ProtoReflectionDescriptorDatabase std::unordered_map> cached_extension_numbers_; std::mutex stream_mutex_; - google::protobuf::SimpleDescriptorDatabase cached_db_; + protobuf::SimpleDescriptorDatabase cached_db_; }; } // namespace grpc diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index e3cfd55cd6..451f535a2a 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -4360,26 +4360,33 @@ { "deps": [ "grpc++", - "grpc++_codegen_proto" + "grpc++_codegen_proto", + "grpc++_reflection_proto" ], "headers": [ "include/grpc++/ext/proto_server_reflection_plugin.h", - "include/grpc++/ext/reflection.grpc.pb.h", - "include/grpc++/ext/reflection.pb.h", "src/cpp/ext/proto_server_reflection.h" ], "language": "c++", "name": "grpc++_reflection", "src": [ "include/grpc++/ext/proto_server_reflection_plugin.h", - "include/grpc++/ext/reflection.grpc.pb.h", - "include/grpc++/ext/reflection.pb.h", "src/cpp/ext/proto_server_reflection.cc", "src/cpp/ext/proto_server_reflection.h", - "src/cpp/ext/proto_server_reflection_plugin.cc", - "src/cpp/ext/reflection.grpc.pb.cc", - "src/cpp/ext/reflection.pb.cc" + "src/cpp/ext/proto_server_reflection_plugin.cc" + ], + "third_party": false, + "type": "lib" + }, + { + "deps": [], + "headers": [ + "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h", + "src/proto/grpc/reflection/v1alpha/reflection.pb.h" ], + "language": "c++", + "name": "grpc++_reflection_codegen", + "src": [], "third_party": false, "type": "lib" }, @@ -6751,5 +6758,22 @@ ], "third_party": false, "type": "filegroup" + }, + { + "deps": [], + "headers": [ + "include/grpc++/ext/reflection.grpc.pb.h", + "include/grpc++/ext/reflection.pb.h" + ], + "language": "c++", + "name": "grpc++_reflection_proto", + "src": [ + "include/grpc++/ext/reflection.grpc.pb.h", + "include/grpc++/ext/reflection.pb.h", + "src/cpp/ext/reflection.grpc.pb.cc", + "src/cpp/ext/reflection.pb.cc" + ], + "third_party": false, + "type": "filegroup" } ] diff --git a/vsprojects/vcxproj/grpc++_reflection_codegen/grpc++_reflection_codegen.vcxproj b/vsprojects/vcxproj/grpc++_reflection_codegen/grpc++_reflection_codegen.vcxproj new file mode 100644 index 0000000000..d9e10c2d37 --- /dev/null +++ b/vsprojects/vcxproj/grpc++_reflection_codegen/grpc++_reflection_codegen.vcxproj @@ -0,0 +1,168 @@ + + + + + Debug + Win32 + + + Debug + x64 + + + Release + Win32 + + + Release + x64 + + + + {C8A925BF-4373-D85D-60AE-96CDCBBF33F2} + true + $(SolutionDir)IntDir\$(MSBuildProjectName)\ + + + + v100 + + + v110 + + + v120 + + + v140 + + + StaticLibrary + true + Unicode + + + StaticLibrary + false + true + Unicode + + + + + + + + + + + + grpc++_reflection_codegen + + + grpc++_reflection_codegen + + + + NotUsing + Level3 + Disabled + WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) + true + MultiThreadedDebug + true + None + false + + + Windows + true + false + + + + + + NotUsing + Level3 + Disabled + WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) + true + MultiThreadedDebug + true + None + false + + + Windows + true + false + + + + + + NotUsing + Level3 + MaxSpeed + WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) + true + true + true + MultiThreaded + true + None + false + + + Windows + true + false + true + true + + + + + + NotUsing + Level3 + MaxSpeed + WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) + true + true + true + MultiThreaded + true + None + false + + + Windows + true + false + true + true + + + + + + + + + + + + + + + + + + + This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + + + + diff --git a/vsprojects/vcxproj/grpc++_reflection_codegen/grpc++_reflection_codegen.vcxproj.filters b/vsprojects/vcxproj/grpc++_reflection_codegen/grpc++_reflection_codegen.vcxproj.filters new file mode 100644 index 0000000000..577dcc77d8 --- /dev/null +++ b/vsprojects/vcxproj/grpc++_reflection_codegen/grpc++_reflection_codegen.vcxproj.filters @@ -0,0 +1,27 @@ + + + + + src\proto\grpc\reflection\v1alpha + + + + + + {d6f45d49-92db-00f7-3dd4-e53f5768d80c} + + + {32b951f4-cef1-24a3-ffb9-bb229f0cdd6a} + + + {8fdcb9f3-4d86-2f49-5c15-c92e0e0f4fba} + + + {098a074c-f3de-2840-8009-1a3840af1efc} + + + {219ff371-7d3a-130c-5792-be36514a4e98} + + + + -- cgit v1.2.3 From 5ca7e47493899b69126a2ef331936bcba37ee545 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 19 Jul 2016 09:25:49 -0700 Subject: Merge pull request #6737 from ctiller/delayed-write Delay beginning most writes until we enter poll() --- Makefile | 36 -- build.yaml | 14 - .../chttp2/client/secure/secure_channel_create.c | 6 +- .../ext/transport/chttp2/transport/chttp2_plugin.c | 3 + .../transport/chttp2/transport/chttp2_transport.c | 369 +++++++++++++++++---- src/core/ext/transport/chttp2/transport/internal.h | 65 ++-- src/core/ext/transport/chttp2/transport/parsing.c | 11 +- .../ext/transport/chttp2/transport/stream_lists.c | 20 +- src/core/ext/transport/chttp2/transport/writing.c | 16 +- src/core/lib/iomgr/endpoint.c | 4 + src/core/lib/iomgr/endpoint.h | 4 + src/core/lib/iomgr/ev_epoll_linux.c | 277 +++++++++------- src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 3 + src/core/lib/iomgr/ev_poll_posix.c | 3 + src/core/lib/iomgr/ev_posix.c | 4 + src/core/lib/iomgr/ev_posix.h | 4 + src/core/lib/iomgr/exec_ctx.c | 10 +- src/core/lib/iomgr/exec_ctx.h | 6 +- src/core/lib/iomgr/iomgr.c | 3 + src/core/lib/iomgr/network_status_tracker.c | 15 +- src/core/lib/iomgr/network_status_tracker.h | 4 + src/core/lib/iomgr/tcp_posix.c | 18 +- src/core/lib/iomgr/tcp_server_posix.c | 3 +- src/core/lib/iomgr/tcp_windows.c | 13 +- src/core/lib/iomgr/workqueue.h | 39 ++- src/core/lib/iomgr/workqueue_posix.c | 8 +- src/core/lib/iomgr/workqueue_posix.h | 5 + src/core/lib/iomgr/workqueue_windows.c | 22 ++ src/core/lib/security/transport/secure_endpoint.c | 18 +- src/core/lib/surface/server.c | 76 +++-- src/core/lib/transport/connectivity_state.c | 3 + test/core/end2end/tests/high_initial_seqno.c | 6 + test/core/end2end/tests/network_status_change.c | 5 +- test/core/internal_api_canaries/iomgr.c | 13 +- test/core/iomgr/workqueue_test.c | 150 --------- test/core/util/mock_endpoint.c | 12 +- test/core/util/passthru_endpoint.c | 12 +- test/cpp/end2end/end2end_test.cc | 3 + test/cpp/qps/client_async.cc | 35 +- test/cpp/qps/gen_build_yaml.py | 11 +- test/cpp/qps/json_run_localhost.cc | 2 +- test/cpp/qps/server_async.cc | 5 +- tools/dockerfile/test/python_pyenv_x64/Dockerfile | 112 +++++++ tools/run_tests/run_tests.py | 28 +- tools/run_tests/sources_and_headers.json | 16 - tools/run_tests/tests.json | 131 ++++---- 46 files changed, 1001 insertions(+), 622 deletions(-) delete mode 100644 test/core/iomgr/workqueue_test.c create mode 100644 tools/dockerfile/test/python_pyenv_x64/Dockerfile (limited to 'test') diff --git a/Makefile b/Makefile index 992eee102b..ed362f9752 100644 --- a/Makefile +++ b/Makefile @@ -991,7 +991,6 @@ transport_security_test: $(BINDIR)/$(CONFIG)/transport_security_test udp_server_test: $(BINDIR)/$(CONFIG)/udp_server_test uri_fuzzer_test: $(BINDIR)/$(CONFIG)/uri_fuzzer_test uri_parser_test: $(BINDIR)/$(CONFIG)/uri_parser_test -workqueue_test: $(BINDIR)/$(CONFIG)/workqueue_test alarm_cpp_test: $(BINDIR)/$(CONFIG)/alarm_cpp_test async_end2end_test: $(BINDIR)/$(CONFIG)/async_end2end_test auth_property_iterator_test: $(BINDIR)/$(CONFIG)/auth_property_iterator_test @@ -1295,7 +1294,6 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/transport_security_test \ $(BINDIR)/$(CONFIG)/udp_server_test \ $(BINDIR)/$(CONFIG)/uri_parser_test \ - $(BINDIR)/$(CONFIG)/workqueue_test \ $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 \ $(BINDIR)/$(CONFIG)/badreq_bad_client_test \ $(BINDIR)/$(CONFIG)/connection_prefix_bad_client_test \ @@ -1674,8 +1672,6 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/udp_server_test || ( echo test udp_server_test failed ; exit 1 ) $(E) "[RUN] Testing uri_parser_test" $(Q) $(BINDIR)/$(CONFIG)/uri_parser_test || ( echo test uri_parser_test failed ; exit 1 ) - $(E) "[RUN] Testing workqueue_test" - $(Q) $(BINDIR)/$(CONFIG)/workqueue_test || ( echo test workqueue_test failed ; exit 1 ) $(E) "[RUN] Testing public_headers_must_be_c89" $(Q) $(BINDIR)/$(CONFIG)/public_headers_must_be_c89 || ( echo test public_headers_must_be_c89 failed ; exit 1 ) $(E) "[RUN] Testing badreq_bad_client_test" @@ -10175,38 +10171,6 @@ endif endif -WORKQUEUE_TEST_SRC = \ - test/core/iomgr/workqueue_test.c \ - -WORKQUEUE_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(WORKQUEUE_TEST_SRC)))) -ifeq ($(NO_SECURE),true) - -# You can't build secure targets if you don't have OpenSSL. - -$(BINDIR)/$(CONFIG)/workqueue_test: openssl_dep_error - -else - - - -$(BINDIR)/$(CONFIG)/workqueue_test: $(WORKQUEUE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a - $(E) "[LD] Linking $@" - $(Q) mkdir -p `dirname $@` - $(Q) $(LD) $(LDFLAGS) $(WORKQUEUE_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/workqueue_test - -endif - -$(OBJDIR)/$(CONFIG)/test/core/iomgr/workqueue_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a - -deps_workqueue_test: $(WORKQUEUE_TEST_OBJS:.o=.dep) - -ifneq ($(NO_SECURE),true) -ifneq ($(NO_DEPS),true) --include $(WORKQUEUE_TEST_OBJS:.o=.dep) -endif -endif - - ALARM_CPP_TEST_SRC = \ test/cpp/common/alarm_cpp_test.cc \ diff --git a/build.yaml b/build.yaml index c5d92c1e63..006d35245a 100644 --- a/build.yaml +++ b/build.yaml @@ -2430,20 +2430,6 @@ targets: - grpc - gpr_test_util - gpr -- name: workqueue_test - build: test - language: c - src: - - test/core/iomgr/workqueue_test.c - deps: - - grpc_test_util - - grpc - - gpr_test_util - - gpr - platforms: - - mac - - linux - - posix - name: alarm_cpp_test gtest: true build: test diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 721ba82d8f..9acacbd92d 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -91,11 +91,13 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, connector *c = arg; grpc_closure *notify; gpr_mu_lock(&c->mu); + grpc_error *error = GRPC_ERROR_NONE; if (c->connecting_endpoint == NULL) { memset(c->result, 0, sizeof(*c->result)); gpr_mu_unlock(&c->mu); } else if (status != GRPC_SECURITY_OK) { - gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status); + error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"), + GRPC_ERROR_INT_SECURITY_STATUS, status); memset(c->result, 0, sizeof(*c->result)); c->connecting_endpoint = NULL; gpr_mu_unlock(&c->mu); @@ -113,7 +115,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c index bd87253ed3..7d5279b9da 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c @@ -36,11 +36,14 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/metadata.h" +extern int grpc_http_write_state_trace; + void grpc_chttp2_plugin_init(void) { grpc_chttp2_base64_encode_and_huffman_compress = grpc_chttp2_base64_encode_and_huffman_compress_impl; grpc_register_tracer("http", &grpc_http_trace); grpc_register_tracer("flowctl", &grpc_flowctl_trace); + grpc_register_tracer("http_write_state", &grpc_http_write_state_trace); } void grpc_chttp2_plugin_shutdown(void) {} diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index e2dd463a77..d050467a02 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -48,6 +48,7 @@ #include "src/core/ext/transport/chttp2/transport/status_conversion.h" #include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" @@ -60,9 +61,9 @@ #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) #define MAX_CLIENT_STREAM_ID 0x7fffffffu - int grpc_http_trace = 0; int grpc_flowctl_trace = 0; +int grpc_http_write_state_trace = 0; #define TRANSPORT_FROM_WRITING(tw) \ ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \ @@ -88,10 +89,16 @@ static const grpc_transport_vtable vtable; static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); +static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); + +static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); +static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_error *error); /** Set a transport level setting, and push it to our peer */ -static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, - uint32_t value); +static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_setting_id id, uint32_t value); /** Start disconnection chain */ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -137,7 +144,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static void incoming_byte_stream_update_flow_control( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already); static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, @@ -201,6 +208,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, gpr_free(t); } +/*#define REFCOUNTING_DEBUG 1*/ #ifdef REFCOUNTING_DEBUG #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__) @@ -231,7 +239,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, const grpc_channel_args *channel_args, - grpc_endpoint *ep, uint8_t is_client) { + grpc_endpoint *ep, bool is_client) { size_t i; int j; @@ -273,6 +281,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->writing_action, writing_action, t); grpc_closure_init(&t->reading_action, reading_action, t); grpc_closure_init(&t->parsing_action, parsing_action, t); + grpc_closure_init(&t->initiate_writing, initiate_writing, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); @@ -286,6 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_slice_buffer_add( &t->global.qbuf, gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write"); } /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet @@ -311,11 +321,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* configure http2 the way we like it */ if (is_client) { - push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } - push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW); - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + DEFAULT_WINDOW); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); if (channel_args) { @@ -329,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be an integer", GRPC_ARG_MAX_CONCURRENT_STREAMS); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, (uint32_t)channel_args->args[i].value.integer); } } else if (0 == strcmp(channel_args->args[i].key, @@ -368,7 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be non-negative", GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, (uint32_t)channel_args->args[i].value.integer); } } else if (0 == strcmp(channel_args->args[i].key, @@ -393,7 +404,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be non-negative", GRPC_ARG_MAX_METADATA_SIZE); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, (uint32_t)channel_args->args[i].value.integer); } } @@ -444,6 +455,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { if (!t->closed) { + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "W:%p close transport", t); + } t->closed = 1; connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); @@ -590,7 +604,8 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer_destroy( &s->global.received_trailing_metadata); gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer); - GRPC_ERROR_UNREF(s->global.removal_error); + GRPC_ERROR_UNREF(s->global.read_closed_error); + GRPC_ERROR_UNREF(s->global.write_closed_error); UNREF_TRANSPORT(exec_ctx, t, "stream"); @@ -634,6 +649,36 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( * LOCK MANAGEMENT */ +static const char *write_state_name(grpc_chttp2_write_state state) { + switch (state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + return "INACTIVE"; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + return "REQUESTED[p=0]"; + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + return "REQUESTED[p=1]"; + case GRPC_CHTTP2_WRITE_SCHEDULED: + return "SCHEDULED"; + case GRPC_CHTTP2_WRITING: + return "WRITING"; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + return "WRITING[p=1]"; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + return "WRITING[p=0]"; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + +static void set_write_state(grpc_chttp2_transport *t, + grpc_chttp2_write_state state, const char *reason) { + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "W:%p %s -> %s because %s", t, + write_state_name(t->executor.write_state), write_state_name(state), + reason); + } + t->executor.write_state = state; +} + static void finish_global_actions(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_executor_action_header *hdr; @@ -642,13 +687,6 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("finish_global_actions", 0); for (;;) { - if (!t->executor.writing_active && !t->closed && - grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { - t->executor.writing_active = 1; - REF_TRANSPORT(t, "writing"); - prevent_endpoint_shutdown(t); - grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); - } check_read_ops(exec_ctx, &t->global); gpr_mu_lock(&t->executor.mu); @@ -669,8 +707,28 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, continue; } else { t->executor.global_active = false; + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking"); + REF_TRANSPORT(t, "initiate_writing"); + gpr_mu_unlock(&t->executor.mu); + grpc_exec_ctx_sched( + exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE, + t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL); + break; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + start_writing(exec_ctx, t); + gpr_mu_unlock(&t->executor.mu); + break; + case GRPC_CHTTP2_WRITING_INACTIVE: + case GRPC_CHTTP2_WRITING: + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + case GRPC_CHTTP2_WRITE_SCHEDULED: + gpr_mu_unlock(&t->executor.mu); + break; + } } - gpr_mu_unlock(&t->executor.mu); break; } @@ -741,16 +799,118 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, * OUTPUT PROCESSING */ -void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { +void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + bool covered_by_poller, const char *reason) { + /* Perform state checks, and transition to a scheduled state if appropriate. + Each time we finish the global lock execution, we check if we need to + write. If we do: + - (if there is a poller surrounding the write) schedule + initiate_writing, which locks and calls initiate_writing_locked to... + - call start_writing, which verifies (under the global lock) that there + are things that need to be written by calling + grpc_chttp2_unlocking_check_writes, and if so schedules writing_action + against the current exec_ctx, to be executed OUTSIDE of the global lock + - eventually writing_action results in grpc_chttp2_terminate_writing being + called, which re-takes the global lock, updates state, checks if we need + to do *another* write immediately, and if so loops back to + start_writing. + + Current problems: + - too much lock entry/exiting + - the writing thread can become stuck indefinitely (punt through the + workqueue periodically to fix) */ + + grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + set_write_state(t, covered_by_poller + ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER + : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + reason); + break; + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + /* nothing to do: write already requested */ + break; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + if (covered_by_poller) { + /* upgrade to note poller is available to cover the write */ + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason); + } + break; + case GRPC_CHTTP2_WRITE_SCHEDULED: + /* nothing to do: write already scheduled */ + break; + case GRPC_CHTTP2_WRITING: + set_write_state(t, + covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER + : GRPC_CHTTP2_WRITING_STALE_NO_POLLER, + reason); + break; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + /* nothing to do: write already requested */ + break; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + if (covered_by_poller) { + /* upgrade to note poller is available to cover the write */ + set_write_state(t, GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, reason); + } + break; + } +} + +static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { + GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED || + t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER); + if (!t->closed && + grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { + set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing"); + REF_TRANSPORT(t, "writing"); + prevent_endpoint_shutdown(t); + grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); + } else { + if (t->closed) { + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, + "start_writing:transport_closed"); + } else { + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, + "start_writing:nothing_to_write"); + } + end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write")); + if (t->ep && !t->endpoint_reading) { + destroy_endpoint(exec_ctx, t); + } + } +} + +static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, + void *arg_ignored) { + start_writing(exec_ctx, t); + UNREF_TRANSPORT(exec_ctx, t, "initiate_writing"); +} + +static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked, + NULL, 0); +} + +void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + bool covered_by_poller, const char *reason) { if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); + grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller, + reason); } } -static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, - uint32_t value) { +static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_setting_id id, uint32_t value) { const grpc_chttp2_setting_parameters *sp = &grpc_chttp2_settings_parameters[id]; uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value); @@ -761,9 +921,22 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) { t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value; t->global.dirtied_local_settings = 1; + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "push_setting"); } } +static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_error *error) { + grpc_chttp2_stream_global *stream_global; + while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, + &stream_global)) { + fail_pending_writes(exec_ctx, &t->global, stream_global, + GRPC_ERROR_REF(error)); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); + } + GRPC_ERROR_UNREF(error); +} + static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, @@ -778,24 +951,32 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); - grpc_chttp2_stream_global *stream_global; - while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, - &stream_global)) { - fail_pending_writes(exec_ctx, &t->global, stream_global, - GRPC_ERROR_REF(error)); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); + end_waiting_for_write(exec_ctx, t, error); + + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + case GRPC_CHTTP2_WRITE_SCHEDULED: + GPR_UNREACHABLE_CODE(break); + case GRPC_CHTTP2_WRITING: + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing"); + break; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, + "terminate_writing"); + break; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + "terminate_writing"); + break; } - /* leave the writing flag up on shutdown to prevent further writes in - unlock() - from starting */ - t->executor.writing_active = 0; if (t->ep && !t->endpoint_reading) { destroy_endpoint(exec_ctx, t); } UNREF_TRANSPORT(exec_ctx, t, "writing"); - GRPC_ERROR_UNREF(error); } void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, @@ -878,7 +1059,8 @@ static void maybe_start_some_streams( stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->in_stream_map = true; transport_global->concurrent_stream_count++; - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, true, + "new_stream"); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -1018,9 +1200,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, maybe_start_some_streams(exec_ctx, transport_global); } else { GPR_ASSERT(stream_global->id != 0); - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_initial_metadata"); } } else { + stream_global->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_initial_metadata_finished, @@ -1042,7 +1226,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } else { stream_global->send_message = op->send_message; if (stream_global->id != 0) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_message"); } } } @@ -1075,6 +1260,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (stream_global->write_closed) { + stream_global->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_trailing_metadata_finished, @@ -1085,7 +1271,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_trailing_metadata"); } } } @@ -1106,8 +1293,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, (stream_global->incoming_frames.head == NULL || stream_global->incoming_frames.head->is_tail)) { incoming_byte_stream_update_flow_control( - transport_global, stream_global, transport_global->stream_lookahead, - 0); + exec_ctx, transport_global, stream_global, + transport_global->stream_lookahead, 0); } grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } @@ -1135,7 +1322,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(*op)); } -static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { +static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_closure *on_recv) { grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); p->next = &t->global.pings; p->prev = p->next->prev; @@ -1150,6 +1338,7 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { p->id[7] = (uint8_t)(t->global.ping_counter & 0xff); p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); + grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping"); } static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1209,6 +1398,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, close_transport = grpc_chttp2_has_streams(t) ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("GOAWAY sent"); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent"); } if (op->set_accept_stream) { @@ -1226,7 +1416,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_ping) { - send_ping_locked(t, op->send_ping); + send_ping_locked(exec_ctx, t, op->send_ping); } if (close_transport != GRPC_ERROR_NONE) { @@ -1414,6 +1604,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, &transport_global->qbuf, grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error, &stream_global->stats.outgoing)); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "rst_stream"); } const char *msg = @@ -1473,10 +1665,39 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, } } +static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) { + if (error == GRPC_ERROR_NONE) return; + for (size_t i = 0; i < *nrefs; i++) { + if (error == refs[i]) { + return; + } + } + refs[*nrefs] = error; + ++*nrefs; +} + +static grpc_error *removal_error(grpc_error *extra_error, + grpc_chttp2_stream_global *stream_global) { + grpc_error *refs[3]; + size_t nrefs = 0; + add_error(stream_global->read_closed_error, refs, &nrefs); + add_error(stream_global->write_closed_error, refs, &nrefs); + add_error(extra_error, refs, &nrefs); + grpc_error *error = GRPC_ERROR_NONE; + if (nrefs > 0) { + error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs, + nrefs); + } + GRPC_ERROR_UNREF(extra_error); + return error; +} + static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_error *error) { + error = removal_error(error, stream_global); + stream_global->send_message = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error)); @@ -1499,14 +1720,17 @@ void grpc_chttp2_mark_stream_closed( } grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); if (close_reads && !stream_global->read_closed) { + stream_global->read_closed_error = GRPC_ERROR_REF(error); stream_global->read_closed = true; stream_global->published_initial_metadata = true; stream_global->published_trailing_metadata = true; decrement_active_streams_locked(exec_ctx, transport_global, stream_global); } if (close_writes && !stream_global->write_closed) { + stream_global->write_closed_error = GRPC_ERROR_REF(error); stream_global->write_closed = true; - if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) { + if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.write_state != + GRPC_CHTTP2_WRITING_INACTIVE) { GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes"); grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, stream_global); @@ -1516,7 +1740,6 @@ void grpc_chttp2_mark_stream_closed( } } if (stream_global->read_closed && stream_global->write_closed) { - stream_global->removal_error = GRPC_ERROR_REF(error); if (stream_global->id != 0 && TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, @@ -1524,7 +1747,8 @@ void grpc_chttp2_mark_stream_closed( } else { if (stream_global->id != 0) { remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), - stream_global->id, GRPC_ERROR_REF(error)); + stream_global->id, + removal_error(GRPC_ERROR_REF(error), stream_global)); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } @@ -1649,6 +1873,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, 1, error); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "close_from_api"); } typedef struct { @@ -1678,8 +1904,14 @@ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } /** update window from a settings change */ +typedef struct { + grpc_chttp2_transport *t; + grpc_exec_ctx *exec_ctx; +} update_global_window_args; + static void update_global_window(void *args, uint32_t id, void *stream) { - grpc_chttp2_transport *t = args; + update_global_window_args *a = args; + grpc_chttp2_transport *t = a->t; grpc_chttp2_stream *s = stream; grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_stream_global *stream_global = &s->global; @@ -1693,7 +1925,8 @@ static void update_global_window(void *args, uint32_t id, void *stream) { is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(a->exec_ctx, transport_global, stream_global, + true, "update_global_window"); } } @@ -1801,14 +2034,19 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; /* copy parsing qbuf to global qbuf */ - gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); + if (t->parsing.qbuf.count > 0) { + gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "parsing_qbuf"); + } /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); transport_global->concurrent_stream_count = (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map); if (transport_parsing->initial_window_update != 0) { + update_global_window_args args = {t, exec_ctx}; grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, - update_global_window, t); + update_global_window, &args); transport_parsing->initial_window_update = 0; } /* handle higher level things */ @@ -1831,7 +2069,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(stream_global->write_closed); GPR_ASSERT(stream_global->read_closed); remove_stream(exec_ctx, t, stream_global->id, - GRPC_ERROR_REF(stream_global->removal_error)); + removal_error(GRPC_ERROR_NONE, stream_global)); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } @@ -1854,11 +2092,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, } drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; - if (!t->executor.writing_active && t->ep) { - grpc_endpoint_destroy(exec_ctx, t->ep); - t->ep = NULL; - /* safe as we still have a ref for read */ - UNREF_TRANSPORT(exec_ctx, t, "disconnect"); + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t, + write_state_name(t->executor.write_state)); + } + if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) { + destroy_endpoint(exec_ctx, t); } } else if (!t->closed) { keep_reading = true; @@ -1942,7 +2181,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, } static void incoming_byte_stream_update_flow_control( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already) { uint32_t max_recv_bytes; @@ -1977,7 +2216,8 @@ static void incoming_byte_stream_update_flow_control( add_max_recv_bytes); grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, stream_global); - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "read_incoming_stream"); } } @@ -1999,8 +2239,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global = &bs->stream->global; if (bs->is_tail) { - incoming_byte_stream_update_flow_control( - transport_global, stream_global, arg->max_size_hint, bs->slices.length); + incoming_byte_stream_update_flow_control(exec_ctx, transport_global, + stream_global, arg->max_size_hint, + bs->slices.length); } if (bs->slices.count > 0) { *arg->slice = gpr_slice_buffer_take_first(&bs->slices); @@ -2184,7 +2425,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, if (context == NULL) { *scope = NULL; gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val); - result = gpr_leftpad(buf, ' ', 40); + result = gpr_leftpad(buf, ' ', 60); gpr_free(buf); return result; } @@ -2197,7 +2438,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, gpr_free(tmp); } gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val); - result = gpr_leftpad(buf, ' ', 40); + result = gpr_leftpad(buf, ' ', 60); gpr_free(buf); return result; } @@ -2230,7 +2471,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, tmp_phase = gpr_leftpad(phase, ' ', 8); tmp_scope1 = gpr_leftpad(scope1, ' ', 11); - gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1); + gpr_asprintf(&prefix, "FLOW %s: %s %s ", tmp_phase, clisvr, scope1); gpr_free(tmp_phase); gpr_free(tmp_scope1); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8d79e93ceb..e1dcf5262a 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -305,6 +305,22 @@ typedef struct grpc_chttp2_executor_action_header { void *arg; } grpc_chttp2_executor_action_header; +typedef enum { + /** no writing activity */ + GRPC_CHTTP2_WRITING_INACTIVE, + /** write has been requested, but not scheduled yet */ + GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, + GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + /** write has been requested and scheduled against the workqueue */ + GRPC_CHTTP2_WRITE_SCHEDULED, + /** write has been initiated after being reaped from the workqueue */ + GRPC_CHTTP2_WRITING, + /** write has been initiated, AND another write needs to be started once it's + done */ + GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, + GRPC_CHTTP2_WRITING_STALE_NO_POLLER, +} grpc_chttp2_write_state; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -319,10 +335,10 @@ struct grpc_chttp2_transport { /** is a thread currently in the global lock */ bool global_active; - /** is a thread currently writing */ - bool writing_active; /** is a thread currently parsing */ bool parsing_active; + /** write execution state of the transport */ + grpc_chttp2_write_state write_state; grpc_chttp2_executor_action_header *pending_actions_head; grpc_chttp2_executor_action_header *pending_actions_tail; @@ -342,7 +358,8 @@ struct grpc_chttp2_transport { /** global state for reading/writing */ grpc_chttp2_transport_global global; /** state only accessible by the chain of execution that - set writing_active=1 */ + set writing_state >= GRPC_WRITING, and only by the writing closure + chain. */ grpc_chttp2_transport_writing writing; /** state only accessible by the chain of execution that set parsing_active=1 */ @@ -363,6 +380,8 @@ struct grpc_chttp2_transport { grpc_closure reading_action; /** closure to actually do parsing */ grpc_closure parsing_action; + /** closure to initiate writing */ + grpc_closure initiate_writing; /** incoming read bytes */ gpr_slice_buffer read_buffer; @@ -436,8 +455,10 @@ typedef struct { bool seen_error; bool exceeded_metadata_size; - /** the error that resulted in this stream being removed */ - grpc_error *removal_error; + /** the error that resulted in this stream being read-closed */ + grpc_error *read_closed_error; + /** the error that resulted in this stream being write-closed */ + grpc_error *write_closed_error; bool published_initial_metadata; bool published_trailing_metadata; @@ -514,15 +535,20 @@ struct grpc_chttp2_stream { }; /** Transport writing call flow: - chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes - are required; - if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the - writes. - Once writes have been completed (meaning another write could potentially be - started), - grpc_chttp2_terminate_writing is called. This will call - grpc_chttp2_cleanup_writing, at which - point the write phase is complete. */ + grpc_chttp2_initiate_write() is called anywhere that we know bytes need to + go out on the wire. + If no other write has been started, a task is enqueued onto our workqueue. + When that task executes, it obtains the global lock, and gathers the data + to write. + The global lock is dropped and we do the syscall to write. + After writing, a follow-up check is made to see if another round of writing + should be performed. + + The actual call chain is documented in the implementation of this function. + */ +void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + bool covered_by_poller, const char *reason); /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ @@ -610,9 +636,8 @@ int grpc_chttp2_list_pop_check_read_ops( void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); -void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, - bool is_window_available); +bool grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing); void grpc_chttp2_list_add_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, @@ -822,7 +847,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, /** add a ref to the stream and add it to the writable list; ref will be dropped in writing.c */ -void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); +void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + bool covered_by_poller, const char *reason); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 84eb5752f1..e1fc0ddee2 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -154,10 +154,8 @@ void grpc_chttp2_publish_reads( transport_parsing, outgoing_window); is_zero = transport_global->outgoing_window <= 0; if (was_zero && !is_zero) { - while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, - &stream_global)) { - grpc_chttp2_become_writable(transport_global, stream_global); - } + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "new_global_flow_control"); } if (transport_parsing->incoming_window < @@ -168,6 +166,8 @@ void grpc_chttp2_publish_reads( announce_incoming_window, announce_bytes); GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing, incoming_window, announce_bytes); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "global incoming window"); } /* for each stream that saw an update, fixup global state */ @@ -190,7 +190,8 @@ void grpc_chttp2_publish_reads( outgoing_window); is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "stream.read_flow_control"); } stream_global->max_recv_bytes -= (uint32_t)GPR_MIN( diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 8f3ab00e6d..2eb5f5f632 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -329,6 +329,7 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing); + gpr_log(GPR_DEBUG, "writing stalled %d", stream->global.id); if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) { GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled"); } @@ -336,27 +337,28 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); } -void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, - bool is_window_available) { +bool grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream *stream; + bool out = false; grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); while (stream_list_pop(transport, &stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { - if (is_window_available) { - grpc_chttp2_become_writable(&transport->global, &stream->global); - } else { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - &stream->writing); - } + gpr_log(GPR_DEBUG, "move %d from writing stalled to just stalled", + stream->global.id); + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + &stream->writing); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global, "chttp2_writing_stalled"); + out = true; } + return out; } void grpc_chttp2_list_add_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { + gpr_log(GPR_DEBUG, "stalled %d", stream_writing->id); stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index b19f5f068d..e0d87725e9 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -75,9 +75,13 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); - bool is_window_available = transport_writing->outgoing_window > 0; - grpc_chttp2_list_flush_writing_stalled_by_transport( - exec_ctx, transport_writing, is_window_available); + if (transport_writing->outgoing_window > 0) { + while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, + &stream_global)) { + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "transport.read_flow_control"); + } + } /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ @@ -331,6 +335,12 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; + if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx, + transport_writing)) { + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "resume_stalled_stream"); + } + while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { if (stream_writing->sent_initial_metadata) { diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index 1ab3733d38..f901fcf962 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -65,3 +65,7 @@ void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { char* grpc_endpoint_get_peer(grpc_endpoint* ep) { return ep->vtable->get_peer(ep); } + +grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { + return ep->vtable->get_workqueue(ep); +} diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index f9808bbda1..894efc0b23 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -51,6 +51,7 @@ struct grpc_endpoint_vtable { gpr_slice_buffer *slices, grpc_closure *cb); void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb); + grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep); void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -69,6 +70,9 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, char *grpc_endpoint_get_peer(grpc_endpoint *ep); +/* Retrieve a reference to the workqueue associated with this endpoint */ +grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep); + /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index cf0fe736a0..6a63c4d1d1 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -57,6 +57,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -113,9 +114,7 @@ struct grpc_fd { grpc_closure *read_closure; grpc_closure *write_closure; - /* The polling island to which this fd belongs to and the mutex protecting the - the field */ - gpr_mu pi_mu; + /* The polling island to which this fd belongs to (protected by mu) */ struct polling_island *polling_island; struct grpc_fd *freelist_next; @@ -152,16 +151,17 @@ static void fd_global_shutdown(void); * Polling island Declarations */ -// #define GRPC_PI_REF_COUNT_DEBUG +//#define GRPC_PI_REF_COUNT_DEBUG #ifdef GRPC_PI_REF_COUNT_DEBUG #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__) -#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__) +#define PI_UNREF(exec_ctx, p, r) \ + pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__) #else /* defined(GRPC_PI_REF_COUNT_DEBUG) */ #define PI_ADD_REF(p, r) pi_add_ref((p)) -#define PI_UNREF(p, r) pi_unref((p)) +#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p)) #endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */ @@ -172,7 +172,7 @@ typedef struct polling_island { Once the ref count becomes zero, this structure is destroyed which means we should ensure that there is never a scenario where a PI_ADD_REF() is racing with a PI_UNREF() that just made the ref_count zero. */ - gpr_refcount ref_count; + gpr_atm ref_count; /* Pointer to the polling_island this merged into. * merged_to value is only set once in polling_island's lifetime (and that too @@ -184,6 +184,9 @@ typedef struct polling_island { * (except mu and ref_count) are invalid and must be ignored. */ gpr_atm merged_to; + /* The workqueue associated with this polling island */ + grpc_workqueue *workqueue; + /* The fd of the underlying epoll set */ int epoll_fd; @@ -191,11 +194,6 @@ typedef struct polling_island { size_t fd_cnt; size_t fd_capacity; grpc_fd **fds; - - /* Polling islands that are no longer needed are kept in a freelist so that - they can be reused. This field points to the next polling island in the - free list */ - struct polling_island *next_free; } polling_island; /******************************************************************************* @@ -253,13 +251,14 @@ struct grpc_pollset_set { * Common helpers */ -static void append_error(grpc_error **composite, grpc_error *error, +static bool append_error(grpc_error **composite, grpc_error *error, const char *desc) { - if (error == GRPC_ERROR_NONE) return; + if (error == GRPC_ERROR_NONE) return true; if (*composite == GRPC_ERROR_NONE) { *composite = GRPC_ERROR_CREATE(desc); } *composite = grpc_error_add_child(*composite, error); + return false; } /******************************************************************************* @@ -275,11 +274,8 @@ static void append_error(grpc_error **composite, grpc_error *error, threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */ static grpc_wakeup_fd polling_island_wakeup_fd; -/* Polling island freelist */ -static gpr_mu g_pi_freelist_mu; -static polling_island *g_pi_freelist = NULL; - -static void polling_island_delete(); /* Forward declaration */ +/* Forward declaration */ +static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -293,28 +289,35 @@ gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ #ifdef GRPC_PI_REF_COUNT_DEBUG -void pi_add_ref(polling_island *pi); -void pi_unref(polling_island *pi); +static void pi_add_ref(polling_island *pi); +static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); -void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) { - long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count)); +static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, + int line) { + long old_cnt = gpr_atm_acq_load(&pi->ref_count); pi_add_ref(pi); gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, old_cnt + 1, reason, file, line); } -void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) { - long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count)); - pi_unref(pi); +static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi, + char *reason, char *file, int line) { + long old_cnt = gpr_atm_acq_load(&pi->ref_count); + pi_unref(exec_ctx, pi); gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, (old_cnt - 1), reason, file, line); } #endif -void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); } +static void pi_add_ref(polling_island *pi) { + gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1); +} -void pi_unref(polling_island *pi) { - /* If ref count went to zero, delete the polling island. +static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) { + /* If ref count went to one, we're back to just the workqueue owning a ref. + Unref the workqueue to break the loop. + + If ref count went to zero, delete the polling island. Note that this deletion not be done under a lock. Once the ref count goes to zero, we are guaranteed that no one else holds a reference to the polling island (and that there is no racing pi_add_ref() call either). @@ -322,12 +325,20 @@ void pi_unref(polling_island *pi) { Also, if we are deleting the polling island and the merged_to field is non-empty, we should remove a ref to the merged_to polling island */ - if (gpr_unref(&pi->ref_count)) { - polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); - polling_island_delete(pi); - if (next != NULL) { - PI_UNREF(next, "pi_delete"); /* Recursive call */ + switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) { + case 2: /* last external ref: the only one now owned is by the workqueue */ + GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); + break; + case 1: { + polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); + polling_island_delete(exec_ctx, pi); + if (next != NULL) { + PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */ + } + break; } + case 0: + GPR_UNREACHABLE_CODE(return ); } } @@ -462,69 +473,68 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd, } /* Might return NULL in case of an error */ -static polling_island *polling_island_create(grpc_fd *initial_fd, +static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, + grpc_fd *initial_fd, grpc_error **error) { polling_island *pi = NULL; - char *err_msg; const char *err_desc = "polling_island_create"; - /* Try to get one from the polling island freelist */ - gpr_mu_lock(&g_pi_freelist_mu); - if (g_pi_freelist != NULL) { - pi = g_pi_freelist; - g_pi_freelist = g_pi_freelist->next_free; - pi->next_free = NULL; - } - gpr_mu_unlock(&g_pi_freelist_mu); + *error = GRPC_ERROR_NONE; - /* Create new polling island if we could not get one from the free list */ - if (pi == NULL) { - pi = gpr_malloc(sizeof(*pi)); - gpr_mu_init(&pi->mu); - pi->fd_cnt = 0; - pi->fd_capacity = 0; - pi->fds = NULL; - } + pi = gpr_malloc(sizeof(*pi)); + gpr_mu_init(&pi->mu); + pi->fd_cnt = 0; + pi->fd_capacity = 0; + pi->fds = NULL; + pi->epoll_fd = -1; + pi->workqueue = NULL; - gpr_ref_init(&pi->ref_count, 0); + gpr_atm_rel_store(&pi->ref_count, 0); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (pi->epoll_fd < 0) { - gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno, - strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - gpr_free(err_msg); - } else { - polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); - pi->next_free = NULL; + append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc); + goto done; + } - if (initial_fd != NULL) { - /* Lock the polling island here just in case we got this structure from - the freelist and the polling island lock was not released yet (by the - code that adds the polling island to the freelist) */ - gpr_mu_lock(&pi->mu); - polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); - gpr_mu_unlock(&pi->mu); - } + polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); + + if (initial_fd != NULL) { + polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); + } + + if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue), + err_desc) && + *error == GRPC_ERROR_NONE) { + polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true, + error); + GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL); + pi->workqueue->wakeup_read_fd->polling_island = pi; + PI_ADD_REF(pi, "fd"); } +done: + if (*error != GRPC_ERROR_NONE) { + if (pi->workqueue != NULL) { + GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); + } + polling_island_delete(exec_ctx, pi); + pi = NULL; + } return pi; } -static void polling_island_delete(polling_island *pi) { +static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { GPR_ASSERT(pi->fd_cnt == 0); - gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); - - close(pi->epoll_fd); - pi->epoll_fd = -1; - - gpr_mu_lock(&g_pi_freelist_mu); - pi->next_free = g_pi_freelist; - g_pi_freelist = pi; - gpr_mu_unlock(&g_pi_freelist_mu); + if (pi->epoll_fd >= 0) { + close(pi->epoll_fd); + } + gpr_mu_destroy(&pi->mu); + gpr_free(pi->fds); + gpr_free(pi); } /* Attempts to gets the last polling island in the linked list (liked by the @@ -704,9 +714,6 @@ static polling_island *polling_island_merge(polling_island *p, static grpc_error *polling_island_global_init() { grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_init(&g_pi_freelist_mu); - g_pi_freelist = NULL; - error = grpc_wakeup_fd_init(&polling_island_wakeup_fd); if (error == GRPC_ERROR_NONE) { error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd); @@ -716,18 +723,6 @@ static grpc_error *polling_island_global_init() { } static void polling_island_global_shutdown() { - polling_island *next; - gpr_mu_lock(&g_pi_freelist_mu); - gpr_mu_unlock(&g_pi_freelist_mu); - while (g_pi_freelist != NULL) { - next = g_pi_freelist->next_free; - gpr_mu_destroy(&g_pi_freelist->mu); - gpr_free(g_pi_freelist->fds); - gpr_free(g_pi_freelist); - g_pi_freelist = next; - } - gpr_mu_destroy(&g_pi_freelist_mu); - grpc_wakeup_fd_destroy(&polling_island_wakeup_fd); } @@ -845,7 +840,6 @@ static grpc_fd *fd_create(int fd, const char *name) { if (new_fd == NULL) { new_fd = gpr_malloc(sizeof(grpc_fd)); gpr_mu_init(&new_fd->mu); - gpr_mu_init(&new_fd->pi_mu); } /* Note: It is not really needed to get the new_fd->mu lock here. If this is a @@ -896,6 +890,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, const char *reason) { bool is_fd_closed = false; grpc_error *error = GRPC_ERROR_NONE; + polling_island *unref_pi = NULL; gpr_mu_lock(&fd->mu); fd->on_done_closure = on_done; @@ -923,21 +918,26 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - Unlock the latest polling island - Set fd->polling_island to NULL (but remove the ref on the polling island before doing this.) */ - gpr_mu_lock(&fd->pi_mu); if (fd->polling_island != NULL) { polling_island *pi_latest = polling_island_lock(fd->polling_island); polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error); gpr_mu_unlock(&pi_latest->mu); - PI_UNREF(fd->polling_island, "fd_orphan"); + unref_pi = fd->polling_island; fd->polling_island = NULL; } - gpr_mu_unlock(&fd->pi_mu); grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL); gpr_mu_unlock(&fd->mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ + if (unref_pi != NULL) { + /* Unref stale polling island here, outside the fd lock above. + The polling island owns a workqueue which owns an fd, and unreffing + inside the lock can cause an eventual lock loop that makes TSAN very + unhappy. */ + PI_UNREF(exec_ctx, unref_pi, "fd_orphan"); + } GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); } @@ -1037,6 +1037,17 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_mu_unlock(&fd->mu); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { + gpr_mu_lock(&fd->mu); + grpc_workqueue *workqueue = NULL; + if (fd->polling_island != NULL) { + workqueue = + GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue"); + } + gpr_mu_unlock(&fd->mu); + return workqueue; +} + /******************************************************************************* * Pollset Definitions */ @@ -1227,9 +1238,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_unlock(&fd->mu); } -static void pollset_release_polling_island(grpc_pollset *ps, char *reason) { +static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, + grpc_pollset *ps, char *reason) { if (ps->polling_island != NULL) { - PI_UNREF(ps->polling_island, reason); + PI_UNREF(exec_ctx, ps->polling_island, reason); } ps->polling_island = NULL; } @@ -1242,7 +1254,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, pollset->finish_shutdown_called = true; /* Release the ref and set pollset->polling_island to NULL */ - pollset_release_polling_island(pollset, "ps_shutdown"); + pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown"); grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } @@ -1281,7 +1293,7 @@ static void pollset_reset(grpc_pollset *pollset) { pollset->finish_shutdown_called = false; pollset->kicked_without_pollers = false; pollset->shutdown_done = NULL; - pollset_release_polling_island(pollset, "ps_reset"); + GPR_ASSERT(pollset->polling_island == NULL); } #define GRPC_EPOLL_MAX_EVENTS 1000 @@ -1309,7 +1321,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, this function (i.e pollset_work_and_unlock()) is called */ if (pollset->polling_island == NULL) { - pollset->polling_island = polling_island_create(NULL, error); + pollset->polling_island = polling_island_create(exec_ctx, NULL, error); if (pollset->polling_island == NULL) { GPR_TIMER_END("pollset_work_and_unlock", 0); return; /* Fatal error. We cannot continue */ @@ -1329,7 +1341,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the polling island to be deleted */ PI_ADD_REF(pi, "ps"); - PI_UNREF(pollset->polling_island, "ps"); + PI_UNREF(exec_ctx, pollset->polling_island, "ps"); pollset->polling_island = pi; } @@ -1400,7 +1412,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, that we got before releasing the polling island lock). This is because pollset->polling_island pointer might get udpated in other parts of the code when there is an island merge while we are doing epoll_wait() above */ - PI_UNREF(pi, "ps_work"); + PI_UNREF(exec_ctx, pi, "ps_work"); GPR_TIMER_END("pollset_work_and_unlock", 0); } @@ -1517,10 +1529,11 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&pollset->mu); - gpr_mu_lock(&fd->pi_mu); + gpr_mu_lock(&fd->mu); polling_island *pi_new = NULL; +retry: /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and * equal, do nothing. * 2) If fd->polling_island and pollset->polling_island are both NULL, create @@ -1535,15 +1548,44 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * polling_island fields in both fd and pollset to point to the merged * polling island. */ + + if (fd->orphaned) { + gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&pollset->mu); + /* early out */ + return; + } + if (fd->polling_island == pollset->polling_island) { pi_new = fd->polling_island; if (pi_new == NULL) { - pi_new = polling_island_create(fd, &error); - - GRPC_POLLING_TRACE( - "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, " - "pollset: %p)", - (void *)pi_new, fd->fd, (void *)pollset); + /* Unlock before creating a new polling island: the polling island will + create a workqueue which creates a file descriptor, and holding an fd + lock here can eventually cause a loop to appear to TSAN (making it + unhappy). We don't think it's a real loop (there's an epoch point where + that loop possibility disappears), but the advantages of keeping TSAN + happy outweigh any performance advantage we might have by keeping the + lock held. */ + gpr_mu_unlock(&fd->mu); + pi_new = polling_island_create(exec_ctx, fd, &error); + gpr_mu_lock(&fd->mu); + /* Need to reverify any assumptions made between the initial lock and + getting to this branch: if they've changed, we need to throw away our + work and figure things out again. */ + if (fd->polling_island != NULL) { + GRPC_POLLING_TRACE( + "pollset_add_fd: Raced creating new polling island. pi_new: %p " + "(fd: %d, pollset: %p)", + (void *)pi_new, fd->fd, (void *)pollset); + PI_ADD_REF(pi_new, "dance_of_destruction"); + PI_UNREF(exec_ctx, pi_new, "dance_of_destruction"); + goto retry; + } else { + GRPC_POLLING_TRACE( + "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, " + "pollset: %p)", + (void *)pi_new, fd->fd, (void *)pollset); + } } } else if (fd->polling_island == NULL) { pi_new = polling_island_lock(pollset->polling_island); @@ -1579,7 +1621,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (fd->polling_island != pi_new) { PI_ADD_REF(pi_new, "fd"); if (fd->polling_island != NULL) { - PI_UNREF(fd->polling_island, "fd"); + PI_UNREF(exec_ctx, fd->polling_island, "fd"); } fd->polling_island = pi_new; } @@ -1587,13 +1629,15 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->polling_island != pi_new) { PI_ADD_REF(pi_new, "ps"); if (pollset->polling_island != NULL) { - PI_UNREF(pollset->polling_island, "ps"); + PI_UNREF(exec_ctx, pollset->polling_island, "ps"); } pollset->polling_island = pi_new; } - gpr_mu_unlock(&fd->pi_mu); + gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&pollset->mu); + + GRPC_LOG_IF_ERROR("pollset_add_fd", error); } /******************************************************************************* @@ -1744,9 +1788,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, void *grpc_fd_get_polling_island(grpc_fd *fd) { polling_island *pi; - gpr_mu_lock(&fd->pi_mu); + gpr_mu_lock(&fd->mu); pi = fd->polling_island; - gpr_mu_unlock(&fd->pi_mu); + gpr_mu_unlock(&fd->mu); return pi; } @@ -1794,6 +1838,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 9e306af5fa..c2107e5e39 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -725,6 +725,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } + /******************************************************************************* * pollset_posix.c */ @@ -2006,6 +2008,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 45c0a5e954..4b593f4b2c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -617,6 +617,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } + /******************************************************************************* * pollset_posix.c */ @@ -1234,6 +1236,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index a3c1e9db9a..6536672685 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -148,6 +148,10 @@ grpc_fd *grpc_fd_create(int fd, const char *name) { return g_event_engine->fd_create(fd, name); } +grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) { + return g_event_engine->fd_get_workqueue(fd); +} + int grpc_fd_wrapped_fd(grpc_fd *fd) { return g_event_engine->fd_wrapped_fd(fd); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 579c84ef70..c2aa1756ea 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -56,6 +56,7 @@ typedef struct grpc_event_engine_vtable { void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); bool (*fd_is_shutdown)(grpc_fd *fd); + grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd); grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, grpc_fd *fd); @@ -107,6 +108,9 @@ const char *grpc_get_poll_strategy_name(); This takes ownership of closing fd. */ grpc_fd *grpc_fd_create(int fd, const char *name); +/* Get a workqueue that's associated with this fd */ +grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd); + /* Return the wrapped fd, or -1 if it has been released or closed. */ int grpc_fd_wrapped_fd(grpc_fd *fd); diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index c44aafcddf..ac7785ec13 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -37,6 +37,7 @@ #include #include +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) { @@ -85,14 +86,17 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null) { - GPR_ASSERT(offload_target_or_null == NULL); - grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + if (offload_target_or_null == NULL) { + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + } else { + grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error); + GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched"); + } } void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_closure_list *list, grpc_workqueue *offload_target_or_null) { - GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_move(list, &exec_ctx->closure_list); } diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 38f27d9b13..917f332f03 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -93,7 +93,11 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); /** Finish any pending work for a grpc_exec_ctx. Must be called before * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); -/** Add a closure to be executed at the next flush/finish point */ +/** Add a closure to be executed in the future. + If \a offload_target_or_null is NULL, the closure will be executed at the + next exec_ctx.{finish,flush} point. + If \a offload_target_or_null is non-NULL, the closure will be scheduled + against the workqueue, and a reference to the workqueue will be consumed. */ void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null); diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 89292a153e..d67d388b8c 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" @@ -62,6 +63,7 @@ void grpc_iomgr_init(void) { grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; + grpc_network_status_init(); grpc_iomgr_platform_init(); } @@ -140,6 +142,7 @@ void grpc_iomgr_shutdown(void) { grpc_iomgr_platform_shutdown(); grpc_exec_ctx_global_shutdown(); + grpc_network_status_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); } diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c index 38a1c9b7d4..b4bb7e3cf7 100644 --- a/src/core/lib/iomgr/network_status_tracker.c +++ b/src/core/lib/iomgr/network_status_tracker.c @@ -42,10 +42,16 @@ typedef struct endpoint_ll_node { static endpoint_ll_node *head = NULL; static gpr_mu g_endpoint_mutex; -static bool g_init_done = false; -void grpc_initialize_network_status_monitor() { - g_init_done = true; +void grpc_network_status_shutdown(void) { + if (head != NULL) { + gpr_log(GPR_ERROR, + "Memory leaked as all network endpoints were not shut down"); + } + gpr_mu_destroy(&g_endpoint_mutex); +} + +void grpc_network_status_init(void) { gpr_mu_init(&g_endpoint_mutex); // TODO(makarandd): Install callback with OS to monitor network status. } @@ -60,9 +66,6 @@ void grpc_destroy_network_status_monitor() { } void grpc_network_status_register_endpoint(grpc_endpoint *ep) { - if (!g_init_done) { - grpc_initialize_network_status_monitor(); - } gpr_mu_lock(&g_endpoint_mutex); if (head == NULL) { head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node)); diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h index 74a1aa8135..67cb645f44 100644 --- a/src/core/lib/iomgr/network_status_tracker.h +++ b/src/core/lib/iomgr/network_status_tracker.h @@ -35,7 +35,11 @@ #define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H #include "src/core/lib/iomgr/endpoint.h" +void grpc_network_status_init(void); +void grpc_network_status_shutdown(void); + void grpc_network_status_register_endpoint(grpc_endpoint *ep); void grpc_network_status_unregister_endpoint(grpc_endpoint *ep); void grpc_network_status_shutdown_all_endpoints(); + #endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 2ab45e33ce..ec21e03944 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } /* returns true if done, false if pending; if returning true, *error is set */ -#define MAX_WRITE_IOVEC 16 +#define MAX_WRITE_IOVEC 1024 static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; @@ -450,9 +450,19 @@ static char *tcp_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, - tcp_shutdown, tcp_destroy, tcp_get_peer}; +static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return grpc_fd_get_workqueue(tcp->em_fd); +} + +static const grpc_endpoint_vtable vtable = {tcp_read, + tcp_write, + tcp_get_workqueue, + tcp_add_to_pollset, + tcp_add_to_pollset_set, + tcp_shutdown, + tcp_destroy, + tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, const char *peer_string) { diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index d3803c3bd0..cb2ff782d6 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -491,7 +491,8 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { } for (unsigned i = 0; i < count; i++) { - int fd, port; + int fd = -1; + int port = -1; grpc_dualstack_mode dsmode; err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0, &dsmode, &fd); diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 37ab59021e..35054c42b5 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -389,9 +389,16 @@ static char *win_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static grpc_endpoint_vtable vtable = { - win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, win_get_peer}; +static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; } + +static grpc_endpoint_vtable vtable = {win_read, + win_write, + win_get_workqueue, + win_add_to_pollset, + win_add_to_pollset_set, + win_shutdown, + win_destroy, + win_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 5cc40eea50..7156e490d7 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -38,6 +38,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_set.h" #ifdef GPR_POSIX_SOCKET #include "src/core/lib/iomgr/workqueue_posix.h" @@ -49,35 +50,45 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ -/** Create a work queue */ -grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, - grpc_workqueue **workqueue); - +/* Deprecated: do not use. + This has *already* been removed in a future commit. */ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); -#define GRPC_WORKQUEUE_REFCOUNT_DEBUG +/* Reference counting functions. Use the macro's always + (GRPC_WORKQUEUE_{REF,UNREF}). + + Pass in a descriptive reason string for reffing/unreffing as the last + argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that + string will be printed alongside the refcount. When it is not defined, the + string will be discarded at compilation time. */ + +//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #define GRPC_WORKQUEUE_REF(p, r) \ - grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_WORKQUEUE_UNREF(cl, p, r) \ - grpc_workqueue_unref((cl), (p), __FILE__, __LINE__, (r)) + (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p)) +#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \ + grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, const char *file, int line, const char *reason); #else -#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) +#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p)) #define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p)) void grpc_workqueue_ref(grpc_workqueue *workqueue); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #endif -/** Bind this workqueue to a pollset */ -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset); +/** Add a work item to a workqueue. Items added to a work queue will be started + in approximately the order they were enqueued, on some thread that may or + may not be the current thread. Successive closures enqueued onto a workqueue + MAY be executed concurrently. + + It is generally more expensive to add a closure to a workqueue than to the + execution context, both in terms of CPU work and in execution latency. -/** Add a work item to a workqueue */ + Use work queues when it's important that other threads be given a chance to + tackle some workload. */ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error); diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 45e0f6063b..e0d6dac230 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -70,7 +70,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, static void workqueue_destroy(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - GPR_ASSERT(grpc_closure_list_empty(workqueue->closure_list)); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd); } @@ -100,12 +100,6 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { } } -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset) { - grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd); -} - void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_lock(&workqueue->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h index dcb47e7b59..0f26ba58e2 100644 --- a/src/core/lib/iomgr/workqueue_posix.h +++ b/src/core/lib/iomgr/workqueue_posix.h @@ -50,4 +50,9 @@ struct grpc_workqueue { grpc_closure read_closure; }; +/** Create a work queue. Returns an error if creation fails. If creation + succeeds, sets *workqueue to point to it. */ +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue); + #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */ diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c index 275f040b1c..23e2dea185 100644 --- a/src/core/lib/iomgr/workqueue_windows.c +++ b/src/core/lib/iomgr/workqueue_windows.c @@ -37,4 +37,26 @@ #include "src/core/lib/iomgr/workqueue.h" +// Minimal implementation of grpc_workqueue for Windows +// Works by directly enqueuing workqueue items onto the current execution +// context, which is at least correct, if not performant or in the spirit of +// workqueues. + +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason) {} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason) {} +#else +void grpc_workqueue_ref(grpc_workqueue *workqueue) {} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} +#endif + +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); +} + #endif /* GPR_WINDOWS */ diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 7650d68e89..bc50f9d1b0 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -360,11 +360,19 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) { return grpc_endpoint_get_peer(ep->wrapped_ep); } -static const grpc_endpoint_vtable vtable = { - endpoint_read, endpoint_write, - endpoint_add_to_pollset, endpoint_add_to_pollset_set, - endpoint_shutdown, endpoint_destroy, - endpoint_get_peer}; +static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + return grpc_endpoint_get_workqueue(ep->wrapped_ep); +} + +static const grpc_endpoint_vtable vtable = {endpoint_read, + endpoint_write, + endpoint_get_workqueue, + endpoint_add_to_pollset, + endpoint_add_to_pollset_set, + endpoint_shutdown, + endpoint_destroy, + endpoint_get_peer}; grpc_endpoint *grpc_secure_endpoint_create( struct tsi_frame_protector *protector, grpc_endpoint *transport, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index def6e5068b..2f108af48a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -73,6 +73,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct requested_call { requested_call_type type; + size_t cq_idx; void *tag; grpc_server *server; grpc_completion_queue *cq_bound_to_call; @@ -206,11 +207,11 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; - /** free list of available requested_calls indices */ - gpr_stack_lockfree *request_freelist; + /** free list of available requested_calls_per_cq indices */ + gpr_stack_lockfree **request_freelist_per_cq; /** requested call backing data */ - requested_call *requested_calls; - size_t max_requested_calls; + requested_call **requested_calls_per_cq; + int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -357,7 +358,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, for (size_t i = 0; i < server->cq_count; i++) { while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != -1) { - fail_call(exec_ctx, server, i, &server->requested_calls[request_id], + fail_call(exec_ctx, server, i, + &server->requested_calls_per_cq[i][request_id], GRPC_ERROR_REF(error)); } } @@ -392,12 +394,16 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); + if (server->started) { + gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); + gpr_free(server->requested_calls_per_cq[i]); + } } - gpr_stack_lockfree_destroy(server->request_freelist); + gpr_free(server->request_freelist_per_cq); + gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); - gpr_free(server->requested_calls); gpr_free(server); } @@ -460,11 +466,13 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, requested_call *rc = req; grpc_server *server = rc->server; - if (rc >= server->requested_calls && - rc < server->requested_calls + server->max_requested_calls) { - GPR_ASSERT(rc - server->requested_calls <= INT_MAX); - gpr_stack_lockfree_push(server->request_freelist, - (int)(rc - server->requested_calls)); + if (rc >= server->requested_calls_per_cq[rc->cq_idx] && + rc < server->requested_calls_per_cq[rc->cq_idx] + + server->max_requested_calls_per_cq) { + GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); + gpr_stack_lockfree_push( + server->request_freelist_per_cq[rc->cq_idx], + (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); } else { gpr_free(req); } @@ -540,7 +548,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls[request_id]); + &server->requested_calls_per_cq[cq_idx][request_id]); return; /* early out */ } } @@ -979,8 +987,6 @@ void grpc_server_register_non_listening_completion_queue( } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { - size_t i; - GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); grpc_server *server = gpr_malloc(sizeof(grpc_server)); @@ -998,15 +1004,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { &server->root_channel_data; /* TODO(ctiller): expose a channel_arg for this */ - server->max_requested_calls = 32768; - server->request_freelist = - gpr_stack_lockfree_create(server->max_requested_calls); - for (i = 0; i < (size_t)server->max_requested_calls; i++) { - gpr_stack_lockfree_push(server->request_freelist, (int)i); - } - server->requested_calls = gpr_malloc(server->max_requested_calls * - sizeof(*server->requested_calls)); - + server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1066,16 +1064,28 @@ void grpc_server_start(grpc_server *server) { server->started = true; size_t pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->request_freelist_per_cq = + gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); + server->requested_calls_per_cq = + gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]); } + server->request_freelist_per_cq[i] = + gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); + for (int j = 0; j < server->max_requested_calls_per_cq; j++) { + gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); + } + server->requested_calls_per_cq[i] = + gpr_malloc((size_t)server->max_requested_calls_per_cq * + sizeof(*server->requested_calls_per_cq[i])); } request_matcher_init(&server->unregistered_request_matcher, - server->max_requested_calls, server); + (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, server->max_requested_calls, - server); + request_matcher_init(&rm->request_matcher, + (size_t)server->max_requested_calls_per_cq, server); } for (l = server->listeners; l; l = l->next) { @@ -1307,11 +1317,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GRPC_ERROR_CREATE("Server Shutdown")); return GRPC_CALL_OK; } - request_id = gpr_stack_lockfree_pop(server->request_freelist); + request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); if (request_id == -1) { /* out of request ids: just fail this one */ fail_call(exec_ctx, server, cq_idx, rc, - GRPC_ERROR_CREATE("Server Shutdown")); + grpc_error_set_int(GRPC_ERROR_CREATE("Out of request ids"), + GRPC_ERROR_INT_LIMIT, + server->max_requested_calls_per_cq)); return GRPC_CALL_OK; } switch (rc->type) { @@ -1322,7 +1334,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - server->requested_calls[request_id] = *rc; + server->requested_calls_per_cq[cq_idx][request_id] = *rc; gpr_free(rc); if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start @@ -1346,7 +1358,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls[request_id]); + &server->requested_calls_per_cq[cq_idx][request_id]); } gpr_mu_lock(&server->mu_call); } @@ -1382,6 +1394,7 @@ grpc_call_error grpc_server_request_call( } grpc_cq_begin_op(cq_for_notification, tag); details->reserved = NULL; + rc->cq_idx = cq_idx; rc->type = BATCH_CALL; rc->server = server; rc->tag = tag; @@ -1430,6 +1443,7 @@ grpc_call_error grpc_server_request_registered_call( goto done; } grpc_cq_begin_op(cq_for_notification, tag); + rc->cq_idx = cq_idx; rc->type = REGISTERED_CALL; rc->server = server; rc->tag = tag; diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 054f112127..68d05e3a85 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -179,6 +179,9 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; + if (grpc_connectivity_state_trace) { + gpr_log(GPR_DEBUG, "NOTIFY: %p", w->notify); + } grpc_exec_ctx_sched(exec_ctx, w->notify, GRPC_ERROR_REF(tracker->current_error), NULL); gpr_free(w); diff --git a/test/core/end2end/tests/high_initial_seqno.c b/test/core/end2end/tests/high_initial_seqno.c index 50e3c9cb89..db45f5eb5a 100644 --- a/test/core/end2end/tests/high_initial_seqno.c +++ b/test/core/end2end/tests/high_initial_seqno.c @@ -203,6 +203,12 @@ static void simple_request_body(grpc_end2end_test_fixture f) { grpc_call_destroy(c); grpc_call_destroy(s); + /* TODO(ctiller): this rate limits the test, and it should be removed when + retry has been implemented; until then cross-thread chatter + may result in some requests needing to be cancelled due to + seqno exhaustion. */ + cq_verify_empty(cqv); + cq_verifier_destroy(cqv); } diff --git a/test/core/end2end/tests/network_status_change.c b/test/core/end2end/tests/network_status_change.c index 10207844ab..39ddc13754 100644 --- a/test/core/end2end/tests/network_status_change.c +++ b/test/core/end2end/tests/network_status_change.c @@ -186,9 +186,10 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) { GPR_ASSERT(GRPC_CALL_OK == error); cq_expect_completion(cqv, tag(102), 1); + cq_verify(cqv); + // Simulate the network loss event grpc_network_status_shutdown_all_endpoints(); - cq_verify(cqv); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; @@ -205,7 +206,7 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) { op++; error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); GPR_ASSERT(GRPC_CALL_OK == error); - void shutdown_all_endpoints(); + cq_expect_completion(cqv, tag(103), 1); cq_expect_completion(cqv, tag(1), 1); cq_verify(cqv); diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c index 5e86c42309..27d630623e 100644 --- a/test/core/internal_api_canaries/iomgr.c +++ b/test/core/internal_api_canaries/iomgr.c @@ -77,11 +77,14 @@ static void test_code(void) { /* endpoint.h */ grpc_endpoint endpoint; - grpc_endpoint_vtable vtable = { - grpc_endpoint_read, grpc_endpoint_write, - grpc_endpoint_add_to_pollset, grpc_endpoint_add_to_pollset_set, - grpc_endpoint_shutdown, grpc_endpoint_destroy, - grpc_endpoint_get_peer}; + grpc_endpoint_vtable vtable = {grpc_endpoint_read, + grpc_endpoint_write, + grpc_endpoint_get_workqueue, + grpc_endpoint_add_to_pollset, + grpc_endpoint_add_to_pollset_set, + grpc_endpoint_shutdown, + grpc_endpoint_destroy, + grpc_endpoint_get_peer}; endpoint.vtable = &vtable; grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL); diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c deleted file mode 100644 index 76ecfae74b..0000000000 --- a/test/core/iomgr/workqueue_test.c +++ /dev/null @@ -1,150 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/iomgr/workqueue.h" - -#include -#include -#include - -#include "test/core/util/test_config.h" - -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; - -static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); - gpr_mu_lock(g_mu); - *(int *)p = 1; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); - gpr_mu_unlock(g_mu); -} - -static void test_ref_unref(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq; - GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", - grpc_workqueue_create(&exec_ctx, &wq))); - GRPC_WORKQUEUE_REF(wq, "test"); - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "test"); - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); -} - -static void test_add_closure(void) { - grpc_closure c; - int done = 0; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq; - GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", - grpc_workqueue_create(&exec_ctx, &wq))); - gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5); - grpc_pollset_worker *worker = NULL; - grpc_closure_init(&c, must_succeed, &done); - - grpc_workqueue_enqueue(&exec_ctx, wq, &c, GRPC_ERROR_NONE); - grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); - - gpr_mu_lock(g_mu); - GPR_ASSERT(!done); - while (!done) { - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(deadline.clock_type), deadline))); - } - gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); - GPR_ASSERT(done); - - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); -} - -static void test_flush(void) { - grpc_closure c; - int done = 0; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq; - GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", - grpc_workqueue_create(&exec_ctx, &wq))); - gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5); - grpc_pollset_worker *worker = NULL; - grpc_closure_init(&c, must_succeed, &done); - - grpc_exec_ctx_sched(&exec_ctx, &c, GRPC_ERROR_NONE, NULL); - grpc_workqueue_flush(&exec_ctx, wq); - grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); - - gpr_mu_lock(g_mu); - GPR_ASSERT(!done); - while (!done) { - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(deadline.clock_type), deadline))); - } - gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); - GPR_ASSERT(done); - - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); -} - -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, - grpc_error *error) { - grpc_pollset_destroy(p); -} - -int main(int argc, char **argv) { - grpc_closure destroyed; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_test_init(argc, argv); - grpc_init(); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); - - test_ref_unref(); - test_add_closure(); - test_flush(); - - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); - grpc_exec_ctx_finish(&exec_ctx); - grpc_shutdown(); - - gpr_free(g_pollset); - return 0; -} diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index ed9545e9df..13e0e918fb 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -95,9 +95,17 @@ static char *me_get_peer(grpc_endpoint *ep) { return gpr_strdup("fake:mock_endpoint"); } +static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; } + static const grpc_endpoint_vtable vtable = { - me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, - me_shutdown, me_destroy, me_get_peer, + me_read, + me_write, + me_get_workqueue, + me_add_to_pollset, + me_add_to_pollset_set, + me_shutdown, + me_destroy, + me_get_peer, }; grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice)) { diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index a39f3dd66e..7ed9e97bd6 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -140,9 +140,17 @@ static char *me_get_peer(grpc_endpoint *ep) { return gpr_strdup("fake:mock_endpoint"); } +static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; } + static const grpc_endpoint_vtable vtable = { - me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, - me_shutdown, me_destroy, me_get_peer, + me_read, + me_write, + me_get_workqueue, + me_add_to_pollset, + me_add_to_pollset_set, + me_shutdown, + me_destroy, + me_get_peer, }; static void half_init(half *m, passthru_endpoint *parent) { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 354a59cedd..0f87ae3e44 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -1166,6 +1166,9 @@ TEST_P(ProxyEnd2endTest, HugeResponse) { request.mutable_param()->set_response_message_length(kResponseSize); ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(20); + context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(kResponseSize, response.message().size()); EXPECT_TRUE(s.ok()); diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 1507d1e3d6..5dd0bd8533 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -206,21 +206,30 @@ class AsyncClient : public ClientImpl { void* got_tag; bool ok; - if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { - // Got a regular event, so process it - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (!ctx->RunNextState(ok, histogram)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - auto clone = ctx->StartNewClone(); - clone->Start(cli_cqs_[thread_idx].get()); - // delete the old version - delete ctx; + switch (cli_cqs_[thread_idx]->AsyncNext( + &got_tag, &ok, + std::chrono::system_clock::now() + std::chrono::milliseconds(10))) { + case CompletionQueue::SHUTDOWN: + return false; + case CompletionQueue::GOT_EVENT: { + // Got a regular event, so process it + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + if (!ctx->RunNextState(ok, histogram)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + auto clone = ctx->StartNewClone(); + clone->Start(cli_cqs_[thread_idx].get()); + // delete the old version + delete ctx; + } + return true; } - return true; - } else { // queue is shutting down - return false; + case CompletionQueue::TIMEOUT: + // TODO(ctiller): do something here to track how frequently we pass + // through this codepath. + return true; } + GPR_UNREACHABLE_CODE(return false); } protected: diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py index 34b8151441..4ff4e44b8b 100755 --- a/test/cpp/qps/gen_build_yaml.py +++ b/test/cpp/qps/gen_build_yaml.py @@ -45,9 +45,10 @@ import performance.scenario_config as scenario_config def _scenario_json_string(scenario_json): # tweak parameters to get fast test times - scenario_json['warmup_seconds'] = 1 + scenario_json['warmup_seconds'] = 0 scenario_json['benchmark_seconds'] = 1 - return json.dumps(scenario_config.remove_nonproto_fields(scenario_json)) + scenarios_json = {'scenarios': [scenario_config.remove_nonproto_fields(scenario_json)]} + return json.dumps(scenarios_json) def threads_of_type(scenario_json, path): d = scenario_json @@ -72,8 +73,7 @@ print yaml.dump({ { 'name': 'json_run_localhost', 'shortname': 'json_run_localhost:%s' % scenario_json['name'], - 'args': ['--scenario_json', - pipes.quote(_scenario_json_string(scenario_json))], + 'args': ['--scenarios_json', _scenario_json_string(scenario_json)], 'ci_platforms': ['linux', 'mac', 'posix', 'windows'], 'platforms': ['linux', 'mac', 'posix', 'windows'], 'flaky': False, @@ -81,7 +81,8 @@ print yaml.dump({ 'boringssl': True, 'defaults': 'boringssl', 'cpu_cost': guess_cpu(scenario_json), - 'exclude_configs': [] + 'exclude_configs': [], + 'timeout_seconds': 3*60 } for scenario_json in scenario_config.CXXLanguage().scenarios() ] diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc index 6545dc2917..74e40fbf1a 100644 --- a/test/cpp/qps/json_run_localhost.cc +++ b/test/cpp/qps/json_run_localhost.cc @@ -75,7 +75,7 @@ int main(int argc, char** argv) { for (int i = 1; i < argc; i++) { args.push_back(argv[i]); } - SubProcess(args).Join(); + GPR_ASSERT(SubProcess(args).Join() == 0); for (auto it = jobs.begin(); it != jobs.end(); ++it) { (*it)->Interrupt(); diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c9954d0d02..73ca19148b 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -102,7 +102,7 @@ class AsyncQpsServerTest : public Server { auto process_rpc_bound = std::bind(process_rpc, config.payload_config(), _1, _2); - for (int i = 0; i < 10000 / num_threads; i++) { + for (int i = 0; i < 15000; i++) { for (int j = 0; j < num_threads; j++) { if (request_unary_function) { auto request_unary = @@ -132,7 +132,8 @@ class AsyncQpsServerTest : public Server { for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { (*ss)->set_shutdown(); } - server_->Shutdown(); + server_->Shutdown(std::chrono::system_clock::now() + + std::chrono::seconds(3)); for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); } diff --git a/tools/dockerfile/test/python_pyenv_x64/Dockerfile b/tools/dockerfile/test/python_pyenv_x64/Dockerfile new file mode 100644 index 0000000000..abb5f3c89b --- /dev/null +++ b/tools/dockerfile/test/python_pyenv_x64/Dockerfile @@ -0,0 +1,112 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +FROM debian:jessie + +# Install Git and basic packages. +RUN apt-get update && apt-get install -y \ + autoconf \ + autotools-dev \ + build-essential \ + bzip2 \ + ccache \ + curl \ + gcc \ + gcc-multilib \ + git \ + golang \ + gyp \ + lcov \ + libc6 \ + libc6-dbg \ + libc6-dev \ + libgtest-dev \ + libtool \ + make \ + perl \ + strace \ + python-dev \ + python-setuptools \ + python-yaml \ + telnet \ + unzip \ + wget \ + zip && apt-get clean + +#================ +# Build profiling +RUN apt-get update && apt-get install -y time && apt-get clean + +#==================== +# Python dependencies + +# Install dependencies + +RUN apt-get update && apt-get install -y \ + python-all-dev \ + python3-all-dev \ + python-pip + +# Install Python packages from PyPI +RUN pip install pip --upgrade +RUN pip install virtualenv +RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.0.0a2 six==1.10.0 + +# Install dependencies for pyenv +RUN apt-get update && apt-get install -y \ + libbz2-dev \ + libncurses5-dev \ + libncursesw5-dev \ + libreadline-dev \ + libsqlite3-dev \ + libssl-dev \ + llvm \ + mercurial \ + zlib1g-dev && apt-get clean + +# Install Pyenv and dev Python versions 3.5 and 3.6 +RUN curl -L https://raw.githubusercontent.com/yyuu/pyenv-installer/master/bin/pyenv-installer | bash +RUN pyenv update +RUN pyenv install 3.5-dev +RUN pyenv install 3.6-dev +RUN pyenv local 3.5-dev 3.6-dev + +# Prepare ccache +RUN ln -s /usr/bin/ccache /usr/local/bin/gcc +RUN ln -s /usr/bin/ccache /usr/local/bin/g++ +RUN ln -s /usr/bin/ccache /usr/local/bin/cc +RUN ln -s /usr/bin/ccache /usr/local/bin/c++ +RUN ln -s /usr/bin/ccache /usr/local/bin/clang +RUN ln -s /usr/bin/ccache /usr/local/bin/clang++ + + +RUN mkdir /var/local/jenkins + +# Define the default command. +CMD ["bash"] diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 95d53e5f9e..7dd9e12614 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -39,6 +39,7 @@ import json import multiprocessing import os import os.path +import pipes import platform import random import re @@ -72,6 +73,9 @@ def platform_string(): return jobset.platform_string() +_DEFAULT_TIMEOUT_SECONDS = 5 * 60 + + # SimpleConfig: just compile with CONFIG=config, and run the binary to test class Config(object): @@ -84,7 +88,7 @@ class Config(object): self.tool_prefix = tool_prefix self.timeout_multiplier = timeout_multiplier - def job_spec(self, cmdline, timeout_seconds=5*60, + def job_spec(self, cmdline, timeout_seconds=_DEFAULT_TIMEOUT_SECONDS, shortname=None, environ={}, cpu_cost=1.0, flaky=False): """Construct a jobset.JobSpec for a test under this config @@ -159,7 +163,7 @@ class CLanguage(object): env={'GRPC_DEFAULT_SSL_ROOTS_FILE_PATH': _ROOT + '/src/core/lib/tsi/test_creds/ca.pem', 'GRPC_POLL_STRATEGY': polling_strategy} - shortname_ext = '' if polling_strategy=='all' else ' polling=%s' % polling_strategy + shortname_ext = '' if polling_strategy=='all' else ' GRPC_POLL_STRATEGY=%s' % polling_strategy if self.config.build_config in target['exclude_configs']: continue if self.platform == 'windows': @@ -190,28 +194,26 @@ class CLanguage(object): assert line[1] == ' ' test = base + line.strip() cmdline = [binary] + ['--gtest_filter=%s' % test] - out.append(self.config.job_spec(cmdline, [binary], - shortname='%s:%s %s' % (binary, test, shortname_ext), + out.append(self.config.job_spec(cmdline, + shortname='%s --gtest_filter=%s %s' % (binary, test, shortname_ext), cpu_cost=target['cpu_cost'], environ=env)) else: cmdline = [binary] + target['args'] - out.append(self.config.job_spec(cmdline, [binary], - shortname=' '.join(cmdline) + shortname_ext, + out.append(self.config.job_spec(cmdline, + shortname=' '.join( + pipes.quote(arg) + for arg in cmdline) + + shortname_ext, cpu_cost=target['cpu_cost'], flaky=target.get('flaky', False), + timeout_seconds=target.get('timeout_seconds', _DEFAULT_TIMEOUT_SECONDS), environ=env)) elif self.args.regex == '.*' or self.platform == 'windows': print '\nWARNING: binary not found, skipping', binary return sorted(out) def make_targets(self): - test_regex = self.args.regex - if self.platform != 'windows' and self.args.regex != '.*': - # use the regex to minimize the number of things to build - return [os.path.basename(target['name']) - for target in get_c_tests(False, self.test_lang) - if re.search(test_regex, '/' + target['name'])] if self.platform == 'windows': # don't build tools on windows just yet return ['buildtests_%s' % self.make_target] @@ -1283,8 +1285,6 @@ def _build_and_run( jobset.message( 'FLAKE', '%s [%d/%d runs flaked]' % (k, num_failures, num_runs), do_newline=True) - else: - jobset.message('PASSED', k, do_newline=True) finally: for antagonist in antagonists: antagonist.kill() diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index cdbc254f43..e3cfd55cd6 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -1818,22 +1818,6 @@ "third_party": false, "type": "target" }, - { - "deps": [ - "gpr", - "gpr_test_util", - "grpc", - "grpc_test_util" - ], - "headers": [], - "language": "c", - "name": "workqueue_test", - "src": [ - "test/core/iomgr/workqueue_test.c" - ], - "third_party": false, - "type": "target" - }, { "deps": [ "gpr", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 93d42e3454..d94301b946 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -1935,25 +1935,6 @@ "windows" ] }, - { - "args": [], - "ci_platforms": [ - "linux", - "mac", - "posix" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "flaky": false, - "gtest": false, - "language": "c", - "name": "workqueue_test", - "platforms": [ - "linux", - "mac", - "posix" - ] - }, { "args": [], "ci_platforms": [ @@ -27153,8 +27134,8 @@ }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27175,12 +27156,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_secure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_streaming_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27201,12 +27183,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_secure" + "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_unary_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27227,12 +27210,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_secure" + "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_sync_unary_ping_pong_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27253,12 +27237,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_secure" + "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27279,12 +27264,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_secure" + "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27305,12 +27291,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_secure" + "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27331,12 +27318,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_secure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_secure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27357,12 +27345,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_secure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_secure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27383,12 +27372,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_insecure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_ping_pong_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27409,12 +27399,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_insecure" + "shortname": "json_run_localhost:cpp_protobuf_async_streaming_ping_pong_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_unary_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27435,12 +27426,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_insecure" + "shortname": "json_run_localhost:cpp_protobuf_async_unary_ping_pong_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_sync_unary_ping_pong_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, "ci_platforms": [ @@ -27461,12 +27453,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_insecure" + "shortname": "json_run_localhost:cpp_protobuf_sync_unary_ping_pong_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_unary_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27487,12 +27480,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_insecure" + "shortname": "json_run_localhost:cpp_protobuf_async_unary_qps_unconstrained_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"ASYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27513,12 +27507,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_insecure" + "shortname": "json_run_localhost:cpp_protobuf_async_streaming_qps_unconstrained_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27539,12 +27534,13 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_insecure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_unconstrained_insecure", + "timeout_seconds": 180 }, { "args": [ - "--scenario_json", - "'{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_insecure\", \"warmup_seconds\": 1, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}'" + "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_generic_async_streaming_qps_one_server_core_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"server_type\": \"ASYNC_GENERIC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"bytebuf_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" ], "boringssl": true, "ci_platforms": [ @@ -27565,7 +27561,8 @@ "posix", "windows" ], - "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_insecure" + "shortname": "json_run_localhost:cpp_generic_async_streaming_qps_one_server_core_insecure", + "timeout_seconds": 180 }, { "args": [ -- cgit v1.2.3 From 77c7f9fd621148ed63f938c21ad3364bba65445e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 15 Jul 2016 07:21:14 -0700 Subject: Merge pull request #7407 from ctiller/delayed-write Benchmark fixes --- src/proto/grpc/testing/control.proto | 3 + test/cpp/qps/client.h | 68 +++++++++++++----- test/cpp/qps/client_async.cc | 92 +++++++++++++++--------- test/cpp/qps/client_sync.cc | 28 ++++---- test/cpp/qps/driver.cc | 135 +++++++++++++++++++++++++---------- test/cpp/qps/driver.h | 2 +- test/cpp/qps/qps_json_driver.cc | 18 +++-- test/cpp/qps/qps_worker.cc | 40 ++++++----- test/cpp/qps/server_async.cc | 56 +++++++-------- 9 files changed, 287 insertions(+), 155 deletions(-) (limited to 'test') diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 20496a8116..ece6910815 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -229,4 +229,7 @@ message ScenarioResult { repeated int32 server_cores = 5; // An after-the-fact computed summary ScenarioResultSummary summary = 6; + // Information on success or failure of each worker + repeated bool client_success = 7; + repeated bool server_success = 8; } diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 047bd16408..4045e13460 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -112,6 +112,21 @@ class ClientRequestCreator { } }; +class HistogramEntry GRPC_FINAL { + public: + HistogramEntry() : used_(false) {} + bool used() const { return used_; } + double value() const { return value_; } + void set_value(double v) { + used_ = true; + value_ = v; + } + + private: + bool used_; + double value_; +}; + class Client { public: Client() : timer_(new UsageTimer), interarrival_timer_() {} @@ -151,10 +166,21 @@ class Client { return stats; } + // Must call AwaitThreadsCompletion before destructor to avoid a race + // between destructor and invocation of virtual ThreadFunc + void AwaitThreadsCompletion() { + DestroyMultithreading(); + std::unique_lock g(thread_completion_mu_); + while (threads_remaining_ != 0) { + threads_complete_.wait(g); + } + } + protected: bool closed_loop_; void StartThreads(size_t num_threads) { + threads_remaining_ = num_threads; for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); } @@ -162,7 +188,8 @@ class Client { void EndThreads() { threads_.clear(); } - virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + virtual void DestroyMultithreading() = 0; + virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { // Set up the load distribution based on the number of threads @@ -215,7 +242,6 @@ class Client { public: Thread(Client* client, size_t idx) : done_(false), - new_stats_(nullptr), client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} @@ -230,15 +256,10 @@ class Client { void BeginSwap(Histogram* n) { std::lock_guard g(mu_); - new_stats_ = n; + n->Swap(&histogram_); } - void EndSwap() { - std::unique_lock g(mu_); - while (new_stats_ != nullptr) { - cv_.wait(g); - }; - } + void EndSwap() {} void MergeStatsInto(Histogram* hist) { std::unique_lock g(mu_); @@ -252,29 +273,26 @@ class Client { void ThreadFunc() { for (;;) { // run the loop body - const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_); - // lock, see if we're done + HistogramEntry entry; + const bool thread_still_ok = client_->ThreadFunc(&entry, idx_); + // lock, update histogram if needed and see if we're done std::lock_guard g(mu_); + if (entry.used()) { + histogram_.Add(entry.value()); + } if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); done_ = true; } if (done_) { + client_->CompleteThread(); return; } - // check if we're resetting stats, swap out the histogram if so - if (new_stats_) { - new_stats_->Swap(&histogram_); - new_stats_ = nullptr; - cv_.notify_one(); - } } } std::mutex mu_; - std::condition_variable cv_; bool done_; - Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_; @@ -286,6 +304,18 @@ class Client { InterarrivalTimer interarrival_timer_; std::vector next_time_; + + std::mutex thread_completion_mu_; + size_t threads_remaining_; + std::condition_variable threads_complete_; + + void CompleteThread() { + std::lock_guard g(thread_completion_mu_); + threads_remaining_--; + if (threads_remaining_ == 0) { + threads_complete_.notify_all(); + } + } }; template diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 5dd0bd8533..5d9cb4bd0c 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -31,7 +31,6 @@ * */ -#include #include #include #include @@ -48,7 +47,6 @@ #include #include #include -#include #include #include "src/proto/grpc/testing/services.grpc.pb.h" @@ -64,7 +62,7 @@ class ClientRpcContext { ClientRpcContext() {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate - virtual bool RunNextState(bool, Histogram* hist) = 0; + virtual bool RunNextState(bool, HistogramEntry* entry) = 0; virtual ClientRpcContext* StartNewClone() = 0; static void* tag(ClientRpcContext* c) { return reinterpret_cast(c); } static ClientRpcContext* detag(void* t) { @@ -104,7 +102,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); } } - bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { switch (next_state_) { case State::READY: start_ = UsageTimer::Now(); @@ -114,7 +112,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { next_state_ = State::RESP_DONE; return true; case State::RESP_DONE: - hist->Add((UsageTimer::Now() - start_) * 1e9); + entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::INVALID; return false; @@ -176,6 +174,7 @@ class AsyncClient : public ClientImpl { for (int i = 0; i < num_async_threads_; i++) { cli_cqs_.emplace_back(new CompletionQueue); next_issuers_.emplace_back(NextIssuer(i)); + shutdown_state_.emplace_back(new PerThreadShutdownState()); } using namespace std::placeholders; @@ -192,7 +191,6 @@ class AsyncClient : public ClientImpl { } virtual ~AsyncClient() { for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { - (*cq)->Shutdown(); void* got_tag; bool ok; while ((*cq)->Next(&got_tag, &ok)) { @@ -201,7 +199,36 @@ class AsyncClient : public ClientImpl { } } - bool ThreadFunc(Histogram* histogram, + protected: + const int num_async_threads_; + + private: + struct PerThreadShutdownState { + mutable std::mutex mutex; + bool shutdown; + PerThreadShutdownState() : shutdown(false) {} + }; + + int NumThreads(const ClientConfig& config) { + int num_threads = config.async_client_threads(); + if (num_threads <= 0) { // Use dynamic sizing + num_threads = cores_; + gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); + } + return num_threads; + } + void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { + for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { + std::lock_guard lock((*ss)->mutex); + (*ss)->shutdown = true; + } + for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { + (*cq)->Shutdown(); + } + this->EndThreads(); // this needed for resolution + } + + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; @@ -209,12 +236,15 @@ class AsyncClient : public ClientImpl { switch (cli_cqs_[thread_idx]->AsyncNext( &got_tag, &ok, std::chrono::system_clock::now() + std::chrono::milliseconds(10))) { - case CompletionQueue::SHUTDOWN: - return false; case CompletionQueue::GOT_EVENT: { // Got a regular event, so process it ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (!ctx->RunNextState(ok, histogram)) { + // Proceed while holding a lock to make sure that + // this thread isn't supposed to shut down + std::lock_guard l(shutdown_state_[thread_idx]->mutex); + if (shutdown_state_[thread_idx]->shutdown) { + return true; + } else if (!ctx->RunNextState(ok, entry)) { // The RPC and callback are done, so clone the ctx // and kickstart the new one auto clone = ctx->StartNewClone(); @@ -224,29 +254,23 @@ class AsyncClient : public ClientImpl { } return true; } - case CompletionQueue::TIMEOUT: - // TODO(ctiller): do something here to track how frequently we pass - // through this codepath. + case CompletionQueue::TIMEOUT: { + std::lock_guard l(shutdown_state_[thread_idx]->mutex); + if (shutdown_state_[thread_idx]->shutdown) { + return true; + } + return true; + } + case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be + // done return true; } - GPR_UNREACHABLE_CODE(return false); - } - - protected: - const int num_async_threads_; - - private: - int NumThreads(const ClientConfig& config) { - int num_threads = config.async_client_threads(); - if (num_threads <= 0) { // Use dynamic sizing - num_threads = cores_; - gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); - } - return num_threads; + GPR_UNREACHABLE_CODE(return true); } std::vector> cli_cqs_; std::vector> next_issuers_; + std::vector> shutdown_state_; }; static std::unique_ptr BenchmarkStubCreator( @@ -262,7 +286,7 @@ class AsyncUnaryClient GRPC_FINAL config, SetupCtx, BenchmarkStubCreator) { StartThreads(num_async_threads_); } - ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } + ~AsyncUnaryClient() GRPC_OVERRIDE {} private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} @@ -307,7 +331,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); next_state_ = State::STREAM_IDLE; } - bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { while (true) { switch (next_state_) { case State::STREAM_IDLE: @@ -339,7 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { return true; break; case State::READ_DONE: - hist->Add((UsageTimer::Now() - start_) * 1e9); + entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::STREAM_IDLE; break; // loop around @@ -391,7 +415,7 @@ class AsyncStreamingClient GRPC_FINAL StartThreads(num_async_threads_); } - ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } + ~AsyncStreamingClient() GRPC_OVERRIDE {} private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} @@ -439,7 +463,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { ClientRpcContext::tag(this)); next_state_ = State::STREAM_IDLE; } - bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { while (true) { switch (next_state_) { case State::STREAM_IDLE: @@ -471,7 +495,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { return true; break; case State::READ_DONE: - hist->Add((UsageTimer::Now() - start_) * 1e9); + entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::STREAM_IDLE; break; // loop around @@ -527,7 +551,7 @@ class GenericAsyncStreamingClient GRPC_FINAL StartThreads(num_async_threads_); } - ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } + ~GenericAsyncStreamingClient() GRPC_OVERRIDE {} private: static void CheckDone(grpc::Status s, ByteBuffer* response) {} diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index c88e95b80e..25c7823553 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -31,7 +31,6 @@ * */ -#include #include #include #include @@ -46,7 +45,6 @@ #include #include #include -#include #include #include #include @@ -55,7 +53,6 @@ #include "src/core/lib/profiling/timers.h" #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" -#include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/usage_timer.h" @@ -90,6 +87,9 @@ class SynchronousClient size_t num_threads_; std::vector responses_; + + private: + void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); } }; class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { @@ -98,9 +98,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { : SynchronousClient(config) { StartThreads(num_threads_); } - ~SynchronousUnaryClient() { EndThreads(); } + ~SynchronousUnaryClient() {} - bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = UsageTimer::Now(); @@ -108,7 +108,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { grpc::ClientContext context; grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); - histogram->Add((UsageTimer::Now() - start) * 1e9); + entry->set_value((UsageTimer::Now() - start) * 1e9); return s.ok(); } }; @@ -127,25 +127,29 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { StartThreads(num_threads_); } ~SynchronousStreamingClient() { - EndThreads(); - for (auto stream = &stream_[0]; stream != &stream_[num_threads_]; - stream++) { + for (size_t i = 0; i < num_threads_; i++) { + auto stream = &stream_[i]; if (*stream) { (*stream)->WritesDone(); - EXPECT_TRUE((*stream)->Finish().ok()); + Status s = (*stream)->Finish(); + EXPECT_TRUE(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Stream %zu received an error %s", i, + s.error_message().c_str()); + } } } delete[] stream_; delete[] context_; } - bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); double start = UsageTimer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { - histogram->Add((UsageTimer::Now() - start) * 1e9); + entry->set_value((UsageTimer::Now() - start) * 1e9); return true; } return false; diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 08bf045883..2aeaea51f2 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -87,7 +87,7 @@ static std::unordered_map> get_hosts_and_cores( CoreRequest dummy; CoreResponse cores; grpc::Status s = stub->CoreCount(&ctx, dummy, &cores); - assert(s.ok()); + GPR_ASSERT(s.ok()); std::deque dq; for (int i = 0; i < cores.cores(); i++) { dq.push_back(i); @@ -289,9 +289,13 @@ std::unique_ptr RunScenario( *args.mutable_setup() = server_config; servers[i].stream = servers[i].stub->RunServer(runsc::AllocContext(&contexts)); - GPR_ASSERT(servers[i].stream->Write(args)); + if (!servers[i].stream->Write(args)) { + gpr_log(GPR_ERROR, "Could not write args to server %zu", i); + } ServerStatus init_status; - GPR_ASSERT(servers[i].stream->Read(&init_status)); + if (!servers[i].stream->Read(&init_status)) { + gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i); + } gpr_join_host_port(&cli_target, host, init_status.port()); client_config.add_server_targets(cli_target); gpr_free(host); @@ -345,9 +349,13 @@ std::unique_ptr RunScenario( *args.mutable_setup() = per_client_config; clients[i].stream = clients[i].stub->RunClient(runsc::AllocContext(&contexts)); - GPR_ASSERT(clients[i].stream->Write(args)); + if (!clients[i].stream->Write(args)) { + gpr_log(GPR_ERROR, "Could not write args to client %zu", i); + } ClientStatus init_status; - GPR_ASSERT(clients[i].stream->Read(&init_status)); + if (!clients[i].stream->Read(&init_status)) { + gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i); + } } // Let everything warmup @@ -362,19 +370,31 @@ std::unique_ptr RunScenario( server_mark.mutable_mark()->set_reset(true); ClientArgs client_mark; client_mark.mutable_mark()->set_reset(true); - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Write(server_mark)); + for (size_t i = 0; i < num_servers; i++) { + auto server = &servers[i]; + if (!server->stream->Write(server_mark)) { + gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i); + } } - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->Write(client_mark)); + for (size_t i = 0; i < num_clients; i++) { + auto client = &clients[i]; + if (!client->stream->Write(client_mark)) { + gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i); + } } ServerStatus server_status; ClientStatus client_status; - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Read(&server_status)); + for (size_t i = 0; i < num_servers; i++) { + auto server = &servers[i]; + if (!server->stream->Read(&server_status)) { + gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i); + } } - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->Read(&client_status)); + for (size_t i = 0; i < num_clients; i++) { + auto client = &clients[i]; + if (!client->stream->Read(&client_status)) { + gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i); + } } // Wait some time @@ -390,37 +410,73 @@ std::unique_ptr RunScenario( Histogram merged_latencies; gpr_log(GPR_INFO, "Finishing clients"); - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->Write(client_mark)); - GPR_ASSERT(client->stream->WritesDone()); + for (size_t i = 0; i < num_clients; i++) { + auto client = &clients[i]; + if (!client->stream->Write(client_mark)) { + gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i); + } + if (!client->stream->WritesDone()) { + gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i); + } } - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->Read(&client_status)); - const auto& stats = client_status.stats(); - merged_latencies.MergeProto(stats.latencies()); - result->add_client_stats()->CopyFrom(stats); - GPR_ASSERT(!client->stream->Read(&client_status)); + for (size_t i = 0; i < num_clients; i++) { + auto client = &clients[i]; + // Read the client final status + if (client->stream->Read(&client_status)) { + gpr_log(GPR_INFO, "Received final status from client %zu", i); + const auto& stats = client_status.stats(); + merged_latencies.MergeProto(stats.latencies()); + result->add_client_stats()->CopyFrom(stats); + // That final status should be the last message on the client stream + GPR_ASSERT(!client->stream->Read(&client_status)); + } else { + gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i); + } } - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->Finish().ok()); + for (size_t i = 0; i < num_clients; i++) { + auto client = &clients[i]; + Status s = client->stream->Finish(); + result->add_client_success(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Client %zu had an error %s", i, + s.error_message().c_str()); + } } delete[] clients; merged_latencies.FillProto(result->mutable_latencies()); gpr_log(GPR_INFO, "Finishing servers"); - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Write(server_mark)); - GPR_ASSERT(server->stream->WritesDone()); + for (size_t i = 0; i < num_servers; i++) { + auto server = &servers[i]; + if (!server->stream->Write(server_mark)) { + gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i); + } + if (!server->stream->WritesDone()) { + gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i); + } } - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Read(&server_status)); - result->add_server_stats()->CopyFrom(server_status.stats()); - result->add_server_cores(server_status.cores()); - GPR_ASSERT(!server->stream->Read(&server_status)); + for (size_t i = 0; i < num_servers; i++) { + auto server = &servers[i]; + // Read the server final status + if (server->stream->Read(&server_status)) { + gpr_log(GPR_INFO, "Received final status from server %zu", i); + result->add_server_stats()->CopyFrom(server_status.stats()); + result->add_server_cores(server_status.cores()); + // That final status should be the last message on the server stream + GPR_ASSERT(!server->stream->Read(&server_status)); + } else { + gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i); + } } - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Finish().ok()); + for (size_t i = 0; i < num_servers; i++) { + auto server = &servers[i]; + Status s = server->stream->Finish(); + result->add_server_success(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Server %zu had an error %s", i, + s.error_message().c_str()); + } } delete[] servers; @@ -429,8 +485,9 @@ std::unique_ptr RunScenario( return result; } -void RunQuit() { +bool RunQuit() { // Get client, server lists + bool result = true; auto workers = get_workers("QPS_WORKERS"); for (size_t i = 0; i < workers.size(); i++) { auto stub = WorkerService::NewStub( @@ -438,8 +495,14 @@ void RunQuit() { Void dummy; grpc::ClientContext ctx; ctx.set_fail_fast(false); - GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok()); + Status s = stub->QuitWorker(&ctx, dummy, &dummy); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i, + s.error_message().c_str()); + result = false; + } } + return result; } } // namespace testing diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 3a5cf138f1..93f4370caf 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -47,7 +47,7 @@ std::unique_ptr RunScenario( const grpc::testing::ServerConfig& server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count); -void RunQuit(); +bool RunQuit(); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc index f5d739f893..1524ebbc38 100644 --- a/test/cpp/qps/qps_json_driver.cc +++ b/test/cpp/qps/qps_json_driver.cc @@ -53,7 +53,7 @@ DEFINE_bool(quit, false, "Quit the workers"); namespace grpc { namespace testing { -static void QpsDriver() { +static bool QpsDriver() { grpc::string json; bool scfile = (FLAGS_scenarios_file != ""); @@ -81,13 +81,13 @@ static void QpsDriver() { } else if (scjson) { json = FLAGS_scenarios_json.c_str(); } else if (FLAGS_quit) { - RunQuit(); - return; + return RunQuit(); } // Parse into an array of scenarios Scenarios scenarios; ParseJson(json.c_str(), "grpc.testing.Scenarios", &scenarios); + bool success = true; // Make sure that there is at least some valid scenario here GPR_ASSERT(scenarios.scenarios_size() > 0); @@ -109,7 +109,15 @@ static void QpsDriver() { GetReporter()->ReportQPSPerCore(*result); GetReporter()->ReportLatency(*result); GetReporter()->ReportTimes(*result); + + for (int i = 0; success && i < result->client_success_size(); i++) { + success = result->client_success(i); + } + for (int i = 0; success && i < result->server_success_size(); i++) { + success = result->server_success(i); + } } + return success; } } // namespace testing @@ -118,7 +126,7 @@ static void QpsDriver() { int main(int argc, char **argv) { grpc::testing::InitBenchmark(&argc, &argv, true); - grpc::testing::QpsDriver(); + bool ok = grpc::testing::QpsDriver(); - return 0; + return ok ? 0 : 1; } diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index f514e23e85..d3e53fe14a 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -33,7 +33,6 @@ #include "test/cpp/qps/qps_worker.h" -#include #include #include #include @@ -124,11 +123,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { GRPC_OVERRIDE { InstanceGuard g(this); if (!g.Acquired()) { - return Status(StatusCode::RESOURCE_EXHAUSTED, ""); + return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy"); } ScopedProfile profile("qps_client.prof", false); Status ret = RunClientBody(ctx, stream); + gpr_log(GPR_INFO, "RunClient: Returning"); return ret; } @@ -137,11 +137,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { GRPC_OVERRIDE { InstanceGuard g(this); if (!g.Acquired()) { - return Status(StatusCode::RESOURCE_EXHAUSTED, ""); + return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy"); } ScopedProfile profile("qps_server.prof", false); Status ret = RunServerBody(ctx, stream); + gpr_log(GPR_INFO, "RunServer: Returning"); return ret; } @@ -154,7 +155,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE { InstanceGuard g(this); if (!g.Acquired()) { - return Status(StatusCode::RESOURCE_EXHAUSTED, ""); + return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy"); } worker_->MarkDone(); @@ -197,33 +198,38 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { ServerReaderWriter* stream) { ClientArgs args; if (!stream->Read(&args)) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args"); } if (!args.has_setup()) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg"); } gpr_log(GPR_INFO, "RunClientBody: about to create client"); auto client = CreateClient(args.setup()); if (!client) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client"); } gpr_log(GPR_INFO, "RunClientBody: client created"); ClientStatus status; if (!stream->Write(status)) { - return Status(StatusCode::UNKNOWN, ""); + return Status(StatusCode::UNKNOWN, "Client couldn't report init status"); } gpr_log(GPR_INFO, "RunClientBody: creation status reported"); while (stream->Read(&args)) { gpr_log(GPR_INFO, "RunClientBody: Message read"); if (!args.has_mark()) { gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!"); - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark"); } *status.mutable_stats() = client->Mark(args.mark().reset()); - stream->Write(status); + if (!stream->Write(status)) { + return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark"); + } gpr_log(GPR_INFO, "RunClientBody: Mark response given"); } + gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion"); + client->AwaitThreadsCompletion(); + gpr_log(GPR_INFO, "RunClientBody: Returning"); return Status::OK; } @@ -232,10 +238,10 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { ServerReaderWriter* stream) { ServerArgs args; if (!stream->Read(&args)) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args"); } if (!args.has_setup()) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args"); } if (server_port_ != 0) { args.mutable_setup()->set_port(server_port_); @@ -243,24 +249,26 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { gpr_log(GPR_INFO, "RunServerBody: about to create server"); auto server = CreateServer(args.setup()); if (!server) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server"); } gpr_log(GPR_INFO, "RunServerBody: server created"); ServerStatus status; status.set_port(server->port()); status.set_cores(server->cores()); if (!stream->Write(status)) { - return Status(StatusCode::UNKNOWN, ""); + return Status(StatusCode::UNKNOWN, "Server couldn't report init status"); } gpr_log(GPR_INFO, "RunServerBody: creation status reported"); while (stream->Read(&args)) { gpr_log(GPR_INFO, "RunServerBody: Message read"); if (!args.has_mark()) { gpr_log(GPR_INFO, "RunServerBody: Message not a mark!"); - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark"); } *status.mutable_stats() = server->Mark(args.mark().reset()); - stream->Write(status); + if (!stream->Write(status)) { + return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark"); + } gpr_log(GPR_INFO, "RunServerBody: Mark response given"); } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 73ca19148b..dea8746331 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -123,22 +123,24 @@ class AsyncQpsServerTest : public Server { for (int i = 0; i < num_threads; i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); - } - for (int i = 0; i < num_threads; i++) { threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); } } ~AsyncQpsServerTest() { for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { - (*ss)->set_shutdown(); + std::lock_guard lock((*ss)->mutex); + (*ss)->shutdown = true; + } + // TODO (vpai): Remove this deadline and allow Shutdown to finish properly + auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3); + server_->Shutdown(deadline); + for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { + (*cq)->Shutdown(); } - server_->Shutdown(std::chrono::system_clock::now() + - std::chrono::seconds(3)); for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); } for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { - (*cq)->Shutdown(); bool ok; void *got_tag; while ((*cq)->Next(&got_tag, &ok)) @@ -151,22 +153,24 @@ class AsyncQpsServerTest : public Server { } private: - void ThreadFunc(int rank) { + void ThreadFunc(int thread_idx) { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[rank]->Next(&got_tag, &ok)) { + while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke - const bool still_going = ctx->RunNextState(ok); - if (!shutdown_state_[rank]->shutdown()) { - // this RPC context is done, so refresh it - if (!still_going) { - ctx->Reset(); - } - } else { + // Proceed while holding a lock to make sure that + // this thread isn't supposed to shut down + std::lock_guard l(shutdown_state_[thread_idx]->mutex); + if (shutdown_state_[thread_idx]->shutdown) { return; } + const bool still_going = ctx->RunNextState(ok); + // if this RPC context is done, refresh it + if (!still_going) { + ctx->Reset(); + } } return; } @@ -334,24 +338,12 @@ class AsyncQpsServerTest : public Server { ServiceType async_service_; std::forward_list contexts_; - class PerThreadShutdownState { - public: - PerThreadShutdownState() : shutdown_(false) {} - - bool shutdown() const { - std::lock_guard lock(mutex_); - return shutdown_; - } - - void set_shutdown() { - std::lock_guard lock(mutex_); - shutdown_ = true; - } - - private: - mutable std::mutex mutex_; - bool shutdown_; + struct PerThreadShutdownState { + mutable std::mutex mutex; + bool shutdown; + PerThreadShutdownState() : shutdown(false) {} }; + std::vector> shutdown_state_; }; -- cgit v1.2.3 From 77b74258b56dcc19c3a6d6be32eca579eac95aae Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Tue, 19 Jul 2016 11:53:37 -0700 Subject: Revert unnecessary deletions --- test/cpp/end2end/async_end2end_test.cc | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) (limited to 'test') diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 084d6db500..ac79fe8274 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -344,6 +344,31 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) { SendRpc(10); } +// We do not need to protect notify because the use is synchronized. +void ServerWait(Server* server, int* notify) { + server->Wait(); + *notify = 1; +} +TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { + int notify = 0; + std::thread* wait_thread = + new std::thread(&ServerWait, server_.get(), ¬ify); + ResetStub(); + SendRpc(1); + EXPECT_EQ(0, notify); + server_->Shutdown(); + wait_thread->join(); + EXPECT_EQ(1, notify); + delete wait_thread; +} + +TEST_P(AsyncEnd2endTest, ShutdownThenWait) { + ResetStub(); + SendRpc(1); + server_->Shutdown(); + server_->Wait(); +} + // Test a simple RPC using the async version of Next TEST_P(AsyncEnd2endTest, AsyncNextRpc) { ResetStub(); -- cgit v1.2.3 From 6b88b22d9e844254406e8b4d572d4b0476c04cd2 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Tue, 19 Jul 2016 19:12:46 -0700 Subject: Add comments for GRPC_NO_GENERATED_CODE --- src/cpp/ext/proto_server_reflection.h | 14 ++++++++++++++ test/cpp/util/proto_reflection_descriptor_database.h | 1 + 2 files changed, 15 insertions(+) (limited to 'test') diff --git a/src/cpp/ext/proto_server_reflection.h b/src/cpp/ext/proto_server_reflection.h index 1a9a0c95a8..fa3b9406c6 100644 --- a/src/cpp/ext/proto_server_reflection.h +++ b/src/cpp/ext/proto_server_reflection.h @@ -30,12 +30,26 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ + +/* + - If the generated header `grpc++/ext/reflection.grpc.pb.h` needs to be + installed, target `grpc++_reflection` in `build.yaml` should use the + filegroup `grpc++_reflection_proto`, and GRPC_NO_GENERATED_CODE should not + be defined. + - If the server reflection library needs to generate `reflection.grpc.pb.h` + from `reflection.proto` at the time of making, the generated header + `grpc++/ext/reflection.grpc.pb.h` should not be installed. In this case, + target `grpc++_reflection` should depend on `grpc++_reflection_codegen`, and + GRPC_NO_GENERATED_CODE should be defined. +*/ + #ifndef GRPC_INTERNAL_CPP_EXT_PROTO_SERVER_REFLECTION_H #define GRPC_INTERNAL_CPP_EXT_PROTO_SERVER_REFLECTION_H #include #include +// GRPC_NO_GENERATED_CODE indicates generated pb files should not be used #ifdef GRPC_NO_GENERATED_CODE #include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h" #else diff --git a/test/cpp/util/proto_reflection_descriptor_database.h b/test/cpp/util/proto_reflection_descriptor_database.h index 182323b2ec..eb7cf4907d 100644 --- a/test/cpp/util/proto_reflection_descriptor_database.h +++ b/test/cpp/util/proto_reflection_descriptor_database.h @@ -38,6 +38,7 @@ #include #include +// GRPC_NO_GENERATED_CODE indicates generated pb files should not be used #ifdef GRPC_NO_GENERATED_CODE #include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h" #else -- cgit v1.2.3 From 9cb9445155804fb65af100b9747d467254fb7ca6 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Wed, 20 Jul 2016 16:39:31 -0700 Subject: Addressed review comments --- doc/command_line_tool.md | 29 +++++++++++++++------- test/cpp/util/grpc_cli.cc | 13 +++++----- test/cpp/util/proto_file_parser.cc | 2 +- .../util/proto_reflection_descriptor_database.cc | 4 +-- 4 files changed, 30 insertions(+), 18 deletions(-) (limited to 'test') diff --git a/doc/command_line_tool.md b/doc/command_line_tool.md index 89a70548b8..79a131c041 100644 --- a/doc/command_line_tool.md +++ b/doc/command_line_tool.md @@ -15,6 +15,7 @@ The command line tool can do the following things: - Send unary rpc. - Attach metadata and display received metadata. - Handle common authentication to server. +- Infer request/response types from server reflection result. - Find the request/response types from a given proto file. - Read proto request in text form. - Read request in wire form (for protobuf messages, this means serialized binary form). @@ -24,7 +25,6 @@ The command line tool can do the following things: The command line tool should support the following things: - List server services and methods through server reflection. -- Infer request/response types from server reflection result. - Fine-grained auth control (such as, use this oauth token to talk to the server). - Send streaming rpc. @@ -46,24 +46,35 @@ https://github.com/grpc/grpc/blob/master/test/cpp/util/grpc_cli.cc Send a rpc to a helloworld server at `localhost:50051`: ``` -$ bins/opt/grpc_cli call localhost:50051 SayHello examples/protos/helloworld.proto \ - "name: 'world'" --enable_ssl=false +$ bins/opt/grpc_cli call localhost:50051 SayHello "name: 'world'" \ + --enable_ssl=false ``` On success, the tool will print out ``` Rpc succeeded with OK status -Response: +Response: message: "Hello world" ``` The `localhost:50051` part indicates the server you are connecting to. `SayHello` is (part of) the -gRPC method string. Then there is the path to the proto file containing the service definition, -if it is not under current directory, you can use `--proto_path` to specify a new search root. -`"name: 'world'"` is the text format of the request proto message. -We are not using ssl here by `--enable_ssl=false`. For information on more -flags, look at the comments of `grpc_cli.cc`. +gRPC method string. Then `"name: 'world'"` is the text format of the request proto message. We are +not using ssl here by `--enable_ssl=false`. For information on more flags, look at the comments of `grpc_cli.cc`. + +### Use local proto files + +If the server does not have the server reflection service, you will need to provide local proto +files containing the service definition. The tool will try to find request/response types from +them. + +``` +$ bins/opt/grpc_cli call localhost:50051 SayHello "name: 'world'" \ + --protofiles=examples/protos/helloworld.proto --enable_ssl=false +``` + +If the proto files is not under current directory, you can use `--proto_path` to specify a new +search root. ### Send non-proto rpc diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index fdb1a7c2a0..53529da782 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -35,14 +35,14 @@ A command line tool to talk to a grpc server. Example of talking to grpc interop server: grpc_cli call localhost:50051 UnaryCall "response_size:10" \ - --proto_file=src/proto/grpc/testing/test.proto --enable_ssl=false + --protofiles=src/proto/grpc/testing/test.proto --enable_ssl=false Options: - 1. --proto_file, use this flag to provide a proto file if the server does + 1. --protofiles, use this flag to provide a proto file if the server does does not have the reflection service. 2. --proto_path, if your proto file is not under current working directory, use this flag to provide a search root. It should work similar to the - counterpart in protoc. This option is valid only when proto_file is + counterpart in protoc. This option is valid only when protofiles is provided. 3. --metadata specifies metadata to be sent to the server, such as: --metadata="MyHeaderKey1:Value1:MyHeaderKey2:Value2" @@ -90,7 +90,8 @@ DEFINE_string(output_binary_file, "", DEFINE_string(metadata, "", "Metadata to send to server, in the form of key1:val1:key2:val2"); DEFINE_string(proto_path, ".", "Path to look for the proto file."); -DEFINE_string(proto_file, "", "Name of the proto file."); +// TODO(zyc): support a list of input proto files +DEFINE_string(protofiles, "", "Name of the proto file."); void ParseMetadataFlag( std::multimap* client_metadata) { @@ -173,9 +174,9 @@ int main(int argc, char** argv) { } if (!request_text.empty()) { - if (!FLAGS_proto_file.empty()) { + if (!FLAGS_protofiles.empty()) { parser.reset(new grpc::testing::ProtoFileParser( - FLAGS_proto_path, FLAGS_proto_file, method_name)); + FLAGS_proto_path, FLAGS_protofiles, method_name)); } else { parser.reset(new grpc::testing::ProtoFileParser(channel, method_name)); } diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc index b1bf0471e1..5b0d925e1c 100644 --- a/test/cpp/util/proto_file_parser.cc +++ b/test/cpp/util/proto_file_parser.cc @@ -112,7 +112,7 @@ ProtoFileParser::ProtoFileParser(std::shared_ptr channel, LogError( "Failed to get services from the server, " "it may not have the reflection service.\n" - "Please try to use the --proto_file option to provide a proto file."); + "Please try to use the --protofiles option to provide a proto file."); } if (has_error_) { return; diff --git a/test/cpp/util/proto_reflection_descriptor_database.cc b/test/cpp/util/proto_reflection_descriptor_database.cc index 48998551a5..2d847012a2 100644 --- a/test/cpp/util/proto_reflection_descriptor_database.cc +++ b/test/cpp/util/proto_reflection_descriptor_database.cc @@ -54,8 +54,8 @@ ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase( : stub_(ServerReflection::NewStub(channel)) {} ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() { - if (!stream_) { - GetStream()->WritesDone(); + if (stream_) { + stream_->WritesDone(); Status status = stream_->Finish(); if (!status.ok()) { gpr_log(GPR_ERROR, -- cgit v1.2.3 From fca59fb2aa4a0be093d12162c763b8563ada4087 Mon Sep 17 00:00:00 2001 From: Sanjay Ghemawat Date: Thu, 21 Jul 2016 09:32:06 -0700 Subject: Add gpr_slice_new_with_user_data. gpr_slice_new_with_user_data is like gpr_slice_new, but allows the caller to specify a distinct pointer to pass to the destroy function. This is useful when the data is part of a larger data structure that should be destroyed when the data is no longer needed. --- include/grpc/impl/codegen/slice.h | 8 ++++++++ src/core/lib/support/slice.c | 11 +++++++++-- test/core/support/slice_test.c | 22 ++++++++++++++++++++++ 3 files changed, 39 insertions(+), 2 deletions(-) (limited to 'test') diff --git a/include/grpc/impl/codegen/slice.h b/include/grpc/impl/codegen/slice.h index c684b7587d..dfc0a774fc 100644 --- a/include/grpc/impl/codegen/slice.h +++ b/include/grpc/impl/codegen/slice.h @@ -120,6 +120,14 @@ GPRAPI void gpr_slice_unref(gpr_slice s); passed in at destruction. */ GPRAPI gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)); +/* Equivalent to gpr_slice_new, but with a separate pointer that is + passed to the destroy function. This function can be useful when + the data is part of a larger structure that must be destroyed when + the data is no longer needed. */ +GPRAPI gpr_slice gpr_slice_new_with_user_data(void *p, size_t len, + void (*destroy)(void *), + void *user_data); + /* Equivalent to gpr_slice_new, but with a two argument destroy function that also takes the slice length. */ GPRAPI gpr_slice gpr_slice_new_with_len(void *p, size_t len, diff --git a/src/core/lib/support/slice.c b/src/core/lib/support/slice.c index b9a7c77bda..8a2c0a9086 100644 --- a/src/core/lib/support/slice.c +++ b/src/core/lib/support/slice.c @@ -94,14 +94,16 @@ static void new_slice_unref(void *p) { } } -gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) { +gpr_slice gpr_slice_new_with_user_data(void *p, size_t len, + void (*destroy)(void *), + void *user_data) { gpr_slice slice; new_slice_refcount *rc = gpr_malloc(sizeof(new_slice_refcount)); gpr_ref_init(&rc->refs, 1); rc->rc.ref = new_slice_ref; rc->rc.unref = new_slice_unref; rc->user_destroy = destroy; - rc->user_data = p; + rc->user_data = user_data; slice.refcount = &rc->rc; slice.data.refcounted.bytes = p; @@ -109,6 +111,11 @@ gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) { return slice; } +gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) { + /* Pass "p" to *destroy when the slice is no longer needed. */ + return gpr_slice_new_with_user_data(p, len, destroy, p); +} + /* gpr_slice_new_with_len support structures - we create a refcount object extended with the user provided data pointer & destroy function */ typedef struct new_with_len_slice_refcount { diff --git a/test/core/support/slice_test.c b/test/core/support/slice_test.c index 0da483a321..06c364b368 100644 --- a/test/core/support/slice_test.c +++ b/test/core/support/slice_test.c @@ -85,6 +85,27 @@ static void test_slice_new_returns_something_sensible(void) { gpr_slice_unref(slice); } +/* destroy function that sets a mark to indicate it was called. */ +static void set_mark(void *p) { *((int *)p) = 1; } + +static void test_slice_new_with_user_data(void) { + int marker = 0; + uint8_t buf[2]; + gpr_slice slice; + + buf[0] = 0; + buf[1] = 1; + slice = gpr_slice_new_with_user_data(buf, 2, set_mark, &marker); + GPR_ASSERT(marker == 0); + GPR_ASSERT(GPR_SLICE_LENGTH(slice) == 2); + GPR_ASSERT(GPR_SLICE_START_PTR(slice)[0] == 0); + GPR_ASSERT(GPR_SLICE_START_PTR(slice)[1] == 1); + + /* unref should cause destroy function to run. */ + gpr_slice_unref(slice); + GPR_ASSERT(marker == 1); +} + static int do_nothing_with_len_1_calls = 0; static void do_nothing_with_len_1(void *ignored, size_t len) { @@ -232,6 +253,7 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); test_slice_malloc_returns_something_sensible(); test_slice_new_returns_something_sensible(); + test_slice_new_with_user_data(); test_slice_new_with_len_returns_something_sensible(); for (length = 0; length < 128; length++) { test_slice_sub_works(length); -- cgit v1.2.3