aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc151
-rw-r--r--src/core/ext/filters/client_channel/backup_poller.cc158
-rw-r--r--src/core/ext/filters/client_channel/backup_poller.h34
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc7
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc704
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc466
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc265
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h153
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.h8
-rw-r--r--src/core/lib/security/credentials/ssl/ssl_credentials.cc156
-rw-r--r--src/core/lib/security/credentials/ssl/ssl_credentials.h12
-rw-r--r--src/core/lib/security/transport/security_connector.cc251
-rw-r--r--src/core/lib/security/transport/security_connector.h4
-rw-r--r--src/core/lib/transport/connectivity_state.cc3
-rw-r--r--src/cpp/client/channel_cc.cc204
-rw-r--r--src/cpp/client/generic_stub.cc10
-rw-r--r--src/cpp/common/completion_queue_cc.cc4
-rw-r--r--src/cpp/server/health/default_health_check_service.cc9
-rw-r--r--src/cpp/server/health/default_health_check_service.h2
-rw-r--r--src/cpp/server/server_cc.cc69
-rw-r--r--src/cpp/server/server_context.cc8
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c12
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h18
26 files changed, 1577 insertions, 1139 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index c2db8eff71..253280bd24 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -140,7 +140,6 @@ grpc::string GetHeaderIncludes(grpc_generator::File *file,
printer->Print(vars, "namespace grpc {\n");
printer->Print(vars, "class CompletionQueue;\n");
printer->Print(vars, "class Channel;\n");
- printer->Print(vars, "class RpcService;\n");
printer->Print(vars, "class ServerCompletionQueue;\n");
printer->Print(vars, "class ServerContext;\n");
printer->Print(vars, "} // namespace grpc\n\n");
@@ -324,7 +323,8 @@ void PrintHeaderClientMethodInterfaces(
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw("
+ "virtual ::grpc::ClientReaderInterface< $Response$>* "
+ "$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request) = 0;\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
@@ -546,7 +546,8 @@ void PrintHeaderClientMethodData(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
- printer->Print(*vars, "const ::grpc::RpcMethod rpcmethod_$Method$_;\n");
+ printer->Print(*vars,
+ "const ::grpc::internal::RpcMethod rpcmethod_$Method$_;\n");
}
void PrintHeaderServerMethodSync(grpc_generator::Printer *printer,
@@ -718,7 +719,7 @@ void PrintHeaderServerMethodStreamedUnary(
printer->Print(*vars,
"WithStreamedUnaryMethod_$Method$() {\n"
" ::grpc::Service::MarkMethodStreamed($Idx$,\n"
- " new ::grpc::StreamedUnaryHandler< $Request$, "
+ " new ::grpc::internal::StreamedUnaryHandler< $Request$, "
"$Response$>(std::bind"
"(&WithStreamedUnaryMethod_$Method$<BaseClass>::"
"Streamed$Method$, this, std::placeholders::_1, "
@@ -766,15 +767,16 @@ void PrintHeaderServerMethodSplitStreaming(
"{}\n");
printer->Print(" public:\n");
printer->Indent();
- printer->Print(*vars,
- "WithSplitStreamingMethod_$Method$() {\n"
- " ::grpc::Service::MarkMethodStreamed($Idx$,\n"
- " new ::grpc::SplitServerStreamingHandler< $Request$, "
- "$Response$>(std::bind"
- "(&WithSplitStreamingMethod_$Method$<BaseClass>::"
- "Streamed$Method$, this, std::placeholders::_1, "
- "std::placeholders::_2)));\n"
- "}\n");
+ printer->Print(
+ *vars,
+ "WithSplitStreamingMethod_$Method$() {\n"
+ " ::grpc::Service::MarkMethodStreamed($Idx$,\n"
+ " new ::grpc::internal::SplitServerStreamingHandler< $Request$, "
+ "$Response$>(std::bind"
+ "(&WithSplitStreamingMethod_$Method$<BaseClass>::"
+ "Streamed$Method$, this, std::placeholders::_1, "
+ "std::placeholders::_2)));\n"
+ "}\n");
printer->Print(*vars,
"~WithSplitStreamingMethod_$Method$() override {\n"
" BaseClassMustBeDerivedFromService(this);\n"
@@ -914,7 +916,8 @@ void PrintHeaderService(grpc_generator::Printer *printer,
" {\n public:\n");
printer->Indent();
printer->Print(
- "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n");
+ "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& "
+ "channel);\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i).get(), vars, true);
}
@@ -1185,10 +1188,9 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) {\n");
printer->Print(*vars,
- " return ::grpc::BlockingUnaryCall(channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, request, response);\n"
- "}\n\n");
+ " return ::grpc::internal::BlockingUnaryCall"
+ "(channel_.get(), rpcmethod_$Method$_, "
+ "context, request, response);\n}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
@@ -1198,25 +1200,27 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n");
- printer->Print(*vars,
- " return "
- "::grpc::ClientAsyncResponseReader< $Response$>::Create("
- "channel_.get(), cq, "
- "rpcmethod_$Method$_, "
- "context, request, $AsyncStart$);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return "
+ "::grpc::internal::ClientAsyncResponseReaderFactory< $Response$>"
+ "::Create(channel_.get(), cq, "
+ "rpcmethod_$Method$_, "
+ "context, request, $AsyncStart$);\n"
+ "}\n\n");
}
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"::grpc::ClientWriter< $Request$>* "
"$ns$$Service$::Stub::$Method$Raw("
"::grpc::ClientContext* context, $Response$* response) {\n");
- printer->Print(*vars,
- " return new ::grpc::ClientWriter< $Request$>("
- "channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, response);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::internal::ClientWriterFactory< $Request$>::Create("
+ "channel_.get(), "
+ "rpcmethod_$Method$_, "
+ "context, response);\n"
+ "}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
@@ -1227,12 +1231,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
- printer->Print(*vars,
- " return ::grpc::ClientAsyncWriter< $Request$>::Create("
- "channel_.get(), cq, "
- "rpcmethod_$Method$_, "
- "context, response, $AsyncStart$$AsyncCreateArgs$);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::internal::ClientAsyncWriterFactory< $Request$>"
+ "::Create(channel_.get(), cq, "
+ "rpcmethod_$Method$_, "
+ "context, response, $AsyncStart$$AsyncCreateArgs$);\n"
+ "}\n\n");
}
} else if (ServerOnlyStreaming(method)) {
printer->Print(
@@ -1240,12 +1245,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientReader< $Response$>* "
"$ns$$Service$::Stub::$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request) {\n");
- printer->Print(*vars,
- " return new ::grpc::ClientReader< $Response$>("
- "channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, request);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::internal::ClientReaderFactory< $Response$>::Create("
+ "channel_.get(), "
+ "rpcmethod_$Method$_, "
+ "context, request);\n"
+ "}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
@@ -1257,12 +1263,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
- printer->Print(*vars,
- " return ::grpc::ClientAsyncReader< $Response$>::Create("
- "channel_.get(), cq, "
- "rpcmethod_$Method$_, "
- "context, request, $AsyncStart$$AsyncCreateArgs$);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::internal::ClientAsyncReaderFactory< $Response$>"
+ "::Create(channel_.get(), cq, "
+ "rpcmethod_$Method$_, "
+ "context, request, $AsyncStart$$AsyncCreateArgs$);\n"
+ "}\n\n");
}
} else if (method->BidiStreaming()) {
printer->Print(
@@ -1270,8 +1277,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$ns$$Service$::Stub::$Method$Raw(::grpc::ClientContext* context) {\n");
printer->Print(*vars,
- " return new ::grpc::ClientReaderWriter< "
- "$Request$, $Response$>("
+ " return ::grpc::internal::ClientReaderWriterFactory< "
+ "$Request$, $Response$>::Create("
"channel_.get(), "
"rpcmethod_$Method$_, "
"context);\n"
@@ -1286,14 +1293,14 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(::grpc::"
"ClientContext* context, "
"::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
- printer->Print(
- *vars,
- " return "
- "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create("
- "channel_.get(), cq, "
- "rpcmethod_$Method$_, "
- "context, $AsyncStart$$AsyncCreateArgs$);\n"
- "}\n\n");
+ printer->Print(*vars,
+ " return "
+ "::grpc::internal::ClientAsyncReaderWriterFactory< "
+ "$Request$, $Response$>::Create("
+ "channel_.get(), cq, "
+ "rpcmethod_$Method$_, "
+ "context, $AsyncStart$$AsyncCreateArgs$);\n"
+ "}\n\n");
}
}
}
@@ -1404,7 +1411,7 @@ void PrintSourceService(grpc_generator::Printer *printer,
printer->Print(*vars,
", rpcmethod_$Method$_("
"$prefix$$Service$_method_names[$Idx$], "
- "::grpc::RpcMethod::$StreamingType$, "
+ "::grpc::internal::RpcMethod::$StreamingType$, "
"channel"
")\n");
}
@@ -1427,38 +1434,38 @@ void PrintSourceService(grpc_generator::Printer *printer,
if (method->NoStreaming()) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::NORMAL_RPC,\n"
- " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
+ " ::grpc::internal::RpcMethod::NORMAL_RPC,\n"
+ " new ::grpc::internal::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, "
"$Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ClientOnlyStreaming(method.get())) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::CLIENT_STREAMING,\n"
- " new ::grpc::ClientStreamingHandler< "
+ " ::grpc::internal::RpcMethod::CLIENT_STREAMING,\n"
+ " new ::grpc::internal::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ServerOnlyStreaming(method.get())) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::SERVER_STREAMING,\n"
- " new ::grpc::ServerStreamingHandler< "
+ " ::grpc::internal::RpcMethod::SERVER_STREAMING,\n"
+ " new ::grpc::internal::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (method->BidiStreaming()) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::BIDI_STREAMING,\n"
- " new ::grpc::BidiStreamingHandler< "
+ " ::grpc::internal::RpcMethod::BIDI_STREAMING,\n"
+ " new ::grpc::internal::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
diff --git a/src/core/ext/filters/client_channel/backup_poller.cc b/src/core/ext/filters/client_channel/backup_poller.cc
new file mode 100644
index 0000000000..466bf86bc0
--- /dev/null
+++ b/src/core/ext/filters/client_channel/backup_poller.cc
@@ -0,0 +1,158 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/ext/filters/client_channel/backup_poller.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
+
+#define DEFAULT_POLL_INTERVAL_MS 5000
+
+typedef struct backup_poller {
+ grpc_timer polling_timer;
+ grpc_closure run_poller_closure;
+ grpc_closure shutdown_closure;
+ gpr_mu* pollset_mu;
+ grpc_pollset* pollset; // guarded by pollset_mu
+ bool shutting_down; // guarded by pollset_mu
+ gpr_refcount refs;
+ gpr_refcount shutdown_refs;
+} backup_poller;
+
+static gpr_once g_once = GPR_ONCE_INIT;
+static gpr_mu g_poller_mu;
+static backup_poller* g_poller = NULL; // guarded by g_poller_mu
+// g_poll_interval_ms is set only once at the first time
+// grpc_client_channel_start_backup_polling() is called, after that it is
+// treated as const.
+static int g_poll_interval_ms = DEFAULT_POLL_INTERVAL_MS;
+
+static void init_globals() {
+ gpr_mu_init(&g_poller_mu);
+ char* env = gpr_getenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS");
+ if (env != NULL) {
+ int poll_interval_ms = gpr_parse_nonnegative_int(env);
+ if (poll_interval_ms == -1) {
+ gpr_log(GPR_ERROR,
+ "Invalid GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS: %s, "
+ "default value %d will be used.",
+ env, g_poll_interval_ms);
+ } else {
+ g_poll_interval_ms = poll_interval_ms;
+ }
+ }
+ gpr_free(env);
+}
+
+static void backup_poller_shutdown_unref(grpc_exec_ctx* exec_ctx,
+ backup_poller* p) {
+ if (gpr_unref(&p->shutdown_refs)) {
+ grpc_pollset_destroy(exec_ctx, p->pollset);
+ gpr_free(p->pollset);
+ gpr_free(p);
+ }
+}
+
+static void done_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ backup_poller_shutdown_unref(exec_ctx, (backup_poller*)arg);
+}
+
+static void g_poller_unref(grpc_exec_ctx* exec_ctx) {
+ if (gpr_unref(&g_poller->refs)) {
+ gpr_mu_lock(&g_poller_mu);
+ backup_poller* p = g_poller;
+ g_poller = NULL;
+ gpr_mu_unlock(&g_poller_mu);
+ gpr_mu_lock(p->pollset_mu);
+ p->shutting_down = true;
+ grpc_pollset_shutdown(exec_ctx, p->pollset,
+ GRPC_CLOSURE_INIT(&p->shutdown_closure, done_poller,
+ p, grpc_schedule_on_exec_ctx));
+ gpr_mu_unlock(p->pollset_mu);
+ grpc_timer_cancel(exec_ctx, &p->polling_timer);
+ }
+}
+
+static void run_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ backup_poller* p = (backup_poller*)arg;
+ if (error != GRPC_ERROR_NONE) {
+ if (error != GRPC_ERROR_CANCELLED) {
+ GRPC_LOG_IF_ERROR("run_poller", GRPC_ERROR_REF(error));
+ }
+ backup_poller_shutdown_unref(exec_ctx, p);
+ return;
+ }
+ gpr_mu_lock(p->pollset_mu);
+ if (p->shutting_down) {
+ gpr_mu_unlock(p->pollset_mu);
+ backup_poller_shutdown_unref(exec_ctx, p);
+ return;
+ }
+ grpc_error* err = grpc_pollset_work(exec_ctx, p->pollset, NULL,
+ grpc_exec_ctx_now(exec_ctx));
+ gpr_mu_unlock(p->pollset_mu);
+ GRPC_LOG_IF_ERROR("Run client channel backup poller", err);
+ grpc_timer_init(exec_ctx, &p->polling_timer,
+ grpc_exec_ctx_now(exec_ctx) + g_poll_interval_ms,
+ &p->run_poller_closure);
+}
+
+void grpc_client_channel_start_backup_polling(
+ grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) {
+ gpr_once_init(&g_once, init_globals);
+ if (g_poll_interval_ms == 0) {
+ return;
+ }
+ gpr_mu_lock(&g_poller_mu);
+ if (g_poller == NULL) {
+ g_poller = (backup_poller*)gpr_zalloc(sizeof(backup_poller));
+ g_poller->pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
+ g_poller->shutting_down = false;
+ grpc_pollset_init(g_poller->pollset, &g_poller->pollset_mu);
+ gpr_ref_init(&g_poller->refs, 0);
+ // one for timer cancellation, one for pollset shutdown
+ gpr_ref_init(&g_poller->shutdown_refs, 2);
+ GRPC_CLOSURE_INIT(&g_poller->run_poller_closure, run_poller, g_poller,
+ grpc_schedule_on_exec_ctx);
+ grpc_timer_init(exec_ctx, &g_poller->polling_timer,
+ grpc_exec_ctx_now(exec_ctx) + g_poll_interval_ms,
+ &g_poller->run_poller_closure);
+ }
+ gpr_ref(&g_poller->refs);
+ gpr_mu_unlock(&g_poller_mu);
+ grpc_pollset_set_add_pollset(exec_ctx, interested_parties, g_poller->pollset);
+}
+
+void grpc_client_channel_stop_backup_polling(
+ grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) {
+ if (g_poll_interval_ms == 0) {
+ return;
+ }
+ grpc_pollset_set_del_pollset(exec_ctx, interested_parties, g_poller->pollset);
+ g_poller_unref(exec_ctx);
+}
diff --git a/src/core/ext/filters/client_channel/backup_poller.h b/src/core/ext/filters/client_channel/backup_poller.h
new file mode 100644
index 0000000000..e993d50639
--- /dev/null
+++ b/src/core/ext/filters/client_channel/backup_poller.h
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H
+
+#include <grpc/grpc.h>
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+/* Start polling \a interested_parties periodically in the timer thread */
+void grpc_client_channel_start_backup_polling(
+ grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties);
+
+/* Stop polling \a interested_parties */
+void grpc_client_channel_stop_backup_polling(
+ grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties);
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H */
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index ea5e076c3b..00c51ba543 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -31,6 +31,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
@@ -712,6 +713,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
+ grpc_client_channel_start_backup_polling(exec_ctx, chand->interested_parties);
// Record client channel factory.
const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);
@@ -790,6 +792,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
+ grpc_client_channel_stop_backup_polling(exec_ctx, chand->interested_parties);
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
@@ -898,7 +901,7 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx,
call_data *calld = (call_data *)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s",
+ "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, calld, calld->waiting_for_pick_batches_count,
grpc_error_string(error));
}
@@ -940,7 +943,7 @@ static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx,
channel_data *chand = (channel_data *)elem->channel_data;
call_data *calld = (call_data *)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIuPTR
" pending batches to subchannel_call=%p",
chand, calld, calld->waiting_for_pick_batches_count,
calld->subchannel_call);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index d6bdc13ba9..85e76e68b5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -611,7 +611,6 @@ static void update_lb_connectivity_status_locked(
case GRPC_CHANNEL_SHUTDOWN:
GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
break;
- case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY:
@@ -1790,7 +1789,6 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
// embedded RR policy. Note that the current RR policy, if any, will stay in
// effect until an update from the new lb_call is received.
switch (glb_policy->lb_channel_connectivity) {
- case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
/* resub. */
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 56c261ba34..f0c66c68e1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -20,6 +20,7 @@
#include <grpc/support/alloc.h>
+#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
@@ -42,103 +43,73 @@ typedef struct {
/** base policy: must be first */
grpc_lb_policy base;
/** all our subchannels */
- grpc_subchannel **subchannels;
- grpc_subchannel **new_subchannels;
- size_t num_subchannels;
- size_t num_new_subchannels;
-
- grpc_closure connectivity_changed;
-
- /** remaining members are protected by the combiner */
-
- /** the selected channel */
- grpc_connected_subchannel *selected;
-
- /** the subchannel key for \a selected, or NULL if \a selected not set */
- const grpc_subchannel_key *selected_key;
-
+ grpc_lb_subchannel_list *subchannel_list;
+ /** latest pending subchannel list */
+ grpc_lb_subchannel_list *latest_pending_subchannel_list;
+ /** selected subchannel in \a subchannel_list */
+ grpc_lb_subchannel_data *selected;
/** have we started picking? */
bool started_picking;
/** are we shut down? */
bool shutdown;
- /** are we updating the selected subchannel? */
- bool updating_selected;
- /** are we updating the subchannel candidates? */
- bool updating_subchannels;
- /** args from the latest update received while already updating, or NULL */
- grpc_lb_policy_args *pending_update_args;
- /** which subchannel are we watching? */
- size_t checking_subchannel;
- /** what is the connectivity of that channel? */
- grpc_connectivity_state checking_connectivity;
/** list of picks that are waiting on connectivity */
pending_pick *pending_picks;
-
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ GPR_ASSERT(p->subchannel_list == NULL);
+ GPR_ASSERT(p->latest_pending_subchannel_list == NULL);
GPR_ASSERT(p->pending_picks == NULL);
- for (size_t i = 0; i < p->num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy");
- }
- if (p->selected != NULL) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
- "picked_first_destroy");
- }
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
- grpc_subchannel_index_unref();
- if (p->pending_update_args != NULL) {
- grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
- gpr_free(p->pending_update_args);
- }
- gpr_free(p->subchannels);
- gpr_free(p->new_subchannels);
gpr_free(p);
+ grpc_subchannel_index_unref();
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void *)p);
}
}
-static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
+static void shutdown_locked(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p,
+ grpc_error *error) {
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
+ }
+ p->shutdown = true;
pending_pick *pp;
while ((pp = p->pending_picks) != NULL) {
p->pending_picks = pp->next;
*pp->target = NULL;
- GRPC_CLOSURE_SCHED(
- exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error));
gpr_free(pp);
}
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
+ "shutdown");
+ if (p->subchannel_list != NULL) {
+ grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ "pf_shutdown");
+ p->subchannel_list = NULL;
+ }
+ if (p->latest_pending_subchannel_list != NULL) {
+ grpc_lb_subchannel_list_shutdown_and_unref(
+ exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown");
+ p->latest_pending_subchannel_list = NULL;
+ }
+ GRPC_ERROR_UNREF(error);
}
static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- p->shutdown = true;
- fail_pending_picks_for_shutdown(exec_ctx, p);
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown");
- /* cancel subscription */
- if (p->selected != NULL) {
- grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
- } else if (p->num_subchannels > 0 && p->started_picking) {
- grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
- &p->connectivity_changed);
- }
+ shutdown_locked(exec_ctx, (pick_first_lb_policy *)pol,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"));
}
static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target,
grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- pending_pick *pp;
- pp = p->pending_picks;
+ pending_pick *pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
@@ -162,8 +133,7 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- pending_pick *pp;
- pp = p->pending_picks;
+ pending_pick *pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
@@ -185,15 +155,12 @@ static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void start_picking_locked(grpc_exec_ctx *exec_ctx,
pick_first_lb_policy *p) {
p->started_picking = true;
- if (p->subchannels != NULL) {
- GPR_ASSERT(p->num_subchannels > 0);
- p->checking_subchannel = 0;
- p->checking_connectivity = GRPC_CHANNEL_IDLE;
- GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
+ if (p->subchannel_list != NULL && p->subchannel_list->num_subchannels > 0) {
+ p->subchannel_list->checking_subchannel = 0;
+ grpc_lb_subchannel_list_ref_for_connectivity_watch(
+ p->subchannel_list, "connectivity_watch+start_picking");
+ grpc_lb_subchannel_data_start_connectivity_watch(
+ exec_ctx, &p->subchannel_list->subchannels[0]);
}
}
@@ -210,19 +177,17 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- pending_pick *pp;
-
- /* Check atomically for a selected channel */
+ // If we have a selected subchannel already, return synchronously.
if (p->selected != NULL) {
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
+ *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel,
+ "picked");
return 1;
}
-
- /* No subchannel selected yet, so try again */
+ // No subchannel selected yet, so handle asynchronously.
if (!p->started_picking) {
start_picking_locked(exec_ctx, p);
}
- pp = (pending_pick *)gpr_malloc(sizeof(*pp));
+ pending_pick *pp = (pending_pick *)gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->target = target;
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
@@ -231,19 +196,15 @@ static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
return 0;
}
-static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
- size_t num_subchannels = p->num_subchannels;
- grpc_subchannel **subchannels = p->subchannels;
-
- p->num_subchannels = 0;
- p->subchannels = NULL;
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
-
- for (size_t i = 0; i < num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
+static void destroy_unselected_subchannels_locked(grpc_exec_ctx *exec_ctx,
+ pick_first_lb_policy *p) {
+ for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
+ grpc_lb_subchannel_data *sd = &p->subchannel_list->subchannels[i];
+ if (p->selected != sd) {
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
+ "selected_different_subchannel");
+ }
}
- gpr_free(subchannels);
}
static grpc_connectivity_state pf_check_connectivity_locked(
@@ -265,46 +226,24 @@ static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
if (p->selected) {
- grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
+ grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel,
+ closure);
} else {
GRPC_CLOSURE_SCHED(exec_ctx, closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
}
}
-/* unsubscribe all subchannels */
-static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
- if (p->num_subchannels > 0) {
- GPR_ASSERT(p->selected == NULL);
- if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_DEBUG, "Pick First %p unsubscribing from subchannel %p",
- (void *)p, (void *)p->subchannels[p->checking_subchannel]);
- }
- grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
- &p->connectivity_changed);
- p->updating_subchannels = true;
- } else if (p->selected != NULL) {
- if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_DEBUG,
- "Pick First %p unsubscribing from selected subchannel %p",
- (void *)p, (void *)p->selected);
- }
- grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
- p->updating_selected = true;
- }
-}
+static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
-/* true upon success */
static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args) {
pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
- if (p->subchannels == NULL) {
+ if (p->subchannel_list == NULL) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
@@ -321,270 +260,222 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
const grpc_lb_addresses *addresses =
(const grpc_lb_addresses *)arg->value.pointer.p;
- if (addresses->num_addresses == 0) {
- // Empty update. Unsubscribe from all current subchannels and put the
- // channel in TRANSIENT_FAILURE.
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
+ (void *)p, (unsigned long)addresses->num_addresses);
+ }
+ grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create(
+ exec_ctx, &p->base, &grpc_lb_pick_first_trace, addresses, args,
+ pf_connectivity_changed_locked);
+ if (subchannel_list->num_subchannels == 0) {
+ // Empty update or no valid subchannels. Unsubscribe from all current
+ // subchannels and put the channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
- stop_connectivity_watchers(exec_ctx, p);
+ if (p->subchannel_list != NULL) {
+ grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ "sl_shutdown_empty_update");
+ }
+ p->subchannel_list = subchannel_list; // Empty list.
+ p->selected = NULL;
return;
}
- if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
- (void *)p, (unsigned long)addresses->num_addresses);
- }
- grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc(
- sizeof(*sc_args) * addresses->num_addresses);
- /* We remove the following keys in order for subchannel keys belonging to
- * subchannels point to the same address to match. */
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
- GRPC_ARG_LB_ADDRESSES};
- size_t sc_args_count = 0;
-
- /* Create list of subchannel args for new addresses in \a args. */
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- // If there were any balancer, we would have chosen grpclb policy instead.
- GPR_ASSERT(!addresses->addresses[i].is_balancer);
- if (addresses->addresses[i].user_data != NULL) {
- gpr_log(GPR_ERROR,
- "This LB policy doesn't support user data. It will be ignored");
+ if (p->selected == NULL) {
+ // We don't yet have a selected subchannel, so replace the current
+ // subchannel list immediately.
+ if (p->subchannel_list != NULL) {
+ grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ "pf_update_before_selected");
}
- grpc_arg addr_arg =
- grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
- grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
- args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
- 1);
- gpr_free(addr_arg.value.string);
- sc_args[sc_args_count++].args = new_args;
- }
-
- /* Check if p->selected is amongst them. If so, we are done. */
- if (p->selected != NULL) {
- GPR_ASSERT(p->selected_key != NULL);
- for (size_t i = 0; i < sc_args_count; i++) {
- grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]);
- const bool found_selected =
- grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0;
- grpc_subchannel_key_destroy(exec_ctx, ith_sc_key);
- if (found_selected) {
+ p->subchannel_list = subchannel_list;
+ } else {
+ // We do have a selected subchannel.
+ // Check if it's present in the new list. If so, we're done.
+ for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
+ grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i];
+ if (sd->subchannel == p->selected->subchannel) {
// The currently selected subchannel is in the update: we are done.
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
- "Pick First %p found already selected subchannel %p amongst "
- "updates. Update done.",
- (void *)p, (void *)p->selected);
+ "Pick First %p found already selected subchannel %p "
+ "at update index %" PRIuPTR " of %" PRIuPTR "; update done",
+ p, p->selected->subchannel, i,
+ subchannel_list->num_subchannels);
+ }
+ grpc_lb_subchannel_list_ref_for_connectivity_watch(
+ subchannel_list, "connectivity_watch+replace_selected");
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ if (p->subchannel_list != NULL) {
+ grpc_lb_subchannel_list_shutdown_and_unref(
+ exec_ctx, p->subchannel_list, "pf_update_includes_selected");
}
- for (size_t j = 0; j < sc_args_count; j++) {
- grpc_channel_args_destroy(exec_ctx,
- (grpc_channel_args *)sc_args[j].args);
+ p->subchannel_list = subchannel_list;
+ if (p->selected->connected_subchannel != NULL) {
+ sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
+ p->selected->connected_subchannel, "pf_update_includes_selected");
+ }
+ p->selected = sd;
+ destroy_unselected_subchannels_locked(exec_ctx, p);
+ // If there was a previously pending update (which may or may
+ // not have contained the currently selected subchannel), drop
+ // it, so that it doesn't override what we've done here.
+ if (p->latest_pending_subchannel_list != NULL) {
+ grpc_lb_subchannel_list_shutdown_and_unref(
+ exec_ctx, p->latest_pending_subchannel_list,
+ "pf_update_includes_selected+outdated");
+ p->latest_pending_subchannel_list = NULL;
}
- gpr_free(sc_args);
return;
}
}
- }
- // We only check for already running updates here because if the previous
- // steps were successful, the update can be considered done without any
- // interference (ie, no callbacks were scheduled).
- if (p->updating_selected || p->updating_subchannels) {
- if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO,
- "Update already in progress for pick first %p. Deferring update.",
- (void *)p);
- }
- if (p->pending_update_args != NULL) {
- grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
- gpr_free(p->pending_update_args);
- }
- p->pending_update_args =
- (grpc_lb_policy_args *)gpr_zalloc(sizeof(*p->pending_update_args));
- p->pending_update_args->client_channel_factory =
- args->client_channel_factory;
- p->pending_update_args->args = grpc_channel_args_copy(args->args);
- p->pending_update_args->combiner = args->combiner;
- return;
- }
- /* Create the subchannels for the new subchannel args/addresses. */
- grpc_subchannel **new_subchannels =
- (grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
- size_t num_new_subchannels = 0;
- for (size_t i = 0; i < sc_args_count; i++) {
- grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
- exec_ctx, args->client_channel_factory, &sc_args[i]);
- if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- char *address_uri =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_INFO,
- "Pick First %p created subchannel %p for address uri %s",
- (void *)p, (void *)subchannel, address_uri);
- gpr_free(address_uri);
+ // Not keeping the previous selected subchannel, so set the latest
+ // pending subchannel list to the new subchannel list. We will wait
+ // for it to report READY before swapping it into the current
+ // subchannel list.
+ if (p->latest_pending_subchannel_list != NULL) {
+ if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_DEBUG,
+ "Pick First %p Shutting down latest pending subchannel list "
+ "%p, about to be replaced by newer latest %p",
+ (void *)p, (void *)p->latest_pending_subchannel_list,
+ (void *)subchannel_list);
+ }
+ grpc_lb_subchannel_list_shutdown_and_unref(
+ exec_ctx, p->latest_pending_subchannel_list,
+ "sl_outdated_dont_smash");
}
- grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args);
- if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel;
- }
- gpr_free(sc_args);
- if (num_new_subchannels == 0) {
- gpr_free(new_subchannels);
- // Empty update. Unsubscribe from all current subchannels and put the
- // channel in TRANSIENT_FAILURE.
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"),
- "pf_update_no_valid_addresses");
- stop_connectivity_watchers(exec_ctx, p);
- return;
+ p->latest_pending_subchannel_list = subchannel_list;
}
-
- /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */
- stop_connectivity_watchers(exec_ctx, p);
-
- /* Save new subchannels. The switch over will happen in
- * pf_connectivity_changed_locked */
- if (p->updating_selected || p->updating_subchannels) {
- p->num_new_subchannels = num_new_subchannels;
- p->new_subchannels = new_subchannels;
- } else { /* nothing is updating. Get things moving from here */
- p->num_subchannels = num_new_subchannels;
- p->subchannels = new_subchannels;
- p->new_subchannels = NULL;
- p->num_new_subchannels = 0;
- if (p->started_picking) {
- p->checking_subchannel = 0;
- p->checking_connectivity = GRPC_CHANNEL_IDLE;
- grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
- }
+ // If we've started picking, start trying to connect to the first
+ // subchannel in the new list.
+ if (p->started_picking) {
+ grpc_lb_subchannel_list_ref_for_connectivity_watch(
+ subchannel_list, "connectivity_watch+update");
+ grpc_lb_subchannel_data_start_connectivity_watch(
+ exec_ctx, &subchannel_list->subchannels[0]);
}
}
static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)arg;
- grpc_subchannel *selected_subchannel;
- pending_pick *pp;
-
+ grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg;
+ pick_first_lb_policy *p = (pick_first_lb_policy *)sd->subchannel_list->policy;
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(
- GPR_DEBUG,
- "Pick First %p connectivity changed. Updating selected: %d; Updating "
- "subchannels: %d; Checking %lu index (%lu total); State: %d; ",
- (void *)p, p->updating_selected, p->updating_subchannels,
- (unsigned long)p->checking_subchannel,
- (unsigned long)p->num_subchannels, p->checking_connectivity);
+ gpr_log(GPR_DEBUG,
+ "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR
+ " of %" PRIuPTR
+ "), subchannel_list %p: state=%s p->shutdown=%d "
+ "sd->subchannel_list->shutting_down=%d error=%s",
+ (void *)p, (void *)sd->subchannel,
+ sd->subchannel_list->checking_subchannel,
+ sd->subchannel_list->num_subchannels, (void *)sd->subchannel_list,
+ grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
+ p->shutdown, sd->subchannel_list->shutting_down,
+ grpc_error_string(error));
}
- bool restart = false;
- if (p->updating_selected && error != GRPC_ERROR_NONE) {
- /* Captured the unsubscription for p->selected */
- GPR_ASSERT(p->selected != NULL);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
- "pf_update_connectivity");
- if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_DEBUG, "Pick First %p unreffing selected subchannel %p",
- (void *)p, (void *)p->selected);
- }
- p->updating_selected = false;
- if (p->num_new_subchannels == 0) {
- p->selected = NULL;
- return;
- }
- restart = true;
- }
- if (p->updating_subchannels && error != GRPC_ERROR_NONE) {
- /* Captured the unsubscription for the checking subchannel */
- GPR_ASSERT(p->selected == NULL);
- for (size_t i = 0; i < p->num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
- "pf_update_connectivity");
- if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_DEBUG, "Pick First %p unreffing subchannel %p", (void *)p,
- (void *)p->subchannels[i]);
- }
- }
- gpr_free(p->subchannels);
- p->subchannels = NULL;
- p->num_subchannels = 0;
- p->updating_subchannels = false;
- if (p->num_new_subchannels == 0) return;
- restart = true;
- }
- if (restart) {
- p->selected = NULL;
- p->selected_key = NULL;
- GPR_ASSERT(p->new_subchannels != NULL);
- GPR_ASSERT(p->num_new_subchannels > 0);
- p->num_subchannels = p->num_new_subchannels;
- p->subchannels = p->new_subchannels;
- p->num_new_subchannels = 0;
- p->new_subchannels = NULL;
- if (p->started_picking) {
- /* If we were picking, continue to do so over the new subchannels,
- * starting from the 0th index. */
- p->checking_subchannel = 0;
- p->checking_connectivity = GRPC_CHANNEL_IDLE;
- /* reuses the weak ref from start_picking_locked */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
- }
- if (p->pending_update_args != NULL) {
- const grpc_lb_policy_args *args = p->pending_update_args;
- p->pending_update_args = NULL;
- pf_update_locked(exec_ctx, &p->base, args);
- }
+ // If the policy is shutting down, unref and return.
+ if (p->shutdown) {
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ exec_ctx, sd->subchannel_list, "pf_shutdown");
return;
}
- GRPC_ERROR_REF(error);
- if (p->shutdown) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
- GRPC_ERROR_UNREF(error);
+ // If the subchannel list is shutting down, stop watching.
+ if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "pf_sl_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ exec_ctx, sd->subchannel_list, "pf_sl_shutdown");
return;
- } else if (p->selected != NULL) {
- if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- /* if the selected channel goes bad, we're done */
- p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN;
- }
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- p->checking_connectivity, GRPC_ERROR_REF(error),
- "selected_changed");
- if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) {
- grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, p->base.interested_parties,
- &p->checking_connectivity, &p->connectivity_changed);
+ }
+ // If we're still here, the notification must be for a subchannel in
+ // either the current or latest pending subchannel lists.
+ GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
+ sd->subchannel_list == p->latest_pending_subchannel_list);
+ // Update state.
+ sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
+ // Handle updates for the currently selected subchannel.
+ if (p->selected == sd) {
+ // If the new state is anything other than READY and there is a
+ // pending update, switch to the pending update.
+ if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
+ p->latest_pending_subchannel_list != NULL) {
+ p->selected = NULL;
+ grpc_lb_subchannel_list_shutdown_and_unref(
+ exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update");
+ p->subchannel_list = p->latest_pending_subchannel_list;
+ p->latest_pending_subchannel_list = NULL;
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
} else {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* if the selected channel goes bad, we're done */
+ sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN;
+ }
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ sd->curr_connectivity_state,
+ GRPC_ERROR_REF(error), "selected_changed");
+ if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ } else {
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ exec_ctx, sd->subchannel_list, "pf_selected_shutdown");
+ shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
+ }
}
- } else {
- loop:
- switch (p->checking_connectivity) {
- case GRPC_CHANNEL_INIT:
- GPR_UNREACHABLE_CODE(return );
- case GRPC_CHANNEL_READY:
+ return;
+ }
+ // If we get here, there are two possible cases:
+ // 1. We do not currently have a selected subchannel, and the update is
+ // for a subchannel in p->subchannel_list that we're trying to
+ // connect to. The goal here is to find a subchannel that we can
+ // select.
+ // 2. We do currently have a selected subchannel, and the update is
+ // for a subchannel in p->latest_pending_subchannel_list. The
+ // goal here is to find a subchannel from the update that we can
+ // select in place of the current one.
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
+ sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ }
+ while (true) {
+ switch (sd->curr_connectivity_state) {
+ case GRPC_CHANNEL_READY: {
+ // Case 2. Promote p->latest_pending_subchannel_list to
+ // p->subchannel_list.
+ if (sd->subchannel_list == p->latest_pending_subchannel_list) {
+ GPR_ASSERT(p->subchannel_list != NULL);
+ grpc_lb_subchannel_list_shutdown_and_unref(
+ exec_ctx, p->subchannel_list, "finish_update");
+ p->subchannel_list = p->latest_pending_subchannel_list;
+ p->latest_pending_subchannel_list = NULL;
+ }
+ // Cases 1 and 2.
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
"connecting_ready");
- selected_subchannel = p->subchannels[p->checking_subchannel];
- p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(selected_subchannel),
- "picked_first");
-
+ sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(sd->subchannel),
+ "connected");
+ p->selected = sd;
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO,
- "Pick First %p selected subchannel %p (connected %p)",
- (void *)p, (void *)selected_subchannel, (void *)p->selected);
+ gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void *)p,
+ (void *)sd->subchannel);
}
- p->selected_key = grpc_subchannel_get_key(selected_subchannel);
- /* drop the pick list: we are connected now */
- GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
- destroy_subchannels_locked(exec_ctx, p);
- /* update any calls that were waiting for a pick */
+ // Drop all other subchannels, since we are now connected.
+ destroy_unselected_subchannels_locked(exec_ctx, p);
+ // Update any calls that were waiting for a pick.
+ pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
+ *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ p->selected->connected_subchannel, "picked");
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
@@ -593,71 +484,86 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
- grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, p->selected, p->base.interested_parties,
- &p->checking_connectivity, &p->connectivity_changed);
- break;
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- p->checking_subchannel =
- (p->checking_subchannel + 1) % p->num_subchannels;
- if (p->checking_subchannel == 0) {
- /* only trigger transient failure when we've tried all alternatives
- */
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ return;
+ }
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ do {
+ sd->subchannel_list->checking_subchannel =
+ (sd->subchannel_list->checking_subchannel + 1) %
+ sd->subchannel_list->num_subchannels;
+ sd = &sd->subchannel_list
+ ->subchannels[sd->subchannel_list->checking_subchannel];
+ } while (sd->subchannel == NULL);
+ // Case 1: Only set state to TRANSIENT_FAILURE if we've tried
+ // all subchannels.
+ if (sd->subchannel_list->checking_subchannel == 0 &&
+ sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
+ sd->curr_connectivity_state =
+ grpc_subchannel_check_connectivity(sd->subchannel, &error);
GRPC_ERROR_UNREF(error);
- p->checking_connectivity = grpc_subchannel_check_connectivity(
- p->subchannels[p->checking_subchannel], &error);
- if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
- } else {
- goto loop;
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ // Reuses the connectivity refs from the previous watch.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ return;
}
- break;
+ break; // Go back to top of loop.
+ }
case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_IDLE:
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_REF(error), "connecting_changed");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
- &p->connectivity_changed);
- break;
- case GRPC_CHANNEL_SHUTDOWN:
- p->num_subchannels--;
- GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
- p->subchannels[p->num_subchannels]);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
- "pick_first");
- if (p->num_subchannels == 0) {
+ case GRPC_CHANNEL_IDLE: {
+ // Only update connectivity state in case 1.
+ if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick first exhausted channels", &error, 1),
- "no_more_channels");
- fail_pending_picks_for_shutdown(exec_ctx, p);
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
- "pick_first_connectivity");
- } else {
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_REF(error), "connecting_changed");
+ }
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ return;
+ }
+ case GRPC_CHANNEL_SHUTDOWN: {
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
+ "pf_candidate_shutdown");
+ // Advance to next subchannel and check its state.
+ grpc_lb_subchannel_data *original_sd = sd;
+ do {
+ sd->subchannel_list->checking_subchannel =
+ (sd->subchannel_list->checking_subchannel + 1) %
+ sd->subchannel_list->num_subchannels;
+ sd = &sd->subchannel_list
+ ->subchannels[sd->subchannel_list->checking_subchannel];
+ } while (sd->subchannel == NULL && sd != original_sd);
+ if (sd == original_sd) {
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ exec_ctx, sd->subchannel_list, "pf_candidate_shutdown");
+ shutdown_locked(exec_ctx, p,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick first exhausted channels", &error, 1));
+ return;
+ }
+ if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "subchannel_failed");
- p->checking_subchannel %= p->num_subchannels;
- GRPC_ERROR_UNREF(error);
- p->checking_connectivity = grpc_subchannel_check_connectivity(
- p->subchannels[p->checking_subchannel], &error);
- goto loop;
}
+ sd->curr_connectivity_state =
+ grpc_subchannel_check_connectivity(sd->subchannel, &error);
+ GRPC_ERROR_UNREF(error);
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ // Reuses the connectivity refs from the previous watch.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
+ return;
+ }
+ // For any other state, go back to top of loop.
+ // We will reuse the connectivity refs from the previous watch.
+ }
}
}
-
- GRPC_ERROR_UNREF(error);
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
@@ -687,8 +593,6 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
grpc_subchannel_index_ref();
- GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,
- grpc_combiner_scheduler(args->combiner));
return &p->base;
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index de163f6300..8f29c80130 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -28,6 +28,7 @@
#include <grpc/support/alloc.h>
+#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
@@ -64,12 +65,11 @@ typedef struct pending_pick {
grpc_closure *on_complete;
} pending_pick;
-typedef struct rr_subchannel_list rr_subchannel_list;
typedef struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
- rr_subchannel_list *subchannel_list;
+ grpc_lb_subchannel_list *subchannel_list;
/** have we started picking? */
bool started_picking;
@@ -89,157 +89,9 @@ typedef struct round_robin_lb_policy {
* lists if they equal \a latest_pending_subchannel_list. In other words,
* racing callbacks that reference outdated subchannel lists won't perform any
* update. */
- rr_subchannel_list *latest_pending_subchannel_list;
+ grpc_lb_subchannel_list *latest_pending_subchannel_list;
} round_robin_lb_policy;
-typedef struct {
- /** backpointer to owning subchannel list */
- rr_subchannel_list *subchannel_list;
- /** subchannel itself */
- grpc_subchannel *subchannel;
- /** notification that connectivity has changed on subchannel */
- grpc_closure connectivity_changed_closure;
- /** last observed connectivity. Not updated by
- * \a grpc_subchannel_notify_on_state_change. Used to determine the previous
- * state while processing the new state in \a rr_connectivity_changed */
- grpc_connectivity_state prev_connectivity_state;
- /** current connectivity state. Updated by \a
- * grpc_subchannel_notify_on_state_change */
- grpc_connectivity_state curr_connectivity_state;
- /** connectivity state to be updated by the watcher, not guarded by
- * the combiner. Will be moved to curr_connectivity_state inside of
- * the combiner by rr_connectivity_changed_locked(). */
- grpc_connectivity_state pending_connectivity_state_unsafe;
- /** the subchannel's target user data */
- void *user_data;
- /** vtable to operate over \a user_data */
- const grpc_lb_user_data_vtable *user_data_vtable;
-} subchannel_data;
-
-struct rr_subchannel_list {
- /** backpointer to owning policy */
- round_robin_lb_policy *policy;
-
- /** all our subchannels */
- size_t num_subchannels;
- subchannel_data *subchannels;
-
- /** how many subchannels are in state READY */
- size_t num_ready;
- /** how many subchannels are in state TRANSIENT_FAILURE */
- size_t num_transient_failures;
- /** how many subchannels are in state SHUTDOWN */
- size_t num_shutdown;
- /** how many subchannels are in state IDLE */
- size_t num_idle;
-
- /** There will be one ref for each entry in subchannels for which there is a
- * pending connectivity state watcher callback. */
- gpr_refcount refcount;
-
- /** Is this list shutting down? This may be true due to the shutdown of the
- * policy itself or because a newer update has arrived while this one hadn't
- * finished processing. */
- bool shutting_down;
-};
-
-static rr_subchannel_list *rr_subchannel_list_create(round_robin_lb_policy *p,
- size_t num_subchannels) {
- rr_subchannel_list *subchannel_list =
- (rr_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list));
- subchannel_list->policy = p;
- subchannel_list->subchannels =
- (subchannel_data *)gpr_zalloc(sizeof(subchannel_data) * num_subchannels);
- subchannel_list->num_subchannels = num_subchannels;
- gpr_ref_init(&subchannel_list->refcount, 1);
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_INFO, "[RR %p] Created subchannel list %p for %lu subchannels",
- (void *)p, (void *)subchannel_list, (unsigned long)num_subchannels);
- }
- return subchannel_list;
-}
-
-static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
- rr_subchannel_list *subchannel_list) {
- GPR_ASSERT(subchannel_list->shutting_down);
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_INFO, "[RR %p] Destroying subchannel_list %p",
- (void *)subchannel_list->policy, (void *)subchannel_list);
- }
- for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
- subchannel_data *sd = &subchannel_list->subchannels[i];
- if (sd->subchannel != NULL) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel,
- "rr_subchannel_list_destroy");
- }
- sd->subchannel = NULL;
- if (sd->user_data != NULL) {
- GPR_ASSERT(sd->user_data_vtable != NULL);
- sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
- sd->user_data = NULL;
- }
- }
- gpr_free(subchannel_list->subchannels);
- gpr_free(subchannel_list);
-}
-
-static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list,
- const char *reason) {
- gpr_ref_non_zero(&subchannel_list->refcount);
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
- gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu (%s)",
- (void *)subchannel_list->policy, (void *)subchannel_list,
- (unsigned long)(count - 1), (unsigned long)count, reason);
- }
-}
-
-static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
- rr_subchannel_list *subchannel_list,
- const char *reason) {
- const bool done = gpr_unref(&subchannel_list->refcount);
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
- gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu (%s)",
- (void *)subchannel_list->policy, (void *)subchannel_list,
- (unsigned long)(count + 1), (unsigned long)count, reason);
- }
- if (done) {
- rr_subchannel_list_destroy(exec_ctx, subchannel_list);
- }
-}
-
-/** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The
- * watcher's callback will ultimately unref \a subchannel_list. */
-static void rr_subchannel_list_shutdown_and_unref(
- grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list,
- const char *reason) {
- GPR_ASSERT(!subchannel_list->shutting_down);
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] Shutting down subchannel_list %p (%s)",
- (void *)subchannel_list->policy, (void *)subchannel_list, reason);
- }
- GPR_ASSERT(!subchannel_list->shutting_down);
- subchannel_list->shutting_down = true;
- for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
- subchannel_data *sd = &subchannel_list->subchannels[i];
- if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(
- GPR_DEBUG,
- "[RR %p] Unsubscribing from subchannel %p as part of shutting down "
- "subchannel_list %p",
- (void *)subchannel_list->policy, (void *)sd->subchannel,
- (void *)subchannel_list);
- }
- grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
- NULL,
- &sd->connectivity_changed_closure);
- }
- }
- rr_subchannel_list_unref(exec_ctx, subchannel_list, reason);
-}
-
/** Returns the index into p->subchannel_list->subchannels of the next
* subchannel in READY state, or p->subchannel_list->num_subchannels if no
* subchannel is READY.
@@ -299,8 +151,8 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
"[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
(void *)p, (unsigned long)last_ready_index,
(void *)p->subchannel_list->subchannels[last_ready_index].subchannel,
- (void *)grpc_subchannel_get_connected_subchannel(
- p->subchannel_list->subchannels[last_ready_index].subchannel));
+ (void *)p->subchannel_list->subchannels[last_ready_index]
+ .connected_subchannel);
}
}
@@ -310,47 +162,47 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p",
(void *)pol, (void *)pol);
}
+ GPR_ASSERT(p->subchannel_list == NULL);
+ GPR_ASSERT(p->latest_pending_subchannel_list == NULL);
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
grpc_subchannel_index_unref();
gpr_free(p);
}
-static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx,
- round_robin_lb_policy *p) {
+static void shutdown_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p,
+ grpc_error *error) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
+ }
+ p->shutdown = true;
pending_pick *pp;
while ((pp = p->pending_picks) != NULL) {
p->pending_picks = pp->next;
*pp->target = NULL;
- GRPC_CLOSURE_SCHED(
- exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+ GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_REF(error));
gpr_free(pp);
}
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
+ "rr_shutdown");
+ if (p->subchannel_list != NULL) {
+ grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ "sl_shutdown_rr_shutdown");
+ p->subchannel_list = NULL;
+ }
+ if (p->latest_pending_subchannel_list != NULL) {
+ grpc_lb_subchannel_list_shutdown_and_unref(
+ exec_ctx, p->latest_pending_subchannel_list,
+ "sl_shutdown_pending_rr_shutdown");
+ p->latest_pending_subchannel_list = NULL;
+ }
+ GRPC_ERROR_UNREF(error);
}
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p",
- (void *)pol, (void *)pol);
- }
- p->shutdown = true;
- fail_pending_picks_for_shutdown(exec_ctx, p);
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
- const bool latest_is_current =
- p->subchannel_list == p->latest_pending_subchannel_list;
- rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
- "sl_shutdown_rr_shutdown");
- p->subchannel_list = NULL;
- if (!latest_is_current && p->latest_pending_subchannel_list != NULL &&
- !p->latest_pending_subchannel_list->shutting_down) {
- rr_subchannel_list_shutdown_and_unref(exec_ctx,
- p->latest_pending_subchannel_list,
- "sl_shutdown_pending_rr_shutdown");
- p->latest_pending_subchannel_list = NULL;
- }
+ shutdown_locked(exec_ctx, p,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
}
static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
@@ -405,13 +257,10 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) {
p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
- subchannel_data *sd = &p->subchannel_list->subchannels[i];
- GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked");
- rr_subchannel_list_ref(sd->subchannel_list, "started_picking");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->pending_connectivity_state_unsafe,
- &sd->connectivity_changed_closure);
+ grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
+ "connectivity_watch");
+ grpc_lb_subchannel_data_start_connectivity_watch(
+ exec_ctx, &p->subchannel_list->subchannels[i]);
}
}
@@ -436,10 +285,10 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if (next_ready_index < p->subchannel_list->num_subchannels) {
/* readily available, report right away */
- subchannel_data *sd = &p->subchannel_list->subchannels[next_ready_index];
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(sd->subchannel),
- "rr_picked");
+ grpc_lb_subchannel_data *sd =
+ &p->subchannel_list->subchannels[next_ready_index];
+ *target =
+ GRPC_CONNECTED_SUBCHANNEL_REF(sd->connected_subchannel, "rr_picked");
if (user_data != NULL) {
*user_data = sd->user_data;
}
@@ -470,8 +319,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
return 0;
}
-static void update_state_counters_locked(subchannel_data *sd) {
- rr_subchannel_list *subchannel_list = sd->subchannel_list;
+static void update_state_counters_locked(grpc_lb_subchannel_data *sd) {
+ grpc_lb_subchannel_list *subchannel_list = sd->subchannel_list;
if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(subchannel_list->num_ready > 0);
--subchannel_list->num_ready;
@@ -485,6 +334,7 @@ static void update_state_counters_locked(subchannel_data *sd) {
GPR_ASSERT(subchannel_list->num_idle > 0);
--subchannel_list->num_idle;
}
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
++subchannel_list->num_ready;
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@@ -497,12 +347,12 @@ static void update_state_counters_locked(subchannel_data *sd) {
}
/** Sets the policy's connectivity status based on that of the passed-in \a sd
- * (the subchannel_data associted with the updated subchannel) and the
+ * (the grpc_lb_subchannel_data associted with the updated subchannel) and the
* subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
* used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
* connectivity status set. */
static grpc_connectivity_state update_lb_connectivity_status_locked(
- grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
+ grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, grpc_error *error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
@@ -524,8 +374,8 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
* CHECK: p->num_idle == p->subchannel_list->num_subchannels.
*/
grpc_connectivity_state new_state = sd->curr_connectivity_state;
- rr_subchannel_list *subchannel_list = sd->subchannel_list;
- round_robin_lb_policy *p = subchannel_list->policy;
+ grpc_lb_subchannel_list *subchannel_list = sd->subchannel_list;
+ round_robin_lb_policy *p = (round_robin_lb_policy *)subchannel_list->policy;
if (subchannel_list->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
@@ -561,8 +411,9 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- subchannel_data *sd = (subchannel_data *)arg;
- round_robin_lb_policy *p = sd->subchannel_list->policy;
+ grpc_lb_subchannel_data *sd = (grpc_lb_subchannel_data *)arg;
+ round_robin_lb_policy *p =
+ (round_robin_lb_policy *)sd->subchannel_list->policy;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(
GPR_DEBUG,
@@ -577,65 +428,50 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
// If the policy is shutting down, unref and return.
if (p->shutdown) {
- rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
- "pol_shutdown+started_picking");
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
- return;
- }
- if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) {
- // the subchannel list associated with sd has been discarded. This callback
- // corresponds to the unsubscription. The unrefs correspond to the picking
- // ref (start_picking_locked or update_started_picking).
- rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
- "sl_shutdown+started_picking");
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown+picking");
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ exec_ctx, sd->subchannel_list, "rr_shutdown");
return;
}
- // Dispose of outdated subchannel lists.
- if (sd->subchannel_list != p->subchannel_list &&
- sd->subchannel_list != p->latest_pending_subchannel_list) {
- const char *reason = NULL;
- if (sd->subchannel_list->shutting_down) {
- reason = "sl_outdated_straggler";
- rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, reason);
- } else {
- reason = "sl_outdated";
- rr_subchannel_list_shutdown_and_unref(exec_ctx, sd->subchannel_list,
- reason);
- }
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, reason);
+ // If the subchannel list is shutting down, stop watching.
+ if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "rr_sl_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ exec_ctx, sd->subchannel_list, "rr_sl_shutdown");
return;
}
+ // If we're still here, the notification must be for a subchannel in
+ // either the current or latest pending subchannel lists.
+ GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
+ sd->subchannel_list == p->latest_pending_subchannel_list);
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Update state counters and determine new overall state.
update_state_counters_locked(sd);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
const grpc_connectivity_state new_policy_connectivity_state =
update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
// If the sd's new state is SHUTDOWN, unref the subchannel, and if the new
// policy's state is SHUTDOWN, clean up.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
- sd->subchannel = NULL;
- if (sd->user_data != NULL) {
- GPR_ASSERT(sd->user_data_vtable != NULL);
- sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
- sd->user_data = NULL;
- }
+ grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
+ "rr_connectivity_shutdown");
+ grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown");
if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
- // The policy is shutting down. Fail all of the pending picks.
- fail_pending_picks_for_shutdown(exec_ctx, p);
+ shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
}
- rr_subchannel_list_unref(exec_ctx, sd->subchannel_list,
- "sd_shutdown+started_picking");
- // unref the "rr_connectivity_update" weak ref from start_picking.
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
- "rr_connectivity_sd_shutdown");
} else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (sd->connected_subchannel == NULL) {
+ sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(sd->subchannel),
+ "connected");
+ }
if (sd->subchannel_list != p->subchannel_list) {
// promote sd->subchannel_list to p->subchannel_list.
// sd->subchannel_list must be equal to
@@ -656,8 +492,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
if (p->subchannel_list != NULL) {
// dispose of the current subchannel_list
- rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
- "sl_phase_out_shutdown");
+ grpc_lb_subchannel_list_shutdown_and_unref(
+ exec_ctx, p->subchannel_list, "sl_phase_out_shutdown");
}
p->subchannel_list = p->latest_pending_subchannel_list;
p->latest_pending_subchannel_list = NULL;
@@ -667,7 +503,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
- subchannel_data *selected =
+ grpc_lb_subchannel_data *selected =
&p->subchannel_list->subchannels[next_ready_index];
if (p->pending_picks != NULL) {
// if the selected subchannel is going to be used for the pending
@@ -678,8 +514,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "rr_picked");
+ selected->connected_subchannel, "rr_picked");
if (pp->user_data != NULL) {
*pp->user_data = selected->user_data;
}
@@ -694,12 +529,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(pp);
}
}
- /* renew notification: reuses the "rr_connectivity_update" weak ref on the
- * policy as well as the sd->subchannel_list ref. */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->pending_connectivity_state_unsafe,
- &sd->connectivity_changed_closure);
+ // Renew notification.
+ grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
}
}
@@ -723,13 +554,12 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if (next_ready_index < p->subchannel_list->num_subchannels) {
- subchannel_data *selected =
+ grpc_lb_subchannel_data *selected =
&p->subchannel_list->subchannels[next_ready_index];
grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "rr_picked");
+ selected->connected_subchannel, "rr_ping");
grpc_connected_subchannel_ping(exec_ctx, target, closure);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping");
} else {
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
@@ -742,130 +572,68 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", p);
+ // If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
+ // Otherwise, keep using the current subchannel list (ignore this update).
if (p->subchannel_list == NULL) {
- // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"rr_update_missing");
- } else {
- // otherwise, keep using the current subchannel list (ignore this update).
- gpr_log(GPR_ERROR,
- "[RR %p] No valid LB addresses channel arg for update, ignoring.",
- (void *)p);
}
return;
}
grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
- rr_subchannel_list *subchannel_list =
- rr_subchannel_list_create(p, addresses->num_addresses);
- if (addresses->num_addresses == 0) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p,
+ addresses->num_addresses);
+ }
+ grpc_lb_subchannel_list *subchannel_list = grpc_lb_subchannel_list_create(
+ exec_ctx, &p->base, &grpc_lb_round_robin_trace, addresses, args,
+ rr_connectivity_changed_locked);
+ if (subchannel_list->num_subchannels == 0) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty");
if (p->subchannel_list != NULL) {
- rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
- "sl_shutdown_empty_update");
+ grpc_lb_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
+ "sl_shutdown_empty_update");
}
p->subchannel_list = subchannel_list; // empty list
return;
}
- size_t subchannel_index = 0;
- if (p->latest_pending_subchannel_list != NULL && p->started_picking) {
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR %p] Shutting down latest pending subchannel list %p, about "
- "to be replaced by newer latest %p",
- (void *)p, (void *)p->latest_pending_subchannel_list,
- (void *)subchannel_list);
- }
- rr_subchannel_list_shutdown_and_unref(
- exec_ctx, p->latest_pending_subchannel_list, "sl_outdated_dont_smash");
- }
- p->latest_pending_subchannel_list = subchannel_list;
- grpc_subchannel_args sc_args;
- /* We need to remove the LB addresses in order to be able to compare the
- * subchannel keys of subchannels from a different batch of addresses. */
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
- GRPC_ARG_LB_ADDRESSES};
- /* Create subchannels for addresses in the update. */
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- // If there were any balancer, we would have chosen grpclb policy instead.
- GPR_ASSERT(!addresses->addresses[i].is_balancer);
- memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- grpc_arg addr_arg =
- grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
- grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
- args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
- 1);
- gpr_free(addr_arg.value.string);
- sc_args.args = new_args;
- grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
- exec_ctx, args->client_channel_factory, &sc_args);
- grpc_channel_args_destroy(exec_ctx, new_args);
- grpc_error *error;
- // Get the connectivity state of the subchannel. Already existing ones may
- // be in a state other than INIT.
- const grpc_connectivity_state subchannel_connectivity_state =
- grpc_subchannel_check_connectivity(subchannel, &error);
- if (error != GRPC_ERROR_NONE) {
- // The subchannel is in error (e.g. shutting down). Ignore it.
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error");
- GRPC_ERROR_UNREF(error);
- continue;
- }
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- char *address_uri =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(
- GPR_DEBUG,
- "[RR %p] index %lu: Created subchannel %p for address uri %s into "
- "subchannel_list %p. Connectivity state %s",
- (void *)p, (unsigned long)subchannel_index, (void *)subchannel,
- address_uri, (void *)subchannel_list,
- grpc_connectivity_state_name(subchannel_connectivity_state));
- gpr_free(address_uri);
- }
- subchannel_data *sd = &subchannel_list->subchannels[subchannel_index++];
- sd->subchannel_list = subchannel_list;
- sd->subchannel = subchannel;
- GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
- rr_connectivity_changed_locked, sd,
- grpc_combiner_scheduler(args->combiner));
- /* use some sentinel value outside of the range of
- * grpc_connectivity_state to signal an undefined previous state. We
- * won't be referring to this value again and it'll be overwritten after
- * the first call to rr_connectivity_changed_locked */
- sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
- sd->curr_connectivity_state = subchannel_connectivity_state;
- sd->user_data_vtable = addresses->user_data_vtable;
- if (sd->user_data_vtable != NULL) {
- sd->user_data =
- sd->user_data_vtable->copy(addresses->addresses[i].user_data);
+ if (p->started_picking) {
+ if (p->latest_pending_subchannel_list != NULL) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] Shutting down latest pending subchannel list %p, "
+ "about to be replaced by newer latest %p",
+ (void *)p, (void *)p->latest_pending_subchannel_list,
+ (void *)subchannel_list);
+ }
+ grpc_lb_subchannel_list_shutdown_and_unref(
+ exec_ctx, p->latest_pending_subchannel_list, "sl_outdated");
}
- if (p->started_picking) {
- rr_subchannel_list_ref(sd->subchannel_list, "update_started_picking");
- GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity_update");
- /* 2. Watch every new subchannel. A subchannel list becomes active the
+ p->latest_pending_subchannel_list = subchannel_list;
+ for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
+ /* Watch every new subchannel. A subchannel list becomes active the
* moment one of its subchannels is READY. At that moment, we swap
* p->subchannel_list for sd->subchannel_list, provided the subchannel
* list is still valid (ie, isn't shutting down) */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->pending_connectivity_state_unsafe,
- &sd->connectivity_changed_closure);
+ grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list,
+ "connectivity_watch");
+ grpc_lb_subchannel_data_start_connectivity_watch(
+ exec_ctx, &subchannel_list->subchannels[i]);
}
- }
- if (!p->started_picking) {
+ } else {
// The policy isn't picking yet. Save the update for later, disposing of
// previous version if any.
if (p->subchannel_list != NULL) {
- rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list,
- "rr_update_before_started_picking");
+ grpc_lb_subchannel_list_shutdown_and_unref(
+ exec_ctx, p->subchannel_list, "rr_update_before_started_picking");
}
p->subchannel_list = subchannel_list;
- p->latest_pending_subchannel_list = NULL;
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
new file mode 100644
index 0000000000..08ea4f480b
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
@@ -0,0 +1,265 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/transport/connectivity_state.h"
+
+void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx,
+ grpc_lb_subchannel_data *sd,
+ const char *reason) {
+ if (sd->subchannel != NULL) {
+ if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
+ gpr_log(
+ GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR
+ " of %" PRIuPTR " (subchannel %p): unreffing subchannel",
+ sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
+ sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
+ sd->subchannel_list->num_subchannels, sd->subchannel);
+ }
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason);
+ sd->subchannel = NULL;
+ if (sd->connected_subchannel != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, sd->connected_subchannel,
+ reason);
+ sd->connected_subchannel = NULL;
+ }
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ sd->user_data = NULL;
+ }
+ }
+}
+
+void grpc_lb_subchannel_data_start_connectivity_watch(
+ grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) {
+ if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
+ gpr_log(GPR_DEBUG,
+ "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): requesting connectivity change notification",
+ sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
+ sd->subchannel_list,
+ (size_t)(sd - sd->subchannel_list->subchannels),
+ sd->subchannel_list->num_subchannels, sd->subchannel);
+ }
+ sd->connectivity_notification_pending = true;
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, sd->subchannel_list->policy->interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
+}
+
+void grpc_lb_subchannel_data_stop_connectivity_watch(
+ grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) {
+ if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
+ gpr_log(
+ GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): stopping connectivity watch",
+ sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
+ sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
+ sd->subchannel_list->num_subchannels, sd->subchannel);
+ }
+ GPR_ASSERT(sd->connectivity_notification_pending);
+ sd->connectivity_notification_pending = false;
+}
+
+grpc_lb_subchannel_list *grpc_lb_subchannel_list_create(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer,
+ const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args,
+ grpc_iomgr_cb_func connectivity_changed_cb) {
+ grpc_lb_subchannel_list *subchannel_list =
+ (grpc_lb_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list));
+ if (GRPC_TRACER_ON(*tracer)) {
+ gpr_log(GPR_DEBUG,
+ "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
+ tracer->name, p, subchannel_list, addresses->num_addresses);
+ }
+ subchannel_list->policy = p;
+ subchannel_list->tracer = tracer;
+ gpr_ref_init(&subchannel_list->refcount, 1);
+ subchannel_list->subchannels = (grpc_lb_subchannel_data *)gpr_zalloc(
+ sizeof(grpc_lb_subchannel_data) * addresses->num_addresses);
+ // We need to remove the LB addresses in order to be able to compare the
+ // subchannel keys of subchannels from a different batch of addresses.
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
+ GRPC_ARG_LB_ADDRESSES};
+ // Create a subchannel for each address.
+ grpc_subchannel_args sc_args;
+ size_t subchannel_index = 0;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ // If there were any balancer, we would have chosen grpclb policy instead.
+ GPR_ASSERT(!addresses->addresses[i].is_balancer);
+ memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
+ args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
+ 1);
+ gpr_free(addr_arg.value.string);
+ sc_args.args = new_args;
+ grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
+ exec_ctx, args->client_channel_factory, &sc_args);
+ grpc_channel_args_destroy(exec_ctx, new_args);
+ if (subchannel == NULL) {
+ // Subchannel could not be created.
+ if (GRPC_TRACER_ON(*tracer)) {
+ char *address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_DEBUG,
+ "[%s %p] could not create subchannel for address uri %s, "
+ "ignoring",
+ tracer->name, subchannel_list->policy, address_uri);
+ gpr_free(address_uri);
+ }
+ continue;
+ }
+ if (GRPC_TRACER_ON(*tracer)) {
+ char *address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR
+ ": Created subchannel %p for address uri %s",
+ tracer->name, p, subchannel_list, subchannel_index, subchannel,
+ address_uri);
+ gpr_free(address_uri);
+ }
+ grpc_lb_subchannel_data *sd =
+ &subchannel_list->subchannels[subchannel_index++];
+ sd->subchannel_list = subchannel_list;
+ sd->subchannel = subchannel;
+ GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
+ connectivity_changed_cb, sd,
+ grpc_combiner_scheduler(args->combiner));
+ // We assume that the current state is IDLE. If not, we'll get a
+ // callback telling us that.
+ sd->prev_connectivity_state = GRPC_CHANNEL_IDLE;
+ sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
+ sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE;
+ sd->user_data_vtable = addresses->user_data_vtable;
+ if (sd->user_data_vtable != NULL) {
+ sd->user_data =
+ sd->user_data_vtable->copy(addresses->addresses[i].user_data);
+ }
+ }
+ subchannel_list->num_subchannels = subchannel_index;
+ subchannel_list->num_idle = subchannel_index;
+ return subchannel_list;
+}
+
+static void subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_lb_subchannel_list *subchannel_list) {
+ if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
+ gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p",
+ subchannel_list->tracer->name, subchannel_list->policy,
+ subchannel_list);
+ }
+ for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
+ grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i];
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
+ "subchannel_list_destroy");
+ }
+ gpr_free(subchannel_list->subchannels);
+ gpr_free(subchannel_list);
+}
+
+void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list,
+ const char *reason) {
+ gpr_ref_non_zero(&subchannel_list->refcount);
+ if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
+ const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
+ gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)",
+ subchannel_list->tracer->name, subchannel_list->policy,
+ subchannel_list, (unsigned long)(count - 1), (unsigned long)count,
+ reason);
+ }
+}
+
+void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
+ grpc_lb_subchannel_list *subchannel_list,
+ const char *reason) {
+ const bool done = gpr_unref(&subchannel_list->refcount);
+ if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
+ const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
+ gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)",
+ subchannel_list->tracer->name, subchannel_list->policy,
+ subchannel_list, (unsigned long)(count + 1), (unsigned long)count,
+ reason);
+ }
+ if (done) {
+ subchannel_list_destroy(exec_ctx, subchannel_list);
+ }
+}
+
+void grpc_lb_subchannel_list_ref_for_connectivity_watch(
+ grpc_lb_subchannel_list *subchannel_list, const char *reason) {
+ GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason);
+ grpc_lb_subchannel_list_ref(subchannel_list, reason);
+}
+
+void grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list,
+ const char *reason) {
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, subchannel_list->policy, reason);
+ grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason);
+}
+
+static void subchannel_data_cancel_connectivity_watch(
+ grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) {
+ if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
+ gpr_log(
+ GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
+ " (subchannel %p): canceling connectivity watch (%s)",
+ sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
+ sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
+ sd->subchannel_list->num_subchannels, sd->subchannel, reason);
+ }
+ grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
+ &sd->connectivity_changed_closure);
+}
+
+void grpc_lb_subchannel_list_shutdown_and_unref(
+ grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list,
+ const char *reason) {
+ if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
+ gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
+ subchannel_list->tracer->name, subchannel_list->policy,
+ subchannel_list, reason);
+ }
+ GPR_ASSERT(!subchannel_list->shutting_down);
+ subchannel_list->shutting_down = true;
+ for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
+ grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i];
+ // If there's a pending notification for this subchannel, cancel it;
+ // the callback is responsible for unreffing the subchannel.
+ // Otherwise, unref the subchannel directly.
+ if (sd->connectivity_notification_pending) {
+ subchannel_data_cancel_connectivity_watch(exec_ctx, sd, reason);
+ } else if (sd->subchannel != NULL) {
+ grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, reason);
+ }
+ }
+ grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason);
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
new file mode 100644
index 0000000000..9d5984260f
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -0,0 +1,153 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
+
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/transport/connectivity_state.h"
+
+// TODO(roth): This code is intended to be shared between pick_first and
+// round_robin. However, the interface needs more work to provide clean
+// encapsulation. For example, the structs here have some fields that are
+// only used in one of the two (e.g., the state counters in
+// grpc_lb_subchannel_list and the prev_connectivity_state field in
+// grpc_lb_subchannel_data are only used in round_robin, and the
+// checking_subchannel field in grpc_lb_subchannel_list is only used by
+// pick_first). Also, there is probably some code duplication between the
+// connectivity state notification callback code in both pick_first and
+// round_robin that could be refactored and moved here. In a future PR,
+// need to clean this up.
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list;
+
+typedef struct {
+ /** backpointer to owning subchannel list */
+ grpc_lb_subchannel_list *subchannel_list;
+ /** subchannel itself */
+ grpc_subchannel *subchannel;
+ grpc_connected_subchannel *connected_subchannel;
+ /** Is a connectivity notification pending? */
+ bool connectivity_notification_pending;
+ /** notification that connectivity has changed on subchannel */
+ grpc_closure connectivity_changed_closure;
+ /** previous and current connectivity states. Updated by \a
+ * \a connectivity_changed_closure based on
+ * \a pending_connectivity_state_unsafe. */
+ grpc_connectivity_state prev_connectivity_state;
+ grpc_connectivity_state curr_connectivity_state;
+ /** connectivity state to be updated by
+ * grpc_subchannel_notify_on_state_change(), not guarded by
+ * the combiner. To be copied to \a curr_connectivity_state by
+ * \a connectivity_changed_closure. */
+ grpc_connectivity_state pending_connectivity_state_unsafe;
+ /** the subchannel's target user data */
+ void *user_data;
+ /** vtable to operate over \a user_data */
+ const grpc_lb_user_data_vtable *user_data_vtable;
+} grpc_lb_subchannel_data;
+
+/// Unrefs the subchannel contained in sd.
+void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx,
+ grpc_lb_subchannel_data *sd,
+ const char *reason);
+
+/// Starts watching the connectivity state of the subchannel.
+/// The connectivity_changed_cb callback must invoke either
+/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call
+/// grpc_lb_subchannel_data_start_connectivity_watch().
+void grpc_lb_subchannel_data_start_connectivity_watch(
+ grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd);
+
+/// Stops watching the connectivity state of the subchannel.
+void grpc_lb_subchannel_data_stop_connectivity_watch(
+ grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd);
+
+struct grpc_lb_subchannel_list {
+ /** backpointer to owning policy */
+ grpc_lb_policy *policy;
+
+ grpc_tracer_flag *tracer;
+
+ /** all our subchannels */
+ size_t num_subchannels;
+ grpc_lb_subchannel_data *subchannels;
+
+ /** Index into subchannels of the one we're currently checking.
+ * Used when connecting to subchannels serially instead of in parallel. */
+ // TODO(roth): When we have time, we can probably make this go away
+ // and compute the index dynamically by subtracting
+ // subchannel_list->subchannels from the subchannel_data pointer.
+ size_t checking_subchannel;
+
+ /** how many subchannels are in state READY */
+ size_t num_ready;
+ /** how many subchannels are in state TRANSIENT_FAILURE */
+ size_t num_transient_failures;
+ /** how many subchannels are in state SHUTDOWN */
+ size_t num_shutdown;
+ /** how many subchannels are in state IDLE */
+ size_t num_idle;
+
+ /** There will be one ref for each entry in subchannels for which there is a
+ * pending connectivity state watcher callback. */
+ gpr_refcount refcount;
+
+ /** Is this list shutting down? This may be true due to the shutdown of the
+ * policy itself or because a newer update has arrived while this one hadn't
+ * finished processing. */
+ bool shutting_down;
+};
+
+grpc_lb_subchannel_list *grpc_lb_subchannel_list_create(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer,
+ const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args,
+ grpc_iomgr_cb_func connectivity_changed_cb);
+
+void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list,
+ const char *reason);
+
+void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
+ grpc_lb_subchannel_list *subchannel_list,
+ const char *reason);
+
+/// Takes and releases refs needed for a connectivity notification.
+/// This includes a ref to subchannel_list and a weak ref to the LB policy.
+void grpc_lb_subchannel_list_ref_for_connectivity_watch(
+ grpc_lb_subchannel_list *subchannel_list, const char *reason);
+void grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list,
+ const char *reason);
+
+/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The
+/// connectivity state notification callback will ultimately unref it.
+void grpc_lb_subchannel_list_shutdown_and_unref(
+ grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list,
+ const char *reason);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 46b29f1fe0..1cd73f3ff4 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -127,8 +127,8 @@ void grpc_connected_subchannel_process_transport_op(
grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel *channel, grpc_error **error);
-/** call notify when the connectivity state of a channel changes from *state.
- Updates *state with the new state of the channel */
+/** Calls notify when the connectivity state of a channel becomes different
+ from *state. Updates *state with the new state of the channel. */
void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *channel,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h
index d5107d467b..7dd348ed5f 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.h
+++ b/src/core/ext/transport/chttp2/transport/flow_control.h
@@ -32,6 +32,12 @@ struct grpc_chttp2_stream;
extern "C" grpc_tracer_flag grpc_flowctl_trace;
+namespace grpc {
+namespace testing {
+class TrickledCHTTP2; // to make this a friend
+} // namespace testing
+} // namespace grpc
+
namespace grpc_core {
namespace chttp2 {
@@ -203,6 +209,7 @@ class TransportFlowControl {
}
private:
+ friend class ::grpc::testing::TrickledCHTTP2;
double TargetLogBdp();
double SmoothLogBdp(grpc_exec_ctx* exec_ctx, double value);
FlowControlAction::Urgency DeltaUrgency(int32_t value,
@@ -297,6 +304,7 @@ class StreamFlowControl {
}
private:
+ friend class ::grpc::testing::TrickledCHTTP2;
TransportFlowControl* const tfc_;
const grpc_chttp2_stream* const s_;
diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc
index 8e47aebedb..2085e2b8e7 100644
--- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc
+++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc
@@ -119,6 +119,12 @@ grpc_channel_credentials *grpc_ssl_credentials_create(
// SSL Server Credentials.
//
+struct grpc_ssl_server_credentials_options {
+ grpc_ssl_client_certificate_request_type client_certificate_request;
+ grpc_ssl_server_certificate_config *certificate_config;
+ grpc_ssl_server_certificate_config_fetcher *certificate_config_fetcher;
+};
+
static void ssl_server_destruct(grpc_exec_ctx *exec_ctx,
grpc_server_credentials *creds) {
grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds;
@@ -130,9 +136,7 @@ static void ssl_server_destruct(grpc_exec_ctx *exec_ctx,
static grpc_security_status ssl_server_create_security_connector(
grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds,
grpc_server_security_connector **sc) {
- grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds;
- return grpc_ssl_server_security_connector_create(exec_ctx, creds, &c->config,
- sc);
+ return grpc_ssl_server_security_connector_create(exec_ctx, creds, sc);
}
static grpc_server_credentials_vtable ssl_server_vtable = {
@@ -170,6 +174,86 @@ static void ssl_build_server_config(
config->num_key_cert_pairs = num_key_cert_pairs;
}
+grpc_ssl_server_certificate_config *grpc_ssl_server_certificate_config_create(
+ const char *pem_root_certs,
+ const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
+ size_t num_key_cert_pairs) {
+ grpc_ssl_server_certificate_config *config =
+ (grpc_ssl_server_certificate_config *)gpr_zalloc(
+ sizeof(grpc_ssl_server_certificate_config));
+ if (pem_root_certs != NULL) {
+ config->pem_root_certs = gpr_strdup(pem_root_certs);
+ }
+ if (num_key_cert_pairs > 0) {
+ GPR_ASSERT(pem_key_cert_pairs != NULL);
+ config->pem_key_cert_pairs = (grpc_ssl_pem_key_cert_pair *)gpr_zalloc(
+ num_key_cert_pairs * sizeof(grpc_ssl_pem_key_cert_pair));
+ }
+ config->num_key_cert_pairs = num_key_cert_pairs;
+ for (size_t i = 0; i < num_key_cert_pairs; i++) {
+ GPR_ASSERT(pem_key_cert_pairs[i].private_key != NULL);
+ GPR_ASSERT(pem_key_cert_pairs[i].cert_chain != NULL);
+ config->pem_key_cert_pairs[i].cert_chain =
+ gpr_strdup(pem_key_cert_pairs[i].cert_chain);
+ config->pem_key_cert_pairs[i].private_key =
+ gpr_strdup(pem_key_cert_pairs[i].private_key);
+ }
+ return config;
+}
+
+void grpc_ssl_server_certificate_config_destroy(
+ grpc_ssl_server_certificate_config *config) {
+ if (config == NULL) return;
+ for (size_t i = 0; i < config->num_key_cert_pairs; i++) {
+ gpr_free((void *)config->pem_key_cert_pairs[i].private_key);
+ gpr_free((void *)config->pem_key_cert_pairs[i].cert_chain);
+ }
+ gpr_free(config->pem_key_cert_pairs);
+ gpr_free(config->pem_root_certs);
+ gpr_free(config);
+}
+
+grpc_ssl_server_credentials_options *
+grpc_ssl_server_credentials_create_options_using_config(
+ grpc_ssl_client_certificate_request_type client_certificate_request,
+ grpc_ssl_server_certificate_config *config) {
+ grpc_ssl_server_credentials_options *options = NULL;
+ if (config == NULL) {
+ gpr_log(GPR_ERROR, "Certificate config must not be NULL.");
+ goto done;
+ }
+ options = (grpc_ssl_server_credentials_options *)gpr_zalloc(
+ sizeof(grpc_ssl_server_credentials_options));
+ options->client_certificate_request = client_certificate_request;
+ options->certificate_config = config;
+done:
+ return options;
+}
+
+grpc_ssl_server_credentials_options *
+grpc_ssl_server_credentials_create_options_using_config_fetcher(
+ grpc_ssl_client_certificate_request_type client_certificate_request,
+ grpc_ssl_server_certificate_config_callback cb, void *user_data) {
+ if (cb == NULL) {
+ gpr_log(GPR_ERROR, "Invalid certificate config callback parameter.");
+ return NULL;
+ }
+
+ grpc_ssl_server_certificate_config_fetcher *fetcher =
+ (grpc_ssl_server_certificate_config_fetcher *)gpr_zalloc(
+ sizeof(grpc_ssl_server_certificate_config_fetcher));
+ fetcher->cb = cb;
+ fetcher->user_data = user_data;
+
+ grpc_ssl_server_credentials_options *options =
+ (grpc_ssl_server_credentials_options *)gpr_zalloc(
+ sizeof(grpc_ssl_server_credentials_options));
+ options->client_certificate_request = client_certificate_request;
+ options->certificate_config_fetcher = fetcher;
+
+ return options;
+}
+
grpc_server_credentials *grpc_ssl_server_credentials_create(
const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
size_t num_key_cert_pairs, int force_client_auth, void *reserved) {
@@ -186,8 +270,6 @@ grpc_server_credentials *grpc_ssl_server_credentials_create_ex(
size_t num_key_cert_pairs,
grpc_ssl_client_certificate_request_type client_certificate_request,
void *reserved) {
- grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)gpr_zalloc(
- sizeof(grpc_ssl_server_credentials));
GRPC_API_TRACE(
"grpc_ssl_server_credentials_create_ex("
"pem_root_certs=%s, pem_key_cert_pairs=%p, num_key_cert_pairs=%lu, "
@@ -195,11 +277,67 @@ grpc_server_credentials *grpc_ssl_server_credentials_create_ex(
5, (pem_root_certs, pem_key_cert_pairs, (unsigned long)num_key_cert_pairs,
client_certificate_request, reserved));
GPR_ASSERT(reserved == NULL);
+
+ grpc_ssl_server_certificate_config *cert_config =
+ grpc_ssl_server_certificate_config_create(
+ pem_root_certs, pem_key_cert_pairs, num_key_cert_pairs);
+ grpc_ssl_server_credentials_options *options =
+ grpc_ssl_server_credentials_create_options_using_config(
+ client_certificate_request, cert_config);
+
+ return grpc_ssl_server_credentials_create_with_options(options);
+}
+
+grpc_server_credentials *grpc_ssl_server_credentials_create_with_options(
+ grpc_ssl_server_credentials_options *options) {
+ grpc_server_credentials *retval = NULL;
+ grpc_ssl_server_credentials *c = NULL;
+
+ if (options == NULL) {
+ gpr_log(GPR_ERROR,
+ "Invalid options trying to create SSL server credentials.");
+ goto done;
+ }
+
+ if (options->certificate_config == NULL &&
+ options->certificate_config_fetcher == NULL) {
+ gpr_log(GPR_ERROR,
+ "SSL server credentials options must specify either "
+ "certificate config or fetcher.");
+ goto done;
+ } else if (options->certificate_config_fetcher != NULL &&
+ options->certificate_config_fetcher->cb == NULL) {
+ gpr_log(GPR_ERROR, "Certificate config fetcher callback must not be NULL.");
+ goto done;
+ }
+
+ c = (grpc_ssl_server_credentials *)gpr_zalloc(
+ sizeof(grpc_ssl_server_credentials));
c->base.type = GRPC_CHANNEL_CREDENTIALS_TYPE_SSL;
gpr_ref_init(&c->base.refcount, 1);
c->base.vtable = &ssl_server_vtable;
- ssl_build_server_config(pem_root_certs, pem_key_cert_pairs,
- num_key_cert_pairs, client_certificate_request,
- &c->config);
- return &c->base;
+
+ if (options->certificate_config_fetcher != NULL) {
+ c->config.client_certificate_request = options->client_certificate_request;
+ c->certificate_config_fetcher = *options->certificate_config_fetcher;
+ } else {
+ ssl_build_server_config(options->certificate_config->pem_root_certs,
+ options->certificate_config->pem_key_cert_pairs,
+ options->certificate_config->num_key_cert_pairs,
+ options->client_certificate_request, &c->config);
+ }
+
+ retval = &c->base;
+
+done:
+ grpc_ssl_server_credentials_options_destroy(options);
+ return retval;
+}
+
+void grpc_ssl_server_credentials_options_destroy(
+ grpc_ssl_server_credentials_options *o) {
+ if (o == NULL) return;
+ gpr_free(o->certificate_config_fetcher);
+ grpc_ssl_server_certificate_config_destroy(o->certificate_config);
+ gpr_free(o);
}
diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.h b/src/core/lib/security/credentials/ssl/ssl_credentials.h
index 42e425d9f1..5542484aae 100644
--- a/src/core/lib/security/credentials/ssl/ssl_credentials.h
+++ b/src/core/lib/security/credentials/ssl/ssl_credentials.h
@@ -29,9 +29,21 @@ typedef struct {
grpc_ssl_config config;
} grpc_ssl_credentials;
+struct grpc_ssl_server_certificate_config {
+ grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs;
+ size_t num_key_cert_pairs;
+ char *pem_root_certs;
+};
+
+typedef struct {
+ grpc_ssl_server_certificate_config_callback cb;
+ void *user_data;
+} grpc_ssl_server_certificate_config_fetcher;
+
typedef struct {
grpc_server_credentials base;
grpc_ssl_server_config config;
+ grpc_ssl_server_certificate_config_fetcher certificate_config_fetcher;
} grpc_ssl_server_credentials;
tsi_ssl_pem_key_cert_pair *grpc_convert_grpc_to_tsi_cert_pairs(
diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc
index b050be2129..06160d0caa 100644
--- a/src/core/lib/security/transport/security_connector.cc
+++ b/src/core/lib/security/transport/security_connector.cc
@@ -34,6 +34,7 @@
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
+#include "src/core/lib/security/credentials/ssl/ssl_credentials.h"
#include "src/core/lib/security/transport/lb_targets_info.h"
#include "src/core/lib/security/transport/secure_endpoint.h"
#include "src/core/lib/security/transport/security_handshaker.h"
@@ -277,6 +278,30 @@ grpc_security_connector *grpc_security_connector_find_in_args(
return NULL;
}
+static tsi_client_certificate_request_type
+get_tsi_client_certificate_request_type(
+ grpc_ssl_client_certificate_request_type grpc_request_type) {
+ switch (grpc_request_type) {
+ case GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE:
+ return TSI_DONT_REQUEST_CLIENT_CERTIFICATE;
+
+ case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY:
+ return TSI_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY;
+
+ case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY:
+ return TSI_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY;
+
+ case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY:
+ return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY;
+
+ case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY:
+ return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY;
+
+ default:
+ return TSI_DONT_REQUEST_CLIENT_CERTIFICATE;
+ }
+}
+
/* -- Fake implementation. -- */
typedef struct {
@@ -533,6 +558,15 @@ typedef struct {
tsi_ssl_server_handshaker_factory *server_handshaker_factory;
} grpc_ssl_server_security_connector;
+static bool server_connector_has_cert_config_fetcher(
+ grpc_ssl_server_security_connector *c) {
+ GPR_ASSERT(c != NULL);
+ grpc_ssl_server_credentials *server_creds =
+ (grpc_ssl_server_credentials *)c->base.server_creds;
+ GPR_ASSERT(server_creds != NULL);
+ return server_creds->certificate_config_fetcher.cb != NULL;
+}
+
static void ssl_channel_destroy(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {
grpc_ssl_channel_security_connector *c =
@@ -573,7 +607,6 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx,
tsi_result_to_string(result));
return;
}
-
// Create handshakers.
grpc_handshake_manager_add(
handshake_mgr,
@@ -581,12 +614,102 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx,
exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base));
}
+static const char **fill_alpn_protocol_strings(size_t *num_alpn_protocols) {
+ GPR_ASSERT(num_alpn_protocols != NULL);
+ *num_alpn_protocols = grpc_chttp2_num_alpn_versions();
+ const char **alpn_protocol_strings =
+ (const char **)gpr_malloc(sizeof(const char *) * (*num_alpn_protocols));
+ for (size_t i = 0; i < *num_alpn_protocols; i++) {
+ alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i);
+ }
+ return alpn_protocol_strings;
+}
+
+/* Attempts to replace the server_handshaker_factory with a new factory using
+ * the provided grpc_ssl_server_certificate_config. Should new factory creation
+ * fail, the existing factory will not be replaced. Returns true on success (new
+ * factory created). */
+static bool try_replace_server_handshaker_factory(
+ grpc_ssl_server_security_connector *sc,
+ const grpc_ssl_server_certificate_config *config) {
+ if (config == NULL) {
+ gpr_log(GPR_ERROR,
+ "Server certificate config callback returned invalid (NULL) "
+ "config.");
+ return false;
+ }
+ gpr_log(GPR_DEBUG, "Using new server certificate config (%p).", config);
+
+ size_t num_alpn_protocols = 0;
+ const char **alpn_protocol_strings =
+ fill_alpn_protocol_strings(&num_alpn_protocols);
+ tsi_ssl_pem_key_cert_pair *cert_pairs = grpc_convert_grpc_to_tsi_cert_pairs(
+ config->pem_key_cert_pairs, config->num_key_cert_pairs);
+ tsi_ssl_server_handshaker_factory *new_handshaker_factory = NULL;
+ grpc_ssl_server_credentials *server_creds =
+ (grpc_ssl_server_credentials *)sc->base.server_creds;
+ tsi_result result = tsi_create_ssl_server_handshaker_factory_ex(
+ cert_pairs, config->num_key_cert_pairs, config->pem_root_certs,
+ get_tsi_client_certificate_request_type(
+ server_creds->config.client_certificate_request),
+ ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols,
+ &new_handshaker_factory);
+ gpr_free(cert_pairs);
+ gpr_free((void *)alpn_protocol_strings);
+
+ if (result != TSI_OK) {
+ gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.",
+ tsi_result_to_string(result));
+ return false;
+ }
+ tsi_ssl_server_handshaker_factory_unref(sc->server_handshaker_factory);
+ sc->server_handshaker_factory = new_handshaker_factory;
+ return true;
+}
+
+/* Attempts to fetch the server certificate config if a callback is available.
+ * Current certificate config will continue to be used if the callback returns
+ * an error. Returns true if new credentials were sucessfully loaded. */
+static bool try_fetch_ssl_server_credentials(
+ grpc_ssl_server_security_connector *sc) {
+ grpc_ssl_server_certificate_config *certificate_config = NULL;
+ bool status;
+
+ GPR_ASSERT(sc != NULL);
+ if (!server_connector_has_cert_config_fetcher(sc)) return false;
+
+ grpc_ssl_server_credentials *server_creds =
+ (grpc_ssl_server_credentials *)sc->base.server_creds;
+ grpc_ssl_certificate_config_reload_status cb_result =
+ server_creds->certificate_config_fetcher.cb(
+ server_creds->certificate_config_fetcher.user_data,
+ &certificate_config);
+ if (cb_result == GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_UNCHANGED) {
+ gpr_log(GPR_DEBUG, "No change in SSL server credentials.");
+ status = false;
+ } else if (cb_result == GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_NEW) {
+ status = try_replace_server_handshaker_factory(sc, certificate_config);
+ } else {
+ // Log error, continue using previously-loaded credentials.
+ gpr_log(GPR_ERROR,
+ "Failed fetching new server credentials, continuing to "
+ "use previously-loaded credentials.");
+ status = false;
+ }
+
+ if (certificate_config != NULL) {
+ grpc_ssl_server_certificate_config_destroy(certificate_config);
+ }
+ return status;
+}
+
static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
// Instantiate TSI handshaker.
+ try_fetch_ssl_server_credentials(c);
tsi_handshaker *tsi_hs = NULL;
tsi_result result = tsi_ssl_server_handshaker_factory_create_handshaker(
c->server_handshaker_factory, &tsi_hs);
@@ -595,7 +718,6 @@ static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
tsi_result_to_string(result));
return;
}
-
// Create handshakers.
grpc_handshake_manager_add(
handshake_mgr,
@@ -857,31 +979,6 @@ grpc_slice grpc_get_default_ssl_roots_for_testing(void) {
return compute_default_pem_root_certs_once();
}
-static tsi_client_certificate_request_type
-get_tsi_client_certificate_request_type(
- grpc_ssl_client_certificate_request_type grpc_request_type) {
- switch (grpc_request_type) {
- case GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE:
- return TSI_DONT_REQUEST_CLIENT_CERTIFICATE;
-
- case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY:
- return TSI_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY;
-
- case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY:
- return TSI_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY;
-
- case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY:
- return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY;
-
- case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY:
- return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY;
-
- default:
- // Is this a sane default
- return TSI_DONT_REQUEST_CLIENT_CERTIFICATE;
- }
-}
-
const char *grpc_get_default_ssl_roots(void) {
/* TODO(jboeuf@google.com): Maybe revisit the approach which consists in
loading all the roots once for the lifetime of the process. */
@@ -897,18 +994,14 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
grpc_call_credentials *request_metadata_creds,
const grpc_ssl_config *config, const char *target_name,
const char *overridden_target_name, grpc_channel_security_connector **sc) {
- size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions();
+ size_t num_alpn_protocols = 0;
const char **alpn_protocol_strings =
- (const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols);
+ fill_alpn_protocol_strings(&num_alpn_protocols);
tsi_result result = TSI_OK;
grpc_ssl_channel_security_connector *c;
- size_t i;
const char *pem_root_certs;
char *port;
bool has_key_cert_pair;
- for (i = 0; i < num_alpn_protocols; i++) {
- alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i);
- }
if (config == NULL || target_name == NULL) {
gpr_log(GPR_ERROR, "An ssl channel needs a config and a target name.");
@@ -965,50 +1058,64 @@ error:
return GRPC_SECURITY_ERROR;
}
+static grpc_ssl_server_security_connector *
+grpc_ssl_server_security_connector_initialize(
+ grpc_server_credentials *server_creds) {
+ grpc_ssl_server_security_connector *c =
+ (grpc_ssl_server_security_connector *)gpr_zalloc(
+ sizeof(grpc_ssl_server_security_connector));
+ gpr_ref_init(&c->base.base.refcount, 1);
+ c->base.base.url_scheme = GRPC_SSL_URL_SCHEME;
+ c->base.base.vtable = &ssl_server_vtable;
+ c->base.add_handshakers = ssl_server_add_handshakers;
+ c->base.server_creds = grpc_server_credentials_ref(server_creds);
+ return c;
+}
+
grpc_security_status grpc_ssl_server_security_connector_create(
- grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds,
- const grpc_ssl_server_config *config, grpc_server_security_connector **sc) {
- size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions();
- const char **alpn_protocol_strings =
- (const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols);
+ grpc_exec_ctx *exec_ctx, grpc_server_credentials *gsc,
+ grpc_server_security_connector **sc) {
tsi_result result = TSI_OK;
- grpc_ssl_server_security_connector *c;
- size_t i;
+ grpc_ssl_server_credentials *server_credentials =
+ (grpc_ssl_server_credentials *)gsc;
+ grpc_security_status retval = GRPC_SECURITY_OK;
- for (i = 0; i < num_alpn_protocols; i++) {
- alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i);
- }
+ GPR_ASSERT(server_credentials != NULL);
+ GPR_ASSERT(sc != NULL);
- if (config == NULL || config->num_key_cert_pairs == 0) {
- gpr_log(GPR_ERROR, "An SSL server needs a key and a cert.");
- goto error;
+ grpc_ssl_server_security_connector *c =
+ grpc_ssl_server_security_connector_initialize(gsc);
+ if (server_connector_has_cert_config_fetcher(c)) {
+ // Load initial credentials from certificate_config_fetcher:
+ if (!try_fetch_ssl_server_credentials(c)) {
+ gpr_log(GPR_ERROR, "Failed loading SSL server credentials from fetcher.");
+ retval = GRPC_SECURITY_ERROR;
+ }
+ } else {
+ size_t num_alpn_protocols = 0;
+ const char **alpn_protocol_strings =
+ fill_alpn_protocol_strings(&num_alpn_protocols);
+ result = tsi_create_ssl_server_handshaker_factory_ex(
+ server_credentials->config.pem_key_cert_pairs,
+ server_credentials->config.num_key_cert_pairs,
+ server_credentials->config.pem_root_certs,
+ get_tsi_client_certificate_request_type(
+ server_credentials->config.client_certificate_request),
+ ssl_cipher_suites(), alpn_protocol_strings,
+ (uint16_t)num_alpn_protocols, &c->server_handshaker_factory);
+ gpr_free((void *)alpn_protocol_strings);
+ if (result != TSI_OK) {
+ gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.",
+ tsi_result_to_string(result));
+ retval = GRPC_SECURITY_ERROR;
+ }
}
- c = (grpc_ssl_server_security_connector *)gpr_zalloc(
- sizeof(grpc_ssl_server_security_connector));
- gpr_ref_init(&c->base.base.refcount, 1);
- c->base.base.url_scheme = GRPC_SSL_URL_SCHEME;
- c->base.base.vtable = &ssl_server_vtable;
- c->base.server_creds = grpc_server_credentials_ref(server_creds);
- result = tsi_create_ssl_server_handshaker_factory_ex(
- config->pem_key_cert_pairs, config->num_key_cert_pairs,
- config->pem_root_certs, get_tsi_client_certificate_request_type(
- config->client_certificate_request),
- ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols,
- &c->server_handshaker_factory);
- if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.",
- tsi_result_to_string(result));
- ssl_server_destroy(exec_ctx, &c->base.base);
- *sc = NULL;
- goto error;
+ if (retval == GRPC_SECURITY_OK) {
+ *sc = &c->base;
+ } else {
+ if (c != NULL) ssl_server_destroy(exec_ctx, &c->base.base);
+ if (sc != NULL) *sc = NULL;
}
- c->base.add_handshakers = ssl_server_add_handshakers;
- *sc = &c->base;
- gpr_free((void *)alpn_protocol_strings);
- return GRPC_SECURITY_OK;
-
-error:
- gpr_free((void *)alpn_protocol_strings);
- return GRPC_SECURITY_ERROR;
+ return retval;
}
diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h
index 8287151f44..54a563bb2c 100644
--- a/src/core/lib/security/transport/security_connector.h
+++ b/src/core/lib/security/transport/security_connector.h
@@ -248,8 +248,8 @@ typedef struct {
specific error code otherwise.
*/
grpc_security_status grpc_ssl_server_security_connector_create(
- grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds,
- const grpc_ssl_server_config *config, grpc_server_security_connector **sc);
+ grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_credentials,
+ grpc_server_security_connector **sc);
/* Util. */
const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc
index f328a6cdbb..652c26cf0a 100644
--- a/src/core/lib/transport/connectivity_state.cc
+++ b/src/core/lib/transport/connectivity_state.cc
@@ -29,8 +29,6 @@ grpc_tracer_flag grpc_connectivity_state_trace =
const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
switch (state) {
- case GRPC_CHANNEL_INIT:
- return "INIT";
case GRPC_CHANNEL_IDLE:
return "IDLE";
case GRPC_CHANNEL_CONNECTING:
@@ -174,7 +172,6 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_name(state), reason, error, error_string);
}
switch (state) {
- case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_READY:
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 19a25c838f..9df531066e 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -48,187 +48,13 @@
namespace grpc {
-namespace {
-int kConnectivityCheckIntervalMsec = 500;
-void WatchStateChange(void* arg);
-
-class TagSaver final : public CompletionQueueTag {
- public:
- explicit TagSaver(void* tag) : tag_(tag) {}
- ~TagSaver() override {}
- bool FinalizeResult(void** tag, bool* status) override {
- *tag = tag_;
- delete this;
- return true;
- }
-
- private:
- void* tag_;
-};
-
-// Constantly watches channel connectivity status to reconnect a transiently
-// disconnected channel. This is a temporary work-around before we have retry
-// support.
-class ChannelConnectivityWatcher : private GrpcLibraryCodegen {
- public:
- static void StartWatching(grpc_channel* channel) {
- if (!IsDisabled()) {
- std::unique_lock<std::mutex> lock(g_watcher_mu_);
- if (g_watcher_ == nullptr) {
- g_watcher_ = new ChannelConnectivityWatcher();
- }
- g_watcher_->StartWatchingLocked(channel);
- }
- }
-
- static void StopWatching() {
- if (!IsDisabled()) {
- std::unique_lock<std::mutex> lock(g_watcher_mu_);
- if (g_watcher_->StopWatchingLocked()) {
- delete g_watcher_;
- g_watcher_ = nullptr;
- }
- }
- }
-
- private:
- ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) {
- gpr_ref_init(&ref_, 0);
- gpr_thd_options options = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&options);
- gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
- }
-
- static bool IsDisabled() {
- char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
- bool disabled = gpr_is_true(env);
- gpr_free(env);
- return disabled;
- }
-
- void WatchStateChangeImpl() {
- bool ok = false;
- void* tag = NULL;
- CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
- while (true) {
- {
- std::unique_lock<std::mutex> lock(shutdown_mu_);
- if (shutdown_) {
- // Drain cq_ if the watcher is shutting down
- status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME));
- } else {
- status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
- // Make sure we've seen 2 TIMEOUTs before going to sleep
- if (status == CompletionQueue::TIMEOUT) {
- status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
- if (status == CompletionQueue::TIMEOUT) {
- shutdown_cv_.wait_for(lock, std::chrono::milliseconds(
- kConnectivityCheckIntervalMsec));
- continue;
- }
- }
- }
- }
- ChannelState* channel_state = static_cast<ChannelState*>(tag);
- channel_state->state =
- grpc_channel_check_connectivity_state(channel_state->channel, false);
- if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
- void* shutdown_tag = NULL;
- channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
- delete channel_state;
- if (gpr_unref(&ref_)) {
- break;
- }
- } else {
- TagSaver* tag_saver = new TagSaver(channel_state);
- grpc_channel_watch_connectivity_state(
- channel_state->channel, channel_state->state,
- gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
- }
- }
- }
-
- void StartWatchingLocked(grpc_channel* channel) {
- if (thd_id_ != 0) {
- gpr_ref(&ref_);
- ++channel_count_;
- ChannelState* channel_state = new ChannelState(channel);
- // The first grpc_channel_watch_connectivity_state() is not used to
- // monitor the channel state change, but to hold a reference of the
- // c channel. So that WatchStateChangeImpl() can observe state ==
- // GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed.
- grpc_channel_watch_connectivity_state(
- channel_state->channel, channel_state->state,
- gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(),
- new TagSaver(nullptr));
- grpc_channel_watch_connectivity_state(
- channel_state->channel, channel_state->state,
- gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(),
- new TagSaver(channel_state));
- }
- }
-
- bool StopWatchingLocked() {
- if (--channel_count_ == 0) {
- {
- std::unique_lock<std::mutex> lock(shutdown_mu_);
- shutdown_ = true;
- shutdown_cv_.notify_one();
- }
- gpr_thd_join(thd_id_);
- return true;
- }
- return false;
- }
-
- friend void WatchStateChange(void* arg);
- struct ChannelState {
- explicit ChannelState(grpc_channel* channel)
- : channel(channel), state(GRPC_CHANNEL_IDLE){};
- grpc_channel* channel;
- grpc_connectivity_state state;
- CompletionQueue shutdown_cq;
- };
- gpr_thd_id thd_id_;
- CompletionQueue cq_;
- gpr_refcount ref_;
- int channel_count_;
-
- std::mutex shutdown_mu_;
- std::condition_variable shutdown_cv_; // protected by shutdown_mu_
- bool shutdown_; // protected by shutdown_mu_
-
- static std::mutex g_watcher_mu_;
- static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_
-};
-
-std::mutex ChannelConnectivityWatcher::g_watcher_mu_;
-ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr;
-
-void WatchStateChange(void* arg) {
- ChannelConnectivityWatcher* watcher =
- static_cast<ChannelConnectivityWatcher*>(arg);
- watcher->WatchStateChangeImpl();
-}
-} // namespace
-
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {
g_gli_initializer.summon();
- if (grpc_channel_support_connectivity_watcher(channel)) {
- ChannelConnectivityWatcher::StartWatching(channel);
- }
}
-Channel::~Channel() {
- const bool stop_watching =
- grpc_channel_support_connectivity_watcher(c_channel_);
- grpc_channel_destroy(c_channel_);
- if (stop_watching) {
- ChannelConnectivityWatcher::StopWatching();
- }
-}
+Channel::~Channel() { grpc_channel_destroy(c_channel_); }
namespace {
@@ -259,8 +85,9 @@ grpc::string Channel::GetServiceConfigJSON() const {
&channel_info.service_config_json);
}
-Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
- CompletionQueue* cq) {
+internal::Call Channel::CreateCall(const internal::RpcMethod& method,
+ ClientContext* context,
+ CompletionQueue* cq) {
const bool kRegistered = method.channel_tag() && context->authority().empty();
grpc_call* c_call = NULL;
if (kRegistered) {
@@ -292,10 +119,11 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
}
grpc_census_call_set_context(c_call, context->census_context());
context->set_call(c_call, shared_from_this());
- return Call(c_call, this, cq);
+ return internal::Call(c_call, this, cq);
}
-void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
+void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
@@ -313,6 +141,24 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
}
+namespace {
+
+class TagSaver final : public internal::CompletionQueueTag {
+ public:
+ explicit TagSaver(void* tag) : tag_(tag) {}
+ ~TagSaver() override {}
+ bool FinalizeResult(void** tag, bool* status) override {
+ *tag = tag_;
+ delete this;
+ return true;
+ }
+
+ private:
+ void* tag_;
+};
+
+} // namespace
+
void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 693b8bea56..fc18ce9093 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -27,8 +27,9 @@ std::unique_ptr<GenericClientAsyncReaderWriter> CallInternal(
ChannelInterface* channel, ClientContext* context,
const grpc::string& method, CompletionQueue* cq, bool start, void* tag) {
return std::unique_ptr<GenericClientAsyncReaderWriter>(
- GenericClientAsyncReaderWriter::Create(
- channel, cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING),
+ internal::ClientAsyncReaderWriterFactory<ByteBuffer, ByteBuffer>::Create(
+ channel, cq, internal::RpcMethod(method.c_str(),
+ internal::RpcMethod::BIDI_STREAMING),
context, start, tag));
}
@@ -52,8 +53,9 @@ std::unique_ptr<GenericClientAsyncResponseReader> GenericStub::PrepareUnaryCall(
ClientContext* context, const grpc::string& method,
const ByteBuffer& request, CompletionQueue* cq) {
return std::unique_ptr<GenericClientAsyncResponseReader>(
- GenericClientAsyncResponseReader::Create(
- channel_.get(), cq, RpcMethod(method.c_str(), RpcMethod::NORMAL_RPC),
+ internal::ClientAsyncResponseReaderFactory<ByteBuffer>::Create(
+ channel_.get(), cq,
+ internal::RpcMethod(method.c_str(), internal::RpcMethod::NORMAL_RPC),
context, request, false));
}
diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc
index 4a2e2be688..eb6dc8cc5f 100644
--- a/src/cpp/common/completion_queue_cc.cc
+++ b/src/cpp/common/completion_queue_cc.cc
@@ -60,7 +60,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
case GRPC_QUEUE_SHUTDOWN:
return SHUTDOWN;
case GRPC_OP_COMPLETE:
- auto cq_tag = static_cast<CompletionQueueTag*>(ev.tag);
+ auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
*ok = ev.success != 0;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
@@ -87,7 +87,7 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
flushed_ = true;
if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
&res)) {
- auto cq_tag = static_cast<CompletionQueueTag*>(res_tag);
+ auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag);
*ok = res == 1;
if (cq_tag->FinalizeResult(tag, ok)) {
return true;
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index d2cba6d662..10dbd3c39a 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -37,11 +37,12 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
DefaultHealthCheckService* service)
: service_(service), method_(nullptr) {
- MethodHandler* handler =
- new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>(
+ internal::MethodHandler* handler =
+ new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
+ ByteBuffer>(
std::mem_fn(&HealthCheckServiceImpl::Check), this);
- method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC,
- handler);
+ method_ = new internal::RpcServiceMethod(
+ kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
AddMethod(method_);
}
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index 09d5cebe98..99d6680c50 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -41,7 +41,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
private:
const DefaultHealthCheckService* const service_;
- RpcServiceMethod* method_;
+ internal::RpcServiceMethod* method_;
};
DefaultHealthCheckService();
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index d982a3d2b7..6480482774 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -90,7 +90,8 @@ class Server::UnimplementedAsyncRequest final
ServerCompletionQueue* const cq_;
};
-typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
+typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata,
+ internal::CallOpServerSendStatus>
UnimplementedAsyncResponseOp;
class Server::UnimplementedAsyncResponse final
: public UnimplementedAsyncResponseOp {
@@ -108,12 +109,12 @@ class Server::UnimplementedAsyncResponse final
UnimplementedAsyncRequest* const request_;
};
-class ShutdownTag : public CompletionQueueTag {
+class ShutdownTag : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) { return false; }
};
-class DummyTag : public CompletionQueueTag {
+class DummyTag : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
*status = true;
@@ -121,15 +122,15 @@ class DummyTag : public CompletionQueueTag {
}
};
-class Server::SyncRequest final : public CompletionQueueTag {
+class Server::SyncRequest final : public internal::CompletionQueueTag {
public:
- SyncRequest(RpcServiceMethod* method, void* tag)
+ SyncRequest(internal::RpcServiceMethod* method, void* tag)
: method_(method),
tag_(tag),
in_flight_(false),
- has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- RpcMethod::SERVER_STREAMING),
+ has_request_payload_(
+ method->method_type() == internal::RpcMethod::NORMAL_RPC ||
+ method->method_type() == internal::RpcMethod::SERVER_STREAMING),
call_details_(nullptr),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
@@ -212,14 +213,14 @@ class Server::SyncRequest final : public CompletionQueueTag {
void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) {
ctx_.BeginCompletionOp(&call_);
global_callbacks->PreSynchronousRequest(&ctx_);
- method_->handler()->RunHandler(
- MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_));
+ method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter(
+ &call_, &ctx_, request_payload_));
global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
cq_.Shutdown();
- CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
+ internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
/* Ensure the cq_ is shutdown */
@@ -229,15 +230,15 @@ class Server::SyncRequest final : public CompletionQueueTag {
private:
CompletionQueue cq_;
- Call call_;
+ internal::Call call_;
ServerContext ctx_;
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
- RpcServiceMethod* const method_;
+ internal::RpcServiceMethod* const method_;
};
private:
- RpcServiceMethod* const method_;
+ internal::RpcServiceMethod* const method_;
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
@@ -311,14 +312,15 @@ class Server::SyncRequestThreadManager : public ThreadManager {
// object
}
- void AddSyncMethod(RpcServiceMethod* method, void* tag) {
+ void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) {
sync_requests_.emplace_back(new SyncRequest(method, tag));
}
void AddUnknownSyncMethod() {
if (!sync_requests_.empty()) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ unknown_method_.reset(new internal::RpcServiceMethod(
+ "unknown", internal::RpcMethod::BIDI_STREAMING,
+ new internal::UnknownMethodHandler));
sync_requests_.emplace_back(
new SyncRequest(unknown_method_.get(), nullptr));
}
@@ -355,8 +357,8 @@ class Server::SyncRequestThreadManager : public ThreadManager {
CompletionQueue* server_cq_;
int cq_timeout_msec_;
std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
- std::unique_ptr<RpcServiceMethod> unknown_method_;
- std::unique_ptr<RpcServiceMethod> health_check_;
+ std::unique_ptr<internal::RpcServiceMethod> unknown_method_;
+ std::unique_ptr<internal::RpcServiceMethod> health_check_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
};
@@ -439,13 +441,13 @@ std::shared_ptr<Channel> Server::InProcessChannel(
}
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
- RpcServiceMethod* method) {
+ internal::RpcServiceMethod* method) {
switch (method->method_type()) {
- case RpcMethod::NORMAL_RPC:
- case RpcMethod::SERVER_STREAMING:
+ case internal::RpcMethod::NORMAL_RPC:
+ case internal::RpcMethod::SERVER_STREAMING:
return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
- case RpcMethod::CLIENT_STREAMING:
- case RpcMethod::BIDI_STREAMING:
+ case internal::RpcMethod::CLIENT_STREAMING:
+ case internal::RpcMethod::BIDI_STREAMING:
return GRPC_SRM_PAYLOAD_NONE;
}
GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
@@ -466,7 +468,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
continue;
}
- RpcServiceMethod* method = it->get();
+ internal::RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(
server_, method->name(), host ? host->c_str() : nullptr,
PayloadHandlingForMethod(method), 0);
@@ -606,7 +608,8 @@ void Server::Wait() {
}
}
-void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
+void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
@@ -622,8 +625,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
- bool delete_on_finalize)
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ void* tag, bool delete_on_finalize)
: server_(server),
context_(context),
stream_(stream),
@@ -645,7 +648,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
}
context_->set_call(call_);
context_->cq_ = call_cq_;
- Call call(call_, server_, call_cq_, server_->max_receive_message_size());
+ internal::Call call(call_, server_, call_cq_,
+ server_->max_receive_message_size());
if (*status && call_) {
context_->BeginCompletionOp(&call);
}
@@ -660,7 +664,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
@@ -675,7 +680,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest(
ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
ServerInterface* server, GenericServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: BaseAsyncRequest(server, context, stream, call_cq, tag,
delete_on_finalize) {
@@ -718,7 +723,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
UnimplementedAsyncRequest* request)
: request_(request) {
Status status(StatusCode::UNIMPLEMENTED, "");
- UnknownMethodHandler::FillOps(request_->context(), this);
+ internal::UnknownMethodHandler::FillOps(request_->context(), this);
request_->stream()->call_.PerformOps(this);
}
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index d7876a000b..f2cb6363f5 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -37,7 +37,7 @@ namespace grpc {
// CompletionOp
-class ServerContext::CompletionOp final : public CallOpSetInterface {
+class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
CompletionOp()
@@ -146,7 +146,7 @@ ServerContext::~ServerContext() {
}
}
-void ServerContext::BeginCompletionOp(Call* call) {
+void ServerContext::BeginCompletionOp(internal::Call* call) {
GPR_ASSERT(!completion_op_);
completion_op_ = new CompletionOp();
if (has_notify_when_done_tag_) {
@@ -155,8 +155,8 @@ void ServerContext::BeginCompletionOp(Call* call) {
call->PerformOps(completion_op_);
}
-CompletionQueueTag* ServerContext::GetCompletionOpTag() {
- return static_cast<CompletionQueueTag*>(completion_op_);
+internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
+ return static_cast<internal::CompletionQueueTag*>(completion_op_);
}
void ServerContext::AddInitialMetadata(const grpc::string& key,
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 140f4ceee1..fbe65ba693 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -254,6 +254,7 @@ CORE_SOURCE_FILES = [
'src/core/tsi/transport_security_adapter.cc',
'src/core/ext/transport/chttp2/server/chttp2_server.cc',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc',
+ 'src/core/ext/filters/client_channel/backup_poller.cc',
'src/core/ext/filters/client_channel/channel_connectivity.cc',
'src/core/ext/filters/client_channel/client_channel.cc',
'src/core/ext/filters/client_channel/client_channel_factory.cc',
@@ -293,6 +294,7 @@ CORE_SOURCE_FILES = [
'third_party/nanopb/pb_encode.c',
'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
+ 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc',
'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc',
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index cd1bd98abc..843e05ab66 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -155,8 +155,14 @@ grpc_google_iam_credentials_create_type grpc_google_iam_credentials_create_impor
grpc_metadata_credentials_create_from_plugin_type grpc_metadata_credentials_create_from_plugin_import;
grpc_secure_channel_create_type grpc_secure_channel_create_import;
grpc_server_credentials_release_type grpc_server_credentials_release_import;
+grpc_ssl_server_certificate_config_create_type grpc_ssl_server_certificate_config_create_import;
+grpc_ssl_server_certificate_config_destroy_type grpc_ssl_server_certificate_config_destroy_import;
grpc_ssl_server_credentials_create_type grpc_ssl_server_credentials_create_import;
grpc_ssl_server_credentials_create_ex_type grpc_ssl_server_credentials_create_ex_import;
+grpc_ssl_server_credentials_create_options_using_config_type grpc_ssl_server_credentials_create_options_using_config_import;
+grpc_ssl_server_credentials_create_options_using_config_fetcher_type grpc_ssl_server_credentials_create_options_using_config_fetcher_import;
+grpc_ssl_server_credentials_options_destroy_type grpc_ssl_server_credentials_options_destroy_import;
+grpc_ssl_server_credentials_create_with_options_type grpc_ssl_server_credentials_create_with_options_import;
grpc_server_add_secure_http2_port_type grpc_server_add_secure_http2_port_import;
grpc_call_set_credentials_type grpc_call_set_credentials_import;
grpc_server_credentials_set_auth_metadata_processor_type grpc_server_credentials_set_auth_metadata_processor_import;
@@ -465,8 +471,14 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_metadata_credentials_create_from_plugin_import = (grpc_metadata_credentials_create_from_plugin_type) GetProcAddress(library, "grpc_metadata_credentials_create_from_plugin");
grpc_secure_channel_create_import = (grpc_secure_channel_create_type) GetProcAddress(library, "grpc_secure_channel_create");
grpc_server_credentials_release_import = (grpc_server_credentials_release_type) GetProcAddress(library, "grpc_server_credentials_release");
+ grpc_ssl_server_certificate_config_create_import = (grpc_ssl_server_certificate_config_create_type) GetProcAddress(library, "grpc_ssl_server_certificate_config_create");
+ grpc_ssl_server_certificate_config_destroy_import = (grpc_ssl_server_certificate_config_destroy_type) GetProcAddress(library, "grpc_ssl_server_certificate_config_destroy");
grpc_ssl_server_credentials_create_import = (grpc_ssl_server_credentials_create_type) GetProcAddress(library, "grpc_ssl_server_credentials_create");
grpc_ssl_server_credentials_create_ex_import = (grpc_ssl_server_credentials_create_ex_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_ex");
+ grpc_ssl_server_credentials_create_options_using_config_import = (grpc_ssl_server_credentials_create_options_using_config_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_options_using_config");
+ grpc_ssl_server_credentials_create_options_using_config_fetcher_import = (grpc_ssl_server_credentials_create_options_using_config_fetcher_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_options_using_config_fetcher");
+ grpc_ssl_server_credentials_options_destroy_import = (grpc_ssl_server_credentials_options_destroy_type) GetProcAddress(library, "grpc_ssl_server_credentials_options_destroy");
+ grpc_ssl_server_credentials_create_with_options_import = (grpc_ssl_server_credentials_create_with_options_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_with_options");
grpc_server_add_secure_http2_port_import = (grpc_server_add_secure_http2_port_type) GetProcAddress(library, "grpc_server_add_secure_http2_port");
grpc_call_set_credentials_import = (grpc_call_set_credentials_type) GetProcAddress(library, "grpc_call_set_credentials");
grpc_server_credentials_set_auth_metadata_processor_import = (grpc_server_credentials_set_auth_metadata_processor_type) GetProcAddress(library, "grpc_server_credentials_set_auth_metadata_processor");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index 74b6979fef..6c99744b8f 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -446,12 +446,30 @@ extern grpc_secure_channel_create_type grpc_secure_channel_create_import;
typedef void(*grpc_server_credentials_release_type)(grpc_server_credentials *creds);
extern grpc_server_credentials_release_type grpc_server_credentials_release_import;
#define grpc_server_credentials_release grpc_server_credentials_release_import
+typedef grpc_ssl_server_certificate_config *(*grpc_ssl_server_certificate_config_create_type)(const char *pem_root_certs, const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs);
+extern grpc_ssl_server_certificate_config_create_type grpc_ssl_server_certificate_config_create_import;
+#define grpc_ssl_server_certificate_config_create grpc_ssl_server_certificate_config_create_import
+typedef void(*grpc_ssl_server_certificate_config_destroy_type)(grpc_ssl_server_certificate_config *config);
+extern grpc_ssl_server_certificate_config_destroy_type grpc_ssl_server_certificate_config_destroy_import;
+#define grpc_ssl_server_certificate_config_destroy grpc_ssl_server_certificate_config_destroy_import
typedef grpc_server_credentials *(*grpc_ssl_server_credentials_create_type)(const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, int force_client_auth, void *reserved);
extern grpc_ssl_server_credentials_create_type grpc_ssl_server_credentials_create_import;
#define grpc_ssl_server_credentials_create grpc_ssl_server_credentials_create_import
typedef grpc_server_credentials *(*grpc_ssl_server_credentials_create_ex_type)(const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, grpc_ssl_client_certificate_request_type client_certificate_request, void *reserved);
extern grpc_ssl_server_credentials_create_ex_type grpc_ssl_server_credentials_create_ex_import;
#define grpc_ssl_server_credentials_create_ex grpc_ssl_server_credentials_create_ex_import
+typedef grpc_ssl_server_credentials_options *(*grpc_ssl_server_credentials_create_options_using_config_type)(grpc_ssl_client_certificate_request_type client_certificate_request, grpc_ssl_server_certificate_config *certificate_config);
+extern grpc_ssl_server_credentials_create_options_using_config_type grpc_ssl_server_credentials_create_options_using_config_import;
+#define grpc_ssl_server_credentials_create_options_using_config grpc_ssl_server_credentials_create_options_using_config_import
+typedef grpc_ssl_server_credentials_options *(*grpc_ssl_server_credentials_create_options_using_config_fetcher_type)(grpc_ssl_client_certificate_request_type client_certificate_request, grpc_ssl_server_certificate_config_callback cb, void *user_data);
+extern grpc_ssl_server_credentials_create_options_using_config_fetcher_type grpc_ssl_server_credentials_create_options_using_config_fetcher_import;
+#define grpc_ssl_server_credentials_create_options_using_config_fetcher grpc_ssl_server_credentials_create_options_using_config_fetcher_import
+typedef void(*grpc_ssl_server_credentials_options_destroy_type)(grpc_ssl_server_credentials_options *options);
+extern grpc_ssl_server_credentials_options_destroy_type grpc_ssl_server_credentials_options_destroy_import;
+#define grpc_ssl_server_credentials_options_destroy grpc_ssl_server_credentials_options_destroy_import
+typedef grpc_server_credentials *(*grpc_ssl_server_credentials_create_with_options_type)(grpc_ssl_server_credentials_options *options);
+extern grpc_ssl_server_credentials_create_with_options_type grpc_ssl_server_credentials_create_with_options_import;
+#define grpc_ssl_server_credentials_create_with_options grpc_ssl_server_credentials_create_with_options_import
typedef int(*grpc_server_add_secure_http2_port_type)(grpc_server *server, const char *addr, grpc_server_credentials *creds);
extern grpc_server_add_secure_http2_port_type grpc_server_add_secure_http2_port_import;
#define grpc_server_add_secure_http2_port grpc_server_add_secure_http2_port_import