diff options
author | 2017-09-18 08:57:19 -0700 | |
---|---|---|
committer | 2017-09-18 08:57:19 -0700 | |
commit | adbcec4f57f7e25859326a6daf8ad7e08f2805ca (patch) | |
tree | c029f503127946f0564134d343c8af9f88ac8da5 /src | |
parent | 7f10c299c7eb5d6688c20ae43c63b0548ca8acd1 (diff) | |
parent | a5f46e29bcee3068f5ec6691b7e06cf944430881 (diff) |
Merge github.com:grpc/grpc into pollset_kick_stats
Diffstat (limited to 'src')
80 files changed, 1807 insertions, 1279 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index b09bf99677..c2db8eff71 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -165,25 +165,37 @@ void PrintHeaderClientMethodInterfaces( (*vars)["Request"] = method->input_type_name(); (*vars)["Response"] = method->output_type_name(); + struct { + grpc::string prefix; + grpc::string method_params; // extra arguments to method + grpc::string raw_args; // extra arguments to raw version of method + } async_prefixes[] = {{"Async", ", void* tag", ", tag"}, + {"PrepareAsync", "", ""}}; + if (is_public) { if (method->NoStreaming()) { printer->Print( *vars, "virtual ::grpc::Status $Method$(::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) = 0;\n"); - printer->Print(*vars, - "std::unique_ptr< " - "::grpc::ClientAsyncResponseReaderInterface< $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) {\n"); - printer->Indent(); - printer->Print(*vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncResponseReaderInterface< $Response$>>(" - "Async$Method$Raw(context, request, cq));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + printer->Print( + *vars, + "std::unique_ptr< " + "::grpc::ClientAsyncResponseReaderInterface< $Response$>> " + "$AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncResponseReaderInterface< $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, request, cq));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, @@ -197,19 +209,26 @@ void PrintHeaderClientMethodInterfaces( "($Method$Raw(context, response));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>" - " Async$Method$(::grpc::ClientContext* context, $Response$* " - "response, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print(*vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncWriterInterface< $Request$>>(" - "Async$Method$Raw(context, response, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>" + " $AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "$Response$* " + "response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncWriterInterface< $Request$>>(" + "$AsyncPrefix$$Method$Raw(context, response, " + "cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -223,19 +242,25 @@ void PrintHeaderClientMethodInterfaces( "($Method$Raw(context, request));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< $Response$>> " - "Async$Method$(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print(*vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncReaderInterface< $Response$>>(" - "Async$Method$Raw(context, request, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "std::unique_ptr< ::grpc::ClientAsyncReaderInterface< $Response$>> " + "$AsyncPrefix$$Method$(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncReaderInterface< $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, request, cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (method->BidiStreaming()) { printer->Print(*vars, "std::unique_ptr< ::grpc::ClientReaderWriterInterface< " @@ -249,61 +274,83 @@ void PrintHeaderClientMethodInterfaces( "$Method$Raw(context));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< " - "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>>(" - "Async$Method$Raw(context, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "std::unique_ptr< " + "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>> " + "$AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncReaderWriterInterface< $Request$, $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } } else { if (method->NoStreaming()) { - printer->Print( - *vars, - "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) = 0;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + printer->Print( + *vars, + "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* " + "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) = 0;\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, "virtual ::grpc::ClientWriterInterface< $Request$>*" " $Method$Raw(" "::grpc::ClientContext* context, $Response$* response) = 0;\n"); - printer->Print(*vars, - "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*" - " Async$Method$Raw(::grpc::ClientContext* context, " - "$Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + printer->Print( + *vars, + "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*" + " $AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, " + "$Response$* response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) = 0;\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw(" "::grpc::ClientContext* context, const $Request$& request) = 0;\n"); - printer->Print( - *vars, - "virtual ::grpc::ClientAsyncReaderInterface< $Response$>* " - "Async$Method$Raw(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + printer->Print( + *vars, + "virtual ::grpc::ClientAsyncReaderInterface< $Response$>* " + "$AsyncPrefix$$Method$Raw(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) = 0;\n"); + } } else if (method->BidiStreaming()) { printer->Print(*vars, "virtual ::grpc::ClientReaderWriterInterface< $Request$, " "$Response$>* " "$Method$Raw(::grpc::ClientContext* context) = 0;\n"); - printer->Print(*vars, - "virtual ::grpc::ClientAsyncReaderWriterInterface< " - "$Request$, $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + printer->Print( + *vars, + "virtual ::grpc::ClientAsyncReaderWriterInterface< " + "$Request$, $Response$>* " + "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) = 0;\n"); + } } } } @@ -315,25 +362,35 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer, (*vars)["Method"] = method->name(); (*vars)["Request"] = method->input_type_name(); (*vars)["Response"] = method->output_type_name(); + struct { + grpc::string prefix; + grpc::string method_params; // extra arguments to method + grpc::string raw_args; // extra arguments to raw version of method + } async_prefixes[] = {{"Async", ", void* tag", ", tag"}, + {"PrepareAsync", "", ""}}; + if (is_public) { if (method->NoStreaming()) { printer->Print( *vars, "::grpc::Status $Method$(::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) override;\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) {\n"); - printer->Indent(); - printer->Print(*vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncResponseReader< $Response$>>(" - "Async$Method$Raw(context, request, cq));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + printer->Print( + *vars, + "std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> " + "$AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) {\n"); + printer->Indent(); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncResponseReader< $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, request, cq));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, @@ -346,18 +403,24 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer, "($Method$Raw(context, response));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print(*vars, - "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>" - " Async$Method$(::grpc::ClientContext* context, " - "$Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>(" - "Async$Method$Raw(context, response, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print(*vars, + "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>" + " $AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "$Response$* response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>(" + "$AsyncPrefix$$Method$Raw(context, response, " + "cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -371,19 +434,24 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer, "($Method$Raw(context, request));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> " - "Async$Method$(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>>(" - "Async$Method$Raw(context, request, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> " + "$AsyncPrefix$$Method$(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, request, cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } else if (method->BidiStreaming()) { printer->Print( *vars, @@ -396,53 +464,80 @@ void PrintHeaderClientMethod(grpc_generator::Printer *printer, "$Method$Raw(context));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print(*vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " - "$Request$, $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Indent(); - printer->Print(*vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>(" - "Async$Method$Raw(context, cq, tag));\n"); - printer->Outdent(); - printer->Print("}\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print(*vars, + "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " + "$Request$, $Response$>> " + "$AsyncPrefix$$Method$(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Indent(); + printer->Print( + *vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>(" + "$AsyncPrefix$$Method$Raw(context, cq$AsyncRawArgs$));\n"); + printer->Outdent(); + printer->Print("}\n"); + } } } else { if (method->NoStreaming()) { - printer->Print(*vars, - "::grpc::ClientAsyncResponseReader< $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) override;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + printer->Print( + *vars, + "::grpc::ClientAsyncResponseReader< $Response$>* " + "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) override;\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, "::grpc::ClientWriter< $Request$>* $Method$Raw(" "::grpc::ClientContext* context, $Response$* response) " "override;\n"); - printer->Print(*vars, - "::grpc::ClientAsyncWriter< $Request$>* Async$Method$Raw(" - "::grpc::ClientContext* context, $Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) override;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "::grpc::ClientAsyncWriter< $Request$>* $AsyncPrefix$$Method$Raw(" + "::grpc::ClientContext* context, $Response$* response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) override;\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print(*vars, "::grpc::ClientReader< $Response$>* $Method$Raw(" "::grpc::ClientContext* context, const $Request$& request)" " override;\n"); - printer->Print( - *vars, - "::grpc::ClientAsyncReader< $Response$>* Async$Method$Raw(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) override;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "::grpc::ClientAsyncReader< $Response$>* $AsyncPrefix$$Method$Raw(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) override;\n"); + } } else if (method->BidiStreaming()) { printer->Print(*vars, "::grpc::ClientReaderWriter< $Request$, $Response$>* " "$Method$Raw(::grpc::ClientContext* context) override;\n"); - printer->Print(*vars, - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) override;\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncRawArgs"] = async_prefix.raw_args; + printer->Print( + *vars, + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " + "$AsyncPrefix$$Method$Raw(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) override;\n"); + } } } } @@ -1077,6 +1172,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, (*vars)["Method"] = method->name(); (*vars)["Request"] = method->input_type_name(); (*vars)["Response"] = method->output_type_name(); + struct { + grpc::string prefix; + grpc::string start; // bool literal expressed as string + grpc::string method_params; // extra arguments to method + grpc::string create_args; // extra arguments to creator + } async_prefixes[] = {{"Async", "true", ", void* tag", ", tag"}, + {"PrepareAsync", "false", "", ", nullptr"}}; if (method->NoStreaming()) { printer->Print(*vars, "::grpc::Status $ns$$Service$::Stub::$Method$(" @@ -1087,19 +1189,23 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "rpcmethod_$Method$_, " "context, request, response);\n" "}\n\n"); - printer->Print( - *vars, - "::grpc::ClientAsyncResponseReader< $Response$>* " - "$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) {\n"); - printer->Print(*vars, - " return " - "::grpc::ClientAsyncResponseReader< $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, request);\n" - "}\n\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncStart"] = async_prefix.start; + printer->Print(*vars, + "::grpc::ClientAsyncResponseReader< $Response$>* " + "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(::grpc::" + "ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) {\n"); + printer->Print(*vars, + " return " + "::grpc::ClientAsyncResponseReader< $Response$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, request, $AsyncStart$);\n" + "}\n\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, "::grpc::ClientWriter< $Request$>* " @@ -1111,17 +1217,23 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "rpcmethod_$Method$_, " "context, response);\n" "}\n\n"); - printer->Print(*vars, - "::grpc::ClientAsyncWriter< $Request$>* " - "$ns$$Service$::Stub::Async$Method$Raw(" - "::grpc::ClientContext* context, $Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print(*vars, - " return ::grpc::ClientAsyncWriter< $Request$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, response, tag);\n" - "}\n\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncStart"] = async_prefix.start; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncCreateArgs"] = async_prefix.create_args; + printer->Print(*vars, + "::grpc::ClientAsyncWriter< $Request$>* " + "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(" + "::grpc::ClientContext* context, $Response$* response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Print(*vars, + " return ::grpc::ClientAsyncWriter< $Request$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, response, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -1134,17 +1246,24 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "rpcmethod_$Method$_, " "context, request);\n" "}\n\n"); - printer->Print(*vars, - "::grpc::ClientAsyncReader< $Response$>* " - "$ns$$Service$::Stub::Async$Method$Raw(" - "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print(*vars, - " return ::grpc::ClientAsyncReader< $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, request, tag);\n" - "}\n\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncStart"] = async_prefix.start; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncCreateArgs"] = async_prefix.create_args; + printer->Print( + *vars, + "::grpc::ClientAsyncReader< $Response$>* " + "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(" + "::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Print(*vars, + " return ::grpc::ClientAsyncReader< $Response$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, request, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); + } } else if (method->BidiStreaming()) { printer->Print( *vars, @@ -1157,19 +1276,25 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "rpcmethod_$Method$_, " "context);\n" "}\n\n"); - printer->Print( - *vars, - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " - "$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print( - *vars, - " return " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, tag);\n" - "}\n\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncStart"] = async_prefix.start; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["AsyncCreateArgs"] = async_prefix.create_args; + printer->Print(*vars, + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " + "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(::grpc::" + "ClientContext* context, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); + printer->Print( + *vars, + " return " + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); + } } } @@ -1460,50 +1585,79 @@ void PrintMockClientMethods(grpc_generator::Printer *printer, (*vars)["Request"] = method->input_type_name(); (*vars)["Response"] = method->output_type_name(); + struct { + grpc::string prefix; + grpc::string method_params; // extra arguments to method + int extra_method_param_count; + } async_prefixes[] = {{"Async", ", void* tag", 1}, {"PrepareAsync", "", 0}}; + if (method->NoStreaming()) { printer->Print( *vars, "MOCK_METHOD3($Method$, ::grpc::Status(::grpc::ClientContext* context, " "const $Request$& request, $Response$* response));\n"); - printer->Print(*vars, - "MOCK_METHOD3(Async$Method$Raw, " - "::grpc::ClientAsyncResponseReaderInterface< $Response$>*" - "(::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq));\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + printer->Print( + *vars, + "MOCK_METHOD3($AsyncPrefix$$Method$Raw, " + "::grpc::ClientAsyncResponseReaderInterface< $Response$>*" + "(::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq));\n"); + } } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, "MOCK_METHOD2($Method$Raw, " "::grpc::ClientWriterInterface< $Request$>*" "(::grpc::ClientContext* context, $Response$* response));\n"); - printer->Print(*vars, - "MOCK_METHOD4(Async$Method$Raw, " - "::grpc::ClientAsyncWriterInterface< $Request$>*" - "(::grpc::ClientContext* context, $Response$* response, " - "::grpc::CompletionQueue* cq, void* tag));\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["MockArgs"] = + std::to_string(3 + async_prefix.extra_method_param_count); + printer->Print(*vars, + "MOCK_METHOD$MockArgs$($AsyncPrefix$$Method$Raw, " + "::grpc::ClientAsyncWriterInterface< $Request$>*" + "(::grpc::ClientContext* context, $Response$* response, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$));\n"); + } } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, "MOCK_METHOD2($Method$Raw, " "::grpc::ClientReaderInterface< $Response$>*" "(::grpc::ClientContext* context, const $Request$& request));\n"); - printer->Print(*vars, - "MOCK_METHOD4(Async$Method$Raw, " - "::grpc::ClientAsyncReaderInterface< $Response$>*" - "(::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag));\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["MockArgs"] = + std::to_string(3 + async_prefix.extra_method_param_count); + printer->Print( + *vars, + "MOCK_METHOD$MockArgs$($AsyncPrefix$$Method$Raw, " + "::grpc::ClientAsyncReaderInterface< $Response$>*" + "(::grpc::ClientContext* context, const $Request$& request, " + "::grpc::CompletionQueue* cq$AsyncMethodParams$));\n"); + } } else if (method->BidiStreaming()) { printer->Print( *vars, "MOCK_METHOD1($Method$Raw, " "::grpc::ClientReaderWriterInterface< $Request$, $Response$>*" "(::grpc::ClientContext* context));\n"); - printer->Print( - *vars, - "MOCK_METHOD3(Async$Method$Raw, " - "::grpc::ClientAsyncReaderWriterInterface<$Request$, $Response$>*" - "(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, " - "void* tag));\n"); + for (auto async_prefix : async_prefixes) { + (*vars)["AsyncPrefix"] = async_prefix.prefix; + (*vars)["AsyncMethodParams"] = async_prefix.method_params; + (*vars)["MockArgs"] = + std::to_string(2 + async_prefix.extra_method_param_count); + printer->Print( + *vars, + "MOCK_METHOD$MockArgs$($AsyncPrefix$$Method$Raw, " + "::grpc::ClientAsyncReaderWriterInterface<$Request$, $Response$>*" + "(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq" + "$AsyncMethodParams$));\n"); + } } } diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index 31b177c28e..a60b528e3b 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -769,7 +769,7 @@ bool PythonGrpcGenerator::Generate(const FileDescriptor* file, PrivateGenerator generator(config_, &pbfile); if (parameter == "grpc_2_0") { return GenerateGrpc(context, generator, pb2_grpc_file_name, true); - } else if (parameter == "") { + } else if (parameter == "grpc_1_0" || parameter == "") { return GenerateGrpc(context, generator, pb2_grpc_file_name, true) && GenerateGrpc(context, generator, pb2_file_name, false); } else { diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index e5f6fa76ae..3844b98021 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -86,7 +86,7 @@ static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, grpc_cq_completion *ignored) { - int delete = 0; + bool should_delete = false; state_watcher *w = (state_watcher *)pw; gpr_mu_lock(&w->mu); switch (w->phase) { @@ -94,12 +94,12 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, case READY_TO_CALL_BACK: GPR_UNREACHABLE_CODE(return ); case CALLING_BACK_AND_FINISHED: - delete = 1; + should_delete = true; break; } gpr_mu_unlock(&w->mu); - if (delete) { + if (should_delete) { delete_state_watcher(exec_ctx, w); } } @@ -161,12 +161,12 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, grpc_error *error) { - partly_done(exec_ctx, pw, true, GRPC_ERROR_REF(error)); + partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error)); } static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, grpc_error *error) { - partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error)); + partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error)); } int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) { diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index e5f4a8a813..129d0f368b 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -85,7 +85,7 @@ static void method_parameters_unref(method_parameters *method_params) { } static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { - method_parameters_unref(value); + method_parameters_unref((method_parameters *)value); } static bool parse_wait_for_ready(grpc_json *field, @@ -717,7 +717,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "client channel factory arg must be a pointer"); } - grpc_client_channel_factory_ref(arg->value.pointer.p); + grpc_client_channel_factory_ref( + (grpc_client_channel_factory *)arg->value.pointer.p); chand->client_channel_factory = (grpc_client_channel_factory *)arg->value.pointer.p; // Get server name to resolve, using proxy mapper if needed. diff --git a/src/core/ext/filters/client_channel/client_channel_factory.c b/src/core/ext/filters/client_channel/client_channel_factory.c index 7220a8639e..e8aa4cda29 100644 --- a/src/core/ext/filters/client_channel/client_channel_factory.c +++ b/src/core/ext/filters/client_channel/client_channel_factory.c @@ -43,14 +43,13 @@ grpc_channel* grpc_client_channel_factory_create_channel( } static void* factory_arg_copy(void* factory) { - grpc_client_channel_factory_ref(factory); + grpc_client_channel_factory_ref((grpc_client_channel_factory*)factory); return factory; } static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) { - // TODO(roth): Remove local exec_ctx when - // https://github.com/grpc/grpc/pull/8705 is merged. - grpc_client_channel_factory_unref(exec_ctx, factory); + grpc_client_channel_factory_unref(exec_ctx, + (grpc_client_channel_factory*)factory); } static int factory_arg_cmp(void* factory1, void* factory2) { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c index bd290464c8..7ad322902b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -75,7 +75,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->context != NULL); GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL); calld->client_stats = grpc_grpclb_client_stats_ref( - args->context[GRPC_GRPCLB_CLIENT_STATS].value); + (grpc_grpclb_client_stats *)args->context[GRPC_GRPCLB_CLIENT_STATS] + .value); // Record call started. grpc_grpclb_client_stats_add_call_started(calld->client_stats); return GRPC_ERROR_NONE; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 18979829bd..a776a07d99 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -101,6 +101,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" @@ -122,7 +123,6 @@ #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 -#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb"); @@ -138,7 +138,7 @@ static grpc_error *initial_metadata_add_lb_token( } static void destroy_client_stats(void *arg) { - grpc_grpclb_client_stats_unref(arg); + grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats *)arg); } typedef struct wrapped_rr_closure_arg { @@ -286,7 +286,7 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { * glb_lb_policy */ typedef struct rr_connectivity_data rr_connectivity_data; -static const grpc_lb_policy_vtable glb_lb_policy_vtable; + typedef struct glb_lb_policy { /** base policy: must be first */ grpc_lb_policy base; @@ -299,10 +299,6 @@ typedef struct glb_lb_policy { /** timeout in milliseconds for the LB call. 0 means no deadline. */ int lb_call_timeout_ms; - /** timeout in milliseconds for before using fallback backend addresses. - * 0 means not using fallback. */ - int lb_fallback_timeout_ms; - /** for communicating with the LB server */ grpc_channel *lb_channel; @@ -329,9 +325,6 @@ typedef struct glb_lb_policy { * Otherwise, we delegate to the RR policy. */ size_t serverlist_index; - /** stores the backend addresses from the resolver */ - grpc_lb_addresses *fallback_backend_addresses; - /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -352,9 +345,6 @@ typedef struct glb_lb_policy { /** is \a lb_call_retry_timer active? */ bool retry_timer_active; - /** is \a lb_fallback_timer active? */ - bool fallback_timer_active; - /** called upon changes to the LB channel's connectivity. */ grpc_closure lb_channel_on_connectivity_changed; @@ -377,9 +367,6 @@ typedef struct glb_lb_policy { /* LB call retry timer callback. */ grpc_closure lb_on_call_retry; - /* LB fallback timer callback. */ - grpc_closure lb_on_fallback; - grpc_call *lb_call; /* streaming call to the LB server, */ grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */ @@ -403,9 +390,6 @@ typedef struct glb_lb_policy { /** LB call retry timer */ grpc_timer lb_call_retry_timer; - /** LB fallback timer */ - grpc_timer lb_fallback_timer; - bool initial_request_sent; bool seen_initial_response; @@ -552,32 +536,6 @@ static grpc_lb_addresses *process_serverlist_locked( return lb_addresses; } -/* Returns the backend addresses extracted from the given addresses */ -static grpc_lb_addresses *extract_backend_addresses_locked( - grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) { - /* first pass: count the number of backend addresses */ - size_t num_backends = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) { - ++num_backends; - } - } - /* second pass: actually populate the addresses and (empty) LB tokens */ - grpc_lb_addresses *backend_addresses = - grpc_lb_addresses_create(num_backends, &lb_token_vtable); - size_t num_copied = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) continue; - const grpc_resolved_address *addr = &addresses->addresses[i].address; - grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, - addr->len, false /* is_balancer */, - NULL /* balancer_name */, - (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); - ++num_copied; - } - return backend_addresses; -} - static void update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_connectivity_state rr_state, grpc_error *rr_state_error) { @@ -645,38 +603,35 @@ static bool pick_from_internal_rr_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, const grpc_lb_policy_pick_args *pick_args, bool force_async, grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { - // Check for drops if we are not using fallback backend addresses. - if (glb_policy->serverlist != NULL) { - // Look at the index into the serverlist to see if we should drop this call. - grpc_grpclb_server *server = - glb_policy->serverlist->servers[glb_policy->serverlist_index++]; - if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { - glb_policy->serverlist_index = 0; // Wrap-around. + // Look at the index into the serverlist to see if we should drop this call. + grpc_grpclb_server *server = + glb_policy->serverlist->servers[glb_policy->serverlist_index++]; + if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { + glb_policy->serverlist_index = 0; // Wrap-around. + } + if (server->drop) { + // Not using the RR policy, so unref it. + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); } - if (server->drop) { - // Not using the RR policy, so unref it. - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", - (intptr_t)wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); - // Update client load reporting stats to indicate the number of - // dropped calls. Note that we have to do this here instead of in - // the client_load_reporting filter, because we do not create a - // subchannel call (and therefore no client_load_reporting filter) - // for dropped calls. - grpc_grpclb_client_stats_add_call_dropped_locked( - server->load_balance_token, wc_arg->client_stats); - grpc_grpclb_client_stats_unref(wc_arg->client_stats); - if (force_async) { - GPR_ASSERT(wc_arg->wrapped_closure != NULL); - GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); - gpr_free(wc_arg->free_when_done); - return false; - } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + // Update client load reporting stats to indicate the number of + // dropped calls. Note that we have to do this here instead of in + // the client_load_reporting filter, because we do not create a + // subchannel call (and therefore no client_load_reporting filter) + // for dropped calls. + grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token, + wc_arg->client_stats); + grpc_grpclb_client_stats_unref(wc_arg->client_stats); + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); gpr_free(wc_arg->free_when_done); - return true; + return false; } + gpr_free(wc_arg->free_when_done); + return true; } // Pick via the RR policy. const bool pick_done = grpc_lb_policy_pick_locked( @@ -714,18 +669,8 @@ static bool pick_from_internal_rr_locked( static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { - grpc_lb_addresses *addresses; - if (glb_policy->serverlist != NULL) { - GPR_ASSERT(glb_policy->serverlist->num_servers > 0); - addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist); - } else { - // If rr_handover_locked() is invoked when we haven't received any - // serverlist from the balancer, we use the fallback backends returned by - // the resolver. Note that the fallback backend list may be empty, in which - // case the new round_robin policy will keep the requested picks pending. - GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); - addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses); - } + grpc_lb_addresses *addresses = + process_serverlist_locked(exec_ctx, glb_policy->serverlist); GPR_ASSERT(addresses != NULL); grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args)); args->client_channel_factory = glb_policy->cc_factory; @@ -783,7 +728,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, /* Allocate the data for the tracking of the new RR policy's connectivity. * It'll be deallocated in glb_rr_connectivity_changed() */ rr_connectivity_data *rr_connectivity = - gpr_zalloc(sizeof(rr_connectivity_data)); + (rr_connectivity_data *)gpr_zalloc(sizeof(rr_connectivity_data)); GRPC_CLOSURE_INIT(&rr_connectivity->on_change, glb_rr_connectivity_changed_locked, rr_connectivity, grpc_combiner_scheduler(glb_policy->base.combiner)); @@ -831,6 +776,8 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, /* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { + GPR_ASSERT(glb_policy->serverlist != NULL && + glb_policy->serverlist->num_servers > 0); if (glb_policy->shutting_down) return; grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); GPR_ASSERT(args != NULL); @@ -923,7 +870,8 @@ static grpc_channel_args *build_lb_channel_args( grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_grpclb_addrs, NULL); grpc_slice_hash_table_entry *targets_info_entries = - gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs); + (grpc_slice_hash_table_entry *)gpr_zalloc(sizeof(*targets_info_entries) * + num_grpclb_addrs); size_t lb_addresses_idx = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { @@ -965,96 +913,6 @@ static grpc_channel_args *build_lb_channel_args( return result; } -static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error); -static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - /* Count the number of gRPC-LB addresses. There must be at least one. */ - const grpc_arg *arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - return NULL; - } - grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; - } - if (num_grpclb_addrs == 0) return NULL; - - glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy)); - - /* Get server name. */ - arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); - GPR_ASSERT(arg != NULL); - GPR_ASSERT(arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true); - GPR_ASSERT(uri->path[0] != '\0'); - glb_policy->server_name = - gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.", - glb_policy->server_name); - } - grpc_uri_destroy(uri); - - glb_policy->cc_factory = args->client_channel_factory; - GPR_ASSERT(glb_policy->cc_factory != NULL); - - arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); - glb_policy->lb_call_timeout_ms = - grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); - - arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); - glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer( - arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, - INT_MAX}); - - // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, - // since we use this to trigger the client_load_reporting filter. - grpc_arg new_arg = - grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb"); - static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; - glb_policy->args = grpc_channel_args_copy_and_add_and_remove( - args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); - - /* Extract the backend addresses (may be empty) from the resolver for - * fallback. */ - glb_policy->fallback_backend_addresses = - extract_backend_addresses_locked(exec_ctx, addresses); - - /* Create a client channel over them to communicate with a LB service */ - glb_policy->response_generator = - grpc_fake_resolver_response_generator_create(); - grpc_channel_args *lb_channel_args = build_lb_channel_args( - exec_ctx, addresses, glb_policy->response_generator, args->args); - char *uri_str; - gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); - glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( - exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); - - /* Propagate initial resolution */ - grpc_fake_resolver_response_generator_set_response( - exec_ctx, glb_policy->response_generator, lb_channel_args); - grpc_channel_args_destroy(exec_ctx, lb_channel_args); - gpr_free(uri_str); - if (glb_policy->lb_channel == NULL) { - gpr_free((void *)glb_policy->server_name); - grpc_channel_args_destroy(exec_ctx, glb_policy->args); - gpr_free(glb_policy); - return NULL; - } - GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, - glb_lb_channel_on_connectivity_changed_cb, glb_policy, - grpc_combiner_scheduler(args->combiner)); - grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); - grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, - "grpclb"); - return &glb_policy->base; -} - static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; GPR_ASSERT(glb_policy->pending_picks == NULL); @@ -1068,10 +926,8 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->serverlist != NULL) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } - if (glb_policy->fallback_backend_addresses != NULL) { - grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); - } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); + grpc_subchannel_index_unref(); if (glb_policy->pending_update_args != NULL) { grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args); gpr_free(glb_policy->pending_update_args); @@ -1211,28 +1067,10 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy); static void start_picking_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { - /* start a timer to fall back */ - if (glb_policy->lb_fallback_timeout_ms > 0 && - glb_policy->serverlist == NULL) { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec deadline = gpr_time_add( - now, - gpr_time_from_millis(glb_policy->lb_fallback_timeout_ms, GPR_TIMESPAN)); - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); - GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, - glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - glb_policy->fallback_timer_active = true; - grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline, - &glb_policy->lb_on_fallback, now); - } - glb_policy->started_picking = true; gpr_backoff_reset(&glb_policy->lb_call_backoff_state); query_for_backends_locked(exec_ctx, glb_policy); @@ -1382,7 +1220,8 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, static bool load_report_counters_are_zero(grpc_grpclb_request *request) { grpc_grpclb_dropped_call_counts *drop_entries = - request->client_stats.calls_finished_with_drop.arg; + (grpc_grpclb_dropped_call_counts *) + request->client_stats.calls_finished_with_drop.arg; return request->client_stats.num_calls_started == 0 && request->client_stats.num_calls_finished == 0 && request->client_stats.num_calls_finished_with_client_failed_to_send == @@ -1686,15 +1525,6 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, if (glb_policy->serverlist != NULL) { /* dispose of the old serverlist */ grpc_grpclb_destroy_serverlist(glb_policy->serverlist); - } else { - /* or dispose of the fallback */ - grpc_lb_addresses_destroy(exec_ctx, - glb_policy->fallback_backend_addresses); - glb_policy->fallback_backend_addresses = NULL; - if (glb_policy->fallback_timer_active) { - grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer); - glb_policy->fallback_timer_active = false; - } } /* and update the copy in the glb_lb_policy instance. This * serverlist instance will be destroyed either upon the next @@ -1705,7 +1535,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, } } else { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Received empty server list, ignoring."); + gpr_log(GPR_INFO, + "Received empty server list. Picks will stay pending until " + "a response with > 0 servers is received"); } grpc_grpclb_destroy_serverlist(serverlist); } @@ -1752,26 +1584,6 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer"); } -static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - glb_lb_policy *glb_policy = arg; - /* If we receive a serverlist after the timer fires but before this callback - * actually runs, don't do anything. */ - if (glb_policy->serverlist != NULL) return; - glb_policy->fallback_timer_active = false; - if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Falling back to use backends from resolver (grpclb %p)", - (void *)glb_policy); - } - GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL); - rr_handover_locked(exec_ctx, glb_policy); - } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "grpclb_fallback_timer"); -} - static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)arg; @@ -1892,17 +1704,6 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, &glb_policy->lb_channel_connectivity, &glb_policy->lb_channel_on_connectivity_changed, NULL); } - - // Propagate update to fallback_backend_addresses if a non-empty serverlist - // hasn't been received from the balancer. - if (glb_policy->serverlist == NULL) { - grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses); - glb_policy->fallback_backend_addresses = - extract_backend_addresses_locked(exec_ctx, addresses); - if (glb_policy->rr_policy != NULL) { - rr_handover_locked(exec_ctx, glb_policy); - } - } } // Invoked as part of the update process. It continues watching the LB channel @@ -1982,6 +1783,90 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_notify_on_state_change_locked, glb_update_locked}; +static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + /* Count the number of gRPC-LB addresses. There must be at least one. + * TODO(roth): For now, we ignore non-balancer addresses, but in the + * future, we may change the behavior such that we fall back to using + * the non-balancer addresses if we cannot reach any balancers. In the + * fallback case, we should use the LB policy indicated by + * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is + * unset, we should default to pick_first). */ + const grpc_arg *arg = + grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + return NULL; + } + grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + } + if (num_grpclb_addrs == 0) return NULL; + + glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy)); + + /* Get server name. */ + arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); + GPR_ASSERT(arg != NULL); + GPR_ASSERT(arg->type == GRPC_ARG_STRING); + grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + glb_policy->server_name = + gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.", + glb_policy->server_name); + } + grpc_uri_destroy(uri); + + glb_policy->cc_factory = args->client_channel_factory; + GPR_ASSERT(glb_policy->cc_factory != NULL); + + arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); + glb_policy->lb_call_timeout_ms = + grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); + + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, + // since we use this to trigger the client_load_reporting filter. + grpc_arg new_arg = + grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb"); + static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; + glb_policy->args = grpc_channel_args_copy_and_add_and_remove( + args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + + /* Create a client channel over them to communicate with a LB service */ + glb_policy->response_generator = + grpc_fake_resolver_response_generator_create(); + grpc_channel_args *lb_channel_args = build_lb_channel_args( + exec_ctx, addresses, glb_policy->response_generator, args->args); + char *uri_str; + gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); + glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( + exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); + + /* Propagate initial resolution */ + grpc_fake_resolver_response_generator_set_response( + exec_ctx, glb_policy->response_generator, lb_channel_args); + grpc_channel_args_destroy(exec_ctx, lb_channel_args); + gpr_free(uri_str); + if (glb_policy->lb_channel == NULL) { + gpr_free((void *)glb_policy->server_name); + grpc_channel_args_destroy(exec_ctx, glb_policy->args); + gpr_free(glb_policy); + return NULL; + } + grpc_subchannel_index_ref(); + GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, + glb_lb_channel_on_connectivity_changed_cb, glb_policy, + grpc_combiner_scheduler(args->combiner)); + grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); + grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, + "grpclb"); + return &glb_policy->base; +} + static void glb_factory_ref(grpc_lb_policy_factory *factory) {} static void glb_factory_unref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 407bd18adb..8ef6dfc6f4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -148,7 +148,8 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { if (request->has_client_stats) { grpc_grpclb_dropped_call_counts *drop_entries = - request->client_stats.calls_finished_with_drop.arg; + (grpc_grpclb_dropped_call_counts *) + request->client_stats.calls_finished_with_drop.arg; grpc_grpclb_dropped_call_counts_destroy(drop_entries); } gpr_free(request); @@ -170,7 +171,8 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( if (!res.has_initial_response) return NULL; grpc_grpclb_initial_response *initial_res = - gpr_malloc(sizeof(grpc_grpclb_initial_response)); + (grpc_grpclb_initial_response *)gpr_malloc( + sizeof(grpc_grpclb_initial_response)); memcpy(initial_res, &res.initial_response, sizeof(grpc_grpclb_initial_response)); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index fab3073eb9..d20cbb8388 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -89,6 +89,7 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { "picked_first_destroy"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_subchannel_index_unref(); if (p->pending_update_args != NULL) { grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); gpr_free(p->pending_update_args); @@ -330,8 +331,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", (void *)p, (unsigned long)addresses->num_addresses); } - grpc_subchannel_args *sc_args = - gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses); + grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc( + sizeof(*sc_args) * addresses->num_addresses); /* We remove the following keys in order for subchannel keys belonging to * subchannels point to the same address to match. */ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, @@ -403,7 +404,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } /* Create the subchannels for the new subchannel args/addresses. */ grpc_subchannel **new_subchannels = - gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); + (grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); size_t num_new_subchannels = 0; for (size_t i = 0; i < sc_args_count; i++) { grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( @@ -686,6 +687,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); + grpc_subchannel_index_ref(); GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, grpc_combiner_scheduler(args->combiner)); return &p->base; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index be91d3d651..8ac1a46abd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -30,6 +30,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/combiner.h" @@ -310,6 +311,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { (void *)pol, (void *)pol); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_subchannel_index_unref(); gpr_free(p); } @@ -890,6 +892,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->client_channel_factory != NULL); round_robin_lb_policy *p = (round_robin_lb_policy *)gpr_zalloc(sizeof(*p)); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); + grpc_subchannel_index_ref(); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); rr_update_locked(exec_ctx, &p->base, args); diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c index 918bab745c..acf5929746 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.c +++ b/src/core/ext/filters/client_channel/lb_policy_factory.c @@ -56,7 +56,7 @@ grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) { } void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, - const void* address, size_t address_len, + void* address, size_t address_len, bool is_balancer, const char* balancer_name, void* user_data) { GPR_ASSERT(index < addresses->num_addresses); @@ -126,13 +126,14 @@ void grpc_lb_addresses_destroy(grpc_exec_ctx* exec_ctx, } static void* lb_addresses_copy(void* addresses) { - return grpc_lb_addresses_copy(addresses); + return grpc_lb_addresses_copy((grpc_lb_addresses*)addresses); } static void lb_addresses_destroy(grpc_exec_ctx* exec_ctx, void* addresses) { - grpc_lb_addresses_destroy(exec_ctx, addresses); + grpc_lb_addresses_destroy(exec_ctx, (grpc_lb_addresses*)addresses); } static int lb_addresses_cmp(void* addresses1, void* addresses2) { - return grpc_lb_addresses_cmp(addresses1, addresses2); + return grpc_lb_addresses_cmp((grpc_lb_addresses*)addresses1, + (grpc_lb_addresses*)addresses2); } static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = { lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp}; @@ -149,7 +150,7 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES); if (lb_addresses_arg == NULL || lb_addresses_arg->type != GRPC_ARG_POINTER) return NULL; - return lb_addresses_arg->value.pointer.p; + return (grpc_lb_addresses*)lb_addresses_arg->value.pointer.p; } void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) { diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index cf0f8cb615..9d9fb143df 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -73,7 +73,7 @@ grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses); * \a address is a socket address of length \a address_len. * Takes ownership of \a balancer_name. */ void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index, - const void *address, size_t address_len, + void *address, size_t address_len, bool is_balancer, const char *balancer_name, void *user_data); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c index b87a3b7082..371c59b8cf 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -204,7 +204,7 @@ static char *choose_service_config(char *service_config_choice_json) { int random_pct = rand() % 100; int percentage; if (sscanf(field->value, "%d", &percentage) != 1 || - random_pct > percentage) { + random_pct > percentage || percentage == 0) { service_config_json = NULL; break; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 9747d39a16..7f1f57259a 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -38,7 +38,7 @@ typedef struct fd_node { /** the owner of this fd node */ grpc_ares_ev_driver *ev_driver; /** the grpc_fd owned by this fd node */ - grpc_fd *grpc_fd; + grpc_fd *fd; /** a closure wrapping on_readable_cb, which should be invoked when the grpc_fd in this node becomes readable. */ grpc_closure read_closure; @@ -96,15 +96,15 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver *ev_driver) { } static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) { - gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); gpr_mu_destroy(&fdn->mu); - grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd); + grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->fd); /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ - grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */, + grpc_fd_orphan(exec_ctx, fdn->fd, NULL, NULL, true /* already_closed */, "c-ares query finished"); gpr_free(fdn); } @@ -150,9 +150,8 @@ void grpc_ares_ev_driver_shutdown(grpc_exec_ctx *exec_ctx, ev_driver->shutting_down = true; fd_node *fn = ev_driver->fds; while (fn != NULL) { - grpc_fd_shutdown( - exec_ctx, fn->grpc_fd, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("grpc_ares_ev_driver_shutdown")); + grpc_fd_shutdown(exec_ctx, fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "grpc_ares_ev_driver_shutdown")); fn = fn->next; } gpr_mu_unlock(&ev_driver->mu); @@ -165,7 +164,7 @@ static fd_node *pop_fd_node(fd_node **head, int fd) { dummy_head.next = *head; fd_node *node = &dummy_head; while (node->next != NULL) { - if (grpc_fd_wrapped_fd(node->next->grpc_fd) == fd) { + if (grpc_fd_wrapped_fd(node->next->fd) == fd) { fd_node *ret = node->next; node->next = node->next->next; *head = dummy_head.next; @@ -184,9 +183,9 @@ static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg, fdn->readable_registered = false; gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->fd)); if (error == GRPC_ERROR_NONE) { - ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->grpc_fd), + ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->fd), ARES_SOCKET_BAD); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or @@ -211,10 +210,10 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg, fdn->writable_registered = false; gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->fd)); if (error == GRPC_ERROR_NONE) { ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, - grpc_fd_wrapped_fd(fdn->grpc_fd)); + grpc_fd_wrapped_fd(fdn->fd)); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // timed out. The pending lookups made on this ev_driver will be cancelled @@ -253,7 +252,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); fdn = (fd_node *)gpr_malloc(sizeof(fd_node)); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->grpc_fd = grpc_fd_create(socks[i], fd_name); + fdn->fd = grpc_fd_create(socks[i], fd_name); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; @@ -262,8 +261,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, grpc_schedule_on_exec_ctx); - grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, - fdn->grpc_fd); + grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->fd); gpr_free(fd_name); } fdn->next = new_list; @@ -274,9 +272,8 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, if (ARES_GETSOCK_READABLE(socks_bitmask, i) && !fdn->readable_registered) { grpc_ares_ev_driver_ref(ev_driver); - gpr_log(GPR_DEBUG, "notify read on: %d", - grpc_fd_wrapped_fd(fdn->grpc_fd)); - grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure); + gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd)); + grpc_fd_notify_on_read(exec_ctx, fdn->fd, &fdn->read_closure); fdn->readable_registered = true; } // Register write_closure if the socket is writable and write_closure @@ -284,9 +281,9 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && !fdn->writable_registered) { gpr_log(GPR_DEBUG, "notify write on: %d", - grpc_fd_wrapped_fd(fdn->grpc_fd)); + grpc_fd_wrapped_fd(fdn->fd)); grpc_ares_ev_driver_ref(ev_driver); - grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure); + grpc_fd_notify_on_write(exec_ctx, fdn->fd, &fdn->write_closure); fdn->writable_registered = true; } gpr_mu_unlock(&fdn->mu); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c index 2e2b411ab8..0ffb38518a 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -123,8 +123,8 @@ static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx, static grpc_ares_hostbyname_request *create_hostbyname_request( grpc_ares_request *parent_request, char *host, uint16_t port, bool is_balancer) { - grpc_ares_hostbyname_request *hr = - gpr_zalloc(sizeof(grpc_ares_hostbyname_request)); + grpc_ares_hostbyname_request *hr = (grpc_ares_hostbyname_request *)gpr_zalloc( + sizeof(grpc_ares_hostbyname_request)); hr->parent_request = parent_request; hr->host = gpr_strdup(host); hr->port = port; @@ -527,7 +527,8 @@ static void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_resolved_addresses **addrs) { grpc_resolve_address_ares_request *r = - gpr_zalloc(sizeof(grpc_resolve_address_ares_request)); + (grpc_resolve_address_ares_request *)gpr_zalloc( + sizeof(grpc_resolve_address_ares_request)); r->addrs_out = addrs; r->on_resolve_address_done = on_done; GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, diff --git a/src/core/ext/filters/client_channel/retry_throttle.c b/src/core/ext/filters/client_channel/retry_throttle.c index 6cd6654b6f..09dcade089 100644 --- a/src/core/ext/filters/client_channel/retry_throttle.c +++ b/src/core/ext/filters/client_channel/retry_throttle.c @@ -99,7 +99,7 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( int max_milli_tokens, int milli_token_ratio, grpc_server_retry_throttle_data* old_throttle_data) { grpc_server_retry_throttle_data* throttle_data = - gpr_malloc(sizeof(*throttle_data)); + (grpc_server_retry_throttle_data*)gpr_malloc(sizeof(*throttle_data)); memset(throttle_data, 0, sizeof(*throttle_data)); gpr_ref_init(&throttle_data->refs, 1); throttle_data->max_milli_tokens = max_milli_tokens; @@ -131,11 +131,11 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( // static void* copy_server_name(void* key, void* unused) { - return gpr_strdup(key); + return gpr_strdup((const char*)key); } static long compare_server_name(void* key1, void* key2, void* unused) { - return strcmp(key1, key2); + return strcmp((const char*)key1, (const char*)key2); } static void destroy_server_retry_throttle_data(void* value, void* unused) { @@ -177,7 +177,8 @@ grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( const char* server_name, int max_milli_tokens, int milli_token_ratio) { gpr_mu_lock(&g_mu); grpc_server_retry_throttle_data* throttle_data = - gpr_avl_get(g_avl, (char*)server_name, NULL); + (grpc_server_retry_throttle_data*)gpr_avl_get(g_avl, (char*)server_name, + NULL); if (throttle_data == NULL) { // Entry not found. Create a new one. throttle_data = grpc_server_retry_throttle_data_create( diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index 05c55aaa89..bc9c3cc782 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -32,6 +32,7 @@ #include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -290,6 +291,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, return c; } + GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx); c = (grpc_subchannel *)gpr_zalloc(sizeof(*c)); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c index f57b631c41..d7a51f3899 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.c +++ b/src/core/ext/filters/client_channel/subchannel_index.c @@ -34,6 +34,8 @@ static gpr_avl g_subchannel_index; static gpr_mu g_mu; +static gpr_refcount g_refcount; + struct grpc_subchannel_key { grpc_subchannel_args args; }; @@ -88,24 +90,26 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, static void sck_avl_destroy(void *p, void *user_data) { grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; - grpc_subchannel_key_destroy(exec_ctx, p); + grpc_subchannel_key_destroy(exec_ctx, (grpc_subchannel_key *)p); } static void *sck_avl_copy(void *p, void *unused) { - return subchannel_key_copy(p); + return subchannel_key_copy((grpc_subchannel_key *)p); } static long sck_avl_compare(void *a, void *b, void *unused) { - return grpc_subchannel_key_compare(a, b); + return grpc_subchannel_key_compare((grpc_subchannel_key *)a, + (grpc_subchannel_key *)b); } static void scv_avl_destroy(void *p, void *user_data) { grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, (grpc_subchannel *)p, + "subchannel_index"); } static void *scv_avl_copy(void *p, void *unused) { - GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index"); + GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel *)p, "subchannel_index"); return p; } @@ -119,15 +123,27 @@ static const gpr_avl_vtable subchannel_avl_vtable = { void grpc_subchannel_index_init(void) { g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable); gpr_mu_init(&g_mu); + gpr_ref_init(&g_refcount, 1); } void grpc_subchannel_index_shutdown(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_mu_destroy(&g_mu); - gpr_avl_unref(g_subchannel_index, &exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); + // TODO(juanlishen): This refcounting mechanism may lead to memory leackage. + // To solve that, we should force polling to flush any pending callbacks, then + // shutdown safely. + grpc_subchannel_index_unref(); +} + +void grpc_subchannel_index_unref(void) { + if (gpr_unref(&g_refcount)) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_subchannel_index, &exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + } } +void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); } + grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key) { // Lock, and take a reference to the subchannel index. diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index 98d882a453..92e36d5283 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -59,6 +59,13 @@ void grpc_subchannel_index_init(void); /** Shutdown the subchannel index (global) */ void grpc_subchannel_index_shutdown(void); +/** Increment the refcount (non-zero) of subchannel index (global). */ +void grpc_subchannel_index_ref(void); + +/** Decrement the refcount of subchannel index (global). If the refcount drops + to zero, unref the subchannel index and destroy its mutex. */ +void grpc_subchannel_index_unref(void); + /** \em TEST ONLY. * If \a force_creation is true, all key comparisons will be false, resulting in * new subchannels always being created. Otherwise, the keys will be compared as diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index 554a7f530d..03958136b4 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -83,12 +83,12 @@ static grpc_error *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx, } static void add_error(const char *error_name, grpc_error **cumulative, - grpc_error *new) { - if (new == GRPC_ERROR_NONE) return; + grpc_error *new_err) { + if (new_err == GRPC_ERROR_NONE) return; if (*cumulative == GRPC_ERROR_NONE) { *cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name); } - *cumulative = grpc_error_add_child(*cumulative, new); + *cumulative = grpc_error_add_child(*cumulative, new_err); } static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c index 450f67746f..0ac803ed41 100644 --- a/src/core/ext/filters/max_age/max_age_filter.c +++ b/src/core/ext/filters/max_age/max_age_filter.c @@ -402,7 +402,7 @@ static bool maybe_add_max_age_filter(grpc_exec_ctx* exec_ctx, bool enable = grpc_channel_arg_get_integer( grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS), - MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX && + MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX || grpc_channel_arg_get_integer( grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX; diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 0ec9353c04..202bcd47f5 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -161,7 +161,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error)); } gpr_mu_unlock(&c->mu); - chttp2_connector_unref(exec_ctx, arg); + chttp2_connector_unref(exec_ctx, (grpc_connector *)arg); } else { GPR_ASSERT(c->endpoint != NULL); start_handshake_locked(exec_ctx, c); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index d7add0538b..f5a409a403 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -52,7 +52,7 @@ typedef struct { } server_state; typedef struct { - server_state *server_state; + server_state *svr_state; grpc_pollset *accepting_pollset; grpc_tcp_server_acceptor *acceptor; grpc_handshake_manager *handshake_mgr; @@ -63,8 +63,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_handshaker_args *args = (grpc_handshaker_args *)arg; server_connection_state *connection_state = (server_connection_state *)args->user_data; - gpr_mu_lock(&connection_state->server_state->mu); - if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) { + gpr_mu_lock(&connection_state->svr_state->mu); + if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) { const char *error_str = grpc_error_string(error); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); @@ -89,7 +89,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport *transport = grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); grpc_server_setup_transport( - exec_ctx, connection_state->server_state->server, transport, + exec_ctx, connection_state->svr_state->server, transport, connection_state->accepting_pollset, args->args); grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); @@ -97,11 +97,11 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } } grpc_handshake_manager_pending_list_remove( - &connection_state->server_state->pending_handshake_mgrs, + &connection_state->svr_state->pending_handshake_mgrs, connection_state->handshake_mgr); - gpr_mu_unlock(&connection_state->server_state->mu); + gpr_mu_unlock(&connection_state->svr_state->mu); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); + grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server); gpr_free(connection_state->acceptor); gpr_free(connection_state); } @@ -124,8 +124,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, gpr_mu_unlock(&state->mu); grpc_tcp_server_ref(state->tcp_server); server_connection_state *connection_state = - gpr_malloc(sizeof(*connection_state)); - connection_state->server_state = state; + (server_connection_state *)gpr_malloc(sizeof(*connection_state)); + connection_state->svr_state = state; connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; connection_state->handshake_mgr = handshake_mgr; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 3fd701fe2f..79a9ed827f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -144,10 +144,11 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); -static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, - grpc_closure *on_initiate, - grpc_closure *on_complete); +static void send_ping_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate, + grpc_closure *on_complete, + grpc_chttp2_initiate_write_reason initiate_write_reason); static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); @@ -346,7 +347,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( GRPC_CHTTP2_CLIENT_CONNECT_STRING)); - grpc_chttp2_initiate_write(exec_ctx, t, "initial_write"); } /* configure http2 the way we like it */ @@ -578,7 +578,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } - grpc_chttp2_initiate_write(exec_ctx, t, "init"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); post_benign_reclaimer(exec_ctx, t); } @@ -846,13 +847,91 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } +static void inc_initiate_write_reason( + grpc_exec_ctx *exec_ctx, grpc_chttp2_initiate_write_reason reason) { + switch (reason) { + case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx); + break; + } +} + void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, const char *reason) { + grpc_chttp2_transport *t, + grpc_chttp2_initiate_write_reason reason) { GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0); switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason); + inc_initiate_write_reason(exec_ctx, reason); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, + grpc_chttp2_initiate_write_reason_string(reason)); t->is_first_write_in_batch = true; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CLOSURE_SCHED( @@ -864,7 +943,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, break; case GRPC_CHTTP2_WRITE_STATE_WRITING: set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, - reason); + grpc_chttp2_initiate_write_reason_string(reason)); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: break; @@ -872,16 +951,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } -void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - bool also_initiate_write, const char *reason) { +void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); } - if (also_initiate_write) { - grpc_chttp2_initiate_write(exec_ctx, t, reason); - } } static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t, @@ -1105,7 +1180,9 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); post_destructive_reclaimer(exec_ctx, t); - grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM); } /* cancel out streams that will never be started */ while (t->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -1202,7 +1279,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > t->write_buffer_size)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE); } } @@ -1404,14 +1483,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } else { GPR_ASSERT(s->id != 0); - bool initiate_write = true; - if (op->send_message && - (op->payload->send_message.send_message->flags & - GRPC_WRITE_BUFFER_HINT)) { - initiate_write = false; + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + if (!(op->send_message && + (op->payload->send_message.send_message->flags & + GRPC_WRITE_BUFFER_HINT))) { + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); } - grpc_chttp2_become_writable(exec_ctx, t, s, initiate_write, - "op.send_initial_metadata"); } } else { s->send_initial_metadata = NULL; @@ -1519,8 +1597,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else if (s->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(exec_ctx, t, s, true, - "op.send_trailing_metadata"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA); } } } @@ -1632,15 +1711,17 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, - grpc_closure *on_initiate, grpc_closure *on_ack) { +static void send_ping_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate, + grpc_closure *on_ack, + grpc_chttp2_initiate_write_reason initiate_write_reason) { grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type]; grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, GRPC_ERROR_NONE); if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, GRPC_ERROR_NONE)) { - grpc_chttp2_initiate_write(exec_ctx, t, "send_ping"); + grpc_chttp2_initiate_write(exec_ctx, t, initiate_write_reason); } } @@ -1648,7 +1729,8 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; t->ping_state.is_delayed_ping_timer_set = false; - grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); } void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1663,7 +1745,8 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { - grpc_chttp2_initiate_write(exec_ctx, t, "continue_pings"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); } } @@ -1676,7 +1759,8 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &slice, &http_error); grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error, grpc_slice_ref_internal(slice), &t->qbuf); - grpc_chttp2_initiate_write(exec_ctx, t, "goaway_sent"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); GRPC_ERROR_UNREF(error); } @@ -1723,7 +1807,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, if (op->send_ping) { send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL, - op->send_ping); + op->send_ping, + GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } if (op->on_connectivity_state_change != NULL) { @@ -1968,7 +2053,8 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, "rst_stream"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); } } if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) { @@ -2289,7 +2375,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &s->stats.outgoing)); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error); - grpc_chttp2_initiate_write(exec_ctx, t, "close_from_api"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API); } typedef struct { @@ -2324,19 +2411,20 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: break; case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_become_writable(exec_ctx, t, s, true, - "immediate stream flowctl"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL); break; case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: - grpc_chttp2_become_writable(exec_ctx, t, s, false, - "queue stream flowctl"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); break; } switch (action.send_transport_update) { case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: break; case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_initiate_write(exec_ctx, t, "immediate transport flowctl"); + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL); break; // this is the same as no action b/c every time the transport enters the // writing path it will maybe do an update @@ -2354,7 +2442,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, (uint32_t)action.max_frame_size); } if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) { - grpc_chttp2_initiate_write(exec_ctx, t, "immediate setting update"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); } } if (action.need_ping) { @@ -2362,7 +2451,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator); send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, - &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); + &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked, + GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING); } } @@ -2441,7 +2531,10 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (t->flow_control.initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "unstalled"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); } } t->flow_control.initial_window_update = 0; @@ -2556,7 +2649,8 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, &t->start_keepalive_ping_locked, - &t->finish_keepalive_ping_locked); + &t->finish_keepalive_ping_locked, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init( @@ -2912,7 +3006,8 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags) { grpc_chttp2_incoming_byte_stream *incoming_byte_stream = - gpr_malloc(sizeof(*incoming_byte_stream)); + (grpc_chttp2_incoming_byte_stream *)gpr_malloc( + sizeof(*incoming_byte_stream)); incoming_byte_stream->base.length = frame_size; incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->base.flags = flags; @@ -3016,6 +3111,56 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, /******************************************************************************* * MONITORING */ + +const char *grpc_chttp2_initiate_write_reason_string( + grpc_chttp2_initiate_write_reason reason) { + switch (reason) { + case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: + return "INITIAL_WRITE"; + case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM: + return "START_NEW_STREAM"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE: + return "SEND_MESSAGE"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA: + return "SEND_INITIAL_METADATA"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA: + return "SEND_TRAILING_METADATA"; + case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING: + return "RETRY_SEND_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS: + return "CONTINUE_PINGS"; + case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT: + return "GOAWAY_SENT"; + case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM: + return "RST_STREAM"; + case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API: + return "CLOSE_FROM_API"; + case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL: + return "STREAM_FLOW_CONTROL"; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL: + return "TRANSPORT_FLOW_CONTROL"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: + return "SEND_SETTINGS"; + case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING: + return "BDP_ESTIMATOR_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: + return "FLOW_CONTROL_UNSTALLED_BY_SETTING"; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: + return "FLOW_CONTROL_UNSTALLED_BY_UPDATE"; + case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING: + return "APPLICATION_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING: + return "KEEPALIVE_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED: + return "TRANSPORT_FLOW_CONTROL_UNSTALLED"; + case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE: + return "PING_RESPONSE"; + case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM: + return "FORCE_RST_STREAM"; + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { return ((grpc_chttp2_transport *)t)->ep; diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c index 0f078e79e9..569a6349d3 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.c +++ b/src/core/ext/transport/chttp2/transport/flow_control.c @@ -60,24 +60,24 @@ static void pretrace(shadow_flow_control* shadow_fc, #define TRACE_PADDING 30 -static char* fmt_int64_diff_str(int64_t old, int64_t new) { +static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { char* str; - if (old != new) { - gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new); + if (old_val != new_val) { + gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old_val, new_val); } else { - gpr_asprintf(&str, "%" PRId64 "", old); + gpr_asprintf(&str, "%" PRId64 "", old_val); } char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); gpr_free(str); return str_lp; } -static char* fmt_uint32_diff_str(uint32_t old, uint32_t new) { +static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) { char* str; - if (new > 0 && old != new) { - gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old, new); + if (new_val > 0 && old_val != new_val) { + gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val); } else { - gpr_asprintf(&str, "%" PRIu32 "", old); + gpr_asprintf(&str, "%" PRIu32 "", old_val); } char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); gpr_free(str); diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index 582fd7bfaa..81bd02ae70 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -117,7 +117,8 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks)); } t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes; - grpc_chttp2_initiate_write(exec_ctx, t, "ping response"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE); } } } diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index 806100adaa..2995bf7310 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -44,7 +44,8 @@ static uint8_t *fill_header(uint8_t *out, uint32_t length, uint8_t flags) { return out; } -grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, +grpc_slice grpc_chttp2_settings_create(uint32_t *old_settings, + const uint32_t *new_settings, uint32_t force_mask, size_t count) { size_t i; uint32_t n = 0; @@ -52,21 +53,21 @@ grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, uint8_t *p; for (i = 0; i < count; i++) { - n += (new[i] != old[i] || (force_mask & (1u << i)) != 0); + n += (new_settings[i] != old_settings[i] || (force_mask & (1u << i)) != 0); } output = GRPC_SLICE_MALLOC(9 + 6 * n); p = fill_header(GRPC_SLICE_START_PTR(output), 6 * n, 0); for (i = 0; i < count; i++) { - if (new[i] != old[i] || (force_mask & (1u << i)) != 0) { + if (new_settings[i] != old_settings[i] || (force_mask & (1u << i)) != 0) { *p++ = (uint8_t)(grpc_setting_id_to_wire_id[i] >> 8); *p++ = (uint8_t)(grpc_setting_id_to_wire_id[i]); - *p++ = (uint8_t)(new[i] >> 24); - *p++ = (uint8_t)(new[i] >> 16); - *p++ = (uint8_t)(new[i] >> 8); - *p++ = (uint8_t)(new[i]); - old[i] = new[i]; + *p++ = (uint8_t)(new_settings[i] >> 24); + *p++ = (uint8_t)(new_settings[i] >> 16); + *p++ = (uint8_t)(new_settings[i] >> 8); + *p++ = (uint8_t)(new_settings[i]); + old_settings[i] = new_settings[i]; } } diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index c94f7725bf..c9ab8d1b50 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -99,8 +99,10 @@ grpc_error *grpc_chttp2_window_update_parser_parse( grpc_chttp2_flowctl_recv_stream_update( &t->flow_control, &s->flow_control, received_update); if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, - "stream.read_flow_control"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE); } } } else { @@ -109,7 +111,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse( received_update); bool is_zero = t->flow_control.remote_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control"); + grpc_chttp2_initiate_write( + exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED); } } } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 82ff2c8e2c..901e7413c0 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1649,7 +1649,8 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, "force_rst_stream"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst"); diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index cf0a9ca920..ba680a89db 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -42,8 +42,9 @@ grpc_error *grpc_chttp2_incoming_metadata_buffer_add( grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); return grpc_metadata_batch_add_tail( - exec_ctx, &buffer->batch, - gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); + exec_ctx, &buffer->batch, (grpc_linked_mdelem *)gpr_arena_alloc( + buffer->arena, sizeof(grpc_linked_mdelem)), + elem); } grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 0fbedd1e56..c2dfce7c9c 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -79,6 +79,33 @@ typedef enum { GRPC_CHTTP2_PCL_COUNT /* must be last */ } grpc_chttp2_ping_closure_list; +typedef enum { + GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE, + GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM, + GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE, + GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA, + GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA, + GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING, + GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS, + GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT, + GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM, + GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API, + GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, + GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE, + GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING, + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED, + GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE, + GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM, +} grpc_chttp2_initiate_write_reason; + +const char *grpc_chttp2_initiate_write_reason_string( + grpc_chttp2_initiate_write_reason reason); + typedef struct { grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT]; uint64_t inflight_id; @@ -599,7 +626,8 @@ struct grpc_chttp2_stream { The actual call chain is documented in the implementation of this function. */ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, const char *reason); + grpc_chttp2_transport *t, + grpc_chttp2_initiate_write_reason reason); typedef struct { /** are we writing? */ @@ -851,10 +879,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, /** add a ref to the stream and add it to the writable list; ref will be dropped in writing.c */ -void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - bool also_initiate_write, const char *reason); +void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s); void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, diff --git a/src/core/ext/transport/chttp2/transport/stream_map.c b/src/core/ext/transport/chttp2/transport/stream_map.c index 650090d8f0..d6079a9a33 100644 --- a/src/core/ext/transport/chttp2/transport/stream_map.c +++ b/src/core/ext/transport/chttp2/transport/stream_map.c @@ -72,8 +72,10 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, uint32_t key, /* resize when less than 25% of the table is free, because compaction won't help much */ map->capacity = capacity = 3 * capacity / 2; - map->keys = keys = gpr_realloc(keys, capacity * sizeof(uint32_t)); - map->values = values = gpr_realloc(values, capacity * sizeof(void *)); + map->keys = keys = + (uint32_t *)gpr_realloc(keys, capacity * sizeof(uint32_t)); + map->values = values = + (void **)gpr_realloc(values, capacity * sizeof(void *)); } } diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 3fa38db4db..3ded801985 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -201,9 +201,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (t->flow_control.remote_window > 0) { while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) { - if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) && - stream_ref_if_not_destroyed(&s->refcount->refs)) { - grpc_chttp2_initiate_write(exec_ctx, t, "transport.read_flow_control"); + if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { + stream_ref_if_not_destroyed(&s->refcount->refs); } } } diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c index 036853a53b..cd3e76a0b5 100644 --- a/src/core/ext/transport/inproc/inproc_transport.c +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -37,7 +37,6 @@ if (GRPC_TRACER_ON(grpc_inproc_trace)) gpr_log(__VA_ARGS__); \ } while (0) -static const grpc_transport_vtable inproc_vtable; static grpc_slice g_empty_slice; static grpc_slice g_fake_path_key; static grpc_slice g_fake_path_value; @@ -1167,6 +1166,55 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { } /******************************************************************************* + * INTEGRATION GLUE + */ + +static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset *pollset) { + // Nothing to do here +} + +static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset_set *pollset_set) { + // Nothing to do here +} + +static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { + return NULL; +} + +/******************************************************************************* + * GLOBAL INIT AND DESTROY + */ +static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} + +void grpc_inproc_transport_init(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL, + grpc_schedule_on_exec_ctx); + g_empty_slice = grpc_slice_from_static_buffer(NULL, 0); + + grpc_slice key_tmp = grpc_slice_from_static_string(":path"); + g_fake_path_key = grpc_slice_intern(key_tmp); + grpc_slice_unref_internal(&exec_ctx, key_tmp); + + g_fake_path_value = grpc_slice_from_static_string("/"); + + grpc_slice auth_tmp = grpc_slice_from_static_string(":authority"); + g_fake_auth_key = grpc_slice_intern(auth_tmp); + grpc_slice_unref_internal(&exec_ctx, auth_tmp); + + g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); + grpc_exec_ctx_finish(&exec_ctx); +} + +static const grpc_transport_vtable inproc_vtable = { + sizeof(inproc_stream), "inproc", init_stream, + set_pollset, set_pollset_set, perform_stream_op, + perform_transport_op, destroy_stream, destroy_transport, + get_endpoint}; + +/******************************************************************************* * Main inproc transport functions */ static void inproc_transports_create(grpc_exec_ctx *exec_ctx, @@ -1178,7 +1226,7 @@ static void inproc_transports_create(grpc_exec_ctx *exec_ctx, inproc_transport *st = (inproc_transport *)gpr_zalloc(sizeof(*st)); inproc_transport *ct = (inproc_transport *)gpr_zalloc(sizeof(*ct)); // Share one lock between both sides since both sides get affected - st->mu = ct->mu = gpr_malloc(sizeof(*st->mu)); + st->mu = ct->mu = (shared_mu *)gpr_malloc(sizeof(*st->mu)); gpr_mu_init(&st->mu->mu); gpr_ref_init(&st->mu->refs, 2); st->base.vtable = &inproc_vtable; @@ -1240,55 +1288,6 @@ grpc_channel *grpc_inproc_channel_create(grpc_server *server, return channel; } -/******************************************************************************* - * INTEGRATION GLUE - */ - -static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset *pollset) { - // Nothing to do here -} - -static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset_set *pollset_set) { - // Nothing to do here -} - -static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { - return NULL; -} - -static const grpc_transport_vtable inproc_vtable = { - sizeof(inproc_stream), "inproc", init_stream, - set_pollset, set_pollset_set, perform_stream_op, - perform_transport_op, destroy_stream, destroy_transport, - get_endpoint}; - -/******************************************************************************* - * GLOBAL INIT AND DESTROY - */ -static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} - -void grpc_inproc_transport_init(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL, - grpc_schedule_on_exec_ctx); - g_empty_slice = grpc_slice_from_static_buffer(NULL, 0); - - grpc_slice key_tmp = grpc_slice_from_static_string(":path"); - g_fake_path_key = grpc_slice_intern(key_tmp); - grpc_slice_unref_internal(&exec_ctx, key_tmp); - - g_fake_path_value = grpc_slice_from_static_string("/"); - - grpc_slice auth_tmp = grpc_slice_from_static_string(":authority"); - g_fake_auth_key = grpc_slice_intern(auth_tmp); - grpc_slice_unref_internal(&exec_ctx, auth_tmp); - - g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); - grpc_exec_ctx_finish(&exec_ctx); -} - void grpc_inproc_transport_shutdown(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_slice_unref_internal(&exec_ctx, g_empty_slice); diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index 1a020a15c3..0f3e49522f 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -25,6 +25,10 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "client_calls_created", "server_calls_created", + "cqs_created", + "client_channels_created", + "client_subchannels_created", + "server_channels_created", "syscall_poll", "syscall_wait", "pollset_kick", @@ -52,6 +56,27 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "http2_writes_offloaded", "http2_writes_continued", "http2_partial_writes", + "http2_initiate_write_due_to_initial_write", + "http2_initiate_write_due_to_start_new_stream", + "http2_initiate_write_due_to_send_message", + "http2_initiate_write_due_to_send_initial_metadata", + "http2_initiate_write_due_to_send_trailing_metadata", + "http2_initiate_write_due_to_retry_send_ping", + "http2_initiate_write_due_to_continue_pings", + "http2_initiate_write_due_to_goaway_sent", + "http2_initiate_write_due_to_rst_stream", + "http2_initiate_write_due_to_close_from_api", + "http2_initiate_write_due_to_stream_flow_control", + "http2_initiate_write_due_to_transport_flow_control", + "http2_initiate_write_due_to_send_settings", + "http2_initiate_write_due_to_bdp_estimator_ping", + "http2_initiate_write_due_to_flow_control_unstalled_by_setting", + "http2_initiate_write_due_to_flow_control_unstalled_by_update", + "http2_initiate_write_due_to_application_ping", + "http2_initiate_write_due_to_keepalive_ping", + "http2_initiate_write_due_to_transport_flow_control_unstalled", + "http2_initiate_write_due_to_ping_response", + "http2_initiate_write_due_to_force_rst_stream", "combiner_locks_initiated", "combiner_locks_scheduled_items", "combiner_locks_scheduled_final_items", @@ -68,6 +93,8 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of client side calls created by this process", "Number of server side calls created by this process", + "Number of completion queues created", "Number of client channels created", + "Number of client subchannels created", "Number of server channels created", "Number of polling syscalls (epoll_wait, poll, etc) made by this process", "Number of sleeping syscalls made by this process", "How many polling wakeups were performed by the process (only valid for " @@ -104,6 +131,30 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "written", "Number of HTTP2 writes that were made knowing there was still more data " "to be written (we cap maximum write size to syscall_write)", + "Number of HTTP2 writes initiated due to 'initial_write'", + "Number of HTTP2 writes initiated due to 'start_new_stream'", + "Number of HTTP2 writes initiated due to 'send_message'", + "Number of HTTP2 writes initiated due to 'send_initial_metadata'", + "Number of HTTP2 writes initiated due to 'send_trailing_metadata'", + "Number of HTTP2 writes initiated due to 'retry_send_ping'", + "Number of HTTP2 writes initiated due to 'continue_pings'", + "Number of HTTP2 writes initiated due to 'goaway_sent'", + "Number of HTTP2 writes initiated due to 'rst_stream'", + "Number of HTTP2 writes initiated due to 'close_from_api'", + "Number of HTTP2 writes initiated due to 'stream_flow_control'", + "Number of HTTP2 writes initiated due to 'transport_flow_control'", + "Number of HTTP2 writes initiated due to 'send_settings'", + "Number of HTTP2 writes initiated due to 'bdp_estimator_ping'", + "Number of HTTP2 writes initiated due to " + "'flow_control_unstalled_by_setting'", + "Number of HTTP2 writes initiated due to " + "'flow_control_unstalled_by_update'", + "Number of HTTP2 writes initiated due to 'application_ping'", + "Number of HTTP2 writes initiated due to 'keepalive_ping'", + "Number of HTTP2 writes initiated due to " + "'transport_flow_control_unstalled'", + "Number of HTTP2 writes initiated due to 'ping_response'", + "Number of HTTP2 writes initiated due to 'force_rst_stream'", "Number of combiner lock entries by process (first items queued to a " "combiner)", "Number of items scheduled against combiner locks", diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index 48f60fc8c9..e7cb903053 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -27,6 +27,10 @@ typedef enum { GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED, GRPC_STATS_COUNTER_SERVER_CALLS_CREATED, + GRPC_STATS_COUNTER_CQS_CREATED, + GRPC_STATS_COUNTER_CLIENT_CHANNELS_CREATED, + GRPC_STATS_COUNTER_CLIENT_SUBCHANNELS_CREATED, + GRPC_STATS_COUNTER_SERVER_CHANNELS_CREATED, GRPC_STATS_COUNTER_SYSCALL_POLL, GRPC_STATS_COUNTER_SYSCALL_WAIT, GRPC_STATS_COUNTER_POLLSET_KICK, @@ -54,6 +58,27 @@ typedef enum { GRPC_STATS_COUNTER_HTTP2_WRITES_OFFLOADED, GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED, GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM, GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED, GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS, GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS, @@ -121,6 +146,15 @@ typedef enum { GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED) #define GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_CALLS_CREATED) +#define GRPC_STATS_INC_CQS_CREATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CQS_CREATED) +#define GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CHANNELS_CREATED) +#define GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CLIENT_SUBCHANNELS_CREATED) +#define GRPC_STATS_INC_SERVER_CHANNELS_CREATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_CHANNELS_CREATED) #define GRPC_STATS_INC_SYSCALL_POLL(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_POLL) #define GRPC_STATS_INC_SYSCALL_WAIT(exec_ctx) \ @@ -181,6 +215,95 @@ typedef enum { GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED) #define GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM) #define GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED) diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index b43c466d4f..8ba07a4817 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -24,6 +24,14 @@ max: 262144 buckets: 64 doc: Initial size of the grpc_call arena created at call start +- counter: cqs_created + doc: Number of completion queues created +- counter: client_channels_created + doc: Number of client channels created +- counter: client_subchannels_created + doc: Number of client subchannels created +- counter: server_channels_created + doc: Number of server channels created # polling - counter: syscall_poll doc: Number of polling syscalls (epoll_wait, poll, etc) made by this process @@ -139,6 +147,48 @@ - counter: http2_partial_writes doc: Number of HTTP2 writes that were made knowing there was still more data to be written (we cap maximum write size to syscall_write) +- counter: http2_initiate_write_due_to_initial_write + doc: Number of HTTP2 writes initiated due to 'initial_write' +- counter: http2_initiate_write_due_to_start_new_stream + doc: Number of HTTP2 writes initiated due to 'start_new_stream' +- counter: http2_initiate_write_due_to_send_message + doc: Number of HTTP2 writes initiated due to 'send_message' +- counter: http2_initiate_write_due_to_send_initial_metadata + doc: Number of HTTP2 writes initiated due to 'send_initial_metadata' +- counter: http2_initiate_write_due_to_send_trailing_metadata + doc: Number of HTTP2 writes initiated due to 'send_trailing_metadata' +- counter: http2_initiate_write_due_to_retry_send_ping + doc: Number of HTTP2 writes initiated due to 'retry_send_ping' +- counter: http2_initiate_write_due_to_continue_pings + doc: Number of HTTP2 writes initiated due to 'continue_pings' +- counter: http2_initiate_write_due_to_goaway_sent + doc: Number of HTTP2 writes initiated due to 'goaway_sent' +- counter: http2_initiate_write_due_to_rst_stream + doc: Number of HTTP2 writes initiated due to 'rst_stream' +- counter: http2_initiate_write_due_to_close_from_api + doc: Number of HTTP2 writes initiated due to 'close_from_api' +- counter: http2_initiate_write_due_to_stream_flow_control + doc: Number of HTTP2 writes initiated due to 'stream_flow_control' +- counter: http2_initiate_write_due_to_transport_flow_control + doc: Number of HTTP2 writes initiated due to 'transport_flow_control' +- counter: http2_initiate_write_due_to_send_settings + doc: Number of HTTP2 writes initiated due to 'send_settings' +- counter: http2_initiate_write_due_to_bdp_estimator_ping + doc: Number of HTTP2 writes initiated due to 'bdp_estimator_ping' +- counter: http2_initiate_write_due_to_flow_control_unstalled_by_setting + doc: Number of HTTP2 writes initiated due to 'flow_control_unstalled_by_setting' +- counter: http2_initiate_write_due_to_flow_control_unstalled_by_update + doc: Number of HTTP2 writes initiated due to 'flow_control_unstalled_by_update' +- counter: http2_initiate_write_due_to_application_ping + doc: Number of HTTP2 writes initiated due to 'application_ping' +- counter: http2_initiate_write_due_to_keepalive_ping + doc: Number of HTTP2 writes initiated due to 'keepalive_ping' +- counter: http2_initiate_write_due_to_transport_flow_control_unstalled + doc: Number of HTTP2 writes initiated due to 'transport_flow_control_unstalled' +- counter: http2_initiate_write_due_to_ping_response + doc: Number of HTTP2 writes initiated due to 'ping_response' +- counter: http2_initiate_write_due_to_force_rst_stream + doc: Number of HTTP2 writes initiated due to 'force_rst_stream' # combiner locks - counter: combiner_locks_initiated doc: Number of combiner lock entries by process diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 4cf53f3f79..01c25cce9f 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -1,5 +1,9 @@ client_calls_created_per_iteration:FLOAT, server_calls_created_per_iteration:FLOAT, +cqs_created_per_iteration:FLOAT, +client_channels_created_per_iteration:FLOAT, +client_subchannels_created_per_iteration:FLOAT, +server_channels_created_per_iteration:FLOAT, syscall_poll_per_iteration:FLOAT, syscall_wait_per_iteration:FLOAT, pollset_kick_per_iteration:FLOAT, @@ -27,6 +31,27 @@ http2_writes_begun_per_iteration:FLOAT, http2_writes_offloaded_per_iteration:FLOAT, http2_writes_continued_per_iteration:FLOAT, http2_partial_writes_per_iteration:FLOAT, +http2_initiate_write_due_to_initial_write_per_iteration:FLOAT, +http2_initiate_write_due_to_start_new_stream_per_iteration:FLOAT, +http2_initiate_write_due_to_send_message_per_iteration:FLOAT, +http2_initiate_write_due_to_send_initial_metadata_per_iteration:FLOAT, +http2_initiate_write_due_to_send_trailing_metadata_per_iteration:FLOAT, +http2_initiate_write_due_to_retry_send_ping_per_iteration:FLOAT, +http2_initiate_write_due_to_continue_pings_per_iteration:FLOAT, +http2_initiate_write_due_to_goaway_sent_per_iteration:FLOAT, +http2_initiate_write_due_to_rst_stream_per_iteration:FLOAT, +http2_initiate_write_due_to_close_from_api_per_iteration:FLOAT, +http2_initiate_write_due_to_stream_flow_control_per_iteration:FLOAT, +http2_initiate_write_due_to_transport_flow_control_per_iteration:FLOAT, +http2_initiate_write_due_to_send_settings_per_iteration:FLOAT, +http2_initiate_write_due_to_bdp_estimator_ping_per_iteration:FLOAT, +http2_initiate_write_due_to_flow_control_unstalled_by_setting_per_iteration:FLOAT, +http2_initiate_write_due_to_flow_control_unstalled_by_update_per_iteration:FLOAT, +http2_initiate_write_due_to_application_ping_per_iteration:FLOAT, +http2_initiate_write_due_to_keepalive_ping_per_iteration:FLOAT, +http2_initiate_write_due_to_transport_flow_control_unstalled_per_iteration:FLOAT, +http2_initiate_write_due_to_ping_response_per_iteration:FLOAT, +http2_initiate_write_due_to_force_rst_stream_per_iteration:FLOAT, combiner_locks_initiated_per_iteration:FLOAT, combiner_locks_scheduled_items_per_iteration:FLOAT, combiner_locks_scheduled_final_items_per_iteration:FLOAT, diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index aa861d3cc1..7058acca50 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -130,9 +130,9 @@ static void fd_global_shutdown(void); * Pollset Declarations */ -typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state_t; +typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state; -static const char *kick_state_string(kick_state_t st) { +static const char *kick_state_string(kick_state st) { switch (st) { case UNKICKED: return "UNKICKED"; @@ -145,7 +145,7 @@ static const char *kick_state_string(kick_state_t st) { } struct grpc_pollset_worker { - kick_state_t kick_state; + kick_state state; int kick_state_mutator; // which line of code last changed kick state bool initialized_cv; grpc_pollset_worker *next; @@ -154,9 +154,9 @@ struct grpc_pollset_worker { grpc_closure_list schedule_on_end_work; }; -#define SET_KICK_STATE(worker, state) \ +#define SET_KICK_STATE(worker, kick_state) \ do { \ - (worker)->kick_state = (state); \ + (worker)->state = (kick_state); \ (worker)->kick_state_mutator = __LINE__; \ } while (false) @@ -510,7 +510,7 @@ static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset_worker *worker = pollset->root_worker; do { GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - switch (worker->kick_state) { + switch (worker->state) { case KICKED: GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); break; @@ -695,7 +695,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_mu_lock(&pollset->mu); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), is_reassigning); } if (pollset->seen_inactive) { @@ -715,12 +715,12 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, at this point is if it were "kicked specifically". Since the worker has not added itself to the pollset yet (by calling worker_insert()), it is not visible in the "kick any" path yet */ - if (worker->kick_state == UNKICKED) { + if (worker->state == UNKICKED) { pollset->seen_inactive = false; if (neighborhood->active_root == NULL) { neighborhood->active_root = pollset->next = pollset->prev = pollset; /* Make this the designated poller if there isn't one already */ - if (worker->kick_state == UNKICKED && + if (worker->state == UNKICKED && gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { SET_KICK_STATE(worker, DESIGNATED_POLLER); } @@ -740,19 +740,19 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker_insert(pollset, worker); pollset->begin_refs--; - if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) { + if (worker->state == UNKICKED && !pollset->kicked_without_poller) { GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker); worker->initialized_cv = true; gpr_cv_init(&worker->cv); - while (worker->kick_state == UNKICKED && !pollset->shutting_down) { + while (worker->state == UNKICKED && !pollset->shutting_down) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), pollset->shutting_down); } if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && - worker->kick_state == UNKICKED) { + worker->state == UNKICKED) { /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker received a kick */ SET_KICK_STATE(worker, KICKED); @@ -765,7 +765,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_log(GPR_ERROR, "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d " "kicked_without_poller: %d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), pollset->shutting_down, pollset->kicked_without_poller); } @@ -785,7 +785,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, } GPR_TIMER_END("begin_worker", 0); - return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down; + return worker->state == DESIGNATED_POLLER && !pollset->shutting_down; } static bool check_neighborhood_for_available_poller( @@ -802,7 +802,7 @@ static bool check_neighborhood_for_available_poller( grpc_pollset_worker *inspect_worker = inspect->root_worker; if (inspect_worker != NULL) { do { - switch (inspect_worker->kick_state) { + switch (inspect_worker->state) { case UNKICKED: if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)inspect_worker)) { @@ -866,7 +866,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure_list_move(&worker->schedule_on_end_work, &exec_ctx->closure_list); if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { - if (worker->next != worker && worker->next->kick_state == UNKICKED) { + if (worker->next != worker && worker->next->state == UNKICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker); } @@ -1005,14 +1005,14 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_strvec_add(&log, tmp); if (pollset->root_worker != NULL) { gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}", - kick_state_string(pollset->root_worker->kick_state), + kick_state_string(pollset->root_worker->state), pollset->root_worker->next, - kick_state_string(pollset->root_worker->next->kick_state)); + kick_state_string(pollset->root_worker->next->state)); gpr_strvec_add(&log, tmp); } if (specific_worker != NULL) { gpr_asprintf(&tmp, " worker_kick_state=%s", - kick_state_string(specific_worker->kick_state)); + kick_state_string(specific_worker->state)); gpr_strvec_add(&log, tmp); } tmp = gpr_strvec_flatten(&log, NULL); @@ -1033,14 +1033,14 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, goto done; } grpc_pollset_worker *next_worker = root_worker->next; - if (root_worker->kick_state == KICKED) { + if (root_worker->state == KICKED) { GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. already kicked %p", root_worker); } SET_KICK_STATE(root_worker, KICKED); goto done; - } else if (next_worker->kick_state == KICKED) { + } else if (next_worker->state == KICKED) { GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. already kicked %p", next_worker); @@ -1059,7 +1059,7 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, SET_KICK_STATE(root_worker, KICKED); ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd); goto done; - } else if (next_worker->kick_state == UNKICKED) { + } else if (next_worker->state == UNKICKED) { GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. kicked %p", next_worker); @@ -1068,8 +1068,8 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, SET_KICK_STATE(next_worker, KICKED); gpr_cv_signal(&next_worker->cv); goto done; - } else if (next_worker->kick_state == DESIGNATED_POLLER) { - if (root_worker->kick_state != DESIGNATED_POLLER) { + } else if (next_worker->state == DESIGNATED_POLLER) { + if (root_worker->state != DESIGNATED_POLLER) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log( GPR_ERROR, @@ -1094,7 +1094,7 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); - GPR_ASSERT(next_worker->kick_state == KICKED); + GPR_ASSERT(next_worker->state == KICKED); SET_KICK_STATE(next_worker, KICKED); goto done; } @@ -1109,7 +1109,7 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_UNREACHABLE_CODE(goto done); } - if (specific_worker->kick_state == KICKED) { + if (specific_worker->state == KICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. specific worker already kicked"); } diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 4e5bed5016..2a276ca360 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -97,12 +97,12 @@ static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, * pollable Declarations */ -typedef struct pollable_t { +typedef struct pollable { polling_obj po; int epfd; grpc_wakeup_fd wakeup; grpc_pollset_worker *root_worker; -} pollable_t; +} pollable; static const char *polling_obj_type_string(polling_obj_type t) { switch (t) { @@ -122,7 +122,7 @@ static const char *polling_obj_type_string(polling_obj_type t) { return "<invalid>"; } -static char *pollable_desc(pollable_t *p) { +static char *pollable_desc(pollable *p) { char *out; gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", polling_obj_type_string(p->po.type), p->po.group, p->epfd, @@ -130,19 +130,19 @@ static char *pollable_desc(pollable_t *p) { return out; } -static pollable_t g_empty_pollable; +static pollable g_empty_pollable; -static void pollable_init(pollable_t *p, polling_obj_type type); -static void pollable_destroy(pollable_t *p); +static void pollable_init(pollable *p, polling_obj_type type); +static void pollable_destroy(pollable *p); /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable_t *p); +static grpc_error *pollable_materialize(pollable *p); /******************************************************************************* * Fd Declarations */ struct grpc_fd { - pollable_t pollable; + pollable pollable_obj; int fd; /* refst format: bit 0 : 1=Active / 0=Orphaned @@ -193,15 +193,15 @@ struct grpc_pollset_worker { pollset_worker_link links[POLLSET_WORKER_LINK_COUNT]; gpr_cv cv; grpc_pollset *pollset; - pollable_t *pollable; + pollable *pollable_obj; }; #define MAX_EPOLL_EVENTS 100 #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 struct grpc_pollset { - pollable_t pollable; - pollable_t *current_pollable; + pollable pollable_obj; + pollable *current_pollable_obj; int kick_alls_pending; bool kicked_without_poller; grpc_closure *shutdown_closure; @@ -282,7 +282,7 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_fd *fd = (grpc_fd *)arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); - pollable_destroy(&fd->pollable); + pollable_destroy(&fd->pollable_obj); gpr_mu_destroy(&fd->orphaned_mu); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -343,7 +343,7 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); } - pollable_init(&new_fd->pollable, PO_FD); + pollable_init(&new_fd->pollable_obj, PO_FD); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; @@ -385,7 +385,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, bool is_fd_closed = already_closed; grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_lock(&fd->pollable.po.mu); + gpr_mu_lock(&fd->pollable_obj.po.mu); gpr_mu_lock(&fd->orphaned_mu); fd->on_done_closure = on_done; @@ -411,7 +411,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->orphaned_mu); - gpr_mu_unlock(&fd->pollable.po.mu); + gpr_mu_unlock(&fd->pollable_obj.po.mu); UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); @@ -451,13 +451,13 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, * Pollable Definitions */ -static void pollable_init(pollable_t *p, polling_obj_type type) { +static void pollable_init(pollable *p, polling_obj_type type) { po_init(&p->po, type); p->root_worker = NULL; p->epfd = -1; } -static void pollable_destroy(pollable_t *p) { +static void pollable_destroy(pollable *p) { po_destroy(&p->po); if (p->epfd != -1) { close(p->epfd); @@ -466,7 +466,7 @@ static void pollable_destroy(pollable_t *p) { } /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable_t *p) { +static grpc_error *pollable_materialize(pollable *p) { if (p->epfd == -1) { int new_epfd = epoll_create1(EPOLL_CLOEXEC); if (new_epfd < 0) { @@ -492,7 +492,7 @@ static grpc_error *pollable_materialize(pollable_t *p) { } /* pollable must be materialized */ -static grpc_error *pollable_add_fd(pollable_t *p, grpc_fd *fd) { +static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollable_add_fd"; const int epfd = p->epfd; @@ -557,31 +557,34 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_unused) { grpc_error *error = GRPC_ERROR_NONE; grpc_pollset *pollset = (grpc_pollset *)arg; - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (worker->pollable != &pollset->pollable) { - gpr_mu_lock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&worker->pollable_obj->po.mu); } if (worker->initialized_cv && worker != pollset->root_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable, worker->pollable); + pollset, worker, &pollset->pollable_obj, + worker->pollable_obj); } worker->kicked = true; gpr_cv_signal(&worker->cv); } else { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable, worker->pollable); + pollset, worker, &pollset->pollable_obj, + worker->pollable_obj); } - append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup), + append_error(&error, + grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup), "pollset_shutdown"); } - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker->pollable_obj->po.mu); } worker = worker->links[PWL_POLLSET].next; @@ -589,7 +592,7 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, } pollset->kick_alls_pending--; pollset_maybe_finish_shutdown(exec_ctx, pollset); - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); GRPC_LOG_IF_ERROR("kick_all", error); } @@ -600,8 +603,7 @@ static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GRPC_ERROR_NONE); } -static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, pollable_t *p, +static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, grpc_pollset_worker *specific_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, @@ -666,25 +668,25 @@ static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx, /* p->po.mu must be held before calling this function */ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - pollable_t *p = pollset->current_pollable; + pollable *p = pollset->current_pollable_obj; GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (p != &pollset->pollable) { + if (p != &pollset->pollable_obj) { gpr_mu_lock(&p->po.mu); } - grpc_error *error = pollset_kick_inner(exec_ctx, pollset, p, specific_worker); - if (p != &pollset->pollable) { + grpc_error *error = pollset_kick_inner(pollset, p, specific_worker); + if (p != &pollset->pollable_obj) { gpr_mu_unlock(&p->po.mu); } return error; } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - pollable_init(&pollset->pollable, PO_POLLSET); - pollset->current_pollable = &g_empty_pollable; + pollable_init(&pollset->pollable_obj, PO_POLLSET); + pollset->current_pollable_obj = &g_empty_pollable; pollset->kicked_without_poller = false; pollset->shutdown_closure = NULL; pollset->root_worker = NULL; - *mu = &pollset->pollable.po.mu; + *mu = &pollset->pollable_obj.po.mu; } /* Convert a timespec to milliseconds: @@ -732,8 +734,8 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "fd_become_pollable"; - if (append_error(&error, pollable_materialize(&fd->pollable), err_desc)) { - append_error(&error, pollable_add_fd(&fd->pollable, fd), err_desc); + if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) { + append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc); } return error; } @@ -747,8 +749,8 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset_maybe_finish_shutdown(exec_ctx, pollset); } -static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable_t *p) { - return p != &g_empty_pollable && p != &pollset->pollable; +static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { + return p != &g_empty_pollable && p != &pollset->pollable_obj; } static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, @@ -794,9 +796,9 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollable_destroy(&pollset->pollable); - if (pollset_is_pollable_fd(pollset, pollset->current_pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, + pollable_destroy(&pollset->pollable_obj); + if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) { + UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2, "pollset_pollable"); } GRPC_LOG_IF_ERROR("pollset_process_events", @@ -804,7 +806,7 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable_t *p, gpr_timespec now, + pollable *p, gpr_timespec now, gpr_timespec deadline) { int timeout = poll_deadline_to_millis_timeout(deadline, now); @@ -886,68 +888,69 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->initialized_cv = false; worker->kicked = false; worker->pollset = pollset; - worker->pollable = pollset->current_pollable; + worker->pollable_obj = pollset->current_pollable_obj; - if (pollset_is_pollable_fd(pollset, worker->pollable)) { - REF_BY((grpc_fd *)worker->pollable, 2, "one_poll"); + if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { + REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll"); } worker_insert(&pollset->root_worker, PWL_POLLSET, worker); - if (!worker_insert(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { + if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE, + worker)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&pollset->pollable.po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&pollset->pollable_obj.po.mu); } if (GRPC_TRACER_ON(grpc_polling_trace) && - worker->pollable->root_worker != worker) { + worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, - worker->pollable, worker, + worker->pollable_obj, worker, poll_deadline_to_millis_timeout(deadline, *now)); } - while (do_poll && worker->pollable->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) { + while (do_poll && worker->pollable_obj->root_worker != worker) { + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, deadline)) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, - worker->pollable, worker); + worker->pollable_obj, worker); } do_poll = false; } else if (worker->kicked) { if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable, - worker); + gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, + worker->pollable_obj, worker); } do_poll = false; } else if (GRPC_TRACER_ON(grpc_polling_trace) && - worker->pollable->root_worker != worker) { + worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset, - worker->pollable, worker); + worker->pollable_obj, worker); } } - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&worker->pollable->po.mu); - gpr_mu_lock(&pollset->pollable.po.mu); - gpr_mu_lock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker->pollable_obj->po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); + gpr_mu_lock(&worker->pollable_obj->po.mu); } *now = gpr_now(now->clock_type); } return do_poll && pollset->shutdown_closure == NULL && - pollset->current_pollable == worker->pollable; + pollset->current_pollable_obj == worker->pollable_obj; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { if (NEW_ROOT == - worker_remove(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { - gpr_cv_signal(&worker->pollable->root_worker->cv); + worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) { + gpr_cv_signal(&worker->pollable_obj->root_worker->cv); } if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } - if (pollset_is_pollable_fd(pollset, worker->pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable, 2, "one_poll"); + if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { + UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll"); } if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) { pollset_maybe_finish_shutdown(exec_ctx, pollset); @@ -975,41 +978,41 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->kicked_without_poller = false; return GRPC_ERROR_NONE; } - if (pollset->current_pollable != &pollset->pollable) { - gpr_mu_lock(&pollset->current_pollable->po.mu); + if (pollset->current_pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&pollset->current_pollable_obj->po.mu); } if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!pollset->shutdown_closure); - append_error(&error, pollable_materialize(worker.pollable), err_desc); - if (worker.pollable != &pollset->pollable) { - gpr_mu_unlock(&worker.pollable->po.mu); + append_error(&error, pollable_materialize(worker.pollable_obj), err_desc); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker.pollable_obj->po.mu); } - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); if (pollset->event_cursor == pollset->event_count) { - append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, + append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj, now, deadline), err_desc); } append_error(&error, pollset_process_events(exec_ctx, pollset, false), err_desc); - gpr_mu_lock(&pollset->pollable.po.mu); - if (worker.pollable != &pollset->pollable) { - gpr_mu_lock(&worker.pollable->po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&worker.pollable_obj->po.mu); } gpr_tls_set(&g_current_thread_pollset, 0); gpr_tls_set(&g_current_thread_worker, 0); pollset_maybe_finish_shutdown(exec_ctx, pollset); } end_worker(exec_ctx, pollset, &worker, worker_hdl); - if (worker.pollable != &pollset->pollable) { - gpr_mu_unlock(&worker.pollable->po.mu); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker.pollable_obj->po.mu); } if (grpc_exec_ctx_has_work(exec_ctx)) { - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); } return error; } @@ -1026,27 +1029,27 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, bool fd_locked) { static const char *err_desc = "pollset_add_fd"; grpc_error *error = GRPC_ERROR_NONE; - if (pollset->current_pollable == &g_empty_pollable) { + if (pollset->current_pollable_obj == &g_empty_pollable) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from empty to fd", pollset, fd); } - /* empty pollable --> single fd pollable_t */ + /* empty pollable --> single fd pollable */ pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable = &fd->pollable; - if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu); + pollset->current_pollable_obj = &fd->pollable_obj; + if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu); append_error(&error, fd_become_pollable_locked(fd), err_desc); - if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu); + if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu); REF_BY(fd, 2, "pollset_pollable"); - } else if (pollset->current_pollable == &pollset->pollable) { + } else if (pollset->current_pollable_obj == &pollset->pollable_obj) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); } - append_error(&error, pollable_add_fd(pollset->current_pollable, fd), + append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd), err_desc); - } else if (pollset->current_pollable != &fd->pollable) { - grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable; + } else if (pollset->current_pollable_obj != &fd->pollable_obj) { + grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj; if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from fd %p to multipoller", @@ -1058,11 +1061,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read"); grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write"); pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable = &pollset->pollable; - if (append_error(&error, pollable_materialize(&pollset->pollable), + pollset->current_pollable_obj = &pollset->pollable_obj; + if (append_error(&error, pollable_materialize(&pollset->pollable_obj), err_desc)) { - pollable_add_fd(&pollset->pollable, had_fd); - pollable_add_fd(&pollset->pollable, fd); + pollable_add_fd(&pollset->pollable_obj, had_fd); + pollable_add_fd(&pollset->pollable_obj, fd); } GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd, @@ -1074,9 +1077,9 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false); - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); } @@ -1098,7 +1101,7 @@ static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_fd *fd) { - po_join(exec_ctx, &pss->po, &fd->pollable.po); + po_join(exec_ctx, &pss->po, &fd->pollable_obj.po); } static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, @@ -1106,7 +1109,7 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_pollset *ps) { - po_join(exec_ctx, &pss->po, &ps->pollable.po); + po_join(exec_ctx, &pss->po, &ps->pollable_obj.po); } static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 15968d23ba..b8482b9d3a 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -1543,7 +1543,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { for (i = 0; i < nfds; i++) { fds[i].revents = 0; if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - idx = FD_TO_IDX(fds[i].fd); + idx = GRPC_FD_TO_IDX(fds[i].fd); fd_cvs[i].cv = &pollcv_cv; fd_cvs[i].prev = NULL; fd_cvs[i].next = g_cvfds.cvfds[idx].cvs; @@ -1606,8 +1606,8 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { idx = 0; for (i = 0; i < nfds; i++) { if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i])); - if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { + remove_cvn(&g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i])); + if (g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].is_set) { fds[i].revents = POLLIN; if (res >= 0) res++; } diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c index 5e0b1d1704..268e0175dd 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.c +++ b/src/core/lib/iomgr/wakeup_fd_cv.c @@ -57,7 +57,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { g_cvfds.free_fds = g_cvfds.free_fds->next_free; g_cvfds.cvfds[idx].cvs = NULL; g_cvfds.cvfds[idx].is_set = 0; - fd_info->read_fd = IDX_TO_FD(idx); + fd_info->read_fd = GRPC_IDX_TO_FD(idx); fd_info->write_fd = -1; gpr_mu_unlock(&g_cvfds.mu); return GRPC_ERROR_NONE; @@ -66,8 +66,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { cv_node* cvn; gpr_mu_lock(&g_cvfds.mu); - g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 1; - cvn = g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs; + g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1; + cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs; while (cvn) { gpr_cv_signal(cvn->cv); cvn = cvn->next; @@ -78,7 +78,7 @@ static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { static grpc_error* cv_fd_consume(grpc_wakeup_fd* fd_info) { gpr_mu_lock(&g_cvfds.mu); - g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 0; + g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 0; gpr_mu_unlock(&g_cvfds.mu); return GRPC_ERROR_NONE; } @@ -89,9 +89,9 @@ static void cv_fd_destroy(grpc_wakeup_fd* fd_info) { } gpr_mu_lock(&g_cvfds.mu); // Assert that there are no active pollers - GPR_ASSERT(!g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs); - g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds; - g_cvfds.free_fds = &g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)]; + GPR_ASSERT(!g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs); + g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds; + g_cvfds.free_fds = &g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)]; gpr_mu_unlock(&g_cvfds.mu); } diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index 46e84f5843..dc170ad5b4 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -37,8 +37,8 @@ #include "src/core/lib/iomgr/ev_posix.h" -#define FD_TO_IDX(fd) (-(fd)-1) -#define IDX_TO_FD(idx) (-(idx)-1) +#define GRPC_FD_TO_IDX(fd) (-(fd)-1) +#define GRPC_IDX_TO_FD(idx) (-(idx)-1) typedef struct cv_node { gpr_cv* cv; diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 975d599523..3d19605617 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -137,7 +137,7 @@ static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx, // Create zero-copy frame protector, if implemented. tsi_zero_copy_grpc_protector *zero_copy_protector = NULL; tsi_result result = tsi_handshaker_result_create_zero_copy_grpc_protector( - h->handshaker_result, NULL, &zero_copy_protector); + exec_ctx, h->handshaker_result, NULL, &zero_copy_protector); if (result != TSI_OK && result != TSI_UNIMPLEMENTED) { error = grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING( diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index bb5b57a8bc..135257c1ac 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -135,7 +135,7 @@ typedef struct batch_control { typedef struct { gpr_mu child_list_mu; grpc_call *first_child; -} parent_call_t; +} parent_call; typedef struct { grpc_call *parent; @@ -144,7 +144,7 @@ typedef struct { parent->mu */ grpc_call *sibling_next; grpc_call *sibling_prev; -} child_call_t; +} child_call; #define RECV_NONE ((gpr_atm)0) #define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1) @@ -157,8 +157,8 @@ struct grpc_call { grpc_polling_entity pollent; grpc_channel *channel; gpr_timespec start_time; - /* parent_call_t* */ gpr_atm parent_call_atm; - child_call_t *child_call; + /* parent_call* */ gpr_atm parent_call_atm; + child_call *child; /* client or server call */ bool is_client; @@ -304,21 +304,21 @@ void *grpc_call_arena_alloc(grpc_call *call, size_t size) { return gpr_arena_alloc(call->arena, size); } -static parent_call_t *get_or_create_parent_call(grpc_call *call) { - parent_call_t *p = (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm); +static parent_call *get_or_create_parent_call(grpc_call *call) { + parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); if (p == NULL) { - p = (parent_call_t *)gpr_arena_alloc(call->arena, sizeof(*p)); + p = (parent_call *)gpr_arena_alloc(call->arena, sizeof(*p)); gpr_mu_init(&p->child_list_mu); if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) { gpr_mu_destroy(&p->child_list_mu); - p = (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm); + p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); } } return p; } -static parent_call_t *get_parent_call(grpc_call *call) { - return (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm); +static parent_call *get_parent_call(grpc_call *call) { + return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); } grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, @@ -378,24 +378,24 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, bool immediately_cancel = false; - if (args->parent_call != NULL) { - child_call_t *cc = call->child_call = - (child_call_t *)gpr_arena_alloc(arena, sizeof(child_call_t)); - call->child_call->parent = args->parent_call; + if (args->parent != NULL) { + child_call *cc = call->child = + (child_call *)gpr_arena_alloc(arena, sizeof(child_call)); + call->child->parent = args->parent; - GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); + GRPC_CALL_INTERNAL_REF(args->parent, "child"); GPR_ASSERT(call->is_client); - GPR_ASSERT(!args->parent_call->is_client); + GPR_ASSERT(!args->parent->is_client); - parent_call_t *pc = get_or_create_parent_call(args->parent_call); + parent_call *pc = get_or_create_parent_call(args->parent); gpr_mu_lock(&pc->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( gpr_convert_clock_type(send_deadline, - args->parent_call->send_deadline.clock_type), - args->parent_call->send_deadline); + args->parent->send_deadline.clock_type), + args->parent->send_deadline); } /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with * GRPC_PROPAGATE_STATS_CONTEXT */ @@ -407,9 +407,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, "Census tracing propagation requested " "without Census context propagation")); } - grpc_call_context_set( - call, GRPC_CONTEXT_TRACING, - args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL); + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, + args->parent->context[GRPC_CONTEXT_TRACING].value, + NULL); } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) { add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Census context propagation requested " @@ -417,7 +417,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; - if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) { + if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) { immediately_cancel = true; } } @@ -427,9 +427,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, cc->sibling_next = cc->sibling_prev = call; } else { cc->sibling_next = pc->first_child; - cc->sibling_prev = pc->first_child->child_call->sibling_prev; - cc->sibling_next->child_call->sibling_prev = - cc->sibling_prev->child_call->sibling_next = call; + cc->sibling_prev = pc->first_child->child->sibling_prev; + cc->sibling_next->child->sibling_prev = + cc->sibling_prev->child->sibling_next = call; } gpr_mu_unlock(&pc->child_list_mu); @@ -534,7 +534,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - parent_call_t *pc = get_parent_call(c); + parent_call *pc = get_parent_call(c); if (pc != NULL) { gpr_mu_destroy(&pc->child_list_mu); } @@ -571,14 +571,14 @@ void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); } void grpc_call_unref(grpc_call *c) { if (!gpr_unref(&c->ext_ref)) return; - child_call_t *cc = c->child_call; + child_call *cc = c->child; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_call_unref", 0); GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c)); if (cc) { - parent_call_t *pc = get_parent_call(cc->parent); + parent_call *pc = get_parent_call(cc->parent); gpr_mu_lock(&pc->child_list_mu); if (c == pc->first_child) { pc->first_child = cc->sibling_next; @@ -586,8 +586,8 @@ void grpc_call_unref(grpc_call *c) { pc->first_child = NULL; } } - cc->sibling_prev->child_call->sibling_next = cc->sibling_next; - cc->sibling_next->child_call->sibling_prev = cc->sibling_prev; + cc->sibling_prev->child->sibling_next = cc->sibling_next; + cc->sibling_next->child->sibling_prev = cc->sibling_prev; gpr_mu_unlock(&pc->child_list_mu); GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child"); } @@ -1310,14 +1310,14 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); - parent_call_t *pc = get_parent_call(call); + parent_call *pc = get_parent_call(call); if (pc != NULL) { grpc_call *child; gpr_mu_lock(&pc->child_list_mu); child = pc->first_child; if (child != NULL) { do { - next_child_call = child->child_call->sibling_next; + next_child_call = child->child->sibling_next; if (child->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE, diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index d537637cbb..c680139cf6 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -37,7 +37,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx, typedef struct grpc_call_create_args { grpc_channel *channel; - grpc_call *parent_call; + grpc_call *parent; uint32_t propagation_mask; grpc_completion_queue *cq; diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 34548dac26..48962e5e45 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -27,6 +27,7 @@ #include <grpc/support/string_util.h> #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" @@ -77,6 +78,11 @@ grpc_channel *grpc_channel_create_with_builder( grpc_channel_args *args = grpc_channel_args_copy( grpc_channel_stack_builder_get_channel_arguments(builder)); grpc_channel *channel; + if (channel_stack_type == GRPC_SERVER_CHANNEL) { + GRPC_STATS_INC_SERVER_CHANNELS_CREATED(exec_ctx); + } else { + GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(exec_ctx); + } grpc_error *error = grpc_channel_stack_builder_finish( exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL, (void **)&channel); @@ -276,7 +282,7 @@ static grpc_call *grpc_channel_create_call_internal( grpc_call_create_args args; memset(&args, 0, sizeof(args)); args.channel = channel; - args.parent_call = parent_call; + args.parent = parent_call; args.propagation_mask = propagation_mask; args.cq = cq; args.pollset_set_alternative = pollset_set_alternative; diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index bc57c6136b..13fb6cccf8 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -26,6 +26,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -421,6 +422,10 @@ grpc_completion_queue *grpc_completion_queue_create_internal( const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_STATS_INC_CQS_CREATED(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + cq = (grpc_completion_queue *)gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + poller_vtable->size()); @@ -576,12 +581,12 @@ static bool atm_inc_if_nonzero(gpr_atm *counter) { } static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); return atm_inc_if_nonzero(&cqd->pending_events); } static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); return atm_inc_if_nonzero(&cqd->pending_events); } @@ -626,7 +631,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, } } - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); storage->tag = tag; @@ -687,7 +692,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); @@ -770,7 +775,7 @@ typedef struct { static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = (cq_is_finished_arg *)arg; grpc_completion_queue *cq = a->cq; - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = @@ -821,7 +826,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { grpc_event ret; gpr_timespec now; - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -954,7 +959,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, this function */ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); @@ -965,7 +970,7 @@ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: * We call cq_finish_shutdown_next() below, that would call pollset shutdown. @@ -995,7 +1000,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, static int add_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -1007,7 +1012,7 @@ static int add_plucker(grpc_completion_queue *cq, void *tag, static void del_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); for (int i = 0; i < cqd->num_pluckers; i++) { if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { cqd->num_pluckers--; @@ -1021,7 +1026,7 @@ static void del_plucker(grpc_completion_queue *cq, void *tag, static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = (cq_is_finished_arg *)arg; grpc_completion_queue *cq = a->cq; - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = @@ -1058,7 +1063,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; gpr_timespec now; - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); @@ -1182,7 +1187,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); @@ -1196,7 +1201,7 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, * merging them is a bit tricky and probably not worth it */ static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown. diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 280315036f..b089da2c54 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -36,6 +36,7 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/alarm_internal.h" @@ -179,14 +180,16 @@ void grpc_shutdown(void) { GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { - grpc_iomgr_shutdown(&exec_ctx); - gpr_timers_global_destroy(); - grpc_tracer_shutdown(); + grpc_executor_shutdown(&exec_ctx); + grpc_timer_manager_set_threading(false); // shutdown timer_manager thread for (i = g_number_of_plugins; i >= 0; i--) { if (g_all_of_the_plugins[i].destroy != NULL) { g_all_of_the_plugins[i].destroy(); } } + grpc_iomgr_shutdown(&exec_ctx); + gpr_timers_global_destroy(); + grpc_tracer_shutdown(); grpc_mdctx_global_shutdown(&exec_ctx); grpc_handshaker_factory_registry_shutdown(&exec_ctx); grpc_slice_intern_shutdown(); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 13ecc915ec..1d0fd472d0 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -76,7 +76,7 @@ typedef struct requested_call { grpc_call_details *details; } batch; struct { - registered_method *registered_method; + registered_method *method; gpr_timespec *deadline; grpc_byte_buffer **optional_payload; } registered; @@ -145,7 +145,7 @@ struct call_data { uint32_t recv_initial_metadata_flags; grpc_metadata_array initial_metadata; - request_matcher *request_matcher; + request_matcher *matcher; grpc_byte_buffer *payload; grpc_closure got_initial_metadata; @@ -171,7 +171,7 @@ struct registered_method { grpc_server_register_method_payload_handling payload_handling; uint32_t flags; /* one request matcher per method */ - request_matcher request_matcher; + request_matcher matcher; registered_method *next; }; @@ -334,7 +334,7 @@ static void request_matcher_destroy(request_matcher *rm) { static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, grpc_error *error) { - grpc_call_unref(grpc_call_from_top_element(elem)); + grpc_call_unref(grpc_call_from_top_element((grpc_call_element *)elem)); } static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, @@ -387,7 +387,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; if (server->started) { - request_matcher_destroy(&rm->request_matcher); + request_matcher_destroy(&rm->matcher); } gpr_free(rm->method); gpr_free(rm->host); @@ -519,7 +519,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, grpc_call_element *call_elem = (grpc_call_element *)arg; call_data *calld = (call_data *)call_elem->call_data; channel_data *chand = (channel_data *)call_elem->channel_data; - request_matcher *rm = calld->request_matcher; + request_matcher *rm = calld->matcher; grpc_server *server = rm->server; if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) { @@ -583,7 +583,7 @@ static void finish_start_new_rpc( return; } - calld->request_matcher = rm; + calld->matcher = rm; switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: @@ -629,7 +629,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { continue; } finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher, + &rm->server_registered_method->matcher, rm->server_registered_method->payload_handling); return; } @@ -647,7 +647,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { continue; } finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher, + &rm->server_registered_method->matcher, rm->server_registered_method->payload_handling); return; } @@ -668,7 +668,7 @@ static int num_listeners(grpc_server *server) { static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server, grpc_cq_completion *completion) { - server_unref(exec_ctx, server); + server_unref(exec_ctx, (grpc_server *)server); } static int num_channels(grpc_server *server) { @@ -691,9 +691,9 @@ static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, exec_ctx, &server->unregistered_request_matcher); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher, + request_matcher_kill_requests(exec_ctx, server, &rm->matcher, GRPC_ERROR_REF(error)); - request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); + request_matcher_zombify_all_pending_calls(exec_ctx, &rm->matcher); } } GRPC_ERROR_UNREF(error); @@ -1114,7 +1114,7 @@ void grpc_server_start(grpc_server *server) { request_matcher_init(&server->unregistered_request_matcher, (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, + request_matcher_init(&rm->matcher, (size_t)server->max_requested_calls_per_cq, server); } @@ -1267,8 +1267,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* stay locked, and gather up some stuff to do */ GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (server->shutdown_published) { - grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, - NULL, gpr_malloc(sizeof(grpc_cq_completion))); + grpc_cq_end_op( + &exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, + (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion))); gpr_mu_unlock(&server->mu_global); goto done; } @@ -1390,7 +1391,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &server->unregistered_request_matcher; break; case REGISTERED_CALL: - rm = &rc->data.registered.registered_method->request_matcher; + rm = &rc->data.registered.method->matcher; break; } server->requested_calls_per_cq[cq_idx][request_id] = *rc; @@ -1519,7 +1520,7 @@ grpc_call_error grpc_server_request_registered_call( rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; rc->call = call; - rc->data.registered.registered_method = rm; + rc->data.registered.method = rm; rc->data.registered.deadline = deadline; rc->initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c index a077052561..54388bdcda 100644 --- a/src/core/lib/transport/metadata_batch.c +++ b/src/core/lib/transport/metadata_batch.c @@ -233,32 +233,32 @@ void grpc_metadata_batch_remove(grpc_exec_ctx *exec_ctx, void grpc_metadata_batch_set_value(grpc_exec_ctx *exec_ctx, grpc_linked_mdelem *storage, grpc_slice value) { - grpc_mdelem old = storage->md; - grpc_mdelem new = grpc_mdelem_from_slices( - exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(old)), value); - storage->md = new; - GRPC_MDELEM_UNREF(exec_ctx, old); + grpc_mdelem old_mdelem = storage->md; + grpc_mdelem new_mdelem = grpc_mdelem_from_slices( + exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(old_mdelem)), value); + storage->md = new_mdelem; + GRPC_MDELEM_UNREF(exec_ctx, old_mdelem); } grpc_error *grpc_metadata_batch_substitute(grpc_exec_ctx *exec_ctx, grpc_metadata_batch *batch, grpc_linked_mdelem *storage, - grpc_mdelem new) { + grpc_mdelem new_mdelem) { assert_valid_callouts(exec_ctx, batch); grpc_error *error = GRPC_ERROR_NONE; - grpc_mdelem old = storage->md; - if (!grpc_slice_eq(GRPC_MDKEY(new), GRPC_MDKEY(old))) { + grpc_mdelem old_mdelem = storage->md; + if (!grpc_slice_eq(GRPC_MDKEY(new_mdelem), GRPC_MDKEY(old_mdelem))) { maybe_unlink_callout(batch, storage); - storage->md = new; + storage->md = new_mdelem; error = maybe_link_callout(batch, storage); if (error != GRPC_ERROR_NONE) { unlink_storage(&batch->list, storage); GRPC_MDELEM_UNREF(exec_ctx, storage->md); } } else { - storage->md = new; + storage->md = new_mdelem; } - GRPC_MDELEM_UNREF(exec_ctx, old); + GRPC_MDELEM_UNREF(exec_ctx, old_mdelem); assert_valid_callouts(exec_ctx, batch); return error; } @@ -302,12 +302,12 @@ grpc_error *grpc_metadata_batch_filter(grpc_exec_ctx *exec_ctx, grpc_error *error = GRPC_ERROR_NONE; while (l) { grpc_linked_mdelem *next = l->next; - grpc_filtered_mdelem new = func(exec_ctx, user_data, l->md); - add_error(&error, new.error, composite_error_string); - if (GRPC_MDISNULL(new.md)) { + grpc_filtered_mdelem new_mdelem = func(exec_ctx, user_data, l->md); + add_error(&error, new_mdelem.error, composite_error_string); + if (GRPC_MDISNULL(new_mdelem.md)) { grpc_metadata_batch_remove(exec_ctx, batch, l); - } else if (new.md.payload != l->md.payload) { - grpc_metadata_batch_substitute(exec_ctx, batch, l, new.md); + } else if (new_mdelem.md.payload != l->md.payload) { + grpc_metadata_batch_substitute(exec_ctx, batch, l, new_mdelem.md); } l = next; } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index caa11a956e..ae705195f3 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -102,8 +102,9 @@ static void slice_stream_unref(grpc_exec_ctx *exec_ctx, void *p) { grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount, void *buffer, size_t length) { slice_stream_ref(&refcount->slice_refcount); - return (grpc_slice){.refcount = &refcount->slice_refcount, - .data.refcounted = {.bytes = buffer, .length = length}}; + return (grpc_slice){ + .refcount = &refcount->slice_refcount, + .data.refcounted = {.bytes = (uint8_t *)buffer, .length = length}}; } static const grpc_slice_refcount_vtable stream_ref_slice_vtable = { diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index e7b3be3d86..64043fea08 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -493,7 +493,8 @@ static tsi_result fake_handshaker_result_extract_peer( } static tsi_result fake_handshaker_result_create_zero_copy_grpc_protector( - const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + void *exec_ctx, const tsi_handshaker_result *self, + size_t *max_output_protected_frame_size, tsi_zero_copy_grpc_protector **protector) { *protector = tsi_create_fake_zero_copy_grpc_protector(max_output_protected_frame_size); diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h index b0d7039850..3bba38149c 100644 --- a/src/core/tsi/transport_security.h +++ b/src/core/tsi/transport_security.h @@ -84,11 +84,17 @@ struct tsi_handshaker { }; /* Base for tsi_handshaker_result implementations. - See transport_security_interface.h for documentation. */ + See transport_security_interface.h for documentation. + The exec_ctx parameter in create_zero_copy_grpc_protector is supposed to be + of type grpc_exec_ctx*, but we're using void* instead to avoid making the TSI + API depend on grpc. The create_zero_copy_grpc_protector() method is only used + in grpc, where we do need the exec_ctx passed through, but the API still + needs to compile in other applications, where grpc_exec_ctx is not defined. +*/ typedef struct { tsi_result (*extract_peer)(const tsi_handshaker_result *self, tsi_peer *peer); tsi_result (*create_zero_copy_grpc_protector)( - const tsi_handshaker_result *self, + void *exec_ctx, const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, tsi_zero_copy_grpc_protector **protector); tsi_result (*create_frame_protector)(const tsi_handshaker_result *self, diff --git a/src/core/tsi/transport_security_grpc.c b/src/core/tsi/transport_security_grpc.c index 773b35e717..affd995230 100644 --- a/src/core/tsi/transport_security_grpc.c +++ b/src/core/tsi/transport_security_grpc.c @@ -20,16 +20,18 @@ /* This method creates a tsi_zero_copy_grpc_protector object. */ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector( - const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + grpc_exec_ctx *exec_ctx, const tsi_handshaker_result *self, + size_t *max_output_protected_frame_size, tsi_zero_copy_grpc_protector **protector) { - if (self == NULL || self->vtable == NULL || protector == NULL) { + if (exec_ctx == NULL || self == NULL || self->vtable == NULL || + protector == NULL) { return TSI_INVALID_ARGUMENT; } if (self->vtable->create_zero_copy_grpc_protector == NULL) { return TSI_UNIMPLEMENTED; } return self->vtable->create_zero_copy_grpc_protector( - self, max_output_protected_frame_size, protector); + exec_ctx, self, max_output_protected_frame_size, protector); } /* --- tsi_zero_copy_grpc_protector common implementation. --- diff --git a/src/core/tsi/transport_security_grpc.h b/src/core/tsi/transport_security_grpc.h index 375a758888..ca6755c12f 100644 --- a/src/core/tsi/transport_security_grpc.h +++ b/src/core/tsi/transport_security_grpc.h @@ -30,7 +30,8 @@ extern "C" { assuming there is no fatal error. The caller is responsible for destroying the protector. */ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector( - const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + grpc_exec_ctx *exec_ctx, const tsi_handshaker_result *self, + size_t *max_output_protected_frame_size, tsi_zero_copy_grpc_protector **protector); /* -- tsi_zero_copy_grpc_protector object -- */ diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 66b1ef0e39..de2e449fe8 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -22,14 +22,29 @@ namespace grpc { +namespace { +std::unique_ptr<GenericClientAsyncReaderWriter> CallInternal( + ChannelInterface* channel, ClientContext* context, + const grpc::string& method, CompletionQueue* cq, bool start, void* tag) { + return std::unique_ptr<GenericClientAsyncReaderWriter>( + GenericClientAsyncReaderWriter::Create( + channel, cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), + context, start, tag)); +} + +} // namespace + // begin a call to a named method std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag) { - return std::unique_ptr<GenericClientAsyncReaderWriter>( - GenericClientAsyncReaderWriter::Create( - channel_.get(), cq, - RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); + return CallInternal(channel_.get(), context, method, cq, true, tag); +} + +// setup a call to a named method +std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::PrepareCall( + ClientContext* context, const grpc::string& method, CompletionQueue* cq) { + return CallInternal(channel_.get(), context, method, cq, false, nullptr); } } // namespace grpc diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc index f89f5f1f03..f130aecd4b 100644 --- a/src/cpp/common/channel_arguments.cc +++ b/src/cpp/common/channel_arguments.cc @@ -86,10 +86,6 @@ void ChannelArguments::SetCompressionAlgorithm( SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm); } -void ChannelArguments::SetGrpclbFallbackTimeout(int fallback_timeout) { - SetInt(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS, fallback_timeout); -} - void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) { if (!mutator) { return; diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h index 4d90cfd384..df563ca36c 100644 --- a/src/objective-c/GRPCClient/GRPCCall.h +++ b/src/objective-c/GRPCClient/GRPCCall.h @@ -170,6 +170,13 @@ extern id const kGRPCTrailersKey; @property (atomic, copy, readwrite) NSString *serverName; /** + * The timeout for the RPC call in seconds. If set to 0, the call will not timeout. If set to + * positive, the gRPC call returns with status GRPCErrorCodeDeadlineExceeded if it is not completed + * within \a timeout seconds. A negative value is not allowed. + */ +@property NSTimeInterval timeout; + +/** * The container of the request headers of an RPC conforms to this protocol, which is a subset of * NSMutableDictionary's interface. It will become a NSMutableDictionary later on. * The keys of this container are the header names, which per the HTTP standard are case- diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 436c19e354..d6c3a3c165 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -423,7 +423,8 @@ static NSString * const kBearerPrefix = @"Bearer "; _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host serverName:_serverName - path:_path]; + path:_path + timeout:_timeout]; NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?"); [self sendHeaders:_requestHeaders]; diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h index e2aa5bd036..d37182f754 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.h +++ b/src/objective-c/GRPCClient/private/GRPCChannel.h @@ -63,5 +63,6 @@ struct grpc_channel_credentials; - (nullable grpc_call *)unmanagedCallWithPath:(nonnull NSString *)path serverName:(nonnull NSString *)serverName + timeout:(NSTimeInterval)timeout completionQueue:(nonnull GRPCCompletionQueue *)queue; @end diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m index 52dbc70b99..b78b14f2af 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.m +++ b/src/objective-c/GRPCClient/private/GRPCChannel.m @@ -182,18 +182,28 @@ static grpc_channel_args *BuildChannelArgs(NSDictionary *dictionary) { - (grpc_call *)unmanagedCallWithPath:(NSString *)path serverName:(NSString *)serverName + timeout:(NSTimeInterval)timeout completionQueue:(GRPCCompletionQueue *)queue { + GPR_ASSERT(timeout >= 0); + if (timeout < 0) { + timeout = 0; + } grpc_slice host_slice; if (serverName) { host_slice = grpc_slice_from_copied_string(serverName.UTF8String); } grpc_slice path_slice = grpc_slice_from_copied_string(path.UTF8String); + gpr_timespec deadline_ms = timeout == 0 ? + gpr_inf_future(GPR_CLOCK_REALTIME) : + gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis((int64_t)(timeout * 1000), GPR_TIMESPAN)); grpc_call *call = grpc_channel_create_call(_unmanagedChannel, NULL, GRPC_PROPAGATE_DEFAULTS, queue.unmanagedQueue, path_slice, serverName ? &host_slice : NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + deadline_ms, NULL); if (serverName) { grpc_slice_unref(host_slice); } diff --git a/src/objective-c/GRPCClient/private/GRPCHost.h b/src/objective-c/GRPCClient/private/GRPCHost.h index 0c1d715240..58171211b0 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.h +++ b/src/objective-c/GRPCClient/private/GRPCHost.h @@ -55,6 +55,7 @@ struct grpc_channel_credentials; /** Create a grpc_call object to the provided path on this host. */ - (nullable struct grpc_call *)unmanagedCallWithPath:(NSString *)path serverName:(NSString *)serverName + timeout:(NSTimeInterval)timeout completionQueue:(GRPCCompletionQueue *)queue; // TODO: There's a race when a new RPC is coming through just as an existing one is getting diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m index 23794c1fed..f73e9cbc50 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.m +++ b/src/objective-c/GRPCClient/private/GRPCHost.m @@ -121,6 +121,7 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil; - (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path serverName:(NSString *)serverName + timeout:(NSTimeInterval)timeout completionQueue:(GRPCCompletionQueue *)queue { GRPCChannel *channel; // This is racing -[GRPCHost disconnect]. @@ -130,7 +131,10 @@ static GRPCConnectivityMonitor *connectivityMonitor = nil; } channel = _channel; } - return [channel unmanagedCallWithPath:path serverName:serverName completionQueue:queue]; + return [channel unmanagedCallWithPath:path + serverName:serverName + timeout:timeout + completionQueue:queue]; } - (BOOL)setTLSPEMRootCerts:(nullable NSString *)pemRootCerts diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h index 64075591a3..1cd9da8f3e 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h @@ -76,7 +76,8 @@ - (instancetype)initWithHost:(NSString *)host serverName:(NSString *)serverName - path:(NSString *)path NS_DESIGNATED_INITIALIZER; + path:(NSString *)path + timeout:(NSTimeInterval)timeout NS_DESIGNATED_INITIALIZER; - (void)startBatchWithOperations:(NSArray *)ops errorHandler:(void(^)())errorHandler; diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index 87dc33af88..b0b1223b64 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -238,12 +238,13 @@ } - (instancetype)init { - return [self initWithHost:nil serverName:nil path:nil]; + return [self initWithHost:nil serverName:nil path:nil timeout:0]; } - (instancetype)initWithHost:(NSString *)host serverName:(NSString *)serverName - path:(NSString *)path { + path:(NSString *)path + timeout:(NSTimeInterval)timeout { if (!path || !host) { [NSException raise:NSInvalidArgumentException format:@"path and host cannot be nil."]; @@ -255,7 +256,10 @@ // queue. Currently we use a singleton queue. _queue = [GRPCCompletionQueue completionQueue]; - _call = [[GRPCHost hostWithAddress:host] unmanagedCallWithPath:path serverName:serverName completionQueue:_queue]; + _call = [[GRPCHost hostWithAddress:host] unmanagedCallWithPath:path + serverName:serverName + timeout:timeout + completionQueue:_queue]; if (_call == NULL) { return nil; } diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index 9afe507121..82ac2600fa 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -28,6 +28,7 @@ #import <RemoteTest/Messages.pbobjc.h> #import <RxLibrary/GRXWriteable.h> #import <RxLibrary/GRXWriter+Immediate.h> +#import <RxLibrary/GRXBufferedPipe.h> #define TEST_TIMEOUT 16 @@ -39,6 +40,7 @@ static NSString * const kRemoteSSLHost = @"grpc-test.sandbox.googleapis.com"; static GRPCProtoMethod *kInexistentMethod; static GRPCProtoMethod *kEmptyCallMethod; static GRPCProtoMethod *kUnaryCallMethod; +static GRPCProtoMethod *kFullDuplexCallMethod; /** Observer class for testing that responseMetadata is KVO-compliant */ @interface PassthroughObserver : NSObject @@ -106,6 +108,9 @@ static GRPCProtoMethod *kUnaryCallMethod; kUnaryCallMethod = [[GRPCProtoMethod alloc] initWithPackage:kPackage service:kService method:@"UnaryCall"]; + kFullDuplexCallMethod = [[GRPCProtoMethod alloc] initWithPackage:kPackage + service:kService + method:@"FullDuplexCall"]; } - (void)testConnectionToRemoteServer { @@ -422,4 +427,26 @@ static GRPCProtoMethod *kUnaryCallMethod; [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; } +- (void)testTimeout { + __weak XCTestExpectation *completion = [self expectationWithDescription:@"RPC completed."]; + + GRXBufferedPipe *pipe = [GRXBufferedPipe pipe]; + GRPCCall *call = [[GRPCCall alloc] initWithHost:kHostAddress + path:kFullDuplexCallMethod.HTTPPath + requestsWriter:pipe]; + + id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { + XCTAssert(0, @"Failure: response received; Expect: no response received."); + } completionHandler:^(NSError *errorOrNil) { + XCTAssertNotNil(errorOrNil, @"Failure: no error received; Expect: receive deadline exceeded."); + XCTAssertEqual(errorOrNil.code, GRPCErrorCodeDeadlineExceeded); + [completion fulfill]; + }]; + + call.timeout = 0.001; + [call startWithWriteable:responsesWriteable]; + + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; +} + @end diff --git a/src/objective-c/tests/run_tests.sh b/src/objective-c/tests/run_tests.sh index 5b7a2d104a..608ae6884b 100755 --- a/src/objective-c/tests/run_tests.sh +++ b/src/objective-c/tests/run_tests.sh @@ -49,7 +49,8 @@ xcodebuild \ HOST_PORT_REMOTE=grpc-test.sandbox.googleapis.com \ test \ | egrep -v "$XCODEBUILD_FILTER" \ - | egrep -v '^$' - + | egrep -v '^$' \ + | egrep -v "(GPBDictionary|GPBArray)" - echo "TIME: $(date)" xcodebuild \ @@ -57,7 +58,8 @@ xcodebuild \ -scheme CoreCronetEnd2EndTests \ -destination name="iPhone 6" \ test \ - | egrep "$XCODEBUILD_FILTER" \ + | egrep -v "$XCODEBUILD_FILTER" \ + | egrep -v '^$' \ | egrep -v "(GPBDictionary|GPBArray)" - echo "TIME: $(date)" @@ -65,7 +67,10 @@ xcodebuild \ -workspace Tests.xcworkspace \ -scheme CronetUnitTests \ -destination name="iPhone 6" \ - test | xcpretty + test \ + | egrep -v "$XCODEBUILD_FILTER" \ + | egrep -v '^$' \ + | egrep -v "(GPBDictionary|GPBArray)" - echo "TIME: $(date)" xcodebuild \ @@ -74,5 +79,6 @@ xcodebuild \ -destination name="iPhone 6" \ HOST_PORT_REMOTE=grpc-test.sandbox.googleapis.com \ test \ - | egrep "$XCODEBUILD_FILTER" \ + | egrep -v "$XCODEBUILD_FILTER" \ + | egrep -v '^$' \ | egrep -v "(GPBDictionary|GPBArray)" - diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 162200112a..93f84572b7 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -67,55 +67,6 @@ class GatherProto(setuptools.Command): open(path, 'a').close() -class BuildProtoModules(setuptools.Command): - """Command to generate project *_pb2.py modules from proto files.""" - - description = 'build protobuf modules' - user_options = [ - ('include=', None, 'path patterns to include in protobuf generation'), - ('exclude=', None, 'path patterns to exclude from protobuf generation') - ] - - def initialize_options(self): - self.exclude = None - self.include = r'.*\.proto$' - - def finalize_options(self): - pass - - def run(self): - import grpc_tools.protoc as protoc - - include_regex = re.compile(self.include) - exclude_regex = re.compile(self.exclude) if self.exclude else None - paths = [] - for walk_root, directories, filenames in os.walk(PROTO_STEM): - for filename in filenames: - path = os.path.join(walk_root, filename) - if include_regex.match(path) and not ( - exclude_regex and exclude_regex.match(path)): - paths.append(path) - - # TODO(kpayson): It would be nice to do this in a batch command, - # but we currently have name conflicts in src/proto - for path in paths: - command = [ - 'grpc_tools.protoc', - '-I {}'.format(PROTO_STEM), - '--python_out={}'.format(PROTO_STEM), - '--grpc_python_out={}'.format(PROTO_STEM), - ] + [path] - if protoc.main(command) != 0: - sys.stderr.write( - 'warning: Command:\n{}\nFailed'.format(command)) - - # Generated proto directories dont include __init__.py, but - # these are needed for python package resolution - for walk_root, _, _ in os.walk(PROTO_STEM): - path = os.path.join(walk_root, '__init__.py') - open(path, 'a').close() - - class BuildPy(build_py.build_py): """Custom project build command.""" diff --git a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/__init__.py b/src/python/grpcio_tests/tests/_sanity/__init__.py index 5772620b60..5772620b60 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/__init__.py +++ b/src/python/grpcio_tests/tests/_sanity/__init__.py diff --git a/src/python/grpcio_tests/tests/unit/_sanity/_sanity_test.py b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py index 19bc8801eb..b4079850ff 100644 --- a/src/python/grpcio_tests/tests/unit/_sanity/_sanity_test.py +++ b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py @@ -21,24 +21,25 @@ import six import tests -class Sanity(unittest.TestCase): +class SanityTest(unittest.TestCase): + + maxDiff = 32768 def testTestsJsonUpToDate(self): """Autodiscovers all test suites and checks that tests.json is up to date""" loader = tests.Loader() loader.loadTestsFromNames(['tests']) - test_suite_names = [ + test_suite_names = sorted({ test_case_class.id().rsplit('.', 1)[0] for test_case_class in tests._loader.iterate_suite_cases( loader.suite) - ] - test_suite_names = sorted(set(test_suite_names)) + }) tests_json_string = pkg_resources.resource_string('tests', 'tests.json') - if six.PY3: - tests_json_string = tests_json_string.decode() - tests_json = json.loads(tests_json_string) - self.assertListEqual(test_suite_names, tests_json) + tests_json = json.loads(tests_json_string.decode() + if six.PY3 else tests_json_string) + + self.assertSequenceEqual(tests_json, test_suite_names) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py index 71493bfec6..5b84001aab 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py @@ -33,7 +33,7 @@ from tests.unit.framework.common import test_constants import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2 import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2 import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2 -import tests.protoc_plugin.protos.service.test_service_pb2 as service_pb2 +import tests.protoc_plugin.protos.service.test_service_pb2_grpc as service_pb2_grpc # Identifiers of entities we expect to find in the generated module. STUB_IDENTIFIER = 'TestServiceStub' @@ -138,7 +138,7 @@ def _CreateService(): """ servicer_methods = _ServicerMethods() - class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)): + class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)): def UnaryCall(self, request, context): return servicer_methods.UnaryCall(request, context) @@ -157,11 +157,12 @@ def _CreateService(): server = grpc.server( futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) - getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server) + getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), + server) port = server.add_insecure_port('[::]:0') server.start() channel = grpc.insecure_channel('localhost:{}'.format(port)) - stub = getattr(service_pb2, STUB_IDENTIFIER)(channel) + stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel) return _Service(servicer_methods, server, stub) @@ -173,16 +174,17 @@ def _CreateIncompleteService(): servicer_methods implements none of the methods required of it. """ - class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)): + class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)): pass server = grpc.server( futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) - getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server) + getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), + server) port = server.add_insecure_port('[::]:0') server.start() channel = grpc.insecure_channel('localhost:{}'.format(port)) - stub = getattr(service_pb2, STUB_IDENTIFIER)(channel) + stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel) return _Service(None, server, stub) @@ -223,10 +225,11 @@ class PythonPluginTest(unittest.TestCase): def testImportAttributes(self): # check that we can access the generated module and its members. - self.assertIsNotNone(getattr(service_pb2, STUB_IDENTIFIER, None)) - self.assertIsNotNone(getattr(service_pb2, SERVICER_IDENTIFIER, None)) + self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER, None)) self.assertIsNotNone( - getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER, None)) + getattr(service_pb2_grpc, SERVICER_IDENTIFIER, None)) + self.assertIsNotNone( + getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER, None)) def testUpDown(self): service = _CreateService() diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py index 1aeb62a7c5..7868cdbfb3 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py @@ -12,22 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections +import abc from concurrent import futures import contextlib -import distutils.spawn -import errno import importlib import os -import os.path +from os import path import pkgutil +import platform import shutil -import subprocess import sys import tempfile -import threading import unittest -import platform + +import six import grpc from grpc_tools import protoc @@ -37,292 +35,285 @@ _MESSAGES_IMPORT = b'import "messages.proto";' _SPLIT_NAMESPACE = b'package grpc_protoc_plugin.invocation_testing.split;' _COMMON_NAMESPACE = b'package grpc_protoc_plugin.invocation_testing;' +_RELATIVE_PROTO_PATH = 'relative_proto_path' +_RELATIVE_PYTHON_OUT = 'relative_python_out' + @contextlib.contextmanager -def _system_path(path): +def _system_path(path_insertion): old_system_path = sys.path[:] - sys.path = sys.path[0:1] + path + sys.path[1:] + sys.path = sys.path[0:1] + path_insertion + sys.path[1:] yield sys.path = old_system_path -class DummySplitServicer(object): +# NOTE(nathaniel): https://twitter.com/exoplaneteer/status/677259364256747520 +# Life lesson "just always default to idempotence" reinforced. +def _create_directory_tree(root, path_components_sequence): + created = set() + for path_components in path_components_sequence: + thus_far = '' + for path_component in path_components: + relative_path = path.join(thus_far, path_component) + if relative_path not in created: + os.makedirs(path.join(root, relative_path)) + created.add(relative_path) + thus_far = path.join(thus_far, path_component) + + +def _massage_proto_content(proto_content, test_name_bytes, + messages_proto_relative_file_name_bytes): + package_substitution = (b'package grpc_protoc_plugin.invocation_testing.' + + test_name_bytes + b';') + common_namespace_substituted = proto_content.replace(_COMMON_NAMESPACE, + package_substitution) + split_namespace_substituted = common_namespace_substituted.replace( + _SPLIT_NAMESPACE, package_substitution) + message_import_replaced = split_namespace_substituted.replace( + _MESSAGES_IMPORT, + b'import "' + messages_proto_relative_file_name_bytes + b'";') + return message_import_replaced + + +def _packagify(directory): + for subdirectory, _, _ in os.walk(directory): + init_file_name = path.join(subdirectory, '__init__.py') + with open(init_file_name, 'wb') as init_file: + init_file.write(b'') - def __init__(self, request_class, response_class): - self.request_class = request_class - self.response_class = response_class + +class _Servicer(object): + + def __init__(self, response_class): + self._response_class = response_class def Call(self, request, context): - return self.response_class() + return self._response_class() -class SeparateTestMixin(object): +def _protoc(proto_path, python_out, grpc_python_out_flag, grpc_python_out, + absolute_proto_file_names): + args = [ + '', + '--proto_path={}'.format(proto_path), + ] + if python_out is not None: + args.append('--python_out={}'.format(python_out)) + if grpc_python_out is not None: + args.append('--grpc_python_out={}:{}'.format(grpc_python_out_flag, + grpc_python_out)) + args.extend(absolute_proto_file_names) + return protoc.main(args) - def testImportAttributes(self): - with _system_path([self.python_out_directory]): - pb2 = importlib.import_module(self.pb2_import) - pb2.Request - pb2.Response - if self.should_find_services_in_pb2: - pb2.TestServiceServicer - else: - with self.assertRaises(AttributeError): - pb2.TestServiceServicer - - with _system_path([self.grpc_python_out_directory]): - pb2_grpc = importlib.import_module(self.pb2_grpc_import) - pb2_grpc.TestServiceServicer - with self.assertRaises(AttributeError): - pb2_grpc.Request - with self.assertRaises(AttributeError): - pb2_grpc.Response - - def testCall(self): - with _system_path([self.python_out_directory]): - pb2 = importlib.import_module(self.pb2_import) - with _system_path([self.grpc_python_out_directory]): - pb2_grpc = importlib.import_module(self.pb2_grpc_import) - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) - pb2_grpc.add_TestServiceServicer_to_server( - DummySplitServicer(pb2.Request, pb2.Response), server) - port = server.add_insecure_port('[::]:0') - server.start() - channel = grpc.insecure_channel('localhost:{}'.format(port)) - stub = pb2_grpc.TestServiceStub(channel) - request = pb2.Request() - expected_response = pb2.Response() - response = stub.Call(request) - self.assertEqual(expected_response, response) - - -class CommonTestMixin(object): - - def testImportAttributes(self): - with _system_path([self.python_out_directory]): - pb2 = importlib.import_module(self.pb2_import) - pb2.Request - pb2.Response - if self.should_find_services_in_pb2: - pb2.TestServiceServicer - else: - with self.assertRaises(AttributeError): - pb2.TestServiceServicer - - with _system_path([self.grpc_python_out_directory]): - pb2_grpc = importlib.import_module(self.pb2_grpc_import) - pb2_grpc.TestServiceServicer - with self.assertRaises(AttributeError): - pb2_grpc.Request - with self.assertRaises(AttributeError): - pb2_grpc.Response - - def testCall(self): - with _system_path([self.python_out_directory]): - pb2 = importlib.import_module(self.pb2_import) - with _system_path([self.grpc_python_out_directory]): - pb2_grpc = importlib.import_module(self.pb2_grpc_import) - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) - pb2_grpc.add_TestServiceServicer_to_server( - DummySplitServicer(pb2.Request, pb2.Response), server) - port = server.add_insecure_port('[::]:0') - server.start() - channel = grpc.insecure_channel('localhost:{}'.format(port)) - stub = pb2_grpc.TestServiceStub(channel) - request = pb2.Request() - expected_response = pb2.Response() - response = stub.Call(request) - self.assertEqual(expected_response, response) - - -@unittest.skipIf(platform.python_implementation() == "PyPy", - "Skip test if run with PyPy") -class SameSeparateTest(unittest.TestCase, SeparateTestMixin): - def setUp(self): - same_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing', 'same.proto') - self.directory = tempfile.mkdtemp(suffix='same_separate', dir='.') - self.proto_directory = os.path.join(self.directory, 'proto_path') - self.python_out_directory = os.path.join(self.directory, 'python_out') - self.grpc_python_out_directory = os.path.join(self.directory, - 'grpc_python_out') - os.makedirs(self.proto_directory) - os.makedirs(self.python_out_directory) - os.makedirs(self.grpc_python_out_directory) - same_proto_file = os.path.join(self.proto_directory, - 'same_separate.proto') - open(same_proto_file, 'wb').write( - same_proto_contents.replace( - _COMMON_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.same_separate;')) - protoc_result = protoc.main([ - '', - '--proto_path={}'.format(self.proto_directory), - '--python_out={}'.format(self.python_out_directory), - '--grpc_python_out=grpc_2_0:{}'.format( - self.grpc_python_out_directory), - same_proto_file, - ]) - if protoc_result != 0: - raise Exception("unexpected protoc error") - open(os.path.join(self.grpc_python_out_directory, '__init__.py'), - 'w').write('') - open(os.path.join(self.python_out_directory, '__init__.py'), - 'w').write('') - self.pb2_import = 'same_separate_pb2' - self.pb2_grpc_import = 'same_separate_pb2_grpc' - self.should_find_services_in_pb2 = False +class _Mid2016ProtocStyle(object): - def tearDown(self): - shutil.rmtree(self.directory) + def name(self): + return 'Mid2016ProtocStyle' + def grpc_in_pb2_expected(self): + return True -@unittest.skipIf(platform.python_implementation() == "PyPy", - "Skip test if run with PyPy") -class SameCommonTest(unittest.TestCase, CommonTestMixin): + def protoc(self, proto_path, python_out, absolute_proto_file_names): + return (_protoc(proto_path, python_out, 'grpc_1_0', python_out, + absolute_proto_file_names),) - def setUp(self): - same_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing', 'same.proto') - self.directory = tempfile.mkdtemp(suffix='same_common', dir='.') - self.proto_directory = os.path.join(self.directory, 'proto_path') - self.python_out_directory = os.path.join(self.directory, 'python_out') - self.grpc_python_out_directory = self.python_out_directory - os.makedirs(self.proto_directory) - os.makedirs(self.python_out_directory) - same_proto_file = os.path.join(self.proto_directory, - 'same_common.proto') - open(same_proto_file, 'wb').write( - same_proto_contents.replace( - _COMMON_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.same_common;')) - - protoc_result = protoc.main([ - '', - '--proto_path={}'.format(self.proto_directory), - '--python_out={}'.format(self.python_out_directory), - '--grpc_python_out={}'.format(self.grpc_python_out_directory), - same_proto_file, - ]) - if protoc_result != 0: - raise Exception("unexpected protoc error") - open(os.path.join(self.python_out_directory, '__init__.py'), - 'w').write('') - self.pb2_import = 'same_common_pb2' - self.pb2_grpc_import = 'same_common_pb2_grpc' - self.should_find_services_in_pb2 = True - def tearDown(self): - shutil.rmtree(self.directory) +class _SingleProtocExecutionProtocStyle(object): + def name(self): + return 'SingleProtocExecutionProtocStyle' -@unittest.skipIf(platform.python_implementation() == "PyPy", - "Skip test if run with PyPy") -class SplitCommonTest(unittest.TestCase, CommonTestMixin): + def grpc_in_pb2_expected(self): + return False - def setUp(self): - services_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing.split_services', - 'services.proto') - messages_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing.split_messages', - 'messages.proto') - self.directory = tempfile.mkdtemp(suffix='split_common', dir='.') - self.proto_directory = os.path.join(self.directory, 'proto_path') - self.python_out_directory = os.path.join(self.directory, 'python_out') - self.grpc_python_out_directory = self.python_out_directory - os.makedirs(self.proto_directory) - os.makedirs(self.python_out_directory) - services_proto_file = os.path.join(self.proto_directory, - 'split_common_services.proto') - messages_proto_file = os.path.join(self.proto_directory, - 'split_common_messages.proto') - open(services_proto_file, 'wb').write( - services_proto_contents.replace( - _MESSAGES_IMPORT, b'import "split_common_messages.proto";') - .replace( - _SPLIT_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.split_common;')) - open(messages_proto_file, 'wb').write( - messages_proto_contents.replace( - _SPLIT_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.split_common;')) - protoc_result = protoc.main([ - '', - '--proto_path={}'.format(self.proto_directory), - '--python_out={}'.format(self.python_out_directory), - '--grpc_python_out={}'.format(self.grpc_python_out_directory), - services_proto_file, - messages_proto_file, - ]) - if protoc_result != 0: - raise Exception("unexpected protoc error") - open(os.path.join(self.python_out_directory, '__init__.py'), - 'w').write('') - self.pb2_import = 'split_common_messages_pb2' - self.pb2_grpc_import = 'split_common_services_pb2_grpc' - self.should_find_services_in_pb2 = False + def protoc(self, proto_path, python_out, absolute_proto_file_names): + return (_protoc(proto_path, python_out, 'grpc_2_0', python_out, + absolute_proto_file_names),) + + +class _ProtoBeforeGrpcProtocStyle(object): + + def name(self): + return 'ProtoBeforeGrpcProtocStyle' + + def grpc_in_pb2_expected(self): + return False + + def protoc(self, proto_path, python_out, absolute_proto_file_names): + pb2_protoc_exit_code = _protoc(proto_path, python_out, None, None, + absolute_proto_file_names) + pb2_grpc_protoc_exit_code = _protoc( + proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names) + return pb2_protoc_exit_code, pb2_grpc_protoc_exit_code, - def tearDown(self): - shutil.rmtree(self.directory) +class _GrpcBeforeProtoProtocStyle(object): -@unittest.skipIf(platform.python_implementation() == "PyPy", - "Skip test if run with PyPy") -class SplitSeparateTest(unittest.TestCase, SeparateTestMixin): + def name(self): + return 'GrpcBeforeProtoProtocStyle' + + def grpc_in_pb2_expected(self): + return False + + def protoc(self, proto_path, python_out, absolute_proto_file_names): + pb2_grpc_protoc_exit_code = _protoc( + proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names) + pb2_protoc_exit_code = _protoc(proto_path, python_out, None, None, + absolute_proto_file_names) + return pb2_grpc_protoc_exit_code, pb2_protoc_exit_code, + + +_PROTOC_STYLES = (_Mid2016ProtocStyle(), _SingleProtocExecutionProtocStyle(), + _ProtoBeforeGrpcProtocStyle(), _GrpcBeforeProtoProtocStyle(),) + + +@unittest.skipIf(platform.python_implementation() == 'PyPy', + 'Skip test if run with PyPy!') +class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)): def setUp(self): - services_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing.split_services', - 'services.proto') - messages_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing.split_messages', - 'messages.proto') - self.directory = tempfile.mkdtemp(suffix='split_separate', dir='.') - self.proto_directory = os.path.join(self.directory, 'proto_path') - self.python_out_directory = os.path.join(self.directory, 'python_out') - self.grpc_python_out_directory = os.path.join(self.directory, - 'grpc_python_out') - os.makedirs(self.proto_directory) - os.makedirs(self.python_out_directory) - os.makedirs(self.grpc_python_out_directory) - services_proto_file = os.path.join(self.proto_directory, - 'split_separate_services.proto') - messages_proto_file = os.path.join(self.proto_directory, - 'split_separate_messages.proto') - open(services_proto_file, 'wb').write( - services_proto_contents.replace( - _MESSAGES_IMPORT, b'import "split_separate_messages.proto";') - .replace( - _SPLIT_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.split_separate;' - )) - open(messages_proto_file, 'wb').write( - messages_proto_contents.replace( - _SPLIT_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.split_separate;' - )) - protoc_result = protoc.main([ - '', - '--proto_path={}'.format(self.proto_directory), - '--python_out={}'.format(self.python_out_directory), - '--grpc_python_out=grpc_2_0:{}'.format( - self.grpc_python_out_directory), - services_proto_file, - messages_proto_file, - ]) - if protoc_result != 0: - raise Exception("unexpected protoc error") - open(os.path.join(self.python_out_directory, '__init__.py'), - 'w').write('') - self.pb2_import = 'split_separate_messages_pb2' - self.pb2_grpc_import = 'split_separate_services_pb2_grpc' - self.should_find_services_in_pb2 = False + self._directory = tempfile.mkdtemp(suffix=self.NAME, dir='.') + self._proto_path = path.join(self._directory, _RELATIVE_PROTO_PATH) + self._python_out = path.join(self._directory, _RELATIVE_PYTHON_OUT) + + os.makedirs(self._proto_path) + os.makedirs(self._python_out) + + proto_directories_and_names = { + (self.MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES, + self.MESSAGES_PROTO_FILE_NAME,), + (self.SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES, + self.SERVICES_PROTO_FILE_NAME,), + } + messages_proto_relative_file_name_forward_slashes = '/'.join( + self.MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES + ( + self.MESSAGES_PROTO_FILE_NAME,)) + _create_directory_tree(self._proto_path, ( + relative_proto_directory_names + for relative_proto_directory_names, _ in proto_directories_and_names + )) + self._absolute_proto_file_names = set() + for relative_directory_names, file_name in proto_directories_and_names: + absolute_proto_file_name = path.join( + self._proto_path, *relative_directory_names + (file_name,)) + raw_proto_content = pkgutil.get_data( + 'tests.protoc_plugin.protos.invocation_testing', + path.join(*relative_directory_names + (file_name,))) + massaged_proto_content = _massage_proto_content( + raw_proto_content, + self.NAME.encode(), + messages_proto_relative_file_name_forward_slashes.encode()) + with open(absolute_proto_file_name, 'wb') as proto_file: + proto_file.write(massaged_proto_content) + self._absolute_proto_file_names.add(absolute_proto_file_name) def tearDown(self): - shutil.rmtree(self.directory) + shutil.rmtree(self._directory) + + def _protoc(self): + protoc_exit_codes = self.PROTOC_STYLE.protoc( + self._proto_path, self._python_out, self._absolute_proto_file_names) + for protoc_exit_code in protoc_exit_codes: + self.assertEqual(0, protoc_exit_code) + + _packagify(self._python_out) + + generated_modules = {} + expected_generated_full_module_names = { + self.EXPECTED_MESSAGES_PB2, + self.EXPECTED_SERVICES_PB2, + self.EXPECTED_SERVICES_PB2_GRPC, + } + with _system_path([self._python_out]): + for full_module_name in expected_generated_full_module_names: + module = importlib.import_module(full_module_name) + generated_modules[full_module_name] = module + + self._messages_pb2 = generated_modules[self.EXPECTED_MESSAGES_PB2] + self._services_pb2 = generated_modules[self.EXPECTED_SERVICES_PB2] + self._services_pb2_grpc = generated_modules[ + self.EXPECTED_SERVICES_PB2_GRPC] + + def _services_modules(self): + if self.PROTOC_STYLE.grpc_in_pb2_expected(): + return self._services_pb2, self._services_pb2_grpc, + else: + return self._services_pb2_grpc, + + def test_imported_attributes(self): + self._protoc() + + self._messages_pb2.Request + self._messages_pb2.Response + self._services_pb2.DESCRIPTOR.services_by_name['TestService'] + for services_module in self._services_modules(): + services_module.TestServiceStub + services_module.TestServiceServicer + services_module.add_TestServiceServicer_to_server + + def test_call(self): + self._protoc() + + for services_module in self._services_modules(): + server = grpc.server( + futures.ThreadPoolExecutor( + max_workers=test_constants.POOL_SIZE)) + services_module.add_TestServiceServicer_to_server( + _Servicer(self._messages_pb2.Response), server) + port = server.add_insecure_port('[::]:0') + server.start() + channel = grpc.insecure_channel('localhost:{}'.format(port)) + stub = services_module.TestServiceStub(channel) + response = stub.Call(self._messages_pb2.Request()) + self.assertEqual(self._messages_pb2.Response(), response) + + +def _create_test_case_class(split_proto, protoc_style): + attributes = {} + + name = '{}{}'.format('SplitProto' if split_proto else 'SameProto', + protoc_style.name()) + attributes['NAME'] = name + + if split_proto: + attributes['MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES'] = ( + 'split_messages', 'sub',) + attributes['MESSAGES_PROTO_FILE_NAME'] = 'messages.proto' + attributes['SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES'] = ( + 'split_services',) + attributes['SERVICES_PROTO_FILE_NAME'] = 'services.proto' + attributes['EXPECTED_MESSAGES_PB2'] = 'split_messages.sub.messages_pb2' + attributes['EXPECTED_SERVICES_PB2'] = 'split_services.services_pb2' + attributes['EXPECTED_SERVICES_PB2_GRPC'] = ( + 'split_services.services_pb2_grpc') + else: + attributes['MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES'] = () + attributes['MESSAGES_PROTO_FILE_NAME'] = 'same.proto' + attributes['SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES'] = () + attributes['SERVICES_PROTO_FILE_NAME'] = 'same.proto' + attributes['EXPECTED_MESSAGES_PB2'] = 'same_pb2' + attributes['EXPECTED_SERVICES_PB2'] = 'same_pb2' + attributes['EXPECTED_SERVICES_PB2_GRPC'] = 'same_pb2_grpc' + + attributes['PROTOC_STYLE'] = protoc_style + + attributes['__module__'] = _Test.__module__ + + return type('{}Test'.format(name), (_Test,), attributes) + + +def _create_test_case_classes(): + for split_proto in (False, True,): + for protoc_style in _PROTOC_STYLES: + yield _create_test_case_class(split_proto, protoc_style) + + +def load_tests(loader, tests, pattern): + tests = tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in _create_test_case_classes()) + return unittest.TestSuite(tests=tests) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/messages.proto b/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/sub/messages.proto index 1b780c69ba..1b780c69ba 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/messages.proto +++ b/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/sub/messages.proto diff --git a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_services/__init__.py b/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_services/__init__.py deleted file mode 100644 index 5772620b60..0000000000 --- a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_services/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py index 05101fdc6d..bb07844491 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_server.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py @@ -13,10 +13,10 @@ # limitations under the License. from src.proto.grpc.testing import messages_pb2 -from src.proto.grpc.testing import services_pb2 +from src.proto.grpc.testing import services_pb2_grpc -class BenchmarkServer(services_pb2.BenchmarkServiceServicer): +class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer): """Synchronous Server implementation for the Benchmark service.""" def UnaryCall(self, request, context): @@ -29,7 +29,7 @@ class BenchmarkServer(services_pb2.BenchmarkServiceServicer): yield messages_pb2.SimpleResponse(payload=payload) -class GenericBenchmarkServer(services_pb2.BenchmarkServiceServicer): +class GenericBenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer): """Generic Server implementation for the Benchmark service.""" def __init__(self, resp_size): diff --git a/src/python/grpcio_tests/tests/stress/metrics_server.py b/src/python/grpcio_tests/tests/stress/metrics_server.py index 11ab6c3f4e..33a74b4a38 100644 --- a/src/python/grpcio_tests/tests/stress/metrics_server.py +++ b/src/python/grpcio_tests/tests/stress/metrics_server.py @@ -16,11 +16,12 @@ import time from src.proto.grpc.testing import metrics_pb2 +from src.proto.grpc.testing import metrics_pb2_grpc GAUGE_NAME = 'python_overall_qps' -class MetricsServer(metrics_pb2.MetricsServiceServicer): +class MetricsServer(metrics_pb2_grpc.MetricsServiceServicer): def __init__(self, histogram): self._start_time = time.time() diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 4c078e6c22..8512d5b96f 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -1,12 +1,17 @@ [ + "_sanity._sanity_test.SanityTest", "health_check._health_servicer_test.HealthServicerTest", "interop._insecure_intraop_test.InsecureIntraopTest", "interop._secure_intraop_test.SecureIntraopTest", "protoc_plugin._python_plugin_test.PythonPluginTest", - "protoc_plugin._split_definitions_test.SameCommonTest", - "protoc_plugin._split_definitions_test.SameSeparateTest", - "protoc_plugin._split_definitions_test.SplitCommonTest", - "protoc_plugin._split_definitions_test.SplitSeparateTest", + "protoc_plugin._split_definitions_test.SameProtoGrpcBeforeProtoProtocStyleTest", + "protoc_plugin._split_definitions_test.SameProtoMid2016ProtocStyleTest", + "protoc_plugin._split_definitions_test.SameProtoProtoBeforeGrpcProtocStyleTest", + "protoc_plugin._split_definitions_test.SameProtoSingleProtocExecutionProtocStyleTest", + "protoc_plugin._split_definitions_test.SplitProtoGrpcBeforeProtoProtocStyleTest", + "protoc_plugin._split_definitions_test.SplitProtoMid2016ProtocStyleTest", + "protoc_plugin._split_definitions_test.SplitProtoProtoBeforeGrpcProtocStyleTest", + "protoc_plugin._split_definitions_test.SplitProtoSingleProtocExecutionProtocStyleTest", "protoc_plugin.beta_python_plugin_test.PythonPluginTest", "reflection._reflection_servicer_test.ReflectionServicerTest", "testing._client_test.ClientTest", @@ -41,7 +46,6 @@ "unit._reconnect_test.ReconnectTest", "unit._resource_exhausted_test.ResourceExhaustedTest", "unit._rpc_test.RPCTest", - "unit._sanity._sanity_test.Sanity", "unit._thread_cleanup_test.CleanupThreadTest", "unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest", diff --git a/src/python/grpcio_tests/tests/unit/_sanity/__init__.py b/src/python/grpcio_tests/tests/unit/_sanity/__init__.py deleted file mode 100644 index 5772620b60..0000000000 --- a/src/python/grpcio_tests/tests/unit/_sanity/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/ruby/qps/proxy-worker.rb b/src/ruby/qps/proxy-worker.rb index d7a9f114bd..488610ae74 100755 --- a/src/ruby/qps/proxy-worker.rb +++ b/src/ruby/qps/proxy-worker.rb @@ -41,7 +41,7 @@ class ProxyBenchmarkClientServiceImpl < Grpc::Testing::ProxyClientService::Servi @histogram = Histogram.new(@histres, @histmax) @start_time = Time.now # TODO(vjpai): Support multiple client channels by spawning off a PHP client per channel - command = "php " + File.expand_path(File.dirname(__FILE__)) + "/../../php/tests/qps/client.php " + @mytarget + command = "php -d extension=" + File.expand_path(File.dirname(__FILE__)) + "/../../php/ext/grpc/modules/grpc.so " + File.expand_path(File.dirname(__FILE__)) + "/../../php/tests/qps/client.php " + @mytarget puts "Starting command: " + command @php_pid = spawn(command) end |