aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc162
-rw-r--r--src/core/ext/filters/client_channel/client_channel.h3
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.cc4
-rw-r--r--src/core/ext/filters/client_channel/health/health.pb.c23
-rw-r--r--src/core/ext/filters/client_channel/health/health.pb.h73
-rw-r--r--src/core/ext/filters/client_channel/health/health_check_client.cc653
-rw-r--r--src/core/ext/filters/client_channel/health/health_check_client.h173
-rw-r--r--src/core/ext/filters/client_channel/http_connect_handshaker.cc2
-rw-r--r--src/core/ext/filters/client_channel/http_proxy.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc10
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc7
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/subchannel_list.h25
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds.cc1869
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds.h36
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.cc26
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h36
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc107
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc85
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h72
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc307
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h89
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.h2
-rw-r--r--src/core/ext/filters/client_channel/parse_address.h2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc7
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc80
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h5
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc5
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h2
-rw-r--r--src/core/ext/filters/client_channel/resolver_factory.h2
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc381
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h7
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.cc314
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.h50
33 files changed, 4091 insertions, 530 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index bb3ea400d1..91894689c3 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -129,6 +129,10 @@ typedef struct client_channel_channel_data {
grpc_core::UniquePtr<char> info_lb_policy_name;
/** service config in JSON form */
grpc_core::UniquePtr<char> info_service_config_json;
+ /* backpointer to grpc_channel's channelz node */
+ grpc_core::channelz::ClientChannelNode* channelz_channel;
+ /* caches if the last resolution event contained addresses */
+ bool previous_resolution_contained_addresses;
} channel_data;
typedef struct {
@@ -153,6 +157,23 @@ static void watch_lb_policy_locked(channel_data* chand,
grpc_core::LoadBalancingPolicy* lb_policy,
grpc_connectivity_state current_state);
+static const char* channel_connectivity_state_change_string(
+ grpc_connectivity_state state) {
+ switch (state) {
+ case GRPC_CHANNEL_IDLE:
+ return "Channel state change to IDLE";
+ case GRPC_CHANNEL_CONNECTING:
+ return "Channel state change to CONNECTING";
+ case GRPC_CHANNEL_READY:
+ return "Channel state change to READY";
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ return "Channel state change to TRANSIENT_FAILURE";
+ case GRPC_CHANNEL_SHUTDOWN:
+ return "Channel state change to SHUTDOWN";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
static void set_channel_connectivity_state_locked(channel_data* chand,
grpc_connectivity_state state,
grpc_error* error,
@@ -177,6 +198,12 @@ static void set_channel_connectivity_state_locked(channel_data* chand,
gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand,
grpc_connectivity_state_name(state));
}
+ if (chand->channelz_channel != nullptr) {
+ chand->channelz_channel->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string(
+ channel_connectivity_state_change_string(state)));
+ }
grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}
@@ -376,6 +403,8 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}
+using TraceStringVector = grpc_core::InlinedVector<char*, 3>;
+
// Creates a new LB policy, replacing any previous one.
// If the new policy is created successfully, sets *connectivity_state and
// *connectivity_error to its initial connectivity state; otherwise,
@@ -383,7 +412,7 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
static void create_new_lb_policy_locked(
channel_data* chand, char* lb_policy_name,
grpc_connectivity_state* connectivity_state,
- grpc_error** connectivity_error) {
+ grpc_error** connectivity_error, TraceStringVector* trace_strings) {
grpc_core::LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = chand->combiner;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
@@ -393,11 +422,21 @@ static void create_new_lb_policy_locked(
lb_policy_name, lb_policy_args);
if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
+ if (chand->channelz_channel != nullptr) {
+ char* str;
+ gpr_asprintf(&str, "Could not create LB policy \'%s\'", lb_policy_name);
+ trace_strings->push_back(str);
+ }
} else {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
lb_policy_name, new_lb_policy.get());
}
+ if (chand->channelz_channel != nullptr) {
+ char* str;
+ gpr_asprintf(&str, "Created new LB policy \'%s\'", lb_policy_name);
+ trace_strings->push_back(str);
+ }
// Swap out the LB policy and update the fds in
// chand->interested_parties.
if (chand->lb_policy != nullptr) {
@@ -472,6 +511,51 @@ get_service_config_from_resolver_result_locked(channel_data* chand) {
return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
}
+static void maybe_add_trace_message_for_address_changes_locked(
+ channel_data* chand, TraceStringVector* trace_strings) {
+ int resolution_contains_addresses = false;
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
+ if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
+ if (addresses->num_addresses > 0) {
+ resolution_contains_addresses = true;
+ }
+ }
+ if (!resolution_contains_addresses &&
+ chand->previous_resolution_contained_addresses) {
+ trace_strings->push_back(gpr_strdup("Address list became empty"));
+ } else if (resolution_contains_addresses &&
+ !chand->previous_resolution_contained_addresses) {
+ trace_strings->push_back(gpr_strdup("Address list became non-empty"));
+ }
+ chand->previous_resolution_contained_addresses =
+ resolution_contains_addresses;
+}
+
+static void concatenate_and_add_channel_trace_locked(
+ channel_data* chand, TraceStringVector* trace_strings) {
+ if (!trace_strings->empty()) {
+ gpr_strvec v;
+ gpr_strvec_init(&v);
+ gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
+ bool is_first = 1;
+ for (size_t i = 0; i < trace_strings->size(); ++i) {
+ if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
+ is_first = false;
+ gpr_strvec_add(&v, (*trace_strings)[i]);
+ }
+ char* flat;
+ size_t flat_len = 0;
+ flat = gpr_strvec_flatten(&v, &flat_len);
+ chand->channelz_channel->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_new(flat, flat_len, gpr_free));
+ gpr_strvec_destroy(&v);
+ }
+}
+
// Callback invoked when a resolver result is available.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
channel_data* chand = static_cast<channel_data*>(arg);
@@ -493,6 +577,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
}
// Data used to set the channel's connectivity state.
bool set_connectivity_state = true;
+ // We only want to trace the address resolution in the follow cases:
+ // (a) Address resolution resulted in service config change.
+ // (b) Address resolution that causes number of backends to go from
+ // zero to non-zero.
+ // (c) Address resolution that causes number of backends to go from
+ // non-zero to zero.
+ // (d) Address resolution that causes a new LB policy to be created.
+ //
+ // we track a list of strings to eventually be concatenated and traced.
+ TraceStringVector trace_strings;
grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_error* connectivity_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
@@ -527,11 +621,29 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
} else {
// Instantiate new LB policy.
create_new_lb_policy_locked(chand, lb_policy_name.get(),
- &connectivity_state, &connectivity_error);
+ &connectivity_state, &connectivity_error,
+ &trace_strings);
}
// Find service config.
grpc_core::UniquePtr<char> service_config_json =
get_service_config_from_resolver_result_locked(chand);
+ // Note: It's safe to use chand->info_service_config_json here without
+ // taking a lock on chand->info_mu, because this function is the
+ // only thing that modifies its value, and it can only be invoked
+ // once at any given time.
+ if (chand->channelz_channel != nullptr) {
+ if (((service_config_json == nullptr) !=
+ (chand->info_service_config_json == nullptr)) ||
+ (service_config_json != nullptr &&
+ strcmp(service_config_json.get(),
+ chand->info_service_config_json.get()) != 0)) {
+ // TODO(ncteisen): might be worth somehow including a snippet of the
+ // config in the trace, at the risk of bloating the trace logs.
+ trace_strings.push_back(gpr_strdup("Service config changed"));
+ }
+ maybe_add_trace_message_for_address_changes_locked(chand, &trace_strings);
+ concatenate_and_add_channel_trace_locked(chand, &trace_strings);
+ }
// Swap out the data used by cc_get_channel_info().
gpr_mu_lock(&chand->info_mu);
chand->info_lb_policy_name = std::move(lb_policy_name);
@@ -699,6 +811,8 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
// Record enable_retries.
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
+ chand->channelz_channel = nullptr;
+ chand->previous_resolution_contained_addresses = false;
// Record client channel factory.
arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);
@@ -2194,9 +2308,9 @@ static void add_retriable_send_initial_metadata_op(
.grpc_previous_rpc_attempts);
}
if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
- grpc_mdelem retry_md = grpc_mdelem_from_slices(
+ grpc_mdelem retry_md = grpc_mdelem_create(
GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
- *retry_count_strings[calld->num_attempts_completed - 1]);
+ *retry_count_strings[calld->num_attempts_completed - 1], nullptr);
grpc_error* error = grpc_metadata_batch_add_tail(
&retry_state->send_initial_metadata,
&retry_state->send_initial_metadata_storage[calld->send_initial_metadata
@@ -2837,6 +2951,27 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
}
}
+// If the channel is in TRANSIENT_FAILURE and the call is not
+// wait_for_ready=true, fails the call and returns true.
+static bool fail_call_if_in_transient_failure(grpc_call_element* elem) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch;
+ if (grpc_connectivity_state_check(&chand->state_tracker) ==
+ GRPC_CHANNEL_TRANSIENT_FAILURE &&
+ (batch->payload->send_initial_metadata.send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
+ pending_batches_fail(
+ elem,
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "channel is in state TRANSIENT_FAILURE"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
+ true /* yield_call_combiner */);
+ return true;
+ }
+ return false;
+}
+
// Invoked once resolver results are available.
static void process_service_config_and_start_lb_pick_locked(
grpc_call_element* elem) {
@@ -2844,6 +2979,9 @@ static void process_service_config_and_start_lb_pick_locked(
// Only get service config data on the first attempt.
if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
apply_service_config_to_call_locked(elem);
+ // Check this after applying service config, since it may have
+ // affected the call's wait_for_ready value.
+ if (fail_call_if_in_transient_failure(elem)) return;
}
// Start LB pick.
grpc_core::LbPicker::StartLocked(elem);
@@ -3013,6 +3151,16 @@ static void start_pick_locked(void* arg, grpc_error* ignored) {
// We do not yet have an LB policy, so wait for a resolver result.
if (GPR_UNLIKELY(!chand->started_resolving)) {
start_resolving_locked(chand);
+ } else {
+ // Normally, we want to do this check in
+ // process_service_config_and_start_lb_pick_locked(), so that we
+ // can honor the wait_for_ready setting in the service config.
+ // However, if the channel is in TRANSIENT_FAILURE at this point, that
+ // means that the resolver has returned a failure, so we're not going
+ // to get a service config right away. In that case, we fail the
+ // call now based on the wait_for_ready value passed in from the
+ // application.
+ if (fail_call_if_in_transient_failure(elem)) return;
}
// Create a new waiter, which will delete itself when done.
grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
@@ -3208,6 +3356,12 @@ static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}
+void grpc_client_channel_set_channelz_node(
+ grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ chand->channelz_channel = node;
+}
+
void grpc_client_channel_populate_child_refs(
grpc_channel_element* elem,
grpc_core::channelz::ChildRefsList* child_subchannels,
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index d64faaabd2..4935fd24d8 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -40,6 +40,9 @@ extern grpc_core::TraceFlag grpc_client_channel_trace;
extern const grpc_channel_filter grpc_client_channel_filter;
+void grpc_client_channel_set_channelz_node(
+ grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node);
+
void grpc_client_channel_populate_child_refs(
grpc_channel_element* elem,
grpc_core::channelz::ChildRefsList* child_subchannels,
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc
index b66c920b90..8e5426081c 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.cc
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc
@@ -49,6 +49,7 @@ ClientChannelNode::ClientChannelNode(grpc_channel* channel,
: ChannelNode(channel, channel_tracer_max_nodes, is_top_level_channel) {
client_channel_ =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ grpc_client_channel_set_channelz_node(client_channel_, this);
GPR_ASSERT(client_channel_->filter == &grpc_client_channel_filter);
}
@@ -127,7 +128,8 @@ void SubchannelNode::PopulateConnectivityState(grpc_json* json) {
if (subchannel_ == nullptr) {
state = GRPC_CHANNEL_SHUTDOWN;
} else {
- state = grpc_subchannel_check_connectivity(subchannel_, nullptr);
+ state = grpc_subchannel_check_connectivity(
+ subchannel_, nullptr, true /* inhibit_health_checking */);
}
json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false);
diff --git a/src/core/ext/filters/client_channel/health/health.pb.c b/src/core/ext/filters/client_channel/health/health.pb.c
new file mode 100644
index 0000000000..5499c549cc
--- /dev/null
+++ b/src/core/ext/filters/client_channel/health/health.pb.c
@@ -0,0 +1,23 @@
+/* Automatically generated nanopb constant definitions */
+/* Generated by nanopb-0.3.7-dev */
+
+#include "src/core/ext/filters/client_channel/health/health.pb.h"
+/* @@protoc_insertion_point(includes) */
+#if PB_PROTO_HEADER_VERSION != 30
+#error Regenerate this file with the current version of nanopb generator.
+#endif
+
+
+
+const pb_field_t grpc_health_v1_HealthCheckRequest_fields[2] = {
+ PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_health_v1_HealthCheckRequest, service, service, 0),
+ PB_LAST_FIELD
+};
+
+const pb_field_t grpc_health_v1_HealthCheckResponse_fields[2] = {
+ PB_FIELD( 1, UENUM , OPTIONAL, STATIC , FIRST, grpc_health_v1_HealthCheckResponse, status, status, 0),
+ PB_LAST_FIELD
+};
+
+
+/* @@protoc_insertion_point(eof) */
diff --git a/src/core/ext/filters/client_channel/health/health.pb.h b/src/core/ext/filters/client_channel/health/health.pb.h
new file mode 100644
index 0000000000..9d54ccd618
--- /dev/null
+++ b/src/core/ext/filters/client_channel/health/health.pb.h
@@ -0,0 +1,73 @@
+/* Automatically generated nanopb header */
+/* Generated by nanopb-0.3.7-dev */
+
+#ifndef PB_GRPC_HEALTH_V1_HEALTH_PB_H_INCLUDED
+#define PB_GRPC_HEALTH_V1_HEALTH_PB_H_INCLUDED
+#include "pb.h"
+/* @@protoc_insertion_point(includes) */
+#if PB_PROTO_HEADER_VERSION != 30
+#error Regenerate this file with the current version of nanopb generator.
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Enum definitions */
+typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus {
+ grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0,
+ grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1,
+ grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2,
+ grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN = 3
+} grpc_health_v1_HealthCheckResponse_ServingStatus;
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
+#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN+1))
+
+/* Struct definitions */
+typedef struct _grpc_health_v1_HealthCheckRequest {
+ bool has_service;
+ char service[200];
+/* @@protoc_insertion_point(struct:grpc_health_v1_HealthCheckRequest) */
+} grpc_health_v1_HealthCheckRequest;
+
+typedef struct _grpc_health_v1_HealthCheckResponse {
+ bool has_status;
+ grpc_health_v1_HealthCheckResponse_ServingStatus status;
+/* @@protoc_insertion_point(struct:grpc_health_v1_HealthCheckResponse) */
+} grpc_health_v1_HealthCheckResponse;
+
+/* Default values for struct fields */
+
+/* Initializer values for message structs */
+#define grpc_health_v1_HealthCheckRequest_init_default {false, ""}
+#define grpc_health_v1_HealthCheckResponse_init_default {false, (grpc_health_v1_HealthCheckResponse_ServingStatus)0}
+#define grpc_health_v1_HealthCheckRequest_init_zero {false, ""}
+#define grpc_health_v1_HealthCheckResponse_init_zero {false, (grpc_health_v1_HealthCheckResponse_ServingStatus)0}
+
+/* Field tags (for use in manual encoding/decoding) */
+#define grpc_health_v1_HealthCheckRequest_service_tag 1
+#define grpc_health_v1_HealthCheckResponse_status_tag 1
+
+/* Struct field encoding specification for nanopb */
+extern const pb_field_t grpc_health_v1_HealthCheckRequest_fields[2];
+extern const pb_field_t grpc_health_v1_HealthCheckResponse_fields[2];
+
+/* Maximum encoded size of messages (where known) */
+#define grpc_health_v1_HealthCheckRequest_size 203
+#define grpc_health_v1_HealthCheckResponse_size 2
+
+/* Message IDs (where set with "msgid" option) */
+#ifdef PB_MSGID
+
+#define HEALTH_MESSAGES \
+
+
+#endif
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+/* @@protoc_insertion_point(eof) */
+
+#endif
diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc
new file mode 100644
index 0000000000..591637aa86
--- /dev/null
+++ b/src/core/ext/filters/client_channel/health/health_check_client.cc
@@ -0,0 +1,653 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <stdint.h>
+#include <stdio.h>
+
+#include "src/core/ext/filters/client_channel/health/health_check_client.h"
+
+#include "pb_decode.h"
+#include "pb_encode.h"
+#include "src/core/ext/filters/client_channel/health/health.pb.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/status_metadata.h"
+
+#define HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define HEALTH_CHECK_RECONNECT_JITTER 0.2
+
+grpc_core::TraceFlag grpc_health_check_client_trace(false,
+ "health_check_client");
+
+namespace grpc_core {
+
+//
+// HealthCheckClient
+//
+
+HealthCheckClient::HealthCheckClient(
+ const char* service_name,
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel,
+ grpc_pollset_set* interested_parties,
+ grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> channelz_node)
+ : InternallyRefCountedWithTracing<HealthCheckClient>(
+ &grpc_health_check_client_trace),
+ service_name_(service_name),
+ connected_subchannel_(std::move(connected_subchannel)),
+ interested_parties_(interested_parties),
+ channelz_node_(std::move(channelz_node)),
+ retry_backoff_(
+ BackOff::Options()
+ .set_initial_backoff(
+ HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(HEALTH_CHECK_RECONNECT_JITTER)
+ .set_max_backoff(HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS *
+ 1000)) {
+ if (grpc_health_check_client_trace.enabled()) {
+ gpr_log(GPR_INFO, "created HealthCheckClient %p", this);
+ }
+ GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
+ grpc_schedule_on_exec_ctx);
+ gpr_mu_init(&mu_);
+ StartCall();
+}
+
+HealthCheckClient::~HealthCheckClient() {
+ if (grpc_health_check_client_trace.enabled()) {
+ gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
+ }
+ GRPC_ERROR_UNREF(error_);
+ gpr_mu_destroy(&mu_);
+}
+
+void HealthCheckClient::NotifyOnHealthChange(grpc_connectivity_state* state,
+ grpc_closure* closure) {
+ MutexLock lock(&mu_);
+ GPR_ASSERT(notify_state_ == nullptr);
+ if (*state != state_) {
+ *state = state_;
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(error_));
+ return;
+ }
+ notify_state_ = state;
+ on_health_changed_ = closure;
+}
+
+void HealthCheckClient::SetHealthStatus(grpc_connectivity_state state,
+ grpc_error* error) {
+ MutexLock lock(&mu_);
+ SetHealthStatusLocked(state, error);
+}
+
+void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
+ grpc_error* error) {
+ if (grpc_health_check_client_trace.enabled()) {
+ gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%d error=%s", this,
+ state, grpc_error_string(error));
+ }
+ if (notify_state_ != nullptr && *notify_state_ != state) {
+ *notify_state_ = state;
+ notify_state_ = nullptr;
+ GRPC_CLOSURE_SCHED(on_health_changed_, GRPC_ERROR_REF(error));
+ on_health_changed_ = nullptr;
+ }
+ state_ = state;
+ GRPC_ERROR_UNREF(error_);
+ error_ = error;
+}
+
+void HealthCheckClient::Orphan() {
+ if (grpc_health_check_client_trace.enabled()) {
+ gpr_log(GPR_INFO, "HealthCheckClient %p: shutting down", this);
+ }
+ {
+ MutexLock lock(&mu_);
+ if (on_health_changed_ != nullptr) {
+ *notify_state_ = GRPC_CHANNEL_SHUTDOWN;
+ notify_state_ = nullptr;
+ GRPC_CLOSURE_SCHED(on_health_changed_, GRPC_ERROR_NONE);
+ on_health_changed_ = nullptr;
+ }
+ shutting_down_ = true;
+ call_state_.reset();
+ if (retry_timer_callback_pending_) {
+ grpc_timer_cancel(&retry_timer_);
+ }
+ }
+ Unref(DEBUG_LOCATION, "orphan");
+}
+
+void HealthCheckClient::StartCall() {
+ MutexLock lock(&mu_);
+ StartCallLocked();
+}
+
+void HealthCheckClient::StartCallLocked() {
+ if (shutting_down_) return;
+ GPR_ASSERT(call_state_ == nullptr);
+ SetHealthStatusLocked(GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE);
+ call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
+ if (grpc_health_check_client_trace.enabled()) {
+ gpr_log(GPR_INFO, "HealthCheckClient %p: created CallState %p", this,
+ call_state_.get());
+ }
+ call_state_->StartCall();
+}
+
+void HealthCheckClient::StartRetryTimer() {
+ MutexLock lock(&mu_);
+ SetHealthStatusLocked(
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "health check call failed; will retry after backoff"));
+ grpc_millis next_try = retry_backoff_.NextAttemptTime();
+ if (grpc_health_check_client_trace.enabled()) {
+ gpr_log(GPR_INFO, "HealthCheckClient %p: health check call lost...", this);
+ grpc_millis timeout = next_try - ExecCtx::Get()->Now();
+ if (timeout > 0) {
+ gpr_log(GPR_INFO,
+ "HealthCheckClient %p: ... will retry in %" PRId64 "ms.", this,
+ timeout);
+ } else {
+ gpr_log(GPR_INFO, "HealthCheckClient %p: ... retrying immediately.",
+ this);
+ }
+ }
+ // Ref for callback, tracked manually.
+ Ref(DEBUG_LOCATION, "health_retry_timer").release();
+ retry_timer_callback_pending_ = true;
+ grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
+}
+
+void HealthCheckClient::OnRetryTimer(void* arg, grpc_error* error) {
+ HealthCheckClient* self = static_cast<HealthCheckClient*>(arg);
+ {
+ MutexLock lock(&self->mu_);
+ self->retry_timer_callback_pending_ = false;
+ if (!self->shutting_down_ && error == GRPC_ERROR_NONE &&
+ self->call_state_ == nullptr) {
+ if (grpc_health_check_client_trace.enabled()) {
+ gpr_log(GPR_INFO, "HealthCheckClient %p: restarting health check call",
+ self);
+ }
+ self->StartCallLocked();
+ }
+ }
+ self->Unref(DEBUG_LOCATION, "health_retry_timer");
+}
+
+//
+// protobuf helpers
+//
+
+namespace {
+
+void EncodeRequest(const char* service_name,
+ ManualConstructor<SliceBufferByteStream>* send_message) {
+ grpc_health_v1_HealthCheckRequest request_struct;
+ request_struct.has_service = true;
+ snprintf(request_struct.service, sizeof(request_struct.service), "%s",
+ service_name);
+ pb_ostream_t ostream;
+ memset(&ostream, 0, sizeof(ostream));
+ pb_encode(&ostream, grpc_health_v1_HealthCheckRequest_fields,
+ &request_struct);
+ grpc_slice request_slice = GRPC_SLICE_MALLOC(ostream.bytes_written);
+ ostream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(request_slice),
+ GRPC_SLICE_LENGTH(request_slice));
+ GPR_ASSERT(pb_encode(&ostream, grpc_health_v1_HealthCheckRequest_fields,
+ &request_struct) != 0);
+ grpc_slice_buffer slice_buffer;
+ grpc_slice_buffer_init(&slice_buffer);
+ grpc_slice_buffer_add(&slice_buffer, request_slice);
+ send_message->Init(&slice_buffer, 0);
+ grpc_slice_buffer_destroy_internal(&slice_buffer);
+}
+
+// Returns true if healthy.
+// If there was an error parsing the response, sets *error and returns false.
+bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error** error) {
+ // If message is empty, assume unhealthy.
+ if (slice_buffer->length == 0) {
+ *error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("health check response was empty");
+ return false;
+ }
+ // Concatenate the slices to form a single string.
+ UniquePtr<uint8_t> recv_message_deleter;
+ uint8_t* recv_message;
+ if (slice_buffer->count == 1) {
+ recv_message = GRPC_SLICE_START_PTR(slice_buffer->slices[0]);
+ } else {
+ recv_message = static_cast<uint8_t*>(gpr_malloc(slice_buffer->length));
+ recv_message_deleter.reset(recv_message);
+ size_t offset = 0;
+ for (size_t i = 0; i < slice_buffer->count; ++i) {
+ memcpy(recv_message + offset,
+ GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
+ GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
+ offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
+ }
+ }
+ // Deserialize message.
+ grpc_health_v1_HealthCheckResponse response_struct;
+ pb_istream_t istream =
+ pb_istream_from_buffer(recv_message, slice_buffer->length);
+ if (!pb_decode(&istream, grpc_health_v1_HealthCheckResponse_fields,
+ &response_struct)) {
+ // Can't parse message; assume unhealthy.
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "cannot parse health check response");
+ return false;
+ }
+ if (!response_struct.has_status) {
+ // Field not present; assume unhealthy.
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "status field not present in health check response");
+ return false;
+ }
+ return response_struct.status ==
+ grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING;
+}
+
+} // namespace
+
+//
+// HealthCheckClient::CallState
+//
+
+HealthCheckClient::CallState::CallState(
+ RefCountedPtr<HealthCheckClient> health_check_client,
+ grpc_pollset_set* interested_parties)
+ : InternallyRefCountedWithTracing<CallState>(
+ &grpc_health_check_client_trace),
+ health_check_client_(std::move(health_check_client)),
+ pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
+ arena_(gpr_arena_create(health_check_client_->connected_subchannel_
+ ->GetInitialCallSizeEstimate(0))) {
+ memset(&call_combiner_, 0, sizeof(call_combiner_));
+ grpc_call_combiner_init(&call_combiner_);
+ memset(context_, 0, sizeof(context_));
+ gpr_atm_rel_store(&seen_response_, static_cast<gpr_atm>(0));
+}
+
+HealthCheckClient::CallState::~CallState() {
+ if (grpc_health_check_client_trace.enabled()) {
+ gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p",
+ health_check_client_.get(), this);
+ }
+ if (call_ != nullptr) GRPC_SUBCHANNEL_CALL_UNREF(call_, "call_ended");
+ for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) {
+ if (context_[i].destroy != nullptr) {
+ context_[i].destroy(context_[i].value);
+ }
+ }
+ // Unset the call combiner cancellation closure. This has the
+ // effect of scheduling the previously set cancellation closure, if
+ // any, so that it can release any internal references it may be
+ // holding to the call stack. Also flush the closures on exec_ctx so that
+ // filters that schedule cancel notification closures on exec_ctx do not
+ // need to take a ref of the call stack to guarantee closure liveness.
+ grpc_call_combiner_set_notify_on_cancel(&call_combiner_, nullptr);
+ grpc_core::ExecCtx::Get()->Flush();
+ grpc_call_combiner_destroy(&call_combiner_);
+ gpr_arena_destroy(arena_);
+}
+
+void HealthCheckClient::CallState::Orphan() {
+ grpc_call_combiner_cancel(&call_combiner_, GRPC_ERROR_CANCELLED);
+ Cancel();
+}
+
+void HealthCheckClient::CallState::StartCall() {
+ ConnectedSubchannel::CallArgs args = {
+ &pollent_,
+ GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH,
+ gpr_now(GPR_CLOCK_MONOTONIC), // start_time
+ GRPC_MILLIS_INF_FUTURE, // deadline
+ arena_,
+ context_,
+ &call_combiner_,
+ 0, // parent_data_size
+ };
+ grpc_error* error =
+ health_check_client_->connected_subchannel_->CreateCall(args, &call_);
+ if (error != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR,
+ "HealthCheckClient %p CallState %p: error creating health "
+ "checking call on subchannel (%s); will retry",
+ health_check_client_.get(), this, grpc_error_string(error));
+ GRPC_ERROR_UNREF(error);
+ // Schedule instead of running directly, since we must not be
+ // holding health_check_client_->mu_ when CallEnded() is called.
+ Ref(DEBUG_LOCATION, "call_end_closure").release();
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&batch_.handler_private.closure, CallEndedRetry, this,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
+ return;
+ }
+ // Initialize payload and batch.
+ memset(&batch_, 0, sizeof(batch_));
+ payload_.context = context_;
+ batch_.payload = &payload_;
+ // on_complete callback takes ref, handled manually.
+ Ref(DEBUG_LOCATION, "on_complete").release();
+ batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
+ grpc_schedule_on_exec_ctx);
+ // Add send_initial_metadata op.
+ grpc_metadata_batch_init(&send_initial_metadata_);
+ error = grpc_metadata_batch_add_head(
+ &send_initial_metadata_, &path_metadata_storage_,
+ grpc_mdelem_from_slices(
+ GRPC_MDSTR_PATH,
+ GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH));
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ payload_.send_initial_metadata.send_initial_metadata =
+ &send_initial_metadata_;
+ payload_.send_initial_metadata.send_initial_metadata_flags = 0;
+ payload_.send_initial_metadata.peer_string = nullptr;
+ batch_.send_initial_metadata = true;
+ // Add send_message op.
+ EncodeRequest(health_check_client_->service_name_, &send_message_);
+ payload_.send_message.send_message.reset(send_message_.get());
+ batch_.send_message = true;
+ // Add send_trailing_metadata op.
+ grpc_metadata_batch_init(&send_trailing_metadata_);
+ payload_.send_trailing_metadata.send_trailing_metadata =
+ &send_trailing_metadata_;
+ batch_.send_trailing_metadata = true;
+ // Add recv_initial_metadata op.
+ grpc_metadata_batch_init(&recv_initial_metadata_);
+ payload_.recv_initial_metadata.recv_initial_metadata =
+ &recv_initial_metadata_;
+ payload_.recv_initial_metadata.recv_flags = nullptr;
+ payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
+ payload_.recv_initial_metadata.peer_string = nullptr;
+ // recv_initial_metadata_ready callback takes ref, handled manually.
+ Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
+ payload_.recv_initial_metadata.recv_initial_metadata_ready =
+ GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
+ this, grpc_schedule_on_exec_ctx);
+ batch_.recv_initial_metadata = true;
+ // Add recv_message op.
+ payload_.recv_message.recv_message = &recv_message_;
+ // recv_message callback takes ref, handled manually.
+ Ref(DEBUG_LOCATION, "recv_message_ready").release();
+ payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
+ &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
+ batch_.recv_message = true;
+ // Start batch.
+ StartBatch(&batch_);
+ // Initialize recv_trailing_metadata batch.
+ memset(&recv_trailing_metadata_batch_, 0,
+ sizeof(recv_trailing_metadata_batch_));
+ recv_trailing_metadata_batch_.payload = &payload_;
+ // Add recv_trailing_metadata op.
+ grpc_metadata_batch_init(&recv_trailing_metadata_);
+ payload_.recv_trailing_metadata.recv_trailing_metadata =
+ &recv_trailing_metadata_;
+ payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
+ // This callback signals the end of the call, so it relies on the
+ // initial ref instead of taking a new ref. When it's invoked, the
+ // initial ref is released.
+ payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
+ GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
+ RecvTrailingMetadataReady, this,
+ grpc_schedule_on_exec_ctx);
+ recv_trailing_metadata_batch_.recv_trailing_metadata = true;
+ // Start recv_trailing_metadata batch.
+ StartBatch(&recv_trailing_metadata_batch_);
+}
+
+void HealthCheckClient::CallState::StartBatchInCallCombiner(void* arg,
+ grpc_error* error) {
+ grpc_transport_stream_op_batch* batch =
+ static_cast<grpc_transport_stream_op_batch*>(arg);
+ grpc_subchannel_call* call =
+ static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
+ grpc_subchannel_call_process_op(call, batch);
+}
+
+void HealthCheckClient::CallState::StartBatch(
+ grpc_transport_stream_op_batch* batch) {
+ batch->handler_private.extra_arg = call_;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
+ batch, grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
+ GRPC_ERROR_NONE, "start_subchannel_batch");
+}
+
+void HealthCheckClient::CallState::OnCancelComplete(void* arg,
+ grpc_error* error) {
+ HealthCheckClient::CallState* self =
+ static_cast<HealthCheckClient::CallState*>(arg);
+ GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
+ self->Unref(DEBUG_LOCATION, "cancel");
+}
+
+void HealthCheckClient::CallState::StartCancel(void* arg, grpc_error* error) {
+ HealthCheckClient::CallState* self =
+ static_cast<HealthCheckClient::CallState*>(arg);
+ auto* batch = grpc_make_transport_stream_op(
+ GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
+ batch->cancel_stream = true;
+ batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
+ grpc_subchannel_call_process_op(self->call_, batch);
+}
+
+void HealthCheckClient::CallState::Cancel() {
+ if (call_ != nullptr) {
+ Ref(DEBUG_LOCATION, "cancel").release();
+ GRPC_CALL_COMBINER_START(
+ &call_combiner_,
+ GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE, "health_cancel");
+ }
+}
+
+void HealthCheckClient::CallState::OnComplete(void* arg, grpc_error* error) {
+ HealthCheckClient::CallState* self =
+ static_cast<HealthCheckClient::CallState*>(arg);
+ GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
+ grpc_metadata_batch_destroy(&self->send_initial_metadata_);
+ grpc_metadata_batch_destroy(&self->send_trailing_metadata_);
+ self->Unref(DEBUG_LOCATION, "on_complete");
+}
+
+void HealthCheckClient::CallState::RecvInitialMetadataReady(void* arg,
+ grpc_error* error) {
+ HealthCheckClient::CallState* self =
+ static_cast<HealthCheckClient::CallState*>(arg);
+ GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
+ grpc_metadata_batch_destroy(&self->recv_initial_metadata_);
+ self->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
+}
+
+void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) {
+ recv_message_.reset();
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(error);
+ Cancel();
+ grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
+ Unref(DEBUG_LOCATION, "recv_message_ready");
+ return;
+ }
+ const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
+ const grpc_connectivity_state state =
+ healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE;
+ if (error == GRPC_ERROR_NONE && !healthy) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("backend unhealthy");
+ }
+ health_check_client_->SetHealthStatus(state, error);
+ gpr_atm_rel_store(&seen_response_, static_cast<gpr_atm>(1));
+ grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
+ // Start another recv_message batch.
+ // This re-uses the ref we're holding.
+ // Note: Can't just reuse batch_ here, since we don't know that all
+ // callbacks from the original batch have completed yet.
+ memset(&recv_message_batch_, 0, sizeof(recv_message_batch_));
+ recv_message_batch_.payload = &payload_;
+ payload_.recv_message.recv_message = &recv_message_;
+ payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
+ &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
+ recv_message_batch_.recv_message = true;
+ StartBatch(&recv_message_batch_);
+}
+
+grpc_error* HealthCheckClient::CallState::PullSliceFromRecvMessage() {
+ grpc_slice slice;
+ grpc_error* error = recv_message_->Pull(&slice);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&recv_message_buffer_, slice);
+ }
+ return error;
+}
+
+void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
+ while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
+ grpc_error* error = PullSliceFromRecvMessage();
+ if (error != GRPC_ERROR_NONE) {
+ DoneReadingRecvMessage(error);
+ return;
+ }
+ if (recv_message_buffer_.length == recv_message_->length()) {
+ DoneReadingRecvMessage(GRPC_ERROR_NONE);
+ break;
+ }
+ }
+}
+
+void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
+ grpc_error* error) {
+ HealthCheckClient::CallState* self =
+ static_cast<HealthCheckClient::CallState*>(arg);
+ if (error != GRPC_ERROR_NONE) {
+ self->DoneReadingRecvMessage(GRPC_ERROR_REF(error));
+ return;
+ }
+ error = self->PullSliceFromRecvMessage();
+ if (error != GRPC_ERROR_NONE) {
+ self->DoneReadingRecvMessage(error);
+ return;
+ }
+ if (self->recv_message_buffer_.length == self->recv_message_->length()) {
+ self->DoneReadingRecvMessage(GRPC_ERROR_NONE);
+ } else {
+ self->ContinueReadingRecvMessage();
+ }
+}
+
+void HealthCheckClient::CallState::RecvMessageReady(void* arg,
+ grpc_error* error) {
+ HealthCheckClient::CallState* self =
+ static_cast<HealthCheckClient::CallState*>(arg);
+ GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
+ if (self->recv_message_ == nullptr) {
+ self->Unref(DEBUG_LOCATION, "recv_message_ready");
+ return;
+ }
+ grpc_slice_buffer_init(&self->recv_message_buffer_);
+ GRPC_CLOSURE_INIT(&self->recv_message_ready_, OnByteStreamNext, self,
+ grpc_schedule_on_exec_ctx);
+ self->ContinueReadingRecvMessage();
+ // Ref will continue to be held until we finish draining the byte stream.
+}
+
+void HealthCheckClient::CallState::RecvTrailingMetadataReady(
+ void* arg, grpc_error* error) {
+ HealthCheckClient::CallState* self =
+ static_cast<HealthCheckClient::CallState*>(arg);
+ GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
+ "recv_trailing_metadata_ready");
+ // Get call status.
+ grpc_status_code status = GRPC_STATUS_UNKNOWN;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, &status,
+ nullptr /* slice */, nullptr /* http_error */,
+ nullptr /* error_string */);
+ } else if (self->recv_trailing_metadata_.idx.named.grpc_status != nullptr) {
+ status = grpc_get_status_code_from_metadata(
+ self->recv_trailing_metadata_.idx.named.grpc_status->md);
+ }
+ if (grpc_health_check_client_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "HealthCheckClient %p CallState %p: health watch failed with "
+ "status %d",
+ self->health_check_client_.get(), self, status);
+ }
+ // Clean up.
+ grpc_metadata_batch_destroy(&self->recv_trailing_metadata_);
+ // For status UNIMPLEMENTED, give up and assume always healthy.
+ bool retry = true;
+ if (status == GRPC_STATUS_UNIMPLEMENTED) {
+ static const char kErrorMessage[] =
+ "health checking Watch method returned UNIMPLEMENTED; "
+ "disabling health checks but assuming server is healthy";
+ gpr_log(GPR_ERROR, kErrorMessage);
+ if (self->health_check_client_->channelz_node_ != nullptr) {
+ self->health_check_client_->channelz_node_->AddTraceEvent(
+ channelz::ChannelTrace::Error,
+ grpc_slice_from_static_string(kErrorMessage));
+ }
+ self->health_check_client_->SetHealthStatus(GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE);
+ retry = false;
+ }
+ self->CallEnded(retry);
+}
+
+void HealthCheckClient::CallState::CallEndedRetry(void* arg,
+ grpc_error* error) {
+ HealthCheckClient::CallState* self =
+ static_cast<HealthCheckClient::CallState*>(arg);
+ self->CallEnded(true /* retry */);
+ self->Unref(DEBUG_LOCATION, "call_end_closure");
+}
+
+void HealthCheckClient::CallState::CallEnded(bool retry) {
+ // If this CallState is still in use, this call ended because of a failure,
+ // so we need to stop using it and optionally create a new one.
+ // Otherwise, we have deliberately ended this call, and no further action
+ // is required.
+ if (this == health_check_client_->call_state_.get()) {
+ health_check_client_->call_state_.reset();
+ if (retry) {
+ GPR_ASSERT(!health_check_client_->shutting_down_);
+ if (static_cast<bool>(gpr_atm_acq_load(&seen_response_))) {
+ // If the call fails after we've gotten a successful response, reset
+ // the backoff and restart the call immediately.
+ health_check_client_->retry_backoff_.Reset();
+ health_check_client_->StartCall();
+ } else {
+ // If the call failed without receiving any messages, retry later.
+ health_check_client_->StartRetryTimer();
+ }
+ }
+ }
+ Unref(DEBUG_LOCATION, "call_ended");
+}
+
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h
new file mode 100644
index 0000000000..7f77348f18
--- /dev/null
+++ b/src/core/ext/filters/client_channel/health/health_check_client.h
@@ -0,0 +1,173 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/gpr/arena.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/call_combiner.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/transport/byte_stream.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/transport.h"
+
+namespace grpc_core {
+
+class HealthCheckClient
+ : public InternallyRefCountedWithTracing<HealthCheckClient> {
+ public:
+ HealthCheckClient(const char* service_name,
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel,
+ grpc_pollset_set* interested_parties,
+ RefCountedPtr<channelz::SubchannelNode> channelz_node);
+
+ ~HealthCheckClient();
+
+ // When the health state changes from *state, sets *state to the new
+ // value and schedules closure.
+ // Only one closure can be outstanding at a time.
+ void NotifyOnHealthChange(grpc_connectivity_state* state,
+ grpc_closure* closure);
+
+ void Orphan() override;
+
+ private:
+ // Contains a call to the backend and all the data related to the call.
+ class CallState : public InternallyRefCountedWithTracing<CallState> {
+ public:
+ CallState(RefCountedPtr<HealthCheckClient> health_check_client,
+ grpc_pollset_set* interested_parties_);
+ ~CallState();
+
+ void Orphan() override;
+
+ void StartCall();
+
+ private:
+ void Cancel();
+
+ void StartBatch(grpc_transport_stream_op_batch* batch);
+ static void StartBatchInCallCombiner(void* arg, grpc_error* error);
+
+ static void CallEndedRetry(void* arg, grpc_error* error);
+ void CallEnded(bool retry);
+
+ static void OnComplete(void* arg, grpc_error* error);
+ static void RecvInitialMetadataReady(void* arg, grpc_error* error);
+ static void RecvMessageReady(void* arg, grpc_error* error);
+ static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
+ static void StartCancel(void* arg, grpc_error* error);
+ static void OnCancelComplete(void* arg, grpc_error* error);
+
+ static void OnByteStreamNext(void* arg, grpc_error* error);
+ void ContinueReadingRecvMessage();
+ grpc_error* PullSliceFromRecvMessage();
+ void DoneReadingRecvMessage(grpc_error* error);
+
+ RefCountedPtr<HealthCheckClient> health_check_client_;
+ grpc_polling_entity pollent_;
+
+ gpr_arena* arena_;
+ grpc_call_combiner call_combiner_;
+ grpc_call_context_element context_[GRPC_CONTEXT_COUNT];
+
+ // The streaming call to the backend. Always non-NULL.
+ grpc_subchannel_call* call_;
+
+ grpc_transport_stream_op_batch_payload payload_;
+ grpc_transport_stream_op_batch batch_;
+ grpc_transport_stream_op_batch recv_message_batch_;
+ grpc_transport_stream_op_batch recv_trailing_metadata_batch_;
+
+ grpc_closure on_complete_;
+
+ // send_initial_metadata
+ grpc_metadata_batch send_initial_metadata_;
+ grpc_linked_mdelem path_metadata_storage_;
+
+ // send_message
+ ManualConstructor<SliceBufferByteStream> send_message_;
+
+ // send_trailing_metadata
+ grpc_metadata_batch send_trailing_metadata_;
+
+ // recv_initial_metadata
+ grpc_metadata_batch recv_initial_metadata_;
+ grpc_closure recv_initial_metadata_ready_;
+
+ // recv_message
+ OrphanablePtr<ByteStream> recv_message_;
+ grpc_closure recv_message_ready_;
+ grpc_slice_buffer recv_message_buffer_;
+ gpr_atm seen_response_;
+
+ // recv_trailing_metadata
+ grpc_metadata_batch recv_trailing_metadata_;
+ grpc_transport_stream_stats collect_stats_;
+ grpc_closure recv_trailing_metadata_ready_;
+ };
+
+ void StartCall();
+ void StartCallLocked(); // Requires holding mu_.
+
+ void StartRetryTimer();
+ static void OnRetryTimer(void* arg, grpc_error* error);
+
+ void SetHealthStatus(grpc_connectivity_state state, grpc_error* error);
+ void SetHealthStatusLocked(grpc_connectivity_state state,
+ grpc_error* error); // Requires holding mu_.
+
+ const char* service_name_; // Do not own.
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
+ grpc_pollset_set* interested_parties_; // Do not own.
+ RefCountedPtr<channelz::SubchannelNode> channelz_node_;
+
+ gpr_mu mu_;
+ grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
+ grpc_error* error_ = GRPC_ERROR_NONE;
+ grpc_connectivity_state* notify_state_ = nullptr;
+ grpc_closure* on_health_changed_ = nullptr;
+ bool shutting_down_ = false;
+
+ // The data associated with the current health check call. It holds a ref
+ // to this HealthCheckClient object.
+ OrphanablePtr<CallState> call_state_;
+
+ // Call retry state.
+ BackOff retry_backoff_;
+ grpc_timer retry_timer_;
+ grpc_closure retry_timer_callback_;
+ bool retry_timer_callback_pending_ = false;
+};
+
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H */
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
index bfabc68c66..0716e46818 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -29,7 +29,6 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
-#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/gpr/env.h"
@@ -37,6 +36,7 @@
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/uri/uri_parser.h"
typedef struct http_connect_handshaker {
// Base class. Must be first.
diff --git a/src/core/ext/filters/client_channel/http_proxy.cc b/src/core/ext/filters/client_channel/http_proxy.cc
index 26d3f479b7..8951a2920c 100644
--- a/src/core/ext/filters/client_channel/http_proxy.cc
+++ b/src/core/ext/filters/client_channel/http_proxy.cc
@@ -29,12 +29,12 @@
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
-#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/slice/b64.h"
+#include "src/core/lib/uri/uri_parser.h"
/**
* Parses the 'https_proxy' env var (fallback on 'http_proxy') and returns the
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 5511df7a27..17e0d26875 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -1699,7 +1699,7 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
- const grpc_arg args_to_add[] = {
+ grpc_arg args_to_add[3] = {
grpc_lb_addresses_create_channel_arg(addresses),
// A channel arg indicating if the target is a backend inferred from a
// grpclb load balancer.
@@ -1708,9 +1708,15 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
is_backend_from_grpclb_load_balancer),
};
+ size_t num_args_to_add = 2;
+ if (is_backend_from_grpclb_load_balancer) {
+ args_to_add[2] = grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
+ ++num_args_to_add;
+ }
grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
- GPR_ARRAY_SIZE(args_to_add));
+ num_args_to_add);
grpc_lb_addresses_destroy(addresses);
return args;
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index f4dca146f7..eb494486b9 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -359,9 +359,14 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
"Pick First %p received update with %" PRIuPTR " addresses", this,
addresses->num_addresses);
}
+ grpc_arg new_arg = grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
+ grpc_channel_args* new_args =
+ grpc_channel_args_copy_and_add(&args, &new_arg, 1);
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, addresses, combiner(),
- client_channel_factory(), args);
+ client_channel_factory(), *new_args);
+ grpc_channel_args_destroy(new_args);
if (subchannel_list->num_subchannels() == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels and put the channel in TRANSIENT_FAILURE.
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index e0e0e1e638..4ec9e935ed 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -102,8 +102,8 @@ class SubchannelData {
// ProcessConnectivityChangeLocked()).
grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) {
GPR_ASSERT(!connectivity_notification_pending_);
- pending_connectivity_state_unsafe_ =
- grpc_subchannel_check_connectivity(subchannel(), error);
+ pending_connectivity_state_unsafe_ = grpc_subchannel_check_connectivity(
+ subchannel(), error, subchannel_list_->inhibit_health_checking());
UpdateConnectedSubchannelLocked();
return pending_connectivity_state_unsafe_;
}
@@ -216,6 +216,7 @@ class SubchannelList
// Accessors.
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
+ bool inhibit_health_checking() const { return inhibit_health_checking_; }
// Resets connection backoff of all subchannels.
// TODO(roth): We will probably need to rethink this as part of moving
@@ -254,6 +255,8 @@ class SubchannelList
TraceFlag* tracer_;
+ bool inhibit_health_checking_;
+
grpc_combiner* combiner_;
// The list of subchannels.
@@ -340,7 +343,8 @@ void SubchannelData<SubchannelListType,
subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release();
grpc_subchannel_notify_on_state_change(
subchannel_, subchannel_list_->policy()->interested_parties(),
- &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
+ &pending_connectivity_state_unsafe_, &connectivity_changed_closure_,
+ subchannel_list_->inhibit_health_checking());
}
template <typename SubchannelListType, typename SubchannelDataType>
@@ -359,7 +363,8 @@ void SubchannelData<SubchannelListType,
GPR_ASSERT(connectivity_notification_pending_);
grpc_subchannel_notify_on_state_change(
subchannel_, subchannel_list_->policy()->interested_parties(),
- &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
+ &pending_connectivity_state_unsafe_, &connectivity_changed_closure_,
+ subchannel_list_->inhibit_health_checking());
}
template <typename SubchannelListType, typename SubchannelDataType>
@@ -390,8 +395,9 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
subchannel_, reason);
}
GPR_ASSERT(connectivity_notification_pending_);
- grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr,
- &connectivity_changed_closure_);
+ grpc_subchannel_notify_on_state_change(
+ subchannel_, nullptr, nullptr, &connectivity_changed_closure_,
+ subchannel_list_->inhibit_health_checking());
}
template <typename SubchannelListType, typename SubchannelDataType>
@@ -499,8 +505,13 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
subchannels_.reserve(addresses->num_addresses);
// We need to remove the LB addresses in order to be able to compare the
// subchannel keys of subchannels from a different batch of addresses.
+ // We also remove the inhibit-health-checking arg, since we are
+ // handling that here.
+ inhibit_health_checking_ = grpc_channel_arg_get_bool(
+ grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
- GRPC_ARG_LB_ADDRESSES};
+ GRPC_ARG_LB_ADDRESSES,
+ GRPC_ARG_INHIBIT_HEALTH_CHECKING};
// Create a subchannel for each address.
grpc_subchannel_args sc_args;
for (size_t i = 0; i < addresses->num_addresses; i++) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
new file mode 100644
index 0000000000..7fb4cbdcd2
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -0,0 +1,1869 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/// Implementation of the gRPC LB policy.
+///
+/// This policy takes as input a list of resolved addresses, which must
+/// include at least one balancer address.
+///
+/// An internal channel (\a lb_channel_) is created for the addresses
+/// from that are balancers. This channel behaves just like a regular
+/// channel that uses pick_first to select from the list of balancer
+/// addresses.
+///
+/// The first time the policy gets a request for a pick, a ping, or to exit
+/// the idle state, \a StartPickingLocked() is called. This method is
+/// responsible for instantiating the internal *streaming* call to the LB
+/// server (whichever address pick_first chose). The call will be complete
+/// when either the balancer sends status or when we cancel the call (e.g.,
+/// because we are shutting down). In needed, we retry the call. If we
+/// received at least one valid message from the server, a new call attempt
+/// will be made immediately; otherwise, we apply back-off delays between
+/// attempts.
+///
+/// We maintain an internal round_robin policy instance for distributing
+/// requests across backends. Whenever we receive a new serverlist from
+/// the balancer, we update the round_robin policy with the new list of
+/// addresses. If we cannot communicate with the balancer on startup,
+/// however, we may enter fallback mode, in which case we will populate
+/// the RR policy's addresses from the backend addresses returned by the
+/// resolver.
+///
+/// Once an RR policy instance is in place (and getting updated as described),
+/// calls for a pick, a ping, or a cancellation will be serviced right
+/// away by forwarding them to the RR instance. Any time there's no RR
+/// policy available (i.e., right after the creation of the gRPCLB policy),
+/// pick and ping requests are added to a list of pending picks and pings
+/// to be flushed and serviced when the RR policy instance becomes available.
+///
+/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
+/// high level design and details.
+
+// With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+// using that endpoint. Because of various transitive includes in uv.h,
+// including windows.h on Windows, uv.h must be included before other system
+// headers. Therefore, sockaddr.h must always be included first.
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils.h"
+
+#include <inttypes.h>
+#include <limits.h>
+#include <string.h>
+
+#include <grpc/byte_buffer_reader.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h"
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
+#include "src/core/lib/backoff/backoff.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/gpr/host_port.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
+#include "src/core/lib/gprpp/orphanable.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/combiner.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/slice_hash_table.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/surface/call.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/channel_init.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define GRPC_XDS_RECONNECT_JITTER 0.2
+#define GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS 10000
+
+namespace grpc_core {
+
+TraceFlag grpc_lb_xds_trace(false, "xds");
+
+namespace {
+
+class XdsLb : public LoadBalancingPolicy {
+ public:
+ XdsLb(const grpc_lb_addresses* addresses, const Args& args);
+
+ void UpdateLocked(const grpc_channel_args& args) override;
+ bool PickLocked(PickState* pick, grpc_error** error) override;
+ void CancelPickLocked(PickState* pick, grpc_error* error) override;
+ void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error* error) override;
+ void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
+ grpc_closure* closure) override;
+ grpc_connectivity_state CheckConnectivityLocked(
+ grpc_error** connectivity_error) override;
+ void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
+ void ExitIdleLocked() override;
+ void ResetBackoffLocked() override;
+ void FillChildRefsForChannelz(
+ channelz::ChildRefsList* child_subchannels,
+ channelz::ChildRefsList* child_channels) override;
+
+ private:
+ /// Linked list of pending pick requests. It stores all information needed to
+ /// eventually call (Round Robin's) pick() on them. They mainly stay pending
+ /// waiting for the RR policy to be created.
+ ///
+ /// Note that when a pick is sent to the RR policy, we inject our own
+ /// on_complete callback, so that we can intercept the result before
+ /// invoking the original on_complete callback. This allows us to set the
+ /// LB token metadata and add client_stats to the call context.
+ /// See \a pending_pick_complete() for details.
+ struct PendingPick {
+ // The xds lb instance that created the wrapping. This instance is not
+ // owned; reference counts are untouched. It's used only for logging
+ // purposes.
+ XdsLb* xdslb_policy;
+ // The original pick.
+ PickState* pick;
+ // Our on_complete closure and the original one.
+ grpc_closure on_complete;
+ grpc_closure* original_on_complete;
+ // The LB token associated with the pick. This is set via user_data in
+ // the pick.
+ grpc_mdelem lb_token;
+ // Stats for client-side load reporting.
+ RefCountedPtr<XdsLbClientStats> client_stats;
+ // Next pending pick.
+ PendingPick* next = nullptr;
+ };
+
+ /// Contains a call to the LB server and all the data related to the call.
+ class BalancerCallState
+ : public InternallyRefCountedWithTracing<BalancerCallState> {
+ public:
+ explicit BalancerCallState(
+ RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy);
+
+ // It's the caller's responsibility to ensure that Orphan() is called from
+ // inside the combiner.
+ void Orphan() override;
+
+ void StartQuery();
+
+ XdsLbClientStats* client_stats() const { return client_stats_.get(); }
+
+ bool seen_initial_response() const { return seen_initial_response_; }
+
+ private:
+ // So Delete() can access our private dtor.
+ template <typename T>
+ friend void grpc_core::Delete(T*);
+
+ ~BalancerCallState();
+
+ XdsLb* xdslb_policy() const {
+ return static_cast<XdsLb*>(xdslb_policy_.get());
+ }
+
+ void ScheduleNextClientLoadReportLocked();
+ void SendClientLoadReportLocked();
+
+ static bool LoadReportCountersAreZero(xds_grpclb_request* request);
+
+ static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
+ static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
+ static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
+ static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
+ static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
+
+ // The owning LB policy.
+ RefCountedPtr<LoadBalancingPolicy> xdslb_policy_;
+
+ // The streaming call to the LB server. Always non-NULL.
+ grpc_call* lb_call_ = nullptr;
+
+ // recv_initial_metadata
+ grpc_metadata_array lb_initial_metadata_recv_;
+
+ // send_message
+ grpc_byte_buffer* send_message_payload_ = nullptr;
+ grpc_closure lb_on_initial_request_sent_;
+
+ // recv_message
+ grpc_byte_buffer* recv_message_payload_ = nullptr;
+ grpc_closure lb_on_balancer_message_received_;
+ bool seen_initial_response_ = false;
+
+ // recv_trailing_metadata
+ grpc_closure lb_on_balancer_status_received_;
+ grpc_metadata_array lb_trailing_metadata_recv_;
+ grpc_status_code lb_call_status_;
+ grpc_slice lb_call_status_details_;
+
+ // The stats for client-side load reporting associated with this LB call.
+ // Created after the first serverlist is received.
+ RefCountedPtr<XdsLbClientStats> client_stats_;
+ grpc_millis client_stats_report_interval_ = 0;
+ grpc_timer client_load_report_timer_;
+ bool client_load_report_timer_callback_pending_ = false;
+ bool last_client_load_report_counters_were_zero_ = false;
+ bool client_load_report_is_due_ = false;
+ // The closure used for either the load report timer or the callback for
+ // completion of sending the load report.
+ grpc_closure client_load_report_closure_;
+ };
+
+ ~XdsLb();
+
+ void ShutdownLocked() override;
+
+ // Helper function used in ctor and UpdateLocked().
+ void ProcessChannelArgsLocked(const grpc_channel_args& args);
+
+ // Methods for dealing with the balancer channel and call.
+ void StartPickingLocked();
+ void StartBalancerCallLocked();
+ static void OnFallbackTimerLocked(void* arg, grpc_error* error);
+ void StartBalancerCallRetryTimerLocked();
+ static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
+ static void OnBalancerChannelConnectivityChangedLocked(void* arg,
+ grpc_error* error);
+
+ // Pending pick methods.
+ static void PendingPickSetMetadataAndContext(PendingPick* pp);
+ PendingPick* PendingPickCreate(PickState* pick);
+ void AddPendingPick(PendingPick* pp);
+ static void OnPendingPickComplete(void* arg, grpc_error* error);
+
+ // Methods for dealing with the RR policy.
+ void CreateOrUpdateRoundRobinPolicyLocked();
+ grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
+ void CreateRoundRobinPolicyLocked(const Args& args);
+ bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
+ grpc_error** error);
+ void UpdateConnectivityStateFromRoundRobinPolicyLocked(
+ grpc_error* rr_state_error);
+ static void OnRoundRobinConnectivityChangedLocked(void* arg,
+ grpc_error* error);
+ static void OnRoundRobinRequestReresolutionLocked(void* arg,
+ grpc_error* error);
+
+ // Who the client is trying to communicate with.
+ const char* server_name_ = nullptr;
+
+ // Current channel args from the resolver.
+ grpc_channel_args* args_ = nullptr;
+
+ // Internal state.
+ bool started_picking_ = false;
+ bool shutting_down_ = false;
+ grpc_connectivity_state_tracker state_tracker_;
+
+ // The channel for communicating with the LB server.
+ grpc_channel* lb_channel_ = nullptr;
+ // Mutex to protect the channel to the LB server. This is used when
+ // processing a channelz request.
+ gpr_mu lb_channel_mu_;
+ grpc_connectivity_state lb_channel_connectivity_;
+ grpc_closure lb_channel_on_connectivity_changed_;
+ // Are we already watching the LB channel's connectivity?
+ bool watching_lb_channel_ = false;
+ // Response generator to inject address updates into lb_channel_.
+ RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
+
+ // The data associated with the current LB call. It holds a ref to this LB
+ // policy. It's initialized every time we query for backends. It's reset to
+ // NULL whenever the current LB call is no longer needed (e.g., the LB policy
+ // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
+ // contains a non-NULL lb_call_.
+ OrphanablePtr<BalancerCallState> lb_calld_;
+ // Timeout in milliseconds for the LB call. 0 means no deadline.
+ int lb_call_timeout_ms_ = 0;
+ // Balancer call retry state.
+ BackOff lb_call_backoff_;
+ bool retry_timer_callback_pending_ = false;
+ grpc_timer lb_call_retry_timer_;
+ grpc_closure lb_on_call_retry_;
+
+ // The deserialized response from the balancer. May be nullptr until one
+ // such response has arrived.
+ xds_grpclb_serverlist* serverlist_ = nullptr;
+ // Index into serverlist for next pick.
+ // If the server at this index is a drop, we return a drop.
+ // Otherwise, we delegate to the RR policy.
+ size_t serverlist_index_ = 0;
+
+ // Timeout in milliseconds for before using fallback backend addresses.
+ // 0 means not using fallback.
+ int lb_fallback_timeout_ms_ = 0;
+ // The backend addresses from the resolver.
+ grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
+ // Fallback timer.
+ bool fallback_timer_callback_pending_ = false;
+ grpc_timer lb_fallback_timer_;
+ grpc_closure lb_on_fallback_;
+
+ // Pending picks that are waiting on the RR policy's connectivity.
+ PendingPick* pending_picks_ = nullptr;
+
+ // The RR policy to use for the backends.
+ OrphanablePtr<LoadBalancingPolicy> rr_policy_;
+ grpc_connectivity_state rr_connectivity_state_;
+ grpc_closure on_rr_connectivity_changed_;
+ grpc_closure on_rr_request_reresolution_;
+};
+
+//
+// serverlist parsing code
+//
+
+// vtable for LB tokens in grpc_lb_addresses
+void* lb_token_copy(void* token) {
+ return token == nullptr
+ ? nullptr
+ : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
+}
+void lb_token_destroy(void* token) {
+ if (token != nullptr) {
+ GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
+ }
+}
+int lb_token_cmp(void* token1, void* token2) {
+ if (token1 > token2) return 1;
+ if (token1 < token2) return -1;
+ return 0;
+}
+const grpc_lb_user_data_vtable lb_token_vtable = {
+ lb_token_copy, lb_token_destroy, lb_token_cmp};
+
+// Returns the backend addresses extracted from the given addresses.
+grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
+ // First pass: count the number of backend addresses.
+ size_t num_backends = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) {
+ ++num_backends;
+ }
+ }
+ // Second pass: actually populate the addresses and (empty) LB tokens.
+ grpc_lb_addresses* backend_addresses =
+ grpc_lb_addresses_create(num_backends, &lb_token_vtable);
+ size_t num_copied = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) continue;
+ const grpc_resolved_address* addr = &addresses->addresses[i].address;
+ grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
+ addr->len, false /* is_balancer */,
+ nullptr /* balancer_name */,
+ (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
+ ++num_copied;
+ }
+ return backend_addresses;
+}
+
+bool IsServerValid(const xds_grpclb_server* server, size_t idx, bool log) {
+ if (server->drop) return false;
+ const xds_grpclb_ip_address* ip = &server->ip_address;
+ if (GPR_UNLIKELY(server->port >> 16 != 0)) {
+ if (log) {
+ gpr_log(GPR_ERROR,
+ "Invalid port '%d' at index %lu of serverlist. Ignoring.",
+ server->port, (unsigned long)idx);
+ }
+ return false;
+ }
+ if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
+ if (log) {
+ gpr_log(GPR_ERROR,
+ "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
+ "serverlist. Ignoring",
+ ip->size, (unsigned long)idx);
+ }
+ return false;
+ }
+ return true;
+}
+
+void ParseServer(const xds_grpclb_server* server, grpc_resolved_address* addr) {
+ memset(addr, 0, sizeof(*addr));
+ if (server->drop) return;
+ const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
+ /* the addresses are given in binary format (a in(6)_addr struct) in
+ * server->ip_address.bytes. */
+ const xds_grpclb_ip_address* ip = &server->ip_address;
+ if (ip->size == 4) {
+ addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
+ grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
+ addr4->sin_family = GRPC_AF_INET;
+ memcpy(&addr4->sin_addr, ip->bytes, ip->size);
+ addr4->sin_port = netorder_port;
+ } else if (ip->size == 16) {
+ addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
+ grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
+ addr6->sin6_family = GRPC_AF_INET6;
+ memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
+ addr6->sin6_port = netorder_port;
+ }
+}
+
+// Returns addresses extracted from \a serverlist.
+grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) {
+ size_t num_valid = 0;
+ /* first pass: count how many are valid in order to allocate the necessary
+ * memory in a single block */
+ for (size_t i = 0; i < serverlist->num_servers; ++i) {
+ if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
+ }
+ grpc_lb_addresses* lb_addresses =
+ grpc_lb_addresses_create(num_valid, &lb_token_vtable);
+ /* second pass: actually populate the addresses and LB tokens (aka user data
+ * to the outside world) to be read by the RR policy during its creation.
+ * Given that the validity tests are very cheap, they are performed again
+ * instead of marking the valid ones during the first pass, as this would
+ * incurr in an allocation due to the arbitrary number of server */
+ size_t addr_idx = 0;
+ for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
+ const xds_grpclb_server* server = serverlist->servers[sl_idx];
+ if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
+ GPR_ASSERT(addr_idx < num_valid);
+ /* address processing */
+ grpc_resolved_address addr;
+ ParseServer(server, &addr);
+ /* lb token processing */
+ void* user_data;
+ if (server->has_load_balance_token) {
+ const size_t lb_token_max_length =
+ GPR_ARRAY_SIZE(server->load_balance_token);
+ const size_t lb_token_length =
+ strnlen(server->load_balance_token, lb_token_max_length);
+ grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
+ server->load_balance_token, lb_token_length);
+ user_data =
+ (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
+ .payload;
+ } else {
+ char* uri = grpc_sockaddr_to_uri(&addr);
+ gpr_log(GPR_INFO,
+ "Missing LB token for backend address '%s'. The empty token will "
+ "be used instead",
+ uri);
+ gpr_free(uri);
+ user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
+ }
+ grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
+ false /* is_balancer */,
+ nullptr /* balancer_name */, user_data);
+ ++addr_idx;
+ }
+ GPR_ASSERT(addr_idx == num_valid);
+ return lb_addresses;
+}
+
+//
+// XdsLb::BalancerCallState
+//
+
+XdsLb::BalancerCallState::BalancerCallState(
+ RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy)
+ : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_xds_trace),
+ xdslb_policy_(std::move(parent_xdslb_policy)) {
+ GPR_ASSERT(xdslb_policy_ != nullptr);
+ GPR_ASSERT(!xdslb_policy()->shutting_down_);
+ // Init the LB call. Note that the LB call will progress every time there's
+ // activity in xdslb_policy_->interested_parties(), which is comprised of
+ // the polling entities from client_channel.
+ GPR_ASSERT(xdslb_policy()->server_name_ != nullptr);
+ GPR_ASSERT(xdslb_policy()->server_name_[0] != '\0');
+ const grpc_millis deadline =
+ xdslb_policy()->lb_call_timeout_ms_ == 0
+ ? GRPC_MILLIS_INF_FUTURE
+ : ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_;
+ lb_call_ = grpc_channel_create_pollset_set_call(
+ xdslb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
+ xdslb_policy_->interested_parties(),
+ GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
+ nullptr, deadline, nullptr);
+ // Init the LB call request payload.
+ xds_grpclb_request* request =
+ xds_grpclb_request_create(xdslb_policy()->server_name_);
+ grpc_slice request_payload_slice = xds_grpclb_request_encode(request);
+ send_message_payload_ =
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+ grpc_slice_unref_internal(request_payload_slice);
+ xds_grpclb_request_destroy(request);
+ // Init other data associated with the LB call.
+ grpc_metadata_array_init(&lb_initial_metadata_recv_);
+ grpc_metadata_array_init(&lb_trailing_metadata_recv_);
+ GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
+ this, grpc_combiner_scheduler(xdslb_policy()->combiner()));
+ GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
+ OnBalancerMessageReceivedLocked, this,
+ grpc_combiner_scheduler(xdslb_policy()->combiner()));
+ GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
+ OnBalancerStatusReceivedLocked, this,
+ grpc_combiner_scheduler(xdslb_policy()->combiner()));
+}
+
+XdsLb::BalancerCallState::~BalancerCallState() {
+ GPR_ASSERT(lb_call_ != nullptr);
+ grpc_call_unref(lb_call_);
+ grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
+ grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
+ grpc_byte_buffer_destroy(send_message_payload_);
+ grpc_byte_buffer_destroy(recv_message_payload_);
+ grpc_slice_unref_internal(lb_call_status_details_);
+}
+
+void XdsLb::BalancerCallState::Orphan() {
+ GPR_ASSERT(lb_call_ != nullptr);
+ // If we are here because xdslb_policy wants to cancel the call,
+ // lb_on_balancer_status_received_ will complete the cancellation and clean
+ // up. Otherwise, we are here because xdslb_policy has to orphan a failed
+ // call, then the following cancellation will be a no-op.
+ grpc_call_cancel(lb_call_, nullptr);
+ if (client_load_report_timer_callback_pending_) {
+ grpc_timer_cancel(&client_load_report_timer_);
+ }
+ // Note that the initial ref is hold by lb_on_balancer_status_received_
+ // instead of the caller of this function. So the corresponding unref happens
+ // in lb_on_balancer_status_received_ instead of here.
+}
+
+void XdsLb::BalancerCallState::StartQuery() {
+ GPR_ASSERT(lb_call_ != nullptr);
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO, "[xdslb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
+ xdslb_policy_.get(), this, lb_call_);
+ }
+ // Create the ops.
+ grpc_call_error call_error;
+ grpc_op ops[3];
+ memset(ops, 0, sizeof(ops));
+ // Op: send initial metadata.
+ grpc_op* op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // Op: send request message.
+ GPR_ASSERT(send_message_payload_ != nullptr);
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message.send_message = send_message_payload_;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
+ self.release();
+ call_error = grpc_call_start_batch_and_execute(
+ lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ // Op: recv initial metadata.
+ op = ops;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata =
+ &lb_initial_metadata_recv_;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // Op: recv response.
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message = &recv_message_payload_;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ self = Ref(DEBUG_LOCATION, "on_message_received");
+ self.release();
+ call_error = grpc_call_start_batch_and_execute(
+ lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ // Op: recv server status.
+ op = ops;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata =
+ &lb_trailing_metadata_recv_;
+ op->data.recv_status_on_client.status = &lb_call_status_;
+ op->data.recv_status_on_client.status_details = &lb_call_status_details_;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // This callback signals the end of the LB call, so it relies on the initial
+ // ref instead of a new ref. When it's invoked, it's the initial ref that is
+ // unreffed.
+ call_error = grpc_call_start_batch_and_execute(
+ lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+}
+
+void XdsLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
+ const grpc_millis next_client_load_report_time =
+ ExecCtx::Get()->Now() + client_stats_report_interval_;
+ GRPC_CLOSURE_INIT(&client_load_report_closure_,
+ MaybeSendClientLoadReportLocked, this,
+ grpc_combiner_scheduler(xdslb_policy()->combiner()));
+ grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
+ &client_load_report_closure_);
+ client_load_report_timer_callback_pending_ = true;
+}
+
+void XdsLb::BalancerCallState::MaybeSendClientLoadReportLocked(
+ void* arg, grpc_error* error) {
+ BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
+ XdsLb* xdslb_policy = lb_calld->xdslb_policy();
+ lb_calld->client_load_report_timer_callback_pending_ = false;
+ if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) {
+ lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
+ return;
+ }
+ // If we've already sent the initial request, then we can go ahead and send
+ // the load report. Otherwise, we need to wait until the initial request has
+ // been sent to send this (see OnInitialRequestSentLocked()).
+ if (lb_calld->send_message_payload_ == nullptr) {
+ lb_calld->SendClientLoadReportLocked();
+ } else {
+ lb_calld->client_load_report_is_due_ = true;
+ }
+}
+
+bool XdsLb::BalancerCallState::LoadReportCountersAreZero(
+ xds_grpclb_request* request) {
+ XdsLbClientStats::DroppedCallCounts* drop_entries =
+ static_cast<XdsLbClientStats::DroppedCallCounts*>(
+ request->client_stats.calls_finished_with_drop.arg);
+ return request->client_stats.num_calls_started == 0 &&
+ request->client_stats.num_calls_finished == 0 &&
+ request->client_stats.num_calls_finished_with_client_failed_to_send ==
+ 0 &&
+ request->client_stats.num_calls_finished_known_received == 0 &&
+ (drop_entries == nullptr || drop_entries->empty());
+}
+
+void XdsLb::BalancerCallState::SendClientLoadReportLocked() {
+ // Construct message payload.
+ GPR_ASSERT(send_message_payload_ == nullptr);
+ xds_grpclb_request* request =
+ xds_grpclb_load_report_request_create_locked(client_stats_.get());
+ // Skip client load report if the counters were all zero in the last
+ // report and they are still zero in this one.
+ if (LoadReportCountersAreZero(request)) {
+ if (last_client_load_report_counters_were_zero_) {
+ xds_grpclb_request_destroy(request);
+ ScheduleNextClientLoadReportLocked();
+ return;
+ }
+ last_client_load_report_counters_were_zero_ = true;
+ } else {
+ last_client_load_report_counters_were_zero_ = false;
+ }
+ grpc_slice request_payload_slice = xds_grpclb_request_encode(request);
+ send_message_payload_ =
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+ grpc_slice_unref_internal(request_payload_slice);
+ xds_grpclb_request_destroy(request);
+ // Send the report.
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_SEND_MESSAGE;
+ op.data.send_message.send_message = send_message_payload_;
+ GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
+ this, grpc_combiner_scheduler(xdslb_policy()->combiner()));
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ lb_call_, &op, 1, &client_load_report_closure_);
+ if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
+ gpr_log(GPR_ERROR, "[xdslb %p] call_error=%d", xdslb_policy_.get(),
+ call_error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ }
+}
+
+void XdsLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
+ grpc_error* error) {
+ BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
+ XdsLb* xdslb_policy = lb_calld->xdslb_policy();
+ grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
+ lb_calld->send_message_payload_ = nullptr;
+ if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) {
+ lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
+ return;
+ }
+ lb_calld->ScheduleNextClientLoadReportLocked();
+}
+
+void XdsLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
+ grpc_error* error) {
+ BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
+ grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
+ lb_calld->send_message_payload_ = nullptr;
+ // If we attempted to send a client load report before the initial request was
+ // sent (and this lb_calld is still in use), send the load report now.
+ if (lb_calld->client_load_report_is_due_ &&
+ lb_calld == lb_calld->xdslb_policy()->lb_calld_.get()) {
+ lb_calld->SendClientLoadReportLocked();
+ lb_calld->client_load_report_is_due_ = false;
+ }
+ lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
+}
+
+void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked(
+ void* arg, grpc_error* error) {
+ BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
+ XdsLb* xdslb_policy = lb_calld->xdslb_policy();
+ // Empty payload means the LB call was cancelled.
+ if (lb_calld != xdslb_policy->lb_calld_.get() ||
+ lb_calld->recv_message_payload_ == nullptr) {
+ lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
+ return;
+ }
+ grpc_byte_buffer_reader bbr;
+ grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
+ grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
+ grpc_byte_buffer_reader_destroy(&bbr);
+ grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
+ lb_calld->recv_message_payload_ = nullptr;
+ xds_grpclb_initial_response* initial_response;
+ xds_grpclb_serverlist* serverlist;
+ if (!lb_calld->seen_initial_response_ &&
+ (initial_response = xds_grpclb_initial_response_parse(response_slice)) !=
+ nullptr) {
+ // Have NOT seen initial response, look for initial response.
+ if (initial_response->has_client_stats_report_interval) {
+ lb_calld->client_stats_report_interval_ = GPR_MAX(
+ GPR_MS_PER_SEC, xds_grpclb_duration_to_millis(
+ &initial_response->client_stats_report_interval));
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Received initial LB response message; "
+ "client load reporting interval = %" PRId64 " milliseconds",
+ xdslb_policy, lb_calld->client_stats_report_interval_);
+ }
+ } else if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Received initial LB response message; client load "
+ "reporting NOT enabled",
+ xdslb_policy);
+ }
+ xds_grpclb_initial_response_destroy(initial_response);
+ lb_calld->seen_initial_response_ = true;
+ } else if ((serverlist = xds_grpclb_response_parse_serverlist(
+ response_slice)) != nullptr) {
+ // Have seen initial response, look for serverlist.
+ GPR_ASSERT(lb_calld->lb_call_ != nullptr);
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Serverlist with %" PRIuPTR " servers received",
+ xdslb_policy, serverlist->num_servers);
+ for (size_t i = 0; i < serverlist->num_servers; ++i) {
+ grpc_resolved_address addr;
+ ParseServer(serverlist->servers[i], &addr);
+ char* ipport;
+ grpc_sockaddr_to_string(&ipport, &addr, false);
+ gpr_log(GPR_INFO, "[xdslb %p] Serverlist[%" PRIuPTR "]: %s",
+ xdslb_policy, i, ipport);
+ gpr_free(ipport);
+ }
+ }
+ /* update serverlist */
+ if (serverlist->num_servers > 0) {
+ // Start sending client load report only after we start using the
+ // serverlist returned from the current LB call.
+ if (lb_calld->client_stats_report_interval_ > 0 &&
+ lb_calld->client_stats_ == nullptr) {
+ lb_calld->client_stats_.reset(New<XdsLbClientStats>());
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
+ self.release();
+ lb_calld->ScheduleNextClientLoadReportLocked();
+ }
+ if (xds_grpclb_serverlist_equals(xdslb_policy->serverlist_, serverlist)) {
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Incoming server list identical to current, "
+ "ignoring.",
+ xdslb_policy);
+ }
+ xds_grpclb_destroy_serverlist(serverlist);
+ } else { /* new serverlist */
+ if (xdslb_policy->serverlist_ != nullptr) {
+ /* dispose of the old serverlist */
+ xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_);
+ } else {
+ /* or dispose of the fallback */
+ grpc_lb_addresses_destroy(xdslb_policy->fallback_backend_addresses_);
+ xdslb_policy->fallback_backend_addresses_ = nullptr;
+ if (xdslb_policy->fallback_timer_callback_pending_) {
+ grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_);
+ }
+ }
+ // and update the copy in the XdsLb instance. This
+ // serverlist instance will be destroyed either upon the next
+ // update or when the XdsLb instance is destroyed.
+ xdslb_policy->serverlist_ = serverlist;
+ xdslb_policy->serverlist_index_ = 0;
+ xdslb_policy->CreateOrUpdateRoundRobinPolicyLocked();
+ }
+ } else {
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO, "[xdslb %p] Received empty server list, ignoring.",
+ xdslb_policy);
+ }
+ xds_grpclb_destroy_serverlist(serverlist);
+ }
+ } else {
+ // No valid initial response or serverlist found.
+ char* response_slice_str =
+ grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
+ gpr_log(GPR_ERROR,
+ "[xdslb %p] Invalid LB response received: '%s'. Ignoring.",
+ xdslb_policy, response_slice_str);
+ gpr_free(response_slice_str);
+ }
+ grpc_slice_unref_internal(response_slice);
+ if (!xdslb_policy->shutting_down_) {
+ // Keep listening for serverlist updates.
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_RECV_MESSAGE;
+ op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
+ op.flags = 0;
+ op.reserved = nullptr;
+ // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
+ const grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ lb_calld->lb_call_, &op, 1,
+ &lb_calld->lb_on_balancer_message_received_);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ } else {
+ lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
+ }
+}
+
+void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked(
+ void* arg, grpc_error* error) {
+ BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
+ XdsLb* xdslb_policy = lb_calld->xdslb_policy();
+ GPR_ASSERT(lb_calld->lb_call_ != nullptr);
+ if (grpc_lb_xds_trace.enabled()) {
+ char* status_details =
+ grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Status from LB server received. Status = %d, details "
+ "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
+ xdslb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
+ lb_calld->lb_call_, grpc_error_string(error));
+ gpr_free(status_details);
+ }
+ xdslb_policy->TryReresolutionLocked(&grpc_lb_xds_trace, GRPC_ERROR_NONE);
+ // If this lb_calld is still in use, this call ended because of a failure so
+ // we want to retry connecting. Otherwise, we have deliberately ended this
+ // call and no further action is required.
+ if (lb_calld == xdslb_policy->lb_calld_.get()) {
+ xdslb_policy->lb_calld_.reset();
+ GPR_ASSERT(!xdslb_policy->shutting_down_);
+ if (lb_calld->seen_initial_response_) {
+ // If we lose connection to the LB server, reset the backoff and restart
+ // the LB call immediately.
+ xdslb_policy->lb_call_backoff_.Reset();
+ xdslb_policy->StartBalancerCallLocked();
+ } else {
+ // If this LB call fails establishing any connection to the LB server,
+ // retry later.
+ xdslb_policy->StartBalancerCallRetryTimerLocked();
+ }
+ }
+ lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
+}
+
+//
+// helper code for creating balancer channel
+//
+
+grpc_lb_addresses* ExtractBalancerAddresses(
+ const grpc_lb_addresses* addresses) {
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ // There must be at least one balancer address, or else the
+ // client_channel would not have chosen this LB policy.
+ GPR_ASSERT(num_grpclb_addrs > 0);
+ grpc_lb_addresses* lb_addresses =
+ grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
+ size_t lb_addresses_idx = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) continue;
+ if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+ grpc_lb_addresses_set_address(
+ lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
+ addresses->addresses[i].address.len, false /* is balancer */,
+ addresses->addresses[i].balancer_name, nullptr /* user data */);
+ }
+ GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
+ return lb_addresses;
+}
+
+/* Returns the channel args for the LB channel, used to create a bidirectional
+ * stream for the reception of load balancing updates.
+ *
+ * Inputs:
+ * - \a addresses: corresponding to the balancers.
+ * - \a response_generator: in order to propagate updates from the resolver
+ * above the grpclb policy.
+ * - \a args: other args inherited from the grpclb policy. */
+grpc_channel_args* BuildBalancerChannelArgs(
+ const grpc_lb_addresses* addresses,
+ FakeResolverResponseGenerator* response_generator,
+ const grpc_channel_args* args) {
+ grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
+ // Channel args to remove.
+ static const char* args_to_remove[] = {
+ // LB policy name, since we want to use the default (pick_first) in
+ // the LB channel.
+ GRPC_ARG_LB_POLICY_NAME,
+ // The channel arg for the server URI, since that will be different for
+ // the LB channel than for the parent channel. The client channel
+ // factory will re-add this arg with the right value.
+ GRPC_ARG_SERVER_URI,
+ // The resolved addresses, which will be generated by the name resolver
+ // used in the LB channel. Note that the LB channel will use the fake
+ // resolver, so this won't actually generate a query to DNS (or some
+ // other name service). However, the addresses returned by the fake
+ // resolver will have is_balancer=false, whereas our own addresses have
+ // is_balancer=true. We need the LB channel to return addresses with
+ // is_balancer=false so that it does not wind up recursively using the
+ // grpclb LB policy, as per the special case logic in client_channel.c.
+ GRPC_ARG_LB_ADDRESSES,
+ // The fake resolver response generator, because we are replacing it
+ // with the one from the grpclb policy, used to propagate updates to
+ // the LB channel.
+ GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
+ // The LB channel should use the authority indicated by the target
+ // authority table (see \a grpc_lb_policy_xds_modify_lb_channel_args),
+ // as opposed to the authority from the parent channel.
+ GRPC_ARG_DEFAULT_AUTHORITY,
+ // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
+ // treated as a stand-alone channel and not inherit this argument from the
+ // args of the parent channel.
+ GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
+ };
+ // Channel args to add.
+ const grpc_arg args_to_add[] = {
+ // New LB addresses.
+ // Note that we pass these in both when creating the LB channel
+ // and via the fake resolver. The latter is what actually gets used.
+ grpc_lb_addresses_create_channel_arg(lb_addresses),
+ // The fake resolver response generator, which we use to inject
+ // address updates into the LB channel.
+ grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
+ response_generator),
+ // A channel arg indicating the target is a grpclb load balancer.
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1),
+ // A channel arg indicating this is an internal channels, aka it is
+ // owned by components in Core, not by the user application.
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
+ };
+ // Construct channel args.
+ grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
+ args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
+ GPR_ARRAY_SIZE(args_to_add));
+ // Make any necessary modifications for security.
+ new_args = grpc_lb_policy_xds_modify_lb_channel_args(new_args);
+ // Clean up.
+ grpc_lb_addresses_destroy(lb_addresses);
+ return new_args;
+}
+
+//
+// ctor and dtor
+//
+
+XdsLb::XdsLb(const grpc_lb_addresses* addresses,
+ const LoadBalancingPolicy::Args& args)
+ : LoadBalancingPolicy(args),
+ response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
+ lb_call_backoff_(
+ BackOff::Options()
+ .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
+ 1000)
+ .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_XDS_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
+ // Initialization.
+ gpr_mu_init(&lb_channel_mu_);
+ grpc_subchannel_index_ref();
+ GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
+ &XdsLb::OnBalancerChannelConnectivityChangedLocked, this,
+ grpc_combiner_scheduler(args.combiner));
+ GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
+ &XdsLb::OnRoundRobinConnectivityChangedLocked, this,
+ grpc_combiner_scheduler(args.combiner));
+ GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
+ &XdsLb::OnRoundRobinRequestReresolutionLocked, this,
+ grpc_combiner_scheduler(args.combiner));
+ grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "xds");
+ // Record server name.
+ const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
+ const char* server_uri = grpc_channel_arg_get_string(arg);
+ GPR_ASSERT(server_uri != nullptr);
+ grpc_uri* uri = grpc_uri_parse(server_uri, true);
+ GPR_ASSERT(uri->path[0] != '\0');
+ server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Will use '%s' as the server name for LB request.", this,
+ server_name_);
+ }
+ grpc_uri_destroy(uri);
+ // Record LB call timeout.
+ arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
+ lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
+ // Record fallback timeout.
+ arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
+ lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
+ arg, {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
+ // Process channel args.
+ ProcessChannelArgsLocked(*args.args);
+}
+
+XdsLb::~XdsLb() {
+ GPR_ASSERT(pending_picks_ == nullptr);
+ gpr_mu_destroy(&lb_channel_mu_);
+ gpr_free((void*)server_name_);
+ grpc_channel_args_destroy(args_);
+ grpc_connectivity_state_destroy(&state_tracker_);
+ if (serverlist_ != nullptr) {
+ xds_grpclb_destroy_serverlist(serverlist_);
+ }
+ if (fallback_backend_addresses_ != nullptr) {
+ grpc_lb_addresses_destroy(fallback_backend_addresses_);
+ }
+ grpc_subchannel_index_unref();
+}
+
+void XdsLb::ShutdownLocked() {
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
+ shutting_down_ = true;
+ lb_calld_.reset();
+ if (retry_timer_callback_pending_) {
+ grpc_timer_cancel(&lb_call_retry_timer_);
+ }
+ if (fallback_timer_callback_pending_) {
+ grpc_timer_cancel(&lb_fallback_timer_);
+ }
+ rr_policy_.reset();
+ TryReresolutionLocked(&grpc_lb_xds_trace, GRPC_ERROR_CANCELLED);
+ // We destroy the LB channel here instead of in our destructor because
+ // destroying the channel triggers a last callback to
+ // OnBalancerChannelConnectivityChangedLocked(), and we need to be
+ // alive when that callback is invoked.
+ if (lb_channel_ != nullptr) {
+ gpr_mu_lock(&lb_channel_mu_);
+ grpc_channel_destroy(lb_channel_);
+ lb_channel_ = nullptr;
+ gpr_mu_unlock(&lb_channel_mu_);
+ }
+ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(error), "grpclb_shutdown");
+ // Clear pending picks.
+ PendingPick* pp;
+ while ((pp = pending_picks_) != nullptr) {
+ pending_picks_ = pp->next;
+ pp->pick->connected_subchannel.reset();
+ // Note: pp is deleted in this callback.
+ GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+//
+// public methods
+//
+
+void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
+ PendingPick* pp;
+ while ((pp = pending_picks_) != nullptr) {
+ pending_picks_ = pp->next;
+ pp->pick->on_complete = pp->original_on_complete;
+ pp->pick->user_data = nullptr;
+ grpc_error* error = GRPC_ERROR_NONE;
+ if (new_policy->PickLocked(pp->pick, &error)) {
+ // Synchronous return; schedule closure.
+ GRPC_CLOSURE_SCHED(pp->pick->on_complete, error);
+ }
+ Delete(pp);
+ }
+}
+
+// Cancel a specific pending pick.
+//
+// A grpclb pick progresses as follows:
+// - If there's a Round Robin policy (rr_policy_) available, it'll be
+// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
+// that point onwards, it'll be RR's responsibility. For cancellations, that
+// implies the pick needs also be cancelled by the RR instance.
+// - Otherwise, without an RR instance, picks stay pending at this policy's
+// level (grpclb), inside the pending_picks_ list. To cancel these,
+// we invoke the completion closure and set the pick's connected
+// subchannel to nullptr right here.
+void XdsLb::CancelPickLocked(PickState* pick, grpc_error* error) {
+ PendingPick* pp = pending_picks_;
+ pending_picks_ = nullptr;
+ while (pp != nullptr) {
+ PendingPick* next = pp->next;
+ if (pp->pick == pick) {
+ pick->connected_subchannel.reset();
+ // Note: pp is deleted in this callback.
+ GRPC_CLOSURE_SCHED(&pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick Cancelled", &error, 1));
+ } else {
+ pp->next = pending_picks_;
+ pending_picks_ = pp;
+ }
+ pp = next;
+ }
+ if (rr_policy_ != nullptr) {
+ rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+// Cancel all pending picks.
+//
+// A grpclb pick progresses as follows:
+// - If there's a Round Robin policy (rr_policy_) available, it'll be
+// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
+// that point onwards, it'll be RR's responsibility. For cancellations, that
+// implies the pick needs also be cancelled by the RR instance.
+// - Otherwise, without an RR instance, picks stay pending at this policy's
+// level (grpclb), inside the pending_picks_ list. To cancel these,
+// we invoke the completion closure and set the pick's connected
+// subchannel to nullptr right here.
+void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error* error) {
+ PendingPick* pp = pending_picks_;
+ pending_picks_ = nullptr;
+ while (pp != nullptr) {
+ PendingPick* next = pp->next;
+ if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ // Note: pp is deleted in this callback.
+ GRPC_CLOSURE_SCHED(&pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick Cancelled", &error, 1));
+ } else {
+ pp->next = pending_picks_;
+ pending_picks_ = pp;
+ }
+ pp = next;
+ }
+ if (rr_policy_ != nullptr) {
+ rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
+ initial_metadata_flags_eq,
+ GRPC_ERROR_REF(error));
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+void XdsLb::ExitIdleLocked() {
+ if (!started_picking_) {
+ StartPickingLocked();
+ }
+}
+
+void XdsLb::ResetBackoffLocked() {
+ if (lb_channel_ != nullptr) {
+ grpc_channel_reset_connect_backoff(lb_channel_);
+ }
+ if (rr_policy_ != nullptr) {
+ rr_policy_->ResetBackoffLocked();
+ }
+}
+
+bool XdsLb::PickLocked(PickState* pick, grpc_error** error) {
+ PendingPick* pp = PendingPickCreate(pick);
+ bool pick_done = false;
+ if (rr_policy_ != nullptr) {
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO, "[xdslb %p] about to PICK from RR %p", this,
+ rr_policy_.get());
+ }
+ pick_done =
+ PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
+ } else { // rr_policy_ == NULL
+ if (pick->on_complete == nullptr) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "No pick result available but synchronous result required.");
+ pick_done = true;
+ } else {
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[xdslb %p] No RR policy. Adding to grpclb's pending picks",
+ this);
+ }
+ AddPendingPick(pp);
+ if (!started_picking_) {
+ StartPickingLocked();
+ }
+ pick_done = false;
+ }
+ }
+ return pick_done;
+}
+
+void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
+ channelz::ChildRefsList* child_channels) {
+ // delegate to the RoundRobin to fill the children subchannels.
+ rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
+ MutexLock lock(&lb_channel_mu_);
+ if (lb_channel_ != nullptr) {
+ grpc_core::channelz::ChannelNode* channel_node =
+ grpc_channel_get_channelz_node(lb_channel_);
+ if (channel_node != nullptr) {
+ child_channels->push_back(channel_node->uuid());
+ }
+ }
+}
+
+grpc_connectivity_state XdsLb::CheckConnectivityLocked(
+ grpc_error** connectivity_error) {
+ return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
+}
+
+void XdsLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
+ grpc_closure* closure) {
+ grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
+ closure);
+}
+
+void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
+ const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+ if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
+ // Ignore this update.
+ gpr_log(GPR_ERROR,
+ "[xdslb %p] No valid LB addresses channel arg in update, ignoring.",
+ this);
+ return;
+ }
+ const grpc_lb_addresses* addresses =
+ static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
+ // Update fallback address list.
+ if (fallback_backend_addresses_ != nullptr) {
+ grpc_lb_addresses_destroy(fallback_backend_addresses_);
+ }
+ fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
+ // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
+ // since we use this to trigger the client_load_reporting filter.
+ static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
+ grpc_arg new_arg = grpc_channel_arg_string_create(
+ (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"xds");
+ grpc_channel_args_destroy(args_);
+ args_ = grpc_channel_args_copy_and_add_and_remove(
+ &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
+ // Construct args for balancer channel.
+ grpc_channel_args* lb_channel_args =
+ BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
+ // Create balancer channel if needed.
+ if (lb_channel_ == nullptr) {
+ char* uri_str;
+ gpr_asprintf(&uri_str, "fake:///%s", server_name_);
+ gpr_mu_lock(&lb_channel_mu_);
+ lb_channel_ = grpc_client_channel_factory_create_channel(
+ client_channel_factory(), uri_str,
+ GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
+ gpr_mu_unlock(&lb_channel_mu_);
+ GPR_ASSERT(lb_channel_ != nullptr);
+ gpr_free(uri_str);
+ }
+ // Propagate updates to the LB channel (pick_first) through the fake
+ // resolver.
+ response_generator_->SetResponse(lb_channel_args);
+ grpc_channel_args_destroy(lb_channel_args);
+}
+
+void XdsLb::UpdateLocked(const grpc_channel_args& args) {
+ ProcessChannelArgsLocked(args);
+ // If fallback is configured and the RR policy already exists, update
+ // it with the new fallback addresses.
+ if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) {
+ CreateOrUpdateRoundRobinPolicyLocked();
+ }
+ // Start watching the LB channel connectivity for connection, if not
+ // already doing so.
+ if (!watching_lb_channel_) {
+ lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
+ lb_channel_, true /* try to connect */);
+ grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(lb_channel_));
+ GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+ watching_lb_channel_ = true;
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
+ self.release();
+ grpc_client_channel_watch_connectivity_state(
+ client_channel_elem,
+ grpc_polling_entity_create_from_pollset_set(interested_parties()),
+ &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
+ nullptr);
+ }
+}
+
+//
+// code for balancer channel and call
+//
+
+void XdsLb::StartPickingLocked() {
+ // Start a timer to fall back.
+ if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
+ !fallback_timer_callback_pending_) {
+ grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
+ self.release();
+ GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
+ grpc_combiner_scheduler(combiner()));
+ fallback_timer_callback_pending_ = true;
+ grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
+ }
+ started_picking_ = true;
+ StartBalancerCallLocked();
+}
+
+void XdsLb::StartBalancerCallLocked() {
+ GPR_ASSERT(lb_channel_ != nullptr);
+ if (shutting_down_) return;
+ // Init the LB call data.
+ GPR_ASSERT(lb_calld_ == nullptr);
+ lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
+ this, lb_channel_, lb_calld_.get());
+ }
+ lb_calld_->StartQuery();
+}
+
+void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
+ XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
+ xdslb_policy->fallback_timer_callback_pending_ = false;
+ // If we receive a serverlist after the timer fires but before this callback
+ // actually runs, don't fall back.
+ if (xdslb_policy->serverlist_ == nullptr && !xdslb_policy->shutting_down_ &&
+ error == GRPC_ERROR_NONE) {
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO, "[xdslb %p] Falling back to use backends from resolver",
+ xdslb_policy);
+ }
+ GPR_ASSERT(xdslb_policy->fallback_backend_addresses_ != nullptr);
+ xdslb_policy->CreateOrUpdateRoundRobinPolicyLocked();
+ }
+ xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
+}
+
+void XdsLb::StartBalancerCallRetryTimerLocked() {
+ grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO, "[xdslb %p] Connection to LB server lost...", this);
+ grpc_millis timeout = next_try - ExecCtx::Get()->Now();
+ if (timeout > 0) {
+ gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active in %" PRId64 "ms.",
+ this, timeout);
+ } else {
+ gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active immediately.", this);
+ }
+ }
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
+ self.release();
+ GRPC_CLOSURE_INIT(&lb_on_call_retry_, &XdsLb::OnBalancerCallRetryTimerLocked,
+ this, grpc_combiner_scheduler(combiner()));
+ retry_timer_callback_pending_ = true;
+ grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
+}
+
+void XdsLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
+ XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
+ xdslb_policy->retry_timer_callback_pending_ = false;
+ if (!xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
+ xdslb_policy->lb_calld_ == nullptr) {
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO, "[xdslb %p] Restarting call to LB server",
+ xdslb_policy);
+ }
+ xdslb_policy->StartBalancerCallLocked();
+ }
+ xdslb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
+}
+
+// Invoked as part of the update process. It continues watching the LB channel
+// until it shuts down or becomes READY. It's invoked even if the LB channel
+// stayed READY throughout the update (for example if the update is identical).
+void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
+ grpc_error* error) {
+ XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
+ if (xdslb_policy->shutting_down_) goto done;
+ // Re-initialize the lb_call. This should also take care of updating the
+ // embedded RR policy. Note that the current RR policy, if any, will stay in
+ // effect until an update from the new lb_call is received.
+ switch (xdslb_policy->lb_channel_connectivity_) {
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+ // Keep watching the LB channel.
+ grpc_channel_element* client_channel_elem =
+ grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(xdslb_policy->lb_channel_));
+ GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+ grpc_client_channel_watch_connectivity_state(
+ client_channel_elem,
+ grpc_polling_entity_create_from_pollset_set(
+ xdslb_policy->interested_parties()),
+ &xdslb_policy->lb_channel_connectivity_,
+ &xdslb_policy->lb_channel_on_connectivity_changed_, nullptr);
+ break;
+ }
+ // The LB channel may be IDLE because it's shut down before the update.
+ // Restart the LB call to kick the LB channel into gear.
+ case GRPC_CHANNEL_IDLE:
+ case GRPC_CHANNEL_READY:
+ xdslb_policy->lb_calld_.reset();
+ if (xdslb_policy->started_picking_) {
+ if (xdslb_policy->retry_timer_callback_pending_) {
+ grpc_timer_cancel(&xdslb_policy->lb_call_retry_timer_);
+ }
+ xdslb_policy->lb_call_backoff_.Reset();
+ xdslb_policy->StartBalancerCallLocked();
+ }
+ // Fall through.
+ case GRPC_CHANNEL_SHUTDOWN:
+ done:
+ xdslb_policy->watching_lb_channel_ = false;
+ xdslb_policy->Unref(DEBUG_LOCATION,
+ "watch_lb_channel_connectivity_cb_shutdown");
+ }
+}
+
+//
+// PendingPick
+//
+
+// Adds lb_token of selected subchannel (address) to the call's initial
+// metadata.
+grpc_error* AddLbTokenToInitialMetadata(
+ grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
+ grpc_metadata_batch* initial_metadata) {
+ GPR_ASSERT(lb_token_mdelem_storage != nullptr);
+ GPR_ASSERT(!GRPC_MDISNULL(lb_token));
+ return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+ lb_token);
+}
+
+// Destroy function used when embedding client stats in call context.
+void DestroyClientStats(void* arg) {
+ static_cast<XdsLbClientStats*>(arg)->Unref();
+}
+
+void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
+ /* if connected_subchannel is nullptr, no pick has been made by the RR
+ * policy (e.g., all addresses failed to connect). There won't be any
+ * user_data/token available */
+ if (pp->pick->connected_subchannel != nullptr) {
+ if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
+ AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
+ &pp->pick->lb_token_mdelem_storage,
+ pp->pick->initial_metadata);
+ } else {
+ gpr_log(GPR_ERROR,
+ "[xdslb %p] No LB token for connected subchannel pick %p",
+ pp->xdslb_policy, pp->pick);
+ abort();
+ }
+ // Pass on client stats via context. Passes ownership of the reference.
+ if (pp->client_stats != nullptr) {
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
+ pp->client_stats.release();
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
+ DestroyClientStats;
+ }
+ } else {
+ pp->client_stats.reset();
+ }
+}
+
+/* The \a on_complete closure passed as part of the pick requires keeping a
+ * reference to its associated round robin instance. We wrap this closure in
+ * order to unref the round robin instance upon its invocation */
+void XdsLb::OnPendingPickComplete(void* arg, grpc_error* error) {
+ PendingPick* pp = static_cast<PendingPick*>(arg);
+ PendingPickSetMetadataAndContext(pp);
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
+ Delete(pp);
+}
+
+XdsLb::PendingPick* XdsLb::PendingPickCreate(PickState* pick) {
+ PendingPick* pp = New<PendingPick>();
+ pp->xdslb_policy = this;
+ pp->pick = pick;
+ GRPC_CLOSURE_INIT(&pp->on_complete, &XdsLb::OnPendingPickComplete, pp,
+ grpc_schedule_on_exec_ctx);
+ pp->original_on_complete = pick->on_complete;
+ pick->on_complete = &pp->on_complete;
+ return pp;
+}
+
+void XdsLb::AddPendingPick(PendingPick* pp) {
+ pp->next = pending_picks_;
+ pending_picks_ = pp;
+}
+
+//
+// code for interacting with the RR policy
+//
+
+// Performs a pick over \a rr_policy_. Given that a pick can return
+// immediately (ignoring its completion callback), we need to perform the
+// cleanups this callback would otherwise be responsible for.
+// If \a force_async is true, then we will manually schedule the
+// completion callback even if the pick is available immediately.
+bool XdsLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
+ grpc_error** error) {
+ // Check for drops if we are not using fallback backend addresses.
+ if (serverlist_ != nullptr) {
+ // Look at the index into the serverlist to see if we should drop this call.
+ xds_grpclb_server* server = serverlist_->servers[serverlist_index_++];
+ if (serverlist_index_ == serverlist_->num_servers) {
+ serverlist_index_ = 0; // Wrap-around.
+ }
+ if (server->drop) {
+ // Update client load reporting stats to indicate the number of
+ // dropped calls. Note that we have to do this here instead of in
+ // the client_load_reporting filter, because we do not create a
+ // subchannel call (and therefore no client_load_reporting filter)
+ // for dropped calls.
+ if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
+ lb_calld_->client_stats()->AddCallDroppedLocked(
+ server->load_balance_token);
+ }
+ if (force_async) {
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+ Delete(pp);
+ return false;
+ }
+ Delete(pp);
+ return true;
+ }
+ }
+ // Set client_stats and user_data.
+ if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
+ pp->client_stats = lb_calld_->client_stats()->Ref();
+ }
+ GPR_ASSERT(pp->pick->user_data == nullptr);
+ pp->pick->user_data = (void**)&pp->lb_token;
+ // Pick via the RR policy.
+ bool pick_done = rr_policy_->PickLocked(pp->pick, error);
+ if (pick_done) {
+ PendingPickSetMetadataAndContext(pp);
+ if (force_async) {
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
+ *error = GRPC_ERROR_NONE;
+ pick_done = false;
+ }
+ Delete(pp);
+ }
+ // else, the pending pick will be registered and taken care of by the
+ // pending pick list inside the RR policy. Eventually,
+ // OnPendingPickComplete() will be called, which will (among other
+ // things) add the LB token to the call's initial metadata.
+ return pick_done;
+}
+
+void XdsLb::CreateRoundRobinPolicyLocked(const Args& args) {
+ GPR_ASSERT(rr_policy_ == nullptr);
+ rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+ "round_robin", args);
+ if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
+ gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a RoundRobin policy", this);
+ return;
+ }
+ // TODO(roth): We currently track this ref manually. Once the new
+ // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
+ auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
+ self.release();
+ rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
+ grpc_error* rr_state_error = nullptr;
+ rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
+ // Connectivity state is a function of the RR policy updated/created.
+ UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
+ // Add the gRPC LB's interested_parties pollset_set to that of the newly
+ // created RR policy. This will make the RR policy progress upon activity on
+ // gRPC LB, which in turn is tied to the application's call.
+ grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
+ interested_parties());
+ // Subscribe to changes to the connectivity of the new RR.
+ // TODO(roth): We currently track this ref manually. Once the new
+ // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
+ self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
+ self.release();
+ rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
+ &on_rr_connectivity_changed_);
+ rr_policy_->ExitIdleLocked();
+ // Send pending picks to RR policy.
+ PendingPick* pp;
+ while ((pp = pending_picks_)) {
+ pending_picks_ = pp->next;
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[xdslb %p] Pending pick about to (async) PICK from RR %p", this,
+ rr_policy_.get());
+ }
+ grpc_error* error = GRPC_ERROR_NONE;
+ PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
+ }
+}
+
+grpc_channel_args* XdsLb::CreateRoundRobinPolicyArgsLocked() {
+ grpc_lb_addresses* addresses;
+ bool is_backend_from_grpclb_load_balancer = false;
+ if (serverlist_ != nullptr) {
+ GPR_ASSERT(serverlist_->num_servers > 0);
+ addresses = ProcessServerlist(serverlist_);
+ is_backend_from_grpclb_load_balancer = true;
+ } else {
+ // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
+ // received any serverlist from the balancer, we use the fallback backends
+ // returned by the resolver. Note that the fallback backend list may be
+ // empty, in which case the new round_robin policy will keep the requested
+ // picks pending.
+ GPR_ASSERT(fallback_backend_addresses_ != nullptr);
+ addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
+ }
+ GPR_ASSERT(addresses != nullptr);
+ // Replace the LB addresses in the channel args that we pass down to
+ // the subchannel.
+ static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
+ const grpc_arg args_to_add[] = {
+ grpc_lb_addresses_create_channel_arg(addresses),
+ // A channel arg indicating if the target is a backend inferred from a
+ // grpclb load balancer.
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER),
+ is_backend_from_grpclb_load_balancer),
+ };
+ grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
+ args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
+ GPR_ARRAY_SIZE(args_to_add));
+ grpc_lb_addresses_destroy(addresses);
+ return args;
+}
+
+void XdsLb::CreateOrUpdateRoundRobinPolicyLocked() {
+ if (shutting_down_) return;
+ grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
+ GPR_ASSERT(args != nullptr);
+ if (rr_policy_ != nullptr) {
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO, "[xdslb %p] Updating RR policy %p", this,
+ rr_policy_.get());
+ }
+ rr_policy_->UpdateLocked(*args);
+ } else {
+ LoadBalancingPolicy::Args lb_policy_args;
+ lb_policy_args.combiner = combiner();
+ lb_policy_args.client_channel_factory = client_channel_factory();
+ lb_policy_args.args = args;
+ CreateRoundRobinPolicyLocked(lb_policy_args);
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(GPR_INFO, "[xdslb %p] Created new RR policy %p", this,
+ rr_policy_.get());
+ }
+ }
+ grpc_channel_args_destroy(args);
+}
+
+void XdsLb::OnRoundRobinRequestReresolutionLocked(void* arg,
+ grpc_error* error) {
+ XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
+ if (xdslb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
+ xdslb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
+ return;
+ }
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(
+ GPR_INFO,
+ "[xdslb %p] Re-resolution requested from the internal RR policy (%p).",
+ xdslb_policy, xdslb_policy->rr_policy_.get());
+ }
+ // If we are talking to a balancer, we expect to get updated addresses form
+ // the balancer, so we can ignore the re-resolution request from the RR
+ // policy. Otherwise, handle the re-resolution request using the
+ // grpclb policy's original re-resolution closure.
+ if (xdslb_policy->lb_calld_ == nullptr ||
+ !xdslb_policy->lb_calld_->seen_initial_response()) {
+ xdslb_policy->TryReresolutionLocked(&grpc_lb_xds_trace, GRPC_ERROR_NONE);
+ }
+ // Give back the wrapper closure to the RR policy.
+ xdslb_policy->rr_policy_->SetReresolutionClosureLocked(
+ &xdslb_policy->on_rr_request_reresolution_);
+}
+
+void XdsLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
+ grpc_error* rr_state_error) {
+ const grpc_connectivity_state curr_glb_state =
+ grpc_connectivity_state_check(&state_tracker_);
+ /* The new connectivity status is a function of the previous one and the new
+ * input coming from the status of the RR policy.
+ *
+ * current state (grpclb's)
+ * |
+ * v || I | C | R | TF | SD | <- new state (RR's)
+ * ===++====+=====+=====+======+======+
+ * I || I | C | R | [I] | [I] |
+ * ---++----+-----+-----+------+------+
+ * C || I | C | R | [C] | [C] |
+ * ---++----+-----+-----+------+------+
+ * R || I | C | R | [R] | [R] |
+ * ---++----+-----+-----+------+------+
+ * TF || I | C | R | [TF] | [TF] |
+ * ---++----+-----+-----+------+------+
+ * SD || NA | NA | NA | NA | NA | (*)
+ * ---++----+-----+-----+------+------+
+ *
+ * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
+ * is the current state of grpclb, which is left untouched.
+ *
+ * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
+ * the previous RR instance.
+ *
+ * Note that the status is never updated to SHUTDOWN as a result of calling
+ * this function. Only glb_shutdown() has the power to set that state.
+ *
+ * (*) This function mustn't be called during shutting down. */
+ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
+ switch (rr_connectivity_state_) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
+ break;
+ case GRPC_CHANNEL_IDLE:
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_READY:
+ GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
+ }
+ if (grpc_lb_xds_trace.enabled()) {
+ gpr_log(
+ GPR_INFO,
+ "[xdslb %p] Setting grpclb's state to %s from new RR policy %p state.",
+ this, grpc_connectivity_state_name(rr_connectivity_state_),
+ rr_policy_.get());
+ }
+ grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
+ rr_state_error,
+ "update_lb_connectivity_status_locked");
+}
+
+void XdsLb::OnRoundRobinConnectivityChangedLocked(void* arg,
+ grpc_error* error) {
+ XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
+ if (xdslb_policy->shutting_down_) {
+ xdslb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
+ return;
+ }
+ xdslb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
+ GRPC_ERROR_REF(error));
+ // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
+ xdslb_policy->rr_policy_->NotifyOnStateChangeLocked(
+ &xdslb_policy->rr_connectivity_state_,
+ &xdslb_policy->on_rr_connectivity_changed_);
+}
+
+//
+// factory
+//
+
+class XdsFactory : public LoadBalancingPolicyFactory {
+ public:
+ OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+ const LoadBalancingPolicy::Args& args) const override {
+ /* Count the number of gRPC-LB addresses. There must be at least one. */
+ const grpc_arg* arg =
+ grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+ return nullptr;
+ }
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ if (num_grpclb_addrs == 0) return nullptr;
+ return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(addresses, args));
+ }
+
+ const char* name() const override { return "xds"; }
+};
+
+} // namespace
+
+} // namespace grpc_core
+
+//
+// Plugin registration
+//
+
+void grpc_lb_policy_xds_init() {
+ grpc_core::LoadBalancingPolicyRegistry::Builder::
+ RegisterLoadBalancingPolicyFactory(
+ grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
+ grpc_core::New<grpc_core::XdsFactory>()));
+}
+
+void grpc_lb_policy_xds_shutdown() {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds.h
new file mode 100644
index 0000000000..8b20680f2d
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H
+
+#include <grpc/support/port_platform.h>
+
+/** Channel arg indicating if a target corresponding to the address is grpclb
+ * loadbalancer. The type of this arg is an integer and the value is treated as
+ * a bool. */
+#define GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER \
+ "grpc.address_is_xds_load_balancer"
+/** Channel arg indicating if a target corresponding to the address is a backend
+ * received from a balancer. The type of this arg is an integer and the value is
+ * treated as a bool. */
+#define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER \
+ "grpc.address_is_backend_from_xds_load_balancer"
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H \
+ */
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.cc
new file mode 100644
index 0000000000..0aa145a24e
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.cc
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h"
+
+grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args(
+ grpc_channel_args* args) {
+ return args;
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h
new file mode 100644
index 0000000000..32c4acc8a3
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+
+/// Makes any necessary modifications to \a args for use in the xds
+/// balancer channel.
+///
+/// Takes ownership of \a args.
+///
+/// Caller takes ownership of the returned args.
+grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args(
+ grpc_channel_args* args);
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_H \
+ */
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc
new file mode 100644
index 0000000000..5ab72efce4
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc
@@ -0,0 +1,107 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+#include <string.h>
+
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/security/credentials/credentials.h"
+#include "src/core/lib/security/transport/target_authority_table.h"
+#include "src/core/lib/slice/slice_internal.h"
+
+namespace grpc_core {
+namespace {
+
+int BalancerNameCmp(const grpc_core::UniquePtr<char>& a,
+ const grpc_core::UniquePtr<char>& b) {
+ return strcmp(a.get(), b.get());
+}
+
+RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable(
+ grpc_lb_addresses* addresses) {
+ TargetAuthorityTable::Entry* target_authority_entries =
+ static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc(
+ sizeof(*target_authority_entries) * addresses->num_addresses));
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ char* addr_str;
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_str, &addresses->addresses[i].address, true) > 0);
+ target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str);
+ target_authority_entries[i].value.reset(
+ gpr_strdup(addresses->addresses[i].balancer_name));
+ gpr_free(addr_str);
+ }
+ RefCountedPtr<TargetAuthorityTable> target_authority_table =
+ TargetAuthorityTable::Create(addresses->num_addresses,
+ target_authority_entries, BalancerNameCmp);
+ gpr_free(target_authority_entries);
+ return target_authority_table;
+}
+
+} // namespace
+} // namespace grpc_core
+
+grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args(
+ grpc_channel_args* args) {
+ const char* args_to_remove[1];
+ size_t num_args_to_remove = 0;
+ grpc_arg args_to_add[2];
+ size_t num_args_to_add = 0;
+ // Add arg for targets info table.
+ const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES);
+ GPR_ASSERT(arg != nullptr);
+ GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
+ grpc_lb_addresses* addresses =
+ static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
+ grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable>
+ target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses);
+ args_to_add[num_args_to_add++] =
+ grpc_core::CreateTargetAuthorityTableChannelArg(
+ target_authority_table.get());
+ // Substitute the channel credentials with a version without call
+ // credentials: the load balancer is not necessarily trusted to handle
+ // bearer token credentials.
+ grpc_channel_credentials* channel_credentials =
+ grpc_channel_credentials_find_in_args(args);
+ grpc_channel_credentials* creds_sans_call_creds = nullptr;
+ if (channel_credentials != nullptr) {
+ creds_sans_call_creds =
+ grpc_channel_credentials_duplicate_without_call_credentials(
+ channel_credentials);
+ GPR_ASSERT(creds_sans_call_creds != nullptr);
+ args_to_remove[num_args_to_remove++] = GRPC_ARG_CHANNEL_CREDENTIALS;
+ args_to_add[num_args_to_add++] =
+ grpc_channel_credentials_to_arg(creds_sans_call_creds);
+ }
+ grpc_channel_args* result = grpc_channel_args_copy_and_add_and_remove(
+ args, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add);
+ // Clean up.
+ grpc_channel_args_destroy(args);
+ if (creds_sans_call_creds != nullptr) {
+ grpc_channel_credentials_unref(creds_sans_call_creds);
+ }
+ return result;
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc
new file mode 100644
index 0000000000..cdf5408be3
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
+
+#include <grpc/support/atm.h>
+#include <grpc/support/string_util.h>
+#include <string.h>
+
+namespace grpc_core {
+
+void XdsLbClientStats::AddCallStarted() {
+ gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1);
+}
+
+void XdsLbClientStats::AddCallFinished(bool finished_with_client_failed_to_send,
+ bool finished_known_received) {
+ gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1);
+ if (finished_with_client_failed_to_send) {
+ gpr_atm_full_fetch_add(&num_calls_finished_with_client_failed_to_send_,
+ (gpr_atm)1);
+ }
+ if (finished_known_received) {
+ gpr_atm_full_fetch_add(&num_calls_finished_known_received_, (gpr_atm)1);
+ }
+}
+
+void XdsLbClientStats::AddCallDroppedLocked(char* token) {
+ // Increment num_calls_started and num_calls_finished.
+ gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1);
+ gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1);
+ // Record the drop.
+ if (drop_token_counts_ == nullptr) {
+ drop_token_counts_.reset(New<DroppedCallCounts>());
+ }
+ for (size_t i = 0; i < drop_token_counts_->size(); ++i) {
+ if (strcmp((*drop_token_counts_)[i].token.get(), token) == 0) {
+ ++(*drop_token_counts_)[i].count;
+ return;
+ }
+ }
+ // Not found, so add a new entry.
+ drop_token_counts_->emplace_back(UniquePtr<char>(gpr_strdup(token)), 1);
+}
+
+namespace {
+
+void AtomicGetAndResetCounter(int64_t* value, gpr_atm* counter) {
+ *value = static_cast<int64_t>(gpr_atm_full_xchg(counter, (gpr_atm)0));
+}
+
+} // namespace
+
+void XdsLbClientStats::GetLocked(
+ int64_t* num_calls_started, int64_t* num_calls_finished,
+ int64_t* num_calls_finished_with_client_failed_to_send,
+ int64_t* num_calls_finished_known_received,
+ UniquePtr<DroppedCallCounts>* drop_token_counts) {
+ AtomicGetAndResetCounter(num_calls_started, &num_calls_started_);
+ AtomicGetAndResetCounter(num_calls_finished, &num_calls_finished_);
+ AtomicGetAndResetCounter(num_calls_finished_with_client_failed_to_send,
+ &num_calls_finished_with_client_failed_to_send_);
+ AtomicGetAndResetCounter(num_calls_finished_known_received,
+ &num_calls_finished_known_received_);
+ *drop_token_counts = std::move(drop_token_counts_);
+}
+
+} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h
new file mode 100644
index 0000000000..fa0b9f4b63
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CLIENT_STATS_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CLIENT_STATS_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/support/atm.h>
+
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/ref_counted.h"
+
+namespace grpc_core {
+
+class XdsLbClientStats : public RefCounted<XdsLbClientStats> {
+ public:
+ struct DropTokenCount {
+ UniquePtr<char> token;
+ int64_t count;
+
+ DropTokenCount(UniquePtr<char> token, int64_t count)
+ : token(std::move(token)), count(count) {}
+ };
+
+ typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts;
+
+ XdsLbClientStats() {}
+
+ void AddCallStarted();
+ void AddCallFinished(bool finished_with_client_failed_to_send,
+ bool finished_known_received);
+
+ // This method is not thread-safe; caller must synchronize.
+ void AddCallDroppedLocked(char* token);
+
+ // This method is not thread-safe; caller must synchronize.
+ void GetLocked(int64_t* num_calls_started, int64_t* num_calls_finished,
+ int64_t* num_calls_finished_with_client_failed_to_send,
+ int64_t* num_calls_finished_known_received,
+ UniquePtr<DroppedCallCounts>* drop_token_counts);
+
+ private:
+ // This field must only be accessed via *_locked() methods.
+ UniquePtr<DroppedCallCounts> drop_token_counts_;
+ // These fields may be accessed from multiple threads at a time.
+ gpr_atm num_calls_started_ = 0;
+ gpr_atm num_calls_finished_ = 0;
+ gpr_atm num_calls_finished_with_client_failed_to_send_ = 0;
+ gpr_atm num_calls_finished_known_received_ = 0;
+};
+
+} // namespace grpc_core
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CLIENT_STATS_H \
+ */
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc
new file mode 100644
index 0000000000..79b7bdbe33
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc
@@ -0,0 +1,307 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "pb_decode.h"
+#include "pb_encode.h"
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h"
+
+#include <grpc/support/alloc.h>
+
+/* invoked once for every Server in ServerList */
+static bool count_serverlist(pb_istream_t* stream, const pb_field_t* field,
+ void** arg) {
+ xds_grpclb_serverlist* sl = static_cast<xds_grpclb_serverlist*>(*arg);
+ xds_grpclb_server server;
+ if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, &server))) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
+ return false;
+ }
+ ++sl->num_servers;
+ return true;
+}
+
+typedef struct decode_serverlist_arg {
+ /* The decoding callback is invoked once per server in serverlist. Remember
+ * which index of the serverlist are we currently decoding */
+ size_t decoding_idx;
+ /* The decoded serverlist */
+ xds_grpclb_serverlist* serverlist;
+} decode_serverlist_arg;
+
+/* invoked once for every Server in ServerList */
+static bool decode_serverlist(pb_istream_t* stream, const pb_field_t* field,
+ void** arg) {
+ decode_serverlist_arg* dec_arg = static_cast<decode_serverlist_arg*>(*arg);
+ GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx);
+ xds_grpclb_server* server =
+ static_cast<xds_grpclb_server*>(gpr_zalloc(sizeof(xds_grpclb_server)));
+ if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, server))) {
+ gpr_free(server);
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
+ return false;
+ }
+ dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server;
+ return true;
+}
+
+xds_grpclb_request* xds_grpclb_request_create(const char* lb_service_name) {
+ xds_grpclb_request* req =
+ static_cast<xds_grpclb_request*>(gpr_malloc(sizeof(xds_grpclb_request)));
+ req->has_client_stats = false;
+ req->has_initial_request = true;
+ req->initial_request.has_name = true;
+ strncpy(req->initial_request.name, lb_service_name,
+ XDS_SERVICE_NAME_MAX_LENGTH);
+ return req;
+}
+
+static void populate_timestamp(gpr_timespec timestamp,
+ xds_grpclb_timestamp* timestamp_pb) {
+ timestamp_pb->has_seconds = true;
+ timestamp_pb->seconds = timestamp.tv_sec;
+ timestamp_pb->has_nanos = true;
+ timestamp_pb->nanos = timestamp.tv_nsec;
+}
+
+static bool encode_string(pb_ostream_t* stream, const pb_field_t* field,
+ void* const* arg) {
+ char* str = static_cast<char*>(*arg);
+ if (!pb_encode_tag_for_field(stream, field)) return false;
+ return pb_encode_string(stream, reinterpret_cast<uint8_t*>(str), strlen(str));
+}
+
+static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
+ void* const* arg) {
+ grpc_core::XdsLbClientStats::DroppedCallCounts* drop_entries =
+ static_cast<grpc_core::XdsLbClientStats::DroppedCallCounts*>(*arg);
+ if (drop_entries == nullptr) return true;
+ for (size_t i = 0; i < drop_entries->size(); ++i) {
+ if (!pb_encode_tag_for_field(stream, field)) return false;
+ grpc_lb_v1_ClientStatsPerToken drop_message;
+ drop_message.load_balance_token.funcs.encode = encode_string;
+ drop_message.load_balance_token.arg = (*drop_entries)[i].token.get();
+ drop_message.has_num_calls = true;
+ drop_message.num_calls = (*drop_entries)[i].count;
+ if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
+ &drop_message)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+xds_grpclb_request* xds_grpclb_load_report_request_create_locked(
+ grpc_core::XdsLbClientStats* client_stats) {
+ xds_grpclb_request* req =
+ static_cast<xds_grpclb_request*>(gpr_zalloc(sizeof(xds_grpclb_request)));
+ req->has_client_stats = true;
+ req->client_stats.has_timestamp = true;
+ populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
+ req->client_stats.has_num_calls_started = true;
+ req->client_stats.has_num_calls_finished = true;
+ req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
+ req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
+ req->client_stats.has_num_calls_finished_known_received = true;
+ req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
+ grpc_core::UniquePtr<grpc_core::XdsLbClientStats::DroppedCallCounts>
+ drop_counts;
+ client_stats->GetLocked(
+ &req->client_stats.num_calls_started,
+ &req->client_stats.num_calls_finished,
+ &req->client_stats.num_calls_finished_with_client_failed_to_send,
+ &req->client_stats.num_calls_finished_known_received, &drop_counts);
+ // Will be deleted in xds_grpclb_request_destroy().
+ req->client_stats.calls_finished_with_drop.arg = drop_counts.release();
+ return req;
+}
+
+grpc_slice xds_grpclb_request_encode(const xds_grpclb_request* request) {
+ size_t encoded_length;
+ pb_ostream_t sizestream;
+ pb_ostream_t outputstream;
+ grpc_slice slice;
+ memset(&sizestream, 0, sizeof(pb_ostream_t));
+ pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request);
+ encoded_length = sizestream.bytes_written;
+
+ slice = GRPC_SLICE_MALLOC(encoded_length);
+ outputstream =
+ pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length);
+ GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields,
+ request) != 0);
+ return slice;
+}
+
+void xds_grpclb_request_destroy(xds_grpclb_request* request) {
+ if (request->has_client_stats) {
+ grpc_core::XdsLbClientStats::DroppedCallCounts* drop_entries =
+ static_cast<grpc_core::XdsLbClientStats::DroppedCallCounts*>(
+ request->client_stats.calls_finished_with_drop.arg);
+ grpc_core::Delete(drop_entries);
+ }
+ gpr_free(request);
+}
+
+typedef grpc_lb_v1_LoadBalanceResponse xds_grpclb_response;
+xds_grpclb_initial_response* xds_grpclb_initial_response_parse(
+ grpc_slice encoded_xds_grpclb_response) {
+ pb_istream_t stream =
+ pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_xds_grpclb_response),
+ GRPC_SLICE_LENGTH(encoded_xds_grpclb_response));
+ xds_grpclb_response res;
+ memset(&res, 0, sizeof(xds_grpclb_response));
+ if (GPR_UNLIKELY(
+ !pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res))) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
+ return nullptr;
+ }
+
+ if (!res.has_initial_response) return nullptr;
+
+ xds_grpclb_initial_response* initial_res =
+ static_cast<xds_grpclb_initial_response*>(
+ gpr_malloc(sizeof(xds_grpclb_initial_response)));
+ memcpy(initial_res, &res.initial_response,
+ sizeof(xds_grpclb_initial_response));
+
+ return initial_res;
+}
+
+xds_grpclb_serverlist* xds_grpclb_response_parse_serverlist(
+ grpc_slice encoded_xds_grpclb_response) {
+ pb_istream_t stream =
+ pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_xds_grpclb_response),
+ GRPC_SLICE_LENGTH(encoded_xds_grpclb_response));
+ pb_istream_t stream_at_start = stream;
+ xds_grpclb_serverlist* sl = static_cast<xds_grpclb_serverlist*>(
+ gpr_zalloc(sizeof(xds_grpclb_serverlist)));
+ xds_grpclb_response res;
+ memset(&res, 0, sizeof(xds_grpclb_response));
+ // First pass: count number of servers.
+ res.server_list.servers.funcs.decode = count_serverlist;
+ res.server_list.servers.arg = sl;
+ bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
+ if (GPR_UNLIKELY(!status)) {
+ gpr_free(sl);
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
+ return nullptr;
+ }
+ // Second pass: populate servers.
+ if (sl->num_servers > 0) {
+ sl->servers = static_cast<xds_grpclb_server**>(
+ gpr_zalloc(sizeof(xds_grpclb_server*) * sl->num_servers));
+ decode_serverlist_arg decode_arg;
+ memset(&decode_arg, 0, sizeof(decode_arg));
+ decode_arg.serverlist = sl;
+ res.server_list.servers.funcs.decode = decode_serverlist;
+ res.server_list.servers.arg = &decode_arg;
+ status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields,
+ &res);
+ if (GPR_UNLIKELY(!status)) {
+ xds_grpclb_destroy_serverlist(sl);
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
+ return nullptr;
+ }
+ }
+ return sl;
+}
+
+void xds_grpclb_destroy_serverlist(xds_grpclb_serverlist* serverlist) {
+ if (serverlist == nullptr) {
+ return;
+ }
+ for (size_t i = 0; i < serverlist->num_servers; i++) {
+ gpr_free(serverlist->servers[i]);
+ }
+ gpr_free(serverlist->servers);
+ gpr_free(serverlist);
+}
+
+xds_grpclb_serverlist* xds_grpclb_serverlist_copy(
+ const xds_grpclb_serverlist* sl) {
+ xds_grpclb_serverlist* copy = static_cast<xds_grpclb_serverlist*>(
+ gpr_zalloc(sizeof(xds_grpclb_serverlist)));
+ copy->num_servers = sl->num_servers;
+ copy->servers = static_cast<xds_grpclb_server**>(
+ gpr_malloc(sizeof(xds_grpclb_server*) * sl->num_servers));
+ for (size_t i = 0; i < sl->num_servers; i++) {
+ copy->servers[i] =
+ static_cast<xds_grpclb_server*>(gpr_malloc(sizeof(xds_grpclb_server)));
+ memcpy(copy->servers[i], sl->servers[i], sizeof(xds_grpclb_server));
+ }
+ return copy;
+}
+
+bool xds_grpclb_serverlist_equals(const xds_grpclb_serverlist* lhs,
+ const xds_grpclb_serverlist* rhs) {
+ if (lhs == nullptr || rhs == nullptr) {
+ return false;
+ }
+ if (lhs->num_servers != rhs->num_servers) {
+ return false;
+ }
+ for (size_t i = 0; i < lhs->num_servers; i++) {
+ if (!xds_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) {
+ return false;
+ }
+ }
+ return true;
+}
+
+bool xds_grpclb_server_equals(const xds_grpclb_server* lhs,
+ const xds_grpclb_server* rhs) {
+ return memcmp(lhs, rhs, sizeof(xds_grpclb_server)) == 0;
+}
+
+int xds_grpclb_duration_compare(const xds_grpclb_duration* lhs,
+ const xds_grpclb_duration* rhs) {
+ GPR_ASSERT(lhs && rhs);
+ if (lhs->has_seconds && rhs->has_seconds) {
+ if (lhs->seconds < rhs->seconds) return -1;
+ if (lhs->seconds > rhs->seconds) return 1;
+ } else if (lhs->has_seconds) {
+ return 1;
+ } else if (rhs->has_seconds) {
+ return -1;
+ }
+
+ GPR_ASSERT(lhs->seconds == rhs->seconds);
+ if (lhs->has_nanos && rhs->has_nanos) {
+ if (lhs->nanos < rhs->nanos) return -1;
+ if (lhs->nanos > rhs->nanos) return 1;
+ } else if (lhs->has_nanos) {
+ return 1;
+ } else if (rhs->has_nanos) {
+ return -1;
+ }
+
+ return 0;
+}
+
+grpc_millis xds_grpclb_duration_to_millis(xds_grpclb_duration* duration_pb) {
+ return static_cast<grpc_millis>(
+ (duration_pb->has_seconds ? duration_pb->seconds : 0) * GPR_MS_PER_SEC +
+ (duration_pb->has_nanos ? duration_pb->nanos : 0) / GPR_NS_PER_MS);
+}
+
+void xds_grpclb_initial_response_destroy(
+ xds_grpclb_initial_response* response) {
+ gpr_free(response);
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h
new file mode 100644
index 0000000000..9d08defa7e
--- /dev/null
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_LOAD_BALANCER_API_H
+#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_LOAD_BALANCER_API_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/slice_buffer.h>
+
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
+#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
+#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+
+#define XDS_SERVICE_NAME_MAX_LENGTH 128
+
+typedef grpc_lb_v1_Server_ip_address_t xds_grpclb_ip_address;
+typedef grpc_lb_v1_LoadBalanceRequest xds_grpclb_request;
+typedef grpc_lb_v1_InitialLoadBalanceResponse xds_grpclb_initial_response;
+typedef grpc_lb_v1_Server xds_grpclb_server;
+typedef google_protobuf_Duration xds_grpclb_duration;
+typedef google_protobuf_Timestamp xds_grpclb_timestamp;
+
+typedef struct {
+ xds_grpclb_server** servers;
+ size_t num_servers;
+} xds_grpclb_serverlist;
+
+/** Create a request for a gRPC LB service under \a lb_service_name */
+xds_grpclb_request* xds_grpclb_request_create(const char* lb_service_name);
+xds_grpclb_request* xds_grpclb_load_report_request_create_locked(
+ grpc_core::XdsLbClientStats* client_stats);
+
+/** Protocol Buffers v3-encode \a request */
+grpc_slice xds_grpclb_request_encode(const xds_grpclb_request* request);
+
+/** Destroy \a request */
+void xds_grpclb_request_destroy(xds_grpclb_request* request);
+
+/** Parse (ie, decode) the bytes in \a encoded_xds_grpclb_response as a \a
+ * xds_grpclb_initial_response */
+xds_grpclb_initial_response* xds_grpclb_initial_response_parse(
+ grpc_slice encoded_xds_grpclb_response);
+
+/** Parse the list of servers from an encoded \a xds_grpclb_response */
+xds_grpclb_serverlist* xds_grpclb_response_parse_serverlist(
+ grpc_slice encoded_xds_grpclb_response);
+
+/** Return a copy of \a sl. The caller is responsible for calling \a
+ * xds_grpclb_destroy_serverlist on the returned copy. */
+xds_grpclb_serverlist* xds_grpclb_serverlist_copy(
+ const xds_grpclb_serverlist* sl);
+
+bool xds_grpclb_serverlist_equals(const xds_grpclb_serverlist* lhs,
+ const xds_grpclb_serverlist* rhs);
+
+bool xds_grpclb_server_equals(const xds_grpclb_server* lhs,
+ const xds_grpclb_server* rhs);
+
+/** Destroy \a serverlist */
+void xds_grpclb_destroy_serverlist(xds_grpclb_serverlist* serverlist);
+
+/** Compare \a lhs against \a rhs and return 0 if \a lhs and \a rhs are equal,
+ * < 0 if \a lhs represents a duration shorter than \a rhs and > 0 otherwise */
+int xds_grpclb_duration_compare(const xds_grpclb_duration* lhs,
+ const xds_grpclb_duration* rhs);
+
+grpc_millis xds_grpclb_duration_to_millis(xds_grpclb_duration* duration_pb);
+
+/** Destroy \a initial_response */
+void xds_grpclb_initial_response_destroy(xds_grpclb_initial_response* response);
+
+#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_LOAD_BALANCER_API_H \
+ */
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index 62bdbf2689..a59deadb26 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -25,7 +25,7 @@
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
-#include "src/core/ext/filters/client_channel/uri_parser.h"
+#include "src/core/lib/uri/uri_parser.h"
//
// representation of an LB address
diff --git a/src/core/ext/filters/client_channel/parse_address.h b/src/core/ext/filters/client_channel/parse_address.h
index c2af0e6c49..5c050a2333 100644
--- a/src/core/ext/filters/client_channel/parse_address.h
+++ b/src/core/ext/filters/client_channel/parse_address.h
@@ -23,8 +23,8 @@
#include <stddef.h>
-#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/uri/uri_parser.h"
/** Populate \a resolved_addr from \a uri, whose path is expected to contain a
* unix socket path. Returns true upon success. */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index dfa52867d8..01796ca08f 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -201,7 +201,7 @@ void AresDnsResolver::ShutdownLocked() {
grpc_timer_cancel(&next_resolution_timer_);
}
if (pending_request_ != nullptr) {
- grpc_cancel_ares_request(pending_request_);
+ grpc_cancel_ares_request_locked(pending_request_);
}
if (next_completion_ != nullptr) {
*target_result_ = nullptr;
@@ -298,6 +298,7 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
grpc_channel_args* result = nullptr;
GPR_ASSERT(r->resolving_);
r->resolving_ = false;
+ gpr_free(r->pending_request_);
r->pending_request_ = nullptr;
if (r->lb_addresses_ != nullptr) {
static const char* args_to_remove[2];
@@ -473,7 +474,9 @@ void grpc_resolver_dns_ares_init() {
GRPC_LOG_IF_ERROR("ares_library_init() failed", error);
return;
}
- default_resolver = grpc_resolve_address_impl;
+ if (default_resolver == nullptr) {
+ default_resolver = grpc_resolve_address_impl;
+ }
grpc_set_resolver_impl(&ares_resolver);
grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
grpc_core::UniquePtr<grpc_core::ResolverFactory>(
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index 4c795c34c8..582e2203fc 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -144,12 +144,12 @@ static void grpc_ares_request_unref_locked(grpc_ares_request* r) {
void grpc_ares_complete_request_locked(grpc_ares_request* r) {
/* Invoke on_done callback and destroy the
request */
+ r->ev_driver = nullptr;
grpc_lb_addresses* lb_addrs = *(r->lb_addrs_out);
if (lb_addrs != nullptr) {
grpc_cares_wrapper_address_sorting_sort(lb_addrs);
}
GRPC_CLOSURE_SCHED(r->on_done, r->error);
- gpr_free(r);
}
static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
@@ -356,15 +356,12 @@ done:
grpc_ares_request_unref_locked(r);
}
-static grpc_ares_request*
-grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
- const char* dns_server, const char* name, const char* default_port,
- grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
- grpc_combiner* combiner) {
+void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
+ grpc_ares_request* r, const char* dns_server, const char* name,
+ const char* default_port, grpc_pollset_set* interested_parties,
+ bool check_grpclb, grpc_combiner* combiner) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_ares_hostbyname_request* hr = nullptr;
- grpc_ares_request* r = nullptr;
ares_channel* channel = nullptr;
/* TODO(zyc): Enable tracing after #9603 is checked in */
/* if (grpc_dns_trace) {
@@ -390,14 +387,6 @@ grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
}
port = gpr_strdup(default_port);
}
- r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
- r->ev_driver = nullptr;
- r->on_done = on_done;
- r->lb_addrs_out = addrs;
- r->service_config_json_out = service_config_json;
- r->success = false;
- r->error = GRPC_ERROR_NONE;
- r->pending_queries = 0;
error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties,
combiner, r);
if (error != GRPC_ERROR_NONE) goto error_cleanup;
@@ -458,7 +447,7 @@ grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
on_srv_query_done_locked, r);
gpr_free(service_name);
}
- if (service_config_json != nullptr) {
+ if (r->service_config_json_out != nullptr) {
grpc_ares_request_ref_locked(r);
char* config_name;
gpr_asprintf(&config_name, "_grpc_config.%s", host);
@@ -470,14 +459,12 @@ grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
grpc_ares_request_unref_locked(r);
gpr_free(host);
gpr_free(port);
- return r;
+ return;
error_cleanup:
- GRPC_CLOSURE_SCHED(on_done, error);
- gpr_free(r);
+ GRPC_CLOSURE_SCHED(r->on_done, error);
gpr_free(host);
gpr_free(port);
- return nullptr;
}
static bool inner_resolve_as_ip_literal_locked(const char* name,
@@ -536,21 +523,31 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_combiner* combiner) {
+ grpc_ares_request* r =
+ static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
+ r->ev_driver = nullptr;
+ r->on_done = on_done;
+ r->lb_addrs_out = addrs;
+ r->service_config_json_out = service_config_json;
+ r->success = false;
+ r->error = GRPC_ERROR_NONE;
+ r->pending_queries = 0;
// Early out if the target is an ipv4 or ipv6 literal.
if (resolve_as_ip_literal_locked(name, default_port, addrs)) {
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
- return nullptr;
+ return r;
}
// Early out if the target is localhost and we're on Windows.
if (grpc_ares_maybe_resolve_localhost_manually_locked(name, default_port,
addrs)) {
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
- return nullptr;
+ return r;
}
// Look up name using c-ares lib.
- return grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
- dns_server, name, default_port, interested_parties, on_done, addrs,
- check_grpclb, service_config_json, combiner);
+ grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
+ r, dns_server, name, default_port, interested_parties, check_grpclb,
+ combiner);
+ return r;
}
grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
@@ -559,14 +556,16 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
-void grpc_cancel_ares_request(grpc_ares_request* r) {
- if (grpc_dns_lookup_ares_locked == grpc_dns_lookup_ares_locked_impl) {
- if (r != nullptr) {
- grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
- }
+static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {
+ GPR_ASSERT(r != nullptr);
+ if (r->ev_driver != nullptr) {
+ grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
}
}
+void (*grpc_cancel_ares_request_locked)(grpc_ares_request* r) =
+ grpc_cancel_ares_request_locked_impl;
+
grpc_error* grpc_ares_init(void) {
gpr_once_init(&g_basic_init, do_basic_init);
gpr_mu_lock(&g_init_mu);
@@ -603,20 +602,23 @@ typedef struct grpc_resolve_address_ares_request {
grpc_lb_addresses* lb_addrs;
/** closure to call when the resolve_address_ares request completes */
grpc_closure* on_resolve_address_done;
- /** a closure wrapping on_dns_lookup_done_cb, which should be invoked when the
- grpc_dns_lookup_ares_locked operation is done. */
- grpc_closure on_dns_lookup_done;
+ /** a closure wrapping on_resolve_address_done, which should be invoked when
+ the grpc_dns_lookup_ares_locked operation is done. */
+ grpc_closure on_dns_lookup_done_locked;
/* target name */
const char* name;
/* default port to use if none is specified */
const char* default_port;
/* pollset_set to be driven by */
grpc_pollset_set* interested_parties;
+ /* underlying ares_request that the query is performed on */
+ grpc_ares_request* ares_request;
} grpc_resolve_address_ares_request;
-static void on_dns_lookup_done_cb(void* arg, grpc_error* error) {
+static void on_dns_lookup_done_locked(void* arg, grpc_error* error) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
+ gpr_free(r->ares_request);
grpc_resolved_addresses** resolved_addresses = r->addrs_out;
if (r->lb_addrs == nullptr || r->lb_addrs->num_addresses == 0) {
*resolved_addresses = nullptr;
@@ -643,9 +645,9 @@ static void grpc_resolve_address_invoke_dns_lookup_ares_locked(
void* arg, grpc_error* unused_error) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
- grpc_dns_lookup_ares_locked(
+ r->ares_request = grpc_dns_lookup_ares_locked(
nullptr /* dns_server */, r->name, r->default_port, r->interested_parties,
- &r->on_dns_lookup_done, &r->lb_addrs, false /* check_grpclb */,
+ &r->on_dns_lookup_done_locked, &r->lb_addrs, false /* check_grpclb */,
nullptr /* service_config_json */, r->combiner);
}
@@ -660,8 +662,8 @@ static void grpc_resolve_address_ares_impl(const char* name,
r->combiner = grpc_combiner_create();
r->addrs_out = addrs;
r->on_resolve_address_done = on_done;
- GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked, on_dns_lookup_done_locked, r,
+ grpc_combiner_scheduler(r->combiner));
r->name = name;
r->default_port = default_port;
r->interested_parties = interested_parties;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index 1bc457d4cf..a1231cc4e0 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -54,7 +54,8 @@ extern void (*grpc_resolve_address_ares)(const char* name,
port in \a name. grpc_ares_init() must be called at least once before this
function. \a on_done may be called directly in this function without being
scheduled with \a exec_ctx, so it must not try to acquire locks that are
- being held by the caller. */
+ being held by the caller. The returned grpc_ares_request object is owned
+ by the caller and it is safe to free after on_done is called back. */
extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
@@ -62,7 +63,7 @@ extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
char** service_config_json, grpc_combiner* combiner);
/* Cancel the pending grpc_ares_request \a request */
-void grpc_cancel_ares_request(grpc_ares_request* request);
+extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request);
/* Initialize gRPC ares wrapper. Must be called at least once before
grpc_resolve_address_ares(). */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
index d6a76fc8b6..9f293c1ac0 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
@@ -40,7 +40,10 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
-void grpc_cancel_ares_request(grpc_ares_request* r) {}
+static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {}
+
+void (*grpc_cancel_ares_request_locked)(grpc_ares_request* r) =
+ grpc_cancel_ares_request_locked_impl;
grpc_error* grpc_ares_init(void) { return GRPC_ERROR_NONE; }
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
index 708eaf1147..74a3062e7f 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
@@ -20,9 +20,9 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
-#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/uri/uri_parser.h"
#define GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR \
"grpc.fake_resolver.response_generator"
diff --git a/src/core/ext/filters/client_channel/resolver_factory.h b/src/core/ext/filters/client_channel/resolver_factory.h
index ee3cfeeb9b..d891ef62e1 100644
--- a/src/core/ext/filters/client_channel/resolver_factory.h
+++ b/src/core/ext/filters/client_channel/resolver_factory.h
@@ -24,11 +24,11 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/resolver.h"
-#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 3a1c14c6f1..b98f238be0 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -30,10 +30,10 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/health/health_check_client.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
-#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
@@ -41,6 +41,7 @@
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
@@ -50,7 +51,9 @@
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/status_metadata.h"
+#include "src/core/lib/uri/uri_parser.h"
#define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
@@ -66,6 +69,10 @@ struct state_watcher {
grpc_closure closure;
grpc_subchannel* subchannel;
grpc_connectivity_state connectivity_state;
+ grpc_connectivity_state last_connectivity_state;
+ grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client;
+ grpc_closure health_check_closure;
+ grpc_connectivity_state health_state;
};
} // namespace
@@ -78,6 +85,12 @@ typedef struct external_state_watcher {
struct external_state_watcher* prev;
} external_state_watcher;
+namespace grpc_core {
+
+class ConnectedSubchannelStateWatcher;
+
+} // namespace grpc_core
+
struct grpc_subchannel {
grpc_connector* connector;
@@ -109,19 +122,24 @@ struct grpc_subchannel {
being setup */
grpc_pollset_set* pollset_set;
+ grpc_core::UniquePtr<char> health_check_service_name;
+
/** mutex protecting remaining elements */
gpr_mu mu;
- /** active connection, or null; of type grpc_core::ConnectedSubchannel
- */
+ /** active connection, or null */
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
+ grpc_core::OrphanablePtr<grpc_core::ConnectedSubchannelStateWatcher>
+ connected_subchannel_watcher;
/** have we seen a disconnection? */
bool disconnected;
/** are we connecting */
bool connecting;
+
/** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker;
+ grpc_connectivity_state_tracker state_and_health_tracker;
external_state_watcher root_external_state_watcher;
@@ -153,6 +171,171 @@ struct grpc_subchannel_call {
grpc_millis deadline;
};
+static void maybe_start_connecting_locked(grpc_subchannel* c);
+
+static const char* subchannel_connectivity_state_change_string(
+ grpc_connectivity_state state) {
+ switch (state) {
+ case GRPC_CHANNEL_IDLE:
+ return "Subchannel state change to IDLE";
+ case GRPC_CHANNEL_CONNECTING:
+ return "Subchannel state change to CONNECTING";
+ case GRPC_CHANNEL_READY:
+ return "Subchannel state change to READY";
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ return "Subchannel state change to TRANSIENT_FAILURE";
+ case GRPC_CHANNEL_SHUTDOWN:
+ return "Subchannel state change to SHUTDOWN";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
+static void set_subchannel_connectivity_state_locked(
+ grpc_subchannel* c, grpc_connectivity_state state, grpc_error* error,
+ const char* reason) {
+ if (c->channelz_subchannel != nullptr) {
+ c->channelz_subchannel->AddTraceEvent(
+ grpc_core::channelz::ChannelTrace::Severity::Info,
+ grpc_slice_from_static_string(
+ subchannel_connectivity_state_change_string(state)));
+ }
+ grpc_connectivity_state_set(&c->state_tracker, state, error, reason);
+}
+
+namespace grpc_core {
+
+class ConnectedSubchannelStateWatcher
+ : public InternallyRefCounted<ConnectedSubchannelStateWatcher> {
+ public:
+ // Must be instantiated while holding c->mu.
+ explicit ConnectedSubchannelStateWatcher(grpc_subchannel* c)
+ : subchannel_(c) {
+ // Steal subchannel ref for connecting.
+ GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher");
+ GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting");
+ // Start watching for connectivity state changes.
+ // Callback uses initial ref to this.
+ GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this,
+ grpc_schedule_on_exec_ctx);
+ c->connected_subchannel->NotifyOnStateChange(c->pollset_set,
+ &pending_connectivity_state_,
+ &on_connectivity_changed_);
+ // Start health check if needed.
+ grpc_connectivity_state health_state = GRPC_CHANNEL_READY;
+ if (c->health_check_service_name != nullptr) {
+ health_check_client_ = grpc_core::MakeOrphanable<HealthCheckClient>(
+ c->health_check_service_name.get(), c->connected_subchannel,
+ c->pollset_set, c->channelz_subchannel);
+ GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this,
+ grpc_schedule_on_exec_ctx);
+ Ref().release(); // Ref for health callback tracked manually.
+ health_check_client_->NotifyOnHealthChange(&health_state_,
+ &on_health_changed_);
+ health_state = GRPC_CHANNEL_CONNECTING;
+ }
+ // Report initial state.
+ set_subchannel_connectivity_state_locked(
+ c, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "subchannel_connected");
+ grpc_connectivity_state_set(&c->state_and_health_tracker, health_state,
+ GRPC_ERROR_NONE, "subchannel_connected");
+ }
+
+ ~ConnectedSubchannelStateWatcher() {
+ GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher");
+ }
+
+ void Orphan() override { health_check_client_.reset(); }
+
+ private:
+ static void OnConnectivityChanged(void* arg, grpc_error* error) {
+ auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
+ grpc_subchannel* c = self->subchannel_;
+ {
+ MutexLock lock(&c->mu);
+ switch (self->pending_connectivity_state_) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN: {
+ if (!c->disconnected && c->connected_subchannel != nullptr) {
+ if (grpc_trace_stream_refcount.enabled()) {
+ gpr_log(GPR_INFO,
+ "Connected subchannel %p of subchannel %p has gone into "
+ "%s. Attempting to reconnect.",
+ c->connected_subchannel.get(), c,
+ grpc_connectivity_state_name(
+ self->pending_connectivity_state_));
+ }
+ c->connected_subchannel.reset();
+ c->connected_subchannel_watcher.reset();
+ self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ set_subchannel_connectivity_state_locked(
+ c, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error),
+ "reflect_child");
+ grpc_connectivity_state_set(&c->state_and_health_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "reflect_child");
+ c->backoff_begun = false;
+ c->backoff->Reset();
+ maybe_start_connecting_locked(c);
+ } else {
+ self->last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
+ }
+ self->health_check_client_.reset();
+ break;
+ }
+ default: {
+ // In principle, this should never happen. We should not get
+ // a callback for READY, because that was the state we started
+ // this watch from. And a connected subchannel should never go
+ // from READY to CONNECTING or IDLE.
+ self->last_connectivity_state_ = self->pending_connectivity_state_;
+ set_subchannel_connectivity_state_locked(
+ c, self->pending_connectivity_state_, GRPC_ERROR_REF(error),
+ "reflect_child");
+ if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) {
+ grpc_connectivity_state_set(&c->state_and_health_tracker,
+ self->pending_connectivity_state_,
+ GRPC_ERROR_REF(error), "reflect_child");
+ }
+ c->connected_subchannel->NotifyOnStateChange(
+ nullptr, &self->pending_connectivity_state_,
+ &self->on_connectivity_changed_);
+ self = nullptr; // So we don't unref below.
+ }
+ }
+ }
+ // Don't unref until we've released the lock, because this might
+ // cause the subchannel (which contains the lock) to be destroyed.
+ if (self != nullptr) self->Unref();
+ }
+
+ static void OnHealthChanged(void* arg, grpc_error* error) {
+ auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
+ if (self->health_state_ == GRPC_CHANNEL_SHUTDOWN) {
+ self->Unref();
+ return;
+ }
+ grpc_subchannel* c = self->subchannel_;
+ MutexLock lock(&c->mu);
+ if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) {
+ grpc_connectivity_state_set(&c->state_and_health_tracker,
+ self->health_state_, GRPC_ERROR_REF(error),
+ "health_changed");
+ }
+ self->health_check_client_->NotifyOnHealthChange(&self->health_state_,
+ &self->on_health_changed_);
+ }
+
+ grpc_subchannel* subchannel_;
+ grpc_closure on_connectivity_changed_;
+ grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY;
+ grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_READY;
+ grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client_;
+ grpc_closure on_health_changed_;
+ grpc_connectivity_state health_state_ = GRPC_CHANNEL_CONNECTING;
+};
+
+} // namespace grpc_core
+
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
(grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
sizeof(grpc_subchannel_call)))
@@ -198,8 +381,10 @@ static void subchannel_destroy(void* arg, grpc_error* error) {
c->channelz_subchannel.reset();
}
gpr_free((void*)c->filters);
+ c->health_check_service_name.reset();
grpc_channel_args_destroy(c->args);
grpc_connectivity_state_destroy(&c->state_tracker);
+ grpc_connectivity_state_destroy(&c->state_and_health_tracker);
grpc_connector_unref(c->connector);
grpc_pollset_set_destroy(c->pollset_set);
grpc_subchannel_key_destroy(c->key);
@@ -262,6 +447,7 @@ static void disconnect(grpc_subchannel* c) {
grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Subchannel disconnected"));
c->connected_subchannel.reset();
+ c->connected_subchannel_watcher.reset();
gpr_mu_unlock(&c->mu);
}
@@ -337,6 +523,31 @@ static void parse_args_for_backoff_values(
.set_max_backoff(max_backoff_ms);
}
+namespace grpc_core {
+namespace {
+
+struct HealthCheckParams {
+ UniquePtr<char> service_name;
+
+ static void Parse(const grpc_json* field, HealthCheckParams* params) {
+ if (strcmp(field->key, "healthCheckConfig") == 0) {
+ if (field->type != GRPC_JSON_OBJECT) return;
+ for (grpc_json* sub_field = field->child; sub_field != nullptr;
+ sub_field = sub_field->next) {
+ if (sub_field->key == nullptr) return;
+ if (strcmp(sub_field->key, "serviceName") == 0) {
+ if (params->service_name != nullptr) return; // Duplicate.
+ if (sub_field->type != GRPC_JSON_STRING) return;
+ params->service_name.reset(gpr_strdup(sub_field->value));
+ }
+ }
+ }
+ }
+};
+
+} // namespace
+} // namespace grpc_core
+
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
const grpc_subchannel_args* args) {
grpc_subchannel_key* key = grpc_subchannel_key_create(args);
@@ -387,12 +598,28 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
+ grpc_connectivity_state_init(&c->state_and_health_tracker, GRPC_CHANNEL_IDLE,
+ "subchannel");
grpc_core::BackOff::Options backoff_options;
parse_args_for_backoff_values(args->args, &backoff_options,
&c->min_connect_timeout_ms);
c->backoff.Init(backoff_options);
gpr_mu_init(&c->mu);
+ // Check whether we should enable health checking.
+ const char* service_config_json = grpc_channel_arg_get_string(
+ grpc_channel_args_find(c->args, GRPC_ARG_SERVICE_CONFIG));
+ if (service_config_json != nullptr) {
+ grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
+ grpc_core::ServiceConfig::Create(service_config_json);
+ if (service_config != nullptr) {
+ grpc_core::HealthCheckParams params;
+ service_config->ParseGlobalParams(grpc_core::HealthCheckParams::Parse,
+ &params);
+ c->health_check_service_name = std::move(params.service_name);
+ }
+ }
+
const grpc_arg* arg =
grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
bool channelz_enabled =
@@ -436,17 +663,21 @@ static void continue_connect_locked(grpc_subchannel* c) {
c->next_attempt_deadline = c->backoff->NextAttemptTime();
args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
- grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
- GRPC_ERROR_NONE, "connecting");
+ set_subchannel_connectivity_state_locked(c, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_NONE, "connecting");
+ grpc_connectivity_state_set(&c->state_and_health_tracker,
+ GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
+ "connecting");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
&c->on_connected);
}
-grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c,
- grpc_error** error) {
- grpc_connectivity_state state;
+grpc_connectivity_state grpc_subchannel_check_connectivity(
+ grpc_subchannel* c, grpc_error** error, bool inhibit_health_checks) {
gpr_mu_lock(&c->mu);
- state = grpc_connectivity_state_get(&c->state_tracker, error);
+ grpc_connectivity_state_tracker* tracker =
+ inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker;
+ grpc_connectivity_state state = grpc_connectivity_state_get(tracker, error);
gpr_mu_unlock(&c->mu);
return state;
}
@@ -504,7 +735,8 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
/* Already connected: don't restart */
return;
}
- if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
+ if (!grpc_connectivity_state_has_watchers(&c->state_tracker) &&
+ !grpc_connectivity_state_has_watchers(&c->state_and_health_tracker)) {
/* Nobody is interested in connecting: so don't just yet */
return;
}
@@ -531,16 +763,18 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
void grpc_subchannel_notify_on_state_change(
grpc_subchannel* c, grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state, grpc_closure* notify) {
+ grpc_connectivity_state* state, grpc_closure* notify,
+ bool inhibit_health_checks) {
+ grpc_connectivity_state_tracker* tracker =
+ inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker;
external_state_watcher* w;
-
if (state == nullptr) {
gpr_mu_lock(&c->mu);
for (w = c->root_external_state_watcher.next;
w != &c->root_external_state_watcher; w = w->next) {
if (w->notify == notify) {
- grpc_connectivity_state_notify_on_state_change(&c->state_tracker,
- nullptr, &w->closure);
+ grpc_connectivity_state_notify_on_state_change(tracker, nullptr,
+ &w->closure);
}
}
gpr_mu_unlock(&c->mu);
@@ -559,62 +793,12 @@ void grpc_subchannel_notify_on_state_change(
w->next = &c->root_external_state_watcher;
w->prev = w->next->prev;
w->next->prev = w->prev->next = w;
- grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
- &w->closure);
+ grpc_connectivity_state_notify_on_state_change(tracker, state, &w->closure);
maybe_start_connecting_locked(c);
gpr_mu_unlock(&c->mu);
}
}
-static void on_connected_subchannel_connectivity_changed(void* p,
- grpc_error* error) {
- state_watcher* connected_subchannel_watcher = static_cast<state_watcher*>(p);
- grpc_subchannel* c = connected_subchannel_watcher->subchannel;
- gpr_mu* mu = &c->mu;
-
- gpr_mu_lock(mu);
-
- switch (connected_subchannel_watcher->connectivity_state) {
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- case GRPC_CHANNEL_SHUTDOWN: {
- if (!c->disconnected && c->connected_subchannel != nullptr) {
- if (grpc_trace_stream_refcount.enabled()) {
- gpr_log(GPR_INFO,
- "Connected subchannel %p of subchannel %p has gone into %s. "
- "Attempting to reconnect.",
- c->connected_subchannel.get(), c,
- grpc_connectivity_state_name(
- connected_subchannel_watcher->connectivity_state));
- }
- c->connected_subchannel.reset();
- grpc_connectivity_state_set(&c->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "reflect_child");
- c->backoff_begun = false;
- c->backoff->Reset();
- maybe_start_connecting_locked(c);
- } else {
- connected_subchannel_watcher->connectivity_state =
- GRPC_CHANNEL_SHUTDOWN;
- }
- break;
- }
- default: {
- grpc_connectivity_state_set(
- &c->state_tracker, connected_subchannel_watcher->connectivity_state,
- GRPC_ERROR_REF(error), "reflect_child");
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- c->connected_subchannel->NotifyOnStateChange(
- nullptr, &connected_subchannel_watcher->connectivity_state,
- &connected_subchannel_watcher->closure);
- connected_subchannel_watcher = nullptr;
- }
- }
- gpr_mu_unlock(mu);
- GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher");
- gpr_free(connected_subchannel_watcher);
-}
-
static bool publish_transport_locked(grpc_subchannel* c) {
/* construct channel stack */
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
@@ -641,17 +825,7 @@ static bool publish_transport_locked(grpc_subchannel* c) {
intptr_t socket_uuid = c->connecting_result.socket_uuid;
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
- /* initialize state watcher */
- state_watcher* connected_subchannel_watcher = static_cast<state_watcher*>(
- gpr_zalloc(sizeof(*connected_subchannel_watcher)));
- connected_subchannel_watcher->subchannel = c;
- connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY;
- GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure,
- on_connected_subchannel_connectivity_changed,
- connected_subchannel_watcher, grpc_schedule_on_exec_ctx);
-
if (c->disconnected) {
- gpr_free(connected_subchannel_watcher);
grpc_channel_stack_destroy(stk);
gpr_free(stk);
return false;
@@ -663,17 +837,10 @@ static bool publish_transport_locked(grpc_subchannel* c) {
gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
c->connected_subchannel.get(), c);
- /* setup subchannel watching connected subchannel for changes; subchannel
- ref for connecting is donated to the state watcher */
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
- c->connected_subchannel->NotifyOnStateChange(
- c->pollset_set, &connected_subchannel_watcher->connectivity_state,
- &connected_subchannel_watcher->closure);
-
- /* signal completion */
- grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY,
- GRPC_ERROR_NONE, "connected");
+ // Instantiate state watcher. Will clean itself up.
+ c->connected_subchannel_watcher =
+ grpc_core::MakeOrphanable<grpc_core::ConnectedSubchannelStateWatcher>(c);
+
return true;
}
@@ -690,8 +857,14 @@ static void on_subchannel_connected(void* arg, grpc_error* error) {
} else if (c->disconnected) {
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
} else {
+ set_subchannel_connectivity_state_locked(
+ c, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Connect Failed", &error, 1),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
+ "connect_failed");
grpc_connectivity_state_set(
- &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &c->state_and_health_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
@@ -760,9 +933,12 @@ static void get_call_status(grpc_subchannel_call* call,
grpc_error_get_status(error, call->deadline, status, nullptr, nullptr,
nullptr);
} else {
- GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
- *status =
- grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
+ if (md_batch->idx.named.grpc_status != nullptr) {
+ *status = grpc_get_status_code_from_metadata(
+ md_batch->idx.named.grpc_status->md);
+ } else {
+ *status = GRPC_STATUS_UNKNOWN;
+ }
}
GRPC_ERROR_UNREF(error);
}
@@ -924,15 +1100,8 @@ void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
grpc_subchannel_call** call) {
- size_t allocation_size =
- GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call));
- if (args.parent_data_size > 0) {
- allocation_size +=
- GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
- args.parent_data_size;
- } else {
- allocation_size += channel_stack_->call_stack_size;
- }
+ const size_t allocation_size =
+ GetInitialCallSizeEstimate(args.parent_data_size);
*call = static_cast<grpc_subchannel_call*>(
gpr_arena_alloc(args.arena, allocation_size));
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
@@ -962,4 +1131,18 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
return GRPC_ERROR_NONE;
}
+size_t ConnectedSubchannel::GetInitialCallSizeEstimate(
+ size_t parent_data_size) const {
+ size_t allocation_size =
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call));
+ if (parent_data_size > 0) {
+ allocation_size +=
+ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
+ parent_data_size;
+ } else {
+ allocation_size += channel_stack_->call_stack_size;
+ }
+ return allocation_size;
+}
+
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index c53b13e37e..ec3b4d86e4 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -103,6 +103,8 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
}
intptr_t socket_uuid() { return socket_uuid_; }
+ size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
+
private:
grpc_channel_stack* channel_stack_;
// ref counted pointer to the channelz node in this connected subchannel's
@@ -143,13 +145,14 @@ void* grpc_connected_subchannel_call_get_parent_data(
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
- grpc_subchannel* channel, grpc_error** error);
+ grpc_subchannel* channel, grpc_error** error, bool inhibit_health_checking);
/** Calls notify when the connectivity state of a channel becomes different
from *state. Updates *state with the new state of the channel. */
void grpc_subchannel_notify_on_state_change(
grpc_subchannel* channel, grpc_pollset_set* interested_parties,
- grpc_connectivity_state* state, grpc_closure* notify);
+ grpc_connectivity_state* state, grpc_closure* notify,
+ bool inhibit_health_checks);
/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected
* (which may happen before it initially connects or during transient failures)
diff --git a/src/core/ext/filters/client_channel/uri_parser.cc b/src/core/ext/filters/client_channel/uri_parser.cc
deleted file mode 100644
index 0572034a9c..0000000000
--- a/src/core/ext/filters/client_channel/uri_parser.cc
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/ext/filters/client_channel/uri_parser.h"
-
-#include <string.h>
-
-#include <grpc/slice_buffer.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-
-#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/slice/percent_encoding.h"
-#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/slice/slice_string_helpers.h"
-
-/** a size_t default value... maps to all 1's */
-#define NOT_SET (~(size_t)0)
-
-static grpc_uri* bad_uri(const char* uri_text, size_t pos, const char* section,
- bool suppress_errors) {
- char* line_prefix;
- size_t pfx_len;
-
- if (!suppress_errors) {
- gpr_asprintf(&line_prefix, "bad uri.%s: '", section);
- pfx_len = strlen(line_prefix) + pos;
- gpr_log(GPR_ERROR, "%s%s'", line_prefix, uri_text);
- gpr_free(line_prefix);
-
- line_prefix = static_cast<char*>(gpr_malloc(pfx_len + 1));
- memset(line_prefix, ' ', pfx_len);
- line_prefix[pfx_len] = 0;
- gpr_log(GPR_ERROR, "%s^ here", line_prefix);
- gpr_free(line_prefix);
- }
-
- return nullptr;
-}
-
-/** Returns a copy of percent decoded \a src[begin, end) */
-static char* decode_and_copy_component(const char* src, size_t begin,
- size_t end) {
- grpc_slice component =
- (begin == NOT_SET || end == NOT_SET)
- ? grpc_empty_slice()
- : grpc_slice_from_copied_buffer(src + begin, end - begin);
- grpc_slice decoded_component =
- grpc_permissive_percent_decode_slice(component);
- char* out = grpc_dump_slice(decoded_component, GPR_DUMP_ASCII);
- grpc_slice_unref_internal(component);
- grpc_slice_unref_internal(decoded_component);
- return out;
-}
-
-static bool valid_hex(char c) {
- return ((c >= 'a') && (c <= 'f')) || ((c >= 'A') && (c <= 'F')) ||
- ((c >= '0') && (c <= '9'));
-}
-
-/** Returns how many chars to advance if \a uri_text[i] begins a valid \a pchar
- * production. If \a uri_text[i] introduces an invalid \a pchar (such as percent
- * sign not followed by two hex digits), NOT_SET is returned. */
-static size_t parse_pchar(const char* uri_text, size_t i) {
- /* pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
- * unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
- * pct-encoded = "%" HEXDIG HEXDIG
- * sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
- / "*" / "+" / "," / ";" / "=" */
- char c = uri_text[i];
- switch (c) {
- default:
- if (((c >= 'a') && (c <= 'z')) || ((c >= 'A') && (c <= 'Z')) ||
- ((c >= '0') && (c <= '9'))) {
- return 1;
- }
- break;
- case ':':
- case '@':
- case '-':
- case '.':
- case '_':
- case '~':
- case '!':
- case '$':
- case '&':
- case '\'':
- case '(':
- case ')':
- case '*':
- case '+':
- case ',':
- case ';':
- case '=':
- return 1;
- case '%': /* pct-encoded */
- if (valid_hex(uri_text[i + 1]) && valid_hex(uri_text[i + 2])) {
- return 2;
- }
- return NOT_SET;
- }
- return 0;
-}
-
-/* *( pchar / "?" / "/" ) */
-static int parse_fragment_or_query(const char* uri_text, size_t* i) {
- char c;
- while ((c = uri_text[*i]) != 0) {
- const size_t advance = parse_pchar(uri_text, *i); /* pchar */
- switch (advance) {
- case 0: /* uri_text[i] isn't in pchar */
- /* maybe it's ? or / */
- if (uri_text[*i] == '?' || uri_text[*i] == '/') {
- (*i)++;
- break;
- } else {
- return 1;
- }
- GPR_UNREACHABLE_CODE(return 0);
- default:
- (*i) += advance;
- break;
- case NOT_SET: /* uri_text[i] introduces an invalid URI */
- return 0;
- }
- }
- /* *i is the first uri_text position past the \a query production, maybe \0 */
- return 1;
-}
-
-static void parse_query_parts(grpc_uri* uri) {
- static const char* QUERY_PARTS_SEPARATOR = "&";
- static const char* QUERY_PARTS_VALUE_SEPARATOR = "=";
- GPR_ASSERT(uri->query != nullptr);
- if (uri->query[0] == '\0') {
- uri->query_parts = nullptr;
- uri->query_parts_values = nullptr;
- uri->num_query_parts = 0;
- return;
- }
-
- gpr_string_split(uri->query, QUERY_PARTS_SEPARATOR, &uri->query_parts,
- &uri->num_query_parts);
- uri->query_parts_values =
- static_cast<char**>(gpr_malloc(uri->num_query_parts * sizeof(char**)));
- for (size_t i = 0; i < uri->num_query_parts; i++) {
- char** query_param_parts;
- size_t num_query_param_parts;
- char* full = uri->query_parts[i];
- gpr_string_split(full, QUERY_PARTS_VALUE_SEPARATOR, &query_param_parts,
- &num_query_param_parts);
- GPR_ASSERT(num_query_param_parts > 0);
- uri->query_parts[i] = query_param_parts[0];
- if (num_query_param_parts > 1) {
- /* TODO(dgq): only the first value after the separator is considered.
- * Perhaps all chars after the first separator for the query part should
- * be included, even if they include the separator. */
- uri->query_parts_values[i] = query_param_parts[1];
- } else {
- uri->query_parts_values[i] = nullptr;
- }
- for (size_t j = 2; j < num_query_param_parts; j++) {
- gpr_free(query_param_parts[j]);
- }
- gpr_free(query_param_parts);
- gpr_free(full);
- }
-}
-
-grpc_uri* grpc_uri_parse(const char* uri_text, bool suppress_errors) {
- grpc_uri* uri;
- size_t scheme_begin = 0;
- size_t scheme_end = NOT_SET;
- size_t authority_begin = NOT_SET;
- size_t authority_end = NOT_SET;
- size_t path_begin = NOT_SET;
- size_t path_end = NOT_SET;
- size_t query_begin = NOT_SET;
- size_t query_end = NOT_SET;
- size_t fragment_begin = NOT_SET;
- size_t fragment_end = NOT_SET;
- size_t i;
-
- for (i = scheme_begin; uri_text[i] != 0; i++) {
- if (uri_text[i] == ':') {
- scheme_end = i;
- break;
- }
- if (uri_text[i] >= 'a' && uri_text[i] <= 'z') continue;
- if (uri_text[i] >= 'A' && uri_text[i] <= 'Z') continue;
- if (i != scheme_begin) {
- if (uri_text[i] >= '0' && uri_text[i] <= '9') continue;
- if (uri_text[i] == '+') continue;
- if (uri_text[i] == '-') continue;
- if (uri_text[i] == '.') continue;
- }
- break;
- }
- if (scheme_end == NOT_SET) {
- return bad_uri(uri_text, i, "scheme", suppress_errors);
- }
-
- if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') {
- authority_begin = scheme_end + 3;
- for (i = authority_begin; uri_text[i] != 0 && authority_end == NOT_SET;
- i++) {
- if (uri_text[i] == '/' || uri_text[i] == '?' || uri_text[i] == '#') {
- authority_end = i;
- }
- }
- if (authority_end == NOT_SET && uri_text[i] == 0) {
- authority_end = i;
- }
- if (authority_end == NOT_SET) {
- return bad_uri(uri_text, i, "authority", suppress_errors);
- }
- /* TODO(ctiller): parse the authority correctly */
- path_begin = authority_end;
- } else {
- path_begin = scheme_end + 1;
- }
-
- for (i = path_begin; uri_text[i] != 0; i++) {
- if (uri_text[i] == '?' || uri_text[i] == '#') {
- path_end = i;
- break;
- }
- }
- if (path_end == NOT_SET && uri_text[i] == 0) {
- path_end = i;
- }
- if (path_end == NOT_SET) {
- return bad_uri(uri_text, i, "path", suppress_errors);
- }
-
- if (uri_text[i] == '?') {
- query_begin = ++i;
- if (!parse_fragment_or_query(uri_text, &i)) {
- return bad_uri(uri_text, i, "query", suppress_errors);
- } else if (uri_text[i] != 0 && uri_text[i] != '#') {
- /* We must be at the end or at the beginning of a fragment */
- return bad_uri(uri_text, i, "query", suppress_errors);
- }
- query_end = i;
- }
- if (uri_text[i] == '#') {
- fragment_begin = ++i;
- if (!parse_fragment_or_query(uri_text, &i)) {
- return bad_uri(uri_text, i - fragment_end, "fragment", suppress_errors);
- } else if (uri_text[i] != 0) {
- /* We must be at the end */
- return bad_uri(uri_text, i, "fragment", suppress_errors);
- }
- fragment_end = i;
- }
-
- uri = static_cast<grpc_uri*>(gpr_zalloc(sizeof(*uri)));
- uri->scheme = decode_and_copy_component(uri_text, scheme_begin, scheme_end);
- uri->authority =
- decode_and_copy_component(uri_text, authority_begin, authority_end);
- uri->path = decode_and_copy_component(uri_text, path_begin, path_end);
- uri->query = decode_and_copy_component(uri_text, query_begin, query_end);
- uri->fragment =
- decode_and_copy_component(uri_text, fragment_begin, fragment_end);
- parse_query_parts(uri);
-
- return uri;
-}
-
-const char* grpc_uri_get_query_arg(const grpc_uri* uri, const char* key) {
- GPR_ASSERT(key != nullptr);
- if (key[0] == '\0') return nullptr;
-
- for (size_t i = 0; i < uri->num_query_parts; ++i) {
- if (0 == strcmp(key, uri->query_parts[i])) {
- return uri->query_parts_values[i];
- }
- }
- return nullptr;
-}
-
-void grpc_uri_destroy(grpc_uri* uri) {
- if (!uri) return;
- gpr_free(uri->scheme);
- gpr_free(uri->authority);
- gpr_free(uri->path);
- gpr_free(uri->query);
- for (size_t i = 0; i < uri->num_query_parts; ++i) {
- gpr_free(uri->query_parts[i]);
- gpr_free(uri->query_parts_values[i]);
- }
- gpr_free(uri->query_parts);
- gpr_free(uri->query_parts_values);
- gpr_free(uri->fragment);
- gpr_free(uri);
-}
diff --git a/src/core/ext/filters/client_channel/uri_parser.h b/src/core/ext/filters/client_channel/uri_parser.h
deleted file mode 100644
index d749f23308..0000000000
--- a/src/core/ext/filters/client_channel/uri_parser.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_URI_PARSER_H
-#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_URI_PARSER_H
-
-#include <grpc/support/port_platform.h>
-
-#include <stddef.h>
-
-typedef struct {
- char* scheme;
- char* authority;
- char* path;
- char* query;
- /** Query substrings separated by '&' */
- char** query_parts;
- /** Number of elements in \a query_parts and \a query_parts_values */
- size_t num_query_parts;
- /** Split each query part by '='. NULL if not present. */
- char** query_parts_values;
- char* fragment;
-} grpc_uri;
-
-/** parse a uri, return NULL on failure */
-grpc_uri* grpc_uri_parse(const char* uri_text, bool suppress_errors);
-
-/** return the part of a query string after the '=' in "?key=xxx&...", or NULL
- * if key is not present */
-const char* grpc_uri_get_query_arg(const grpc_uri* uri, const char* key);
-
-/** destroy a uri */
-void grpc_uri_destroy(grpc_uri* uri);
-
-#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_URI_PARSER_H */