diff options
Diffstat (limited to 'src')
26 files changed, 1577 insertions, 1139 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index c2db8eff71..253280bd24 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -140,7 +140,6 @@ grpc::string GetHeaderIncludes(grpc_generator::File *file, printer->Print(vars, "namespace grpc {\n"); printer->Print(vars, "class CompletionQueue;\n"); printer->Print(vars, "class Channel;\n"); - printer->Print(vars, "class RpcService;\n"); printer->Print(vars, "class ServerCompletionQueue;\n"); printer->Print(vars, "class ServerContext;\n"); printer->Print(vars, "} // namespace grpc\n\n"); @@ -324,7 +323,8 @@ void PrintHeaderClientMethodInterfaces( } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, - "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw(" + "virtual ::grpc::ClientReaderInterface< $Response$>* " + "$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request) = 0;\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; @@ -546,7 +546,8 @@ void PrintHeaderClientMethodData(grpc_generator::Printer *printer, const grpc_generator::Method *method, std::map<grpc::string, grpc::string> *vars) { (*vars)["Method"] = method->name(); - printer->Print(*vars, "const ::grpc::RpcMethod rpcmethod_$Method$_;\n"); + printer->Print(*vars, + "const ::grpc::internal::RpcMethod rpcmethod_$Method$_;\n"); } void PrintHeaderServerMethodSync(grpc_generator::Printer *printer, @@ -718,7 +719,7 @@ void PrintHeaderServerMethodStreamedUnary( printer->Print(*vars, "WithStreamedUnaryMethod_$Method$() {\n" " ::grpc::Service::MarkMethodStreamed($Idx$,\n" - " new ::grpc::StreamedUnaryHandler< $Request$, " + " new ::grpc::internal::StreamedUnaryHandler< $Request$, " "$Response$>(std::bind" "(&WithStreamedUnaryMethod_$Method$<BaseClass>::" "Streamed$Method$, this, std::placeholders::_1, " @@ -766,15 +767,16 @@ void PrintHeaderServerMethodSplitStreaming( "{}\n"); printer->Print(" public:\n"); printer->Indent(); - printer->Print(*vars, - "WithSplitStreamingMethod_$Method$() {\n" - " ::grpc::Service::MarkMethodStreamed($Idx$,\n" - " new ::grpc::SplitServerStreamingHandler< $Request$, " - "$Response$>(std::bind" - "(&WithSplitStreamingMethod_$Method$<BaseClass>::" - "Streamed$Method$, this, std::placeholders::_1, " - "std::placeholders::_2)));\n" - "}\n"); + printer->Print( + *vars, + "WithSplitStreamingMethod_$Method$() {\n" + " ::grpc::Service::MarkMethodStreamed($Idx$,\n" + " new ::grpc::internal::SplitServerStreamingHandler< $Request$, " + "$Response$>(std::bind" + "(&WithSplitStreamingMethod_$Method$<BaseClass>::" + "Streamed$Method$, this, std::placeholders::_1, " + "std::placeholders::_2)));\n" + "}\n"); printer->Print(*vars, "~WithSplitStreamingMethod_$Method$() override {\n" " BaseClassMustBeDerivedFromService(this);\n" @@ -914,7 +916,8 @@ void PrintHeaderService(grpc_generator::Printer *printer, " {\n public:\n"); printer->Indent(); printer->Print( - "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n"); + "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& " + "channel);\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderClientMethod(printer, service->method(i).get(), vars, true); } @@ -1185,10 +1188,9 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) {\n"); printer->Print(*vars, - " return ::grpc::BlockingUnaryCall(channel_.get(), " - "rpcmethod_$Method$_, " - "context, request, response);\n" - "}\n\n"); + " return ::grpc::internal::BlockingUnaryCall" + "(channel_.get(), rpcmethod_$Method$_, " + "context, request, response);\n}\n\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1198,25 +1200,27 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "ClientContext* context, " "const $Request$& request, " "::grpc::CompletionQueue* cq) {\n"); - printer->Print(*vars, - " return " - "::grpc::ClientAsyncResponseReader< $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, request, $AsyncStart$);\n" - "}\n\n"); + printer->Print( + *vars, + " return " + "::grpc::internal::ClientAsyncResponseReaderFactory< $Response$>" + "::Create(channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, request, $AsyncStart$);\n" + "}\n\n"); } } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, "::grpc::ClientWriter< $Request$>* " "$ns$$Service$::Stub::$Method$Raw(" "::grpc::ClientContext* context, $Response$* response) {\n"); - printer->Print(*vars, - " return new ::grpc::ClientWriter< $Request$>(" - "channel_.get(), " - "rpcmethod_$Method$_, " - "context, response);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::internal::ClientWriterFactory< $Request$>::Create(" + "channel_.get(), " + "rpcmethod_$Method$_, " + "context, response);\n" + "}\n\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1227,12 +1231,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(" "::grpc::ClientContext* context, $Response$* response, " "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); - printer->Print(*vars, - " return ::grpc::ClientAsyncWriter< $Request$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, response, $AsyncStart$$AsyncCreateArgs$);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::internal::ClientAsyncWriterFactory< $Request$>" + "::Create(channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, response, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); } } else if (ServerOnlyStreaming(method)) { printer->Print( @@ -1240,12 +1245,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientReader< $Response$>* " "$ns$$Service$::Stub::$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request) {\n"); - printer->Print(*vars, - " return new ::grpc::ClientReader< $Response$>(" - "channel_.get(), " - "rpcmethod_$Method$_, " - "context, request);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::internal::ClientReaderFactory< $Response$>::Create(" + "channel_.get(), " + "rpcmethod_$Method$_, " + "context, request);\n" + "}\n\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1257,12 +1263,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); - printer->Print(*vars, - " return ::grpc::ClientAsyncReader< $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, request, $AsyncStart$$AsyncCreateArgs$);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::internal::ClientAsyncReaderFactory< $Response$>" + "::Create(channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, request, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); } } else if (method->BidiStreaming()) { printer->Print( @@ -1270,8 +1277,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientReaderWriter< $Request$, $Response$>* " "$ns$$Service$::Stub::$Method$Raw(::grpc::ClientContext* context) {\n"); printer->Print(*vars, - " return new ::grpc::ClientReaderWriter< " - "$Request$, $Response$>(" + " return ::grpc::internal::ClientReaderWriterFactory< " + "$Request$, $Response$>::Create(" "channel_.get(), " "rpcmethod_$Method$_, " "context);\n" @@ -1286,14 +1293,14 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(::grpc::" "ClientContext* context, " "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); - printer->Print( - *vars, - " return " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, $AsyncStart$$AsyncCreateArgs$);\n" - "}\n\n"); + printer->Print(*vars, + " return " + "::grpc::internal::ClientAsyncReaderWriterFactory< " + "$Request$, $Response$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); } } } @@ -1404,7 +1411,7 @@ void PrintSourceService(grpc_generator::Printer *printer, printer->Print(*vars, ", rpcmethod_$Method$_(" "$prefix$$Service$_method_names[$Idx$], " - "::grpc::RpcMethod::$StreamingType$, " + "::grpc::internal::RpcMethod::$StreamingType$, " "channel" ")\n"); } @@ -1427,38 +1434,38 @@ void PrintSourceService(grpc_generator::Printer *printer, if (method->NoStreaming()) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::NORMAL_RPC,\n" - " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " + " ::grpc::internal::RpcMethod::NORMAL_RPC,\n" + " new ::grpc::internal::RpcMethodHandler< $ns$$Service$::Service, " "$Request$, " "$Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ClientOnlyStreaming(method.get())) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::CLIENT_STREAMING,\n" - " new ::grpc::ClientStreamingHandler< " + " ::grpc::internal::RpcMethod::CLIENT_STREAMING,\n" + " new ::grpc::internal::ClientStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ServerOnlyStreaming(method.get())) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::SERVER_STREAMING,\n" - " new ::grpc::ServerStreamingHandler< " + " ::grpc::internal::RpcMethod::SERVER_STREAMING,\n" + " new ::grpc::internal::ServerStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (method->BidiStreaming()) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::BIDI_STREAMING,\n" - " new ::grpc::BidiStreamingHandler< " + " ::grpc::internal::RpcMethod::BIDI_STREAMING,\n" + " new ::grpc::internal::BidiStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } diff --git a/src/core/ext/filters/client_channel/backup_poller.cc b/src/core/ext/filters/client_channel/backup_poller.cc new file mode 100644 index 0000000000..466bf86bc0 --- /dev/null +++ b/src/core/ext/filters/client_channel/backup_poller.cc @@ -0,0 +1,158 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/ext/filters/client_channel/backup_poller.h" + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/env.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/completion_queue.h" + +#define DEFAULT_POLL_INTERVAL_MS 5000 + +typedef struct backup_poller { + grpc_timer polling_timer; + grpc_closure run_poller_closure; + grpc_closure shutdown_closure; + gpr_mu* pollset_mu; + grpc_pollset* pollset; // guarded by pollset_mu + bool shutting_down; // guarded by pollset_mu + gpr_refcount refs; + gpr_refcount shutdown_refs; +} backup_poller; + +static gpr_once g_once = GPR_ONCE_INIT; +static gpr_mu g_poller_mu; +static backup_poller* g_poller = NULL; // guarded by g_poller_mu +// g_poll_interval_ms is set only once at the first time +// grpc_client_channel_start_backup_polling() is called, after that it is +// treated as const. +static int g_poll_interval_ms = DEFAULT_POLL_INTERVAL_MS; + +static void init_globals() { + gpr_mu_init(&g_poller_mu); + char* env = gpr_getenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS"); + if (env != NULL) { + int poll_interval_ms = gpr_parse_nonnegative_int(env); + if (poll_interval_ms == -1) { + gpr_log(GPR_ERROR, + "Invalid GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS: %s, " + "default value %d will be used.", + env, g_poll_interval_ms); + } else { + g_poll_interval_ms = poll_interval_ms; + } + } + gpr_free(env); +} + +static void backup_poller_shutdown_unref(grpc_exec_ctx* exec_ctx, + backup_poller* p) { + if (gpr_unref(&p->shutdown_refs)) { + grpc_pollset_destroy(exec_ctx, p->pollset); + gpr_free(p->pollset); + gpr_free(p); + } +} + +static void done_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + backup_poller_shutdown_unref(exec_ctx, (backup_poller*)arg); +} + +static void g_poller_unref(grpc_exec_ctx* exec_ctx) { + if (gpr_unref(&g_poller->refs)) { + gpr_mu_lock(&g_poller_mu); + backup_poller* p = g_poller; + g_poller = NULL; + gpr_mu_unlock(&g_poller_mu); + gpr_mu_lock(p->pollset_mu); + p->shutting_down = true; + grpc_pollset_shutdown(exec_ctx, p->pollset, + GRPC_CLOSURE_INIT(&p->shutdown_closure, done_poller, + p, grpc_schedule_on_exec_ctx)); + gpr_mu_unlock(p->pollset_mu); + grpc_timer_cancel(exec_ctx, &p->polling_timer); + } +} + +static void run_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + backup_poller* p = (backup_poller*)arg; + if (error != GRPC_ERROR_NONE) { + if (error != GRPC_ERROR_CANCELLED) { + GRPC_LOG_IF_ERROR("run_poller", GRPC_ERROR_REF(error)); + } + backup_poller_shutdown_unref(exec_ctx, p); + return; + } + gpr_mu_lock(p->pollset_mu); + if (p->shutting_down) { + gpr_mu_unlock(p->pollset_mu); + backup_poller_shutdown_unref(exec_ctx, p); + return; + } + grpc_error* err = grpc_pollset_work(exec_ctx, p->pollset, NULL, + grpc_exec_ctx_now(exec_ctx)); + gpr_mu_unlock(p->pollset_mu); + GRPC_LOG_IF_ERROR("Run client channel backup poller", err); + grpc_timer_init(exec_ctx, &p->polling_timer, + grpc_exec_ctx_now(exec_ctx) + g_poll_interval_ms, + &p->run_poller_closure); +} + +void grpc_client_channel_start_backup_polling( + grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) { + gpr_once_init(&g_once, init_globals); + if (g_poll_interval_ms == 0) { + return; + } + gpr_mu_lock(&g_poller_mu); + if (g_poller == NULL) { + g_poller = (backup_poller*)gpr_zalloc(sizeof(backup_poller)); + g_poller->pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); + g_poller->shutting_down = false; + grpc_pollset_init(g_poller->pollset, &g_poller->pollset_mu); + gpr_ref_init(&g_poller->refs, 0); + // one for timer cancellation, one for pollset shutdown + gpr_ref_init(&g_poller->shutdown_refs, 2); + GRPC_CLOSURE_INIT(&g_poller->run_poller_closure, run_poller, g_poller, + grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &g_poller->polling_timer, + grpc_exec_ctx_now(exec_ctx) + g_poll_interval_ms, + &g_poller->run_poller_closure); + } + gpr_ref(&g_poller->refs); + gpr_mu_unlock(&g_poller_mu); + grpc_pollset_set_add_pollset(exec_ctx, interested_parties, g_poller->pollset); +} + +void grpc_client_channel_stop_backup_polling( + grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) { + if (g_poll_interval_ms == 0) { + return; + } + grpc_pollset_set_del_pollset(exec_ctx, interested_parties, g_poller->pollset); + g_poller_unref(exec_ctx); +} diff --git a/src/core/ext/filters/client_channel/backup_poller.h b/src/core/ext/filters/client_channel/backup_poller.h new file mode 100644 index 0000000000..e993d50639 --- /dev/null +++ b/src/core/ext/filters/client_channel/backup_poller.h @@ -0,0 +1,34 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H + +#include <grpc/grpc.h> +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +/* Start polling \a interested_parties periodically in the timer thread */ +void grpc_client_channel_start_backup_polling( + grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties); + +/* Stop polling \a interested_parties */ +void grpc_client_channel_stop_backup_polling( + grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties); + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H */ diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index ea5e076c3b..00c51ba543 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -31,6 +31,7 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" @@ -712,6 +713,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); + grpc_client_channel_start_backup_polling(exec_ctx, chand->interested_parties); // Record client channel factory. const grpc_arg *arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); @@ -790,6 +792,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } + grpc_client_channel_stop_backup_polling(exec_ctx, chand->interested_parties); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(exec_ctx, chand->interested_parties); GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel"); @@ -898,7 +901,7 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx, call_data *calld = (call_data *)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, - "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s", + "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", elem->channel_data, calld, calld->waiting_for_pick_batches_count, grpc_error_string(error)); } @@ -940,7 +943,7 @@ static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx, channel_data *chand = (channel_data *)elem->channel_data; call_data *calld = (call_data *)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR + gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIuPTR " pending batches to subchannel_call=%p", chand, calld, calld->waiting_for_pick_batches_count, calld->subchannel_call); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index d6bdc13ba9..85e76e68b5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -611,7 +611,6 @@ static void update_lb_connectivity_status_locked( case GRPC_CHANNEL_SHUTDOWN: GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); break; - case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_READY: @@ -1790,7 +1789,6 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, // embedded RR policy. Note that the current RR policy, if any, will stay in // effect until an update from the new lb_call is received. switch (glb_policy->lb_channel_connectivity) { - case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_TRANSIENT_FAILURE: { /* resub. */ diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 56c261ba34..f0c66c68e1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -20,6 +20,7 @@ #include <grpc/support/alloc.h> +#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" @@ -42,103 +43,73 @@ typedef struct { /** base policy: must be first */ grpc_lb_policy base; /** all our subchannels */ - grpc_subchannel **subchannels; - grpc_subchannel **new_subchannels; - size_t num_subchannels; - size_t num_new_subchannels; - - grpc_closure connectivity_changed; - - /** remaining members are protected by the combiner */ - - /** the selected channel */ - grpc_connected_subchannel *selected; - - /** the subchannel key for \a selected, or NULL if \a selected not set */ - const grpc_subchannel_key *selected_key; - + grpc_lb_subchannel_list *subchannel_list; + /** latest pending subchannel list */ + grpc_lb_subchannel_list *latest_pending_subchannel_list; + /** selected subchannel in \a subchannel_list */ + grpc_lb_subchannel_data *selected; /** have we started picking? */ bool started_picking; /** are we shut down? */ bool shutdown; - /** are we updating the selected subchannel? */ - bool updating_selected; - /** are we updating the subchannel candidates? */ - bool updating_subchannels; - /** args from the latest update received while already updating, or NULL */ - grpc_lb_policy_args *pending_update_args; - /** which subchannel are we watching? */ - size_t checking_subchannel; - /** what is the connectivity of that channel? */ - grpc_connectivity_state checking_connectivity; /** list of picks that are waiting on connectivity */ pending_pick *pending_picks; - /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + GPR_ASSERT(p->subchannel_list == NULL); + GPR_ASSERT(p->latest_pending_subchannel_list == NULL); GPR_ASSERT(p->pending_picks == NULL); - for (size_t i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy"); - } - if (p->selected != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, - "picked_first_destroy"); - } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); - grpc_subchannel_index_unref(); - if (p->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); - gpr_free(p->pending_update_args); - } - gpr_free(p->subchannels); - gpr_free(p->new_subchannels); gpr_free(p); + grpc_subchannel_index_unref(); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p); } } -static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { +static void shutdown_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p, + grpc_error *error) { + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); + } + p->shutdown = true; pending_pick *pp; while ((pp = p->pending_picks) != NULL) { p->pending_picks = pp->next; *pp->target = NULL; - GRPC_CLOSURE_SCHED( - exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error)); gpr_free(pp); } + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "shutdown"); + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "pf_shutdown"); + p->subchannel_list = NULL; + } + if (p->latest_pending_subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown"); + p->latest_pending_subchannel_list = NULL; + } + GRPC_ERROR_UNREF(error); } static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - p->shutdown = true; - fail_pending_picks_for_shutdown(exec_ctx, p); - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown"); - /* cancel subscription */ - if (p->selected != NULL) { - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); - } else if (p->num_subchannels > 0 && p->started_picking) { - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, - &p->connectivity_changed); - } + shutdown_locked(exec_ctx, (pick_first_lb_policy *)pol, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown")); } static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -162,8 +133,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_eq, grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -185,15 +155,12 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { p->started_picking = true; - if (p->subchannels != NULL) { - GPR_ASSERT(p->num_subchannels > 0); - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); + if (p->subchannel_list != NULL && p->subchannel_list->num_subchannels > 0) { + p->subchannel_list->checking_subchannel = 0; + grpc_lb_subchannel_list_ref_for_connectivity_watch( + p->subchannel_list, "connectivity_watch+start_picking"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &p->subchannel_list->subchannels[0]); } } @@ -210,19 +177,17 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - - /* Check atomically for a selected channel */ + // If we have a selected subchannel already, return synchronously. if (p->selected != NULL) { - *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); + *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel, + "picked"); return 1; } - - /* No subchannel selected yet, so try again */ + // No subchannel selected yet, so handle asynchronously. if (!p->started_picking) { start_picking_locked(exec_ctx, p); } - pp = (pending_pick *)gpr_malloc(sizeof(*pp)); + pending_pick *pp = (pending_pick *)gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->target = target; pp->initial_metadata_flags = pick_args->initial_metadata_flags; @@ -231,19 +196,15 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, return 0; } -static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - size_t num_subchannels = p->num_subchannels; - grpc_subchannel **subchannels = p->subchannels; - - p->num_subchannels = 0; - p->subchannels = NULL; - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); - - for (size_t i = 0; i < num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); +static void destroy_unselected_subchannels_locked(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { + grpc_lb_subchannel_data *sd = &p->subchannel_list->subchannels[i]; + if (p->selected != sd) { + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "selected_different_subchannel"); + } } - gpr_free(subchannels); } static grpc_connectivity_state pf_check_connectivity_locked( @@ -265,46 +226,24 @@ static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; if (p->selected) { - grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); + grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel, + closure); } else { GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); } } -/* unsubscribe all subchannels */ -static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - if (p->num_subchannels > 0) { - GPR_ASSERT(p->selected == NULL); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p", - (void *)p, (void *)p->subchannels[p->checking_subchannel]); - } - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, - &p->connectivity_changed); - p->updating_subchannels = true; - } else if (p->selected != NULL) { - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, - "Pick First %p unsubscribing from selected subchannel %p", - (void *)p, (void *)p->selected); - } - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); - p->updating_selected = true; - } -} +static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); -/* true upon success */ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { pick_first_lb_policy *p = (pick_first_lb_policy *)policy; const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - if (p->subchannels == NULL) { + if (p->subchannel_list == NULL) { // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, @@ -321,270 +260,222 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } const grpc_lb_addresses *addresses = (const grpc_lb_addresses *)arg->value.pointer.p; - if (addresses->num_addresses == 0) { - // Empty update. Unsubscribe from all current subchannels and put the - // channel in TRANSIENT_FAILURE. + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", + (void *)p, (unsigned long)addresses->num_addresses); + } + grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( + exec_ctx, &p->base, &grpc_lb_pick_first_trace, addresses, args, + pf_connectivity_changed_locked); + if (subchannel_list->num_subchannels == 0) { + // Empty update or no valid subchannels. Unsubscribe from all current + // subchannels and put the channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "pf_update_empty"); - stop_connectivity_watchers(exec_ctx, p); + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_empty_update"); + } + p->subchannel_list = subchannel_list; // Empty list. + p->selected = NULL; return; } - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", - (void *)p, (unsigned long)addresses->num_addresses); - } - grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc( - sizeof(*sc_args) * addresses->num_addresses); - /* We remove the following keys in order for subchannel keys belonging to - * subchannels point to the same address to match. */ - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, - GRPC_ARG_LB_ADDRESSES}; - size_t sc_args_count = 0; - - /* Create list of subchannel args for new addresses in \a args. */ - for (size_t i = 0; i < addresses->num_addresses; i++) { - // If there were any balancer, we would have chosen grpclb policy instead. - GPR_ASSERT(!addresses->addresses[i].is_balancer); - if (addresses->addresses[i].user_data != NULL) { - gpr_log(GPR_ERROR, - "This LB policy doesn't support user data. It will be ignored"); + if (p->selected == NULL) { + // We don't yet have a selected subchannel, so replace the current + // subchannel list immediately. + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "pf_update_before_selected"); } - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, - 1); - gpr_free(addr_arg.value.string); - sc_args[sc_args_count++].args = new_args; - } - - /* Check if p->selected is amongst them. If so, we are done. */ - if (p->selected != NULL) { - GPR_ASSERT(p->selected_key != NULL); - for (size_t i = 0; i < sc_args_count; i++) { - grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]); - const bool found_selected = - grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0; - grpc_subchannel_key_destroy(exec_ctx, ith_sc_key); - if (found_selected) { + p->subchannel_list = subchannel_list; + } else { + // We do have a selected subchannel. + // Check if it's present in the new list. If so, we're done. + for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { + grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; + if (sd->subchannel == p->selected->subchannel) { // The currently selected subchannel is in the update: we are done. if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, - "Pick First %p found already selected subchannel %p amongst " - "updates. Update done.", - (void *)p, (void *)p->selected); + "Pick First %p found already selected subchannel %p " + "at update index %" PRIuPTR " of %" PRIuPTR "; update done", + p, p->selected->subchannel, i, + subchannel_list->num_subchannels); + } + grpc_lb_subchannel_list_ref_for_connectivity_watch( + subchannel_list, "connectivity_watch+replace_selected"); + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "pf_update_includes_selected"); } - for (size_t j = 0; j < sc_args_count; j++) { - grpc_channel_args_destroy(exec_ctx, - (grpc_channel_args *)sc_args[j].args); + p->subchannel_list = subchannel_list; + if (p->selected->connected_subchannel != NULL) { + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "pf_update_includes_selected"); + } + p->selected = sd; + destroy_unselected_subchannels_locked(exec_ctx, p); + // If there was a previously pending update (which may or may + // not have contained the currently selected subchannel), drop + // it, so that it doesn't override what we've done here. + if (p->latest_pending_subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, + "pf_update_includes_selected+outdated"); + p->latest_pending_subchannel_list = NULL; } - gpr_free(sc_args); return; } } - } - // We only check for already running updates here because if the previous - // steps were successful, the update can be considered done without any - // interference (ie, no callbacks were scheduled). - if (p->updating_selected || p->updating_subchannels) { - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, - "Update already in progress for pick first %p. Deferring update.", - (void *)p); - } - if (p->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); - gpr_free(p->pending_update_args); - } - p->pending_update_args = - (grpc_lb_policy_args *)gpr_zalloc(sizeof(*p->pending_update_args)); - p->pending_update_args->client_channel_factory = - args->client_channel_factory; - p->pending_update_args->args = grpc_channel_args_copy(args->args); - p->pending_update_args->combiner = args->combiner; - return; - } - /* Create the subchannels for the new subchannel args/addresses. */ - grpc_subchannel **new_subchannels = - (grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); - size_t num_new_subchannels = 0; - for (size_t i = 0; i < sc_args_count; i++) { - grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( - exec_ctx, args->client_channel_factory, &sc_args[i]); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - char *address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_INFO, - "Pick First %p created subchannel %p for address uri %s", - (void *)p, (void *)subchannel, address_uri); - gpr_free(address_uri); + // Not keeping the previous selected subchannel, so set the latest + // pending subchannel list to the new subchannel list. We will wait + // for it to report READY before swapping it into the current + // subchannel list. + if (p->latest_pending_subchannel_list != NULL) { + if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { + gpr_log(GPR_DEBUG, + "Pick First %p Shutting down latest pending subchannel list " + "%p, about to be replaced by newer latest %p", + (void *)p, (void *)p->latest_pending_subchannel_list, + (void *)subchannel_list); + } + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, + "sl_outdated_dont_smash"); } - grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args); - if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel; - } - gpr_free(sc_args); - if (num_new_subchannels == 0) { - gpr_free(new_subchannels); - // Empty update. Unsubscribe from all current subchannels and put the - // channel in TRANSIENT_FAILURE. - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"), - "pf_update_no_valid_addresses"); - stop_connectivity_watchers(exec_ctx, p); - return; + p->latest_pending_subchannel_list = subchannel_list; } - - /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */ - stop_connectivity_watchers(exec_ctx, p); - - /* Save new subchannels. The switch over will happen in - * pf_connectivity_changed_locked */ - if (p->updating_selected || p->updating_subchannels) { - p->num_new_subchannels = num_new_subchannels; - p->new_subchannels = new_subchannels; - } else { /* nothing is updating. Get things moving from here */ - p->num_subchannels = num_new_subchannels; - p->subchannels = new_subchannels; - p->new_subchannels = NULL; - p->num_new_subchannels = 0; - if (p->started_picking) { - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - } + // If we've started picking, start trying to connect to the first + // subchannel in the new list. + if (p->started_picking) { + grpc_lb_subchannel_list_ref_for_connectivity_watch( + subchannel_list, "connectivity_watch+update"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &subchannel_list->subchannels[0]); } } static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - pick_first_lb_policy *p = (pick_first_lb_policy *)arg; - grpc_subchannel *selected_subchannel; - pending_pick *pp; - + grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg; + pick_first_lb_policy *p = (pick_first_lb_policy *)sd->subchannel_list->policy; if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log( - GPR_DEBUG, - "Pick First %p connectivity changed. Updating selected: %d; Updating " - "subchannels: %d; Checking %lu index (%lu total); State: %d; ", - (void *)p, p->updating_selected, p->updating_subchannels, - (unsigned long)p->checking_subchannel, - (unsigned long)p->num_subchannels, p->checking_connectivity); + gpr_log(GPR_DEBUG, + "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR + " of %" PRIuPTR + "), subchannel_list %p: state=%s p->shutdown=%d " + "sd->subchannel_list->shutting_down=%d error=%s", + (void *)p, (void *)sd->subchannel, + sd->subchannel_list->checking_subchannel, + sd->subchannel_list->num_subchannels, (void *)sd->subchannel_list, + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), + p->shutdown, sd->subchannel_list->shutting_down, + grpc_error_string(error)); } - bool restart = false; - if (p->updating_selected && error != GRPC_ERROR_NONE) { - /* Captured the unsubscription for p->selected */ - GPR_ASSERT(p->selected != NULL); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, - "pf_update_connectivity"); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p", - (void *)p, (void *)p->selected); - } - p->updating_selected = false; - if (p->num_new_subchannels == 0) { - p->selected = NULL; - return; - } - restart = true; - } - if (p->updating_subchannels && error != GRPC_ERROR_NONE) { - /* Captured the unsubscription for the checking subchannel */ - GPR_ASSERT(p->selected == NULL); - for (size_t i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], - "pf_update_connectivity"); - if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p, - (void *)p->subchannels[i]); - } - } - gpr_free(p->subchannels); - p->subchannels = NULL; - p->num_subchannels = 0; - p->updating_subchannels = false; - if (p->num_new_subchannels == 0) return; - restart = true; - } - if (restart) { - p->selected = NULL; - p->selected_key = NULL; - GPR_ASSERT(p->new_subchannels != NULL); - GPR_ASSERT(p->num_new_subchannels > 0); - p->num_subchannels = p->num_new_subchannels; - p->subchannels = p->new_subchannels; - p->num_new_subchannels = 0; - p->new_subchannels = NULL; - if (p->started_picking) { - /* If we were picking, continue to do so over the new subchannels, - * starting from the 0th index. */ - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - /* reuses the weak ref from start_picking_locked */ - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - } - if (p->pending_update_args != NULL) { - const grpc_lb_policy_args *args = p->pending_update_args; - p->pending_update_args = NULL; - pf_update_locked(exec_ctx, &p->base, args); - } + // If the policy is shutting down, unref and return. + if (p->shutdown) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_shutdown"); return; } - GRPC_ERROR_REF(error); - if (p->shutdown) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); - GRPC_ERROR_UNREF(error); + // If the subchannel list is shutting down, stop watching. + if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_sl_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_sl_shutdown"); return; - } else if (p->selected != NULL) { - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* if the selected channel goes bad, we're done */ - p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN; - } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - p->checking_connectivity, GRPC_ERROR_REF(error), - "selected_changed"); - if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) { - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + } + // If we're still here, the notification must be for a subchannel in + // either the current or latest pending subchannel lists. + GPR_ASSERT(sd->subchannel_list == p->subchannel_list || + sd->subchannel_list == p->latest_pending_subchannel_list); + // Update state. + sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; + // Handle updates for the currently selected subchannel. + if (p->selected == sd) { + // If the new state is anything other than READY and there is a + // pending update, switch to the pending update. + if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && + p->latest_pending_subchannel_list != NULL) { + p->selected = NULL; + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update"); + p->subchannel_list = p->latest_pending_subchannel_list; + p->latest_pending_subchannel_list = NULL; + grpc_connectivity_state_set( + exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + /* if the selected channel goes bad, we're done */ + sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN; + } + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + sd->curr_connectivity_state, + GRPC_ERROR_REF(error), "selected_changed"); + if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) { + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + } else { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_selected_shutdown"); + shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error)); + } } - } else { - loop: - switch (p->checking_connectivity) { - case GRPC_CHANNEL_INIT: - GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_READY: + return; + } + // If we get here, there are two possible cases: + // 1. We do not currently have a selected subchannel, and the update is + // for a subchannel in p->subchannel_list that we're trying to + // connect to. The goal here is to find a subchannel that we can + // select. + // 2. We do currently have a selected subchannel, and the update is + // for a subchannel in p->latest_pending_subchannel_list. The + // goal here is to find a subchannel from the update that we can + // select in place of the current one. + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE || + sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + } + while (true) { + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_READY: { + // Case 2. Promote p->latest_pending_subchannel_list to + // p->subchannel_list. + if (sd->subchannel_list == p->latest_pending_subchannel_list) { + GPR_ASSERT(p->subchannel_list != NULL); + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "finish_update"); + p->subchannel_list = p->latest_pending_subchannel_list; + p->latest_pending_subchannel_list = NULL; + } + // Cases 1 and 2. grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - selected_subchannel = p->subchannels[p->checking_subchannel]; - p->selected = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected_subchannel), - "picked_first"); - + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "connected"); + p->selected = sd; if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, - "Pick First %p selected subchannel %p (connected %p)", - (void *)p, (void *)selected_subchannel, (void *)p->selected); + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void *)p, + (void *)sd->subchannel); } - p->selected_key = grpc_subchannel_get_key(selected_subchannel); - /* drop the pick list: we are connected now */ - GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); - destroy_subchannels_locked(exec_ctx, p); - /* update any calls that were waiting for a pick */ + // Drop all other subchannels, since we are now connected. + destroy_unselected_subchannels_locked(exec_ctx, p); + // Update any calls that were waiting for a pick. + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( + p->selected->connected_subchannel, "picked"); if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", @@ -593,71 +484,86 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - p->checking_subchannel = - (p->checking_subchannel + 1) % p->num_subchannels; - if (p->checking_subchannel == 0) { - /* only trigger transient failure when we've tried all alternatives - */ + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; + } + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL); + // Case 1: Only set state to TRANSIENT_FAILURE if we've tried + // all subchannels. + if (sd->subchannel_list->checking_subchannel == 0 && + sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } + sd->curr_connectivity_state = + grpc_subchannel_check_connectivity(sd->subchannel, &error); GRPC_ERROR_UNREF(error); - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel], &error); - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - } else { - goto loop; + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; } - break; + break; // Go back to top of loop. + } case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_REF(error), "connecting_changed"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - break; - case GRPC_CHANNEL_SHUTDOWN: - p->num_subchannels--; - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], - "pick_first"); - if (p->num_subchannels == 0) { + case GRPC_CHANNEL_IDLE: { + // Only update connectivity state in case 1. + if (sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick first exhausted channels", &error, 1), - "no_more_channels"); - fail_pending_picks_for_shutdown(exec_ctx, p); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, - "pick_first_connectivity"); - } else { + exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_REF(error), "connecting_changed"); + } + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; + } + case GRPC_CHANNEL_SHUTDOWN: { + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "pf_candidate_shutdown"); + // Advance to next subchannel and check its state. + grpc_lb_subchannel_data *original_sd = sd; + do { + sd->subchannel_list->checking_subchannel = + (sd->subchannel_list->checking_subchannel + 1) % + sd->subchannel_list->num_subchannels; + sd = &sd->subchannel_list + ->subchannels[sd->subchannel_list->checking_subchannel]; + } while (sd->subchannel == NULL && sd != original_sd); + if (sd == original_sd) { + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "pf_candidate_shutdown"); + shutdown_locked(exec_ctx, p, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick first exhausted channels", &error, 1)); + return; + } + if (sd->subchannel_list == p->subchannel_list) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "subchannel_failed"); - p->checking_subchannel %= p->num_subchannels; - GRPC_ERROR_UNREF(error); - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel], &error); - goto loop; } + sd->curr_connectivity_state = + grpc_subchannel_check_connectivity(sd->subchannel, &error); + GRPC_ERROR_UNREF(error); + if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // Reuses the connectivity refs from the previous watch. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); + return; + } + // For any other state, go back to top of loop. + // We will reuse the connectivity refs from the previous watch. + } } } - - GRPC_ERROR_UNREF(error); } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { @@ -687,8 +593,6 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_subchannel_index_ref(); - GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, - grpc_combiner_scheduler(args->combiner)); return &p->base; } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index de163f6300..8f29c80130 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -28,6 +28,7 @@ #include <grpc/support/alloc.h> +#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" @@ -64,12 +65,11 @@ typedef struct pending_pick { grpc_closure *on_complete; } pending_pick; -typedef struct rr_subchannel_list rr_subchannel_list; typedef struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; - rr_subchannel_list *subchannel_list; + grpc_lb_subchannel_list *subchannel_list; /** have we started picking? */ bool started_picking; @@ -89,157 +89,9 @@ typedef struct round_robin_lb_policy { * lists if they equal \a latest_pending_subchannel_list. In other words, * racing callbacks that reference outdated subchannel lists won't perform any * update. */ - rr_subchannel_list *latest_pending_subchannel_list; + grpc_lb_subchannel_list *latest_pending_subchannel_list; } round_robin_lb_policy; -typedef struct { - /** backpointer to owning subchannel list */ - rr_subchannel_list *subchannel_list; - /** subchannel itself */ - grpc_subchannel *subchannel; - /** notification that connectivity has changed on subchannel */ - grpc_closure connectivity_changed_closure; - /** last observed connectivity. Not updated by - * \a grpc_subchannel_notify_on_state_change. Used to determine the previous - * state while processing the new state in \a rr_connectivity_changed */ - grpc_connectivity_state prev_connectivity_state; - /** current connectivity state. Updated by \a - * grpc_subchannel_notify_on_state_change */ - grpc_connectivity_state curr_connectivity_state; - /** connectivity state to be updated by the watcher, not guarded by - * the combiner. Will be moved to curr_connectivity_state inside of - * the combiner by rr_connectivity_changed_locked(). */ - grpc_connectivity_state pending_connectivity_state_unsafe; - /** the subchannel's target user data */ - void *user_data; - /** vtable to operate over \a user_data */ - const grpc_lb_user_data_vtable *user_data_vtable; -} subchannel_data; - -struct rr_subchannel_list { - /** backpointer to owning policy */ - round_robin_lb_policy *policy; - - /** all our subchannels */ - size_t num_subchannels; - subchannel_data *subchannels; - - /** how many subchannels are in state READY */ - size_t num_ready; - /** how many subchannels are in state TRANSIENT_FAILURE */ - size_t num_transient_failures; - /** how many subchannels are in state SHUTDOWN */ - size_t num_shutdown; - /** how many subchannels are in state IDLE */ - size_t num_idle; - - /** There will be one ref for each entry in subchannels for which there is a - * pending connectivity state watcher callback. */ - gpr_refcount refcount; - - /** Is this list shutting down? This may be true due to the shutdown of the - * policy itself or because a newer update has arrived while this one hadn't - * finished processing. */ - bool shutting_down; -}; - -static rr_subchannel_list *rr_subchannel_list_create(round_robin_lb_policy *p, - size_t num_subchannels) { - rr_subchannel_list *subchannel_list = - (rr_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list)); - subchannel_list->policy = p; - subchannel_list->subchannels = - (subchannel_data *)gpr_zalloc(sizeof(subchannel_data) * num_subchannels); - subchannel_list->num_subchannels = num_subchannels; - gpr_ref_init(&subchannel_list->refcount, 1); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] Created subchannel list %p for %lu subchannels", - (void *)p, (void *)subchannel_list, (unsigned long)num_subchannels); - } - return subchannel_list; -} - -static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx, - rr_subchannel_list *subchannel_list) { - GPR_ASSERT(subchannel_list->shutting_down); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p", - (void *)subchannel_list->policy, (void *)subchannel_list); - } - for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { - subchannel_data *sd = &subchannel_list->subchannels[i]; - if (sd->subchannel != NULL) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, - "rr_subchannel_list_destroy"); - } - sd->subchannel = NULL; - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); - sd->user_data = NULL; - } - } - gpr_free(subchannel_list->subchannels); - gpr_free(subchannel_list); -} - -static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list, - const char *reason) { - gpr_ref_non_zero(&subchannel_list->refcount); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu (%s)", - (void *)subchannel_list->policy, (void *)subchannel_list, - (unsigned long)(count - 1), (unsigned long)count, reason); - } -} - -static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, - rr_subchannel_list *subchannel_list, - const char *reason) { - const bool done = gpr_unref(&subchannel_list->refcount); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu (%s)", - (void *)subchannel_list->policy, (void *)subchannel_list, - (unsigned long)(count + 1), (unsigned long)count, reason); - } - if (done) { - rr_subchannel_list_destroy(exec_ctx, subchannel_list); - } -} - -/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The - * watcher's callback will ultimately unref \a subchannel_list. */ -static void rr_subchannel_list_shutdown_and_unref( - grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list, - const char *reason) { - GPR_ASSERT(!subchannel_list->shutting_down); - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] Shutting down subchannel_list %p (%s)", - (void *)subchannel_list->policy, (void *)subchannel_list, reason); - } - GPR_ASSERT(!subchannel_list->shutting_down); - subchannel_list->shutting_down = true; - for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { - subchannel_data *sd = &subchannel_list->subchannels[i]; - if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log( - GPR_DEBUG, - "[RR %p] Unsubscribing from subchannel %p as part of shutting down " - "subchannel_list %p", - (void *)subchannel_list->policy, (void *)sd->subchannel, - (void *)subchannel_list); - } - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, - NULL, - &sd->connectivity_changed_closure); - } - } - rr_subchannel_list_unref(exec_ctx, subchannel_list, reason); -} - /** Returns the index into p->subchannel_list->subchannels of the next * subchannel in READY state, or p->subchannel_list->num_subchannels if no * subchannel is READY. @@ -299,8 +151,8 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", (void *)p, (unsigned long)last_ready_index, (void *)p->subchannel_list->subchannels[last_ready_index].subchannel, - (void *)grpc_subchannel_get_connected_subchannel( - p->subchannel_list->subchannels[last_ready_index].subchannel)); + (void *)p->subchannel_list->subchannels[last_ready_index] + .connected_subchannel); } } @@ -310,47 +162,47 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", (void *)pol, (void *)pol); } + GPR_ASSERT(p->subchannel_list == NULL); + GPR_ASSERT(p->latest_pending_subchannel_list == NULL); grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); grpc_subchannel_index_unref(); gpr_free(p); } -static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx, - round_robin_lb_policy *p) { +static void shutdown_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p, + grpc_error *error) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); + } + p->shutdown = true; pending_pick *pp; while ((pp = p->pending_picks) != NULL) { p->pending_picks = pp->next; *pp->target = NULL; - GRPC_CLOSURE_SCHED( - exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error)); gpr_free(pp); } + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "rr_shutdown"); + if (p->subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_shutdown"); + p->subchannel_list = NULL; + } + if (p->latest_pending_subchannel_list != NULL) { + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, + "sl_shutdown_pending_rr_shutdown"); + p->latest_pending_subchannel_list = NULL; + } + GRPC_ERROR_UNREF(error); } static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p", - (void *)pol, (void *)pol); - } - p->shutdown = true; - fail_pending_picks_for_shutdown(exec_ctx, p); - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); - const bool latest_is_current = - p->subchannel_list == p->latest_pending_subchannel_list; - rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, - "sl_shutdown_rr_shutdown"); - p->subchannel_list = NULL; - if (!latest_is_current && p->latest_pending_subchannel_list != NULL && - !p->latest_pending_subchannel_list->shutting_down) { - rr_subchannel_list_shutdown_and_unref(exec_ctx, - p->latest_pending_subchannel_list, - "sl_shutdown_pending_rr_shutdown"); - p->latest_pending_subchannel_list = NULL; - } + shutdown_locked(exec_ctx, p, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); } static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -405,13 +257,10 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { p->started_picking = true; for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { - subchannel_data *sd = &p->subchannel_list->subchannels[i]; - GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked"); - rr_subchannel_list_ref(sd->subchannel_list, "started_picking"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); + grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, + "connectivity_watch"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &p->subchannel_list->subchannels[i]); } } @@ -436,10 +285,10 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); if (next_ready_index < p->subchannel_list->num_subchannels) { /* readily available, report right away */ - subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index]; - *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(sd->subchannel), - "rr_picked"); + grpc_lb_subchannel_data *sd = + &p->subchannel_list->subchannels[next_ready_index]; + *target = + GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked"); if (user_data != NULL) { *user_data = sd->user_data; } @@ -470,8 +319,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, return 0; } -static void update_state_counters_locked(subchannel_data *sd) { - rr_subchannel_list *subchannel_list = sd->subchannel_list; +static void update_state_counters_locked(grpc_lb_subchannel_data *sd) { + grpc_lb_subchannel_list *subchannel_list = sd->subchannel_list; if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { GPR_ASSERT(subchannel_list->num_ready > 0); --subchannel_list->num_ready; @@ -485,6 +334,7 @@ static void update_state_counters_locked(subchannel_data *sd) { GPR_ASSERT(subchannel_list->num_idle > 0); --subchannel_list->num_idle; } + sd->prev_connectivity_state = sd->curr_connectivity_state; if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { @@ -497,12 +347,12 @@ static void update_state_counters_locked(subchannel_data *sd) { } /** Sets the policy's connectivity status based on that of the passed-in \a sd - * (the subchannel_data associted with the updated subchannel) and the + * (the grpc_lb_subchannel_data associted with the updated subchannel) and the * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the * connectivity status set. */ static grpc_connectivity_state update_lb_connectivity_status_locked( - grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, grpc_error *error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * @@ -524,8 +374,8 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( * CHECK: p->num_idle == p->subchannel_list->num_subchannels. */ grpc_connectivity_state new_state = sd->curr_connectivity_state; - rr_subchannel_list *subchannel_list = sd->subchannel_list; - round_robin_lb_policy *p = subchannel_list->policy; + grpc_lb_subchannel_list *subchannel_list = sd->subchannel_list; + round_robin_lb_policy *p = (round_robin_lb_policy *)subchannel_list->policy; if (subchannel_list->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); @@ -561,8 +411,9 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - subchannel_data *sd = (subchannel_data *)arg; - round_robin_lb_policy *p = sd->subchannel_list->policy; + grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg; + round_robin_lb_policy *p = + (round_robin_lb_policy *)sd->subchannel_list->policy; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log( GPR_DEBUG, @@ -577,65 +428,50 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } // If the policy is shutting down, unref and return. if (p->shutdown) { - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, - "pol_shutdown+started_picking"); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); - return; - } - if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) { - // the subchannel list associated with sd has been discarded. This callback - // corresponds to the unsubscription. The unrefs correspond to the picking - // ref (start_picking_locked or update_started_picking). - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, - "sl_shutdown+started_picking"); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown+picking"); + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "rr_shutdown"); return; } - // Dispose of outdated subchannel lists. - if (sd->subchannel_list != p->subchannel_list && - sd->subchannel_list != p->latest_pending_subchannel_list) { - const char *reason = NULL; - if (sd->subchannel_list->shutting_down) { - reason = "sl_outdated_straggler"; - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, reason); - } else { - reason = "sl_outdated"; - rr_subchannel_list_shutdown_and_unref(exec_ctx, sd->subchannel_list, - reason); - } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, reason); + // If the subchannel list is shutting down, stop watching. + if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_sl_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "rr_sl_shutdown"); return; } + // If we're still here, the notification must be for a subchannel in + // either the current or latest pending subchannel lists. + GPR_ASSERT(sd->subchannel_list == p->subchannel_list || + sd->subchannel_list == p->latest_pending_subchannel_list); // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; // Update state counters and determine new overall state. update_state_counters_locked(sd); - sd->prev_connectivity_state = sd->curr_connectivity_state; const grpc_connectivity_state new_policy_connectivity_state = update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new // policy's state is SHUTDOWN, clean up. if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); - sd->subchannel = NULL; - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); - sd->user_data = NULL; - } + grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd); + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "rr_connectivity_shutdown"); + grpc_lb_subchannel_list_unref_for_connectivity_watch( + exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown"); if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - // The policy is shutting down. Fail all of the pending picks. - fail_pending_picks_for_shutdown(exec_ctx, p); + shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error)); } - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, - "sd_shutdown+started_picking"); - // unref the "rr_connectivity_update" weak ref from start_picking. - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, - "rr_connectivity_sd_shutdown"); } else { // sd not in SHUTDOWN if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + if (sd->connected_subchannel == NULL) { + sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(sd->subchannel), + "connected"); + } if (sd->subchannel_list != p->subchannel_list) { // promote sd->subchannel_list to p->subchannel_list. // sd->subchannel_list must be equal to @@ -656,8 +492,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } if (p->subchannel_list != NULL) { // dispose of the current subchannel_list - rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, - "sl_phase_out_shutdown"); + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "sl_phase_out_shutdown"); } p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = NULL; @@ -667,7 +503,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); - subchannel_data *selected = + grpc_lb_subchannel_data *selected = &p->subchannel_list->subchannels[next_ready_index]; if (p->pending_picks != NULL) { // if the selected subchannel is going to be used for the pending @@ -678,8 +514,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "rr_picked"); + selected->connected_subchannel, "rr_picked"); if (pp->user_data != NULL) { *pp->user_data = selected->user_data; } @@ -694,12 +529,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(pp); } } - /* renew notification: reuses the "rr_connectivity_update" weak ref on the - * policy as well as the sd->subchannel_list ref. */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); + // Renew notification. + grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd); } } @@ -723,13 +554,12 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, round_robin_lb_policy *p = (round_robin_lb_policy *)pol; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); if (next_ready_index < p->subchannel_list->num_subchannels) { - subchannel_data *selected = + grpc_lb_subchannel_data *selected = &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "rr_picked"); + selected->connected_subchannel, "rr_ping"); grpc_connected_subchannel_ping(exec_ctx, target, closure); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping"); } else { GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); @@ -742,130 +572,68 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", p); + // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. + // Otherwise, keep using the current subchannel list (ignore this update). if (p->subchannel_list == NULL) { - // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "rr_update_missing"); - } else { - // otherwise, keep using the current subchannel list (ignore this update). - gpr_log(GPR_ERROR, - "[RR %p] No valid LB addresses channel arg for update, ignoring.", - (void *)p); } return; } grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; - rr_subchannel_list *subchannel_list = - rr_subchannel_list_create(p, addresses->num_addresses); - if (addresses->num_addresses == 0) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p, + addresses->num_addresses); + } + grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create( + exec_ctx, &p->base, &grpc_lb_round_robin_trace, addresses, args, + rr_connectivity_changed_locked); + if (subchannel_list->num_subchannels == 0) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "rr_update_empty"); if (p->subchannel_list != NULL) { - rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, - "sl_shutdown_empty_update"); + grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_empty_update"); } p->subchannel_list = subchannel_list; // empty list return; } - size_t subchannel_index = 0; - if (p->latest_pending_subchannel_list != NULL && p->started_picking) { - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR %p] Shutting down latest pending subchannel list %p, about " - "to be replaced by newer latest %p", - (void *)p, (void *)p->latest_pending_subchannel_list, - (void *)subchannel_list); - } - rr_subchannel_list_shutdown_and_unref( - exec_ctx, p->latest_pending_subchannel_list, "sl_outdated_dont_smash"); - } - p->latest_pending_subchannel_list = subchannel_list; - grpc_subchannel_args sc_args; - /* We need to remove the LB addresses in order to be able to compare the - * subchannel keys of subchannels from a different batch of addresses. */ - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, - GRPC_ARG_LB_ADDRESSES}; - /* Create subchannels for addresses in the update. */ - for (size_t i = 0; i < addresses->num_addresses; i++) { - // If there were any balancer, we would have chosen grpclb policy instead. - GPR_ASSERT(!addresses->addresses[i].is_balancer); - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - grpc_arg addr_arg = - grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, - 1); - gpr_free(addr_arg.value.string); - sc_args.args = new_args; - grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( - exec_ctx, args->client_channel_factory, &sc_args); - grpc_channel_args_destroy(exec_ctx, new_args); - grpc_error *error; - // Get the connectivity state of the subchannel. Already existing ones may - // be in a state other than INIT. - const grpc_connectivity_state subchannel_connectivity_state = - grpc_subchannel_check_connectivity(subchannel, &error); - if (error != GRPC_ERROR_NONE) { - // The subchannel is in error (e.g. shutting down). Ignore it. - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error"); - GRPC_ERROR_UNREF(error); - continue; - } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - char *address_uri = - grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log( - GPR_DEBUG, - "[RR %p] index %lu: Created subchannel %p for address uri %s into " - "subchannel_list %p. Connectivity state %s", - (void *)p, (unsigned long)subchannel_index, (void *)subchannel, - address_uri, (void *)subchannel_list, - grpc_connectivity_state_name(subchannel_connectivity_state)); - gpr_free(address_uri); - } - subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++]; - sd->subchannel_list = subchannel_list; - sd->subchannel = subchannel; - GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, - rr_connectivity_changed_locked, sd, - grpc_combiner_scheduler(args->combiner)); - /* use some sentinel value outside of the range of - * grpc_connectivity_state to signal an undefined previous state. We - * won't be referring to this value again and it'll be overwritten after - * the first call to rr_connectivity_changed_locked */ - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = subchannel_connectivity_state; - sd->user_data_vtable = addresses->user_data_vtable; - if (sd->user_data_vtable != NULL) { - sd->user_data = - sd->user_data_vtable->copy(addresses->addresses[i].user_data); + if (p->started_picking) { + if (p->latest_pending_subchannel_list != NULL) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] Shutting down latest pending subchannel list %p, " + "about to be replaced by newer latest %p", + (void *)p, (void *)p->latest_pending_subchannel_list, + (void *)subchannel_list); + } + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, "sl_outdated"); } - if (p->started_picking) { - rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking"); - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update"); - /* 2. Watch every new subchannel. A subchannel list becomes active the + p->latest_pending_subchannel_list = subchannel_list; + for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { + /* Watch every new subchannel. A subchannel list becomes active the * moment one of its subchannels is READY. At that moment, we swap * p->subchannel_list for sd->subchannel_list, provided the subchannel * list is still valid (ie, isn't shutting down) */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->pending_connectivity_state_unsafe, - &sd->connectivity_changed_closure); + grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list, + "connectivity_watch"); + grpc_lb_subchannel_data_start_connectivity_watch( + exec_ctx, &subchannel_list->subchannels[i]); } - } - if (!p->started_picking) { + } else { // The policy isn't picking yet. Save the update for later, disposing of // previous version if any. if (p->subchannel_list != NULL) { - rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, - "rr_update_before_started_picking"); + grpc_lb_subchannel_list_shutdown_and_unref( + exec_ctx, p->subchannel_list, "rr_update_before_started_picking"); } p->subchannel_list = subchannel_list; - p->latest_pending_subchannel_list = NULL; } } diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc new file mode 100644 index 0000000000..08ea4f480b --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -0,0 +1,265 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <string.h> + +#include <grpc/support/alloc.h> + +#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/transport/connectivity_state.h" + +void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, + grpc_lb_subchannel_data *sd, + const char *reason) { + if (sd->subchannel != NULL) { + if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { + gpr_log( + GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR + " of %" PRIuPTR " (subchannel %p): unreffing subchannel", + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel); + } + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason); + sd->subchannel = NULL; + if (sd->connected_subchannel != NULL) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, sd->connected_subchannel, + reason); + sd->connected_subchannel = NULL; + } + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + sd->user_data = NULL; + } + } +} + +void grpc_lb_subchannel_data_start_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { + if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): requesting connectivity change notification", + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, + (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel); + } + sd->connectivity_notification_pending = true; + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, sd->subchannel_list->policy->interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); +} + +void grpc_lb_subchannel_data_stop_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { + if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { + gpr_log( + GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): stopping connectivity watch", + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel); + } + GPR_ASSERT(sd->connectivity_notification_pending); + sd->connectivity_notification_pending = false; +} + +grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer, + const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args, + grpc_iomgr_cb_func connectivity_changed_cb) { + grpc_lb_subchannel_list *subchannel_list = + (grpc_lb_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list)); + if (GRPC_TRACER_ON(*tracer)) { + gpr_log(GPR_DEBUG, + "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", + tracer->name, p, subchannel_list, addresses->num_addresses); + } + subchannel_list->policy = p; + subchannel_list->tracer = tracer; + gpr_ref_init(&subchannel_list->refcount, 1); + subchannel_list->subchannels = (grpc_lb_subchannel_data *)gpr_zalloc( + sizeof(grpc_lb_subchannel_data) * addresses->num_addresses); + // We need to remove the LB addresses in order to be able to compare the + // subchannel keys of subchannels from a different batch of addresses. + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + GRPC_ARG_LB_ADDRESSES}; + // Create a subchannel for each address. + grpc_subchannel_args sc_args; + size_t subchannel_index = 0; + for (size_t i = 0; i < addresses->num_addresses; i++) { + // If there were any balancer, we would have chosen grpclb policy instead. + GPR_ASSERT(!addresses->addresses[i].is_balancer); + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); + grpc_arg addr_arg = + grpc_create_subchannel_address_arg(&addresses->addresses[i].address); + grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( + args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, + 1); + gpr_free(addr_arg.value.string); + sc_args.args = new_args; + grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( + exec_ctx, args->client_channel_factory, &sc_args); + grpc_channel_args_destroy(exec_ctx, new_args); + if (subchannel == NULL) { + // Subchannel could not be created. + if (GRPC_TRACER_ON(*tracer)) { + char *address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_DEBUG, + "[%s %p] could not create subchannel for address uri %s, " + "ignoring", + tracer->name, subchannel_list->policy, address_uri); + gpr_free(address_uri); + } + continue; + } + if (GRPC_TRACER_ON(*tracer)) { + char *address_uri = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR + ": Created subchannel %p for address uri %s", + tracer->name, p, subchannel_list, subchannel_index, subchannel, + address_uri); + gpr_free(address_uri); + } + grpc_lb_subchannel_data *sd = + &subchannel_list->subchannels[subchannel_index++]; + sd->subchannel_list = subchannel_list; + sd->subchannel = subchannel; + GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, + connectivity_changed_cb, sd, + grpc_combiner_scheduler(args->combiner)); + // We assume that the current state is IDLE. If not, we'll get a + // callback telling us that. + sd->prev_connectivity_state = GRPC_CHANNEL_IDLE; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; + sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE; + sd->user_data_vtable = addresses->user_data_vtable; + if (sd->user_data_vtable != NULL) { + sd->user_data = + sd->user_data_vtable->copy(addresses->addresses[i].user_data); + } + } + subchannel_list->num_subchannels = subchannel_index; + subchannel_list->num_idle = subchannel_index; + return subchannel_list; +} + +static void subchannel_list_destroy(grpc_exec_ctx *exec_ctx, + grpc_lb_subchannel_list *subchannel_list) { + if (GRPC_TRACER_ON(*subchannel_list->tracer)) { + gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", + subchannel_list->tracer->name, subchannel_list->policy, + subchannel_list); + } + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, + "subchannel_list_destroy"); + } + gpr_free(subchannel_list->subchannels); + gpr_free(subchannel_list); +} + +void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list, + const char *reason) { + gpr_ref_non_zero(&subchannel_list->refcount); + if (GRPC_TRACER_ON(*subchannel_list->tracer)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)", + subchannel_list->tracer->name, subchannel_list->policy, + subchannel_list, (unsigned long)(count - 1), (unsigned long)count, + reason); + } +} + +void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, + grpc_lb_subchannel_list *subchannel_list, + const char *reason) { + const bool done = gpr_unref(&subchannel_list->refcount); + if (GRPC_TRACER_ON(*subchannel_list->tracer)) { + const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); + gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)", + subchannel_list->tracer->name, subchannel_list->policy, + subchannel_list, (unsigned long)(count + 1), (unsigned long)count, + reason); + } + if (done) { + subchannel_list_destroy(exec_ctx, subchannel_list); + } +} + +void grpc_lb_subchannel_list_ref_for_connectivity_watch( + grpc_lb_subchannel_list *subchannel_list, const char *reason) { + GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason); + grpc_lb_subchannel_list_ref(subchannel_list, reason); +} + +void grpc_lb_subchannel_list_unref_for_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, + const char *reason) { + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, subchannel_list->policy, reason); + grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason); +} + +static void subchannel_data_cancel_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) { + if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { + gpr_log( + GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): canceling connectivity watch (%s)", + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel, reason); + } + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, + &sd->connectivity_changed_closure); +} + +void grpc_lb_subchannel_list_shutdown_and_unref( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, + const char *reason) { + if (GRPC_TRACER_ON(*subchannel_list->tracer)) { + gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)", + subchannel_list->tracer->name, subchannel_list->policy, + subchannel_list, reason); + } + GPR_ASSERT(!subchannel_list->shutting_down); + subchannel_list->shutting_down = true; + for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { + grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; + // If there's a pending notification for this subchannel, cancel it; + // the callback is responsible for unreffing the subchannel. + // Otherwise, unref the subchannel directly. + if (sd->connectivity_notification_pending) { + subchannel_data_cancel_connectivity_watch(exec_ctx, sd, reason); + } else if (sd->subchannel != NULL) { + grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, reason); + } + } + grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason); +} diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h new file mode 100644 index 0000000000..9d5984260f --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -0,0 +1,153 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H + +#include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/transport/connectivity_state.h" + +// TODO(roth): This code is intended to be shared between pick_first and +// round_robin. However, the interface needs more work to provide clean +// encapsulation. For example, the structs here have some fields that are +// only used in one of the two (e.g., the state counters in +// grpc_lb_subchannel_list and the prev_connectivity_state field in +// grpc_lb_subchannel_data are only used in round_robin, and the +// checking_subchannel field in grpc_lb_subchannel_list is only used by +// pick_first). Also, there is probably some code duplication between the +// connectivity state notification callback code in both pick_first and +// round_robin that could be refactored and moved here. In a future PR, +// need to clean this up. + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list; + +typedef struct { + /** backpointer to owning subchannel list */ + grpc_lb_subchannel_list *subchannel_list; + /** subchannel itself */ + grpc_subchannel *subchannel; + grpc_connected_subchannel *connected_subchannel; + /** Is a connectivity notification pending? */ + bool connectivity_notification_pending; + /** notification that connectivity has changed on subchannel */ + grpc_closure connectivity_changed_closure; + /** previous and current connectivity states. Updated by \a + * \a connectivity_changed_closure based on + * \a pending_connectivity_state_unsafe. */ + grpc_connectivity_state prev_connectivity_state; + grpc_connectivity_state curr_connectivity_state; + /** connectivity state to be updated by + * grpc_subchannel_notify_on_state_change(), not guarded by + * the combiner. To be copied to \a curr_connectivity_state by + * \a connectivity_changed_closure. */ + grpc_connectivity_state pending_connectivity_state_unsafe; + /** the subchannel's target user data */ + void *user_data; + /** vtable to operate over \a user_data */ + const grpc_lb_user_data_vtable *user_data_vtable; +} grpc_lb_subchannel_data; + +/// Unrefs the subchannel contained in sd. +void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, + grpc_lb_subchannel_data *sd, + const char *reason); + +/// Starts watching the connectivity state of the subchannel. +/// The connectivity_changed_cb callback must invoke either +/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call +/// grpc_lb_subchannel_data_start_connectivity_watch(). +void grpc_lb_subchannel_data_start_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd); + +/// Stops watching the connectivity state of the subchannel. +void grpc_lb_subchannel_data_stop_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd); + +struct grpc_lb_subchannel_list { + /** backpointer to owning policy */ + grpc_lb_policy *policy; + + grpc_tracer_flag *tracer; + + /** all our subchannels */ + size_t num_subchannels; + grpc_lb_subchannel_data *subchannels; + + /** Index into subchannels of the one we're currently checking. + * Used when connecting to subchannels serially instead of in parallel. */ + // TODO(roth): When we have time, we can probably make this go away + // and compute the index dynamically by subtracting + // subchannel_list->subchannels from the subchannel_data pointer. + size_t checking_subchannel; + + /** how many subchannels are in state READY */ + size_t num_ready; + /** how many subchannels are in state TRANSIENT_FAILURE */ + size_t num_transient_failures; + /** how many subchannels are in state SHUTDOWN */ + size_t num_shutdown; + /** how many subchannels are in state IDLE */ + size_t num_idle; + + /** There will be one ref for each entry in subchannels for which there is a + * pending connectivity state watcher callback. */ + gpr_refcount refcount; + + /** Is this list shutting down? This may be true due to the shutdown of the + * policy itself or because a newer update has arrived while this one hadn't + * finished processing. */ + bool shutting_down; +}; + +grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer, + const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args, + grpc_iomgr_cb_func connectivity_changed_cb); + +void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list, + const char *reason); + +void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, + grpc_lb_subchannel_list *subchannel_list, + const char *reason); + +/// Takes and releases refs needed for a connectivity notification. +/// This includes a ref to subchannel_list and a weak ref to the LB policy. +void grpc_lb_subchannel_list_ref_for_connectivity_watch( + grpc_lb_subchannel_list *subchannel_list, const char *reason); +void grpc_lb_subchannel_list_unref_for_connectivity_watch( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, + const char *reason); + +/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The +/// connectivity state notification callback will ultimately unref it. +void grpc_lb_subchannel_list_shutdown_and_unref( + grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, + const char *reason); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */ diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 46b29f1fe0..1cd73f3ff4 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -127,8 +127,8 @@ void grpc_connected_subchannel_process_transport_op( grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_subchannel *channel, grpc_error **error); -/** call notify when the connectivity state of a channel changes from *state. - Updates *state with the new state of the channel */ +/** Calls notify when the connectivity state of a channel becomes different + from *state. Updates *state with the new state of the channel. */ void grpc_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index d5107d467b..7dd348ed5f 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -32,6 +32,12 @@ struct grpc_chttp2_stream; extern "C" grpc_tracer_flag grpc_flowctl_trace; +namespace grpc { +namespace testing { +class TrickledCHTTP2; // to make this a friend +} // namespace testing +} // namespace grpc + namespace grpc_core { namespace chttp2 { @@ -203,6 +209,7 @@ class TransportFlowControl { } private: + friend class ::grpc::testing::TrickledCHTTP2; double TargetLogBdp(); double SmoothLogBdp(grpc_exec_ctx* exec_ctx, double value); FlowControlAction::Urgency DeltaUrgency(int32_t value, @@ -297,6 +304,7 @@ class StreamFlowControl { } private: + friend class ::grpc::testing::TrickledCHTTP2; TransportFlowControl* const tfc_; const grpc_chttp2_stream* const s_; diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc index 8e47aebedb..2085e2b8e7 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc @@ -119,6 +119,12 @@ grpc_channel_credentials *grpc_ssl_credentials_create( // SSL Server Credentials. // +struct grpc_ssl_server_credentials_options { + grpc_ssl_client_certificate_request_type client_certificate_request; + grpc_ssl_server_certificate_config *certificate_config; + grpc_ssl_server_certificate_config_fetcher *certificate_config_fetcher; +}; + static void ssl_server_destruct(grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds) { grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds; @@ -130,9 +136,7 @@ static void ssl_server_destruct(grpc_exec_ctx *exec_ctx, static grpc_security_status ssl_server_create_security_connector( grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds, grpc_server_security_connector **sc) { - grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds; - return grpc_ssl_server_security_connector_create(exec_ctx, creds, &c->config, - sc); + return grpc_ssl_server_security_connector_create(exec_ctx, creds, sc); } static grpc_server_credentials_vtable ssl_server_vtable = { @@ -170,6 +174,86 @@ static void ssl_build_server_config( config->num_key_cert_pairs = num_key_cert_pairs; } +grpc_ssl_server_certificate_config *grpc_ssl_server_certificate_config_create( + const char *pem_root_certs, + const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs) { + grpc_ssl_server_certificate_config *config = + (grpc_ssl_server_certificate_config *)gpr_zalloc( + sizeof(grpc_ssl_server_certificate_config)); + if (pem_root_certs != NULL) { + config->pem_root_certs = gpr_strdup(pem_root_certs); + } + if (num_key_cert_pairs > 0) { + GPR_ASSERT(pem_key_cert_pairs != NULL); + config->pem_key_cert_pairs = (grpc_ssl_pem_key_cert_pair *)gpr_zalloc( + num_key_cert_pairs * sizeof(grpc_ssl_pem_key_cert_pair)); + } + config->num_key_cert_pairs = num_key_cert_pairs; + for (size_t i = 0; i < num_key_cert_pairs; i++) { + GPR_ASSERT(pem_key_cert_pairs[i].private_key != NULL); + GPR_ASSERT(pem_key_cert_pairs[i].cert_chain != NULL); + config->pem_key_cert_pairs[i].cert_chain = + gpr_strdup(pem_key_cert_pairs[i].cert_chain); + config->pem_key_cert_pairs[i].private_key = + gpr_strdup(pem_key_cert_pairs[i].private_key); + } + return config; +} + +void grpc_ssl_server_certificate_config_destroy( + grpc_ssl_server_certificate_config *config) { + if (config == NULL) return; + for (size_t i = 0; i < config->num_key_cert_pairs; i++) { + gpr_free((void *)config->pem_key_cert_pairs[i].private_key); + gpr_free((void *)config->pem_key_cert_pairs[i].cert_chain); + } + gpr_free(config->pem_key_cert_pairs); + gpr_free(config->pem_root_certs); + gpr_free(config); +} + +grpc_ssl_server_credentials_options * +grpc_ssl_server_credentials_create_options_using_config( + grpc_ssl_client_certificate_request_type client_certificate_request, + grpc_ssl_server_certificate_config *config) { + grpc_ssl_server_credentials_options *options = NULL; + if (config == NULL) { + gpr_log(GPR_ERROR, "Certificate config must not be NULL."); + goto done; + } + options = (grpc_ssl_server_credentials_options *)gpr_zalloc( + sizeof(grpc_ssl_server_credentials_options)); + options->client_certificate_request = client_certificate_request; + options->certificate_config = config; +done: + return options; +} + +grpc_ssl_server_credentials_options * +grpc_ssl_server_credentials_create_options_using_config_fetcher( + grpc_ssl_client_certificate_request_type client_certificate_request, + grpc_ssl_server_certificate_config_callback cb, void *user_data) { + if (cb == NULL) { + gpr_log(GPR_ERROR, "Invalid certificate config callback parameter."); + return NULL; + } + + grpc_ssl_server_certificate_config_fetcher *fetcher = + (grpc_ssl_server_certificate_config_fetcher *)gpr_zalloc( + sizeof(grpc_ssl_server_certificate_config_fetcher)); + fetcher->cb = cb; + fetcher->user_data = user_data; + + grpc_ssl_server_credentials_options *options = + (grpc_ssl_server_credentials_options *)gpr_zalloc( + sizeof(grpc_ssl_server_credentials_options)); + options->client_certificate_request = client_certificate_request; + options->certificate_config_fetcher = fetcher; + + return options; +} + grpc_server_credentials *grpc_ssl_server_credentials_create( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, int force_client_auth, void *reserved) { @@ -186,8 +270,6 @@ grpc_server_credentials *grpc_ssl_server_credentials_create_ex( size_t num_key_cert_pairs, grpc_ssl_client_certificate_request_type client_certificate_request, void *reserved) { - grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)gpr_zalloc( - sizeof(grpc_ssl_server_credentials)); GRPC_API_TRACE( "grpc_ssl_server_credentials_create_ex(" "pem_root_certs=%s, pem_key_cert_pairs=%p, num_key_cert_pairs=%lu, " @@ -195,11 +277,67 @@ grpc_server_credentials *grpc_ssl_server_credentials_create_ex( 5, (pem_root_certs, pem_key_cert_pairs, (unsigned long)num_key_cert_pairs, client_certificate_request, reserved)); GPR_ASSERT(reserved == NULL); + + grpc_ssl_server_certificate_config *cert_config = + grpc_ssl_server_certificate_config_create( + pem_root_certs, pem_key_cert_pairs, num_key_cert_pairs); + grpc_ssl_server_credentials_options *options = + grpc_ssl_server_credentials_create_options_using_config( + client_certificate_request, cert_config); + + return grpc_ssl_server_credentials_create_with_options(options); +} + +grpc_server_credentials *grpc_ssl_server_credentials_create_with_options( + grpc_ssl_server_credentials_options *options) { + grpc_server_credentials *retval = NULL; + grpc_ssl_server_credentials *c = NULL; + + if (options == NULL) { + gpr_log(GPR_ERROR, + "Invalid options trying to create SSL server credentials."); + goto done; + } + + if (options->certificate_config == NULL && + options->certificate_config_fetcher == NULL) { + gpr_log(GPR_ERROR, + "SSL server credentials options must specify either " + "certificate config or fetcher."); + goto done; + } else if (options->certificate_config_fetcher != NULL && + options->certificate_config_fetcher->cb == NULL) { + gpr_log(GPR_ERROR, "Certificate config fetcher callback must not be NULL."); + goto done; + } + + c = (grpc_ssl_server_credentials *)gpr_zalloc( + sizeof(grpc_ssl_server_credentials)); c->base.type = GRPC_CHANNEL_CREDENTIALS_TYPE_SSL; gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &ssl_server_vtable; - ssl_build_server_config(pem_root_certs, pem_key_cert_pairs, - num_key_cert_pairs, client_certificate_request, - &c->config); - return &c->base; + + if (options->certificate_config_fetcher != NULL) { + c->config.client_certificate_request = options->client_certificate_request; + c->certificate_config_fetcher = *options->certificate_config_fetcher; + } else { + ssl_build_server_config(options->certificate_config->pem_root_certs, + options->certificate_config->pem_key_cert_pairs, + options->certificate_config->num_key_cert_pairs, + options->client_certificate_request, &c->config); + } + + retval = &c->base; + +done: + grpc_ssl_server_credentials_options_destroy(options); + return retval; +} + +void grpc_ssl_server_credentials_options_destroy( + grpc_ssl_server_credentials_options *o) { + if (o == NULL) return; + gpr_free(o->certificate_config_fetcher); + grpc_ssl_server_certificate_config_destroy(o->certificate_config); + gpr_free(o); } diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.h b/src/core/lib/security/credentials/ssl/ssl_credentials.h index 42e425d9f1..5542484aae 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.h +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.h @@ -29,9 +29,21 @@ typedef struct { grpc_ssl_config config; } grpc_ssl_credentials; +struct grpc_ssl_server_certificate_config { + grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs; + size_t num_key_cert_pairs; + char *pem_root_certs; +}; + +typedef struct { + grpc_ssl_server_certificate_config_callback cb; + void *user_data; +} grpc_ssl_server_certificate_config_fetcher; + typedef struct { grpc_server_credentials base; grpc_ssl_server_config config; + grpc_ssl_server_certificate_config_fetcher certificate_config_fetcher; } grpc_ssl_server_credentials; tsi_ssl_pem_key_cert_pair *grpc_convert_grpc_to_tsi_cert_pairs( diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc index b050be2129..06160d0caa 100644 --- a/src/core/lib/security/transport/security_connector.cc +++ b/src/core/lib/security/transport/security_connector.cc @@ -34,6 +34,7 @@ #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/credentials/fake/fake_credentials.h" +#include "src/core/lib/security/credentials/ssl/ssl_credentials.h" #include "src/core/lib/security/transport/lb_targets_info.h" #include "src/core/lib/security/transport/secure_endpoint.h" #include "src/core/lib/security/transport/security_handshaker.h" @@ -277,6 +278,30 @@ grpc_security_connector *grpc_security_connector_find_in_args( return NULL; } +static tsi_client_certificate_request_type +get_tsi_client_certificate_request_type( + grpc_ssl_client_certificate_request_type grpc_request_type) { + switch (grpc_request_type) { + case GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE: + return TSI_DONT_REQUEST_CLIENT_CERTIFICATE; + + case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY: + return TSI_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY; + + case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY: + return TSI_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY; + + case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY: + return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY; + + case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY: + return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY; + + default: + return TSI_DONT_REQUEST_CLIENT_CERTIFICATE; + } +} + /* -- Fake implementation. -- */ typedef struct { @@ -533,6 +558,15 @@ typedef struct { tsi_ssl_server_handshaker_factory *server_handshaker_factory; } grpc_ssl_server_security_connector; +static bool server_connector_has_cert_config_fetcher( + grpc_ssl_server_security_connector *c) { + GPR_ASSERT(c != NULL); + grpc_ssl_server_credentials *server_creds = + (grpc_ssl_server_credentials *)c->base.server_creds; + GPR_ASSERT(server_creds != NULL); + return server_creds->certificate_config_fetcher.cb != NULL; +} + static void ssl_channel_destroy(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc) { grpc_ssl_channel_security_connector *c = @@ -573,7 +607,6 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx, tsi_result_to_string(result)); return; } - // Create handshakers. grpc_handshake_manager_add( handshake_mgr, @@ -581,12 +614,102 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx, exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base)); } +static const char **fill_alpn_protocol_strings(size_t *num_alpn_protocols) { + GPR_ASSERT(num_alpn_protocols != NULL); + *num_alpn_protocols = grpc_chttp2_num_alpn_versions(); + const char **alpn_protocol_strings = + (const char **)gpr_malloc(sizeof(const char *) * (*num_alpn_protocols)); + for (size_t i = 0; i < *num_alpn_protocols; i++) { + alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i); + } + return alpn_protocol_strings; +} + +/* Attempts to replace the server_handshaker_factory with a new factory using + * the provided grpc_ssl_server_certificate_config. Should new factory creation + * fail, the existing factory will not be replaced. Returns true on success (new + * factory created). */ +static bool try_replace_server_handshaker_factory( + grpc_ssl_server_security_connector *sc, + const grpc_ssl_server_certificate_config *config) { + if (config == NULL) { + gpr_log(GPR_ERROR, + "Server certificate config callback returned invalid (NULL) " + "config."); + return false; + } + gpr_log(GPR_DEBUG, "Using new server certificate config (%p).", config); + + size_t num_alpn_protocols = 0; + const char **alpn_protocol_strings = + fill_alpn_protocol_strings(&num_alpn_protocols); + tsi_ssl_pem_key_cert_pair *cert_pairs = grpc_convert_grpc_to_tsi_cert_pairs( + config->pem_key_cert_pairs, config->num_key_cert_pairs); + tsi_ssl_server_handshaker_factory *new_handshaker_factory = NULL; + grpc_ssl_server_credentials *server_creds = + (grpc_ssl_server_credentials *)sc->base.server_creds; + tsi_result result = tsi_create_ssl_server_handshaker_factory_ex( + cert_pairs, config->num_key_cert_pairs, config->pem_root_certs, + get_tsi_client_certificate_request_type( + server_creds->config.client_certificate_request), + ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols, + &new_handshaker_factory); + gpr_free(cert_pairs); + gpr_free((void *)alpn_protocol_strings); + + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", + tsi_result_to_string(result)); + return false; + } + tsi_ssl_server_handshaker_factory_unref(sc->server_handshaker_factory); + sc->server_handshaker_factory = new_handshaker_factory; + return true; +} + +/* Attempts to fetch the server certificate config if a callback is available. + * Current certificate config will continue to be used if the callback returns + * an error. Returns true if new credentials were sucessfully loaded. */ +static bool try_fetch_ssl_server_credentials( + grpc_ssl_server_security_connector *sc) { + grpc_ssl_server_certificate_config *certificate_config = NULL; + bool status; + + GPR_ASSERT(sc != NULL); + if (!server_connector_has_cert_config_fetcher(sc)) return false; + + grpc_ssl_server_credentials *server_creds = + (grpc_ssl_server_credentials *)sc->base.server_creds; + grpc_ssl_certificate_config_reload_status cb_result = + server_creds->certificate_config_fetcher.cb( + server_creds->certificate_config_fetcher.user_data, + &certificate_config); + if (cb_result == GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_UNCHANGED) { + gpr_log(GPR_DEBUG, "No change in SSL server credentials."); + status = false; + } else if (cb_result == GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_NEW) { + status = try_replace_server_handshaker_factory(sc, certificate_config); + } else { + // Log error, continue using previously-loaded credentials. + gpr_log(GPR_ERROR, + "Failed fetching new server credentials, continuing to " + "use previously-loaded credentials."); + status = false; + } + + if (certificate_config != NULL) { + grpc_ssl_server_certificate_config_destroy(certificate_config); + } + return status; +} + static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_handshake_manager *handshake_mgr) { grpc_ssl_server_security_connector *c = (grpc_ssl_server_security_connector *)sc; // Instantiate TSI handshaker. + try_fetch_ssl_server_credentials(c); tsi_handshaker *tsi_hs = NULL; tsi_result result = tsi_ssl_server_handshaker_factory_create_handshaker( c->server_handshaker_factory, &tsi_hs); @@ -595,7 +718,6 @@ static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx, tsi_result_to_string(result)); return; } - // Create handshakers. grpc_handshake_manager_add( handshake_mgr, @@ -857,31 +979,6 @@ grpc_slice grpc_get_default_ssl_roots_for_testing(void) { return compute_default_pem_root_certs_once(); } -static tsi_client_certificate_request_type -get_tsi_client_certificate_request_type( - grpc_ssl_client_certificate_request_type grpc_request_type) { - switch (grpc_request_type) { - case GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE: - return TSI_DONT_REQUEST_CLIENT_CERTIFICATE; - - case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY: - return TSI_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY; - - case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY: - return TSI_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY; - - case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY: - return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY; - - case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY: - return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY; - - default: - // Is this a sane default - return TSI_DONT_REQUEST_CLIENT_CERTIFICATE; - } -} - const char *grpc_get_default_ssl_roots(void) { /* TODO(jboeuf@google.com): Maybe revisit the approach which consists in loading all the roots once for the lifetime of the process. */ @@ -897,18 +994,14 @@ grpc_security_status grpc_ssl_channel_security_connector_create( grpc_call_credentials *request_metadata_creds, const grpc_ssl_config *config, const char *target_name, const char *overridden_target_name, grpc_channel_security_connector **sc) { - size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); + size_t num_alpn_protocols = 0; const char **alpn_protocol_strings = - (const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols); + fill_alpn_protocol_strings(&num_alpn_protocols); tsi_result result = TSI_OK; grpc_ssl_channel_security_connector *c; - size_t i; const char *pem_root_certs; char *port; bool has_key_cert_pair; - for (i = 0; i < num_alpn_protocols; i++) { - alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i); - } if (config == NULL || target_name == NULL) { gpr_log(GPR_ERROR, "An ssl channel needs a config and a target name."); @@ -965,50 +1058,64 @@ error: return GRPC_SECURITY_ERROR; } +static grpc_ssl_server_security_connector * +grpc_ssl_server_security_connector_initialize( + grpc_server_credentials *server_creds) { + grpc_ssl_server_security_connector *c = + (grpc_ssl_server_security_connector *)gpr_zalloc( + sizeof(grpc_ssl_server_security_connector)); + gpr_ref_init(&c->base.base.refcount, 1); + c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; + c->base.base.vtable = &ssl_server_vtable; + c->base.add_handshakers = ssl_server_add_handshakers; + c->base.server_creds = grpc_server_credentials_ref(server_creds); + return c; +} + grpc_security_status grpc_ssl_server_security_connector_create( - grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds, - const grpc_ssl_server_config *config, grpc_server_security_connector **sc) { - size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); - const char **alpn_protocol_strings = - (const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols); + grpc_exec_ctx *exec_ctx, grpc_server_credentials *gsc, + grpc_server_security_connector **sc) { tsi_result result = TSI_OK; - grpc_ssl_server_security_connector *c; - size_t i; + grpc_ssl_server_credentials *server_credentials = + (grpc_ssl_server_credentials *)gsc; + grpc_security_status retval = GRPC_SECURITY_OK; - for (i = 0; i < num_alpn_protocols; i++) { - alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i); - } + GPR_ASSERT(server_credentials != NULL); + GPR_ASSERT(sc != NULL); - if (config == NULL || config->num_key_cert_pairs == 0) { - gpr_log(GPR_ERROR, "An SSL server needs a key and a cert."); - goto error; + grpc_ssl_server_security_connector *c = + grpc_ssl_server_security_connector_initialize(gsc); + if (server_connector_has_cert_config_fetcher(c)) { + // Load initial credentials from certificate_config_fetcher: + if (!try_fetch_ssl_server_credentials(c)) { + gpr_log(GPR_ERROR, "Failed loading SSL server credentials from fetcher."); + retval = GRPC_SECURITY_ERROR; + } + } else { + size_t num_alpn_protocols = 0; + const char **alpn_protocol_strings = + fill_alpn_protocol_strings(&num_alpn_protocols); + result = tsi_create_ssl_server_handshaker_factory_ex( + server_credentials->config.pem_key_cert_pairs, + server_credentials->config.num_key_cert_pairs, + server_credentials->config.pem_root_certs, + get_tsi_client_certificate_request_type( + server_credentials->config.client_certificate_request), + ssl_cipher_suites(), alpn_protocol_strings, + (uint16_t)num_alpn_protocols, &c->server_handshaker_factory); + gpr_free((void *)alpn_protocol_strings); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", + tsi_result_to_string(result)); + retval = GRPC_SECURITY_ERROR; + } } - c = (grpc_ssl_server_security_connector *)gpr_zalloc( - sizeof(grpc_ssl_server_security_connector)); - gpr_ref_init(&c->base.base.refcount, 1); - c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; - c->base.base.vtable = &ssl_server_vtable; - c->base.server_creds = grpc_server_credentials_ref(server_creds); - result = tsi_create_ssl_server_handshaker_factory_ex( - config->pem_key_cert_pairs, config->num_key_cert_pairs, - config->pem_root_certs, get_tsi_client_certificate_request_type( - config->client_certificate_request), - ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols, - &c->server_handshaker_factory); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", - tsi_result_to_string(result)); - ssl_server_destroy(exec_ctx, &c->base.base); - *sc = NULL; - goto error; + if (retval == GRPC_SECURITY_OK) { + *sc = &c->base; + } else { + if (c != NULL) ssl_server_destroy(exec_ctx, &c->base.base); + if (sc != NULL) *sc = NULL; } - c->base.add_handshakers = ssl_server_add_handshakers; - *sc = &c->base; - gpr_free((void *)alpn_protocol_strings); - return GRPC_SECURITY_OK; - -error: - gpr_free((void *)alpn_protocol_strings); - return GRPC_SECURITY_ERROR; + return retval; } diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index 8287151f44..54a563bb2c 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -248,8 +248,8 @@ typedef struct { specific error code otherwise. */ grpc_security_status grpc_ssl_server_security_connector_create( - grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds, - const grpc_ssl_server_config *config, grpc_server_security_connector **sc); + grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_credentials, + grpc_server_security_connector **sc); /* Util. */ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index f328a6cdbb..652c26cf0a 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -29,8 +29,6 @@ grpc_tracer_flag grpc_connectivity_state_trace = const char *grpc_connectivity_state_name(grpc_connectivity_state state) { switch (state) { - case GRPC_CHANNEL_INIT: - return "INIT"; case GRPC_CHANNEL_IDLE: return "IDLE"; case GRPC_CHANNEL_CONNECTING: @@ -174,7 +172,6 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_name(state), reason, error, error_string); } switch (state) { - case GRPC_CHANNEL_INIT: case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_READY: diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 19a25c838f..9df531066e 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -48,187 +48,13 @@ namespace grpc { -namespace { -int kConnectivityCheckIntervalMsec = 500; -void WatchStateChange(void* arg); - -class TagSaver final : public CompletionQueueTag { - public: - explicit TagSaver(void* tag) : tag_(tag) {} - ~TagSaver() override {} - bool FinalizeResult(void** tag, bool* status) override { - *tag = tag_; - delete this; - return true; - } - - private: - void* tag_; -}; - -// Constantly watches channel connectivity status to reconnect a transiently -// disconnected channel. This is a temporary work-around before we have retry -// support. -class ChannelConnectivityWatcher : private GrpcLibraryCodegen { - public: - static void StartWatching(grpc_channel* channel) { - if (!IsDisabled()) { - std::unique_lock<std::mutex> lock(g_watcher_mu_); - if (g_watcher_ == nullptr) { - g_watcher_ = new ChannelConnectivityWatcher(); - } - g_watcher_->StartWatchingLocked(channel); - } - } - - static void StopWatching() { - if (!IsDisabled()) { - std::unique_lock<std::mutex> lock(g_watcher_mu_); - if (g_watcher_->StopWatchingLocked()) { - delete g_watcher_; - g_watcher_ = nullptr; - } - } - } - - private: - ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) { - gpr_ref_init(&ref_, 0); - gpr_thd_options options = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&options); - gpr_thd_new(&thd_id_, &WatchStateChange, this, &options); - } - - static bool IsDisabled() { - char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); - bool disabled = gpr_is_true(env); - gpr_free(env); - return disabled; - } - - void WatchStateChangeImpl() { - bool ok = false; - void* tag = NULL; - CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT; - while (true) { - { - std::unique_lock<std::mutex> lock(shutdown_mu_); - if (shutdown_) { - // Drain cq_ if the watcher is shutting down - status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)); - } else { - status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); - // Make sure we've seen 2 TIMEOUTs before going to sleep - if (status == CompletionQueue::TIMEOUT) { - status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME)); - if (status == CompletionQueue::TIMEOUT) { - shutdown_cv_.wait_for(lock, std::chrono::milliseconds( - kConnectivityCheckIntervalMsec)); - continue; - } - } - } - } - ChannelState* channel_state = static_cast<ChannelState*>(tag); - channel_state->state = - grpc_channel_check_connectivity_state(channel_state->channel, false); - if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) { - void* shutdown_tag = NULL; - channel_state->shutdown_cq.Next(&shutdown_tag, &ok); - delete channel_state; - if (gpr_unref(&ref_)) { - break; - } - } else { - TagSaver* tag_saver = new TagSaver(channel_state); - grpc_channel_watch_connectivity_state( - channel_state->channel, channel_state->state, - gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver); - } - } - } - - void StartWatchingLocked(grpc_channel* channel) { - if (thd_id_ != 0) { - gpr_ref(&ref_); - ++channel_count_; - ChannelState* channel_state = new ChannelState(channel); - // The first grpc_channel_watch_connectivity_state() is not used to - // monitor the channel state change, but to hold a reference of the - // c channel. So that WatchStateChangeImpl() can observe state == - // GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed. - grpc_channel_watch_connectivity_state( - channel_state->channel, channel_state->state, - gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(), - new TagSaver(nullptr)); - grpc_channel_watch_connectivity_state( - channel_state->channel, channel_state->state, - gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), - new TagSaver(channel_state)); - } - } - - bool StopWatchingLocked() { - if (--channel_count_ == 0) { - { - std::unique_lock<std::mutex> lock(shutdown_mu_); - shutdown_ = true; - shutdown_cv_.notify_one(); - } - gpr_thd_join(thd_id_); - return true; - } - return false; - } - - friend void WatchStateChange(void* arg); - struct ChannelState { - explicit ChannelState(grpc_channel* channel) - : channel(channel), state(GRPC_CHANNEL_IDLE){}; - grpc_channel* channel; - grpc_connectivity_state state; - CompletionQueue shutdown_cq; - }; - gpr_thd_id thd_id_; - CompletionQueue cq_; - gpr_refcount ref_; - int channel_count_; - - std::mutex shutdown_mu_; - std::condition_variable shutdown_cv_; // protected by shutdown_mu_ - bool shutdown_; // protected by shutdown_mu_ - - static std::mutex g_watcher_mu_; - static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_ -}; - -std::mutex ChannelConnectivityWatcher::g_watcher_mu_; -ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr; - -void WatchStateChange(void* arg) { - ChannelConnectivityWatcher* watcher = - static_cast<ChannelConnectivityWatcher*>(arg); - watcher->WatchStateChangeImpl(); -} -} // namespace - static internal::GrpcLibraryInitializer g_gli_initializer; Channel::Channel(const grpc::string& host, grpc_channel* channel) : host_(host), c_channel_(channel) { g_gli_initializer.summon(); - if (grpc_channel_support_connectivity_watcher(channel)) { - ChannelConnectivityWatcher::StartWatching(channel); - } } -Channel::~Channel() { - const bool stop_watching = - grpc_channel_support_connectivity_watcher(c_channel_); - grpc_channel_destroy(c_channel_); - if (stop_watching) { - ChannelConnectivityWatcher::StopWatching(); - } -} +Channel::~Channel() { grpc_channel_destroy(c_channel_); } namespace { @@ -259,8 +85,9 @@ grpc::string Channel::GetServiceConfigJSON() const { &channel_info.service_config_json); } -Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) { +internal::Call Channel::CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) { const bool kRegistered = method.channel_tag() && context->authority().empty(); grpc_call* c_call = NULL; if (kRegistered) { @@ -292,10 +119,11 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, } grpc_census_call_set_context(c_call, context->census_context()); context->set_call(c_call, shared_from_this()); - return Call(c_call, this, cq); + return internal::Call(c_call, this, cq); } -void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -313,6 +141,24 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { return grpc_channel_check_connectivity_state(c_channel_, try_to_connect); } +namespace { + +class TagSaver final : public internal::CompletionQueueTag { + public: + explicit TagSaver(void* tag) : tag_(tag) {} + ~TagSaver() override {} + bool FinalizeResult(void** tag, bool* status) override { + *tag = tag_; + delete this; + return true; + } + + private: + void* tag_; +}; + +} // namespace + void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, CompletionQueue* cq, void* tag) { diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 693b8bea56..fc18ce9093 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -27,8 +27,9 @@ std::unique_ptr<GenericClientAsyncReaderWriter> CallInternal( ChannelInterface* channel, ClientContext* context, const grpc::string& method, CompletionQueue* cq, bool start, void* tag) { return std::unique_ptr<GenericClientAsyncReaderWriter>( - GenericClientAsyncReaderWriter::Create( - channel, cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), + internal::ClientAsyncReaderWriterFactory<ByteBuffer, ByteBuffer>::Create( + channel, cq, internal::RpcMethod(method.c_str(), + internal::RpcMethod::BIDI_STREAMING), context, start, tag)); } @@ -52,8 +53,9 @@ std::unique_ptr<GenericClientAsyncResponseReader> GenericStub::PrepareUnaryCall( ClientContext* context, const grpc::string& method, const ByteBuffer& request, CompletionQueue* cq) { return std::unique_ptr<GenericClientAsyncResponseReader>( - GenericClientAsyncResponseReader::Create( - channel_.get(), cq, RpcMethod(method.c_str(), RpcMethod::NORMAL_RPC), + internal::ClientAsyncResponseReaderFactory<ByteBuffer>::Create( + channel_.get(), cq, + internal::RpcMethod(method.c_str(), internal::RpcMethod::NORMAL_RPC), context, request, false)); } diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 4a2e2be688..eb6dc8cc5f 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -60,7 +60,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto cq_tag = static_cast<CompletionQueueTag*>(ev.tag); + auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); *ok = ev.success != 0; *tag = cq_tag; if (cq_tag->FinalizeResult(tag, ok)) { @@ -87,7 +87,7 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { flushed_ = true; if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag, &res)) { - auto cq_tag = static_cast<CompletionQueueTag*>(res_tag); + auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag); *ok = res == 1; if (cq_tag->FinalizeResult(tag, ok)) { return true; diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index d2cba6d662..10dbd3c39a 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -37,11 +37,12 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( DefaultHealthCheckService* service) : service_(service), method_(nullptr) { - MethodHandler* handler = - new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>( + internal::MethodHandler* handler = + new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, + ByteBuffer>( std::mem_fn(&HealthCheckServiceImpl::Check), this); - method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC, - handler); + method_ = new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler); AddMethod(method_); } diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 09d5cebe98..99d6680c50 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -41,7 +41,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { private: const DefaultHealthCheckService* const service_; - RpcServiceMethod* method_; + internal::RpcServiceMethod* method_; }; DefaultHealthCheckService(); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index d982a3d2b7..6480482774 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -90,7 +90,8 @@ class Server::UnimplementedAsyncRequest final ServerCompletionQueue* const cq_; }; -typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> +typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus> UnimplementedAsyncResponseOp; class Server::UnimplementedAsyncResponse final : public UnimplementedAsyncResponseOp { @@ -108,12 +109,12 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class ShutdownTag : public CompletionQueueTag { +class ShutdownTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } }; -class DummyTag : public CompletionQueueTag { +class DummyTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { *status = true; @@ -121,15 +122,15 @@ class DummyTag : public CompletionQueueTag { } }; -class Server::SyncRequest final : public CompletionQueueTag { +class Server::SyncRequest final : public internal::CompletionQueueTag { public: - SyncRequest(RpcServiceMethod* method, void* tag) + SyncRequest(internal::RpcServiceMethod* method, void* tag) : method_(method), tag_(tag), in_flight_(false), - has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || - method->method_type() == - RpcMethod::SERVER_STREAMING), + has_request_payload_( + method->method_type() == internal::RpcMethod::NORMAL_RPC || + method->method_type() == internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -212,14 +213,14 @@ class Server::SyncRequest final : public CompletionQueueTag { void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - method_->handler()->RunHandler( - MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_)); + method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; cq_.Shutdown(); - CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ @@ -229,15 +230,15 @@ class Server::SyncRequest final : public CompletionQueueTag { private: CompletionQueue cq_; - Call call_; + internal::Call call_; ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; }; private: - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; void* const tag_; bool in_flight_; const bool has_request_payload_; @@ -311,14 +312,15 @@ class Server::SyncRequestThreadManager : public ThreadManager { // object } - void AddSyncMethod(RpcServiceMethod* method, void* tag) { + void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + unknown_method_.reset(new internal::RpcServiceMethod( + "unknown", internal::RpcMethod::BIDI_STREAMING, + new internal::UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -355,8 +357,8 @@ class Server::SyncRequestThreadManager : public ThreadManager { CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector<std::unique_ptr<SyncRequest>> sync_requests_; - std::unique_ptr<RpcServiceMethod> unknown_method_; - std::unique_ptr<RpcServiceMethod> health_check_; + std::unique_ptr<internal::RpcServiceMethod> unknown_method_; + std::unique_ptr<internal::RpcServiceMethod> health_check_; std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; }; @@ -439,13 +441,13 @@ std::shared_ptr<Channel> Server::InProcessChannel( } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - RpcServiceMethod* method) { + internal::RpcServiceMethod* method) { switch (method->method_type()) { - case RpcMethod::NORMAL_RPC: - case RpcMethod::SERVER_STREAMING: + case internal::RpcMethod::NORMAL_RPC: + case internal::RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case RpcMethod::CLIENT_STREAMING: - case RpcMethod::BIDI_STREAMING: + case internal::RpcMethod::CLIENT_STREAMING: + case internal::RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); @@ -466,7 +468,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { continue; } - RpcServiceMethod* method = it->get(); + internal::RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method), 0); @@ -606,7 +608,8 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -622,8 +625,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { ServerInterface::BaseAsyncRequest::BaseAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, - bool delete_on_finalize) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag, bool delete_on_finalize) : server_(server), context_(context), stream_(stream), @@ -645,7 +648,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } context_->set_call(call_); context_->cq_ = call_cq_; - Call call(call_, server_, call_cq_, server_->max_receive_message_size()); + internal::Call call(call_, server_, call_cq_, + server_->max_receive_message_size()); if (*status && call_) { context_->BeginCompletionOp(&call); } @@ -660,7 +664,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag) : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( @@ -675,7 +680,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest( ServerInterface::GenericAsyncRequest::GenericAsyncRequest( ServerInterface* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) : BaseAsyncRequest(server, context, stream, call_cq, tag, delete_on_finalize) { @@ -718,7 +723,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { Status status(StatusCode::UNIMPLEMENTED, ""); - UnknownMethodHandler::FillOps(request_->context(), this); + internal::UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index d7876a000b..f2cb6363f5 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -37,7 +37,7 @@ namespace grpc { // CompletionOp -class ServerContext::CompletionOp final : public CallOpSetInterface { +class ServerContext::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq CompletionOp() @@ -146,7 +146,7 @@ ServerContext::~ServerContext() { } } -void ServerContext::BeginCompletionOp(Call* call) { +void ServerContext::BeginCompletionOp(internal::Call* call) { GPR_ASSERT(!completion_op_); completion_op_ = new CompletionOp(); if (has_notify_when_done_tag_) { @@ -155,8 +155,8 @@ void ServerContext::BeginCompletionOp(Call* call) { call->PerformOps(completion_op_); } -CompletionQueueTag* ServerContext::GetCompletionOpTag() { - return static_cast<CompletionQueueTag*>(completion_op_); +internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() { + return static_cast<internal::CompletionQueueTag*>(completion_op_); } void ServerContext::AddInitialMetadata(const grpc::string& key, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 140f4ceee1..fbe65ba693 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -254,6 +254,7 @@ CORE_SOURCE_FILES = [ 'src/core/tsi/transport_security_adapter.cc', 'src/core/ext/transport/chttp2/server/chttp2_server.cc', 'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc', + 'src/core/ext/filters/client_channel/backup_poller.cc', 'src/core/ext/filters/client_channel/channel_connectivity.cc', 'src/core/ext/filters/client_channel/client_channel.cc', 'src/core/ext/filters/client_channel/client_channel_factory.cc', @@ -293,6 +294,7 @@ CORE_SOURCE_FILES = [ 'third_party/nanopb/pb_encode.c', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', + 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index cd1bd98abc..843e05ab66 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -155,8 +155,14 @@ grpc_google_iam_credentials_create_type grpc_google_iam_credentials_create_impor grpc_metadata_credentials_create_from_plugin_type grpc_metadata_credentials_create_from_plugin_import; grpc_secure_channel_create_type grpc_secure_channel_create_import; grpc_server_credentials_release_type grpc_server_credentials_release_import; +grpc_ssl_server_certificate_config_create_type grpc_ssl_server_certificate_config_create_import; +grpc_ssl_server_certificate_config_destroy_type grpc_ssl_server_certificate_config_destroy_import; grpc_ssl_server_credentials_create_type grpc_ssl_server_credentials_create_import; grpc_ssl_server_credentials_create_ex_type grpc_ssl_server_credentials_create_ex_import; +grpc_ssl_server_credentials_create_options_using_config_type grpc_ssl_server_credentials_create_options_using_config_import; +grpc_ssl_server_credentials_create_options_using_config_fetcher_type grpc_ssl_server_credentials_create_options_using_config_fetcher_import; +grpc_ssl_server_credentials_options_destroy_type grpc_ssl_server_credentials_options_destroy_import; +grpc_ssl_server_credentials_create_with_options_type grpc_ssl_server_credentials_create_with_options_import; grpc_server_add_secure_http2_port_type grpc_server_add_secure_http2_port_import; grpc_call_set_credentials_type grpc_call_set_credentials_import; grpc_server_credentials_set_auth_metadata_processor_type grpc_server_credentials_set_auth_metadata_processor_import; @@ -465,8 +471,14 @@ void grpc_rb_load_imports(HMODULE library) { grpc_metadata_credentials_create_from_plugin_import = (grpc_metadata_credentials_create_from_plugin_type) GetProcAddress(library, "grpc_metadata_credentials_create_from_plugin"); grpc_secure_channel_create_import = (grpc_secure_channel_create_type) GetProcAddress(library, "grpc_secure_channel_create"); grpc_server_credentials_release_import = (grpc_server_credentials_release_type) GetProcAddress(library, "grpc_server_credentials_release"); + grpc_ssl_server_certificate_config_create_import = (grpc_ssl_server_certificate_config_create_type) GetProcAddress(library, "grpc_ssl_server_certificate_config_create"); + grpc_ssl_server_certificate_config_destroy_import = (grpc_ssl_server_certificate_config_destroy_type) GetProcAddress(library, "grpc_ssl_server_certificate_config_destroy"); grpc_ssl_server_credentials_create_import = (grpc_ssl_server_credentials_create_type) GetProcAddress(library, "grpc_ssl_server_credentials_create"); grpc_ssl_server_credentials_create_ex_import = (grpc_ssl_server_credentials_create_ex_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_ex"); + grpc_ssl_server_credentials_create_options_using_config_import = (grpc_ssl_server_credentials_create_options_using_config_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_options_using_config"); + grpc_ssl_server_credentials_create_options_using_config_fetcher_import = (grpc_ssl_server_credentials_create_options_using_config_fetcher_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_options_using_config_fetcher"); + grpc_ssl_server_credentials_options_destroy_import = (grpc_ssl_server_credentials_options_destroy_type) GetProcAddress(library, "grpc_ssl_server_credentials_options_destroy"); + grpc_ssl_server_credentials_create_with_options_import = (grpc_ssl_server_credentials_create_with_options_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_with_options"); grpc_server_add_secure_http2_port_import = (grpc_server_add_secure_http2_port_type) GetProcAddress(library, "grpc_server_add_secure_http2_port"); grpc_call_set_credentials_import = (grpc_call_set_credentials_type) GetProcAddress(library, "grpc_call_set_credentials"); grpc_server_credentials_set_auth_metadata_processor_import = (grpc_server_credentials_set_auth_metadata_processor_type) GetProcAddress(library, "grpc_server_credentials_set_auth_metadata_processor"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 74b6979fef..6c99744b8f 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -446,12 +446,30 @@ extern grpc_secure_channel_create_type grpc_secure_channel_create_import; typedef void(*grpc_server_credentials_release_type)(grpc_server_credentials *creds); extern grpc_server_credentials_release_type grpc_server_credentials_release_import; #define grpc_server_credentials_release grpc_server_credentials_release_import +typedef grpc_ssl_server_certificate_config *(*grpc_ssl_server_certificate_config_create_type)(const char *pem_root_certs, const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs); +extern grpc_ssl_server_certificate_config_create_type grpc_ssl_server_certificate_config_create_import; +#define grpc_ssl_server_certificate_config_create grpc_ssl_server_certificate_config_create_import +typedef void(*grpc_ssl_server_certificate_config_destroy_type)(grpc_ssl_server_certificate_config *config); +extern grpc_ssl_server_certificate_config_destroy_type grpc_ssl_server_certificate_config_destroy_import; +#define grpc_ssl_server_certificate_config_destroy grpc_ssl_server_certificate_config_destroy_import typedef grpc_server_credentials *(*grpc_ssl_server_credentials_create_type)(const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, int force_client_auth, void *reserved); extern grpc_ssl_server_credentials_create_type grpc_ssl_server_credentials_create_import; #define grpc_ssl_server_credentials_create grpc_ssl_server_credentials_create_import typedef grpc_server_credentials *(*grpc_ssl_server_credentials_create_ex_type)(const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, grpc_ssl_client_certificate_request_type client_certificate_request, void *reserved); extern grpc_ssl_server_credentials_create_ex_type grpc_ssl_server_credentials_create_ex_import; #define grpc_ssl_server_credentials_create_ex grpc_ssl_server_credentials_create_ex_import +typedef grpc_ssl_server_credentials_options *(*grpc_ssl_server_credentials_create_options_using_config_type)(grpc_ssl_client_certificate_request_type client_certificate_request, grpc_ssl_server_certificate_config *certificate_config); +extern grpc_ssl_server_credentials_create_options_using_config_type grpc_ssl_server_credentials_create_options_using_config_import; +#define grpc_ssl_server_credentials_create_options_using_config grpc_ssl_server_credentials_create_options_using_config_import +typedef grpc_ssl_server_credentials_options *(*grpc_ssl_server_credentials_create_options_using_config_fetcher_type)(grpc_ssl_client_certificate_request_type client_certificate_request, grpc_ssl_server_certificate_config_callback cb, void *user_data); +extern grpc_ssl_server_credentials_create_options_using_config_fetcher_type grpc_ssl_server_credentials_create_options_using_config_fetcher_import; +#define grpc_ssl_server_credentials_create_options_using_config_fetcher grpc_ssl_server_credentials_create_options_using_config_fetcher_import +typedef void(*grpc_ssl_server_credentials_options_destroy_type)(grpc_ssl_server_credentials_options *options); +extern grpc_ssl_server_credentials_options_destroy_type grpc_ssl_server_credentials_options_destroy_import; +#define grpc_ssl_server_credentials_options_destroy grpc_ssl_server_credentials_options_destroy_import +typedef grpc_server_credentials *(*grpc_ssl_server_credentials_create_with_options_type)(grpc_ssl_server_credentials_options *options); +extern grpc_ssl_server_credentials_create_with_options_type grpc_ssl_server_credentials_create_with_options_import; +#define grpc_ssl_server_credentials_create_with_options grpc_ssl_server_credentials_create_with_options_import typedef int(*grpc_server_add_secure_http2_port_type)(grpc_server *server, const char *addr, grpc_server_credentials *creds); extern grpc_server_add_secure_http2_port_type grpc_server_add_secure_http2_port_import; #define grpc_server_add_secure_http2_port grpc_server_add_secure_http2_port_import |