author    Mark D. Roth <roth@google.com>    2018-02-20 10:27:25 -0800
committer GitHub <noreply@github.com>       2018-02-20 10:27:25 -0800
commit    824b21e13ae50a44c34865abe8e454f13a82008d (patch)
tree      54f0bfcec52ee192c2b7c7fd53448e1a05dcbf0e
parent    2336c69d32d63ab87c2269c5a9eae2cba0c2cf0b (diff)
parent    c887549f9296d893957c6df17deaf5e2c6f4f633 (diff)
Merge pull request #13870 from markdroth/c++_lb_policy
C++ LB policy API
-rw-r--r--  BUILD  2
-rw-r--r--  build.yaml  2
-rw-r--r--  gRPC-C++.podspec  1
-rw-r--r--  gRPC-Core.podspec  2
-rw-r--r--  grpc.gemspec  1
-rw-r--r--  package.xml  1
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.cc  110
-rw-r--r--  src/core/ext/filters/client_channel/client_channel_plugin.cc  4
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy.cc  123
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy.h  331
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc  2939
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h  29
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc  523
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc  584
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc  28
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/subchannel_list.h  16
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy_factory.cc  14
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy_factory.h  50
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy_registry.cc  91
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy_registry.h  37
-rw-r--r--  tools/doxygen/Doxyfile.core.internal  1
-rw-r--r--  tools/run_tests/generated/sources_and_headers.json  4
22 files changed, 2362 insertions, 2531 deletions
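
For orientation before the per-file diffs: this merge replaces the C-style grpc_lb_policy vtable with the abstract C++ class grpc_core::LoadBalancingPolicy, owned through grpc_core::OrphanablePtr. The sketch below condenses the caller-side flow from the client_channel.cc hunks further down. It is illustrative only: the helper function name is made up, it assumes the gRPC core source tree, error handling is omitted, and every *Locked() call must run inside the policy's combiner.

// Hypothetical helper showing the new caller-side API, condensed from the
// client_channel.cc changes in this diff.
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"

void CreateAndDriveLbPolicy(grpc_combiner* combiner,
                            grpc_client_channel_factory* factory,
                            grpc_channel_args* resolver_result) {
  // Args struct introduced by this commit (see the lb_policy.h diff below).
  grpc_core::LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.combiner = combiner;
  lb_policy_args.client_channel_factory = factory;
  lb_policy_args.args = resolver_result;
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy =
      grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
          "pick_first", lb_policy_args);
  if (lb_policy == nullptr) return;  // Unknown policy name.
  // Later resolver results update the same instance in place.
  lb_policy->UpdateLocked(*resolver_result);
  // Dropping the OrphanablePtr orphans the policy: Orphan() schedules
  // ShutdownLocked() in the combiner and releases the ref.
  lb_policy.reset();
}

Destruction is now driven by orphaning: resetting the OrphanablePtr replaces the old grpc_lb_policy_shutdown_locked() plus GRPC_LB_POLICY_UNREF() pair, as the client_channel.cc hunks below show.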
diff --git a/BUILD b/BUILD
index 31dd9fded7..5489e4cf62 100644
--- a/BUILD
+++ b/BUILD
@@ -1148,7 +1148,6 @@ grpc_cc_library(
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
@@ -1177,7 +1176,6 @@ grpc_cc_library(
],
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
diff --git a/build.yaml b/build.yaml
index 74c76a4072..b0ae5793af 100644
--- a/build.yaml
+++ b/build.yaml
@@ -518,7 +518,6 @@ filegroups:
- name: grpc_lb_policy_grpclb
headers:
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -539,7 +538,6 @@ filegroups:
- name: grpc_lb_policy_grpclb_secure
headers:
- src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
- - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 2f5465c6b8..21d37c87f3 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -504,7 +504,6 @@ Pod::Spec.new do |s|
'src/core/lib/transport/transport_impl.h',
'src/core/lib/debug/trace.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
- 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 95db0d8e7d..f130d20cb3 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -448,7 +448,6 @@ Pod::Spec.new do |s|
'src/core/lib/transport/transport_impl.h',
'src/core/lib/debug/trace.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
- 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h',
@@ -937,7 +936,6 @@ Pod::Spec.new do |s|
'src/core/lib/transport/transport_impl.h',
'src/core/lib/debug/trace.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h',
- 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h',
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index ac901da0fe..398ce50705 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -374,7 +374,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/transport_impl.h )
s.files += %w( src/core/lib/debug/trace.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h )
- s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h )
diff --git a/package.xml b/package.xml
index 5575855648..d469d878b4 100644
--- a/package.xml
+++ b/package.xml
@@ -381,7 +381,6 @@
<file baseinstalldir="/" name="src/core/lib/transport/transport_impl.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/debug/trace.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" role="src" />
- <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" role="src" />
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 8aa9905d5c..174a15b447 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -175,7 +175,7 @@ typedef struct client_channel_channel_data {
/** combiner protecting all variables below in this data structure */
grpc_combiner* combiner;
/** currently active load balancer */
- grpc_lb_policy* lb_policy;
+ grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
/** retry throttle data */
grpc_server_retry_throttle_data* retry_throttle_data;
/** maps method names to method_parameters structs */
@@ -212,7 +212,7 @@ typedef struct {
channel_data* chand;
/** used as an identifier, don't dereference it because the LB policy may be
* non-existing when the callback is run */
- grpc_lb_policy* lb_policy;
+ grpc_core::LoadBalancingPolicy* lb_policy;
grpc_closure closure;
} reresolution_request_args;
@@ -223,11 +223,11 @@ typedef struct {
channel_data* chand;
grpc_closure on_changed;
grpc_connectivity_state state;
- grpc_lb_policy* lb_policy;
+ grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;
static void watch_lb_policy_locked(channel_data* chand,
- grpc_lb_policy* lb_policy,
+ grpc_core::LoadBalancingPolicy* lb_policy,
grpc_connectivity_state current_state);
static void set_channel_connectivity_state_locked(channel_data* chand,
@@ -241,15 +241,13 @@ static void set_channel_connectivity_state_locked(channel_data* chand,
if (chand->lb_policy != nullptr) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* cancel picks with wait_for_ready=false */
- grpc_lb_policy_cancel_picks_locked(
- chand->lb_policy,
+ chand->lb_policy->CancelMatchingPicksLocked(
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error));
} else if (state == GRPC_CHANNEL_SHUTDOWN) {
/* cancel all picks */
- grpc_lb_policy_cancel_picks_locked(chand->lb_policy,
- /* mask= */ 0, /* check= */ 0,
- GRPC_ERROR_REF(error));
+ chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
+ GRPC_ERROR_REF(error));
}
}
if (grpc_client_channel_trace.enabled()) {
@@ -263,7 +261,7 @@ static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
lb_policy_connectivity_watcher* w =
static_cast<lb_policy_connectivity_watcher*>(arg);
/* check if the notification is for the latest policy */
- if (w->lb_policy == w->chand->lb_policy) {
+ if (w->lb_policy == w->chand->lb_policy.get()) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
w->lb_policy, grpc_connectivity_state_name(w->state));
@@ -279,7 +277,7 @@ static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
}
static void watch_lb_policy_locked(channel_data* chand,
- grpc_lb_policy* lb_policy,
+ grpc_core::LoadBalancingPolicy* lb_policy,
grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher* w =
static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
@@ -289,8 +287,7 @@ static void watch_lb_policy_locked(channel_data* chand,
grpc_combiner_scheduler(chand->combiner));
w->state = current_state;
w->lb_policy = lb_policy;
- grpc_lb_policy_notify_on_state_change_locked(lb_policy, &w->state,
- &w->on_changed);
+ lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
}
static void start_resolving_locked(channel_data* chand) {
@@ -371,7 +368,7 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
channel_data* chand = args->chand;
// If this invocation is for a stale LB policy, treat it as an LB shutdown
// signal.
- if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE ||
+ if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
chand->resolver == nullptr) {
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
gpr_free(args);
@@ -382,7 +379,7 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
}
chand->resolver->RequestReresolutionLocked();
// Give back the closure to the LB policy.
- grpc_lb_policy_set_reresolve_closure_locked(chand->lb_policy, &args->closure);
+ chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
@@ -393,9 +390,10 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
}
// Extract the following fields from the resolver result, if non-NULL.
bool lb_policy_updated = false;
+ bool lb_policy_created = false;
char* lb_policy_name_dup = nullptr;
bool lb_policy_name_changed = false;
- grpc_lb_policy* new_lb_policy = nullptr;
+ grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy;
char* service_config_json = nullptr;
grpc_server_retry_throttle_data* retry_throttle_data = nullptr;
grpc_slice_hash_table* method_params_table = nullptr;
@@ -433,10 +431,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
- grpc_lb_policy_args lb_policy_args;
- lb_policy_args.args = chand->resolver_result;
- lb_policy_args.client_channel_factory = chand->client_channel_factory;
- lb_policy_args.combiner = chand->combiner;
+
// Check to see if we're already using the right LB policy.
// Note: It's safe to use chand->info_lb_policy_name here without
// taking a lock on chand->info_mu, because this function is the
@@ -448,10 +443,17 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
// Continue using the same LB policy. Update with new addresses.
lb_policy_updated = true;
- grpc_lb_policy_update_locked(chand->lb_policy, &lb_policy_args);
+ chand->lb_policy->UpdateLocked(*chand->resolver_result);
} else {
// Instantiate new LB policy.
- new_lb_policy = grpc_lb_policy_create(lb_policy_name, &lb_policy_args);
+ lb_policy_created = true;
+ grpc_core::LoadBalancingPolicy::Args lb_policy_args;
+ lb_policy_args.combiner = chand->combiner;
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
+ lb_policy_args.args = chand->resolver_result;
+ new_lb_policy =
+ grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+ lb_policy_name, lb_policy_args);
if (new_lb_policy == nullptr) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
lb_policy_name);
@@ -460,12 +462,11 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
static_cast<reresolution_request_args*>(
gpr_zalloc(sizeof(*args)));
args->chand = chand;
- args->lb_policy = new_lb_policy;
+ args->lb_policy = new_lb_policy.get();
GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
grpc_combiner_scheduler(chand->combiner));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
- grpc_lb_policy_set_reresolve_closure_locked(new_lb_policy,
- &args->closure);
+ new_lb_policy->SetReresolutionClosureLocked(&args->closure);
}
}
// Find service config.
@@ -548,14 +549,14 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
if (chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
- chand->lb_policy);
+ chand->lb_policy.get());
}
- grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
+ grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
- grpc_lb_policy_shutdown_locked(chand->lb_policy, new_lb_policy);
- GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
+ chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
+ chand->lb_policy.reset();
}
- chand->lb_policy = new_lb_policy;
+ chand->lb_policy = std::move(new_lb_policy);
}
// Now that we've swapped out the relevant fields of chand, check for
// error or shutdown.
@@ -583,21 +584,20 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_error* state_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
- if (new_lb_policy != nullptr) {
+ if (lb_policy_created) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
}
GRPC_ERROR_UNREF(state_error);
- state =
- grpc_lb_policy_check_connectivity_locked(new_lb_policy, &state_error);
- grpc_pollset_set_add_pollset_set(new_lb_policy->interested_parties,
+ state = chand->lb_policy->CheckConnectivityLocked(&state_error);
+ grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
if (chand->exit_idle_when_lb_policy_arrives) {
- grpc_lb_policy_exit_idle_locked(new_lb_policy);
+ chand->lb_policy->ExitIdleLocked();
chand->exit_idle_when_lb_policy_arrives = false;
}
- watch_lb_policy_locked(chand, new_lb_policy, state);
+ watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
}
if (!lb_policy_updated) {
set_channel_connectivity_state_locked(
@@ -632,8 +632,8 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
op->send_ping.on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
} else {
- grpc_lb_policy_ping_one_locked(
- chand->lb_policy, op->send_ping.on_initiate, op->send_ping.on_ack);
+ chand->lb_policy->PingOneLocked(op->send_ping.on_initiate,
+ op->send_ping.on_ack);
op->bind_pollset = nullptr;
}
op->send_ping.on_initiate = nullptr;
@@ -652,11 +652,9 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
}
if (chand->lb_policy != nullptr) {
- grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
+ grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
- grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr);
- GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
- chand->lb_policy = nullptr;
+ chand->lb_policy.reset();
}
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
@@ -786,10 +784,9 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
grpc_client_channel_factory_unref(chand->client_channel_factory);
}
if (chand->lb_policy != nullptr) {
- grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties,
+ grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
- grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr);
- GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
+ chand->lb_policy.reset();
}
gpr_free(chand->info_lb_policy_name);
gpr_free(chand->info_service_config_json);
@@ -849,7 +846,7 @@ typedef struct client_channel_call_data {
grpc_subchannel_call* subchannel_call;
grpc_error* error;
- grpc_lb_policy_pick_state pick;
+ grpc_core::LoadBalancingPolicy::PickState pick;
grpc_closure lb_pick_closure;
grpc_closure lb_pick_cancel_closure;
@@ -1070,15 +1067,14 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
- chand, calld, chand->lb_policy);
+ chand, calld, chand->lb_policy.get());
}
- grpc_lb_policy_cancel_pick_locked(chand->lb_policy, &calld->pick,
- GRPC_ERROR_REF(error));
+ chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
}
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
}
-// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
+// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
// Unrefs the LB policy and invokes async_pick_done_locked().
static void pick_callback_done_locked(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
@@ -1092,15 +1088,14 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) {
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
}
-// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
-// If the pick was completed synchronously, unrefs the LB policy and
-// returns true.
+// Starts a pick on chand->lb_policy.
+// Returns true if pick is completed synchronously.
static bool pick_callback_start_locked(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
- chand, calld, chand->lb_policy);
+ chand, calld, chand->lb_policy.get());
}
apply_service_config_to_call_locked(elem);
// If the application explicitly set wait_for_ready, use that.
@@ -1130,10 +1125,9 @@ static bool pick_callback_start_locked(grpc_call_element* elem) {
grpc_combiner_scheduler(chand->combiner));
calld->pick.on_complete = &calld->lb_pick_closure;
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
- const bool pick_done =
- grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick);
+ const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
if (pick_done) {
- /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
+ // Pick completed synchronously.
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
chand, calld);
@@ -1498,7 +1492,7 @@ const grpc_channel_filter grpc_client_channel_filter = {
static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
channel_data* chand = static_cast<channel_data*>(arg);
if (chand->lb_policy != nullptr) {
- grpc_lb_policy_exit_idle_locked(chand->lb_policy);
+ chand->lb_policy->ExitIdleLocked();
} else {
chand->exit_idle_when_lb_policy_arrives = true;
if (!chand->started_resolving && chand->resolver != nullptr) {
diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.cc b/src/core/ext/filters/client_channel/client_channel_plugin.cc
index 9172fa781c..3c3a97532f 100644
--- a/src/core/ext/filters/client_channel/client_channel_plugin.cc
+++ b/src/core/ext/filters/client_channel/client_channel_plugin.cc
@@ -63,7 +63,7 @@ static bool set_default_host_if_unset(grpc_channel_stack_builder* builder,
}
void grpc_client_channel_init(void) {
- grpc_lb_policy_registry_init();
+ grpc_core::LoadBalancingPolicyRegistry::Builder::InitRegistry();
grpc_core::ResolverRegistry::Builder::InitRegistry();
grpc_retry_throttle_map_init();
grpc_proxy_mapper_registry_init();
@@ -83,5 +83,5 @@ void grpc_client_channel_shutdown(void) {
grpc_proxy_mapper_registry_shutdown();
grpc_retry_throttle_map_shutdown();
grpc_core::ResolverRegistry::Builder::ShutdownRegistry();
- grpc_lb_policy_registry_shutdown();
+ grpc_core::LoadBalancingPolicyRegistry::Builder::ShutdownRegistry();
}
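
The registration side of the registry also changes in this commit (lb_policy_registry.h/.cc and lb_policy_factory.h/.cc in the diffstat) but is not reproduced in this excerpt. As a rough, hypothetical sketch of how a policy module might plug in, assuming the Builder exposes a RegisterLoadBalancingPolicyFactory() hook and a LoadBalancingPolicyFactory base class along these lines (check those headers in the commit for the exact signatures):

// Hypothetical registration sketch; ExamplePolicyFactory, ExamplePolicy, and
// the exact registry/factory signatures are assumptions, not taken from this
// excerpt.  ExamplePolicy is sketched after the lb_policy.h diff below.
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"

class ExamplePolicyFactory : public grpc_core::LoadBalancingPolicyFactory {
 public:
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>
  CreateLoadBalancingPolicy(
      const grpc_core::LoadBalancingPolicy::Args& args) const override {
    return grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>(
        grpc_core::New<ExamplePolicy>(args));
  }
  const char* name() const override { return "example"; }
};

void grpc_lb_policy_example_init() {
  // Registered during plugin init, alongside InitRegistry() above.
  grpc_core::LoadBalancingPolicyRegistry::Builder::
      RegisterLoadBalancingPolicyFactory(
          grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
              grpc_core::New<ExamplePolicyFactory>()));
}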
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index 27fb2ad1f4..59f4cdafb3 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -22,121 +22,36 @@
grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(
false, "lb_policy_refcount");
-void grpc_lb_policy_init(grpc_lb_policy* policy,
- const grpc_lb_policy_vtable* vtable,
- grpc_combiner* combiner) {
- policy->vtable = vtable;
- gpr_ref_init(&policy->refs, 1);
- policy->interested_parties = grpc_pollset_set_create();
- policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy");
-}
-
-#ifndef NDEBUG
-void grpc_lb_policy_ref(grpc_lb_policy* lb_policy, const char* file, int line,
- const char* reason) {
- if (grpc_trace_lb_policy_refcount.enabled()) {
- gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "LB_POLICY:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy,
- old_refs, old_refs + 1, reason);
- }
-#else
-void grpc_lb_policy_ref(grpc_lb_policy* lb_policy) {
-#endif
- gpr_ref(&lb_policy->refs);
-}
-
-#ifndef NDEBUG
-void grpc_lb_policy_unref(grpc_lb_policy* lb_policy, const char* file, int line,
- const char* reason) {
- if (grpc_trace_lb_policy_refcount.enabled()) {
- gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "LB_POLICY:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy,
- old_refs, old_refs - 1, reason);
- }
-#else
-void grpc_lb_policy_unref(grpc_lb_policy* lb_policy) {
-#endif
- if (gpr_unref(&lb_policy->refs)) {
- grpc_pollset_set_destroy(lb_policy->interested_parties);
- grpc_combiner* combiner = lb_policy->combiner;
- lb_policy->vtable->destroy(lb_policy);
- GRPC_COMBINER_UNREF(combiner, "lb_policy");
- }
-}
-
-void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
- grpc_lb_policy* new_policy) {
- policy->vtable->shutdown_locked(policy, new_policy);
-}
-
-int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
- grpc_lb_policy_pick_state* pick) {
- return policy->vtable->pick_locked(policy, pick);
-}
-
-void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
- grpc_lb_policy_pick_state* pick,
- grpc_error* error) {
- policy->vtable->cancel_pick_locked(policy, pick, error);
-}
-
-void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error* error) {
- policy->vtable->cancel_picks_locked(policy, initial_metadata_flags_mask,
- initial_metadata_flags_eq, error);
-}
+namespace grpc_core {
-void grpc_lb_policy_exit_idle_locked(grpc_lb_policy* policy) {
- policy->vtable->exit_idle_locked(policy);
-}
+LoadBalancingPolicy::LoadBalancingPolicy(const Args& args)
+ : InternallyRefCountedWithTracing(&grpc_trace_lb_policy_refcount),
+ combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")),
+ client_channel_factory_(args.client_channel_factory),
+ interested_parties_(grpc_pollset_set_create()),
+ request_reresolution_(nullptr) {}
-void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
- grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- policy->vtable->ping_one_locked(policy, on_initiate, on_ack);
+LoadBalancingPolicy::~LoadBalancingPolicy() {
+ grpc_pollset_set_destroy(interested_parties_);
+ GRPC_COMBINER_UNREF(combiner_, "lb_policy");
}
-void grpc_lb_policy_notify_on_state_change_locked(
- grpc_lb_policy* policy, grpc_connectivity_state* state,
- grpc_closure* closure) {
- policy->vtable->notify_on_state_change_locked(policy, state, closure);
-}
-
-grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
- grpc_lb_policy* policy, grpc_error** connectivity_error) {
- return policy->vtable->check_connectivity_locked(policy, connectivity_error);
-}
-
-void grpc_lb_policy_update_locked(grpc_lb_policy* policy,
- const grpc_lb_policy_args* lb_policy_args) {
- policy->vtable->update_locked(policy, lb_policy_args);
-}
-
-void grpc_lb_policy_set_reresolve_closure_locked(
- grpc_lb_policy* policy, grpc_closure* request_reresolution) {
- GPR_ASSERT(policy->request_reresolution == nullptr);
- policy->request_reresolution = request_reresolution;
-}
-
-void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
- grpc_core::TraceFlag* grpc_lb_trace,
- grpc_error* error) {
- if (policy->request_reresolution != nullptr) {
- GRPC_CLOSURE_SCHED(policy->request_reresolution, error);
- policy->request_reresolution = nullptr;
+void LoadBalancingPolicy::TryReresolutionLocked(
+ grpc_core::TraceFlag* grpc_lb_trace, grpc_error* error) {
+ if (request_reresolution_ != nullptr) {
+ GRPC_CLOSURE_SCHED(request_reresolution_, error);
+ request_reresolution_ = nullptr;
if (grpc_lb_trace->enabled()) {
gpr_log(GPR_DEBUG,
"%s %p: scheduling re-resolution closure with error=%s.",
- grpc_lb_trace->name(), policy, grpc_error_string(error));
+ grpc_lb_trace->name(), this, grpc_error_string(error));
}
} else {
if (grpc_lb_trace->enabled()) {
gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.",
- grpc_lb_trace->name(), policy);
+ grpc_lb_trace->name(), this);
}
}
}
+
+} // namespace grpc_core
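
The re-resolution handshake is now split across the two sides shown above: the channel installs a closure with SetReresolutionClosureLocked(), and a policy implementation fires it through the protected TryReresolutionLocked(). A minimal sketch of the policy side follows; ExamplePolicy and its trace flag are hypothetical (a fuller skeleton appears after the lb_policy.h diff below), and the call must run inside the combiner.

// Policy-side half of the re-resolution handshake (hypothetical policy).
static grpc_core::TraceFlag grpc_lb_example_trace(false, "example_lb");

void ExamplePolicy::RequestFreshAddressesLocked() {
  // Fires the closure the channel installed via SetReresolutionClosureLocked();
  // the channel re-runs resolution and then installs a new closure.
  TryReresolutionLocked(&grpc_lb_example_trace, GRPC_ERROR_NONE);
}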
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 6edd314d5e..6de652747e 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -19,182 +19,181 @@
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/gprpp/abstract.h"
+#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
-/** A load balancing policy: specified by a vtable and a struct (which
- is expected to be extended to contain some parameters) */
-typedef struct grpc_lb_policy grpc_lb_policy;
-typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable;
-typedef struct grpc_lb_policy_args grpc_lb_policy_args;
-
extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount;
-struct grpc_lb_policy {
- const grpc_lb_policy_vtable* vtable;
- gpr_refcount refs;
- /* owned pointer to interested parties in load balancing decisions */
- grpc_pollset_set* interested_parties;
- /* combiner under which lb_policy actions take place */
- grpc_combiner* combiner;
- /* callback to force a re-resolution */
- grpc_closure* request_reresolution;
-};
-
-/// State used for an LB pick.
-typedef struct grpc_lb_policy_pick_state {
- /// Initial metadata associated with the picking call.
- grpc_metadata_batch* initial_metadata;
- /// Bitmask used for selective cancelling. See \a
- /// grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
- /// grpc_types.h.
- uint32_t initial_metadata_flags;
- /// Storage for LB token in \a initial_metadata, or NULL if not used.
- grpc_linked_mdelem lb_token_mdelem_storage;
- /// Closure to run when pick is complete, if not completed synchronously.
- grpc_closure* on_complete;
- /// Will be set to the selected subchannel, or nullptr on failure or when
- /// the LB policy decides to drop the call.
- grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;
- /// Will be populated with context to pass to the subchannel call, if needed.
- grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
- /// Upon success, \a *user_data will be set to whatever opaque information
- /// may need to be propagated from the LB policy, or NULL if not needed.
- void** user_data;
- /// Next pointer. For internal use by LB policy.
- struct grpc_lb_policy_pick_state* next;
-} grpc_lb_policy_pick_state;
-
-struct grpc_lb_policy_vtable {
- void (*destroy)(grpc_lb_policy* policy);
-
- /// \see grpc_lb_policy_shutdown_locked().
- void (*shutdown_locked)(grpc_lb_policy* policy, grpc_lb_policy* new_policy);
-
- /** \see grpc_lb_policy_pick */
- int (*pick_locked)(grpc_lb_policy* policy, grpc_lb_policy_pick_state* pick);
-
- /** \see grpc_lb_policy_cancel_pick */
- void (*cancel_pick_locked)(grpc_lb_policy* policy,
- grpc_lb_policy_pick_state* pick,
+namespace grpc_core {
+
+/// Interface for load balancing policies.
+///
+/// Note: All methods with a "Locked" suffix must be called from the
+/// combiner passed to the constructor.
+///
+/// Any I/O done by the LB policy should be done under the pollset_set
+/// returned by \a interested_parties().
+class LoadBalancingPolicy
+ : public InternallyRefCountedWithTracing<LoadBalancingPolicy> {
+ public:
+ struct Args {
+ /// The combiner under which all LB policy calls will be run.
+ /// Policy does NOT take ownership of the reference to the combiner.
+ // TODO(roth): Once we have a C++-like interface for combiners, this
+ // API should change to take a smart pointer that does pass ownership
+ // of a reference.
+ grpc_combiner* combiner = nullptr;
+ /// Used to create channels and subchannels.
+ grpc_client_channel_factory* client_channel_factory = nullptr;
+ /// Channel args from the resolver.
+ /// Note that the LB policy gets the set of addresses from the
+ /// GRPC_ARG_LB_ADDRESSES channel arg.
+ grpc_channel_args* args = nullptr;
+ };
+
+ /// State used for an LB pick.
+ struct PickState {
+ /// Initial metadata associated with the picking call.
+ grpc_metadata_batch* initial_metadata;
+ /// Bitmask used for selective cancelling. See
+ /// \a CancelMatchingPicksLocked() and \a GRPC_INITIAL_METADATA_* in
+ /// grpc_types.h.
+ uint32_t initial_metadata_flags;
+ /// Storage for LB token in \a initial_metadata, or nullptr if not used.
+ grpc_linked_mdelem lb_token_mdelem_storage;
+ /// Closure to run when pick is complete, if not completed synchronously.
+ grpc_closure* on_complete;
+ /// Will be set to the selected subchannel, or nullptr on failure or when
+ /// the LB policy decides to drop the call.
+ RefCountedPtr<ConnectedSubchannel> connected_subchannel;
+ /// Will be populated with context to pass to the subchannel call, if
+ /// needed.
+ grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
+ /// Upon success, \a *user_data will be set to whatever opaque information
+ /// may need to be propagated from the LB policy, or nullptr if not needed.
+ // TODO(roth): As part of revamping our metadata APIs, try to find a
+ // way to clean this up and C++-ify it.
+ void** user_data;
+ /// Next pointer. For internal use by LB policy.
+ PickState* next;
+ };
+
+ // Not copyable nor movable.
+ LoadBalancingPolicy(const LoadBalancingPolicy&) = delete;
+ LoadBalancingPolicy& operator=(const LoadBalancingPolicy&) = delete;
+
+ /// Updates the policy with a new set of \a args from the resolver.
+ /// Note that the LB policy gets the set of addresses from the
+ /// GRPC_ARG_LB_ADDRESSES channel arg.
+ virtual void UpdateLocked(const grpc_channel_args& args) GRPC_ABSTRACT;
+
+ /// Finds an appropriate subchannel for a call, based on data in \a pick.
+ /// \a pick must remain alive until the pick is complete.
+ ///
+ /// If the pick succeeds and a result is known immediately, returns true.
+ /// Otherwise, \a pick->on_complete will be invoked once the pick is
+ /// complete with its error argument set to indicate success or failure.
+ virtual bool PickLocked(PickState* pick) GRPC_ABSTRACT;
+
+ /// Cancels \a pick.
+ /// The \a on_complete callback of the pending pick will be invoked with
+ /// \a pick->connected_subchannel set to null.
+ virtual void CancelPickLocked(PickState* pick,
+ grpc_error* error) GRPC_ABSTRACT;
+
+ /// Cancels all pending picks for which their \a initial_metadata_flags (as
+ /// given in the call to \a PickLocked()) matches
+ /// \a initial_metadata_flags_eq when ANDed with
+ /// \a initial_metadata_flags_mask.
+ virtual void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error* error) GRPC_ABSTRACT;
+
+ /// Requests a notification when the connectivity state of the policy
+ /// changes from \a *state. When that happens, sets \a *state to the
+ /// new state and schedules \a closure.
+ virtual void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
+ grpc_closure* closure) GRPC_ABSTRACT;
+
+ /// Returns the policy's current connectivity state. Sets \a error to
+ /// the associated error, if any.
+ virtual grpc_connectivity_state CheckConnectivityLocked(
+ grpc_error** connectivity_error) GRPC_ABSTRACT;
+
+ /// Hands off pending picks to \a new_policy.
+ virtual void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy)
+ GRPC_ABSTRACT;
+
+ /// Performs a connected subchannel ping via \a ConnectedSubchannel::Ping()
+ /// against one of the connected subchannels managed by the policy.
+ /// Note: This is intended only for use in tests.
+ virtual void PingOneLocked(grpc_closure* on_initiate,
+ grpc_closure* on_ack) GRPC_ABSTRACT;
+
+ /// Tries to enter a READY connectivity state.
+ /// TODO(roth): As part of restructuring how we handle IDLE state,
+ /// consider whether this method is still needed.
+ virtual void ExitIdleLocked() GRPC_ABSTRACT;
+
+ void Orphan() override {
+ // Invoke ShutdownAndUnrefLocked() inside of the combiner.
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_CREATE(&LoadBalancingPolicy::ShutdownAndUnrefLocked, this,
+ grpc_combiner_scheduler(combiner_)),
+ GRPC_ERROR_NONE);
+ }
+
+ /// Sets the re-resolution closure to \a request_reresolution.
+ void SetReresolutionClosureLocked(grpc_closure* request_reresolution) {
+ GPR_ASSERT(request_reresolution_ == nullptr);
+ request_reresolution_ = request_reresolution;
+ }
+
+ grpc_pollset_set* interested_parties() const { return interested_parties_; }
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ explicit LoadBalancingPolicy(const Args& args);
+ virtual ~LoadBalancingPolicy();
+
+ grpc_combiner* combiner() const { return combiner_; }
+ grpc_client_channel_factory* client_channel_factory() const {
+ return client_channel_factory_;
+ }
+
+ /// Shuts down the policy. Any pending picks that have not been
+ /// handed off to a new policy via HandOffPendingPicksLocked() will be
+ /// failed.
+ virtual void ShutdownLocked() GRPC_ABSTRACT;
+
+ /// Tries to request a re-resolution.
+ void TryReresolutionLocked(grpc_core::TraceFlag* grpc_lb_trace,
grpc_error* error);
- /** \see grpc_lb_policy_cancel_picks */
- void (*cancel_picks_locked)(grpc_lb_policy* policy,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error* error);
-
- /** \see grpc_lb_policy_ping_one */
- void (*ping_one_locked)(grpc_lb_policy* policy, grpc_closure* on_initiate,
- grpc_closure* on_ack);
-
- /** Try to enter a READY connectivity state */
- void (*exit_idle_locked)(grpc_lb_policy* policy);
-
- /** check the current connectivity of the lb_policy */
- grpc_connectivity_state (*check_connectivity_locked)(
- grpc_lb_policy* policy, grpc_error** connectivity_error);
-
- /** call notify when the connectivity state of a channel changes from *state.
- Updates *state with the new state of the policy. Calling with a NULL \a
- state cancels the subscription. */
- void (*notify_on_state_change_locked)(grpc_lb_policy* policy,
- grpc_connectivity_state* state,
- grpc_closure* closure);
-
- void (*update_locked)(grpc_lb_policy* policy,
- const grpc_lb_policy_args* args);
+ private:
+ static void ShutdownAndUnrefLocked(void* arg, grpc_error* ignored) {
+ LoadBalancingPolicy* policy = static_cast<LoadBalancingPolicy*>(arg);
+ policy->ShutdownLocked();
+ policy->Unref();
+ }
+
+ /// Combiner under which LB policy actions take place.
+ grpc_combiner* combiner_;
+ /// Client channel factory, used to create channels and subchannels.
+ grpc_client_channel_factory* client_channel_factory_;
+ /// Owned pointer to interested parties in load balancing decisions.
+ grpc_pollset_set* interested_parties_;
+ /// Callback to force a re-resolution.
+ grpc_closure* request_reresolution_;
};
-#ifndef NDEBUG
-#define GRPC_LB_POLICY_REF(p, r) \
- grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_LB_POLICY_UNREF(p, r) \
- grpc_lb_policy_unref((p), __FILE__, __LINE__, (r))
-void grpc_lb_policy_ref(grpc_lb_policy* policy, const char* file, int line,
- const char* reason);
-void grpc_lb_policy_unref(grpc_lb_policy* policy, const char* file, int line,
- const char* reason);
-#else // !NDEBUG
-#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
-#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p))
-void grpc_lb_policy_ref(grpc_lb_policy* policy);
-void grpc_lb_policy_unref(grpc_lb_policy* policy);
-#endif
-
-/** called by concrete implementations to initialize the base struct */
-void grpc_lb_policy_init(grpc_lb_policy* policy,
- const grpc_lb_policy_vtable* vtable,
- grpc_combiner* combiner);
-
-/// Shuts down \a policy.
-/// If \a new_policy is non-null, any pending picks will be restarted
-/// on that policy; otherwise, they will be failed.
-void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy,
- grpc_lb_policy* new_policy);
-
-/** Finds an appropriate subchannel for a call, based on data in \a pick.
- \a pick must remain alive until the pick is complete.
-
- If the pick succeeds and a result is known immediately, a non-zero
- value will be returned. Otherwise, \a pick->on_complete will be invoked
- once the pick is complete with its error argument set to indicate
- success or failure.
-
- Any IO should be done under the \a interested_parties \a grpc_pollset_set
- in the \a grpc_lb_policy struct. */
-int grpc_lb_policy_pick_locked(grpc_lb_policy* policy,
- grpc_lb_policy_pick_state* pick);
-
-/** Perform a connected subchannel ping (see \a
- grpc_core::ConnectedSubchannel::Ping)
- against one of the connected subchannels managed by \a policy. */
-void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy,
- grpc_closure* on_initiate,
- grpc_closure* on_ack);
-
-/** Cancel picks for \a pick.
- The \a on_complete callback of the pending picks will be invoked with \a
- *target set to NULL. */
-void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy,
- grpc_lb_policy_pick_state* pick,
- grpc_error* error);
-
-/** Cancel all pending picks for which their \a initial_metadata_flags (as given
- in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
- when AND'd with \a initial_metadata_flags_mask */
-void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error* error);
-
-/** Try to enter a READY connectivity state */
-void grpc_lb_policy_exit_idle_locked(grpc_lb_policy* policy);
-
-/* Call notify when the connectivity state of a channel changes from \a *state.
- * Updates \a *state with the new state of the policy */
-void grpc_lb_policy_notify_on_state_change_locked(
- grpc_lb_policy* policy, grpc_connectivity_state* state,
- grpc_closure* closure);
-
-grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
- grpc_lb_policy* policy, grpc_error** connectivity_error);
-
-/** Update \a policy with \a lb_policy_args. */
-void grpc_lb_policy_update_locked(grpc_lb_policy* policy,
- const grpc_lb_policy_args* lb_policy_args);
-
-/** Set the re-resolution closure to \a request_reresolution. */
-void grpc_lb_policy_set_reresolve_closure_locked(
- grpc_lb_policy* policy, grpc_closure* request_reresolution);
-
-/** Try to request a re-resolution. It's NOT a public API; it's only for use by
- the LB policy implementations. */
-void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
- grpc_core::TraceFlag* grpc_lb_trace,
- grpc_error* error);
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */
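
To make the shape of the new interface concrete, here is a minimal skeleton of a policy built on it. It is a sketch only: ExamplePolicy is hypothetical, the bodies are stubs, and a real policy (see pick_first.cc and round_robin.cc in this same commit) must also manage subchannels, connectivity tracking, and queued picks.

#include "src/core/ext/filters/client_channel/lb_policy.h"

namespace grpc_core {

// Hypothetical policy showing the overrides the abstract class requires
// (every method marked GRPC_ABSTRACT in the header above).
class ExamplePolicy : public LoadBalancingPolicy {
 public:
  explicit ExamplePolicy(const Args& args) : LoadBalancingPolicy(args) {}

  void UpdateLocked(const grpc_channel_args& args) override {
    // Stub: re-read GRPC_ARG_LB_ADDRESSES from args and rebuild state.
  }
  bool PickLocked(PickState* pick) override {
    // Stub: fill pick->connected_subchannel and return true when a result
    // is available synchronously; otherwise queue the pick, return false,
    // and schedule pick->on_complete later.
    return false;
  }
  void CancelPickLocked(PickState* pick, grpc_error* error) override {
    // Stub: a real policy schedules the queued pick's on_complete here.
    GRPC_ERROR_UNREF(error);
  }
  void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
                                 uint32_t initial_metadata_flags_eq,
                                 grpc_error* error) override {
    GRPC_ERROR_UNREF(error);
  }
  void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
                                 grpc_closure* closure) override {
    // Stub: a real policy registers closure with its connectivity tracker.
  }
  grpc_connectivity_state CheckConnectivityLocked(
      grpc_error** connectivity_error) override {
    return GRPC_CHANNEL_IDLE;
  }
  void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override {}
  void PingOneLocked(grpc_closure* on_initiate,
                     grpc_closure* on_ack) override {
    // Stub: a real policy forwards these to a connected subchannel.
  }
  void ExitIdleLocked() override {}

 private:
  void ShutdownLocked() override {
    // Stub: fail or hand off queued picks and release subchannel refs.
  }
};

}  // namespace grpc_core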
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index da82b3f4da..11163a56dc 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -16,68 +16,48 @@
*
*/
-/** Implementation of the gRPC LB policy.
- *
- * This policy takes as input a set of resolved addresses {a1..an} for which the
- * LB set was set (it's the resolver's responsibility to ensure this). That is
- * to say, {a1..an} represent a collection of LB servers.
- *
- * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
- * This channel behaves just like a regular channel. In particular, the
- * constructed URI over the addresses a1..an will use the default pick first
- * policy to select from this list of LB server backends.
- *
- * The first time the policy gets a request for a pick, a ping, or to exit the
- * idle state, \a query_for_backends_locked() is called. This function sets up
- * and initiates the internal communication with the LB server. In particular,
- * it's responsible for instantiating the internal *streaming* call to the LB
- * server (whichever address from {a1..an} pick-first chose). This call is
- * serviced by two callbacks, \a lb_on_server_status_received and \a
- * lb_on_response_received. The former will be called when the call to the LB
- * server completes. This can happen if the LB server closes the connection or
- * if this policy itself cancels the call (for example because it's shutting
- * down). If the internal call times out, the usual behavior of pick-first
- * applies, continuing to pick from the list {a1..an}.
- *
- * Upon sucesss, the incoming \a LoadBalancingResponse is processed by \a
- * res_recv. An invalid one results in the termination of the streaming call. A
- * new streaming call should be created if possible, failing the original call
- * otherwise. For a valid \a LoadBalancingResponse, the server list of actual
- * backends is extracted. A Round Robin policy will be created from this list.
- * There are two possible scenarios:
- *
- * 1. This is the first server list received. There was no previous instance of
- * the Round Robin policy. \a rr_handover_locked() will instantiate the RR
- * policy and perform all the pending operations over it.
- * 2. There's already a RR policy instance active. We need to introduce the new
- * one build from the new serverlist, but taking care not to disrupt the
- * operations in progress over the old RR instance. This is done by
- * decreasing the reference count on the old policy. The moment no more
- * references are held on the old RR policy, it'll be destroyed and \a
- * on_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
- * state. At this point we can transition to a new RR instance safely, which
- * is done once again via \a rr_handover_locked().
- *
- *
- * Once a RR policy instance is in place (and getting updated as described),
- * calls to for a pick, a ping or a cancellation will be serviced right away by
- * forwarding them to the RR instance. Any time there's no RR policy available
- * (ie, right after the creation of the gRPCLB policy, if an empty serverlist is
- * received, etc), pick/ping requests are added to a list of pending picks/pings
- * to be flushed and serviced as part of \a rr_handover_locked() the moment the
- * RR policy instance becomes available.
- *
- * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
- * high level design and details. */
-
-/* TODO(dgq):
- * - Implement LB service forwarding (point 2c. in the doc's diagram).
- */
+/// Implementation of the gRPC LB policy.
+///
+/// This policy takes as input a list of resolved addresses, which must
+/// include at least one balancer address.
+///
+/// An internal channel (\a lb_channel_) is created for the addresses from
+/// the resolver that are balancers. This channel behaves just like a regular
+/// channel that uses pick_first to select from the list of balancer
+/// addresses.
+///
+/// The first time the policy gets a request for a pick, a ping, or to exit
+/// the idle state, \a StartPickingLocked() is called. This method is
+/// responsible for instantiating the internal *streaming* call to the LB
+/// server (whichever address pick_first chose). The call will be complete
+/// when either the balancer sends status or when we cancel the call (e.g.,
+/// because we are shutting down). If needed, we retry the call. If we
+/// received at least one valid message from the server, a new call attempt
+/// will be made immediately; otherwise, we apply back-off delays between
+/// attempts.
+///
+/// We maintain an internal round_robin policy instance for distributing
+/// requests across backends. Whenever we receive a new serverlist from
+/// the balancer, we update the round_robin policy with the new list of
+/// addresses. If we cannot communicate with the balancer on startup,
+/// however, we may enter fallback mode, in which case we will populate
+/// the RR policy's addresses from the backend addresses returned by the
+/// resolver.
+///
+/// Once an RR policy instance is in place (and getting updated as described),
+/// calls for a pick, a ping, or a cancellation will be serviced right
+/// away by forwarding them to the RR instance. Any time there's no RR
+/// policy available (i.e., right after the creation of the gRPCLB policy),
+/// pick and ping requests are added to a list of pending picks and pings
+/// to be flushed and serviced when the RR policy instance becomes available.
+///
+/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
+/// high level design and details.
-/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
- using that endpoint. Because of various transitive includes in uv.h,
- including windows.h on Windows, uv.h must be included before other system
- headers. Therefore, sockaddr.h must always be included first */
+// With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+// using that endpoint. Because of various transitive includes in uv.h,
+// including windows.h on Windows, uv.h must be included before other system
+// headers. Therefore, sockaddr.h must always be included first.
#include "src/core/lib/iomgr/sockaddr.h"
#include <inttypes.h>
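
The pick-forwarding behavior described in the comment above maps onto the GrpcLb class declared later in this file (see the hunks below). A simplified sketch of that delegation is given here; the member and helper names match the class declaration shown further down, but this is not the exact body, which also handles serverlist drops and shutdown checks.

// Simplified sketch of GrpcLb::PickLocked() delegation to the RR policy.
bool GrpcLb::PickLocked(PickState* pick) {
  PendingPick* pp = PendingPickCreate(pick);
  if (rr_policy_ != nullptr) {
    // An RR policy already exists: forward the pick to it right away.
    return PickFromRoundRobinPolicyLocked(false /* force_async */, pp);
  }
  // No RR policy yet: queue the pick until the first serverlist arrives,
  // and kick off the balancer call if we have not started picking.
  AddPendingPick(pp);
  if (!started_picking_) StartPickingLocked();
  return false;  // Pick completes asynchronously via pp->on_complete.
}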
@@ -93,7 +73,6 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
-#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
@@ -108,6 +87,8 @@
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -127,336 +108,294 @@
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
-grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb");
+namespace grpc_core {
-struct glb_lb_policy;
+TraceFlag grpc_lb_glb_trace(false, "glb");
namespace {
-/// Linked list of pending pick requests. It stores all information needed to
-/// eventually call (Round Robin's) pick() on them. They mainly stay pending
-/// waiting for the RR policy to be created.
-///
-/// Note that when a pick is sent to the RR policy, we inject our own
-/// on_complete callback, so that we can intercept the result before
-/// invoking the original on_complete callback. This allows us to set the
-/// LB token metadata and add client_stats to the call context.
-/// See \a pending_pick_complete() for details.
-struct pending_pick {
- // Our on_complete closure and the original one.
- grpc_closure on_complete;
- grpc_closure* original_on_complete;
- // The original pick.
- grpc_lb_policy_pick_state* pick;
- // Stats for client-side load reporting. Note that this holds a
- // reference, which must be either passed on via context or unreffed.
- grpc_grpclb_client_stats* client_stats;
- // The LB token associated with the pick. This is set via user_data in
- // the pick.
- grpc_mdelem lb_token;
- // The grpclb instance that created the wrapping. This instance is not owned,
- // reference counts are untouched. It's used only for logging purposes.
- glb_lb_policy* glb_policy;
- // Next pending pick.
- struct pending_pick* next;
-};
+class GrpcLb : public LoadBalancingPolicy {
+ public:
+ GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
+
+ void UpdateLocked(const grpc_channel_args& args) override;
+ bool PickLocked(PickState* pick) override;
+ void CancelPickLocked(PickState* pick, grpc_error* error) override;
+ void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error* error) override;
+ void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
+ grpc_closure* closure) override;
+ grpc_connectivity_state CheckConnectivityLocked(
+ grpc_error** connectivity_error) override;
+ void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
+ void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
+ void ExitIdleLocked() override;
+
+ private:
+ /// Linked list of pending pick requests. It stores all information needed to
+ /// eventually call (Round Robin's) pick() on them. They mainly stay pending
+ /// waiting for the RR policy to be created.
+ ///
+ /// Note that when a pick is sent to the RR policy, we inject our own
+ /// on_complete callback, so that we can intercept the result before
+ /// invoking the original on_complete callback. This allows us to set the
+ /// LB token metadata and add client_stats to the call context.
+ /// See \a pending_pick_complete() for details.
+ struct PendingPick {
+ // The grpclb instance that created the wrapping. This instance is not
+ // owned; reference counts are untouched. It's used only for logging
+ // purposes.
+ GrpcLb* grpclb_policy;
+ // The original pick.
+ PickState* pick;
+ // Our on_complete closure and the original one.
+ grpc_closure on_complete;
+ grpc_closure* original_on_complete;
+ // The LB token associated with the pick. This is set via user_data in
+ // the pick.
+ grpc_mdelem lb_token;
+ // Stats for client-side load reporting. Note that this holds a
+ // reference, which must be either passed on via context or unreffed.
+ grpc_grpclb_client_stats* client_stats = nullptr;
+ // Next pending pick.
+ PendingPick* next = nullptr;
+ };
+
+ /// A linked list of pending pings waiting for the RR policy to be created.
+ struct PendingPing {
+ grpc_closure* on_initiate;
+ grpc_closure* on_ack;
+ PendingPing* next = nullptr;
+ };
+
+ /// Contains a call to the LB server and all the data related to the call.
+ class BalancerCallState
+ : public InternallyRefCountedWithTracing<BalancerCallState> {
+ public:
+ explicit BalancerCallState(
+ RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
+
+ // It's the caller's responsibility to ensure that Orphan() is called from
+ // inside the combiner.
+ void Orphan() override;
+
+ void StartQuery();
+
+ grpc_grpclb_client_stats* client_stats() const { return client_stats_; }
+ bool seen_initial_response() const { return seen_initial_response_; }
+
+ private:
+ ~BalancerCallState();
+
+ GrpcLb* grpclb_policy() const {
+ return reinterpret_cast<GrpcLb*>(grpclb_policy_.get());
+ }
-/// A linked list of pending pings waiting for the RR policy to be created.
-struct pending_ping {
- grpc_closure* on_initiate;
- grpc_closure* on_ack;
- struct pending_ping* next;
+ void ScheduleNextClientLoadReportLocked();
+ void SendClientLoadReportLocked();
+
+ static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
+
+ static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
+ static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
+ static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
+ static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
+ static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
+
+ // The owning LB policy.
+ RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
+
+ // The streaming call to the LB server. Always non-NULL.
+ grpc_call* lb_call_ = nullptr;
+
+ // recv_initial_metadata
+ grpc_metadata_array lb_initial_metadata_recv_;
+
+ // send_message
+ grpc_byte_buffer* send_message_payload_ = nullptr;
+ grpc_closure lb_on_initial_request_sent_;
+
+ // recv_message
+ grpc_byte_buffer* recv_message_payload_ = nullptr;
+ grpc_closure lb_on_balancer_message_received_;
+ bool seen_initial_response_ = false;
+
+ // recv_trailing_metadata
+ grpc_closure lb_on_balancer_status_received_;
+ grpc_metadata_array lb_trailing_metadata_recv_;
+ grpc_status_code lb_call_status_;
+ grpc_slice lb_call_status_details_;
+
+ // The stats for client-side load reporting associated with this LB call.
+ // Created after the first serverlist is received.
+ grpc_grpclb_client_stats* client_stats_ = nullptr;
+ grpc_millis client_stats_report_interval_ = 0;
+ grpc_timer client_load_report_timer_;
+ bool client_load_report_timer_callback_pending_ = false;
+ bool last_client_load_report_counters_were_zero_ = false;
+ bool client_load_report_is_due_ = false;
+ // The closure used for either the load report timer or the callback for
+ // completion of sending the load report.
+ grpc_closure client_load_report_closure_;
+ };
+
+ ~GrpcLb();
+
+ void ShutdownLocked() override;
+
+ // Helper function used in ctor and UpdateLocked().
+ void ProcessChannelArgsLocked(const grpc_channel_args& args);
+
+ // Methods for dealing with the balancer channel and call.
+ void StartPickingLocked();
+ void StartBalancerCallLocked();
+ static void OnFallbackTimerLocked(void* arg, grpc_error* error);
+ void StartBalancerCallRetryTimerLocked();
+ static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
+ static void OnBalancerChannelConnectivityChangedLocked(void* arg,
+ grpc_error* error);
+
+ // Pending pick methods.
+ static void PendingPickSetMetadataAndContext(PendingPick* pp);
+ PendingPick* PendingPickCreate(PickState* pick);
+ void AddPendingPick(PendingPick* pp);
+ static void OnPendingPickComplete(void* arg, grpc_error* error);
+
+ // Pending ping methods.
+ void AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack);
+
+ // Methods for dealing with the RR policy.
+ void CreateOrUpdateRoundRobinPolicyLocked();
+ grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
+ void CreateRoundRobinPolicyLocked(const Args& args);
+ bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp);
+ void UpdateConnectivityStateFromRoundRobinPolicyLocked(
+ grpc_error* rr_state_error);
+ static void OnRoundRobinConnectivityChangedLocked(void* arg,
+ grpc_error* error);
+ static void OnRoundRobinRequestReresolutionLocked(void* arg,
+ grpc_error* error);
+
+ // Who the client is trying to communicate with.
+ const char* server_name_ = nullptr;
+
+ // Current channel args from the resolver.
+ grpc_channel_args* args_ = nullptr;
+
+ // Internal state.
+ bool started_picking_ = false;
+ bool shutting_down_ = false;
+ grpc_connectivity_state_tracker state_tracker_;
+
+ // The channel for communicating with the LB server.
+ grpc_channel* lb_channel_ = nullptr;
+ grpc_connectivity_state lb_channel_connectivity_;
+ grpc_closure lb_channel_on_connectivity_changed_;
+ // Are we already watching the LB channel's connectivity?
+ bool watching_lb_channel_ = false;
+ // Response generator to inject address updates into lb_channel_.
+ RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
+
+ // The data associated with the current LB call. It holds a ref to this LB
+ // policy. It's initialized every time we query for backends. It's reset to
+ // NULL whenever the current LB call is no longer needed (e.g., the LB policy
+ // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
+ // contains a non-NULL lb_call_.
+ OrphanablePtr<BalancerCallState> lb_calld_;
+ // Timeout in milliseconds for the LB call. 0 means no deadline.
+ int lb_call_timeout_ms_ = 0;
+ // Balancer call retry state.
+ BackOff lb_call_backoff_;
+ bool retry_timer_callback_pending_ = false;
+ grpc_timer lb_call_retry_timer_;
+ grpc_closure lb_on_call_retry_;
+
+ // The deserialized response from the balancer. May be nullptr until one
+ // such response has arrived.
+ grpc_grpclb_serverlist* serverlist_ = nullptr;
+ // Index into serverlist for next pick.
+ // If the server at this index is a drop, we return a drop.
+ // Otherwise, we delegate to the RR policy.
+ size_t serverlist_index_ = 0;
+
+ // Timeout in milliseconds before using fallback backend addresses.
+ // 0 means not using fallback.
+ int lb_fallback_timeout_ms_ = 0;
+ // The backend addresses from the resolver.
+ grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
+ // Fallback timer.
+ bool fallback_timer_callback_pending_ = false;
+ grpc_timer lb_fallback_timer_;
+ grpc_closure lb_on_fallback_;
+
+ // Pending picks and pings that are waiting on the RR policy's connectivity.
+ PendingPick* pending_picks_ = nullptr;
+ PendingPing* pending_pings_ = nullptr;
+
+ // The RR policy to use for the backends.
+ OrphanablePtr<LoadBalancingPolicy> rr_policy_;
+ grpc_connectivity_state rr_connectivity_state_;
+ grpc_closure on_rr_connectivity_changed_;
+ grpc_closure on_rr_request_reresolution_;
};
-} // namespace
-
-typedef struct glb_lb_call_data {
- struct glb_lb_policy* glb_policy;
- // TODO(juanlishen): c++ize this struct.
- gpr_refcount refs;
-
- /** The streaming call to the LB server. Always non-NULL. */
- grpc_call* lb_call;
-
- /** The initial metadata received from the LB server. */
- grpc_metadata_array lb_initial_metadata_recv;
-
- /** The message sent to the LB server. It's used to query for backends (the
- * value may vary if the LB server indicates a redirect) or send client load
- * report. */
- grpc_byte_buffer* send_message_payload;
- /** The callback after the initial request is sent. */
- grpc_closure lb_on_sent_initial_request;
-
- /** The response received from the LB server, if any. */
- grpc_byte_buffer* recv_message_payload;
- /** The callback to process the response received from the LB server. */
- grpc_closure lb_on_response_received;
- bool seen_initial_response;
-
- /** The callback to process the status received from the LB server, which
- * signals the end of the LB call. */
- grpc_closure lb_on_server_status_received;
- /** The trailing metadata from the LB server. */
- grpc_metadata_array lb_trailing_metadata_recv;
- /** The call status code and details. */
- grpc_status_code lb_call_status;
- grpc_slice lb_call_status_details;
-
- /** The stats for client-side load reporting associated with this LB call.
- * Created after the first serverlist is received. */
- grpc_grpclb_client_stats* client_stats;
- /** The interval and timer for next client load report. */
- grpc_millis client_stats_report_interval;
- grpc_timer client_load_report_timer;
- bool client_load_report_timer_callback_pending;
- bool last_client_load_report_counters_were_zero;
- bool client_load_report_is_due;
- /** The closure used for either the load report timer or the callback for
- * completion of sending the load report. */
- grpc_closure client_load_report_closure;
-} glb_lb_call_data;
-
-typedef struct glb_lb_policy {
- /** Base policy: must be first. */
- grpc_lb_policy base;
-
- /** Who the client is trying to communicate with. */
- const char* server_name;
-
- /** Channel related data that will be propagated to the internal RR policy. */
- grpc_client_channel_factory* cc_factory;
- grpc_channel_args* args;
-
- /** Timeout in milliseconds for before using fallback backend addresses.
- * 0 means not using fallback. */
- int lb_fallback_timeout_ms;
-
- /** The channel for communicating with the LB server. */
- grpc_channel* lb_channel;
-
- /** The data associated with the current LB call. It holds a ref to this LB
- * policy. It's initialized every time we query for backends. It's reset to
- * NULL whenever the current LB call is no longer needed (e.g., the LB policy
- * is shutting down, or the LB call has ended). A non-NULL lb_calld always
- * contains a non-NULL lb_call. */
- glb_lb_call_data* lb_calld;
-
- /** response generator to inject address updates into \a lb_channel */
- grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
- response_generator;
-
- /** the RR policy to use of the backend servers returned by the LB server */
- grpc_lb_policy* rr_policy;
-
- /** the connectivity state of the embedded RR policy */
- grpc_connectivity_state rr_connectivity_state;
-
- bool started_picking;
-
- /** our connectivity state tracker */
- grpc_connectivity_state_tracker state_tracker;
-
- /** connectivity state of the LB channel */
- grpc_connectivity_state lb_channel_connectivity;
-
- /** stores the deserialized response from the LB. May be nullptr until one
- * such response has arrived. */
- grpc_grpclb_serverlist* serverlist;
-
- /** Index into serverlist for next pick.
- * If the server at this index is a drop, we return a drop.
- * Otherwise, we delegate to the RR policy. */
- size_t serverlist_index;
-
- /** stores the backend addresses from the resolver */
- grpc_lb_addresses* fallback_backend_addresses;
-
- /** list of picks that are waiting on RR's policy connectivity */
- pending_pick* pending_picks;
-
- /** list of pings that are waiting on RR's policy connectivity */
- pending_ping* pending_pings;
-
- bool shutting_down;
-
- /** are we already watching the LB channel's connectivity? */
- bool watching_lb_channel;
-
- /** is the callback associated with \a lb_call_retry_timer pending? */
- bool retry_timer_callback_pending;
-
- /** is the callback associated with \a lb_fallback_timer pending? */
- bool fallback_timer_callback_pending;
-
- /** called upon changes to the LB channel's connectivity. */
- grpc_closure lb_channel_on_connectivity_changed;
-
- /** called upon changes to the RR's connectivity. */
- grpc_closure rr_on_connectivity_changed;
-
- /** called upon reresolution request from the RR policy. */
- grpc_closure rr_on_reresolution_requested;
-
- /************************************************************/
- /* client data associated with the LB server communication */
- /************************************************************/
-
- /** LB call retry backoff state */
- grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff;
-
- /** timeout in milliseconds for the LB call. 0 means no deadline. */
- int lb_call_timeout_ms;
-
- /** LB call retry timer */
- grpc_timer lb_call_retry_timer;
- /** LB call retry timer callback */
- grpc_closure lb_on_call_retry;
-
- /** LB fallback timer */
- grpc_timer lb_fallback_timer;
- /** LB fallback timer callback */
- grpc_closure lb_on_fallback;
-} glb_lb_policy;
-
-static void glb_lb_call_data_ref(glb_lb_call_data* lb_calld,
- const char* reason) {
- gpr_ref_non_zero(&lb_calld->refs);
- if (grpc_lb_glb_trace.enabled()) {
- const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
- gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p REF %lu->%lu (%s)",
- grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
- static_cast<unsigned long>(count - 1),
- static_cast<unsigned long>(count), reason);
- }
-}
+//
+// serverlist parsing code
+//
-static void glb_lb_call_data_unref(glb_lb_call_data* lb_calld,
- const char* reason) {
- const bool done = gpr_unref(&lb_calld->refs);
- if (grpc_lb_glb_trace.enabled()) {
- const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
- gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p UNREF %lu->%lu (%s)",
- grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
- static_cast<unsigned long>(count + 1),
- static_cast<unsigned long>(count), reason);
- }
- if (done) {
- GPR_ASSERT(lb_calld->lb_call != nullptr);
- grpc_call_unref(lb_calld->lb_call);
- grpc_metadata_array_destroy(&lb_calld->lb_initial_metadata_recv);
- grpc_metadata_array_destroy(&lb_calld->lb_trailing_metadata_recv);
- grpc_byte_buffer_destroy(lb_calld->send_message_payload);
- grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
- grpc_slice_unref_internal(lb_calld->lb_call_status_details);
- if (lb_calld->client_stats != nullptr) {
- grpc_grpclb_client_stats_unref(lb_calld->client_stats);
- }
- GRPC_LB_POLICY_UNREF(&lb_calld->glb_policy->base, "lb_calld");
- gpr_free(lb_calld);
- }
+// vtable for LB tokens in grpc_lb_addresses
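+// Each LB token is stored as the payload pointer of a grpc_mdelem: copying a
+// token refs the underlying mdelem and destroying a token unrefs it.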
+void* lb_token_copy(void* token) {
+ return token == nullptr
+ ? nullptr
+ : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
}
-
-static void lb_call_data_shutdown(glb_lb_policy* glb_policy) {
- GPR_ASSERT(glb_policy->lb_calld != nullptr);
- GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
- // lb_on_server_status_received will complete the cancellation and clean up.
- grpc_call_cancel(glb_policy->lb_calld->lb_call, nullptr);
- if (glb_policy->lb_calld->client_load_report_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->lb_calld->client_load_report_timer);
+void lb_token_destroy(void* token) {
+ if (token != nullptr) {
+ GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
}
- glb_policy->lb_calld = nullptr;
}
-
-/* add lb_token of selected subchannel (address) to the call's initial
- * metadata */
-static grpc_error* initial_metadata_add_lb_token(
- grpc_metadata_batch* initial_metadata,
- grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
- GPR_ASSERT(lb_token_mdelem_storage != nullptr);
- GPR_ASSERT(!GRPC_MDISNULL(lb_token));
- return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
- lb_token);
-}
-
-static void destroy_client_stats(void* arg) {
- grpc_grpclb_client_stats_unref(static_cast<grpc_grpclb_client_stats*>(arg));
+int lb_token_cmp(void* token1, void* token2) {
+ if (token1 > token2) return 1;
+ if (token1 < token2) return -1;
+ return 0;
}
+const grpc_lb_user_data_vtable lb_token_vtable = {
+ lb_token_copy, lb_token_destroy, lb_token_cmp};
-static void pending_pick_set_metadata_and_context(pending_pick* pp) {
- /* if connected_subchannel is nullptr, no pick has been made by the RR
- * policy (e.g., all addresses failed to connect). There won't be any
- * user_data/token available */
- if (pp->pick->connected_subchannel != nullptr) {
- if (!GRPC_MDISNULL(pp->lb_token)) {
- initial_metadata_add_lb_token(pp->pick->initial_metadata,
- &pp->pick->lb_token_mdelem_storage,
- GRPC_MDELEM_REF(pp->lb_token));
- } else {
- gpr_log(GPR_ERROR,
- "[grpclb %p] No LB token for connected subchannel pick %p",
- pp->glb_policy, pp->pick);
- abort();
- }
- // Pass on client stats via context. Passes ownership of the reference.
- if (pp->client_stats != nullptr) {
- pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
- pp->client_stats;
- pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
- destroy_client_stats;
- }
- } else {
- if (pp->client_stats != nullptr) {
- grpc_grpclb_client_stats_unref(pp->client_stats);
+// Returns the backend addresses extracted from the given addresses.
+grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
+ // First pass: count the number of backend addresses.
+ size_t num_backends = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) {
+ ++num_backends;
}
}
+ // Second pass: actually populate the addresses and (empty) LB tokens.
+ grpc_lb_addresses* backend_addresses =
+ grpc_lb_addresses_create(num_backends, &lb_token_vtable);
+ size_t num_copied = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) continue;
+ const grpc_resolved_address* addr = &addresses->addresses[i].address;
+ grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
+ addr->len, false /* is_balancer */,
+ nullptr /* balancer_name */,
+ (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
+ ++num_copied;
+ }
+ return backend_addresses;
}
-/* The \a on_complete closure passed as part of the pick requires keeping a
- * reference to its associated round robin instance. We wrap this closure in
- * order to unref the round robin instance upon its invocation */
-static void pending_pick_complete(void* arg, grpc_error* error) {
- pending_pick* pp = static_cast<pending_pick*>(arg);
- pending_pick_set_metadata_and_context(pp);
- GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
- gpr_free(pp);
-}
-
-static pending_pick* pending_pick_create(glb_lb_policy* glb_policy,
- grpc_lb_policy_pick_state* pick) {
- pending_pick* pp = static_cast<pending_pick*>(gpr_zalloc(sizeof(*pp)));
- pp->pick = pick;
- pp->glb_policy = glb_policy;
- GRPC_CLOSURE_INIT(&pp->on_complete, pending_pick_complete, pp,
- grpc_schedule_on_exec_ctx);
- pp->original_on_complete = pick->on_complete;
- pp->pick->on_complete = &pp->on_complete;
- return pp;
-}
-
-static void pending_pick_add(pending_pick** root, pending_pick* new_pp) {
- new_pp->next = *root;
- *root = new_pp;
-}
-
-static void pending_ping_add(pending_ping** root, grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- pending_ping* pping = static_cast<pending_ping*>(gpr_zalloc(sizeof(*pping)));
- pping->on_initiate = on_initiate;
- pping->on_ack = on_ack;
- pping->next = *root;
- *root = pping;
-}
-
-static bool is_server_valid(const grpc_grpclb_server* server, size_t idx,
- bool log) {
+bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
if (server->drop) return false;
const grpc_grpclb_ip_address* ip = &server->ip_address;
if (server->port >> 16 != 0) {
if (log) {
gpr_log(GPR_ERROR,
"Invalid port '%d' at index %lu of serverlist. Ignoring.",
- server->port, static_cast<unsigned long>(idx));
+ server->port, (unsigned long)idx);
}
return false;
}
@@ -465,65 +404,43 @@ static bool is_server_valid(const grpc_grpclb_server* server, size_t idx,
gpr_log(GPR_ERROR,
"Expected IP to be 4 or 16 bytes, got %d at index %lu of "
"serverlist. Ignoring",
- ip->size, static_cast<unsigned long>(idx));
+ ip->size, (unsigned long)idx);
}
return false;
}
return true;
}
-/* vtable for LB tokens in grpc_lb_addresses. */
-static void* lb_token_copy(void* token) {
- return token == nullptr
- ? nullptr
- : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
-}
-static void lb_token_destroy(void* token) {
- if (token != nullptr) {
- GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
- }
-}
-static int lb_token_cmp(void* token1, void* token2) {
- if (token1 > token2) return 1;
- if (token1 < token2) return -1;
- return 0;
-}
-static const grpc_lb_user_data_vtable lb_token_vtable = {
- lb_token_copy, lb_token_destroy, lb_token_cmp};
-
-static void parse_server(const grpc_grpclb_server* server,
- grpc_resolved_address* addr) {
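+// Translates the binary ip_address/port of \a server into a
+// grpc_resolved_address (IPv4 or IPv6). Drop entries are left zeroed.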
+void ParseServer(const grpc_grpclb_server* server,
+ grpc_resolved_address* addr) {
memset(addr, 0, sizeof(*addr));
if (server->drop) return;
- const uint16_t netorder_port = htons(static_cast<uint16_t>(server->port));
+ const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
const grpc_grpclb_ip_address* ip = &server->ip_address;
if (ip->size == 4) {
addr->len = sizeof(struct sockaddr_in);
- struct sockaddr_in* addr4 =
- reinterpret_cast<struct sockaddr_in*>(&addr->addr);
+ struct sockaddr_in* addr4 = (struct sockaddr_in*)&addr->addr;
addr4->sin_family = AF_INET;
memcpy(&addr4->sin_addr, ip->bytes, ip->size);
addr4->sin_port = netorder_port;
} else if (ip->size == 16) {
addr->len = sizeof(struct sockaddr_in6);
- struct sockaddr_in6* addr6 =
- reinterpret_cast<struct sockaddr_in6*>(&addr->addr);
+ struct sockaddr_in6* addr6 = (struct sockaddr_in6*)&addr->addr;
addr6->sin6_family = AF_INET6;
memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
addr6->sin6_port = netorder_port;
}
}
-/* Returns addresses extracted from \a serverlist. */
-static grpc_lb_addresses* process_serverlist_locked(
- const grpc_grpclb_serverlist* serverlist) {
+// Returns addresses extracted from \a serverlist.
+grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
* memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
- if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
+ if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
}
grpc_lb_addresses* lb_addresses =
grpc_lb_addresses_create(num_valid, &lb_token_vtable);
@@ -535,11 +452,11 @@ static grpc_lb_addresses* process_serverlist_locked(
size_t addr_idx = 0;
for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
const grpc_grpclb_server* server = serverlist->servers[sl_idx];
- if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
+ if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
GPR_ASSERT(addr_idx < num_valid);
/* address processing */
grpc_resolved_address addr;
- parse_server(server, &addr);
+ ParseServer(server, &addr);
/* lb token processing */
void* user_data;
if (server->has_load_balance_token) {
@@ -570,317 +487,448 @@ static grpc_lb_addresses* process_serverlist_locked(
return lb_addresses;
}
-/* Returns the backend addresses extracted from the given addresses */
-static grpc_lb_addresses* extract_backend_addresses_locked(
- const grpc_lb_addresses* addresses) {
- /* first pass: count the number of backend addresses */
- size_t num_backends = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (!addresses->addresses[i].is_balancer) {
- ++num_backends;
- }
- }
- /* second pass: actually populate the addresses and (empty) LB tokens */
- grpc_lb_addresses* backend_addresses =
- grpc_lb_addresses_create(num_backends, &lb_token_vtable);
- size_t num_copied = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) continue;
- const grpc_resolved_address* addr = &addresses->addresses[i].address;
- grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
- addr->len, false /* is_balancer */,
- nullptr /* balancer_name */,
- (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
- ++num_copied;
- }
- return backend_addresses;
+//
+// GrpcLb::BalancerCallState
+//
+
+GrpcLb::BalancerCallState::BalancerCallState(
+ RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
+ : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_glb_trace),
+ grpclb_policy_(std::move(parent_grpclb_policy)) {
+ GPR_ASSERT(grpclb_policy_ != nullptr);
+ GPR_ASSERT(!grpclb_policy()->shutting_down_);
+ // Init the LB call. Note that the LB call will progress every time there's
+ // activity in grpclb_policy_->interested_parties(), which is composed of
+ // the polling entities from client_channel.
+ GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
+ GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
+ grpc_slice host =
+ grpc_slice_from_copied_string(grpclb_policy()->server_name_);
+ grpc_millis deadline =
+ grpclb_policy()->lb_call_timeout_ms_ == 0
+ ? GRPC_MILLIS_INF_FUTURE
+ : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
+ lb_call_ = grpc_channel_create_pollset_set_call(
+ grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
+ grpclb_policy_->interested_parties(),
+ GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
+ &host, deadline, nullptr);
+ grpc_slice_unref_internal(host);
+ // Init the LB call request payload.
+ grpc_grpclb_request* request =
+ grpc_grpclb_request_create(grpclb_policy()->server_name_);
+ grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
+ send_message_payload_ =
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+ grpc_slice_unref_internal(request_payload_slice);
+ grpc_grpclb_request_destroy(request);
+ // Init other data associated with the LB call.
+ grpc_metadata_array_init(&lb_initial_metadata_recv_);
+ grpc_metadata_array_init(&lb_trailing_metadata_recv_);
+ GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
+ this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
+ GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
+ OnBalancerMessageReceivedLocked, this,
+ grpc_combiner_scheduler(grpclb_policy()->combiner()));
+ GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
+ OnBalancerStatusReceivedLocked, this,
+ grpc_combiner_scheduler(grpclb_policy()->combiner()));
}
-static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy,
- grpc_error* rr_state_error) {
- const grpc_connectivity_state curr_glb_state =
- grpc_connectivity_state_check(&glb_policy->state_tracker);
- /* The new connectivity status is a function of the previous one and the new
- * input coming from the status of the RR policy.
- *
- * current state (grpclb's)
- * |
- * v || I | C | R | TF | SD | <- new state (RR's)
- * ===++====+=====+=====+======+======+
- * I || I | C | R | [I] | [I] |
- * ---++----+-----+-----+------+------+
- * C || I | C | R | [C] | [C] |
- * ---++----+-----+-----+------+------+
- * R || I | C | R | [R] | [R] |
- * ---++----+-----+-----+------+------+
- * TF || I | C | R | [TF] | [TF] |
- * ---++----+-----+-----+------+------+
- * SD || NA | NA | NA | NA | NA | (*)
- * ---++----+-----+-----+------+------+
- *
- * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
- * is the current state of grpclb, which is left untouched.
- *
- * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
- * the previous RR instance.
- *
- * Note that the status is never updated to SHUTDOWN as a result of calling
- * this function. Only glb_shutdown() has the power to set that state.
- *
- * (*) This function mustn't be called during shutting down. */
- GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
- switch (glb_policy->rr_connectivity_state) {
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- case GRPC_CHANNEL_SHUTDOWN:
- GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
- break;
- case GRPC_CHANNEL_IDLE:
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_READY:
- GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
+GrpcLb::BalancerCallState::~BalancerCallState() {
+ GPR_ASSERT(lb_call_ != nullptr);
+ grpc_call_unref(lb_call_);
+ grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
+ grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
+ grpc_byte_buffer_destroy(send_message_payload_);
+ grpc_byte_buffer_destroy(recv_message_payload_);
+ grpc_slice_unref_internal(lb_call_status_details_);
+ if (client_stats_ != nullptr) {
+ grpc_grpclb_client_stats_unref(client_stats_);
}
+}
+
+void GrpcLb::BalancerCallState::Orphan() {
+ GPR_ASSERT(lb_call_ != nullptr);
+ // If we are here because grpclb_policy wants to cancel the call,
+ // lb_on_balancer_status_received_ will complete the cancellation and clean
+ // up. Otherwise, we are here because grpclb_policy has to orphan a failed
+ // call; in that case, the cancellation below is a no-op.
+ grpc_call_cancel(lb_call_, nullptr);
+ if (client_load_report_timer_callback_pending_) {
+ grpc_timer_cancel(&client_load_report_timer_);
+ }
+ // Note that the initial ref is held by lb_on_balancer_status_received_
+ // rather than by the caller of this function, so the corresponding unref
+ // happens in lb_on_balancer_status_received_ instead of here.
+}
+
+void GrpcLb::BalancerCallState::StartQuery() {
+ GPR_ASSERT(lb_call_ != nullptr);
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(
- GPR_INFO,
- "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
- glb_policy,
- grpc_connectivity_state_name(glb_policy->rr_connectivity_state),
- glb_policy->rr_policy);
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
+ grpclb_policy_.get(), this, lb_call_);
}
- grpc_connectivity_state_set(&glb_policy->state_tracker,
- glb_policy->rr_connectivity_state, rr_state_error,
- "update_lb_connectivity_status_locked");
+ // Create the ops.
+ grpc_call_error call_error;
+ grpc_op ops[3];
+ memset(ops, 0, sizeof(ops));
+ // Op: send initial metadata.
+ grpc_op* op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // Op: send request message.
+ GPR_ASSERT(send_message_payload_ != nullptr);
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message.send_message = send_message_payload_;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
+ self.release();
+ call_error = grpc_call_start_batch_and_execute(
+ lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ // Op: recv initial metadata.
+ op = ops;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata =
+ &lb_initial_metadata_recv_;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // Op: recv response.
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message = &recv_message_payload_;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ self = Ref(DEBUG_LOCATION, "on_message_received");
+ self.release();
+ call_error = grpc_call_start_batch_and_execute(
+ lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ // Op: recv server status.
+ op = ops;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata =
+ &lb_trailing_metadata_recv_;
+ op->data.recv_status_on_client.status = &lb_call_status_;
+ op->data.recv_status_on_client.status_details = &lb_call_status_details_;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // This callback signals the end of the LB call, so it relies on the initial
+ // ref instead of a new ref. When it's invoked, it's the initial ref that is
+ // unreffed.
+ call_error = grpc_call_start_batch_and_execute(
+ lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+}
+
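+// Arms the client load report timer to fire one report interval from now.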
+void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
+ const grpc_millis next_client_load_report_time =
+ ExecCtx::Get()->Now() + client_stats_report_interval_;
+ GRPC_CLOSURE_INIT(&client_load_report_closure_,
+ MaybeSendClientLoadReportLocked, this,
+ grpc_combiner_scheduler(grpclb_policy()->combiner()));
+ grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
+ &client_load_report_closure_);
+ client_load_report_timer_callback_pending_ = true;
}
-/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
- * immediately (ignoring its completion callback), we need to perform the
- * cleanups this callback would otherwise be responsible for.
- * If \a force_async is true, then we will manually schedule the
- * completion callback even if the pick is available immediately. */
-static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
- bool force_async, pending_pick* pp) {
- // Check for drops if we are not using fallback backend addresses.
- if (glb_policy->serverlist != nullptr) {
- // Look at the index into the serverlist to see if we should drop this call.
- grpc_grpclb_server* server =
- glb_policy->serverlist->servers[glb_policy->serverlist_index++];
- if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
- glb_policy->serverlist_index = 0; // Wrap-around.
- }
- if (server->drop) {
- // Update client load reporting stats to indicate the number of
- // dropped calls. Note that we have to do this here instead of in
- // the client_load_reporting filter, because we do not create a
- // subchannel call (and therefore no client_load_reporting filter)
- // for dropped calls.
- if (glb_policy->lb_calld != nullptr &&
- glb_policy->lb_calld->client_stats != nullptr) {
- grpc_grpclb_client_stats_add_call_dropped_locked(
- server->load_balance_token, glb_policy->lb_calld->client_stats);
- }
- if (force_async) {
- GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
- gpr_free(pp);
- return false;
- }
- gpr_free(pp);
- return true;
- }
- }
- // Set client_stats and user_data.
- if (glb_policy->lb_calld != nullptr &&
- glb_policy->lb_calld->client_stats != nullptr) {
- pp->client_stats =
- grpc_grpclb_client_stats_ref(glb_policy->lb_calld->client_stats);
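+// Invoked when the client load report timer fires. Sends the report now if
+// the initial request has already been sent; otherwise marks the report as
+// due so that OnInitialRequestSentLocked() sends it.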
+void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
+ void* arg, grpc_error* error) {
+ BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg);
+ GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
+ lb_calld->client_load_report_timer_callback_pending_ = false;
+ if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
+ lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
+ return;
}
- GPR_ASSERT(pp->pick->user_data == nullptr);
- pp->pick->user_data = reinterpret_cast<void**>(&pp->lb_token);
- // Pick via the RR policy.
- bool pick_done = grpc_lb_policy_pick_locked(glb_policy->rr_policy, pp->pick);
- if (pick_done) {
- pending_pick_set_metadata_and_context(pp);
- if (force_async) {
- GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
- pick_done = false;
- }
- gpr_free(pp);
+ // If we've already sent the initial request, then we can go ahead and send
+ // the load report. Otherwise, we need to wait until the initial request has
+ // been sent to send this (see OnInitialRequestSentLocked()).
+ if (lb_calld->send_message_payload_ == nullptr) {
+ lb_calld->SendClientLoadReportLocked();
+ } else {
+ lb_calld->client_load_report_is_due_ = true;
}
- /* else, the pending pick will be registered and taken care of by the
- * pending pick list inside the RR policy (glb_policy->rr_policy).
- * Eventually, wrapped_on_complete will be called, which will -among other
- * things- add the LB token to the call's initial metadata */
- return pick_done;
}
-static grpc_lb_policy_args* lb_policy_args_create(glb_lb_policy* glb_policy) {
- grpc_lb_addresses* addresses;
- if (glb_policy->serverlist != nullptr) {
- GPR_ASSERT(glb_policy->serverlist->num_servers > 0);
- addresses = process_serverlist_locked(glb_policy->serverlist);
- } else {
- // If rr_handover_locked() is invoked when we haven't received any
- // serverlist from the balancer, we use the fallback backends returned by
- // the resolver. Note that the fallback backend list may be empty, in which
- // case the new round_robin policy will keep the requested picks pending.
- GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
- addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses);
- }
- GPR_ASSERT(addresses != nullptr);
- grpc_lb_policy_args* args =
- static_cast<grpc_lb_policy_args*>(gpr_zalloc(sizeof(*args)));
- args->client_channel_factory = glb_policy->cc_factory;
- args->combiner = glb_policy->base.combiner;
- // Replace the LB addresses in the channel args that we pass down to
- // the subchannel.
- static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
- const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
- args->args = grpc_channel_args_copy_and_add_and_remove(
- glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
- 1);
- grpc_lb_addresses_destroy(addresses);
- return args;
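+// Returns true if the given load report request contains no call activity
+// (all counters are zero and there are no dropped-call entries).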
+bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
+ grpc_grpclb_request* request) {
+ grpc_grpclb_dropped_call_counts* drop_entries =
+ static_cast<grpc_grpclb_dropped_call_counts*>(
+ request->client_stats.calls_finished_with_drop.arg);
+ return request->client_stats.num_calls_started == 0 &&
+ request->client_stats.num_calls_finished == 0 &&
+ request->client_stats.num_calls_finished_with_client_failed_to_send ==
+ 0 &&
+ request->client_stats.num_calls_finished_known_received == 0 &&
+ (drop_entries == nullptr || drop_entries->num_entries == 0);
}
-static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
- grpc_channel_args_destroy(args->args);
- gpr_free(args);
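+// Builds and sends a client load report, skipping the send when the counters
+// were zero in both the previous report and this one.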
+void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
+ // Construct message payload.
+ GPR_ASSERT(send_message_payload_ == nullptr);
+ grpc_grpclb_request* request =
+ grpc_grpclb_load_report_request_create_locked(client_stats_);
+ // Skip client load report if the counters were all zero in the last
+ // report and they are still zero in this one.
+ if (LoadReportCountersAreZero(request)) {
+ if (last_client_load_report_counters_were_zero_) {
+ grpc_grpclb_request_destroy(request);
+ ScheduleNextClientLoadReportLocked();
+ return;
+ }
+ last_client_load_report_counters_were_zero_ = true;
+ } else {
+ last_client_load_report_counters_were_zero_ = false;
+ }
+ grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
+ send_message_payload_ =
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+ grpc_slice_unref_internal(request_payload_slice);
+ grpc_grpclb_request_destroy(request);
+ // Send the report.
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_SEND_MESSAGE;
+ op.data.send_message.send_message = send_message_payload_;
+ GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
+ this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ lb_call_, &op, 1, &client_load_report_closure_);
+ if (call_error != GRPC_CALL_OK) {
+ gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
+ call_error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ }
}
-static void rr_on_reresolution_requested_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- if (glb_policy->shutting_down || error != GRPC_ERROR_NONE) {
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
- "rr_on_reresolution_requested_locked");
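+// Invoked when a client load report has finished sending; schedules the next
+// report if this lb_calld is still in use.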
+void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
+ grpc_error* error) {
+ BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg);
+ GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
+ grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
+ lb_calld->send_message_payload_ = nullptr;
+ if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
+ lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
return;
}
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(
- GPR_DEBUG,
- "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
- glb_policy, glb_policy->rr_policy);
- }
- // If we are talking to a balancer, we expect to get updated addresses form
- // the balancer, so we can ignore the re-resolution request from the RR
- // policy. Otherwise, handle the re-resolution request using glb's original
- // re-resolution closure.
- if (glb_policy->lb_calld == nullptr ||
- !glb_policy->lb_calld->seen_initial_response) {
- grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
- GRPC_ERROR_NONE);
+ lb_calld->ScheduleNextClientLoadReportLocked();
+}
+
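+// Invoked when the initial LB request has been sent; sends any client load
+// report that became due while the initial request was still in flight.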
+void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
+ grpc_error* error) {
+ BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg);
+ grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
+ lb_calld->send_message_payload_ = nullptr;
+ // If we attempted to send a client load report before the initial request was
+ // sent (and this lb_calld is still in use), send the load report now.
+ if (lb_calld->client_load_report_is_due_ &&
+ lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
+ lb_calld->SendClientLoadReportLocked();
+ lb_calld->client_load_report_is_due_ = false;
}
- // Give back the wrapper closure to the RR policy.
- grpc_lb_policy_set_reresolve_closure_locked(
- glb_policy->rr_policy, &glb_policy->rr_on_reresolution_requested);
+ lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
}
-static void create_rr_locked(glb_lb_policy* glb_policy,
- grpc_lb_policy_args* args) {
- GPR_ASSERT(glb_policy->rr_policy == nullptr);
- grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args);
- if (new_rr_policy == nullptr) {
- gpr_log(GPR_ERROR,
- "[grpclb %p] Failure creating a RoundRobin policy for serverlist "
- "update with %" PRIuPTR
- " entries. The previous RR instance (%p), if any, will continue to "
- "be used. Future updates from the LB will attempt to create new "
- "instances.",
- glb_policy, glb_policy->serverlist->num_servers,
- glb_policy->rr_policy);
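+// Invoked when a message (initial response or serverlist) is received from
+// the balancer. Unless we are shutting down, re-arms the receive op so that
+// we keep listening for serverlist updates.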
+void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
+ void* arg, grpc_error* error) {
+ BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg);
+ GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
+ // Empty payload means the LB call was cancelled.
+ if (lb_calld != grpclb_policy->lb_calld_.get() ||
+ lb_calld->recv_message_payload_ == nullptr) {
+ lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
return;
}
- GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_reresolution_requested_locked");
- grpc_lb_policy_set_reresolve_closure_locked(
- new_rr_policy, &glb_policy->rr_on_reresolution_requested);
- glb_policy->rr_policy = new_rr_policy;
- grpc_error* rr_state_error = nullptr;
- glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked(
- glb_policy->rr_policy, &rr_state_error);
- /* Connectivity state is a function of the RR policy updated/created */
- update_lb_connectivity_status_locked(glb_policy, rr_state_error);
- /* Add the gRPC LB's interested_parties pollset_set to that of the newly
- * created RR policy. This will make the RR policy progress upon activity on
- * gRPC LB, which in turn is tied to the application's call */
- grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
- glb_policy->base.interested_parties);
- /* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_connectivity_changed_locked");
- grpc_lb_policy_notify_on_state_change_locked(
- glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
- &glb_policy->rr_on_connectivity_changed);
- grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
- // Send pending picks to RR policy.
- pending_pick* pp;
- while ((pp = glb_policy->pending_picks)) {
- glb_policy->pending_picks = pp->next;
- if (grpc_lb_glb_trace.enabled()) {
+ grpc_byte_buffer_reader bbr;
+ grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
+ grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
+ grpc_byte_buffer_reader_destroy(&bbr);
+ grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
+ lb_calld->recv_message_payload_ = nullptr;
+ grpc_grpclb_initial_response* initial_response;
+ grpc_grpclb_serverlist* serverlist;
+ if (!lb_calld->seen_initial_response_ &&
+ (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
+ nullptr) {
+ // Have NOT seen initial response, look for initial response.
+ if (initial_response->has_client_stats_report_interval) {
+ lb_calld->client_stats_report_interval_ = GPR_MAX(
+ GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
+ &initial_response->client_stats_report_interval));
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Received initial LB response message; "
+ "client load reporting interval = %" PRIdPTR " milliseconds",
+ grpclb_policy, lb_calld->client_stats_report_interval_);
+ }
+ } else if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Pending pick about to (async) PICK from RR %p",
- glb_policy, glb_policy->rr_policy);
+ "[grpclb %p] Received initial LB response message; client load "
+ "reporting NOT enabled",
+ grpclb_policy);
}
- pick_from_internal_rr_locked(glb_policy, true /* force_async */, pp);
- }
- // Send pending pings to RR policy.
- pending_ping* pping;
- while ((pping = glb_policy->pending_pings)) {
- glb_policy->pending_pings = pping->next;
+ grpc_grpclb_initial_response_destroy(initial_response);
+ lb_calld->seen_initial_response_ = true;
+ } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
+ response_slice)) != nullptr) {
+ // Have seen initial response, look for serverlist.
+ GPR_ASSERT(lb_calld->lb_call_ != nullptr);
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
- glb_policy, glb_policy->rr_policy);
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
+ grpclb_policy, serverlist->num_servers);
+ for (size_t i = 0; i < serverlist->num_servers; ++i) {
+ grpc_resolved_address addr;
+ ParseServer(serverlist->servers[i], &addr);
+ char* ipport;
+ grpc_sockaddr_to_string(&ipport, &addr, false);
+ gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
+ grpclb_policy, i, ipport);
+ gpr_free(ipport);
+ }
}
- grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, pping->on_initiate,
- pping->on_ack);
- gpr_free(pping);
- }
-}
-
-/* glb_policy->rr_policy may be nullptr (initial handover) */
-static void rr_handover_locked(glb_lb_policy* glb_policy) {
- if (glb_policy->shutting_down) return;
- grpc_lb_policy_args* args = lb_policy_args_create(glb_policy);
- GPR_ASSERT(args != nullptr);
- if (glb_policy->rr_policy != nullptr) {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", glb_policy,
- glb_policy->rr_policy);
+ /* update serverlist */
+ if (serverlist->num_servers > 0) {
+ // Start sending client load report only after we start using the
+ // serverlist returned from the current LB call.
+ if (lb_calld->client_stats_report_interval_ > 0 &&
+ lb_calld->client_stats_ == nullptr) {
+ lb_calld->client_stats_ = grpc_grpclb_client_stats_create();
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
+ self.release();
+ lb_calld->ScheduleNextClientLoadReportLocked();
+ }
+ if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_,
+ serverlist)) {
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Incoming server list identical to current, "
+ "ignoring.",
+ grpclb_policy);
+ }
+ grpc_grpclb_destroy_serverlist(serverlist);
+ } else { /* new serverlist */
+ if (grpclb_policy->serverlist_ != nullptr) {
+ /* dispose of the old serverlist */
+ grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
+ } else {
+ /* or dispose of the fallback */
+ grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
+ grpclb_policy->fallback_backend_addresses_ = nullptr;
+ if (grpclb_policy->fallback_timer_callback_pending_) {
+ grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
+ }
+ }
+ // Update the serverlist copy in the GrpcLb instance. This serverlist
+ // instance will be destroyed either upon the next update or when the
+ // GrpcLb instance is destroyed.
+ grpclb_policy->serverlist_ = serverlist;
+ grpclb_policy->serverlist_index_ = 0;
+ grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
+ }
+ } else {
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
+ grpclb_policy);
+ }
+ grpc_grpclb_destroy_serverlist(serverlist);
}
- grpc_lb_policy_update_locked(glb_policy->rr_policy, args);
} else {
- create_rr_locked(glb_policy, args);
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", glb_policy,
- glb_policy->rr_policy);
- }
+ // No valid initial response or serverlist found.
+ gpr_log(GPR_ERROR,
+ "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
+ grpclb_policy,
+ grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
+ }
+ grpc_slice_unref_internal(response_slice);
+ if (!grpclb_policy->shutting_down_) {
+ // Keep listening for serverlist updates.
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_RECV_MESSAGE;
+ op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
+ op.flags = 0;
+ op.reserved = nullptr;
+ // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
+ const grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ lb_calld->lb_call_, &op, 1,
+ &lb_calld->lb_on_balancer_message_received_);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ } else {
+ lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
}
- lb_policy_args_destroy(args);
}
-static void rr_on_connectivity_changed_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- if (glb_policy->shutting_down) {
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
- "rr_on_connectivity_changed_locked");
- return;
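+// Invoked when the LB call ends. If this lb_calld is still the current one,
+// either restarts the call immediately (if we had seen an initial response)
+// or schedules a retry via the backoff timer.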
+void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
+ void* arg, grpc_error* error) {
+ BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg);
+ GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
+ GPR_ASSERT(lb_calld->lb_call_ != nullptr);
+ if (grpc_lb_glb_trace.enabled()) {
+ char* status_details =
+ grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Status from LB server received. Status = %d, details "
+ "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
+ grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
+ lb_calld->lb_call_, grpc_error_string(error));
+ gpr_free(status_details);
+ }
+ grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
+ // If this lb_calld is still in use, the call ended because of a failure, so
+ // we want to retry connecting. Otherwise, we have deliberately ended this
+ // call and no further action is required.
+ if (lb_calld == grpclb_policy->lb_calld_.get()) {
+ grpclb_policy->lb_calld_.reset();
+ GPR_ASSERT(!grpclb_policy->shutting_down_);
+ if (lb_calld->seen_initial_response_) {
+ // If we lose connection to the LB server, reset the backoff and restart
+ // the LB call immediately.
+ grpclb_policy->lb_call_backoff_.Reset();
+ grpclb_policy->StartBalancerCallLocked();
+ } else {
+ // If this LB call fails establishing any connection to the LB server,
+ // retry later.
+ grpclb_policy->StartBalancerCallRetryTimerLocked();
+ }
}
- update_lb_connectivity_status_locked(glb_policy, GRPC_ERROR_REF(error));
- // Resubscribe. Reuse the "rr_on_connectivity_changed_locked" ref.
- grpc_lb_policy_notify_on_state_change_locked(
- glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
- &glb_policy->rr_on_connectivity_changed);
+ lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
}
-static void destroy_balancer_name(void* balancer_name) {
- gpr_free(balancer_name);
-}
+//
+// helper code for creating balancer channel
+//
-static grpc_slice_hash_table_entry targets_info_entry_create(
- const char* address, const char* balancer_name) {
+// Helper function to construct a target info entry.
+grpc_slice_hash_table_entry BalancerEntryCreate(const char* address,
+ const char* balancer_name) {
grpc_slice_hash_table_entry entry;
entry.key = grpc_slice_from_copied_string(address);
entry.value = gpr_strdup(balancer_name);
return entry;
}
-static int balancer_name_cmp_fn(void* a, void* b) {
+// Comparison function used for slice_hash_table vtable.
+int BalancerNameCmp(void* a, void* b) {
const char* a_str = static_cast<const char*>(a);
const char* b_str = static_cast<const char*>(b);
return strcmp(a_str, b_str);
@@ -894,24 +942,22 @@ static int balancer_name_cmp_fn(void* a, void* b) {
* - \a response_generator: in order to propagate updates from the resolver
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
-static grpc_channel_args* build_lb_channel_args(
+grpc_channel_args* BuildBalancerChannelArgs(
const grpc_lb_addresses* addresses,
- grpc_core::FakeResolverResponseGenerator* response_generator,
+ FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
}
- /* All input addresses come from a resolver that claims they are LB services.
- * It's the resolver's responsibility to make sure this policy is only
- * instantiated and used in that case. Otherwise, something has gone wrong. */
+ // There must be at least one balancer address, or else the
+ // client_channel would not have chosen this LB policy.
GPR_ASSERT(num_grpclb_addrs > 0);
grpc_lb_addresses* lb_addresses =
grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
grpc_slice_hash_table_entry* targets_info_entries =
- static_cast<grpc_slice_hash_table_entry*>(
- gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs));
-
+ (grpc_slice_hash_table_entry*)gpr_zalloc(sizeof(*targets_info_entries) *
+ num_grpclb_addrs);
size_t lb_addresses_idx = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (!addresses->addresses[i].is_balancer) continue;
@@ -922,28 +968,23 @@ static grpc_channel_args* build_lb_channel_args(
char* addr_str;
GPR_ASSERT(grpc_sockaddr_to_string(
&addr_str, &addresses->addresses[i].address, true) > 0);
- targets_info_entries[lb_addresses_idx] = targets_info_entry_create(
- addr_str, addresses->addresses[i].balancer_name);
+ targets_info_entries[lb_addresses_idx] =
+ BalancerEntryCreate(addr_str, addresses->addresses[i].balancer_name);
gpr_free(addr_str);
-
grpc_lb_addresses_set_address(
lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
addresses->addresses[i].address.len, false /* is balancer */,
addresses->addresses[i].balancer_name, nullptr /* user data */);
}
GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
- grpc_slice_hash_table* targets_info =
- grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries,
- destroy_balancer_name, balancer_name_cmp_fn);
+ grpc_slice_hash_table* targets_info = grpc_slice_hash_table_create(
+ num_grpclb_addrs, targets_info_entries, gpr_free, BalancerNameCmp);
gpr_free(targets_info_entries);
-
grpc_channel_args* lb_channel_args =
grpc_lb_policy_grpclb_build_lb_channel_args(targets_info,
response_generator, args);
-
grpc_arg lb_channel_addresses_arg =
grpc_lb_addresses_create_channel_arg(lb_addresses);
-
grpc_channel_args* result = grpc_channel_args_copy_and_add(
lb_channel_args, &lb_channel_addresses_arg, 1);
grpc_slice_hash_table_unref(targets_info);
@@ -952,121 +993,162 @@ static grpc_channel_args* build_lb_channel_args(
return result;
}
-static void glb_destroy(grpc_lb_policy* pol) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
- GPR_ASSERT(glb_policy->pending_picks == nullptr);
- GPR_ASSERT(glb_policy->pending_pings == nullptr);
- gpr_free((void*)glb_policy->server_name);
- grpc_channel_args_destroy(glb_policy->args);
- grpc_connectivity_state_destroy(&glb_policy->state_tracker);
- if (glb_policy->serverlist != nullptr) {
- grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
- }
- if (glb_policy->fallback_backend_addresses != nullptr) {
- grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
- }
- // TODO(roth): Remove this once the LB policy becomes a C++ object.
- glb_policy->response_generator.reset();
+//
+// ctor and dtor
+//
+
+GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
+ const LoadBalancingPolicy::Args& args)
+ : LoadBalancingPolicy(args),
+ response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
+ lb_call_backoff_(
+ BackOff::Options()
+ .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
+ 1000)
+ .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
+ 1000)) {
+ // Initialization.
+ grpc_subchannel_index_ref();
+ GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
+ &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
+ grpc_combiner_scheduler(args.combiner));
+ GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
+ &GrpcLb::OnRoundRobinConnectivityChangedLocked, this,
+ grpc_combiner_scheduler(args.combiner));
+ GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
+ &GrpcLb::OnRoundRobinRequestReresolutionLocked, this,
+ grpc_combiner_scheduler(args.combiner));
+ grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb");
+ // Record server name.
+ const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
+ const char* server_uri = grpc_channel_arg_get_string(arg);
+ GPR_ASSERT(server_uri != nullptr);
+ grpc_uri* uri = grpc_uri_parse(server_uri, true);
+ GPR_ASSERT(uri->path[0] != '\0');
+ server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Will use '%s' as the server name for LB request.",
+ this, server_name_);
+ }
+ grpc_uri_destroy(uri);
+ // Record LB call timeout.
+ arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
+ lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
+ // Record fallback timeout.
+ arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
+ lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
+ arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
+ // Process channel args.
+ ProcessChannelArgsLocked(*args.args);
+}
+
+GrpcLb::~GrpcLb() {
+ GPR_ASSERT(pending_picks_ == nullptr);
+ GPR_ASSERT(pending_pings_ == nullptr);
+ gpr_free((void*)server_name_);
+ grpc_channel_args_destroy(args_);
+ grpc_connectivity_state_destroy(&state_tracker_);
+ if (serverlist_ != nullptr) {
+ grpc_grpclb_destroy_serverlist(serverlist_);
+ }
+ if (fallback_backend_addresses_ != nullptr) {
+ grpc_lb_addresses_destroy(fallback_backend_addresses_);
+ }
grpc_subchannel_index_unref();
- gpr_free(glb_policy);
}
-static void glb_shutdown_locked(grpc_lb_policy* pol,
- grpc_lb_policy* new_policy) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
+void GrpcLb::ShutdownLocked() {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
- glb_policy->shutting_down = true;
- if (glb_policy->lb_calld != nullptr) {
- lb_call_data_shutdown(glb_policy);
- }
- if (glb_policy->retry_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
- }
- if (glb_policy->fallback_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->lb_fallback_timer);
- }
- if (glb_policy->rr_policy != nullptr) {
- grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr);
- GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
- }
- // We destroy the LB channel here because
- // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
- // instance. Destroying the lb channel in glb_destroy would likely result in
- // a callback invocation without a valid glb_policy arg.
- if (glb_policy->lb_channel != nullptr) {
- grpc_channel_destroy(glb_policy->lb_channel);
- glb_policy->lb_channel = nullptr;
- }
- grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_REF(error), "glb_shutdown");
- grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
+ shutting_down_ = true;
+ lb_calld_.reset();
+ if (retry_timer_callback_pending_) {
+ grpc_timer_cancel(&lb_call_retry_timer_);
+ }
+ if (fallback_timer_callback_pending_) {
+ grpc_timer_cancel(&lb_fallback_timer_);
+ }
+ rr_policy_.reset();
+ TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
+ // We destroy the LB channel here instead of in our destructor because
+ // destroying the channel triggers a last callback to
+ // OnBalancerChannelConnectivityChangedLocked(), and we need to be
+ // alive when that callback is invoked.
+ if (lb_channel_ != nullptr) {
+ grpc_channel_destroy(lb_channel_);
+ lb_channel_ = nullptr;
+ }
+ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(error), "grpclb_shutdown");
// Clear pending picks.
- pending_pick* pp = glb_policy->pending_picks;
- glb_policy->pending_picks = nullptr;
- while (pp != nullptr) {
- pending_pick* next = pp->next;
- if (new_policy != nullptr) {
- // Hand pick over to new policy.
- if (pp->client_stats != nullptr) {
- grpc_grpclb_client_stats_unref(pp->client_stats);
- }
- pp->pick->on_complete = pp->original_on_complete;
- if (grpc_lb_policy_pick_locked(new_policy, pp->pick)) {
- // Synchronous return; schedule callback.
- GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
- }
- gpr_free(pp);
- } else {
- pp->pick->connected_subchannel.reset();
- GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
- }
- pp = next;
+ PendingPick* pp;
+ while ((pp = pending_picks_) != nullptr) {
+ pending_picks_ = pp->next;
+ pp->pick->connected_subchannel.reset();
+ // Note: pp is deleted in this callback.
+ GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
}
// Clear pending pings.
- pending_ping* pping = glb_policy->pending_pings;
- glb_policy->pending_pings = nullptr;
- while (pping != nullptr) {
- pending_ping* next = pping->next;
+ PendingPing* pping;
+ while ((pping = pending_pings_) != nullptr) {
+ pending_pings_ = pping->next;
GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
- gpr_free(pping);
- pping = next;
+ Delete(pping);
}
GRPC_ERROR_UNREF(error);
}
+//
+// public methods
+//
+
+void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
+ PendingPick* pp;
+ while ((pp = pending_picks_) != nullptr) {
+ pending_picks_ = pp->next;
+ pp->pick->on_complete = pp->original_on_complete;
+ pp->pick->user_data = nullptr;
+ if (new_policy->PickLocked(pp->pick)) {
+ // Synchronous return; schedule closure.
+ GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
+ }
+ Delete(pp);
+ }
+}
+
// Cancel a specific pending pick.
//
// A grpclb pick progresses as follows:
-// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
-// handed over to the RR policy (in create_rr_locked()). From that point
-// onwards, it'll be RR's responsibility. For cancellations, that implies the
-// pick needs also be cancelled by the RR instance.
+// - If there's a Round Robin policy (rr_policy_) available, it'll be
+// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
+// that point onwards, it'll be RR's responsibility. For cancellations, that
+//   implies the pick also needs to be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy's
-// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
-// we invoke the completion closure and set *target to nullptr right here.
-static void glb_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_lb_policy_pick_state* pick,
- grpc_error* error) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
- pending_pick* pp = glb_policy->pending_picks;
- glb_policy->pending_picks = nullptr;
+// level (grpclb), inside the pending_picks_ list. To cancel these,
+// we invoke the completion closure and set the pick's connected
+// subchannel to nullptr right here.
+void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) {
+ PendingPick* pp = pending_picks_;
+ pending_picks_ = nullptr;
while (pp != nullptr) {
- pending_pick* next = pp->next;
+ PendingPick* next = pp->next;
if (pp->pick == pick) {
pick->connected_subchannel.reset();
+ // Note: pp is deleted in this callback.
GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
- pp->next = glb_policy->pending_picks;
- glb_policy->pending_picks = pp;
+ pp->next = pending_picks_;
+ pending_picks_ = pp;
}
pp = next;
}
- if (glb_policy->rr_policy != nullptr) {
- grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, pick,
- GRPC_ERROR_REF(error));
+ if (rr_policy_ != nullptr) {
+ rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
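
CancelPickLocked() above uses a "detach and re-push" pass over the intrusive singly linked list of pending picks: the whole list is taken off the head, matching entries have their completion closure scheduled with an error, and the survivors are pushed back (order is not significant for pending picks). A minimal, self-contained version of that pattern is sketched below; PendingItem and the integer tag are hypothetical stand-ins for PendingPick and the PickState pointer, and unlike the real code, the sketch frees the node directly instead of deferring deletion to the scheduled closure.

// Hypothetical node type standing in for PendingPick.
struct PendingItem {
  int tag;
  PendingItem* next;
};

// Removes every node whose tag matches tag_to_cancel, invoking on_cancel for
// it, and re-pushes the remaining nodes onto the list head.
template <typename OnCancel>
void CancelMatching(PendingItem** head, int tag_to_cancel, OnCancel on_cancel) {
  PendingItem* item = *head;
  *head = nullptr;  // detach the entire list
  while (item != nullptr) {
    PendingItem* next = item->next;
    if (item->tag == tag_to_cancel) {
      on_cancel(item->tag);  // in grpclb, this schedules on_complete with an error
      delete item;           // the real code defers deletion to that closure
    } else {
      item->next = *head;  // re-push survivors onto the new head
      *head = item;
    }
    item = next;
  }
}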
@@ -1074,863 +1156,710 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol,
// Cancel all pending picks.
//
// A grpclb pick progresses as follows:
-// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
-// handed over to the RR policy (in create_rr_locked()). From that point
-// onwards, it'll be RR's responsibility. For cancellations, that implies the
-// pick needs also be cancelled by the RR instance.
+// - If there's a Round Robin policy (rr_policy_) available, it'll be
+// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
+// that point onwards, it'll be RR's responsibility. For cancellations, that
+//   implies the pick also needs to be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy's
-// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
-// we invoke the completion closure and set *target to nullptr right here.
-static void glb_cancel_picks_locked(grpc_lb_policy* pol,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error* error) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
- pending_pick* pp = glb_policy->pending_picks;
- glb_policy->pending_picks = nullptr;
+// level (grpclb), inside the pending_picks_ list. To cancel these,
+// we invoke the completion closure and set the pick's connected
+// subchannel to nullptr right here.
+void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error* error) {
+ PendingPick* pp = pending_picks_;
+ pending_picks_ = nullptr;
while (pp != nullptr) {
- pending_pick* next = pp->next;
+ PendingPick* next = pp->next;
if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
+ // Note: pp is deleted in this callback.
GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
- pp->next = glb_policy->pending_picks;
- glb_policy->pending_picks = pp;
+ pp->next = pending_picks_;
+ pending_picks_ = pp;
}
pp = next;
}
- if (glb_policy->rr_policy != nullptr) {
- grpc_lb_policy_cancel_picks_locked(
- glb_policy->rr_policy, initial_metadata_flags_mask,
- initial_metadata_flags_eq, GRPC_ERROR_REF(error));
+ if (rr_policy_ != nullptr) {
+ rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
+ initial_metadata_flags_eq,
+ GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
-static void lb_on_fallback_timer_locked(void* arg, grpc_error* error);
-static void query_for_backends_locked(glb_lb_policy* glb_policy);
-static void start_picking_locked(glb_lb_policy* glb_policy) {
- /* start a timer to fall back */
- if (glb_policy->lb_fallback_timeout_ms > 0 &&
- glb_policy->serverlist == nullptr &&
- !glb_policy->fallback_timer_callback_pending) {
- grpc_millis deadline =
- grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms;
- GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_fallback_timer");
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
- glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- glb_policy->fallback_timer_callback_pending = true;
- grpc_timer_init(&glb_policy->lb_fallback_timer, deadline,
- &glb_policy->lb_on_fallback);
- }
- glb_policy->started_picking = true;
- glb_policy->lb_call_backoff->Reset();
- query_for_backends_locked(glb_policy);
-}
-
-static void glb_exit_idle_locked(grpc_lb_policy* pol) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
- if (!glb_policy->started_picking) {
- start_picking_locked(glb_policy);
+void GrpcLb::ExitIdleLocked() {
+ if (!started_picking_) {
+ StartPickingLocked();
}
}
-static int glb_pick_locked(grpc_lb_policy* pol,
- grpc_lb_policy_pick_state* pick) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
- pending_pick* pp = pending_pick_create(glb_policy, pick);
+bool GrpcLb::PickLocked(PickState* pick) {
+ PendingPick* pp = PendingPickCreate(pick);
bool pick_done = false;
- if (glb_policy->rr_policy != nullptr) {
+ if (rr_policy_ != nullptr) {
const grpc_connectivity_state rr_connectivity_state =
- grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy,
- nullptr);
- // The glb_policy->rr_policy may have transitioned to SHUTDOWN but the
- // callback registered to capture this event
- // (on_rr_connectivity_changed_locked) may not have been invoked yet. We
- // need to make sure we aren't trying to pick from a RR policy instance
- // that's in shutdown.
+ rr_policy_->CheckConnectivityLocked(nullptr);
+ // The RR policy may have transitioned to SHUTDOWN but the callback
+ // registered to capture this event (on_rr_connectivity_changed_) may not
+ // have been invoked yet. We need to make sure we aren't trying to pick
+ // from an RR policy instance that's in shutdown.
if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] NOT picking from from RR %p: RR conn state=%s",
- glb_policy, glb_policy->rr_policy,
+ this, rr_policy_.get(),
grpc_connectivity_state_name(rr_connectivity_state));
}
- pending_pick_add(&glb_policy->pending_picks, pp);
+ AddPendingPick(pp);
pick_done = false;
} else { // RR not in shutdown
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
- glb_policy->rr_policy);
+ gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
+ rr_policy_.get());
}
- pick_done =
- pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp);
+ pick_done = PickFromRoundRobinPolicyLocked(false /* force_async */, pp);
}
- } else { // glb_policy->rr_policy == NULL
+ } else { // rr_policy_ == NULL
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[grpclb %p] No RR policy. Adding to grpclb's pending picks",
- glb_policy);
+ this);
}
- pending_pick_add(&glb_policy->pending_picks, pp);
- if (!glb_policy->started_picking) {
- start_picking_locked(glb_policy);
+ AddPendingPick(pp);
+ if (!started_picking_) {
+ StartPickingLocked();
}
pick_done = false;
}
return pick_done;
}
-static grpc_connectivity_state glb_check_connectivity_locked(
- grpc_lb_policy* pol, grpc_error** connectivity_error) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
- return grpc_connectivity_state_get(&glb_policy->state_tracker,
- connectivity_error);
-}
-
-static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
- if (glb_policy->rr_policy) {
- grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
+void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
+ if (rr_policy_ != nullptr) {
+ rr_policy_->PingOneLocked(on_initiate, on_ack);
} else {
- pending_ping_add(&glb_policy->pending_pings, on_initiate, on_ack);
- if (!glb_policy->started_picking) {
- start_picking_locked(glb_policy);
+ AddPendingPing(on_initiate, on_ack);
+ if (!started_picking_) {
+ StartPickingLocked();
}
}
}
-static void glb_notify_on_state_change_locked(grpc_lb_policy* pol,
- grpc_connectivity_state* current,
- grpc_closure* notify) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
- grpc_connectivity_state_notify_on_state_change(&glb_policy->state_tracker,
- current, notify);
-}
-
-static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg);
- glb_policy->retry_timer_callback_pending = false;
- if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE &&
- glb_policy->lb_calld == nullptr) {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy);
- }
- query_for_backends_locked(glb_policy);
- }
- GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer");
-}
-
-static void start_lb_call_retry_timer_locked(glb_lb_policy* glb_policy) {
- grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
- glb_policy);
- grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
- if (timeout > 0) {
- gpr_log(GPR_DEBUG,
- "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
- glb_policy, timeout);
- } else {
- gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
- glb_policy);
- }
- }
- GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer");
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
- lb_call_on_retry_timer_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- glb_policy->retry_timer_callback_pending = true;
- grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
- &glb_policy->lb_on_call_retry);
+grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
+ grpc_error** connectivity_error) {
+ return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
}
-static void maybe_send_client_load_report_locked(void* arg, grpc_error* error);
-
-static void schedule_next_client_load_report(glb_lb_call_data* lb_calld) {
- const grpc_millis next_client_load_report_time =
- grpc_core::ExecCtx::Get()->Now() + lb_calld->client_stats_report_interval;
- GRPC_CLOSURE_INIT(
- &lb_calld->client_load_report_closure,
- maybe_send_client_load_report_locked, lb_calld,
- grpc_combiner_scheduler(lb_calld->glb_policy->base.combiner));
- grpc_timer_init(&lb_calld->client_load_report_timer,
- next_client_load_report_time,
- &lb_calld->client_load_report_closure);
- lb_calld->client_load_report_timer_callback_pending = true;
+void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
+ grpc_closure* notify) {
+ grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
+ notify);
}
-static void client_load_report_done_locked(void* arg, grpc_error* error) {
- glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg);
- glb_lb_policy* glb_policy = lb_calld->glb_policy;
- grpc_byte_buffer_destroy(lb_calld->send_message_payload);
- lb_calld->send_message_payload = nullptr;
- if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
- glb_lb_call_data_unref(lb_calld, "client_load_report");
+void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
+ const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+ // Ignore this update.
+ gpr_log(
+ GPR_ERROR,
+ "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
+ this);
return;
}
- schedule_next_client_load_report(lb_calld);
-}
-
-static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
- grpc_grpclb_dropped_call_counts* drop_entries =
- static_cast<grpc_grpclb_dropped_call_counts*>(
- request->client_stats.calls_finished_with_drop.arg);
- return request->client_stats.num_calls_started == 0 &&
- request->client_stats.num_calls_finished == 0 &&
- request->client_stats.num_calls_finished_with_client_failed_to_send ==
- 0 &&
- request->client_stats.num_calls_finished_known_received == 0 &&
- (drop_entries == nullptr || drop_entries->num_entries == 0);
-}
-
-static void send_client_load_report_locked(glb_lb_call_data* lb_calld) {
- glb_lb_policy* glb_policy = lb_calld->glb_policy;
- // Construct message payload.
- GPR_ASSERT(lb_calld->send_message_payload == nullptr);
- grpc_grpclb_request* request =
- grpc_grpclb_load_report_request_create_locked(lb_calld->client_stats);
- // Skip client load report if the counters were all zero in the last
- // report and they are still zero in this one.
- if (load_report_counters_are_zero(request)) {
- if (lb_calld->last_client_load_report_counters_were_zero) {
- grpc_grpclb_request_destroy(request);
- schedule_next_client_load_report(lb_calld);
- return;
- }
- lb_calld->last_client_load_report_counters_were_zero = true;
- } else {
- lb_calld->last_client_load_report_counters_were_zero = false;
+ const grpc_lb_addresses* addresses =
+ reinterpret_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
+ // Update fallback address list.
+ if (fallback_backend_addresses_ != nullptr) {
+ grpc_lb_addresses_destroy(fallback_backend_addresses_);
}
- grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
- lb_calld->send_message_payload =
- grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(request_payload_slice);
- grpc_grpclb_request_destroy(request);
- // Send the report.
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = lb_calld->send_message_payload;
- GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure,
- client_load_report_done_locked, lb_calld,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- lb_calld->lb_call, &op, 1, &lb_calld->client_load_report_closure);
- if (call_error != GRPC_CALL_OK) {
- gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
+ fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
+ // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
+ // since we use this to trigger the client_load_reporting filter.
+ static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
+ grpc_arg new_arg = grpc_channel_arg_string_create(
+ (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
+ grpc_channel_args_destroy(args_);
+ args_ = grpc_channel_args_copy_and_add_and_remove(
+ &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
+ // Construct args for balancer channel.
+ grpc_channel_args* lb_channel_args =
+ BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
+ // Create balancer channel if needed.
+ if (lb_channel_ == nullptr) {
+ char* uri_str;
+ gpr_asprintf(&uri_str, "fake:///%s", server_name_);
+ lb_channel_ = grpc_lb_policy_grpclb_create_lb_channel(
+ uri_str, client_channel_factory(), lb_channel_args);
+ GPR_ASSERT(lb_channel_ != nullptr);
+ gpr_free(uri_str);
}
+ // Propagate updates to the LB channel (pick_first) through the fake
+ // resolver.
+ response_generator_->SetResponse(lb_channel_args);
+ grpc_channel_args_destroy(lb_channel_args);
}
-static void maybe_send_client_load_report_locked(void* arg, grpc_error* error) {
- glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg);
- glb_lb_policy* glb_policy = lb_calld->glb_policy;
- lb_calld->client_load_report_timer_callback_pending = false;
- if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
- glb_lb_call_data_unref(lb_calld, "client_load_report");
- return;
+void GrpcLb::UpdateLocked(const grpc_channel_args& args) {
+ ProcessChannelArgsLocked(args);
+ // If fallback is configured and the RR policy already exists, update
+ // it with the new fallback addresses.
+ if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) {
+ CreateOrUpdateRoundRobinPolicyLocked();
}
- // If we've already sent the initial request, then we can go ahead and send
- // the load report. Otherwise, we need to wait until the initial request has
- // been sent to send this (see lb_on_sent_initial_request_locked()).
- if (lb_calld->send_message_payload == nullptr) {
- send_client_load_report_locked(lb_calld);
- } else {
- lb_calld->client_load_report_is_due = true;
+  // Start watching the LB channel's connectivity state (which also triggers
+  // a connection attempt), if not already doing so.
+ if (!watching_lb_channel_) {
+ lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
+ lb_channel_, true /* try to connect */);
+ grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(lb_channel_));
+ GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+ watching_lb_channel_ = true;
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
+ self.release();
+ grpc_client_channel_watch_connectivity_state(
+ client_channel_elem,
+ grpc_polling_entity_create_from_pollset_set(interested_parties()),
+ &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
+ nullptr);
}
}
-static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error);
-static void lb_on_server_status_received_locked(void* arg, grpc_error* error);
-static void lb_on_response_received_locked(void* arg, grpc_error* error);
-static glb_lb_call_data* lb_call_data_create_locked(glb_lb_policy* glb_policy) {
- GPR_ASSERT(!glb_policy->shutting_down);
- // Init the LB call. Note that the LB call will progress every time there's
- // activity in glb_policy->base.interested_parties, which is comprised of the
- // polling entities from client_channel.
- GPR_ASSERT(glb_policy->server_name != nullptr);
- GPR_ASSERT(glb_policy->server_name[0] != '\0');
- grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name);
- grpc_millis deadline =
- glb_policy->lb_call_timeout_ms == 0
- ? GRPC_MILLIS_INF_FUTURE
- : grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms;
- glb_lb_call_data* lb_calld =
- static_cast<glb_lb_call_data*>(gpr_zalloc(sizeof(*lb_calld)));
- lb_calld->lb_call = grpc_channel_create_pollset_set_call(
- glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS,
- glb_policy->base.interested_parties,
- GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
- &host, deadline, nullptr);
- grpc_slice_unref_internal(host);
- // Init the LB call request payload.
- grpc_grpclb_request* request =
- grpc_grpclb_request_create(glb_policy->server_name);
- grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
- lb_calld->send_message_payload =
- grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_slice_unref_internal(request_payload_slice);
- grpc_grpclb_request_destroy(request);
- // Init other data associated with the LB call.
- lb_calld->glb_policy = glb_policy;
- gpr_ref_init(&lb_calld->refs, 1);
- grpc_metadata_array_init(&lb_calld->lb_initial_metadata_recv);
- grpc_metadata_array_init(&lb_calld->lb_trailing_metadata_recv);
- GRPC_CLOSURE_INIT(&lb_calld->lb_on_sent_initial_request,
- lb_on_sent_initial_request_locked, lb_calld,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- GRPC_CLOSURE_INIT(&lb_calld->lb_on_response_received,
- lb_on_response_received_locked, lb_calld,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- GRPC_CLOSURE_INIT(&lb_calld->lb_on_server_status_received,
- lb_on_server_status_received_locked, lb_calld,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- // Hold a ref to the glb_policy.
- GRPC_LB_POLICY_REF(&glb_policy->base, "lb_calld");
- return lb_calld;
-}
+//
+// code for balancer channel and call
+//
-/*
- * Auxiliary functions and LB client callbacks.
- */
+void GrpcLb::StartPickingLocked() {
+ // Start a timer to fall back.
+ if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
+ !fallback_timer_callback_pending_) {
+ grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
+ self.release();
+ GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
+ grpc_combiner_scheduler(combiner()));
+ fallback_timer_callback_pending_ = true;
+ grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
+ }
+ started_picking_ = true;
+ StartBalancerCallLocked();
+}
-static void query_for_backends_locked(glb_lb_policy* glb_policy) {
- GPR_ASSERT(glb_policy->lb_channel != nullptr);
- if (glb_policy->shutting_down) return;
+void GrpcLb::StartBalancerCallLocked() {
+ GPR_ASSERT(lb_channel_ != nullptr);
+ if (shutting_down_) return;
// Init the LB call data.
- GPR_ASSERT(glb_policy->lb_calld == nullptr);
- glb_policy->lb_calld = lb_call_data_create_locked(glb_policy);
+ GPR_ASSERT(lb_calld_ == nullptr);
+ lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p, "
- "lb_call: %p)",
- glb_policy, glb_policy->lb_channel, glb_policy->lb_calld,
- glb_policy->lb_calld->lb_call);
- }
- GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
- // Create the ops.
- grpc_call_error call_error;
- grpc_op ops[3];
- memset(ops, 0, sizeof(ops));
- // Op: send initial metadata.
- grpc_op* op = ops;
- op->op = GRPC_OP_SEND_INITIAL_METADATA;
- op->data.send_initial_metadata.count = 0;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- // Op: send request message.
- GPR_ASSERT(glb_policy->lb_calld->send_message_payload != nullptr);
- op->op = GRPC_OP_SEND_MESSAGE;
- op->data.send_message.send_message =
- glb_policy->lb_calld->send_message_payload;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- glb_lb_call_data_ref(glb_policy->lb_calld,
- "lb_on_sent_initial_request_locked");
- call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_calld->lb_call, ops, static_cast<size_t>(op - ops),
- &glb_policy->lb_calld->lb_on_sent_initial_request);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- // Op: recv initial metadata.
- op = ops;
- op->op = GRPC_OP_RECV_INITIAL_METADATA;
- op->data.recv_initial_metadata.recv_initial_metadata =
- &glb_policy->lb_calld->lb_initial_metadata_recv;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- // Op: recv response.
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message.recv_message =
- &glb_policy->lb_calld->recv_message_payload;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- glb_lb_call_data_ref(glb_policy->lb_calld, "lb_on_response_received_locked");
- call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_calld->lb_call, ops, static_cast<size_t>(op - ops),
- &glb_policy->lb_calld->lb_on_response_received);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- // Op: recv server status.
- op = ops;
- op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- op->data.recv_status_on_client.trailing_metadata =
- &glb_policy->lb_calld->lb_trailing_metadata_recv;
- op->data.recv_status_on_client.status = &glb_policy->lb_calld->lb_call_status;
- op->data.recv_status_on_client.status_details =
- &glb_policy->lb_calld->lb_call_status_details;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- // This callback signals the end of the LB call, so it relies on the initial
- // ref instead of a new ref. When it's invoked, it's the initial ref that is
- // unreffed.
- call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_calld->lb_call, ops, static_cast<size_t>(op - ops),
- &glb_policy->lb_calld->lb_on_server_status_received);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
-}
-
-static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
- glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg);
- grpc_byte_buffer_destroy(lb_calld->send_message_payload);
- lb_calld->send_message_payload = nullptr;
- // If we attempted to send a client load report before the initial request was
- // sent (and this lb_calld is still in use), send the load report now.
- if (lb_calld->client_load_report_is_due &&
- lb_calld == lb_calld->glb_policy->lb_calld) {
- send_client_load_report_locked(lb_calld);
- lb_calld->client_load_report_is_due = false;
+ "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
+ this, lb_channel_, lb_calld_.get());
}
- glb_lb_call_data_unref(lb_calld, "lb_on_sent_initial_request_locked");
+ lb_calld_->StartQuery();
}
-static void lb_on_response_received_locked(void* arg, grpc_error* error) {
- glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg);
- glb_lb_policy* glb_policy = lb_calld->glb_policy;
- // Empty payload means the LB call was cancelled.
- if (lb_calld != glb_policy->lb_calld ||
- lb_calld->recv_message_payload == nullptr) {
- glb_lb_call_data_unref(lb_calld, "lb_on_response_received_locked");
- return;
- }
- grpc_op ops[2];
- memset(ops, 0, sizeof(ops));
- grpc_op* op = ops;
- glb_policy->lb_call_backoff->Reset();
- grpc_byte_buffer_reader bbr;
- grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload);
- grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
- grpc_byte_buffer_reader_destroy(&bbr);
- grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
- lb_calld->recv_message_payload = nullptr;
- grpc_grpclb_initial_response* initial_response;
- grpc_grpclb_serverlist* serverlist;
- if (!lb_calld->seen_initial_response &&
- (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
- nullptr) {
- // Have NOT seen initial response, look for initial response.
- if (initial_response->has_client_stats_report_interval) {
- lb_calld->client_stats_report_interval = GPR_MAX(
- GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
- &initial_response->client_stats_report_interval));
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Received initial LB response message; "
- "client load reporting interval = %" PRIdPTR " milliseconds",
- glb_policy, lb_calld->client_stats_report_interval);
- }
- } else if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Received initial LB response message; client load "
- "reporting NOT enabled",
- glb_policy);
- }
- grpc_grpclb_initial_response_destroy(initial_response);
- lb_calld->seen_initial_response = true;
- } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
- response_slice)) != nullptr) {
- // Have seen initial response, look for serverlist.
- GPR_ASSERT(lb_calld->lb_call != nullptr);
+void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
+ GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
+ grpclb_policy->fallback_timer_callback_pending_ = false;
+ // If we receive a serverlist after the timer fires but before this callback
+ // actually runs, don't fall back.
+ if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
+ error == GRPC_ERROR_NONE) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
- glb_policy, serverlist->num_servers);
- for (size_t i = 0; i < serverlist->num_servers; ++i) {
- grpc_resolved_address addr;
- parse_server(serverlist->servers[i], &addr);
- char* ipport;
- grpc_sockaddr_to_string(&ipport, &addr, false);
- gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
- glb_policy, i, ipport);
- gpr_free(ipport);
- }
- }
- /* update serverlist */
- if (serverlist->num_servers > 0) {
- // Start sending client load report only after we start using the
- // serverlist returned from the current LB call.
- if (lb_calld->client_stats_report_interval > 0 &&
- lb_calld->client_stats == nullptr) {
- lb_calld->client_stats = grpc_grpclb_client_stats_create();
- glb_lb_call_data_ref(lb_calld, "client_load_report");
- schedule_next_client_load_report(lb_calld);
- }
- if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Incoming server list identical to current, "
- "ignoring.",
- glb_policy);
- }
- grpc_grpclb_destroy_serverlist(serverlist);
- } else { /* new serverlist */
- if (glb_policy->serverlist != nullptr) {
- /* dispose of the old serverlist */
- grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
- } else {
- /* or dispose of the fallback */
- grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
- glb_policy->fallback_backend_addresses = nullptr;
- if (glb_policy->fallback_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->lb_fallback_timer);
- glb_policy->fallback_timer_callback_pending = false;
- }
- }
- /* and update the copy in the glb_lb_policy instance. This
- * serverlist instance will be destroyed either upon the next
- * update or in glb_destroy() */
- glb_policy->serverlist = serverlist;
- glb_policy->serverlist_index = 0;
- rr_handover_locked(glb_policy);
- }
- } else {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
- glb_policy);
- }
- grpc_grpclb_destroy_serverlist(serverlist);
+ "[grpclb %p] Falling back to use backends from resolver",
+ grpclb_policy);
}
- } else {
- // No valid initial response or serverlist found.
- gpr_log(GPR_ERROR,
- "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
- glb_policy,
- grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
- }
- grpc_slice_unref_internal(response_slice);
- if (!glb_policy->shutting_down) {
- // Keep listening for serverlist updates.
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message.recv_message = &lb_calld->recv_message_payload;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- // Reuse the "lb_on_response_received_locked" ref taken in
- // query_for_backends_locked().
- const grpc_call_error call_error = grpc_call_start_batch_and_execute(
- lb_calld->lb_call, ops, static_cast<size_t>(op - ops),
- &lb_calld->lb_on_response_received);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- } else {
- glb_lb_call_data_unref(lb_calld,
- "lb_on_response_received_locked+glb_shutdown");
+ GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
+ grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
}
+ grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}
-static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
- glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg);
- glb_lb_policy* glb_policy = lb_calld->glb_policy;
- GPR_ASSERT(lb_calld->lb_call != nullptr);
+void GrpcLb::StartBalancerCallRetryTimerLocked() {
+ grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
if (grpc_lb_glb_trace.enabled()) {
- char* status_details =
- grpc_slice_to_c_string(lb_calld->lb_call_status_details);
- gpr_log(GPR_INFO,
- "[grpclb %p] Status from LB server received. Status = %d, details "
- "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
- lb_calld->glb_policy, lb_calld->lb_call_status, status_details,
- lb_calld, lb_calld->lb_call, grpc_error_string(error));
- gpr_free(status_details);
- }
- grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
- GRPC_ERROR_NONE);
- // If this lb_calld is still in use, this call ended because of a failure so
- // we want to retry connecting. Otherwise, we have deliberately ended this
- // call and no further action is required.
- if (lb_calld == glb_policy->lb_calld) {
- glb_policy->lb_calld = nullptr;
- if (lb_calld->client_load_report_timer_callback_pending) {
- grpc_timer_cancel(&lb_calld->client_load_report_timer);
- }
- GPR_ASSERT(!glb_policy->shutting_down);
- if (lb_calld->seen_initial_response) {
- // If we lose connection to the LB server, reset the backoff and restart
- // the LB call immediately.
- glb_policy->lb_call_backoff->Reset();
- query_for_backends_locked(glb_policy);
+ gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", this);
+ grpc_millis timeout = next_try - ExecCtx::Get()->Now();
+ if (timeout > 0) {
+ gpr_log(GPR_DEBUG,
+ "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", this,
+ timeout);
} else {
- // If this LB call fails establishing any connection to the LB server,
- // retry later.
- start_lb_call_retry_timer_locked(glb_policy);
+ gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
+ this);
}
}
- glb_lb_call_data_unref(lb_calld, "lb_call_ended");
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
+ self.release();
+ GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
+ this, grpc_combiner_scheduler(combiner()));
+ retry_timer_callback_pending_ = true;
+ grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
}
-static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg);
- glb_policy->fallback_timer_callback_pending = false;
- /* If we receive a serverlist after the timer fires but before this callback
- * actually runs, don't fall back. */
- if (glb_policy->serverlist == nullptr && !glb_policy->shutting_down &&
- error == GRPC_ERROR_NONE) {
+void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
+ GrpcLb* grpclb_policy = reinterpret_cast<GrpcLb*>(arg);
+ grpclb_policy->retry_timer_callback_pending_ = false;
+ if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
+ grpclb_policy->lb_calld_ == nullptr) {
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Falling back to use backends from resolver",
- glb_policy);
+ gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
+ grpclb_policy);
}
- GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
- rr_handover_locked(glb_policy);
- }
- GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
-}
-
-static void fallback_update_locked(glb_lb_policy* glb_policy,
- const grpc_lb_addresses* addresses) {
- GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
- grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
- glb_policy->fallback_backend_addresses =
- extract_backend_addresses_locked(addresses);
- if (glb_policy->lb_fallback_timeout_ms > 0 &&
- glb_policy->rr_policy != nullptr) {
- rr_handover_locked(glb_policy);
- }
-}
-
-static void glb_update_locked(grpc_lb_policy* policy,
- const grpc_lb_policy_args* args) {
- glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(policy);
- const grpc_arg* arg =
- grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
- if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
- if (glb_policy->lb_channel == nullptr) {
- // If we don't have a current channel to the LB, go into TRANSIENT
- // FAILURE.
- grpc_connectivity_state_set(
- &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
- "glb_update_missing");
- } else {
- // otherwise, keep using the current LB channel (ignore this update).
- gpr_log(
- GPR_ERROR,
- "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
- glb_policy);
- }
- return;
- }
- const grpc_lb_addresses* addresses =
- static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
- // If a non-empty serverlist hasn't been received from the balancer,
- // propagate the update to fallback_backend_addresses.
- if (glb_policy->serverlist == nullptr) {
- fallback_update_locked(glb_policy, addresses);
- }
- GPR_ASSERT(glb_policy->lb_channel != nullptr);
- // Propagate updates to the LB channel (pick_first) through the fake
- // resolver.
- grpc_channel_args* lb_channel_args = build_lb_channel_args(
- addresses, glb_policy->response_generator.get(), args->args);
- glb_policy->response_generator->SetResponse(lb_channel_args);
- grpc_channel_args_destroy(lb_channel_args);
- // Start watching the LB channel connectivity for connection, if not
- // already doing so.
- if (!glb_policy->watching_lb_channel) {
- glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
- glb_policy->lb_channel, true /* try to connect */);
- grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(glb_policy->lb_channel));
- GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
- glb_policy->watching_lb_channel = true;
- GRPC_LB_POLICY_REF(&glb_policy->base, "watch_lb_channel_connectivity");
- grpc_client_channel_watch_connectivity_state(
- client_channel_elem,
- grpc_polling_entity_create_from_pollset_set(
- glb_policy->base.interested_parties),
- &glb_policy->lb_channel_connectivity,
- &glb_policy->lb_channel_on_connectivity_changed, nullptr);
+ grpclb_policy->StartBalancerCallLocked();
}
+ grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}
// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY. It's invoked even if the LB channel
// stayed READY throughout the update (for example if the update is identical).
-static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
- grpc_error* error) {
- glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg);
- if (glb_policy->shutting_down) goto done;
+void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
+ grpc_error* error) {
+ GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
+ if (grpclb_policy->shutting_down_) goto done;
// Re-initialize the lb_call. This should also take care of updating the
// embedded RR policy. Note that the current RR policy, if any, will stay in
// effect until an update from the new lb_call is received.
- switch (glb_policy->lb_channel_connectivity) {
+ switch (grpclb_policy->lb_channel_connectivity_) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
// Keep watching the LB channel.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(glb_policy->lb_channel));
+ grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
- glb_policy->base.interested_parties),
- &glb_policy->lb_channel_connectivity,
- &glb_policy->lb_channel_on_connectivity_changed, nullptr);
+ grpclb_policy->interested_parties()),
+ &grpclb_policy->lb_channel_connectivity_,
+ &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
break;
}
// The LB channel may be IDLE because it's shut down before the update.
// Restart the LB call to kick the LB channel into gear.
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_READY:
- if (glb_policy->lb_calld != nullptr) {
- lb_call_data_shutdown(glb_policy);
- }
- if (glb_policy->started_picking) {
- if (glb_policy->retry_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
+ grpclb_policy->lb_calld_.reset();
+ if (grpclb_policy->started_picking_) {
+ if (grpclb_policy->retry_timer_callback_pending_) {
+ grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_);
}
- glb_policy->lb_call_backoff->Reset();
- query_for_backends_locked(glb_policy);
+ grpclb_policy->lb_call_backoff_.Reset();
+ grpclb_policy->StartBalancerCallLocked();
}
// Fall through.
case GRPC_CHANNEL_SHUTDOWN:
done:
- glb_policy->watching_lb_channel = false;
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ grpclb_policy->watching_lb_channel_ = false;
+ grpclb_policy->Unref(DEBUG_LOCATION,
"watch_lb_channel_connectivity_cb_shutdown");
}
}
-/* Code wiring the policy with the rest of the core */
-static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
- glb_destroy,
- glb_shutdown_locked,
- glb_pick_locked,
- glb_cancel_pick_locked,
- glb_cancel_picks_locked,
- glb_ping_one_locked,
- glb_exit_idle_locked,
- glb_check_connectivity_locked,
- glb_notify_on_state_change_locked,
- glb_update_locked};
-
-static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
- grpc_lb_policy_args* args) {
- /* Count the number of gRPC-LB addresses. There must be at least one. */
- const grpc_arg* arg =
- grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
- if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
- return nullptr;
+//
+// PendingPick
+//
+
+// Adds lb_token of selected subchannel (address) to the call's initial
+// metadata.
+grpc_error* AddLbTokenToInitialMetadata(
+ grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
+ grpc_metadata_batch* initial_metadata) {
+ GPR_ASSERT(lb_token_mdelem_storage != nullptr);
+ GPR_ASSERT(!GRPC_MDISNULL(lb_token));
+ return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+ lb_token);
+}
+
+// Destroy function used when embedding client stats in call context.
+void DestroyClientStats(void* arg) {
+ grpc_grpclb_client_stats_unref(
+ reinterpret_cast<grpc_grpclb_client_stats*>(arg));
+}
+
+void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
+  /* If connected_subchannel is nullptr, no pick has been made by the RR
+   * policy (e.g., all addresses failed to connect), so there is no
+   * user_data/token available. */
+ if (pp->pick->connected_subchannel != nullptr) {
+ if (!GRPC_MDISNULL(pp->lb_token)) {
+ AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
+ &pp->pick->lb_token_mdelem_storage,
+ pp->pick->initial_metadata);
+ } else {
+ gpr_log(GPR_ERROR,
+ "[grpclb %p] No LB token for connected subchannel pick %p",
+ pp->grpclb_policy, pp->pick);
+ abort();
+ }
+ // Pass on client stats via context. Passes ownership of the reference.
+ if (pp->client_stats != nullptr) {
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
+ pp->client_stats;
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
+ DestroyClientStats;
+ }
+ } else {
+ if (pp->client_stats != nullptr) {
+ grpc_grpclb_client_stats_unref(pp->client_stats);
+ }
}
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
- size_t num_grpclb_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+}
+
+/* The \a on_complete closure passed as part of the pick is wrapped so that,
+ * once the pick completes, the LB token and client stats can be attached to
+ * the call before the original closure is invoked. */
+void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) {
+ PendingPick* pp = reinterpret_cast<PendingPick*>(arg);
+ PendingPickSetMetadataAndContext(pp);
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
+ Delete(pp);
+}
+
+GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) {
+ PendingPick* pp = New<PendingPick>();
+ pp->grpclb_policy = this;
+ pp->pick = pick;
+ GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp,
+ grpc_schedule_on_exec_ctx);
+ pp->original_on_complete = pick->on_complete;
+ pick->on_complete = &pp->on_complete;
+ return pp;
+}
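
PendingPickCreate() swaps the pick's on_complete for a wrapper (OnPendingPickComplete) so that, when the RR policy finishes the pick, grpclb can attach the LB token and client stats before the caller's original closure runs. The same wrapping idea is shown below with std::function instead of grpc_closure; WrapOnComplete is an illustrative helper, not part of the gRPC API.

#include <functional>
#include <utility>

using Callback = std::function<void(bool ok)>;

// Returns a callback that runs post_process (e.g., copying an LB token into
// the call's metadata) and then invokes the caller's original callback.
Callback WrapOnComplete(Callback original, std::function<void()> post_process) {
  return [original = std::move(original),
          post_process = std::move(post_process)](bool ok) {
    post_process();
    original(ok);
  };
}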
+
+void GrpcLb::AddPendingPick(PendingPick* pp) {
+ pp->next = pending_picks_;
+ pending_picks_ = pp;
+}
+
+//
+// PendingPing
+//
+
+void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
+ PendingPing* pping = New<PendingPing>();
+ pping->on_initiate = on_initiate;
+ pping->on_ack = on_ack;
+ pping->next = pending_pings_;
+ pending_pings_ = pping;
+}
+
+//
+// code for interacting with the RR policy
+//
+
+// Performs a pick over \a rr_policy_. If the pick completes immediately
+// (i.e., without invoking its completion callback), this function performs
+// the cleanups that the callback would otherwise be responsible for.
+// If \a force_async is true, then we will manually schedule the
+// completion callback even if the pick is available immediately.
+bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
+ // Check for drops if we are not using fallback backend addresses.
+ if (serverlist_ != nullptr) {
+ // Look at the index into the serverlist to see if we should drop this call.
+ grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
+ if (serverlist_index_ == serverlist_->num_servers) {
+ serverlist_index_ = 0; // Wrap-around.
+ }
+ if (server->drop) {
+ // Update client load reporting stats to indicate the number of
+ // dropped calls. Note that we have to do this here instead of in
+ // the client_load_reporting filter, because we do not create a
+ // subchannel call (and therefore no client_load_reporting filter)
+ // for dropped calls.
+ if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
+ grpc_grpclb_client_stats_add_call_dropped_locked(
+ server->load_balance_token, lb_calld_->client_stats());
+ }
+ if (force_async) {
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+ Delete(pp);
+ return false;
+ }
+ Delete(pp);
+ return true;
+ }
}
- if (num_grpclb_addrs == 0) return nullptr;
+ // Set client_stats and user_data.
+ if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
+ pp->client_stats = grpc_grpclb_client_stats_ref(lb_calld_->client_stats());
+ }
+ GPR_ASSERT(pp->pick->user_data == nullptr);
+ pp->pick->user_data = (void**)&pp->lb_token;
+ // Pick via the RR policy.
+ bool pick_done = rr_policy_->PickLocked(pp->pick);
+ if (pick_done) {
+ PendingPickSetMetadataAndContext(pp);
+ if (force_async) {
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+ pick_done = false;
+ }
+ Delete(pp);
+ }
+ // else, the pending pick will be registered and taken care of by the
+ // pending pick list inside the RR policy. Eventually,
+ // OnPendingPickComplete() will be called, which will (among other
+ // things) add the LB token to the call's initial metadata.
+ return pick_done;
+}
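
The drop logic at the top of PickFromRoundRobinPolicyLocked() walks the balancer's serverlist round-robin, wrapping at the end, and drops a pick whenever the entry it lands on is marked as a drop. The sketch below isolates just that walk; FakeServer and DropDecider are hypothetical stand-ins for grpc_grpclb_server and the serverlist_/serverlist_index_ fields, and like the real code it assumes a non-empty list.

#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for grpc_grpclb_server.
struct FakeServer {
  bool drop;
};

class DropDecider {
 public:
  // Assumes servers is non-empty, as the real caller guarantees.
  explicit DropDecider(std::vector<FakeServer> servers)
      : servers_(std::move(servers)) {}

  // Advances one entry per pick, wrapping at the end of the list, and
  // reports whether the entry it landed on is a drop entry.
  bool ShouldDropNextPick() {
    const FakeServer& server = servers_[index_++];
    if (index_ == servers_.size()) index_ = 0;  // wrap-around
    return server.drop;
  }

 private:
  std::vector<FakeServer> servers_;
  size_t index_ = 0;
};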
- glb_lb_policy* glb_policy =
- static_cast<glb_lb_policy*>(gpr_zalloc(sizeof(*glb_policy)));
+void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
+ GPR_ASSERT(rr_policy_ == nullptr);
+ rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+ "round_robin", args);
+ if (rr_policy_ == nullptr) {
+ gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
+ this);
+ return;
+ }
+ // TODO(roth): We currently track this ref manually. Once the new
+ // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
+ auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
+ self.release();
+ rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
+ grpc_error* rr_state_error = nullptr;
+ rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
+  // grpclb's connectivity state is a function of the state of the RR
+  // policy just created.
+ UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
+ // Add the gRPC LB's interested_parties pollset_set to that of the newly
+ // created RR policy. This will make the RR policy progress upon activity on
+ // gRPC LB, which in turn is tied to the application's call.
+ grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
+ interested_parties());
+ // Subscribe to changes to the connectivity of the new RR.
+ // TODO(roth): We currently track this ref manually. Once the new
+ // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
+ self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
+ self.release();
+ rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
+ &on_rr_connectivity_changed_);
+ rr_policy_->ExitIdleLocked();
+ // Send pending picks to RR policy.
+ PendingPick* pp;
+ while ((pp = pending_picks_)) {
+ pending_picks_ = pp->next;
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Pending pick about to (async) PICK from RR %p", this,
+ rr_policy_.get());
+ }
+ PickFromRoundRobinPolicyLocked(true /* force_async */, pp);
+ }
+ // Send pending pings to RR policy.
+ PendingPing* pping;
+ while ((pping = pending_pings_)) {
+ pending_pings_ = pping->next;
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
+ this, rr_policy_.get());
+ }
+ rr_policy_->PingOneLocked(pping->on_initiate, pping->on_ack);
+ Delete(pping);
+ }
+}
- /* Get server name. */
- arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
- const char* server_uri = grpc_channel_arg_get_string(arg);
- GPR_ASSERT(server_uri != nullptr);
- grpc_uri* uri = grpc_uri_parse(server_uri, true);
- GPR_ASSERT(uri->path[0] != '\0');
- glb_policy->server_name =
- gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Will use '%s' as the server name for LB request.",
- glb_policy, glb_policy->server_name);
+grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
+ grpc_lb_addresses* addresses;
+ if (serverlist_ != nullptr) {
+ GPR_ASSERT(serverlist_->num_servers > 0);
+ addresses = ProcessServerlist(serverlist_);
+ } else {
+ // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
+ // received any serverlist from the balancer, we use the fallback backends
+ // returned by the resolver. Note that the fallback backend list may be
+ // empty, in which case the new round_robin policy will keep the requested
+ // picks pending.
+ GPR_ASSERT(fallback_backend_addresses_ != nullptr);
+ addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
}
- grpc_uri_destroy(uri);
+ GPR_ASSERT(addresses != nullptr);
+ // Replace the LB addresses in the channel args that we pass down to
+ // the subchannel.
+ static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
+ const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
+ args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1);
+ grpc_lb_addresses_destroy(addresses);
+ return args;
+}
- glb_policy->cc_factory = args->client_channel_factory;
- GPR_ASSERT(glb_policy->cc_factory != nullptr);
+void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
+ if (shutting_down_) return;
+ grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
+ GPR_ASSERT(args != nullptr);
+ if (rr_policy_ != nullptr) {
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", this,
+ rr_policy_.get());
+ }
+ rr_policy_->UpdateLocked(*args);
+ } else {
+ LoadBalancingPolicy::Args lb_policy_args;
+ lb_policy_args.combiner = combiner();
+ lb_policy_args.client_channel_factory = client_channel_factory();
+ lb_policy_args.args = args;
+ CreateRoundRobinPolicyLocked(lb_policy_args);
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", this,
+ rr_policy_.get());
+ }
+ }
+ grpc_channel_args_destroy(args);
+}
- arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
- glb_policy->lb_call_timeout_ms =
- grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
+void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg,
+ grpc_error* error) {
+ GrpcLb* grpclb_policy = reinterpret_cast<GrpcLb*>(arg);
+ if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
+ grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
+ return;
+ }
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(
+ GPR_DEBUG,
+ "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
+ grpclb_policy, grpclb_policy->rr_policy_.get());
+ }
+  // If we are talking to a balancer, we expect to get updated addresses from
+ // the balancer, so we can ignore the re-resolution request from the RR
+ // policy. Otherwise, handle the re-resolution request using the
+ // grpclb policy's original re-resolution closure.
+ if (grpclb_policy->lb_calld_ == nullptr ||
+ !grpclb_policy->lb_calld_->seen_initial_response()) {
+ grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
+ }
+ // Give back the wrapper closure to the RR policy.
+ grpclb_policy->rr_policy_->SetReresolutionClosureLocked(
+ &grpclb_policy->on_rr_request_reresolution_);
+}
- arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
- glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer(
- arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
+void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
+ grpc_error* rr_state_error) {
+ const grpc_connectivity_state curr_glb_state =
+ grpc_connectivity_state_check(&state_tracker_);
+ /* The new connectivity status is a function of the previous one and the new
+ * input coming from the status of the RR policy.
+ *
+ * current state (grpclb's)
+ * |
+ * v || I | C | R | TF | SD | <- new state (RR's)
+ * ===++====+=====+=====+======+======+
+ * I || I | C | R | [I] | [I] |
+ * ---++----+-----+-----+------+------+
+ * C || I | C | R | [C] | [C] |
+ * ---++----+-----+-----+------+------+
+ * R || I | C | R | [R] | [R] |
+ * ---++----+-----+-----+------+------+
+ * TF || I | C | R | [TF] | [TF] |
+ * ---++----+-----+-----+------+------+
+ * SD || NA | NA | NA | NA | NA | (*)
+ * ---++----+-----+-----+------+------+
+ *
+ * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
+ * is the current state of grpclb, which is left untouched.
+ *
+ * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
+ * the previous RR instance.
+ *
+   * Note that the status is never updated to SHUTDOWN as a result of calling
+   * this function. Only the policy's shutdown path may set that state.
+ *
+   * (*) This function must not be called while shutting down. */
+ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
+ switch (rr_connectivity_state_) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
+ break;
+ case GRPC_CHANNEL_IDLE:
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_READY:
+ GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
+ }
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(
+ GPR_INFO,
+ "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
+ this, grpc_connectivity_state_name(rr_connectivity_state_),
+ rr_policy_.get());
+ }
+ grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
+ rr_state_error,
+ "update_lb_connectivity_status_locked");
+}
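The table above boils down to a single rule: grpclb mirrors whatever state the RR policy reports, except that TRANSIENT_FAILURE and SHUTDOWN from the RR policy leave grpclb's current state untouched (the old RR instance is kept). A sketch of that rule as a hypothetical pure helper follows; it is an illustration of the documented decision rule, not the function's actual implementation.

// Illustration only, not part of this patch.
grpc_connectivity_state MergeGrpclbState(grpc_connectivity_state current_glb,
                                         grpc_connectivity_state new_rr) {
  switch (new_rr) {
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
    case GRPC_CHANNEL_SHUTDOWN:
      return current_glb;  // bracketed cells: keep grpclb's current state
    default:
      return new_rr;  // otherwise grpclb simply mirrors the RR policy's state
  }
}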
- // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
- // since we use this to trigger the client_load_reporting filter.
- grpc_arg new_arg = grpc_channel_arg_string_create(
- (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
- static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
- glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
- args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
-
- /* Extract the backend addresses (may be empty) from the resolver for
- * fallback. */
- glb_policy->fallback_backend_addresses =
- extract_backend_addresses_locked(addresses);
-
- /* Create a client channel over them to communicate with a LB service */
- glb_policy->response_generator =
- grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
- grpc_channel_args* lb_channel_args = build_lb_channel_args(
- addresses, glb_policy->response_generator.get(), args->args);
- char* uri_str;
- gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
- glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
- uri_str, args->client_channel_factory, lb_channel_args);
-
- /* Propagate initial resolution */
- glb_policy->response_generator->SetResponse(lb_channel_args);
- grpc_channel_args_destroy(lb_channel_args);
- gpr_free(uri_str);
- if (glb_policy->lb_channel == nullptr) {
- gpr_free((void*)glb_policy->server_name);
- grpc_channel_args_destroy(glb_policy->args);
- gpr_free(glb_policy);
- return nullptr;
+void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg,
+ grpc_error* error) {
+ GrpcLb* grpclb_policy = reinterpret_cast<GrpcLb*>(arg);
+ if (grpclb_policy->shutting_down_) {
+ grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
+ return;
}
- grpc_subchannel_index_ref();
- GRPC_CLOSURE_INIT(&glb_policy->rr_on_connectivity_changed,
- rr_on_connectivity_changed_locked, glb_policy,
- grpc_combiner_scheduler(args->combiner));
- GRPC_CLOSURE_INIT(&glb_policy->rr_on_reresolution_requested,
- rr_on_reresolution_requested_locked, glb_policy,
- grpc_combiner_scheduler(args->combiner));
- GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
- glb_lb_channel_on_connectivity_changed_cb, glb_policy,
- grpc_combiner_scheduler(args->combiner));
- grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
- grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
- "grpclb");
- // Init LB call backoff option.
- grpc_core::BackOff::Options backoff_options;
- backoff_options
- .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
- .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
- .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
- .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
- glb_policy->lb_call_backoff.Init(backoff_options);
- return &glb_policy->base;
+ grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
+ GRPC_ERROR_REF(error));
+ // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
+ grpclb_policy->rr_policy_->NotifyOnStateChangeLocked(
+ &grpclb_policy->rr_connectivity_state_,
+ &grpclb_policy->on_rr_connectivity_changed_);
}
-static void glb_factory_ref(grpc_lb_policy_factory* factory) {}
+//
+// factory
+//
-static void glb_factory_unref(grpc_lb_policy_factory* factory) {}
+class GrpcLbFactory : public LoadBalancingPolicyFactory {
+ public:
+ OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+ const LoadBalancingPolicy::Args& args) const override {
+ /* Count the number of gRPC-LB addresses. There must be at least one. */
+ const grpc_arg* arg =
+ grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+ return nullptr;
+ }
+ grpc_lb_addresses* addresses =
+ reinterpret_cast<grpc_lb_addresses*>(arg->value.pointer.p);
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ if (num_grpclb_addrs == 0) return nullptr;
+ return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
+ }
-static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
- glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
+ const char* name() const override { return "grpclb"; }
+};
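For reference, the factory interface introduced here is deliberately small: a name() accessor plus CreateLoadBalancingPolicy(), which may return nullptr (as above, when the args carry no balancer addresses). The sketch below shows direct use of that interface and is illustration only; in the real code the registry owns the factory and the args value comes from the client channel.

// Illustration only, not part of this patch.
OrphanablePtr<LoadBalancingPolicy> MakeGrpclbPolicyForExample(
    const LoadBalancingPolicy::Args& args) {
  GrpcLbFactory factory;  // normally owned by LoadBalancingPolicyRegistry
  // Returns nullptr unless `args` carries at least one balancer address.
  return factory.CreateLoadBalancingPolicy(args);
}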
-static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
+} // namespace
-grpc_lb_policy_factory* grpc_glb_lb_factory_create() {
- return &glb_lb_policy_factory;
-}
+} // namespace grpc_core
-/* Plugin registration */
+//
+// Plugin registration
+//
+
+namespace {
// Only add client_load_reporting filter if the grpclb LB policy is used.
-static bool maybe_add_client_load_reporting_filter(
- grpc_channel_stack_builder* builder, void* arg) {
+bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
+ void* arg) {
const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
const grpc_arg* channel_arg =
@@ -1938,14 +1867,18 @@ static bool maybe_add_client_load_reporting_filter(
if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
strcmp(channel_arg->value.string, "grpclb") == 0) {
return grpc_channel_stack_builder_append_filter(
- builder, static_cast<const grpc_channel_filter*>(arg), nullptr,
- nullptr);
+ builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
}
return true;
}
+} // namespace
+
void grpc_lb_policy_grpclb_init() {
- grpc_register_lb_policy(grpc_glb_lb_factory_create());
+ grpc_core::LoadBalancingPolicyRegistry::Builder::
+ RegisterLoadBalancingPolicyFactory(
+ grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
+ grpc_core::New<grpc_core::GrpcLbFactory>()));
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
maybe_add_client_load_reporting_filter,
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
deleted file mode 100644
index 0a2edb0e3d..0000000000
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H
-#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H
-
-#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
-
-/** Returns a load balancing factory for the glb policy, which tries to connect
- * to a load balancing server to decide the next successfully connected
- * subchannel to pick. */
-grpc_lb_policy_factory* grpc_glb_lb_factory_create();
-
-#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H */
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 296bdcb247..f1878a594e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -29,194 +29,225 @@
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
-grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
+namespace grpc_core {
+
+TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
+
+namespace {
+
+//
+// pick_first LB policy
+//
+
+class PickFirst : public LoadBalancingPolicy {
+ public:
+ explicit PickFirst(const Args& args);
+
+ void UpdateLocked(const grpc_channel_args& args) override;
+ bool PickLocked(PickState* pick) override;
+ void CancelPickLocked(PickState* pick, grpc_error* error) override;
+ void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error* error) override;
+ void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
+ grpc_closure* closure) override;
+ grpc_connectivity_state CheckConnectivityLocked(
+ grpc_error** connectivity_error) override;
+ void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
+ void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
+ void ExitIdleLocked() override;
+
+ private:
+ ~PickFirst();
+
+ void ShutdownLocked() override;
+
+ void StartPickingLocked();
+ void DestroyUnselectedSubchannelsLocked();
+
+ static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
+
+ void SubchannelListRefForConnectivityWatch(
+ grpc_lb_subchannel_list* subchannel_list, const char* reason);
+ void SubchannelListUnrefForConnectivityWatch(
+ grpc_lb_subchannel_list* subchannel_list, const char* reason);
-typedef struct {
- /** base policy: must be first */
- grpc_lb_policy base;
/** all our subchannels */
- grpc_lb_subchannel_list* subchannel_list;
+ grpc_lb_subchannel_list* subchannel_list_ = nullptr;
/** latest pending subchannel list */
- grpc_lb_subchannel_list* latest_pending_subchannel_list;
+ grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
/** selected subchannel in \a subchannel_list */
- grpc_lb_subchannel_data* selected;
+ grpc_lb_subchannel_data* selected_ = nullptr;
/** have we started picking? */
- bool started_picking;
+ bool started_picking_ = false;
/** are we shut down? */
- bool shutdown;
+ bool shutdown_ = false;
/** list of picks that are waiting on connectivity */
- grpc_lb_policy_pick_state* pending_picks;
+ PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
- grpc_connectivity_state_tracker state_tracker;
-} pick_first_lb_policy;
-
-static void pf_destroy(grpc_lb_policy* pol) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
- GPR_ASSERT(p->subchannel_list == nullptr);
- GPR_ASSERT(p->latest_pending_subchannel_list == nullptr);
- GPR_ASSERT(p->pending_picks == nullptr);
- grpc_connectivity_state_destroy(&p->state_tracker);
- gpr_free(p);
- grpc_subchannel_index_unref();
+ grpc_connectivity_state_tracker state_tracker_;
+};
+
+PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
+ GPR_ASSERT(args.client_channel_factory != nullptr);
+ grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
+ "pick_first");
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void*)p);
+ gpr_log(GPR_DEBUG, "Pick First %p created.", this);
}
+ UpdateLocked(*args.args);
+ grpc_subchannel_index_ref();
}
-static void pf_shutdown_locked(grpc_lb_policy* pol,
- grpc_lb_policy* new_policy) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
- grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
+PickFirst::~PickFirst() {
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
+ gpr_log(GPR_DEBUG, "Destroying Pick First %p", this);
}
- p->shutdown = true;
- grpc_lb_policy_pick_state* pick;
- while ((pick = p->pending_picks) != nullptr) {
- p->pending_picks = pick->next;
- if (new_policy != nullptr) {
- // Hand off to new LB policy.
- if (grpc_lb_policy_pick_locked(new_policy, pick)) {
- // Synchronous return, schedule closure.
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
- }
- } else {
- pick->connected_subchannel.reset();
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
+ GPR_ASSERT(subchannel_list_ == nullptr);
+ GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
+ GPR_ASSERT(pending_picks_ == nullptr);
+ grpc_connectivity_state_destroy(&state_tracker_);
+ grpc_subchannel_index_unref();
+}
+
+void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
+ PickState* pick;
+ while ((pick = pending_picks_) != nullptr) {
+ pending_picks_ = pick->next;
+ if (new_policy->PickLocked(pick)) {
+ // Synchronous return, schedule closure.
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
}
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+}
+
+void PickFirst::ShutdownLocked() {
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
+ if (grpc_lb_pick_first_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "Pick First %p Shutting down", this);
+ }
+ shutdown_ = true;
+ PickState* pick;
+ while ((pick = pending_picks_) != nullptr) {
+ pending_picks_ = pick->next;
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
+ }
+ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "shutdown");
- if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
- "pf_shutdown");
- p->subchannel_list = nullptr;
+ if (subchannel_list_ != nullptr) {
+ grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_shutdown");
+ subchannel_list_ = nullptr;
}
- if (p->latest_pending_subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(
- p->latest_pending_subchannel_list, "pf_shutdown");
- p->latest_pending_subchannel_list = nullptr;
+ if (latest_pending_subchannel_list_ != nullptr) {
+ grpc_lb_subchannel_list_shutdown_and_unref(latest_pending_subchannel_list_,
+ "pf_shutdown");
+ latest_pending_subchannel_list_ = nullptr;
}
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
- GRPC_ERROR_CANCELLED);
+ TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
-static void pf_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_lb_policy_pick_state* pick,
- grpc_error* error) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
- grpc_lb_policy_pick_state* pp = p->pending_picks;
- p->pending_picks = nullptr;
+void PickFirst::CancelPickLocked(PickState* pick, grpc_error* error) {
+ PickState* pp = pending_picks_;
+ pending_picks_ = nullptr;
while (pp != nullptr) {
- grpc_lb_policy_pick_state* next = pp->next;
+ PickState* next = pp->next;
if (pp == pick) {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
- pp->next = p->pending_picks;
- p->pending_picks = pp;
+ pp->next = pending_picks_;
+ pending_picks_ = pp;
}
pp = next;
}
GRPC_ERROR_UNREF(error);
}
-static void pf_cancel_picks_locked(grpc_lb_policy* pol,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error* error) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
- grpc_lb_policy_pick_state* pick = p->pending_picks;
- p->pending_picks = nullptr;
+void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error* error) {
+ PickState* pick = pending_picks_;
+ pending_picks_ = nullptr;
while (pick != nullptr) {
- grpc_lb_policy_pick_state* next = pick->next;
+ PickState* next = pick->next;
if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
- pick->next = p->pending_picks;
- p->pending_picks = pick;
+ pick->next = pending_picks_;
+ pending_picks_ = pick;
}
pick = next;
}
GRPC_ERROR_UNREF(error);
}
-static void start_picking_locked(pick_first_lb_policy* p) {
- p->started_picking = true;
- if (p->subchannel_list != nullptr &&
- p->subchannel_list->num_subchannels > 0) {
- p->subchannel_list->checking_subchannel = 0;
- for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
- if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
- grpc_lb_subchannel_list_ref_for_connectivity_watch(
- p->subchannel_list, "connectivity_watch+start_picking");
+void PickFirst::StartPickingLocked() {
+ started_picking_ = true;
+ if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels > 0) {
+ subchannel_list_->checking_subchannel = 0;
+ for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
+ if (subchannel_list_->subchannels[i].subchannel != nullptr) {
+ SubchannelListRefForConnectivityWatch(
+ subchannel_list_, "connectivity_watch+start_picking");
grpc_lb_subchannel_data_start_connectivity_watch(
- &p->subchannel_list->subchannels[i]);
+ &subchannel_list_->subchannels[i]);
break;
}
}
}
}
-static void pf_exit_idle_locked(grpc_lb_policy* pol) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
- if (!p->started_picking) {
- start_picking_locked(p);
+void PickFirst::ExitIdleLocked() {
+ if (!started_picking_) {
+ StartPickingLocked();
}
}
-static int pf_pick_locked(grpc_lb_policy* pol,
- grpc_lb_policy_pick_state* pick) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
+bool PickFirst::PickLocked(PickState* pick) {
// If we have a selected subchannel already, return synchronously.
- if (p->selected != nullptr) {
- pick->connected_subchannel = p->selected->connected_subchannel;
- return 1;
+ if (selected_ != nullptr) {
+ pick->connected_subchannel = selected_->connected_subchannel;
+ return true;
}
// No subchannel selected yet, so handle asynchronously.
- if (!p->started_picking) {
- start_picking_locked(p);
+ if (!started_picking_) {
+ StartPickingLocked();
}
- pick->next = p->pending_picks;
- p->pending_picks = pick;
- return 0;
+ pick->next = pending_picks_;
+ pending_picks_ = pick;
+ return false;
}
-static void destroy_unselected_subchannels_locked(pick_first_lb_policy* p) {
- for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
- grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[i];
- if (p->selected != sd) {
+void PickFirst::DestroyUnselectedSubchannelsLocked() {
+ for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
+ grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[i];
+ if (selected_ != sd) {
grpc_lb_subchannel_data_unref_subchannel(sd,
"selected_different_subchannel");
}
}
}
-static grpc_connectivity_state pf_check_connectivity_locked(
- grpc_lb_policy* pol, grpc_error** error) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
- return grpc_connectivity_state_get(&p->state_tracker, error);
+grpc_connectivity_state PickFirst::CheckConnectivityLocked(grpc_error** error) {
+ return grpc_connectivity_state_get(&state_tracker_, error);
}
-static void pf_notify_on_state_change_locked(grpc_lb_policy* pol,
- grpc_connectivity_state* current,
- grpc_closure* notify) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
- grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
+ grpc_closure* notify) {
+ grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
notify);
}
-static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol);
- if (p->selected) {
- p->selected->connected_subchannel->Ping(on_initiate, on_ack);
+void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
+ if (selected_ != nullptr) {
+ selected_->connected_subchannel->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
@@ -225,18 +256,31 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
}
}
-static void pf_connectivity_changed_locked(void* arg, grpc_error* error);
+void PickFirst::SubchannelListRefForConnectivityWatch(
+ grpc_lb_subchannel_list* subchannel_list, const char* reason) {
+ // TODO(roth): We currently track this ref manually. Once the new
+ // ClosureRef API is ready and the subchannel_list code has been
+ // converted to a C++ API, find a way to hold the RefCountedPtr<>
+ // somewhere (maybe in the subchannel_data object) instead of doing
+ // this manually.
+ auto self = Ref(DEBUG_LOCATION, reason);
+ self.release();
+ grpc_lb_subchannel_list_ref(subchannel_list, reason);
+}
-static void pf_update_locked(grpc_lb_policy* policy,
- const grpc_lb_policy_args* args) {
- pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(policy);
- const grpc_arg* arg =
- grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+void PickFirst::SubchannelListUnrefForConnectivityWatch(
+ grpc_lb_subchannel_list* subchannel_list, const char* reason) {
+ Unref(DEBUG_LOCATION, reason);
+ grpc_lb_subchannel_list_unref(subchannel_list, reason);
+}
+
+void PickFirst::UpdateLocked(const grpc_channel_args& args) {
+ const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
- if (p->subchannel_list == nullptr) {
+ if (subchannel_list_ == nullptr) {
// If we don't have a current subchannel list, go into TRANSIENT FAILURE.
grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"pf_update_missing");
} else {
@@ -244,77 +288,78 @@ static void pf_update_locked(grpc_lb_policy* policy,
gpr_log(GPR_ERROR,
"No valid LB addresses channel arg for Pick First %p update, "
"ignoring.",
- (void*)p);
+ this);
}
return;
}
const grpc_lb_addresses* addresses =
- static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
+ (const grpc_lb_addresses*)arg->value.pointer.p;
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
- (void*)p, static_cast<unsigned long>(addresses->num_addresses));
+ gpr_log(GPR_INFO,
+ "Pick First %p received update with %" PRIuPTR " addresses", this,
+ addresses->num_addresses);
}
grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
- &p->base, &grpc_lb_pick_first_trace, addresses, args,
- pf_connectivity_changed_locked);
+ this, &grpc_lb_pick_first_trace, addresses, combiner(),
+ client_channel_factory(), args, &PickFirst::OnConnectivityChangedLocked);
if (subchannel_list->num_subchannels == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current
// subchannels and put the channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"pf_update_empty");
- if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
+ if (subchannel_list_ != nullptr) {
+ grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
"sl_shutdown_empty_update");
}
- p->subchannel_list = subchannel_list; // Empty list.
- p->selected = nullptr;
+ subchannel_list_ = subchannel_list; // Empty list.
+ selected_ = nullptr;
return;
}
- if (p->selected == nullptr) {
+ if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
// subchannel list immediately.
- if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
+ if (subchannel_list_ != nullptr) {
+ grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
"pf_update_before_selected");
}
- p->subchannel_list = subchannel_list;
+ subchannel_list_ = subchannel_list;
} else {
// We do have a selected subchannel.
// Check if it's present in the new list. If so, we're done.
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i];
- if (sd->subchannel == p->selected->subchannel) {
+ if (sd->subchannel == selected_->subchannel) {
// The currently selected subchannel is in the update: we are done.
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p found already selected subchannel %p "
"at update index %" PRIuPTR " of %" PRIuPTR "; update done",
- p, p->selected->subchannel, i,
+ this, selected_->subchannel, i,
subchannel_list->num_subchannels);
}
- if (p->selected->connected_subchannel != nullptr) {
- sd->connected_subchannel = p->selected->connected_subchannel;
+ if (selected_->connected_subchannel != nullptr) {
+ sd->connected_subchannel = selected_->connected_subchannel;
}
- p->selected = sd;
- if (p->subchannel_list != nullptr) {
+ selected_ = sd;
+ if (subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- p->subchannel_list, "pf_update_includes_selected");
+ subchannel_list_, "pf_update_includes_selected");
}
- p->subchannel_list = subchannel_list;
- destroy_unselected_subchannels_locked(p);
- grpc_lb_subchannel_list_ref_for_connectivity_watch(
+ subchannel_list_ = subchannel_list;
+ DestroyUnselectedSubchannelsLocked();
+ SubchannelListRefForConnectivityWatch(
subchannel_list, "connectivity_watch+replace_selected");
grpc_lb_subchannel_data_start_connectivity_watch(sd);
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
- if (p->latest_pending_subchannel_list != nullptr) {
+ if (latest_pending_subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- p->latest_pending_subchannel_list,
+ latest_pending_subchannel_list_,
"pf_update_includes_selected+outdated");
- p->latest_pending_subchannel_list = nullptr;
+ latest_pending_subchannel_list_ = nullptr;
}
return;
}
@@ -323,84 +368,81 @@ static void pf_update_locked(grpc_lb_policy* policy,
// pending subchannel list to the new subchannel list. We will wait
// for it to report READY before swapping it into the current
// subchannel list.
- if (p->latest_pending_subchannel_list != nullptr) {
+ if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG,
"Pick First %p Shutting down latest pending subchannel list "
"%p, about to be replaced by newer latest %p",
- (void*)p, (void*)p->latest_pending_subchannel_list,
- (void*)subchannel_list);
+ this, latest_pending_subchannel_list_, subchannel_list);
}
grpc_lb_subchannel_list_shutdown_and_unref(
- p->latest_pending_subchannel_list, "sl_outdated_dont_smash");
+ latest_pending_subchannel_list_, "sl_outdated_dont_smash");
}
- p->latest_pending_subchannel_list = subchannel_list;
+ latest_pending_subchannel_list_ = subchannel_list;
}
// If we've started picking, start trying to connect to the first
// subchannel in the new list.
- if (p->started_picking) {
- grpc_lb_subchannel_list_ref_for_connectivity_watch(
- subchannel_list, "connectivity_watch+update");
+ if (started_picking_) {
+ SubchannelListRefForConnectivityWatch(subchannel_list,
+ "connectivity_watch+update");
grpc_lb_subchannel_data_start_connectivity_watch(
&subchannel_list->subchannels[0]);
}
}
-static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
- grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
- pick_first_lb_policy* p =
- reinterpret_cast<pick_first_lb_policy*>(sd->subchannel_list->policy);
+void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
+ grpc_lb_subchannel_data* sd = reinterpret_cast<grpc_lb_subchannel_data*>(arg);
+ PickFirst* p = reinterpret_cast<PickFirst*>(sd->subchannel_list->policy);
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG,
"Pick First %p connectivity changed for subchannel %p (%" PRIuPTR
" of %" PRIuPTR
- "), subchannel_list %p: state=%s p->shutdown=%d "
+ "), subchannel_list %p: state=%s p->shutdown_=%d "
"sd->subchannel_list->shutting_down=%d error=%s",
- (void*)p, (void*)sd->subchannel,
- sd->subchannel_list->checking_subchannel,
- sd->subchannel_list->num_subchannels, (void*)sd->subchannel_list,
+ p, sd->subchannel, sd->subchannel_list->checking_subchannel,
+ sd->subchannel_list->num_subchannels, sd->subchannel_list,
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
- p->shutdown, sd->subchannel_list->shutting_down,
+ p->shutdown_, sd->subchannel_list->shutting_down,
grpc_error_string(error));
}
// If the policy is shutting down, unref and return.
- if (p->shutdown) {
+ if (p->shutdown_) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
- "pf_shutdown");
+ p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
+ "pf_shutdown");
return;
}
// If the subchannel list is shutting down, stop watching.
if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
- "pf_sl_shutdown");
+ p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
+ "pf_sl_shutdown");
return;
}
// If we're still here, the notification must be for a subchannel in
// either the current or latest pending subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
- sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
+ sd->subchannel_list == p->latest_pending_subchannel_list_);
// Update state.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Handle updates for the currently selected subchannel.
- if (p->selected == sd) {
+ if (p->selected_ == sd) {
// If the new state is anything other than READY and there is a
// pending update, switch to the pending update.
if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
- p->latest_pending_subchannel_list != nullptr) {
- p->selected = nullptr;
+ p->latest_pending_subchannel_list_ != nullptr) {
+ p->selected_ = nullptr;
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
+ p->SubchannelListUnrefForConnectivityWatch(
sd->subchannel_list, "selected_not_ready+switch_to_update");
grpc_lb_subchannel_list_shutdown_and_unref(
- p->subchannel_list, "selected_not_ready+switch_to_update");
- p->subchannel_list = p->latest_pending_subchannel_list;
- p->latest_pending_subchannel_list = nullptr;
+ p->subchannel_list_, "selected_not_ready+switch_to_update");
+ p->subchannel_list_ = p->latest_pending_subchannel_list_;
+ p->latest_pending_subchannel_list_ = nullptr;
grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
} else {
// TODO(juanlishen): we re-resolve when the selected subchannel goes to
@@ -411,21 +453,20 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"selected_changed+reresolve");
- p->started_picking = false;
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace,
- GRPC_ERROR_NONE);
- // in transient failure. Rely on re-resolution to recover.
- p->selected = nullptr;
+ p->started_picking_ = false;
+ p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
+ // In transient failure. Rely on re-resolution to recover.
+ p->selected_ = nullptr;
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
- grpc_lb_subchannel_list_unref_for_connectivity_watch(
- sd->subchannel_list, "pf_selected_shutdown");
+ p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
+ "pf_selected_shutdown");
grpc_lb_subchannel_data_unref_subchannel(
sd, "pf_selected_shutdown"); // Unrefs connected subchannel
} else {
- grpc_connectivity_state_set(&p->state_tracker,
+ grpc_connectivity_state_set(&p->state_tracker_,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
// Renew notification.
@@ -436,45 +477,45 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
}
// If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is
- // for a subchannel in p->subchannel_list that we're trying to
+ // for a subchannel in p->subchannel_list_ that we're trying to
// connect to. The goal here is to find a subchannel that we can
// select.
// 2. We do currently have a selected subchannel, and the update is
- // for a subchannel in p->latest_pending_subchannel_list. The
+ // for a subchannel in p->latest_pending_subchannel_list_. The
// goal here is to find a subchannel from the update that we can
// select in place of the current one.
switch (sd->curr_connectivity_state) {
case GRPC_CHANNEL_READY: {
- // Case 2. Promote p->latest_pending_subchannel_list to
- // p->subchannel_list.
+ // Case 2. Promote p->latest_pending_subchannel_list_ to
+ // p->subchannel_list_.
sd->connected_subchannel =
grpc_subchannel_get_connected_subchannel(sd->subchannel);
- if (sd->subchannel_list == p->latest_pending_subchannel_list) {
- GPR_ASSERT(p->subchannel_list != nullptr);
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
+ if (sd->subchannel_list == p->latest_pending_subchannel_list_) {
+ GPR_ASSERT(p->subchannel_list_ != nullptr);
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
"finish_update");
- p->subchannel_list = p->latest_pending_subchannel_list;
- p->latest_pending_subchannel_list = nullptr;
+ p->subchannel_list_ = p->latest_pending_subchannel_list_;
+ p->latest_pending_subchannel_list_ = nullptr;
}
// Cases 1 and 2.
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "connecting_ready");
- p->selected = sd;
+ p->selected_ = sd;
if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p,
- (void*)sd->subchannel);
+ gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p,
+ sd->subchannel);
}
// Drop all other subchannels, since we are now connected.
- destroy_unselected_subchannels_locked(p);
+ p->DestroyUnselectedSubchannelsLocked();
// Update any calls that were waiting for a pick.
- grpc_lb_policy_pick_state* pick;
- while ((pick = p->pending_picks)) {
- p->pending_picks = pick->next;
- pick->connected_subchannel = p->selected->connected_subchannel;
+ PickState* pick;
+ while ((pick = p->pending_picks_)) {
+ p->pending_picks_ = pick->next;
+ pick->connected_subchannel = p->selected_->connected_subchannel;
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Servicing pending pick with selected subchannel %p",
- (void*)p->selected);
+ p->selected_);
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
@@ -494,9 +535,9 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->subchannel_list->checking_subchannel == 0 &&
- sd->subchannel_list == p->subchannel_list) {
+ sd->subchannel_list == p->subchannel_list_) {
grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connecting_transient_failure");
}
// Reuses the connectivity refs from the previous watch.
@@ -506,8 +547,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE: {
// Only update connectivity state in case 1.
- if (sd->subchannel_list == p->subchannel_list) {
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ if (sd->subchannel_list == p->subchannel_list_) {
+ grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_REF(error),
"connecting_changed");
}
@@ -520,51 +561,29 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
}
}
-static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy,
- pf_shutdown_locked,
- pf_pick_locked,
- pf_cancel_pick_locked,
- pf_cancel_picks_locked,
- pf_ping_one_locked,
- pf_exit_idle_locked,
- pf_check_connectivity_locked,
- pf_notify_on_state_change_locked,
- pf_update_locked};
-
-static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}
-
-static void pick_first_factory_unref(grpc_lb_policy_factory* factory) {}
-
-static grpc_lb_policy* create_pick_first(grpc_lb_policy_factory* factory,
- grpc_lb_policy_args* args) {
- GPR_ASSERT(args->client_channel_factory != nullptr);
- pick_first_lb_policy* p =
- static_cast<pick_first_lb_policy*>(gpr_zalloc(sizeof(*p)));
- if (grpc_lb_pick_first_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Pick First %p created.", (void*)p);
- }
- pf_update_locked(&p->base, args);
- grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
- grpc_subchannel_index_ref();
- return &p->base;
-}
+//
+// factory
+//
-static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = {
- pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
- "pick_first"};
+class PickFirstFactory : public LoadBalancingPolicyFactory {
+ public:
+ OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+ const LoadBalancingPolicy::Args& args) const override {
+ return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args));
+ }
-static grpc_lb_policy_factory pick_first_lb_policy_factory = {
- &pick_first_factory_vtable};
+ const char* name() const override { return "pick_first"; }
+};
-static grpc_lb_policy_factory* pick_first_lb_factory_create() {
- return &pick_first_lb_policy_factory;
-}
+} // namespace
-/* Plugin registration */
+} // namespace grpc_core
void grpc_lb_policy_pick_first_init() {
- grpc_register_lb_policy(pick_first_lb_factory_create());
+ grpc_core::LoadBalancingPolicyRegistry::Builder::
+ RegisterLoadBalancingPolicyFactory(
+ grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
+ grpc_core::New<grpc_core::PickFirstFactory>()));
}
void grpc_lb_policy_pick_first_shutdown() {}
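The registration pattern is the same for every policy: implement a small factory, wrap it in a UniquePtr, and hand it to the registry's Builder. The sketch below shows what a third, hypothetical policy's registration would look like; MyPolicyFactory and the "my_policy" name are illustrative only, and this stub simply declines to create a policy.

// Hypothetical example, not part of this patch.
class MyPolicyFactory : public grpc_core::LoadBalancingPolicyFactory {
 public:
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>
  CreateLoadBalancingPolicy(
      const grpc_core::LoadBalancingPolicy::Args& args) const override {
    return nullptr;  // a real factory would construct its policy here
  }
  const char* name() const override { return "my_policy"; }
};

void grpc_lb_policy_my_policy_init_example() {
  grpc_core::LoadBalancingPolicyRegistry::Builder::
      RegisterLoadBalancingPolicyFactory(
          grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
              grpc_core::New<MyPolicyFactory>()));
}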
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index b5b4c44ef1..178e299b61 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -40,34 +40,94 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
-grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
-
-typedef struct round_robin_lb_policy {
- /** base policy: must be first */
- grpc_lb_policy base;
-
- grpc_lb_subchannel_list* subchannel_list;
-
+namespace grpc_core {
+
+TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
+
+namespace {
+
+//
+// round_robin LB policy
+//
+
+class RoundRobin : public LoadBalancingPolicy {
+ public:
+ explicit RoundRobin(const Args& args);
+
+ void UpdateLocked(const grpc_channel_args& args) override;
+ bool PickLocked(PickState* pick) override;
+ void CancelPickLocked(PickState* pick, grpc_error* error) override;
+ void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error* error) override;
+ void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
+ grpc_closure* closure) override;
+ grpc_connectivity_state CheckConnectivityLocked(
+ grpc_error** connectivity_error) override;
+ void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
+ void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
+ void ExitIdleLocked() override;
+
+ private:
+ ~RoundRobin();
+
+ void ShutdownLocked() override;
+
+ void StartPickingLocked();
+ size_t GetNextReadySubchannelIndexLocked();
+ void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index);
+ void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
+ grpc_error* error);
+
+ static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
+
+ void SubchannelListRefForConnectivityWatch(
+ grpc_lb_subchannel_list* subchannel_list, const char* reason);
+ void SubchannelListUnrefForConnectivityWatch(
+ grpc_lb_subchannel_list* subchannel_list, const char* reason);
+
+ /** list of subchannels */
+ grpc_lb_subchannel_list* subchannel_list_ = nullptr;
/** have we started picking? */
- bool started_picking;
+ bool started_picking_ = false;
/** are we shutting down? */
- bool shutdown;
+ bool shutdown_ = false;
/** List of picks that are waiting on connectivity */
- grpc_lb_policy_pick_state* pending_picks;
-
+ PickState* pending_picks_ = nullptr;
/** our connectivity state tracker */
- grpc_connectivity_state_tracker state_tracker;
-
+ grpc_connectivity_state_tracker state_tracker_;
/** Index into subchannels for last pick. */
- size_t last_ready_subchannel_index;
-
+ size_t last_ready_subchannel_index_ = 0;
/** Latest version of the subchannel list.
* Subchannel connectivity callbacks will only promote updated subchannel
* lists if they equal \a latest_pending_subchannel_list. In other words,
* racing callbacks that reference outdated subchannel lists won't perform any
* update. */
- grpc_lb_subchannel_list* latest_pending_subchannel_list;
-} round_robin_lb_policy;
+ grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr;
+};
+
+RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
+ GPR_ASSERT(args.client_channel_factory != nullptr);
+ grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
+ "round_robin");
+ UpdateLocked(*args.args);
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this,
+ subchannel_list_->num_subchannels);
+ }
+ grpc_subchannel_index_ref();
+}
+
+RoundRobin::~RoundRobin() {
+ if (grpc_lb_round_robin_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy", this);
+ }
+ GPR_ASSERT(subchannel_list_ == nullptr);
+ GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
+ GPR_ASSERT(pending_picks_ == nullptr);
+ grpc_connectivity_state_destroy(&state_tracker_);
+ grpc_subchannel_index_unref();
+}
/** Returns the index into p->subchannel_list->subchannels of the next
* subchannel in READY state, or p->subchannel_list->num_subchannels if no
@@ -75,195 +135,190 @@ typedef struct round_robin_lb_policy {
*
* Note that this function does *not* update p->last_ready_subchannel_index.
* The caller must do that if it returns a pick. */
-static size_t get_next_ready_subchannel_index_locked(
- const round_robin_lb_policy* p) {
- GPR_ASSERT(p->subchannel_list != nullptr);
+size_t RoundRobin::GetNextReadySubchannelIndexLocked() {
+ GPR_ASSERT(subchannel_list_ != nullptr);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
- "[RR %p] getting next ready subchannel (out of %lu), "
- "last_ready_subchannel_index=%lu",
- (void*)p,
- static_cast<unsigned long>(p->subchannel_list->num_subchannels),
- static_cast<unsigned long>(p->last_ready_subchannel_index));
- }
- for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
- const size_t index = (i + p->last_ready_subchannel_index + 1) %
- p->subchannel_list->num_subchannels;
+ "[RR %p] getting next ready subchannel (out of %" PRIuPTR
+ "), "
+ "last_ready_subchannel_index=%" PRIuPTR,
+ this, subchannel_list_->num_subchannels,
+ last_ready_subchannel_index_);
+ }
+ for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) {
+ const size_t index = (i + last_ready_subchannel_index_ + 1) %
+ subchannel_list_->num_subchannels;
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
- "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
- "state=%s",
- (void*)p, (void*)p->subchannel_list->subchannels[index].subchannel,
- (void*)p->subchannel_list, static_cast<unsigned long>(index),
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR
+ ": state=%s",
+ this, subchannel_list_->subchannels[index].subchannel,
+ subchannel_list_, index,
grpc_connectivity_state_name(
- p->subchannel_list->subchannels[index].curr_connectivity_state));
+ subchannel_list_->subchannels[index].curr_connectivity_state));
}
- if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
+ if (subchannel_list_->subchannels[index].curr_connectivity_state ==
GRPC_CHANNEL_READY) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
- "[RR %p] found next ready subchannel (%p) at index %lu of "
- "subchannel_list %p",
- (void*)p,
- (void*)p->subchannel_list->subchannels[index].subchannel,
- static_cast<unsigned long>(index), (void*)p->subchannel_list);
+ "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR
+ " of subchannel_list %p",
+ this, subchannel_list_->subchannels[index].subchannel, index,
+ subchannel_list_);
}
return index;
}
}
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void*)p);
+ gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this);
}
- return p->subchannel_list->num_subchannels;
+ return subchannel_list_->num_subchannels;
}
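The scan above always starts one slot past last_ready_subchannel_index_ and wraps modulo the list size, which is what rotates picks evenly across READY subchannels. Below is a standalone sketch of just that arithmetic, with plain bools standing in for subchannel readiness; the data and helper name are assumptions, not the real types.

// Illustration only, not part of this patch.
size_t NextReadyIndexForExample(const bool* ready, size_t num_subchannels,
                                size_t last_ready_index) {
  for (size_t i = 0; i < num_subchannels; ++i) {
    const size_t index = (i + last_ready_index + 1) % num_subchannels;
    if (ready[index]) return index;  // first READY slot after the last pick
  }
  return num_subchannels;  // sentinel: nothing READY right now
}
// e.g. ready = {true, false, true}, last_ready_index = 0  ->  returns 2.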
-// Sets p->last_ready_subchannel_index to last_ready_index.
-static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p,
- size_t last_ready_index) {
- GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels);
- p->last_ready_subchannel_index = last_ready_index;
+// Sets last_ready_subchannel_index_ to last_ready_index.
+void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) {
+ GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels);
+ last_ready_subchannel_index_ = last_ready_index;
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
- "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
- (void*)p, static_cast<unsigned long>(last_ready_index),
- (void*)p->subchannel_list->subchannels[last_ready_index].subchannel,
- (void*)p->subchannel_list->subchannels[last_ready_index]
+ "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR
+ " (SC %p, CSC %p)",
+ this, last_ready_index,
+ subchannel_list_->subchannels[last_ready_index].subchannel,
+ subchannel_list_->subchannels[last_ready_index]
.connected_subchannel.get());
}
}
-static void rr_destroy(grpc_lb_policy* pol) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p",
- (void*)pol, (void*)pol);
+void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
+ PickState* pick;
+ while ((pick = pending_picks_) != nullptr) {
+ pending_picks_ = pick->next;
+ if (new_policy->PickLocked(pick)) {
+ // Synchronous return, schedule closure.
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
+ }
}
- GPR_ASSERT(p->subchannel_list == nullptr);
- GPR_ASSERT(p->latest_pending_subchannel_list == nullptr);
- grpc_connectivity_state_destroy(&p->state_tracker);
- grpc_subchannel_index_unref();
- gpr_free(p);
}
-static void rr_shutdown_locked(grpc_lb_policy* pol,
- grpc_lb_policy* new_policy) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
+void RoundRobin::ShutdownLocked() {
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
- }
- p->shutdown = true;
- grpc_lb_policy_pick_state* pick;
- while ((pick = p->pending_picks) != nullptr) {
- p->pending_picks = pick->next;
- if (new_policy != nullptr) {
- // Hand off to new LB policy.
- if (grpc_lb_policy_pick_locked(new_policy, pick)) {
- // Synchronous return; schedule callback.
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
- }
- } else {
- pick->connected_subchannel.reset();
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
- }
+ gpr_log(GPR_DEBUG, "[RR %p] Shutting down", this);
+ }
+ shutdown_ = true;
+ PickState* pick;
+ while ((pick = pending_picks_) != nullptr) {
+ pending_picks_ = pick->next;
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error));
}
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "rr_shutdown");
- if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
+ if (subchannel_list_ != nullptr) {
+ grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
"sl_shutdown_rr_shutdown");
- p->subchannel_list = nullptr;
+ subchannel_list_ = nullptr;
}
- if (p->latest_pending_subchannel_list != nullptr) {
+ if (latest_pending_subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown");
- p->latest_pending_subchannel_list = nullptr;
+ latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown");
+ latest_pending_subchannel_list_ = nullptr;
}
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
- GRPC_ERROR_CANCELLED);
+ TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
-static void rr_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_lb_policy_pick_state* pick,
- grpc_error* error) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
- grpc_lb_policy_pick_state* pp = p->pending_picks;
- p->pending_picks = nullptr;
+void RoundRobin::CancelPickLocked(PickState* pick, grpc_error* error) {
+ PickState* pp = pending_picks_;
+ pending_picks_ = nullptr;
while (pp != nullptr) {
- grpc_lb_policy_pick_state* next = pp->next;
+ PickState* next = pp->next;
if (pp == pick) {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick cancelled", &error, 1));
+ "Pick Cancelled", &error, 1));
} else {
- pp->next = p->pending_picks;
- p->pending_picks = pp;
+ pp->next = pending_picks_;
+ pending_picks_ = pp;
}
pp = next;
}
GRPC_ERROR_UNREF(error);
}
-static void rr_cancel_picks_locked(grpc_lb_policy* pol,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error* error) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
- grpc_lb_policy_pick_state* pick = p->pending_picks;
- p->pending_picks = nullptr;
+void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error* error) {
+ PickState* pick = pending_picks_;
+ pending_picks_ = nullptr;
while (pick != nullptr) {
- grpc_lb_policy_pick_state* next = pick->next;
+ PickState* next = pick->next;
if ((pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
pick->connected_subchannel.reset();
GRPC_CLOSURE_SCHED(pick->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick cancelled", &error, 1));
+ "Pick Cancelled", &error, 1));
} else {
- pick->next = p->pending_picks;
- p->pending_picks = pick;
+ pick->next = pending_picks_;
+ pending_picks_ = pick;
}
pick = next;
}
GRPC_ERROR_UNREF(error);
}
-static void start_picking_locked(round_robin_lb_policy* p) {
- p->started_picking = true;
- for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
- if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
- grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
- "connectivity_watch");
+void RoundRobin::SubchannelListRefForConnectivityWatch(
+ grpc_lb_subchannel_list* subchannel_list, const char* reason) {
+ // TODO(roth): We currently track this ref manually. Once the new
+ // ClosureRef API is ready and the subchannel_list code has been
+ // converted to a C++ API, find a way to hold the RefCountedPtr<>
+ // somewhere (maybe in the subchannel_data object) instead of doing
+ // this manually.
+ auto self = Ref(DEBUG_LOCATION, reason);
+ self.release();
+ grpc_lb_subchannel_list_ref(subchannel_list, reason);
+}
+
+void RoundRobin::SubchannelListUnrefForConnectivityWatch(
+ grpc_lb_subchannel_list* subchannel_list, const char* reason) {
+ Unref(DEBUG_LOCATION, reason);
+ grpc_lb_subchannel_list_unref(subchannel_list, reason);
+}
+
+void RoundRobin::StartPickingLocked() {
+ started_picking_ = true;
+ for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) {
+ if (subchannel_list_->subchannels[i].subchannel != nullptr) {
+ SubchannelListRefForConnectivityWatch(subchannel_list_,
+ "connectivity_watch");
grpc_lb_subchannel_data_start_connectivity_watch(
- &p->subchannel_list->subchannels[i]);
+ &subchannel_list_->subchannels[i]);
}
}
}
-static void rr_exit_idle_locked(grpc_lb_policy* pol) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
- if (!p->started_picking) {
- start_picking_locked(p);
+void RoundRobin::ExitIdleLocked() {
+ if (!started_picking_) {
+ StartPickingLocked();
}
}
-static int rr_pick_locked(grpc_lb_policy* pol,
- grpc_lb_policy_pick_state* pick) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
+bool RoundRobin::PickLocked(PickState* pick) {
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol,
- p->shutdown);
+ gpr_log(GPR_DEBUG, "[RR %p] Trying to pick (shutdown: %d)", this,
+ shutdown_);
}
- GPR_ASSERT(!p->shutdown);
- if (p->subchannel_list != nullptr) {
- const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- if (next_ready_index < p->subchannel_list->num_subchannels) {
+ GPR_ASSERT(!shutdown_);
+ if (subchannel_list_ != nullptr) {
+ const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
+ if (next_ready_index < subchannel_list_->num_subchannels) {
/* readily available, report right away */
grpc_lb_subchannel_data* sd =
- &p->subchannel_list->subchannels[next_ready_index];
+ &subchannel_list_->subchannels[next_ready_index];
pick->connected_subchannel = sd->connected_subchannel;
if (pick->user_data != nullptr) {
*pick->user_data = sd->user_data;
@@ -273,24 +328,24 @@ static int rr_pick_locked(grpc_lb_policy* pol,
GPR_DEBUG,
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, "
"index %" PRIuPTR ")",
- p, sd->subchannel, pick->connected_subchannel.get(),
+ this, sd->subchannel, pick->connected_subchannel.get(),
sd->subchannel_list, next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
- update_last_ready_subchannel_index_locked(p, next_ready_index);
- return 1;
+ UpdateLastReadySubchannelIndexLocked(next_ready_index);
+ return true;
}
}
/* no pick currently available. Save for later in list of pending picks */
- if (!p->started_picking) {
- start_picking_locked(p);
+ if (!started_picking_) {
+ StartPickingLocked();
}
- pick->next = p->pending_picks;
- p->pending_picks = pick;
- return 0;
+ pick->next = pending_picks_;
+ pending_picks_ = pick;
+ return false;
}
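
Note: the PickLocked() conversion above changes the return type from int to bool but keeps the old contract: true means the pick completed synchronously, false means it was queued on pending_picks_ and will be completed later from the connectivity callback. A minimal caller-side sketch, using only the PickState fields visible in this diff (the struct, assumed here to live on the LoadBalancingPolicy base class, may have additional members) and a hypothetical on_pick_complete closure:

    grpc_core::LoadBalancingPolicy::PickState pick{};  // zero-init fields not shown here
    pick.user_data = nullptr;             // not requesting per-address LB data
    pick.on_complete = on_pick_complete;  // hypothetical grpc_closure*
    if (policy->PickLocked(&pick)) {
      // Completed synchronously; pick.connected_subchannel is already set.
    } else {
      // Queued on pending_picks_; on_pick_complete will be scheduled once a
      // subchannel becomes READY (see OnConnectivityChangedLocked below).
    }
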
-static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
+void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) {
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN);
@@ -318,8 +373,8 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
* (the grpc_lb_subchannel_data associated with the updated subchannel) and the
* subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
* only if the policy transitions to state TRANSIENT_FAILURE. */
-static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
- grpc_error* error) {
+void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd,
+ grpc_error* error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
@@ -335,64 +390,61 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
* subchannel_list->num_subchannels.
*/
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
- round_robin_lb_policy* p =
- reinterpret_cast<round_robin_lb_policy*>(subchannel_list->policy);
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE);
if (subchannel_list->num_ready > 0) {
/* 1) READY */
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
+ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
/* 2) CONNECTING */
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "rr_connecting");
} else if (subchannel_list->num_transient_failures ==
subchannel_list->num_subchannels) {
/* 3) TRANSIENT_FAILURE */
- grpc_connectivity_state_set(&p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "rr_transient_failure");
+ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error),
+ "rr_exhausted_subchannels");
}
GRPC_ERROR_UNREF(error);
}
-static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
- grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg);
- round_robin_lb_policy* p =
- reinterpret_cast<round_robin_lb_policy*>(sd->subchannel_list->policy);
+void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) {
+ grpc_lb_subchannel_data* sd = reinterpret_cast<grpc_lb_subchannel_data*>(arg);
+ RoundRobin* p = reinterpret_cast<RoundRobin*>(sd->subchannel_list->policy);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(
GPR_DEBUG,
"[RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
"prev_state=%s new_state=%s p->shutdown=%d "
"sd->subchannel_list->shutting_down=%d error=%s",
- (void*)p, (void*)sd->subchannel, (void*)sd->subchannel_list,
+ p, sd->subchannel, sd->subchannel_list,
grpc_connectivity_state_name(sd->prev_connectivity_state),
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
- p->shutdown, sd->subchannel_list->shutting_down,
+ p->shutdown_, sd->subchannel_list->shutting_down,
grpc_error_string(error));
}
GPR_ASSERT(sd->subchannel != nullptr);
// If the policy is shutting down, unref and return.
- if (p->shutdown) {
+ if (p->shutdown_) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
- "rr_shutdown");
+ p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
+ "rr_shutdown");
return;
}
// If the subchannel list is shutting down, stop watching.
if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) {
grpc_lb_subchannel_data_stop_connectivity_watch(sd);
grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown");
- grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list,
- "rr_sl_shutdown");
+ p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list,
+ "rr_sl_shutdown");
return;
}
// If we're still here, the notification must be for a subchannel in
// either the current or latest pending subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->subchannel_list ||
- sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ ||
+ sd->subchannel_list == p->latest_pending_subchannel_list_);
GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN);
// Now that we're inside the combiner, copy the pending connectivity
// state (which was set by the connectivity state watcher) to
@@ -409,8 +461,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
"Requesting re-resolution",
p, sd->subchannel);
}
- grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
- GRPC_ERROR_NONE);
+ p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE);
break;
}
case GRPC_CHANNEL_READY: {
@@ -418,49 +469,47 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
sd->connected_subchannel =
grpc_subchannel_get_connected_subchannel(sd->subchannel);
}
- if (sd->subchannel_list != p->subchannel_list) {
- // promote sd->subchannel_list to p->subchannel_list.
+ if (sd->subchannel_list != p->subchannel_list_) {
+ // promote sd->subchannel_list to p->subchannel_list_.
// sd->subchannel_list must be equal to
- // p->latest_pending_subchannel_list because we have already filtered
+ // p->latest_pending_subchannel_list_ because we have already filtered
// for sds belonging to outdated subchannel lists.
- GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list);
+ GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_);
GPR_ASSERT(!sd->subchannel_list->shutting_down);
if (grpc_lb_round_robin_trace.enabled()) {
- const unsigned long num_subchannels =
- p->subchannel_list != nullptr
- ? static_cast<unsigned long>(
- p->subchannel_list->num_subchannels)
+ const size_t num_subchannels =
+ p->subchannel_list_ != nullptr
+ ? p->subchannel_list_->num_subchannels
: 0;
gpr_log(GPR_DEBUG,
- "[RR %p] phasing out subchannel list %p (size %lu) in favor "
- "of %p (size %lu)",
- p, p->subchannel_list, num_subchannels, sd->subchannel_list,
+ "[RR %p] phasing out subchannel list %p (size %" PRIuPTR
+ ") in favor of %p (size %" PRIuPTR ")",
+ p, p->subchannel_list_, num_subchannels, sd->subchannel_list,
num_subchannels);
}
- if (p->subchannel_list != nullptr) {
+ if (p->subchannel_list_ != nullptr) {
// dispose of the current subchannel_list
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
+ grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_,
"sl_phase_out_shutdown");
}
- p->subchannel_list = p->latest_pending_subchannel_list;
- p->latest_pending_subchannel_list = nullptr;
+ p->subchannel_list_ = p->latest_pending_subchannel_list_;
+ p->latest_pending_subchannel_list_ = nullptr;
}
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
- * p->pending_picks. This preemptively replicates rr_pick()'s actions.
- */
- const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
+ * p->pending_picks. This preemptively replicates rr_pick()'s actions. */
+ const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked();
+ GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels);
grpc_lb_subchannel_data* selected =
- &p->subchannel_list->subchannels[next_ready_index];
- if (p->pending_picks != nullptr) {
+ &p->subchannel_list_->subchannels[next_ready_index];
+ if (p->pending_picks_ != nullptr) {
// if the selected subchannel is going to be used for the pending
// picks, update the last picked pointer
- update_last_ready_subchannel_index_locked(p, next_ready_index);
+ p->UpdateLastReadySubchannelIndexLocked(next_ready_index);
}
- grpc_lb_policy_pick_state* pick;
- while ((pick = p->pending_picks)) {
- p->pending_picks = pick->next;
+ PickState* pick;
+ while ((pick = p->pending_picks_)) {
+ p->pending_picks_ = pick->next;
pick->connected_subchannel = selected->connected_subchannel;
if (pick->user_data != nullptr) {
*pick->user_data = selected->user_data;
@@ -468,10 +517,9 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] Fulfilling pending pick. Target <-- subchannel %p "
- "(subchannel_list %p, index %lu)",
- (void*)p, (void*)selected->subchannel,
- (void*)p->subchannel_list,
- static_cast<unsigned long>(next_ready_index));
+ "(subchannel_list %p, index %" PRIuPTR ")",
+ p, selected->subchannel, p->subchannel_list_,
+ next_ready_index);
}
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE);
}
@@ -482,40 +530,34 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:; // fallthrough
}
- // Update state counters and new overall state.
- update_state_counters_locked(sd);
+ // Update state counters.
+ UpdateStateCountersLocked(sd);
// Only update connectivity based on the selected subchannel list.
- if (sd->subchannel_list == p->subchannel_list) {
- update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error));
+ if (sd->subchannel_list == p->subchannel_list_) {
+ p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error));
}
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(sd);
}
-static grpc_connectivity_state rr_check_connectivity_locked(
- grpc_lb_policy* pol, grpc_error** error) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
- return grpc_connectivity_state_get(&p->state_tracker, error);
+grpc_connectivity_state RoundRobin::CheckConnectivityLocked(
+ grpc_error** error) {
+ return grpc_connectivity_state_get(&state_tracker_, error);
}
-static void rr_notify_on_state_change_locked(grpc_lb_policy* pol,
- grpc_connectivity_state* current,
- grpc_closure* notify) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
- grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
+ grpc_closure* notify) {
+ grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
notify);
}
-static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
+void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
grpc_closure* on_ack) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol);
- const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
- if (next_ready_index < p->subchannel_list->num_subchannels) {
+ const size_t next_ready_index = GetNextReadySubchannelIndexLocked();
+ if (next_ready_index < subchannel_list_->num_subchannels) {
grpc_lb_subchannel_data* selected =
- &p->subchannel_list->subchannels[next_ready_index];
- grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target =
- selected->connected_subchannel;
- target->Ping(on_initiate, on_ack);
+ &subchannel_list_->subchannels[next_ready_index];
+ selected->connected_subchannel->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
@@ -524,45 +566,41 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
}
}
-static void rr_update_locked(grpc_lb_policy* policy,
- const grpc_lb_policy_args* args) {
- round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(policy);
- const grpc_arg* arg =
- grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
+ const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
- gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", p);
+ gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this);
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
// Otherwise, keep using the current subchannel list (ignore this update).
- if (p->subchannel_list == nullptr) {
+ if (subchannel_list_ == nullptr) {
grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"rr_update_missing");
}
return;
}
- grpc_lb_addresses* addresses =
- static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
+ grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p;
if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p,
- addresses->num_addresses);
+ gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses",
+ this, addresses->num_addresses);
}
grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
- &p->base, &grpc_lb_round_robin_trace, addresses, args,
- rr_connectivity_changed_locked);
+ this, &grpc_lb_round_robin_trace, addresses, combiner(),
+ client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked);
if (subchannel_list->num_subchannels == 0) {
grpc_connectivity_state_set(
- &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
"rr_update_empty");
- if (p->subchannel_list != nullptr) {
- grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list,
+ if (subchannel_list_ != nullptr) {
+ grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_,
"sl_shutdown_empty_update");
}
- p->subchannel_list = subchannel_list; // empty list
+ subchannel_list_ = subchannel_list; // empty list
return;
}
- if (p->started_picking) {
+ if (started_picking_) {
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
const grpc_connectivity_state subchannel_state =
grpc_subchannel_check_connectivity(
@@ -587,87 +625,61 @@ static void rr_update_locked(grpc_lb_policy* policy,
++subchannel_list->num_transient_failures;
}
}
- if (p->latest_pending_subchannel_list != nullptr) {
+ if (latest_pending_subchannel_list_ != nullptr) {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG,
"[RR %p] Shutting down latest pending subchannel list %p, "
"about to be replaced by newer latest %p",
- (void*)p, (void*)p->latest_pending_subchannel_list,
- (void*)subchannel_list);
+ this, latest_pending_subchannel_list_, subchannel_list);
}
grpc_lb_subchannel_list_shutdown_and_unref(
- p->latest_pending_subchannel_list, "sl_outdated");
+ latest_pending_subchannel_list_, "sl_outdated");
}
- p->latest_pending_subchannel_list = subchannel_list;
+ latest_pending_subchannel_list_ = subchannel_list;
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) {
/* Watch every new subchannel. A subchannel list becomes active the
* moment one of its subchannels is READY. At that moment, we swap
* p->subchannel_list for sd->subchannel_list, provided the subchannel
* list is still valid (ie, isn't shutting down) */
- grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list,
- "connectivity_watch");
+ SubchannelListRefForConnectivityWatch(subchannel_list,
+ "connectivity_watch");
grpc_lb_subchannel_data_start_connectivity_watch(
&subchannel_list->subchannels[i]);
}
} else {
// The policy isn't picking yet. Save the update for later, disposing of
// previous version if any.
- if (p->subchannel_list != nullptr) {
+ if (subchannel_list_ != nullptr) {
grpc_lb_subchannel_list_shutdown_and_unref(
- p->subchannel_list, "rr_update_before_started_picking");
+ subchannel_list_, "rr_update_before_started_picking");
}
- p->subchannel_list = subchannel_list;
+ subchannel_list_ = subchannel_list;
}
}
-static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown_locked,
- rr_pick_locked,
- rr_cancel_pick_locked,
- rr_cancel_picks_locked,
- rr_ping_one_locked,
- rr_exit_idle_locked,
- rr_check_connectivity_locked,
- rr_notify_on_state_change_locked,
- rr_update_locked};
-
-static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}
-
-static void round_robin_factory_unref(grpc_lb_policy_factory* factory) {}
-
-static grpc_lb_policy* round_robin_create(grpc_lb_policy_factory* factory,
- grpc_lb_policy_args* args) {
- GPR_ASSERT(args->client_channel_factory != nullptr);
- round_robin_lb_policy* p =
- static_cast<round_robin_lb_policy*>(gpr_zalloc(sizeof(*p)));
- grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
- grpc_subchannel_index_ref();
- grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
- "round_robin");
- rr_update_locked(&p->base, args);
- if (grpc_lb_round_robin_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p,
- static_cast<unsigned long>(p->subchannel_list->num_subchannels));
- }
- return &p->base;
-}
+//
+// factory
+//
-static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = {
- round_robin_factory_ref, round_robin_factory_unref, round_robin_create,
- "round_robin"};
+class RoundRobinFactory : public LoadBalancingPolicyFactory {
+ public:
+ OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+ const LoadBalancingPolicy::Args& args) const override {
+ return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args));
+ }
-static grpc_lb_policy_factory round_robin_lb_policy_factory = {
- &round_robin_factory_vtable};
+ const char* name() const override { return "round_robin"; }
+};
-static grpc_lb_policy_factory* round_robin_lb_factory_create() {
- return &round_robin_lb_policy_factory;
-}
+} // namespace
-/* Plugin registration */
+} // namespace grpc_core
void grpc_lb_policy_round_robin_init() {
- grpc_register_lb_policy(round_robin_lb_factory_create());
+ grpc_core::LoadBalancingPolicyRegistry::Builder::
+ RegisterLoadBalancingPolicyFactory(
+ grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
+ grpc_core::New<grpc_core::RoundRobinFactory>()));
}
void grpc_lb_policy_round_robin_shutdown() {}
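
Note: with the vtable gone, channel-level code reaches the policy through virtual methods instead of the rr_* entry points removed above. A hedged sketch of how a caller might drive the connectivity-related methods that appear in this file (the `policy` pointer and the on_state_changed closure are hypothetical; the error follows the usual grpc_error ref-counting convention):

    grpc_error* error = GRPC_ERROR_NONE;
    grpc_connectivity_state state = policy->CheckConnectivityLocked(&error);
    GRPC_ERROR_UNREF(error);
    if (state == GRPC_CHANNEL_IDLE) {
      policy->ExitIdleLocked();  // triggers StartPickingLocked() on first use
    }
    // Ask to be notified when the state moves away from `state`.
    policy->NotifyOnStateChangeLocked(&state, on_state_changed);
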
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
index e35c5e8db3..f1580e8b91 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc
@@ -67,7 +67,7 @@ void grpc_lb_subchannel_data_start_connectivity_watch(
}
sd->connectivity_notification_pending = true;
grpc_subchannel_notify_on_state_change(
- sd->subchannel, sd->subchannel_list->policy->interested_parties,
+ sd->subchannel, sd->subchannel_list->policy->interested_parties(),
&sd->pending_connectivity_state_unsafe,
&sd->connectivity_changed_closure);
}
@@ -88,9 +88,10 @@ void grpc_lb_subchannel_data_stop_connectivity_watch(
}
grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_lb_policy* p, grpc_core::TraceFlag* tracer,
- const grpc_lb_addresses* addresses, const grpc_lb_policy_args* args,
- grpc_iomgr_cb_func connectivity_changed_cb) {
+ grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb) {
grpc_lb_subchannel_list* subchannel_list =
static_cast<grpc_lb_subchannel_list*>(
gpr_zalloc(sizeof(*subchannel_list)));
@@ -118,12 +119,11 @@ grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
- args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
- 1);
+ &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
gpr_free(addr_arg.value.string);
sc_args.args = new_args;
grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
- args->client_channel_factory, &sc_args);
+ client_channel_factory, &sc_args);
grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) {
// Subchannel could not be created.
@@ -154,7 +154,7 @@ grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
sd->subchannel = subchannel;
GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
connectivity_changed_cb, sd,
- grpc_combiner_scheduler(args->combiner));
+ grpc_combiner_scheduler(combiner));
// We assume that the current state is IDLE. If not, we'll get a
// callback telling us that.
sd->prev_connectivity_state = GRPC_CHANNEL_IDLE;
@@ -212,18 +212,6 @@ void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
}
}
-void grpc_lb_subchannel_list_ref_for_connectivity_watch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- GRPC_LB_POLICY_REF(subchannel_list->policy, reason);
- grpc_lb_subchannel_list_ref(subchannel_list, reason);
-}
-
-void grpc_lb_subchannel_list_unref_for_connectivity_watch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason) {
- GRPC_LB_POLICY_UNREF(subchannel_list->policy, reason);
- grpc_lb_subchannel_list_unref(subchannel_list, reason);
-}
-
static void subchannel_data_cancel_connectivity_watch(
grpc_lb_subchannel_data* sd, const char* reason) {
if (sd->subchannel_list->tracer->enabled()) {
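
Note: grpc_lb_subchannel_list_create() no longer takes a grpc_lb_policy_args bundle; the combiner, client channel factory, and channel args are passed explicitly, and the back-pointer is the C++ policy object. A minimal sketch of the new call shape from inside a policy method, mirroring RoundRobin::UpdateLocked above (combiner() and client_channel_factory() are assumed to be accessors on the LoadBalancingPolicy base class):

    grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create(
        this,                        // grpc_core::LoadBalancingPolicy*
        &grpc_lb_round_robin_trace,  // TraceFlag for this policy
        addresses,                   // grpc_lb_addresses from the update
        combiner(),                  // replaces args->combiner
        client_channel_factory(),    // replaces args->client_channel_factory
        args,                        // const grpc_channel_args&
        &RoundRobin::OnConnectivityChangedLocked);
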
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 91537f3afe..7a8f5f1029 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -82,7 +82,7 @@ void grpc_lb_subchannel_data_stop_connectivity_watch(
struct grpc_lb_subchannel_list {
/** backpointer to owning policy */
- grpc_lb_policy* policy;
+ grpc_core::LoadBalancingPolicy* policy;
grpc_core::TraceFlag* tracer;
@@ -115,9 +115,10 @@ struct grpc_lb_subchannel_list {
};
grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
- grpc_lb_policy* p, grpc_core::TraceFlag* tracer,
- const grpc_lb_addresses* addresses, const grpc_lb_policy_args* args,
- grpc_iomgr_cb_func connectivity_changed_cb);
+ grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
+ const grpc_lb_addresses* addresses, grpc_combiner* combiner,
+ grpc_client_channel_factory* client_channel_factory,
+ const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb);
void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
const char* reason);
@@ -125,13 +126,6 @@ void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
const char* reason);
-/// Takes and releases refs needed for a connectivity notification.
-/// This includes a ref to subchannel_list and a weak ref to the LB policy.
-void grpc_lb_subchannel_list_ref_for_connectivity_watch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
-void grpc_lb_subchannel_list_unref_for_connectivity_watch(
- grpc_lb_subchannel_list* subchannel_list, const char* reason);
-
/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The
/// connectivity state notification callback will ultimately unref it.
void grpc_lb_subchannel_list_shutdown_and_unref(
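
Note: the _ref_for_connectivity_watch/_unref_for_connectivity_watch helpers are removed here because the LB policy is now a ref-counted C++ object; each policy takes its own ref for the lifetime of a connectivity watch, as RoundRobin does above. A sketch of that replacement pattern, with MyPolicy as a hypothetical LoadBalancingPolicy subclass:

    void MyPolicy::SubchannelListRefForConnectivityWatch(
        grpc_lb_subchannel_list* subchannel_list, const char* reason) {
      // Hold a policy ref manually until the watch completes (see the TODO in
      // RoundRobin above about eventually storing a RefCountedPtr<> instead).
      auto self = Ref(DEBUG_LOCATION, reason);
      self.release();
      grpc_lb_subchannel_list_ref(subchannel_list, reason);
    }

    void MyPolicy::SubchannelListUnrefForConnectivityWatch(
        grpc_lb_subchannel_list* subchannel_list, const char* reason) {
      Unref(DEBUG_LOCATION, reason);
      grpc_lb_subchannel_list_unref(subchannel_list, reason);
    }
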
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc
index f2a800b221..2018aabb91 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.cc
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.cc
@@ -151,17 +151,3 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
return nullptr;
return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p);
}
-
-void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
- factory->vtable->ref(factory);
-}
-
-void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory) {
- factory->vtable->unref(factory);
-}
-
-grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy(
- grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) {
- if (factory == nullptr) return nullptr;
- return factory->vtable->create_lb_policy(factory, args);
-}
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index 9da231b657..e0e7d8bf5c 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -26,21 +26,20 @@
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/uri_parser.h"
+//
+// representation of an LB address
+//
+
// Channel arg key for grpc_lb_addresses.
#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses"
-typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
-typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
-
-struct grpc_lb_policy_factory {
- const grpc_lb_policy_factory_vtable* vtable;
-};
-
/** A resolved address alongside any LB related information associated with it.
* \a user_data, if not NULL, contains opaque data meant to be consumed by the
 * gRPC LB policy. Note that not all LB policies support \a user_data as input.
* Those who don't will simply ignore it and will correspondingly return NULL in
* their namesake pick() output argument. */
+// TODO(roth): Once we figure out a better way of handling user_data in
+// LB policies, convert these structs to C++ classes.
typedef struct grpc_lb_address {
grpc_resolved_address address;
bool is_balancer;
@@ -101,30 +100,27 @@ grpc_arg grpc_lb_addresses_create_channel_arg(
grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
const grpc_channel_args* channel_args);
-/** Arguments passed to LB policies. */
-struct grpc_lb_policy_args {
- grpc_client_channel_factory* client_channel_factory;
- grpc_channel_args* args;
- grpc_combiner* combiner;
-};
+//
+// LB policy factory
+//
-struct grpc_lb_policy_factory_vtable {
- void (*ref)(grpc_lb_policy_factory* factory);
- void (*unref)(grpc_lb_policy_factory* factory);
+namespace grpc_core {
- /** Implementation of grpc_lb_policy_factory_create_lb_policy */
- grpc_lb_policy* (*create_lb_policy)(grpc_lb_policy_factory* factory,
- grpc_lb_policy_args* args);
+class LoadBalancingPolicyFactory {
+ public:
+ /// Returns a new LB policy instance.
+ virtual OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+ const LoadBalancingPolicy::Args& args) const GRPC_ABSTRACT;
- /** Name for the LB policy this factory implements */
- const char* name;
-};
+ /// Returns the LB policy name that this factory provides.
+ /// Caller does NOT take ownership of result.
+ virtual const char* name() const GRPC_ABSTRACT;
-void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory);
-void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory);
+ virtual ~LoadBalancingPolicyFactory() {}
+
+ GRPC_ABSTRACT_BASE_CLASS
+};
-/** Create a lb_policy instance. */
-grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy(
- grpc_lb_policy_factory* factory, grpc_lb_policy_args* args);
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_FACTORY_H */
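
Note: the abstract LoadBalancingPolicyFactory replaces the C vtable plus its ref/unref pair. Implementing a new policy now means subclassing it and handing a UniquePtr to the registry, as RoundRobinFactory does above; a hedged sketch with hypothetical MyPolicy/MyPolicyFactory names:

    namespace grpc_core {

    class MyPolicyFactory : public LoadBalancingPolicyFactory {
     public:
      OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
          const LoadBalancingPolicy::Args& args) const override {
        // MyPolicy would be a LoadBalancingPolicy subclass.
        return OrphanablePtr<LoadBalancingPolicy>(New<MyPolicy>(args));
      }
      const char* name() const override { return "my_policy"; }
    };

    }  // namespace grpc_core

    // Registered from the policy's plugin init hook, exactly like
    // grpc_lb_policy_round_robin_init() above:
    //   LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
    //       UniquePtr<LoadBalancingPolicyFactory>(New<MyPolicyFactory>()));
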
diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.cc b/src/core/ext/filters/client_channel/lb_policy_registry.cc
index 8414504e8f..f495cdb3c2 100644
--- a/src/core/ext/filters/client_channel/lb_policy_registry.cc
+++ b/src/core/ext/filters/client_channel/lb_policy_registry.cc
@@ -21,50 +21,75 @@
#include <string.h>
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
-#define MAX_POLICIES 10
+namespace grpc_core {
-static grpc_lb_policy_factory* g_all_of_the_lb_policies[MAX_POLICIES];
-static int g_number_of_lb_policies = 0;
+namespace {
-void grpc_lb_policy_registry_init(void) { g_number_of_lb_policies = 0; }
+class RegistryState {
+ public:
+ RegistryState() {}
-void grpc_lb_policy_registry_shutdown(void) {
- int i;
- for (i = 0; i < g_number_of_lb_policies; i++) {
- grpc_lb_policy_factory_unref(g_all_of_the_lb_policies[i]);
+ void RegisterLoadBalancingPolicyFactory(
+ UniquePtr<LoadBalancingPolicyFactory> factory) {
+ for (size_t i = 0; i < factories_.size(); ++i) {
+ GPR_ASSERT(strcmp(factories_[i]->name(), factory->name()) != 0);
+ }
+ factories_.push_back(std::move(factory));
}
-}
-void grpc_register_lb_policy(grpc_lb_policy_factory* factory) {
- int i;
- for (i = 0; i < g_number_of_lb_policies; i++) {
- GPR_ASSERT(0 != gpr_stricmp(factory->vtable->name,
- g_all_of_the_lb_policies[i]->vtable->name));
+ LoadBalancingPolicyFactory* GetLoadBalancingPolicyFactory(
+ const char* name) const {
+ for (size_t i = 0; i < factories_.size(); ++i) {
+ if (strcmp(name, factories_[i]->name()) == 0) {
+ return factories_[i].get();
+ }
+ }
+ return nullptr;
}
- GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES);
- grpc_lb_policy_factory_ref(factory);
- g_all_of_the_lb_policies[g_number_of_lb_policies++] = factory;
-}
-static grpc_lb_policy_factory* lookup_factory(const char* name) {
- int i;
+ private:
+ InlinedVector<UniquePtr<LoadBalancingPolicyFactory>, 10> factories_;
+};
- if (name == nullptr) return nullptr;
+RegistryState* g_state = nullptr;
- for (i = 0; i < g_number_of_lb_policies; i++) {
- if (0 == gpr_stricmp(name, g_all_of_the_lb_policies[i]->vtable->name)) {
- return g_all_of_the_lb_policies[i];
- }
- }
+} // namespace
+
+//
+// LoadBalancingPolicyRegistry::Builder
+//
- return nullptr;
+void LoadBalancingPolicyRegistry::Builder::InitRegistry() {
+ if (g_state == nullptr) g_state = New<RegistryState>();
}
-grpc_lb_policy* grpc_lb_policy_create(const char* name,
- grpc_lb_policy_args* args) {
- grpc_lb_policy_factory* factory = lookup_factory(name);
- grpc_lb_policy* lb_policy =
- grpc_lb_policy_factory_create_lb_policy(factory, args);
- return lb_policy;
+void LoadBalancingPolicyRegistry::Builder::ShutdownRegistry() {
+ Delete(g_state);
+ g_state = nullptr;
}
+
+void LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
+ UniquePtr<LoadBalancingPolicyFactory> factory) {
+ InitRegistry();
+ g_state->RegisterLoadBalancingPolicyFactory(std::move(factory));
+}
+
+//
+// LoadBalancingPolicyRegistry
+//
+
+OrphanablePtr<LoadBalancingPolicy>
+LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+ const char* name, const LoadBalancingPolicy::Args& args) {
+ GPR_ASSERT(g_state != nullptr);
+ // Find factory.
+ LoadBalancingPolicyFactory* factory =
+ g_state->GetLoadBalancingPolicyFactory(name);
+ if (factory == nullptr) return nullptr; // Specified name not found.
+ // Create policy via factory.
+ return factory->CreateLoadBalancingPolicy(args);
+}
+
+} // namespace grpc_core
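
Note: the registry now stores factories in an InlinedVector keyed by name() and delegates creation to the matching factory. The lookup above uses strcmp(), so matching is case-sensitive, whereas the removed C code used gpr_stricmp(). A minimal lookup sketch, assuming the round_robin factory has already been registered and that `args` is a previously built LoadBalancingPolicy::Args:

    grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> policy =
        grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
            "round_robin", args);
    if (policy == nullptr) {
      // No factory was registered under that name.
    }
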
diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.h b/src/core/ext/filters/client_channel/lb_policy_registry.h
index 5aff79376b..14c21dfe63 100644
--- a/src/core/ext/filters/client_channel/lb_policy_registry.h
+++ b/src/core/ext/filters/client_channel/lb_policy_registry.h
@@ -20,21 +20,34 @@
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/exec_ctx.h"
-/** Initialize the registry and set \a default_factory as the factory to be
- * returned when no name is provided in a lookup */
-void grpc_lb_policy_registry_init(void);
-void grpc_lb_policy_registry_shutdown(void);
+namespace grpc_core {
-/** Register a LB policy factory. */
-void grpc_register_lb_policy(grpc_lb_policy_factory* factory);
+class LoadBalancingPolicyRegistry {
+ public:
+ /// Methods used to create and populate the LoadBalancingPolicyRegistry.
+ /// NOT THREAD SAFE -- to be used only during global gRPC
+ /// initialization and shutdown.
+ class Builder {
+ public:
+ /// Global initialization and shutdown hooks.
+ static void InitRegistry();
+ static void ShutdownRegistry();
-/** Create a \a grpc_lb_policy instance.
- *
- * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init
- * will be returned. */
-grpc_lb_policy* grpc_lb_policy_create(const char* name,
- grpc_lb_policy_args* args);
+ /// Registers an LB policy factory. The factory will be used to create an
+ /// LB policy whose name matches that of the factory.
+ static void RegisterLoadBalancingPolicyFactory(
+ UniquePtr<LoadBalancingPolicyFactory> factory);
+ };
+
+ /// Creates an LB policy of the type specified by \a name.
+ static OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+ const char* name, const LoadBalancingPolicy::Args& args);
+};
+
+} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H */
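
Note: InitRegistry()/ShutdownRegistry() bracket the registry's lifetime during global gRPC init and shutdown (the call sites are outside this portion of the diff); RegisterLoadBalancingPolicyFactory() also calls InitRegistry() itself, as shown in lb_policy_registry.cc above, so registration is tolerant of ordering. A sketch of the expected sequence:

    // During global initialization:
    grpc_core::LoadBalancingPolicyRegistry::Builder::InitRegistry();
    // ...each policy plugin registers its factory,
    // e.g. grpc_lb_policy_round_robin_init()...
    // During global shutdown:
    grpc_core::LoadBalancingPolicyRegistry::Builder::ShutdownRegistry();
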
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index c095b5bed9..e00dc3a4ee 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -885,7 +885,6 @@ src/core/ext/filters/client_channel/lb_policy.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \
-src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc \
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 3efaa6e686..0e15ab47ce 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -9002,7 +9002,6 @@
],
"headers": [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
@@ -9015,7 +9014,6 @@
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc",
@@ -9039,7 +9037,6 @@
],
"headers": [
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h",
@@ -9052,7 +9049,6 @@
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc",
- "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc",
"src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc",