22 files changed, 2362 insertions, 2531 deletions
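At a glance, this commit replaces the C-style grpc_lb_policy vtable with the grpc_core::LoadBalancingPolicy class, which callers now own through grpc_core::OrphanablePtr and create through grpc_core::LoadBalancingPolicyRegistry. A rough sketch of the new caller-side pattern, distilled from the client_channel.cc hunks below; the helper function name and the lb_policy_registry.h include path are illustrative assumptions, not part of the commit:

// Sketch only: assumes the gRPC core source tree. CreateLbPolicy and the
// lb_policy_registry.h path are illustrative, not taken from this diff.
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"

grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> CreateLbPolicy(
    const char* lb_policy_name, grpc_combiner* combiner,
    grpc_client_channel_factory* client_channel_factory,
    grpc_channel_args* resolver_result) {
  grpc_core::LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.combiner = combiner;
  lb_policy_args.client_channel_factory = client_channel_factory;
  lb_policy_args.args = resolver_result;
  // Returns nullptr if no factory is registered for lb_policy_name.
  return grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
      lb_policy_name, lb_policy_args);
}
// Shutdown is now just lb_policy.reset(): OrphanablePtr calls Orphan(),
// which schedules ShutdownLocked() on the policy's combiner.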
@@ -1148,7 +1148,6 @@ grpc_cc_library( ], hdrs = [ "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", - "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", @@ -1177,7 +1176,6 @@ grpc_cc_library( ], hdrs = [ "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", - "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", diff --git a/build.yaml b/build.yaml index 74c76a4072..b0ae5793af 100644 --- a/build.yaml +++ b/build.yaml @@ -518,7 +518,6 @@ filegroups: - name: grpc_lb_policy_grpclb headers: - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h - - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -539,7 +538,6 @@ filegroups: - name: grpc_lb_policy_grpclb_secure headers: - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h - - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 2f5465c6b8..21d37c87f3 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -504,7 +504,6 @@ Pod::Spec.new do |s| 'src/core/lib/transport/transport_impl.h', 'src/core/lib/debug/trace.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', - 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 95db0d8e7d..f130d20cb3 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -448,7 +448,6 @@ Pod::Spec.new do |s| 'src/core/lib/transport/transport_impl.h', 'src/core/lib/debug/trace.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', - 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h', @@ -937,7 +936,6 @@ Pod::Spec.new do |s| 'src/core/lib/transport/transport_impl.h', 'src/core/lib/debug/trace.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', - 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h', 
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h', diff --git a/grpc.gemspec b/grpc.gemspec index ac901da0fe..398ce50705 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -374,7 +374,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/transport/transport_impl.h ) s.files += %w( src/core/lib/debug/trace.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h ) - s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h ) diff --git a/package.xml b/package.xml index 5575855648..d469d878b4 100644 --- a/package.xml +++ b/package.xml @@ -381,7 +381,6 @@ <file baseinstalldir="/" name="src/core/lib/transport/transport_impl.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/debug/trace.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" role="src" /> - <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" role="src" /> diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 8aa9905d5c..174a15b447 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -175,7 +175,7 @@ typedef struct client_channel_channel_data { /** combiner protecting all variables below in this data structure */ grpc_combiner* combiner; /** currently active load balancer */ - grpc_lb_policy* lb_policy; + grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy; /** retry throttle data */ grpc_server_retry_throttle_data* retry_throttle_data; /** maps method names to method_parameters structs */ @@ -212,7 +212,7 @@ typedef struct { channel_data* chand; /** used as an identifier, don't dereference it because the LB policy may be * non-existing when the callback is run */ - grpc_lb_policy* lb_policy; + grpc_core::LoadBalancingPolicy* lb_policy; grpc_closure closure; } reresolution_request_args; @@ -223,11 +223,11 @@ typedef struct { channel_data* chand; grpc_closure on_changed; grpc_connectivity_state state; - grpc_lb_policy* lb_policy; + grpc_core::LoadBalancingPolicy* lb_policy; } lb_policy_connectivity_watcher; static void watch_lb_policy_locked(channel_data* chand, - grpc_lb_policy* lb_policy, + grpc_core::LoadBalancingPolicy* lb_policy, grpc_connectivity_state current_state); static void set_channel_connectivity_state_locked(channel_data* chand, @@ -241,15 +241,13 @@ static void set_channel_connectivity_state_locked(channel_data* chand, if (chand->lb_policy != nullptr) { if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { /* cancel picks with wait_for_ready=false */ - grpc_lb_policy_cancel_picks_locked( - chand->lb_policy, + chand->lb_policy->CancelMatchingPicksLocked( /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, /* check= */ 0, 
GRPC_ERROR_REF(error)); } else if (state == GRPC_CHANNEL_SHUTDOWN) { /* cancel all picks */ - grpc_lb_policy_cancel_picks_locked(chand->lb_policy, - /* mask= */ 0, /* check= */ 0, - GRPC_ERROR_REF(error)); + chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0, + GRPC_ERROR_REF(error)); } } if (grpc_client_channel_trace.enabled()) { @@ -263,7 +261,7 @@ static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { lb_policy_connectivity_watcher* w = static_cast<lb_policy_connectivity_watcher*>(arg); /* check if the notification is for the latest policy */ - if (w->lb_policy == w->chand->lb_policy) { + if (w->lb_policy == w->chand->lb_policy.get()) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand, w->lb_policy, grpc_connectivity_state_name(w->state)); @@ -279,7 +277,7 @@ static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { } static void watch_lb_policy_locked(channel_data* chand, - grpc_lb_policy* lb_policy, + grpc_core::LoadBalancingPolicy* lb_policy, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher* w = static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w))); @@ -289,8 +287,7 @@ static void watch_lb_policy_locked(channel_data* chand, grpc_combiner_scheduler(chand->combiner)); w->state = current_state; w->lb_policy = lb_policy; - grpc_lb_policy_notify_on_state_change_locked(lb_policy, &w->state, - &w->on_changed); + lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed); } static void start_resolving_locked(channel_data* chand) { @@ -371,7 +368,7 @@ static void request_reresolution_locked(void* arg, grpc_error* error) { channel_data* chand = args->chand; // If this invocation is for a stale LB policy, treat it as an LB shutdown // signal. - if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE || + if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE || chand->resolver == nullptr) { GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution"); gpr_free(args); @@ -382,7 +379,7 @@ static void request_reresolution_locked(void* arg, grpc_error* error) { } chand->resolver->RequestReresolutionLocked(); // Give back the closure to the LB policy. - grpc_lb_policy_set_reresolve_closure_locked(chand->lb_policy, &args->closure); + chand->lb_policy->SetReresolutionClosureLocked(&args->closure); } static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { @@ -393,9 +390,10 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { } // Extract the following fields from the resolver result, if non-NULL. bool lb_policy_updated = false; + bool lb_policy_created = false; char* lb_policy_name_dup = nullptr; bool lb_policy_name_changed = false; - grpc_lb_policy* new_lb_policy = nullptr; + grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy; char* service_config_json = nullptr; grpc_server_retry_throttle_data* retry_throttle_data = nullptr; grpc_slice_hash_table* method_params_table = nullptr; @@ -433,10 +431,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { // Use pick_first if nothing was specified and we didn't select grpclb // above. 
if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; - grpc_lb_policy_args lb_policy_args; - lb_policy_args.args = chand->resolver_result; - lb_policy_args.client_channel_factory = chand->client_channel_factory; - lb_policy_args.combiner = chand->combiner; + // Check to see if we're already using the right LB policy. // Note: It's safe to use chand->info_lb_policy_name here without // taking a lock on chand->info_mu, because this function is the @@ -448,10 +443,17 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { if (chand->lb_policy != nullptr && !lb_policy_name_changed) { // Continue using the same LB policy. Update with new addresses. lb_policy_updated = true; - grpc_lb_policy_update_locked(chand->lb_policy, &lb_policy_args); + chand->lb_policy->UpdateLocked(*chand->resolver_result); } else { // Instantiate new LB policy. - new_lb_policy = grpc_lb_policy_create(lb_policy_name, &lb_policy_args); + lb_policy_created = true; + grpc_core::LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.combiner = chand->combiner; + lb_policy_args.client_channel_factory = chand->client_channel_factory; + lb_policy_args.args = chand->resolver_result; + new_lb_policy = + grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + lb_policy_name, lb_policy_args); if (new_lb_policy == nullptr) { gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); @@ -460,12 +462,11 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { static_cast<reresolution_request_args*>( gpr_zalloc(sizeof(*args))); args->chand = chand; - args->lb_policy = new_lb_policy; + args->lb_policy = new_lb_policy.get(); GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, grpc_combiner_scheduler(chand->combiner)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); - grpc_lb_policy_set_reresolve_closure_locked(new_lb_policy, - &args->closure); + new_lb_policy->SetReresolutionClosureLocked(&args->closure); } } // Find service config. @@ -548,14 +549,14 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { if (chand->lb_policy != nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand, - chand->lb_policy); + chand->lb_policy.get()); } - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, + grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), chand->interested_parties); - grpc_lb_policy_shutdown_locked(chand->lb_policy, new_lb_policy); - GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); + chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get()); + chand->lb_policy.reset(); } - chand->lb_policy = new_lb_policy; + chand->lb_policy = std::move(new_lb_policy); } // Now that we've swapped out the relevant fields of chand, check for // error or shutdown. 
@@ -583,21 +584,20 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_error* state_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); - if (new_lb_policy != nullptr) { + if (lb_policy_created) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand); } GRPC_ERROR_UNREF(state_error); - state = - grpc_lb_policy_check_connectivity_locked(new_lb_policy, &state_error); - grpc_pollset_set_add_pollset_set(new_lb_policy->interested_parties, + state = chand->lb_policy->CheckConnectivityLocked(&state_error); + grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(), chand->interested_parties); GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); if (chand->exit_idle_when_lb_policy_arrives) { - grpc_lb_policy_exit_idle_locked(new_lb_policy); + chand->lb_policy->ExitIdleLocked(); chand->exit_idle_when_lb_policy_arrives = false; } - watch_lb_policy_locked(chand, new_lb_policy, state); + watch_lb_policy_locked(chand, chand->lb_policy.get(), state); } if (!lb_policy_updated) { set_channel_connectivity_state_locked( @@ -632,8 +632,8 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { op->send_ping.on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); } else { - grpc_lb_policy_ping_one_locked( - chand->lb_policy, op->send_ping.on_initiate, op->send_ping.on_ack); + chand->lb_policy->PingOneLocked(op->send_ping.on_initiate, + op->send_ping.on_ack); op->bind_pollset = nullptr; } op->send_ping.on_initiate = nullptr; @@ -652,11 +652,9 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); } if (chand->lb_policy != nullptr) { - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, + grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), chand->interested_parties); - grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr); - GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); - chand->lb_policy = nullptr; + chand->lb_policy.reset(); } } GRPC_ERROR_UNREF(op->disconnect_with_error); @@ -786,10 +784,9 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { grpc_client_channel_factory_unref(chand->client_channel_factory); } if (chand->lb_policy != nullptr) { - grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, + grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), chand->interested_parties); - grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr); - GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); + chand->lb_policy.reset(); } gpr_free(chand->info_lb_policy_name); gpr_free(chand->info_service_config_json); @@ -849,7 +846,7 @@ typedef struct client_channel_call_data { grpc_subchannel_call* subchannel_call; grpc_error* error; - grpc_lb_policy_pick_state pick; + grpc_core::LoadBalancingPolicy::PickState pick; grpc_closure lb_pick_closure; grpc_closure lb_pick_cancel_closure; @@ -1070,15 +1067,14 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", - chand, calld, chand->lb_policy); + chand, calld, chand->lb_policy.get()); } - 
grpc_lb_policy_cancel_pick_locked(chand->lb_policy, &calld->pick, - GRPC_ERROR_REF(error)); + chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); } GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); } -// Callback invoked by grpc_lb_policy_pick_locked() for async picks. +// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. // Unrefs the LB policy and invokes async_pick_done_locked(). static void pick_callback_done_locked(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); @@ -1092,15 +1088,14 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) { GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); } -// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). -// If the pick was completed synchronously, unrefs the LB policy and -// returns true. +// Starts a pick on chand->lb_policy. +// Returns true if pick is completed synchronously. static bool pick_callback_start_locked(grpc_call_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", - chand, calld, chand->lb_policy); + chand, calld, chand->lb_policy.get()); } apply_service_config_to_call_locked(elem); // If the application explicitly set wait_for_ready, use that. @@ -1130,10 +1125,9 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { grpc_combiner_scheduler(chand->combiner)); calld->pick.on_complete = &calld->lb_pick_closure; GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); - const bool pick_done = - grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick); + const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); if (pick_done) { - /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ + // Pick completed synchronously. 
if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", chand, calld); @@ -1498,7 +1492,7 @@ const grpc_channel_filter grpc_client_channel_filter = { static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { channel_data* chand = static_cast<channel_data*>(arg); if (chand->lb_policy != nullptr) { - grpc_lb_policy_exit_idle_locked(chand->lb_policy); + chand->lb_policy->ExitIdleLocked(); } else { chand->exit_idle_when_lb_policy_arrives = true; if (!chand->started_resolving && chand->resolver != nullptr) { diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.cc b/src/core/ext/filters/client_channel/client_channel_plugin.cc index 9172fa781c..3c3a97532f 100644 --- a/src/core/ext/filters/client_channel/client_channel_plugin.cc +++ b/src/core/ext/filters/client_channel/client_channel_plugin.cc @@ -63,7 +63,7 @@ static bool set_default_host_if_unset(grpc_channel_stack_builder* builder, } void grpc_client_channel_init(void) { - grpc_lb_policy_registry_init(); + grpc_core::LoadBalancingPolicyRegistry::Builder::InitRegistry(); grpc_core::ResolverRegistry::Builder::InitRegistry(); grpc_retry_throttle_map_init(); grpc_proxy_mapper_registry_init(); @@ -83,5 +83,5 @@ void grpc_client_channel_shutdown(void) { grpc_proxy_mapper_registry_shutdown(); grpc_retry_throttle_map_shutdown(); grpc_core::ResolverRegistry::Builder::ShutdownRegistry(); - grpc_lb_policy_registry_shutdown(); + grpc_core::LoadBalancingPolicyRegistry::Builder::ShutdownRegistry(); } diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 27fb2ad1f4..59f4cdafb3 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -22,121 +22,36 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount( false, "lb_policy_refcount"); -void grpc_lb_policy_init(grpc_lb_policy* policy, - const grpc_lb_policy_vtable* vtable, - grpc_combiner* combiner) { - policy->vtable = vtable; - gpr_ref_init(&policy->refs, 1); - policy->interested_parties = grpc_pollset_set_create(); - policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy"); -} - -#ifndef NDEBUG -void grpc_lb_policy_ref(grpc_lb_policy* lb_policy, const char* file, int line, - const char* reason) { - if (grpc_trace_lb_policy_refcount.enabled()) { - gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count); - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "LB_POLICY:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy, - old_refs, old_refs + 1, reason); - } -#else -void grpc_lb_policy_ref(grpc_lb_policy* lb_policy) { -#endif - gpr_ref(&lb_policy->refs); -} - -#ifndef NDEBUG -void grpc_lb_policy_unref(grpc_lb_policy* lb_policy, const char* file, int line, - const char* reason) { - if (grpc_trace_lb_policy_refcount.enabled()) { - gpr_atm old_refs = gpr_atm_no_barrier_load(&lb_policy->refs.count); - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "LB_POLICY:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", lb_policy, - old_refs, old_refs - 1, reason); - } -#else -void grpc_lb_policy_unref(grpc_lb_policy* lb_policy) { -#endif - if (gpr_unref(&lb_policy->refs)) { - grpc_pollset_set_destroy(lb_policy->interested_parties); - grpc_combiner* combiner = lb_policy->combiner; - lb_policy->vtable->destroy(lb_policy); - GRPC_COMBINER_UNREF(combiner, "lb_policy"); - } -} - -void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy, - grpc_lb_policy* new_policy) { - 
policy->vtable->shutdown_locked(policy, new_policy); -} - -int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, - grpc_lb_policy_pick_state* pick) { - return policy->vtable->pick_locked(policy, pick); -} - -void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy, - grpc_lb_policy_pick_state* pick, - grpc_error* error) { - policy->vtable->cancel_pick_locked(policy, pick, error); -} - -void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error* error) { - policy->vtable->cancel_picks_locked(policy, initial_metadata_flags_mask, - initial_metadata_flags_eq, error); -} +namespace grpc_core { -void grpc_lb_policy_exit_idle_locked(grpc_lb_policy* policy) { - policy->vtable->exit_idle_locked(policy); -} +LoadBalancingPolicy::LoadBalancingPolicy(const Args& args) + : InternallyRefCountedWithTracing(&grpc_trace_lb_policy_refcount), + combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")), + client_channel_factory_(args.client_channel_factory), + interested_parties_(grpc_pollset_set_create()), + request_reresolution_(nullptr) {} -void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy, - grpc_closure* on_initiate, - grpc_closure* on_ack) { - policy->vtable->ping_one_locked(policy, on_initiate, on_ack); +LoadBalancingPolicy::~LoadBalancingPolicy() { + grpc_pollset_set_destroy(interested_parties_); + GRPC_COMBINER_UNREF(combiner_, "lb_policy"); } -void grpc_lb_policy_notify_on_state_change_locked( - grpc_lb_policy* policy, grpc_connectivity_state* state, - grpc_closure* closure) { - policy->vtable->notify_on_state_change_locked(policy, state, closure); -} - -grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( - grpc_lb_policy* policy, grpc_error** connectivity_error) { - return policy->vtable->check_connectivity_locked(policy, connectivity_error); -} - -void grpc_lb_policy_update_locked(grpc_lb_policy* policy, - const grpc_lb_policy_args* lb_policy_args) { - policy->vtable->update_locked(policy, lb_policy_args); -} - -void grpc_lb_policy_set_reresolve_closure_locked( - grpc_lb_policy* policy, grpc_closure* request_reresolution) { - GPR_ASSERT(policy->request_reresolution == nullptr); - policy->request_reresolution = request_reresolution; -} - -void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy, - grpc_core::TraceFlag* grpc_lb_trace, - grpc_error* error) { - if (policy->request_reresolution != nullptr) { - GRPC_CLOSURE_SCHED(policy->request_reresolution, error); - policy->request_reresolution = nullptr; +void LoadBalancingPolicy::TryReresolutionLocked( + grpc_core::TraceFlag* grpc_lb_trace, grpc_error* error) { + if (request_reresolution_ != nullptr) { + GRPC_CLOSURE_SCHED(request_reresolution_, error); + request_reresolution_ = nullptr; if (grpc_lb_trace->enabled()) { gpr_log(GPR_DEBUG, "%s %p: scheduling re-resolution closure with error=%s.", - grpc_lb_trace->name(), policy, grpc_error_string(error)); + grpc_lb_trace->name(), this, grpc_error_string(error)); } } else { if (grpc_lb_trace->enabled()) { gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.", - grpc_lb_trace->name(), policy); + grpc_lb_trace->name(), this); } } } + +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 6edd314d5e..6de652747e 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -19,182 +19,181 @@ #ifndef 
GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H +#include "src/core/ext/filters/client_channel/client_channel_factory.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/connectivity_state.h" -/** A load balancing policy: specified by a vtable and a struct (which - is expected to be extended to contain some parameters) */ -typedef struct grpc_lb_policy grpc_lb_policy; -typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable; -typedef struct grpc_lb_policy_args grpc_lb_policy_args; - extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount; -struct grpc_lb_policy { - const grpc_lb_policy_vtable* vtable; - gpr_refcount refs; - /* owned pointer to interested parties in load balancing decisions */ - grpc_pollset_set* interested_parties; - /* combiner under which lb_policy actions take place */ - grpc_combiner* combiner; - /* callback to force a re-resolution */ - grpc_closure* request_reresolution; -}; - -/// State used for an LB pick. -typedef struct grpc_lb_policy_pick_state { - /// Initial metadata associated with the picking call. - grpc_metadata_batch* initial_metadata; - /// Bitmask used for selective cancelling. See \a - /// grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in - /// grpc_types.h. - uint32_t initial_metadata_flags; - /// Storage for LB token in \a initial_metadata, or NULL if not used. - grpc_linked_mdelem lb_token_mdelem_storage; - /// Closure to run when pick is complete, if not completed synchronously. - grpc_closure* on_complete; - /// Will be set to the selected subchannel, or nullptr on failure or when - /// the LB policy decides to drop the call. - grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel; - /// Will be populated with context to pass to the subchannel call, if needed. - grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; - /// Upon success, \a *user_data will be set to whatever opaque information - /// may need to be propagated from the LB policy, or NULL if not needed. - void** user_data; - /// Next pointer. For internal use by LB policy. - struct grpc_lb_policy_pick_state* next; -} grpc_lb_policy_pick_state; - -struct grpc_lb_policy_vtable { - void (*destroy)(grpc_lb_policy* policy); - - /// \see grpc_lb_policy_shutdown_locked(). - void (*shutdown_locked)(grpc_lb_policy* policy, grpc_lb_policy* new_policy); - - /** \see grpc_lb_policy_pick */ - int (*pick_locked)(grpc_lb_policy* policy, grpc_lb_policy_pick_state* pick); - - /** \see grpc_lb_policy_cancel_pick */ - void (*cancel_pick_locked)(grpc_lb_policy* policy, - grpc_lb_policy_pick_state* pick, +namespace grpc_core { + +/// Interface for load balancing policies. +/// +/// Note: All methods with a "Locked" suffix must be called from the +/// combiner passed to the constructor. +/// +/// Any I/O done by the LB policy should be done under the pollset_set +/// returned by \a interested_parties(). +class LoadBalancingPolicy + : public InternallyRefCountedWithTracing<LoadBalancingPolicy> { + public: + struct Args { + /// The combiner under which all LB policy calls will be run. + /// Policy does NOT take ownership of the reference to the combiner. 
+ // TODO(roth): Once we have a C++-like interface for combiners, this + // API should change to take a smart pointer that does pass ownership + // of a reference. + grpc_combiner* combiner = nullptr; + /// Used to create channels and subchannels. + grpc_client_channel_factory* client_channel_factory = nullptr; + /// Channel args from the resolver. + /// Note that the LB policy gets the set of addresses from the + /// GRPC_ARG_LB_ADDRESSES channel arg. + grpc_channel_args* args = nullptr; + }; + + /// State used for an LB pick. + struct PickState { + /// Initial metadata associated with the picking call. + grpc_metadata_batch* initial_metadata; + /// Bitmask used for selective cancelling. See + /// \a CancelMatchingPicksLocked() and \a GRPC_INITIAL_METADATA_* in + /// grpc_types.h. + uint32_t initial_metadata_flags; + /// Storage for LB token in \a initial_metadata, or nullptr if not used. + grpc_linked_mdelem lb_token_mdelem_storage; + /// Closure to run when pick is complete, if not completed synchronously. + grpc_closure* on_complete; + /// Will be set to the selected subchannel, or nullptr on failure or when + /// the LB policy decides to drop the call. + RefCountedPtr<ConnectedSubchannel> connected_subchannel; + /// Will be populated with context to pass to the subchannel call, if + /// needed. + grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; + /// Upon success, \a *user_data will be set to whatever opaque information + /// may need to be propagated from the LB policy, or nullptr if not needed. + // TODO(roth): As part of revamping our metadata APIs, try to find a + // way to clean this up and C++-ify it. + void** user_data; + /// Next pointer. For internal use by LB policy. + PickState* next; + }; + + // Not copyable nor movable. + LoadBalancingPolicy(const LoadBalancingPolicy&) = delete; + LoadBalancingPolicy& operator=(const LoadBalancingPolicy&) = delete; + + /// Updates the policy with a new set of \a args from the resolver. + /// Note that the LB policy gets the set of addresses from the + /// GRPC_ARG_LB_ADDRESSES channel arg. + virtual void UpdateLocked(const grpc_channel_args& args) GRPC_ABSTRACT; + + /// Finds an appropriate subchannel for a call, based on data in \a pick. + /// \a pick must remain alive until the pick is complete. + /// + /// If the pick succeeds and a result is known immediately, returns true. + /// Otherwise, \a pick->on_complete will be invoked once the pick is + /// complete with its error argument set to indicate success or failure. + virtual bool PickLocked(PickState* pick) GRPC_ABSTRACT; + + /// Cancels \a pick. + /// The \a on_complete callback of the pending pick will be invoked with + /// \a pick->connected_subchannel set to null. + virtual void CancelPickLocked(PickState* pick, + grpc_error* error) GRPC_ABSTRACT; + + /// Cancels all pending picks for which their \a initial_metadata_flags (as + /// given in the call to \a PickLocked()) matches + /// \a initial_metadata_flags_eq when ANDed with + /// \a initial_metadata_flags_mask. + virtual void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) GRPC_ABSTRACT; + + /// Requests a notification when the connectivity state of the policy + /// changes from \a *state. When that happens, sets \a *state to the + /// new state and schedules \a closure. 
+ virtual void NotifyOnStateChangeLocked(grpc_connectivity_state* state, + grpc_closure* closure) GRPC_ABSTRACT; + + /// Returns the policy's current connectivity state. Sets \a error to + /// the associated error, if any. + virtual grpc_connectivity_state CheckConnectivityLocked( + grpc_error** connectivity_error) GRPC_ABSTRACT; + + /// Hands off pending picks to \a new_policy. + virtual void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) + GRPC_ABSTRACT; + + /// Performs a connected subchannel ping via \a ConnectedSubchannel::Ping() + /// against one of the connected subchannels managed by the policy. + /// Note: This is intended only for use in tests. + virtual void PingOneLocked(grpc_closure* on_initiate, + grpc_closure* on_ack) GRPC_ABSTRACT; + + /// Tries to enter a READY connectivity state. + /// TODO(roth): As part of restructuring how we handle IDLE state, + /// consider whether this method is still needed. + virtual void ExitIdleLocked() GRPC_ABSTRACT; + + void Orphan() override { + // Invoke ShutdownAndUnrefLocked() inside of the combiner. + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE(&LoadBalancingPolicy::ShutdownAndUnrefLocked, this, + grpc_combiner_scheduler(combiner_)), + GRPC_ERROR_NONE); + } + + /// Sets the re-resolution closure to \a request_reresolution. + void SetReresolutionClosureLocked(grpc_closure* request_reresolution) { + GPR_ASSERT(request_reresolution_ == nullptr); + request_reresolution_ = request_reresolution; + } + + grpc_pollset_set* interested_parties() const { return interested_parties_; } + + GRPC_ABSTRACT_BASE_CLASS + + protected: + explicit LoadBalancingPolicy(const Args& args); + virtual ~LoadBalancingPolicy(); + + grpc_combiner* combiner() const { return combiner_; } + grpc_client_channel_factory* client_channel_factory() const { + return client_channel_factory_; + } + + /// Shuts down the policy. Any pending picks that have not been + /// handed off to a new policy via HandOffPendingPicksLocked() will be + /// failed. + virtual void ShutdownLocked() GRPC_ABSTRACT; + + /// Tries to request a re-resolution. + void TryReresolutionLocked(grpc_core::TraceFlag* grpc_lb_trace, grpc_error* error); - /** \see grpc_lb_policy_cancel_picks */ - void (*cancel_picks_locked)(grpc_lb_policy* policy, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error* error); - - /** \see grpc_lb_policy_ping_one */ - void (*ping_one_locked)(grpc_lb_policy* policy, grpc_closure* on_initiate, - grpc_closure* on_ack); - - /** Try to enter a READY connectivity state */ - void (*exit_idle_locked)(grpc_lb_policy* policy); - - /** check the current connectivity of the lb_policy */ - grpc_connectivity_state (*check_connectivity_locked)( - grpc_lb_policy* policy, grpc_error** connectivity_error); - - /** call notify when the connectivity state of a channel changes from *state. - Updates *state with the new state of the policy. Calling with a NULL \a - state cancels the subscription. */ - void (*notify_on_state_change_locked)(grpc_lb_policy* policy, - grpc_connectivity_state* state, - grpc_closure* closure); - - void (*update_locked)(grpc_lb_policy* policy, - const grpc_lb_policy_args* args); + private: + static void ShutdownAndUnrefLocked(void* arg, grpc_error* ignored) { + LoadBalancingPolicy* policy = static_cast<LoadBalancingPolicy*>(arg); + policy->ShutdownLocked(); + policy->Unref(); + } + + /// Combiner under which LB policy actions take place. 
+ grpc_combiner* combiner_; + /// Client channel factory, used to create channels and subchannels. + grpc_client_channel_factory* client_channel_factory_; + /// Owned pointer to interested parties in load balancing decisions. + grpc_pollset_set* interested_parties_; + /// Callback to force a re-resolution. + grpc_closure* request_reresolution_; }; -#ifndef NDEBUG -#define GRPC_LB_POLICY_REF(p, r) \ - grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_LB_POLICY_UNREF(p, r) \ - grpc_lb_policy_unref((p), __FILE__, __LINE__, (r)) -void grpc_lb_policy_ref(grpc_lb_policy* policy, const char* file, int line, - const char* reason); -void grpc_lb_policy_unref(grpc_lb_policy* policy, const char* file, int line, - const char* reason); -#else // !NDEBUG -#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) -#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p)) -void grpc_lb_policy_ref(grpc_lb_policy* policy); -void grpc_lb_policy_unref(grpc_lb_policy* policy); -#endif - -/** called by concrete implementations to initialize the base struct */ -void grpc_lb_policy_init(grpc_lb_policy* policy, - const grpc_lb_policy_vtable* vtable, - grpc_combiner* combiner); - -/// Shuts down \a policy. -/// If \a new_policy is non-null, any pending picks will be restarted -/// on that policy; otherwise, they will be failed. -void grpc_lb_policy_shutdown_locked(grpc_lb_policy* policy, - grpc_lb_policy* new_policy); - -/** Finds an appropriate subchannel for a call, based on data in \a pick. - \a pick must remain alive until the pick is complete. - - If the pick succeeds and a result is known immediately, a non-zero - value will be returned. Otherwise, \a pick->on_complete will be invoked - once the pick is complete with its error argument set to indicate - success or failure. - - Any IO should be done under the \a interested_parties \a grpc_pollset_set - in the \a grpc_lb_policy struct. */ -int grpc_lb_policy_pick_locked(grpc_lb_policy* policy, - grpc_lb_policy_pick_state* pick); - -/** Perform a connected subchannel ping (see \a - grpc_core::ConnectedSubchannel::Ping) - against one of the connected subchannels managed by \a policy. */ -void grpc_lb_policy_ping_one_locked(grpc_lb_policy* policy, - grpc_closure* on_initiate, - grpc_closure* on_ack); - -/** Cancel picks for \a pick. - The \a on_complete callback of the pending picks will be invoked with \a - *target set to NULL. */ -void grpc_lb_policy_cancel_pick_locked(grpc_lb_policy* policy, - grpc_lb_policy_pick_state* pick, - grpc_error* error); - -/** Cancel all pending picks for which their \a initial_metadata_flags (as given - in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq - when AND'd with \a initial_metadata_flags_mask */ -void grpc_lb_policy_cancel_picks_locked(grpc_lb_policy* policy, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error* error); - -/** Try to enter a READY connectivity state */ -void grpc_lb_policy_exit_idle_locked(grpc_lb_policy* policy); - -/* Call notify when the connectivity state of a channel changes from \a *state. - * Updates \a *state with the new state of the policy */ -void grpc_lb_policy_notify_on_state_change_locked( - grpc_lb_policy* policy, grpc_connectivity_state* state, - grpc_closure* closure); - -grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( - grpc_lb_policy* policy, grpc_error** connectivity_error); - -/** Update \a policy with \a lb_policy_args. 
*/ -void grpc_lb_policy_update_locked(grpc_lb_policy* policy, - const grpc_lb_policy_args* lb_policy_args); - -/** Set the re-resolution closure to \a request_reresolution. */ -void grpc_lb_policy_set_reresolve_closure_locked( - grpc_lb_policy* policy, grpc_closure* request_reresolution); - -/** Try to request a re-resolution. It's NOT a public API; it's only for use by - the LB policy implementations. */ -void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy, - grpc_core::TraceFlag* grpc_lb_trace, - grpc_error* error); +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index da82b3f4da..11163a56dc 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -16,68 +16,48 @@ * */ -/** Implementation of the gRPC LB policy. - * - * This policy takes as input a set of resolved addresses {a1..an} for which the - * LB set was set (it's the resolver's responsibility to ensure this). That is - * to say, {a1..an} represent a collection of LB servers. - * - * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}. - * This channel behaves just like a regular channel. In particular, the - * constructed URI over the addresses a1..an will use the default pick first - * policy to select from this list of LB server backends. - * - * The first time the policy gets a request for a pick, a ping, or to exit the - * idle state, \a query_for_backends_locked() is called. This function sets up - * and initiates the internal communication with the LB server. In particular, - * it's responsible for instantiating the internal *streaming* call to the LB - * server (whichever address from {a1..an} pick-first chose). This call is - * serviced by two callbacks, \a lb_on_server_status_received and \a - * lb_on_response_received. The former will be called when the call to the LB - * server completes. This can happen if the LB server closes the connection or - * if this policy itself cancels the call (for example because it's shutting - * down). If the internal call times out, the usual behavior of pick-first - * applies, continuing to pick from the list {a1..an}. - * - * Upon sucesss, the incoming \a LoadBalancingResponse is processed by \a - * res_recv. An invalid one results in the termination of the streaming call. A - * new streaming call should be created if possible, failing the original call - * otherwise. For a valid \a LoadBalancingResponse, the server list of actual - * backends is extracted. A Round Robin policy will be created from this list. - * There are two possible scenarios: - * - * 1. This is the first server list received. There was no previous instance of - * the Round Robin policy. \a rr_handover_locked() will instantiate the RR - * policy and perform all the pending operations over it. - * 2. There's already a RR policy instance active. We need to introduce the new - * one build from the new serverlist, but taking care not to disrupt the - * operations in progress over the old RR instance. This is done by - * decreasing the reference count on the old policy. The moment no more - * references are held on the old RR policy, it'll be destroyed and \a - * on_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN - * state. 
At this point we can transition to a new RR instance safely, which - * is done once again via \a rr_handover_locked(). - * - * - * Once a RR policy instance is in place (and getting updated as described), - * calls to for a pick, a ping or a cancellation will be serviced right away by - * forwarding them to the RR instance. Any time there's no RR policy available - * (ie, right after the creation of the gRPCLB policy, if an empty serverlist is - * received, etc), pick/ping requests are added to a list of pending picks/pings - * to be flushed and serviced as part of \a rr_handover_locked() the moment the - * RR policy instance becomes available. - * - * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the - * high level design and details. */ - -/* TODO(dgq): - * - Implement LB service forwarding (point 2c. in the doc's diagram). - */ +/// Implementation of the gRPC LB policy. +/// +/// This policy takes as input a list of resolved addresses, which must +/// include at least one balancer address. +/// +/// An internal channel (\a lb_channel_) is created for the addresses +/// from that are balancers. This channel behaves just like a regular +/// channel that uses pick_first to select from the list of balancer +/// addresses. +/// +/// The first time the policy gets a request for a pick, a ping, or to exit +/// the idle state, \a StartPickingLocked() is called. This method is +/// responsible for instantiating the internal *streaming* call to the LB +/// server (whichever address pick_first chose). The call will be complete +/// when either the balancer sends status or when we cancel the call (e.g., +/// because we are shutting down). In needed, we retry the call. If we +/// received at least one valid message from the server, a new call attempt +/// will be made immediately; otherwise, we apply back-off delays between +/// attempts. +/// +/// We maintain an internal round_robin policy instance for distributing +/// requests across backends. Whenever we receive a new serverlist from +/// the balancer, we update the round_robin policy with the new list of +/// addresses. If we cannot communicate with the balancer on startup, +/// however, we may enter fallback mode, in which case we will populate +/// the RR policy's addresses from the backend addresses returned by the +/// resolver. +/// +/// Once an RR policy instance is in place (and getting updated as described), +/// calls for a pick, a ping, or a cancellation will be serviced right +/// away by forwarding them to the RR instance. Any time there's no RR +/// policy available (i.e., right after the creation of the gRPCLB policy), +/// pick and ping requests are added to a list of pending picks and pings +/// to be flushed and serviced when the RR policy instance becomes available. +/// +/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the +/// high level design and details. -/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when - using that endpoint. Because of various transitive includes in uv.h, - including windows.h on Windows, uv.h must be included before other system - headers. Therefore, sockaddr.h must always be included first */ +// With the addition of a libuv endpoint, sockaddr.h now includes uv.h when +// using that endpoint. Because of various transitive includes in uv.h, +// including windows.h on Windows, uv.h must be included before other system +// headers. Therefore, sockaddr.h must always be included first. 
#include "src/core/lib/iomgr/sockaddr.h" #include <inttypes.h> @@ -93,7 +73,6 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" -#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" @@ -108,6 +87,8 @@ #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" @@ -127,336 +108,294 @@ #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 -grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb"); +namespace grpc_core { -struct glb_lb_policy; +TraceFlag grpc_lb_glb_trace(false, "glb"); namespace { -/// Linked list of pending pick requests. It stores all information needed to -/// eventually call (Round Robin's) pick() on them. They mainly stay pending -/// waiting for the RR policy to be created. -/// -/// Note that when a pick is sent to the RR policy, we inject our own -/// on_complete callback, so that we can intercept the result before -/// invoking the original on_complete callback. This allows us to set the -/// LB token metadata and add client_stats to the call context. -/// See \a pending_pick_complete() for details. -struct pending_pick { - // Our on_complete closure and the original one. - grpc_closure on_complete; - grpc_closure* original_on_complete; - // The original pick. - grpc_lb_policy_pick_state* pick; - // Stats for client-side load reporting. Note that this holds a - // reference, which must be either passed on via context or unreffed. - grpc_grpclb_client_stats* client_stats; - // The LB token associated with the pick. This is set via user_data in - // the pick. - grpc_mdelem lb_token; - // The grpclb instance that created the wrapping. This instance is not owned, - // reference counts are untouched. It's used only for logging purposes. - glb_lb_policy* glb_policy; - // Next pending pick. - struct pending_pick* next; -}; +class GrpcLb : public LoadBalancingPolicy { + public: + GrpcLb(const grpc_lb_addresses* addresses, const Args& args); + + void UpdateLocked(const grpc_channel_args& args) override; + bool PickLocked(PickState* pick) override; + void CancelPickLocked(PickState* pick, grpc_error* error) override; + void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) override; + void NotifyOnStateChangeLocked(grpc_connectivity_state* state, + grpc_closure* closure) override; + grpc_connectivity_state CheckConnectivityLocked( + grpc_error** connectivity_error) override; + void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; + void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; + void ExitIdleLocked() override; + + private: + /// Linked list of pending pick requests. It stores all information needed to + /// eventually call (Round Robin's) pick() on them. 
They mainly stay pending + /// waiting for the RR policy to be created. + /// + /// Note that when a pick is sent to the RR policy, we inject our own + /// on_complete callback, so that we can intercept the result before + /// invoking the original on_complete callback. This allows us to set the + /// LB token metadata and add client_stats to the call context. + /// See \a pending_pick_complete() for details. + struct PendingPick { + // The grpclb instance that created the wrapping. This instance is not + // owned; reference counts are untouched. It's used only for logging + // purposes. + GrpcLb* grpclb_policy; + // The original pick. + PickState* pick; + // Our on_complete closure and the original one. + grpc_closure on_complete; + grpc_closure* original_on_complete; + // The LB token associated with the pick. This is set via user_data in + // the pick. + grpc_mdelem lb_token; + // Stats for client-side load reporting. Note that this holds a + // reference, which must be either passed on via context or unreffed. + grpc_grpclb_client_stats* client_stats = nullptr; + // Next pending pick. + PendingPick* next = nullptr; + }; + + /// A linked list of pending pings waiting for the RR policy to be created. + struct PendingPing { + grpc_closure* on_initiate; + grpc_closure* on_ack; + PendingPing* next = nullptr; + }; + + /// Contains a call to the LB server and all the data related to the call. + class BalancerCallState + : public InternallyRefCountedWithTracing<BalancerCallState> { + public: + explicit BalancerCallState( + RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy); + + // It's the caller's responsibility to ensure that Orphan() is called from + // inside the combiner. + void Orphan() override; + + void StartQuery(); + + grpc_grpclb_client_stats* client_stats() const { return client_stats_; } + bool seen_initial_response() const { return seen_initial_response_; } + + private: + ~BalancerCallState(); + + GrpcLb* grpclb_policy() const { + return reinterpret_cast<GrpcLb*>(grpclb_policy_.get()); + } -/// A linked list of pending pings waiting for the RR policy to be created. -struct pending_ping { - grpc_closure* on_initiate; - grpc_closure* on_ack; - struct pending_ping* next; + void ScheduleNextClientLoadReportLocked(); + void SendClientLoadReportLocked(); + + static bool LoadReportCountersAreZero(grpc_grpclb_request* request); + + static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error); + static void ClientLoadReportDoneLocked(void* arg, grpc_error* error); + static void OnInitialRequestSentLocked(void* arg, grpc_error* error); + static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error); + static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error); + + // The owning LB policy. + RefCountedPtr<LoadBalancingPolicy> grpclb_policy_; + + // The streaming call to the LB server. Always non-NULL. 
+ grpc_call* lb_call_ = nullptr; + + // recv_initial_metadata + grpc_metadata_array lb_initial_metadata_recv_; + + // send_message + grpc_byte_buffer* send_message_payload_ = nullptr; + grpc_closure lb_on_initial_request_sent_; + + // recv_message + grpc_byte_buffer* recv_message_payload_ = nullptr; + grpc_closure lb_on_balancer_message_received_; + bool seen_initial_response_ = false; + + // recv_trailing_metadata + grpc_closure lb_on_balancer_status_received_; + grpc_metadata_array lb_trailing_metadata_recv_; + grpc_status_code lb_call_status_; + grpc_slice lb_call_status_details_; + + // The stats for client-side load reporting associated with this LB call. + // Created after the first serverlist is received. + grpc_grpclb_client_stats* client_stats_ = nullptr; + grpc_millis client_stats_report_interval_ = 0; + grpc_timer client_load_report_timer_; + bool client_load_report_timer_callback_pending_ = false; + bool last_client_load_report_counters_were_zero_ = false; + bool client_load_report_is_due_ = false; + // The closure used for either the load report timer or the callback for + // completion of sending the load report. + grpc_closure client_load_report_closure_; + }; + + ~GrpcLb(); + + void ShutdownLocked() override; + + // Helper function used in ctor and UpdateLocked(). + void ProcessChannelArgsLocked(const grpc_channel_args& args); + + // Methods for dealing with the balancer channel and call. + void StartPickingLocked(); + void StartBalancerCallLocked(); + static void OnFallbackTimerLocked(void* arg, grpc_error* error); + void StartBalancerCallRetryTimerLocked(); + static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error); + static void OnBalancerChannelConnectivityChangedLocked(void* arg, + grpc_error* error); + + // Pending pick methods. + static void PendingPickSetMetadataAndContext(PendingPick* pp); + PendingPick* PendingPickCreate(PickState* pick); + void AddPendingPick(PendingPick* pp); + static void OnPendingPickComplete(void* arg, grpc_error* error); + + // Pending ping methods. + void AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack); + + // Methods for dealing with the RR policy. + void CreateOrUpdateRoundRobinPolicyLocked(); + grpc_channel_args* CreateRoundRobinPolicyArgsLocked(); + void CreateRoundRobinPolicyLocked(const Args& args); + bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp); + void UpdateConnectivityStateFromRoundRobinPolicyLocked( + grpc_error* rr_state_error); + static void OnRoundRobinConnectivityChangedLocked(void* arg, + grpc_error* error); + static void OnRoundRobinRequestReresolutionLocked(void* arg, + grpc_error* error); + + // Who the client is trying to communicate with. + const char* server_name_ = nullptr; + + // Current channel args from the resolver. + grpc_channel_args* args_ = nullptr; + + // Internal state. + bool started_picking_ = false; + bool shutting_down_ = false; + grpc_connectivity_state_tracker state_tracker_; + + // The channel for communicating with the LB server. + grpc_channel* lb_channel_ = nullptr; + grpc_connectivity_state lb_channel_connectivity_; + grpc_closure lb_channel_on_connectivity_changed_; + // Are we already watching the LB channel's connectivity? + bool watching_lb_channel_ = false; + // Response generator to inject address updates into lb_channel_. + RefCountedPtr<FakeResolverResponseGenerator> response_generator_; + + // The data associated with the current LB call. It holds a ref to this LB + // policy. It's initialized every time we query for backends. 
It's reset to + // NULL whenever the current LB call is no longer needed (e.g., the LB policy + // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always + // contains a non-NULL lb_call_. + OrphanablePtr<BalancerCallState> lb_calld_; + // Timeout in milliseconds for the LB call. 0 means no deadline. + int lb_call_timeout_ms_ = 0; + // Balancer call retry state. + BackOff lb_call_backoff_; + bool retry_timer_callback_pending_ = false; + grpc_timer lb_call_retry_timer_; + grpc_closure lb_on_call_retry_; + + // The deserialized response from the balancer. May be nullptr until one + // such response has arrived. + grpc_grpclb_serverlist* serverlist_ = nullptr; + // Index into serverlist for next pick. + // If the server at this index is a drop, we return a drop. + // Otherwise, we delegate to the RR policy. + size_t serverlist_index_ = 0; + + // Timeout in milliseconds for before using fallback backend addresses. + // 0 means not using fallback. + int lb_fallback_timeout_ms_ = 0; + // The backend addresses from the resolver. + grpc_lb_addresses* fallback_backend_addresses_ = nullptr; + // Fallback timer. + bool fallback_timer_callback_pending_ = false; + grpc_timer lb_fallback_timer_; + grpc_closure lb_on_fallback_; + + // Pending picks and pings that are waiting on the RR policy's connectivity. + PendingPick* pending_picks_ = nullptr; + PendingPing* pending_pings_ = nullptr; + + // The RR policy to use for the backends. + OrphanablePtr<LoadBalancingPolicy> rr_policy_; + grpc_connectivity_state rr_connectivity_state_; + grpc_closure on_rr_connectivity_changed_; + grpc_closure on_rr_request_reresolution_; }; -} // namespace - -typedef struct glb_lb_call_data { - struct glb_lb_policy* glb_policy; - // TODO(juanlishen): c++ize this struct. - gpr_refcount refs; - - /** The streaming call to the LB server. Always non-NULL. */ - grpc_call* lb_call; - - /** The initial metadata received from the LB server. */ - grpc_metadata_array lb_initial_metadata_recv; - - /** The message sent to the LB server. It's used to query for backends (the - * value may vary if the LB server indicates a redirect) or send client load - * report. */ - grpc_byte_buffer* send_message_payload; - /** The callback after the initial request is sent. */ - grpc_closure lb_on_sent_initial_request; - - /** The response received from the LB server, if any. */ - grpc_byte_buffer* recv_message_payload; - /** The callback to process the response received from the LB server. */ - grpc_closure lb_on_response_received; - bool seen_initial_response; - - /** The callback to process the status received from the LB server, which - * signals the end of the LB call. */ - grpc_closure lb_on_server_status_received; - /** The trailing metadata from the LB server. */ - grpc_metadata_array lb_trailing_metadata_recv; - /** The call status code and details. */ - grpc_status_code lb_call_status; - grpc_slice lb_call_status_details; - - /** The stats for client-side load reporting associated with this LB call. - * Created after the first serverlist is received. */ - grpc_grpclb_client_stats* client_stats; - /** The interval and timer for next client load report. */ - grpc_millis client_stats_report_interval; - grpc_timer client_load_report_timer; - bool client_load_report_timer_callback_pending; - bool last_client_load_report_counters_were_zero; - bool client_load_report_is_due; - /** The closure used for either the load report timer or the callback for - * completion of sending the load report. 
*/ - grpc_closure client_load_report_closure; -} glb_lb_call_data; - -typedef struct glb_lb_policy { - /** Base policy: must be first. */ - grpc_lb_policy base; - - /** Who the client is trying to communicate with. */ - const char* server_name; - - /** Channel related data that will be propagated to the internal RR policy. */ - grpc_client_channel_factory* cc_factory; - grpc_channel_args* args; - - /** Timeout in milliseconds for before using fallback backend addresses. - * 0 means not using fallback. */ - int lb_fallback_timeout_ms; - - /** The channel for communicating with the LB server. */ - grpc_channel* lb_channel; - - /** The data associated with the current LB call. It holds a ref to this LB - * policy. It's initialized every time we query for backends. It's reset to - * NULL whenever the current LB call is no longer needed (e.g., the LB policy - * is shutting down, or the LB call has ended). A non-NULL lb_calld always - * contains a non-NULL lb_call. */ - glb_lb_call_data* lb_calld; - - /** response generator to inject address updates into \a lb_channel */ - grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator> - response_generator; - - /** the RR policy to use of the backend servers returned by the LB server */ - grpc_lb_policy* rr_policy; - - /** the connectivity state of the embedded RR policy */ - grpc_connectivity_state rr_connectivity_state; - - bool started_picking; - - /** our connectivity state tracker */ - grpc_connectivity_state_tracker state_tracker; - - /** connectivity state of the LB channel */ - grpc_connectivity_state lb_channel_connectivity; - - /** stores the deserialized response from the LB. May be nullptr until one - * such response has arrived. */ - grpc_grpclb_serverlist* serverlist; - - /** Index into serverlist for next pick. - * If the server at this index is a drop, we return a drop. - * Otherwise, we delegate to the RR policy. */ - size_t serverlist_index; - - /** stores the backend addresses from the resolver */ - grpc_lb_addresses* fallback_backend_addresses; - - /** list of picks that are waiting on RR's policy connectivity */ - pending_pick* pending_picks; - - /** list of pings that are waiting on RR's policy connectivity */ - pending_ping* pending_pings; - - bool shutting_down; - - /** are we already watching the LB channel's connectivity? */ - bool watching_lb_channel; - - /** is the callback associated with \a lb_call_retry_timer pending? */ - bool retry_timer_callback_pending; - - /** is the callback associated with \a lb_fallback_timer pending? */ - bool fallback_timer_callback_pending; - - /** called upon changes to the LB channel's connectivity. */ - grpc_closure lb_channel_on_connectivity_changed; - - /** called upon changes to the RR's connectivity. */ - grpc_closure rr_on_connectivity_changed; - - /** called upon reresolution request from the RR policy. */ - grpc_closure rr_on_reresolution_requested; - - /************************************************************/ - /* client data associated with the LB server communication */ - /************************************************************/ - - /** LB call retry backoff state */ - grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff; - - /** timeout in milliseconds for the LB call. 0 means no deadline. 
*/ - int lb_call_timeout_ms; - - /** LB call retry timer */ - grpc_timer lb_call_retry_timer; - /** LB call retry timer callback */ - grpc_closure lb_on_call_retry; - - /** LB fallback timer */ - grpc_timer lb_fallback_timer; - /** LB fallback timer callback */ - grpc_closure lb_on_fallback; -} glb_lb_policy; - -static void glb_lb_call_data_ref(glb_lb_call_data* lb_calld, - const char* reason) { - gpr_ref_non_zero(&lb_calld->refs); - if (grpc_lb_glb_trace.enabled()) { - const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count); - gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p REF %lu->%lu (%s)", - grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld, - static_cast<unsigned long>(count - 1), - static_cast<unsigned long>(count), reason); - } -} +// +// serverlist parsing code +// -static void glb_lb_call_data_unref(glb_lb_call_data* lb_calld, - const char* reason) { - const bool done = gpr_unref(&lb_calld->refs); - if (grpc_lb_glb_trace.enabled()) { - const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count); - gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p UNREF %lu->%lu (%s)", - grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld, - static_cast<unsigned long>(count + 1), - static_cast<unsigned long>(count), reason); - } - if (done) { - GPR_ASSERT(lb_calld->lb_call != nullptr); - grpc_call_unref(lb_calld->lb_call); - grpc_metadata_array_destroy(&lb_calld->lb_initial_metadata_recv); - grpc_metadata_array_destroy(&lb_calld->lb_trailing_metadata_recv); - grpc_byte_buffer_destroy(lb_calld->send_message_payload); - grpc_byte_buffer_destroy(lb_calld->recv_message_payload); - grpc_slice_unref_internal(lb_calld->lb_call_status_details); - if (lb_calld->client_stats != nullptr) { - grpc_grpclb_client_stats_unref(lb_calld->client_stats); - } - GRPC_LB_POLICY_UNREF(&lb_calld->glb_policy->base, "lb_calld"); - gpr_free(lb_calld); - } +// vtable for LB tokens in grpc_lb_addresses +void* lb_token_copy(void* token) { + return token == nullptr + ? nullptr + : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload; } - -static void lb_call_data_shutdown(glb_lb_policy* glb_policy) { - GPR_ASSERT(glb_policy->lb_calld != nullptr); - GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr); - // lb_on_server_status_received will complete the cancellation and clean up. 
- grpc_call_cancel(glb_policy->lb_calld->lb_call, nullptr); - if (glb_policy->lb_calld->client_load_report_timer_callback_pending) { - grpc_timer_cancel(&glb_policy->lb_calld->client_load_report_timer); +void lb_token_destroy(void* token) { + if (token != nullptr) { + GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token}); } - glb_policy->lb_calld = nullptr; } - -/* add lb_token of selected subchannel (address) to the call's initial - * metadata */ -static grpc_error* initial_metadata_add_lb_token( - grpc_metadata_batch* initial_metadata, - grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) { - GPR_ASSERT(lb_token_mdelem_storage != nullptr); - GPR_ASSERT(!GRPC_MDISNULL(lb_token)); - return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, - lb_token); -} - -static void destroy_client_stats(void* arg) { - grpc_grpclb_client_stats_unref(static_cast<grpc_grpclb_client_stats*>(arg)); +int lb_token_cmp(void* token1, void* token2) { + if (token1 > token2) return 1; + if (token1 < token2) return -1; + return 0; } +const grpc_lb_user_data_vtable lb_token_vtable = { + lb_token_copy, lb_token_destroy, lb_token_cmp}; -static void pending_pick_set_metadata_and_context(pending_pick* pp) { - /* if connected_subchannel is nullptr, no pick has been made by the RR - * policy (e.g., all addresses failed to connect). There won't be any - * user_data/token available */ - if (pp->pick->connected_subchannel != nullptr) { - if (!GRPC_MDISNULL(pp->lb_token)) { - initial_metadata_add_lb_token(pp->pick->initial_metadata, - &pp->pick->lb_token_mdelem_storage, - GRPC_MDELEM_REF(pp->lb_token)); - } else { - gpr_log(GPR_ERROR, - "[grpclb %p] No LB token for connected subchannel pick %p", - pp->glb_policy, pp->pick); - abort(); - } - // Pass on client stats via context. Passes ownership of the reference. - if (pp->client_stats != nullptr) { - pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = - pp->client_stats; - pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy = - destroy_client_stats; - } - } else { - if (pp->client_stats != nullptr) { - grpc_grpclb_client_stats_unref(pp->client_stats); +// Returns the backend addresses extracted from the given addresses. +grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) { + // First pass: count the number of backend addresses. + size_t num_backends = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (!addresses->addresses[i].is_balancer) { + ++num_backends; } } + // Second pass: actually populate the addresses and (empty) LB tokens. + grpc_lb_addresses* backend_addresses = + grpc_lb_addresses_create(num_backends, &lb_token_vtable); + size_t num_copied = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) continue; + const grpc_resolved_address* addr = &addresses->addresses[i].address; + grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, + addr->len, false /* is_balancer */, + nullptr /* balancer_name */, + (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); + ++num_copied; + } + return backend_addresses; } -/* The \a on_complete closure passed as part of the pick requires keeping a - * reference to its associated round robin instance. 
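ExtractBackendAddresses above uses a common two-pass shape: first count the entries that survive the filter, then allocate a container of exactly that size and populate it. A minimal standalone sketch of the same pattern, using toy types rather than the real grpc_lb_addresses API:

#include <cstddef>
#include <vector>

struct ToyAddress {
  int port;
  bool is_balancer;
};

std::vector<ToyAddress> ExtractBackends(const std::vector<ToyAddress>& all) {
  // First pass: count the backend (non-balancer) addresses.
  std::size_t num_backends = 0;
  for (const ToyAddress& a : all) {
    if (!a.is_balancer) ++num_backends;
  }
  // Second pass: copy only the backends into a right-sized container.
  std::vector<ToyAddress> backends;
  backends.reserve(num_backends);
  for (const ToyAddress& a : all) {
    if (!a.is_balancer) backends.push_back(a);
  }
  return backends;
}

int main() {
  std::vector<ToyAddress> all = {{80, false}, {12000, true}, {443, false}};
  return ExtractBackends(all).size() == 2 ? 0 : 1;
}
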
We wrap this closure in - * order to unref the round robin instance upon its invocation */ -static void pending_pick_complete(void* arg, grpc_error* error) { - pending_pick* pp = static_cast<pending_pick*>(arg); - pending_pick_set_metadata_and_context(pp); - GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error)); - gpr_free(pp); -} - -static pending_pick* pending_pick_create(glb_lb_policy* glb_policy, - grpc_lb_policy_pick_state* pick) { - pending_pick* pp = static_cast<pending_pick*>(gpr_zalloc(sizeof(*pp))); - pp->pick = pick; - pp->glb_policy = glb_policy; - GRPC_CLOSURE_INIT(&pp->on_complete, pending_pick_complete, pp, - grpc_schedule_on_exec_ctx); - pp->original_on_complete = pick->on_complete; - pp->pick->on_complete = &pp->on_complete; - return pp; -} - -static void pending_pick_add(pending_pick** root, pending_pick* new_pp) { - new_pp->next = *root; - *root = new_pp; -} - -static void pending_ping_add(pending_ping** root, grpc_closure* on_initiate, - grpc_closure* on_ack) { - pending_ping* pping = static_cast<pending_ping*>(gpr_zalloc(sizeof(*pping))); - pping->on_initiate = on_initiate; - pping->on_ack = on_ack; - pping->next = *root; - *root = pping; -} - -static bool is_server_valid(const grpc_grpclb_server* server, size_t idx, - bool log) { +bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) { if (server->drop) return false; const grpc_grpclb_ip_address* ip = &server->ip_address; if (server->port >> 16 != 0) { if (log) { gpr_log(GPR_ERROR, "Invalid port '%d' at index %lu of serverlist. Ignoring.", - server->port, static_cast<unsigned long>(idx)); + server->port, (unsigned long)idx); } return false; } @@ -465,65 +404,43 @@ static bool is_server_valid(const grpc_grpclb_server* server, size_t idx, gpr_log(GPR_ERROR, "Expected IP to be 4 or 16 bytes, got %d at index %lu of " "serverlist. Ignoring", - ip->size, static_cast<unsigned long>(idx)); + ip->size, (unsigned long)idx); } return false; } return true; } -/* vtable for LB tokens in grpc_lb_addresses. */ -static void* lb_token_copy(void* token) { - return token == nullptr - ? nullptr - : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload; -} -static void lb_token_destroy(void* token) { - if (token != nullptr) { - GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token}); - } -} -static int lb_token_cmp(void* token1, void* token2) { - if (token1 > token2) return 1; - if (token1 < token2) return -1; - return 0; -} -static const grpc_lb_user_data_vtable lb_token_vtable = { - lb_token_copy, lb_token_destroy, lb_token_cmp}; - -static void parse_server(const grpc_grpclb_server* server, - grpc_resolved_address* addr) { +void ParseServer(const grpc_grpclb_server* server, + grpc_resolved_address* addr) { memset(addr, 0, sizeof(*addr)); if (server->drop) return; - const uint16_t netorder_port = htons(static_cast<uint16_t>(server->port)); + const uint16_t netorder_port = htons((uint16_t)server->port); /* the addresses are given in binary format (a in(6)_addr struct) in * server->ip_address.bytes. 
*/ const grpc_grpclb_ip_address* ip = &server->ip_address; if (ip->size == 4) { addr->len = sizeof(struct sockaddr_in); - struct sockaddr_in* addr4 = - reinterpret_cast<struct sockaddr_in*>(&addr->addr); + struct sockaddr_in* addr4 = (struct sockaddr_in*)&addr->addr; addr4->sin_family = AF_INET; memcpy(&addr4->sin_addr, ip->bytes, ip->size); addr4->sin_port = netorder_port; } else if (ip->size == 16) { addr->len = sizeof(struct sockaddr_in6); - struct sockaddr_in6* addr6 = - reinterpret_cast<struct sockaddr_in6*>(&addr->addr); + struct sockaddr_in6* addr6 = (struct sockaddr_in6*)&addr->addr; addr6->sin6_family = AF_INET6; memcpy(&addr6->sin6_addr, ip->bytes, ip->size); addr6->sin6_port = netorder_port; } } -/* Returns addresses extracted from \a serverlist. */ -static grpc_lb_addresses* process_serverlist_locked( - const grpc_grpclb_serverlist* serverlist) { +// Returns addresses extracted from \a serverlist. +grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) { size_t num_valid = 0; /* first pass: count how many are valid in order to allocate the necessary * memory in a single block */ for (size_t i = 0; i < serverlist->num_servers; ++i) { - if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; + if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid; } grpc_lb_addresses* lb_addresses = grpc_lb_addresses_create(num_valid, &lb_token_vtable); @@ -535,11 +452,11 @@ static grpc_lb_addresses* process_serverlist_locked( size_t addr_idx = 0; for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { const grpc_grpclb_server* server = serverlist->servers[sl_idx]; - if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; + if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue; GPR_ASSERT(addr_idx < num_valid); /* address processing */ grpc_resolved_address addr; - parse_server(server, &addr); + ParseServer(server, &addr); /* lb token processing */ void* user_data; if (server->has_load_balance_token) { @@ -570,317 +487,448 @@ static grpc_lb_addresses* process_serverlist_locked( return lb_addresses; } -/* Returns the backend addresses extracted from the given addresses */ -static grpc_lb_addresses* extract_backend_addresses_locked( - const grpc_lb_addresses* addresses) { - /* first pass: count the number of backend addresses */ - size_t num_backends = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) { - ++num_backends; - } - } - /* second pass: actually populate the addresses and (empty) LB tokens */ - grpc_lb_addresses* backend_addresses = - grpc_lb_addresses_create(num_backends, &lb_token_vtable); - size_t num_copied = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) continue; - const grpc_resolved_address* addr = &addresses->addresses[i].address; - grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, - addr->len, false /* is_balancer */, - nullptr /* balancer_name */, - (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); - ++num_copied; - } - return backend_addresses; +// +// GrpcLb::BalancerCallState +// + +GrpcLb::BalancerCallState::BalancerCallState( + RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy) + : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_glb_trace), + grpclb_policy_(std::move(parent_grpclb_policy)) { + GPR_ASSERT(grpclb_policy_ != nullptr); + GPR_ASSERT(!grpclb_policy()->shutting_down_); + // Init the LB call. 
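ParseServer above copies the balancer-provided binary IP bytes directly into a sockaddr and only converts the port to network byte order. A self-contained sketch of the IPv4 case, assuming POSIX socket headers are available (the IPv6 branch has the same shape with sockaddr_in6):

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <cstdint>
#include <cstdio>
#include <cstring>

sockaddr_in MakeIpv4Sockaddr(const uint8_t bytes[4], int port) {
  sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  memcpy(&addr.sin_addr, bytes, 4);                    // address is already in network order
  addr.sin_port = htons(static_cast<uint16_t>(port));  // port must be converted
  return addr;
}

int main() {
  const uint8_t ip[4] = {10, 0, 0, 1};
  sockaddr_in addr = MakeIpv4Sockaddr(ip, 443);
  char buf[INET_ADDRSTRLEN];
  inet_ntop(AF_INET, &addr.sin_addr, buf, sizeof(buf));
  printf("%s:%d\n", buf, ntohs(addr.sin_port));
  return 0;
}
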
Note that the LB call will progress every time there's + // activity in grpclb_policy_->interested_parties(), which is comprised of + // the polling entities from client_channel. + GPR_ASSERT(grpclb_policy()->server_name_ != nullptr); + GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0'); + grpc_slice host = + grpc_slice_from_copied_string(grpclb_policy()->server_name_); + grpc_millis deadline = + grpclb_policy()->lb_call_timeout_ms_ == 0 + ? GRPC_MILLIS_INF_FUTURE + : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_; + lb_call_ = grpc_channel_create_pollset_set_call( + grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, + grpclb_policy_->interested_parties(), + GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, + &host, deadline, nullptr); + grpc_slice_unref_internal(host); + // Init the LB call request payload. + grpc_grpclb_request* request = + grpc_grpclb_request_create(grpclb_policy()->server_name_); + grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); + send_message_payload_ = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(request_payload_slice); + grpc_grpclb_request_destroy(request); + // Init other data associated with the LB call. + grpc_metadata_array_init(&lb_initial_metadata_recv_); + grpc_metadata_array_init(&lb_trailing_metadata_recv_); + GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked, + this, grpc_combiner_scheduler(grpclb_policy()->combiner())); + GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_, + OnBalancerMessageReceivedLocked, this, + grpc_combiner_scheduler(grpclb_policy()->combiner())); + GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, + OnBalancerStatusReceivedLocked, this, + grpc_combiner_scheduler(grpclb_policy()->combiner())); } -static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy, - grpc_error* rr_state_error) { - const grpc_connectivity_state curr_glb_state = - grpc_connectivity_state_check(&glb_policy->state_tracker); - /* The new connectivity status is a function of the previous one and the new - * input coming from the status of the RR policy. - * - * current state (grpclb's) - * | - * v || I | C | R | TF | SD | <- new state (RR's) - * ===++====+=====+=====+======+======+ - * I || I | C | R | [I] | [I] | - * ---++----+-----+-----+------+------+ - * C || I | C | R | [C] | [C] | - * ---++----+-----+-----+------+------+ - * R || I | C | R | [R] | [R] | - * ---++----+-----+-----+------+------+ - * TF || I | C | R | [TF] | [TF] | - * ---++----+-----+-----+------+------+ - * SD || NA | NA | NA | NA | NA | (*) - * ---++----+-----+-----+------+------+ - * - * A [STATE] indicates that the old RR policy is kept. In those cases, STATE - * is the current state of grpclb, which is left untouched. - * - * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to - * the previous RR instance. - * - * Note that the status is never updated to SHUTDOWN as a result of calling - * this function. Only glb_shutdown() has the power to set that state. - * - * (*) This function mustn't be called during shutting down. 
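The state table in the comment above reduces to a simple rule: if the RR policy reports TRANSIENT_FAILURE or SHUTDOWN, grpclb keeps its current state; otherwise it adopts the RR state. A standalone sketch of that rule, using a local enum instead of grpc_connectivity_state so it compiles on its own:

#include <cassert>

enum class State { kIdle, kConnecting, kReady, kTransientFailure, kShutdown };

State NextGrpclbState(State current_grpclb, State new_rr) {
  switch (new_rr) {
    case State::kTransientFailure:
    case State::kShutdown:
      return current_grpclb;  // keep the previous grpclb state
    default:
      return new_rr;          // follow the RR policy
  }
}

int main() {
  assert(NextGrpclbState(State::kReady, State::kTransientFailure) == State::kReady);
  assert(NextGrpclbState(State::kIdle, State::kConnecting) == State::kConnecting);
  return 0;
}
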
*/ - GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); - switch (glb_policy->rr_connectivity_state) { - case GRPC_CHANNEL_TRANSIENT_FAILURE: - case GRPC_CHANNEL_SHUTDOWN: - GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); - break; - case GRPC_CHANNEL_IDLE: - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_READY: - GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); +GrpcLb::BalancerCallState::~BalancerCallState() { + GPR_ASSERT(lb_call_ != nullptr); + grpc_call_unref(lb_call_); + grpc_metadata_array_destroy(&lb_initial_metadata_recv_); + grpc_metadata_array_destroy(&lb_trailing_metadata_recv_); + grpc_byte_buffer_destroy(send_message_payload_); + grpc_byte_buffer_destroy(recv_message_payload_); + grpc_slice_unref_internal(lb_call_status_details_); + if (client_stats_ != nullptr) { + grpc_grpclb_client_stats_unref(client_stats_); } +} + +void GrpcLb::BalancerCallState::Orphan() { + GPR_ASSERT(lb_call_ != nullptr); + // If we are here because grpclb_policy wants to cancel the call, + // lb_on_balancer_status_received_ will complete the cancellation and clean + // up. Otherwise, we are here because grpclb_policy has to orphan a failed + // call, then the following cancellation will be a no-op. + grpc_call_cancel(lb_call_, nullptr); + if (client_load_report_timer_callback_pending_) { + grpc_timer_cancel(&client_load_report_timer_); + } + // Note that the initial ref is hold by lb_on_balancer_status_received_ + // instead of the caller of this function. So the corresponding unref happens + // in lb_on_balancer_status_received_ instead of here. +} + +void GrpcLb::BalancerCallState::StartQuery() { + GPR_ASSERT(lb_call_ != nullptr); if (grpc_lb_glb_trace.enabled()) { - gpr_log( - GPR_INFO, - "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.", - glb_policy, - grpc_connectivity_state_name(glb_policy->rr_connectivity_state), - glb_policy->rr_policy); + gpr_log(GPR_INFO, + "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)", + grpclb_policy_.get(), this, lb_call_); } - grpc_connectivity_state_set(&glb_policy->state_tracker, - glb_policy->rr_connectivity_state, rr_state_error, - "update_lb_connectivity_status_locked"); + // Create the ops. + grpc_call_error call_error; + grpc_op ops[3]; + memset(ops, 0, sizeof(ops)); + // Op: send initial metadata. + grpc_op* op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = nullptr; + op++; + // Op: send request message. + GPR_ASSERT(send_message_payload_ != nullptr); + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = send_message_payload_; + op->flags = 0; + op->reserved = nullptr; + op++; + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent"); + self.release(); + call_error = grpc_call_start_batch_and_execute( + lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + // Op: recv initial metadata. + op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = + &lb_initial_metadata_recv_; + op->flags = 0; + op->reserved = nullptr; + op++; + // Op: recv response. + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &recv_message_payload_; + op->flags = 0; + op->reserved = nullptr; + op++; + // TODO(roth): We currently track this ref manually. 
Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + self = Ref(DEBUG_LOCATION, "on_message_received"); + self.release(); + call_error = grpc_call_start_batch_and_execute( + lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + // Op: recv server status. + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &lb_trailing_metadata_recv_; + op->data.recv_status_on_client.status = &lb_call_status_; + op->data.recv_status_on_client.status_details = &lb_call_status_details_; + op->flags = 0; + op->reserved = nullptr; + op++; + // This callback signals the end of the LB call, so it relies on the initial + // ref instead of a new ref. When it's invoked, it's the initial ref that is + // unreffed. + call_error = grpc_call_start_batch_and_execute( + lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); +}; + +void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { + const grpc_millis next_client_load_report_time = + ExecCtx::Get()->Now() + client_stats_report_interval_; + GRPC_CLOSURE_INIT(&client_load_report_closure_, + MaybeSendClientLoadReportLocked, this, + grpc_combiner_scheduler(grpclb_policy()->combiner())); + grpc_timer_init(&client_load_report_timer_, next_client_load_report_time, + &client_load_report_closure_); + client_load_report_timer_callback_pending_ = true; } -/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return - * immediately (ignoring its completion callback), we need to perform the - * cleanups this callback would otherwise be responsible for. - * If \a force_async is true, then we will manually schedule the - * completion callback even if the pick is available immediately. */ -static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy, - bool force_async, pending_pick* pp) { - // Check for drops if we are not using fallback backend addresses. - if (glb_policy->serverlist != nullptr) { - // Look at the index into the serverlist to see if we should drop this call. - grpc_grpclb_server* server = - glb_policy->serverlist->servers[glb_policy->serverlist_index++]; - if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { - glb_policy->serverlist_index = 0; // Wrap-around. - } - if (server->drop) { - // Update client load reporting stats to indicate the number of - // dropped calls. Note that we have to do this here instead of in - // the client_load_reporting filter, because we do not create a - // subchannel call (and therefore no client_load_reporting filter) - // for dropped calls. - if (glb_policy->lb_calld != nullptr && - glb_policy->lb_calld->client_stats != nullptr) { - grpc_grpclb_client_stats_add_call_dropped_locked( - server->load_balance_token, glb_policy->lb_calld->client_stats); - } - if (force_async) { - GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - return false; - } - gpr_free(pp); - return true; - } - } - // Set client_stats and user_data. 
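The drop handling above walks the serverlist with a wrapping index: every pick consumes one entry, and an entry marked as a drop causes the call to be dropped client-side rather than delegated to the RR policy. A toy sketch of that index arithmetic, with a stand-in server type:

#include <cassert>
#include <cstddef>
#include <vector>

struct ToyServer {
  bool drop;
};

// Returns true if the pick at the current index should be dropped, and
// advances the index with wrap-around either way.
bool ShouldDropNextPick(const std::vector<ToyServer>& serverlist, std::size_t* index) {
  const ToyServer& server = serverlist[(*index)++];
  if (*index == serverlist.size()) *index = 0;  // wrap-around
  return server.drop;
}

int main() {
  std::vector<ToyServer> serverlist = {{false}, {true}, {false}};
  std::size_t index = 0;
  assert(!ShouldDropNextPick(serverlist, &index));  // entry 0: real backend
  assert(ShouldDropNextPick(serverlist, &index));   // entry 1: drop
  assert(!ShouldDropNextPick(serverlist, &index));  // entry 2: real backend
  assert(index == 0);                               // wrapped back to the start
  return 0;
}
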
- if (glb_policy->lb_calld != nullptr && - glb_policy->lb_calld->client_stats != nullptr) { - pp->client_stats = - grpc_grpclb_client_stats_ref(glb_policy->lb_calld->client_stats); +void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked( + void* arg, grpc_error* error) { + BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg); + GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); + lb_calld->client_load_report_timer_callback_pending_ = false; + if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) { + lb_calld->Unref(DEBUG_LOCATION, "client_load_report"); + return; } - GPR_ASSERT(pp->pick->user_data == nullptr); - pp->pick->user_data = reinterpret_cast<void**>(&pp->lb_token); - // Pick via the RR policy. - bool pick_done = grpc_lb_policy_pick_locked(glb_policy->rr_policy, pp->pick); - if (pick_done) { - pending_pick_set_metadata_and_context(pp); - if (force_async) { - GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); - pick_done = false; - } - gpr_free(pp); + // If we've already sent the initial request, then we can go ahead and send + // the load report. Otherwise, we need to wait until the initial request has + // been sent to send this (see OnInitialRequestSentLocked()). + if (lb_calld->send_message_payload_ == nullptr) { + lb_calld->SendClientLoadReportLocked(); + } else { + lb_calld->client_load_report_is_due_ = true; } - /* else, the pending pick will be registered and taken care of by the - * pending pick list inside the RR policy (glb_policy->rr_policy). - * Eventually, wrapped_on_complete will be called, which will -among other - * things- add the LB token to the call's initial metadata */ - return pick_done; } -static grpc_lb_policy_args* lb_policy_args_create(glb_lb_policy* glb_policy) { - grpc_lb_addresses* addresses; - if (glb_policy->serverlist != nullptr) { - GPR_ASSERT(glb_policy->serverlist->num_servers > 0); - addresses = process_serverlist_locked(glb_policy->serverlist); - } else { - // If rr_handover_locked() is invoked when we haven't received any - // serverlist from the balancer, we use the fallback backends returned by - // the resolver. Note that the fallback backend list may be empty, in which - // case the new round_robin policy will keep the requested picks pending. - GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr); - addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses); - } - GPR_ASSERT(addresses != nullptr); - grpc_lb_policy_args* args = - static_cast<grpc_lb_policy_args*>(gpr_zalloc(sizeof(*args))); - args->client_channel_factory = glb_policy->cc_factory; - args->combiner = glb_policy->base.combiner; - // Replace the LB addresses in the channel args that we pass down to - // the subchannel. 
- static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; - const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses); - args->args = grpc_channel_args_copy_and_add_and_remove( - glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, - 1); - grpc_lb_addresses_destroy(addresses); - return args; +bool GrpcLb::BalancerCallState::LoadReportCountersAreZero( + grpc_grpclb_request* request) { + grpc_grpclb_dropped_call_counts* drop_entries = + static_cast<grpc_grpclb_dropped_call_counts*>( + request->client_stats.calls_finished_with_drop.arg); + return request->client_stats.num_calls_started == 0 && + request->client_stats.num_calls_finished == 0 && + request->client_stats.num_calls_finished_with_client_failed_to_send == + 0 && + request->client_stats.num_calls_finished_known_received == 0 && + (drop_entries == nullptr || drop_entries->num_entries == 0); } -static void lb_policy_args_destroy(grpc_lb_policy_args* args) { - grpc_channel_args_destroy(args->args); - gpr_free(args); +void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { + // Construct message payload. + GPR_ASSERT(send_message_payload_ == nullptr); + grpc_grpclb_request* request = + grpc_grpclb_load_report_request_create_locked(client_stats_); + // Skip client load report if the counters were all zero in the last + // report and they are still zero in this one. + if (LoadReportCountersAreZero(request)) { + if (last_client_load_report_counters_were_zero_) { + grpc_grpclb_request_destroy(request); + ScheduleNextClientLoadReportLocked(); + return; + } + last_client_load_report_counters_were_zero_ = true; + } else { + last_client_load_report_counters_were_zero_ = false; + } + grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); + send_message_payload_ = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(request_payload_slice); + grpc_grpclb_request_destroy(request); + // Send the report. 
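SendClientLoadReportLocked above applies a small dedup rule: an all-zero report is skipped only if the previous report was also all zero, so the balancer still sees the first transition to an idle client. A standalone sketch of just that rule:

#include <cassert>

bool ShouldSendReport(bool counters_are_zero, bool* last_were_zero) {
  if (counters_are_zero) {
    if (*last_were_zero) return false;  // second all-zero report in a row: skip
    *last_were_zero = true;             // first all-zero report: still send it
    return true;
  }
  *last_were_zero = false;
  return true;
}

int main() {
  bool last = false;
  assert(ShouldSendReport(false, &last));  // normal report with traffic
  assert(ShouldSendReport(true, &last));   // first zero report is sent
  assert(!ShouldSendReport(true, &last));  // repeated zero report is skipped
  return 0;
}
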
+ grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = send_message_payload_; + GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked, + this, grpc_combiner_scheduler(grpclb_policy()->combiner())); + grpc_call_error call_error = grpc_call_start_batch_and_execute( + lb_call_, &op, 1, &client_load_report_closure_); + if (call_error != GRPC_CALL_OK) { + gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(), + call_error); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } } -static void rr_on_reresolution_requested_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - if (glb_policy->shutting_down || error != GRPC_ERROR_NONE) { - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "rr_on_reresolution_requested_locked"); +void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg, + grpc_error* error) { + BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg); + GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); + grpc_byte_buffer_destroy(lb_calld->send_message_payload_); + lb_calld->send_message_payload_ = nullptr; + if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) { + lb_calld->Unref(DEBUG_LOCATION, "client_load_report"); return; } - if (grpc_lb_glb_trace.enabled()) { - gpr_log( - GPR_DEBUG, - "[grpclb %p] Re-resolution requested from the internal RR policy (%p).", - glb_policy, glb_policy->rr_policy); - } - // If we are talking to a balancer, we expect to get updated addresses form - // the balancer, so we can ignore the re-resolution request from the RR - // policy. Otherwise, handle the re-resolution request using glb's original - // re-resolution closure. - if (glb_policy->lb_calld == nullptr || - !glb_policy->lb_calld->seen_initial_response) { - grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace, - GRPC_ERROR_NONE); + lb_calld->ScheduleNextClientLoadReportLocked(); +} + +void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg, + grpc_error* error) { + BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg); + grpc_byte_buffer_destroy(lb_calld->send_message_payload_); + lb_calld->send_message_payload_ = nullptr; + // If we attempted to send a client load report before the initial request was + // sent (and this lb_calld is still in use), send the load report now. + if (lb_calld->client_load_report_is_due_ && + lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) { + lb_calld->SendClientLoadReportLocked(); + lb_calld->client_load_report_is_due_ = false; } - // Give back the wrapper closure to the RR policy. - grpc_lb_policy_set_reresolve_closure_locked( - glb_policy->rr_policy, &glb_policy->rr_on_reresolution_requested); + lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent"); } -static void create_rr_locked(glb_lb_policy* glb_policy, - grpc_lb_policy_args* args) { - GPR_ASSERT(glb_policy->rr_policy == nullptr); - grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args); - if (new_rr_policy == nullptr) { - gpr_log(GPR_ERROR, - "[grpclb %p] Failure creating a RoundRobin policy for serverlist " - "update with %" PRIuPTR - " entries. The previous RR instance (%p), if any, will continue to " - "be used. 
Future updates from the LB will attempt to create new " - "instances.", - glb_policy, glb_policy->serverlist->num_servers, - glb_policy->rr_policy); +void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( + void* arg, grpc_error* error) { + BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg); + GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); + // Empty payload means the LB call was cancelled. + if (lb_calld != grpclb_policy->lb_calld_.get() || + lb_calld->recv_message_payload_ == nullptr) { + lb_calld->Unref(DEBUG_LOCATION, "on_message_received"); return; } - GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_reresolution_requested_locked"); - grpc_lb_policy_set_reresolve_closure_locked( - new_rr_policy, &glb_policy->rr_on_reresolution_requested); - glb_policy->rr_policy = new_rr_policy; - grpc_error* rr_state_error = nullptr; - glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked( - glb_policy->rr_policy, &rr_state_error); - /* Connectivity state is a function of the RR policy updated/created */ - update_lb_connectivity_status_locked(glb_policy, rr_state_error); - /* Add the gRPC LB's interested_parties pollset_set to that of the newly - * created RR policy. This will make the RR policy progress upon activity on - * gRPC LB, which in turn is tied to the application's call */ - grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties, - glb_policy->base.interested_parties); - /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_connectivity_changed_locked"); - grpc_lb_policy_notify_on_state_change_locked( - glb_policy->rr_policy, &glb_policy->rr_connectivity_state, - &glb_policy->rr_on_connectivity_changed); - grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy); - // Send pending picks to RR policy. - pending_pick* pp; - while ((pp = glb_policy->pending_picks)) { - glb_policy->pending_picks = pp->next; - if (grpc_lb_glb_trace.enabled()) { + grpc_byte_buffer_reader bbr; + grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_); + grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc_byte_buffer_reader_destroy(&bbr); + grpc_byte_buffer_destroy(lb_calld->recv_message_payload_); + lb_calld->recv_message_payload_ = nullptr; + grpc_grpclb_initial_response* initial_response; + grpc_grpclb_serverlist* serverlist; + if (!lb_calld->seen_initial_response_ && + (initial_response = grpc_grpclb_initial_response_parse(response_slice)) != + nullptr) { + // Have NOT seen initial response, look for initial response. + if (initial_response->has_client_stats_report_interval) { + lb_calld->client_stats_report_interval_ = GPR_MAX( + GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis( + &initial_response->client_stats_report_interval)); + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, + "[grpclb %p] Received initial LB response message; " + "client load reporting interval = %" PRIdPTR " milliseconds", + grpclb_policy, lb_calld->client_stats_report_interval_); + } + } else if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Pending pick about to (async) PICK from RR %p", - glb_policy, glb_policy->rr_policy); + "[grpclb %p] Received initial LB response message; client load " + "reporting NOT enabled", + grpclb_policy); } - pick_from_internal_rr_locked(glb_policy, true /* force_async */, pp); - } - // Send pending pings to RR policy. 
- pending_ping* pping; - while ((pping = glb_policy->pending_pings)) { - glb_policy->pending_pings = pping->next; + grpc_grpclb_initial_response_destroy(initial_response); + lb_calld->seen_initial_response_ = true; + } else if ((serverlist = grpc_grpclb_response_parse_serverlist( + response_slice)) != nullptr) { + // Have seen initial response, look for serverlist. + GPR_ASSERT(lb_calld->lb_call_ != nullptr); if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", - glb_policy, glb_policy->rr_policy); + gpr_log(GPR_INFO, + "[grpclb %p] Serverlist with %" PRIuPTR " servers received", + grpclb_policy, serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_resolved_address addr; + ParseServer(serverlist->servers[i], &addr); + char* ipport; + grpc_sockaddr_to_string(&ipport, &addr, false); + gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s", + grpclb_policy, i, ipport); + gpr_free(ipport); + } } - grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, pping->on_initiate, - pping->on_ack); - gpr_free(pping); - } -} - -/* glb_policy->rr_policy may be nullptr (initial handover) */ -static void rr_handover_locked(glb_lb_policy* glb_policy) { - if (glb_policy->shutting_down) return; - grpc_lb_policy_args* args = lb_policy_args_create(glb_policy); - GPR_ASSERT(args != nullptr); - if (glb_policy->rr_policy != nullptr) { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", glb_policy, - glb_policy->rr_policy); + /* update serverlist */ + if (serverlist->num_servers > 0) { + // Start sending client load report only after we start using the + // serverlist returned from the current LB call. + if (lb_calld->client_stats_report_interval_ > 0 && + lb_calld->client_stats_ == nullptr) { + lb_calld->client_stats_ = grpc_grpclb_client_stats_create(); + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); + self.release(); + lb_calld->ScheduleNextClientLoadReportLocked(); + } + if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, + serverlist)) { + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, + "[grpclb %p] Incoming server list identical to current, " + "ignoring.", + grpclb_policy); + } + grpc_grpclb_destroy_serverlist(serverlist); + } else { /* new serverlist */ + if (grpclb_policy->serverlist_ != nullptr) { + /* dispose of the old serverlist */ + grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_); + } else { + /* or dispose of the fallback */ + grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_); + grpclb_policy->fallback_backend_addresses_ = nullptr; + if (grpclb_policy->fallback_timer_callback_pending_) { + grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); + } + } + // and update the copy in the GrpcLb instance. This + // serverlist instance will be destroyed either upon the next + // update or when the GrpcLb instance is destroyed. 
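The update path above only swaps in a serverlist when it actually differs from the current one; identical (and empty) updates are ignored so the child RR policy is not churned needlessly. A simplified sketch of that compare-and-install step, using std::vector in place of grpc_grpclb_serverlist:

#include <string>
#include <vector>

// Returns true if the new list was installed, false if it was ignored.
// Toy stand-in for the serverlist handling in OnBalancerMessageReceivedLocked.
bool MaybeInstallServerlist(std::vector<std::string>* current,
                            std::vector<std::string> incoming) {
  if (incoming.empty()) return false;      // empty lists are ignored
  if (*current == incoming) return false;  // identical to what we already have
  *current = std::move(incoming);          // replace the old list
  return true;
}

int main() {
  std::vector<std::string> current;
  bool installed = MaybeInstallServerlist(&current, {"10.0.0.1:443", "10.0.0.2:443"});
  bool reinstalled = MaybeInstallServerlist(&current, {"10.0.0.1:443", "10.0.0.2:443"});
  return (installed && !reinstalled) ? 0 : 1;
}
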
+ grpclb_policy->serverlist_ = serverlist; + grpclb_policy->serverlist_index_ = 0; + grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); + } + } else { + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.", + grpclb_policy); + } + grpc_grpclb_destroy_serverlist(serverlist); } - grpc_lb_policy_update_locked(glb_policy->rr_policy, args); } else { - create_rr_locked(glb_policy, args); - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", glb_policy, - glb_policy->rr_policy); - } + // No valid initial response or serverlist found. + gpr_log(GPR_ERROR, + "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", + grpclb_policy, + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); + } + grpc_slice_unref_internal(response_slice); + if (!grpclb_policy->shutting_down_) { + // Keep listening for serverlist updates. + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_RECV_MESSAGE; + op.data.recv_message.recv_message = &lb_calld->recv_message_payload_; + op.flags = 0; + op.reserved = nullptr; + // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery(). + const grpc_call_error call_error = grpc_call_start_batch_and_execute( + lb_calld->lb_call_, &op, 1, + &lb_calld->lb_on_balancer_message_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } else { + lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown"); } - lb_policy_args_destroy(args); } -static void rr_on_connectivity_changed_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - if (glb_policy->shutting_down) { - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "rr_on_connectivity_changed_locked"); - return; +void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked( + void* arg, grpc_error* error) { + BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg); + GrpcLb* grpclb_policy = lb_calld->grpclb_policy(); + GPR_ASSERT(lb_calld->lb_call_ != nullptr); + if (grpc_lb_glb_trace.enabled()) { + char* status_details = + grpc_slice_to_c_string(lb_calld->lb_call_status_details_); + gpr_log(GPR_INFO, + "[grpclb %p] Status from LB server received. Status = %d, details " + "= '%s', (lb_calld: %p, lb_call: %p), error '%s'", + grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld, + lb_calld->lb_call_, grpc_error_string(error)); + gpr_free(status_details); + } + grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE); + // If this lb_calld is still in use, this call ended because of a failure so + // we want to retry connecting. Otherwise, we have deliberately ended this + // call and no further action is required. + if (lb_calld == grpclb_policy->lb_calld_.get()) { + grpclb_policy->lb_calld_.reset(); + GPR_ASSERT(!grpclb_policy->shutting_down_); + if (lb_calld->seen_initial_response_) { + // If we lose connection to the LB server, reset the backoff and restart + // the LB call immediately. + grpclb_policy->lb_call_backoff_.Reset(); + grpclb_policy->StartBalancerCallLocked(); + } else { + // If this LB call fails establishing any connection to the LB server, + // retry later. + grpclb_policy->StartBalancerCallRetryTimerLocked(); + } } - update_lb_connectivity_status_locked(glb_policy, GRPC_ERROR_REF(error)); - // Resubscribe. Reuse the "rr_on_connectivity_changed_locked" ref. 
- grpc_lb_policy_notify_on_state_change_locked( - glb_policy->rr_policy, &glb_policy->rr_connectivity_state, - &glb_policy->rr_on_connectivity_changed); + lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended"); } -static void destroy_balancer_name(void* balancer_name) { - gpr_free(balancer_name); -} +// +// helper code for creating balancer channel +// -static grpc_slice_hash_table_entry targets_info_entry_create( - const char* address, const char* balancer_name) { +// Helper function to construct a target info entry. +grpc_slice_hash_table_entry BalancerEntryCreate(const char* address, + const char* balancer_name) { grpc_slice_hash_table_entry entry; entry.key = grpc_slice_from_copied_string(address); entry.value = gpr_strdup(balancer_name); return entry; } -static int balancer_name_cmp_fn(void* a, void* b) { +// Comparison function used for slice_hash_table vtable. +int BalancerNameCmp(void* a, void* b) { const char* a_str = static_cast<const char*>(a); const char* b_str = static_cast<const char*>(b); return strcmp(a_str, b_str); @@ -894,24 +942,22 @@ static int balancer_name_cmp_fn(void* a, void* b) { * - \a response_generator: in order to propagate updates from the resolver * above the grpclb policy. * - \a args: other args inherited from the grpclb policy. */ -static grpc_channel_args* build_lb_channel_args( +grpc_channel_args* BuildBalancerChannelArgs( const grpc_lb_addresses* addresses, - grpc_core::FakeResolverResponseGenerator* response_generator, + FakeResolverResponseGenerator* response_generator, const grpc_channel_args* args) { size_t num_grpclb_addrs = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; } - /* All input addresses come from a resolver that claims they are LB services. - * It's the resolver's responsibility to make sure this policy is only - * instantiated and used in that case. Otherwise, something has gone wrong. */ + // There must be at least one balancer address, or else the + // client_channel would not have chosen this LB policy. 
GPR_ASSERT(num_grpclb_addrs > 0); grpc_lb_addresses* lb_addresses = grpc_lb_addresses_create(num_grpclb_addrs, nullptr); grpc_slice_hash_table_entry* targets_info_entries = - static_cast<grpc_slice_hash_table_entry*>( - gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs)); - + (grpc_slice_hash_table_entry*)gpr_zalloc(sizeof(*targets_info_entries) * + num_grpclb_addrs); size_t lb_addresses_idx = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { if (!addresses->addresses[i].is_balancer) continue; @@ -922,28 +968,23 @@ static grpc_channel_args* build_lb_channel_args( char* addr_str; GPR_ASSERT(grpc_sockaddr_to_string( &addr_str, &addresses->addresses[i].address, true) > 0); - targets_info_entries[lb_addresses_idx] = targets_info_entry_create( - addr_str, addresses->addresses[i].balancer_name); + targets_info_entries[lb_addresses_idx] = + BalancerEntryCreate(addr_str, addresses->addresses[i].balancer_name); gpr_free(addr_str); - grpc_lb_addresses_set_address( lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, addresses->addresses[i].address.len, false /* is balancer */, addresses->addresses[i].balancer_name, nullptr /* user data */); } GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); - grpc_slice_hash_table* targets_info = - grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries, - destroy_balancer_name, balancer_name_cmp_fn); + grpc_slice_hash_table* targets_info = grpc_slice_hash_table_create( + num_grpclb_addrs, targets_info_entries, gpr_free, BalancerNameCmp); gpr_free(targets_info_entries); - grpc_channel_args* lb_channel_args = grpc_lb_policy_grpclb_build_lb_channel_args(targets_info, response_generator, args); - grpc_arg lb_channel_addresses_arg = grpc_lb_addresses_create_channel_arg(lb_addresses); - grpc_channel_args* result = grpc_channel_args_copy_and_add( lb_channel_args, &lb_channel_addresses_arg, 1); grpc_slice_hash_table_unref(targets_info); @@ -952,121 +993,162 @@ static grpc_channel_args* build_lb_channel_args( return result; } -static void glb_destroy(grpc_lb_policy* pol) { - glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol); - GPR_ASSERT(glb_policy->pending_picks == nullptr); - GPR_ASSERT(glb_policy->pending_pings == nullptr); - gpr_free((void*)glb_policy->server_name); - grpc_channel_args_destroy(glb_policy->args); - grpc_connectivity_state_destroy(&glb_policy->state_tracker); - if (glb_policy->serverlist != nullptr) { - grpc_grpclb_destroy_serverlist(glb_policy->serverlist); - } - if (glb_policy->fallback_backend_addresses != nullptr) { - grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); - } - // TODO(roth): Remove this once the LB policy becomes a C++ object. - glb_policy->response_generator.reset(); +// +// ctor and dtor +// + +GrpcLb::GrpcLb(const grpc_lb_addresses* addresses, + const LoadBalancingPolicy::Args& args) + : LoadBalancingPolicy(args), + response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), + lb_call_backoff_( + BackOff::Options() + .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * + 1000) + .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) + .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * + 1000)) { + // Initialization. 
+ grpc_subchannel_index_ref(); + GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_, + &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this, + grpc_combiner_scheduler(args.combiner)); + GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_, + &GrpcLb::OnRoundRobinConnectivityChangedLocked, this, + grpc_combiner_scheduler(args.combiner)); + GRPC_CLOSURE_INIT(&on_rr_request_reresolution_, + &GrpcLb::OnRoundRobinRequestReresolutionLocked, this, + grpc_combiner_scheduler(args.combiner)); + grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb"); + // Record server name. + const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); + const char* server_uri = grpc_channel_arg_get_string(arg); + GPR_ASSERT(server_uri != nullptr); + grpc_uri* uri = grpc_uri_parse(server_uri, true); + GPR_ASSERT(uri->path[0] != '\0'); + server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, + "[grpclb %p] Will use '%s' as the server name for LB request.", + this, server_name_); + } + grpc_uri_destroy(uri); + // Record LB call timeout. + arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); + lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX}); + // Record fallback timeout. + arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); + lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer( + arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}); + // Process channel args. + ProcessChannelArgsLocked(*args.args); +} + +GrpcLb::~GrpcLb() { + GPR_ASSERT(pending_picks_ == nullptr); + GPR_ASSERT(pending_pings_ == nullptr); + gpr_free((void*)server_name_); + grpc_channel_args_destroy(args_); + grpc_connectivity_state_destroy(&state_tracker_); + if (serverlist_ != nullptr) { + grpc_grpclb_destroy_serverlist(serverlist_); + } + if (fallback_backend_addresses_ != nullptr) { + grpc_lb_addresses_destroy(fallback_backend_addresses_); + } grpc_subchannel_index_unref(); - gpr_free(glb_policy); } -static void glb_shutdown_locked(grpc_lb_policy* pol, - grpc_lb_policy* new_policy) { - glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol); +void GrpcLb::ShutdownLocked() { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); - glb_policy->shutting_down = true; - if (glb_policy->lb_calld != nullptr) { - lb_call_data_shutdown(glb_policy); - } - if (glb_policy->retry_timer_callback_pending) { - grpc_timer_cancel(&glb_policy->lb_call_retry_timer); - } - if (glb_policy->fallback_timer_callback_pending) { - grpc_timer_cancel(&glb_policy->lb_fallback_timer); - } - if (glb_policy->rr_policy != nullptr) { - grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr); - GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown"); - } - // We destroy the LB channel here because - // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy - // instance. Destroying the lb channel in glb_destroy would likely result in - // a callback invocation without a valid glb_policy arg. 
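The BackOff policy configured in the GrpcLb constructor above combines exponential growth, random jitter, and a cap on the retry delay. A standalone sketch of that shape; the constants below are illustrative stand-ins, not the GRPC_GRPCLB_* values, and the class is a toy, not grpc_core::BackOff:

#include <algorithm>
#include <cstdio>
#include <random>

class SimpleBackoff {
 public:
  SimpleBackoff(double initial_ms, double multiplier, double jitter, double max_ms)
      : initial_ms_(initial_ms), multiplier_(multiplier), jitter_(jitter),
        max_ms_(max_ms), current_ms_(initial_ms) {}

  // Returns the delay (ms) before the next connection attempt and grows the
  // base delay for the attempt after that, up to the cap.
  double NextAttemptDelay() {
    std::uniform_real_distribution<double> dist(1.0 - jitter_, 1.0 + jitter_);
    double delay = current_ms_ * dist(rng_);
    current_ms_ = std::min(current_ms_ * multiplier_, max_ms_);
    return delay;
  }

  // Called when a connection succeeds, so the next failure starts over.
  void Reset() { current_ms_ = initial_ms_; }

 private:
  double initial_ms_, multiplier_, jitter_, max_ms_, current_ms_;
  std::mt19937 rng_{42};
};

int main() {
  SimpleBackoff backoff(1000 /* 1s */, 1.6, 0.2, 120000 /* 120s */);
  for (int i = 0; i < 5; ++i) {
    printf("attempt %d: wait %.0f ms\n", i + 1, backoff.NextAttemptDelay());
  }
  return 0;
}
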
- if (glb_policy->lb_channel != nullptr) { - grpc_channel_destroy(glb_policy->lb_channel); - glb_policy->lb_channel = nullptr; - } - grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_REF(error), "glb_shutdown"); - grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); + shutting_down_ = true; + lb_calld_.reset(); + if (retry_timer_callback_pending_) { + grpc_timer_cancel(&lb_call_retry_timer_); + } + if (fallback_timer_callback_pending_) { + grpc_timer_cancel(&lb_fallback_timer_); + } + rr_policy_.reset(); + TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); + // We destroy the LB channel here instead of in our destructor because + // destroying the channel triggers a last callback to + // OnBalancerChannelConnectivityChangedLocked(), and we need to be + // alive when that callback is invoked. + if (lb_channel_ != nullptr) { + grpc_channel_destroy(lb_channel_); + lb_channel_ = nullptr; + } + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_REF(error), "grpclb_shutdown"); // Clear pending picks. - pending_pick* pp = glb_policy->pending_picks; - glb_policy->pending_picks = nullptr; - while (pp != nullptr) { - pending_pick* next = pp->next; - if (new_policy != nullptr) { - // Hand pick over to new policy. - if (pp->client_stats != nullptr) { - grpc_grpclb_client_stats_unref(pp->client_stats); - } - pp->pick->on_complete = pp->original_on_complete; - if (grpc_lb_policy_pick_locked(new_policy, pp->pick)) { - // Synchronous return; schedule callback. - GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE); - } - gpr_free(pp); - } else { - pp->pick->connected_subchannel.reset(); - GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); - } - pp = next; + PendingPick* pp; + while ((pp = pending_picks_) != nullptr) { + pending_picks_ = pp->next; + pp->pick->connected_subchannel.reset(); + // Note: pp is deleted in this callback. + GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); } // Clear pending pings. - pending_ping* pping = glb_policy->pending_pings; - glb_policy->pending_pings = nullptr; - while (pping != nullptr) { - pending_ping* next = pping->next; + PendingPing* pping; + while ((pping = pending_pings_) != nullptr) { + pending_pings_ = pping->next; GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error)); - gpr_free(pping); - pping = next; + Delete(pping); } GRPC_ERROR_UNREF(error); } +// +// public methods +// + +void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { + PendingPick* pp; + while ((pp = pending_picks_) != nullptr) { + pending_picks_ = pp->next; + pp->pick->on_complete = pp->original_on_complete; + pp->pick->user_data = nullptr; + if (new_policy->PickLocked(pp->pick)) { + // Synchronous return; schedule closure. + GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE); + } + Delete(pp); + } +} + // Cancel a specific pending pick. // // A grpclb pick progresses as follows: -// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be -// handed over to the RR policy (in create_rr_locked()). From that point -// onwards, it'll be RR's responsibility. For cancellations, that implies the -// pick needs also be cancelled by the RR instance. +// - If there's a Round Robin policy (rr_policy_) available, it'll be +// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From +// that point onwards, it'll be RR's responsibility. 
For cancellations, that +// implies the pick needs also be cancelled by the RR instance. // - Otherwise, without an RR instance, picks stay pending at this policy's -// level (grpclb), inside the glb_policy->pending_picks list. To cancel these, -// we invoke the completion closure and set *target to nullptr right here. -static void glb_cancel_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick, - grpc_error* error) { - glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol); - pending_pick* pp = glb_policy->pending_picks; - glb_policy->pending_picks = nullptr; +// level (grpclb), inside the pending_picks_ list. To cancel these, +// we invoke the completion closure and set the pick's connected +// subchannel to nullptr right here. +void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) { + PendingPick* pp = pending_picks_; + pending_picks_ = nullptr; while (pp != nullptr) { - pending_pick* next = pp->next; + PendingPick* next = pp->next; if (pp->pick == pick) { pick->connected_subchannel.reset(); + // Note: pp is deleted in this callback. GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { - pp->next = glb_policy->pending_picks; - glb_policy->pending_picks = pp; + pp->next = pending_picks_; + pending_picks_ = pp; } pp = next; } - if (glb_policy->rr_policy != nullptr) { - grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, pick, - GRPC_ERROR_REF(error)); + if (rr_policy_ != nullptr) { + rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } @@ -1074,863 +1156,710 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol, // Cancel all pending picks. // // A grpclb pick progresses as follows: -// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be -// handed over to the RR policy (in create_rr_locked()). From that point -// onwards, it'll be RR's responsibility. For cancellations, that implies the -// pick needs also be cancelled by the RR instance. +// - If there's a Round Robin policy (rr_policy_) available, it'll be +// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From +// that point onwards, it'll be RR's responsibility. For cancellations, that +// implies the pick needs also be cancelled by the RR instance. // - Otherwise, without an RR instance, picks stay pending at this policy's -// level (grpclb), inside the glb_policy->pending_picks list. To cancel these, -// we invoke the completion closure and set *target to nullptr right here. -static void glb_cancel_picks_locked(grpc_lb_policy* pol, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error* error) { - glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol); - pending_pick* pp = glb_policy->pending_picks; - glb_policy->pending_picks = nullptr; +// level (grpclb), inside the pending_picks_ list. To cancel these, +// we invoke the completion closure and set the pick's connected +// subchannel to nullptr right here. +void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) { + PendingPick* pp = pending_picks_; + pending_picks_ = nullptr; while (pp != nullptr) { - pending_pick* next = pp->next; + PendingPick* next = pp->next; if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { + // Note: pp is deleted in this callback. 
GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { - pp->next = glb_policy->pending_picks; - glb_policy->pending_picks = pp; + pp->next = pending_picks_; + pending_picks_ = pp; } pp = next; } - if (glb_policy->rr_policy != nullptr) { - grpc_lb_policy_cancel_picks_locked( - glb_policy->rr_policy, initial_metadata_flags_mask, - initial_metadata_flags_eq, GRPC_ERROR_REF(error)); + if (rr_policy_ != nullptr) { + rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask, + initial_metadata_flags_eq, + GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } -static void lb_on_fallback_timer_locked(void* arg, grpc_error* error); -static void query_for_backends_locked(glb_lb_policy* glb_policy); -static void start_picking_locked(glb_lb_policy* glb_policy) { - /* start a timer to fall back */ - if (glb_policy->lb_fallback_timeout_ms > 0 && - glb_policy->serverlist == nullptr && - !glb_policy->fallback_timer_callback_pending) { - grpc_millis deadline = - grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms; - GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_fallback_timer"); - GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, - glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - glb_policy->fallback_timer_callback_pending = true; - grpc_timer_init(&glb_policy->lb_fallback_timer, deadline, - &glb_policy->lb_on_fallback); - } - glb_policy->started_picking = true; - glb_policy->lb_call_backoff->Reset(); - query_for_backends_locked(glb_policy); -} - -static void glb_exit_idle_locked(grpc_lb_policy* pol) { - glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol); - if (!glb_policy->started_picking) { - start_picking_locked(glb_policy); +void GrpcLb::ExitIdleLocked() { + if (!started_picking_) { + StartPickingLocked(); } } -static int glb_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick) { - glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol); - pending_pick* pp = pending_pick_create(glb_policy, pick); +bool GrpcLb::PickLocked(PickState* pick) { + PendingPick* pp = PendingPickCreate(pick); bool pick_done = false; - if (glb_policy->rr_policy != nullptr) { + if (rr_policy_ != nullptr) { const grpc_connectivity_state rr_connectivity_state = - grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy, - nullptr); - // The glb_policy->rr_policy may have transitioned to SHUTDOWN but the - // callback registered to capture this event - // (on_rr_connectivity_changed_locked) may not have been invoked yet. We - // need to make sure we aren't trying to pick from a RR policy instance - // that's in shutdown. + rr_policy_->CheckConnectivityLocked(nullptr); + // The RR policy may have transitioned to SHUTDOWN but the callback + // registered to capture this event (on_rr_connectivity_changed_) may not + // have been invoked yet. We need to make sure we aren't trying to pick + // from an RR policy instance that's in shutdown. 
if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] NOT picking from from RR %p: RR conn state=%s", - glb_policy, glb_policy->rr_policy, + this, rr_policy_.get(), grpc_connectivity_state_name(rr_connectivity_state)); } - pending_pick_add(&glb_policy->pending_picks, pp); + AddPendingPick(pp); pick_done = false; } else { // RR not in shutdown if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy, - glb_policy->rr_policy); + gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this, + rr_policy_.get()); } - pick_done = - pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp); + pick_done = PickFromRoundRobinPolicyLocked(false /* force_async */, pp); } - } else { // glb_policy->rr_policy == NULL + } else { // rr_policy_ == NULL if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] No RR policy. Adding to grpclb's pending picks", - glb_policy); + this); } - pending_pick_add(&glb_policy->pending_picks, pp); - if (!glb_policy->started_picking) { - start_picking_locked(glb_policy); + AddPendingPick(pp); + if (!started_picking_) { + StartPickingLocked(); } pick_done = false; } return pick_done; } -static grpc_connectivity_state glb_check_connectivity_locked( - grpc_lb_policy* pol, grpc_error** connectivity_error) { - glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol); - return grpc_connectivity_state_get(&glb_policy->state_tracker, - connectivity_error); -} - -static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, - grpc_closure* on_ack) { - glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol); - if (glb_policy->rr_policy) { - grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); +void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { + if (rr_policy_ != nullptr) { + rr_policy_->PingOneLocked(on_initiate, on_ack); } else { - pending_ping_add(&glb_policy->pending_pings, on_initiate, on_ack); - if (!glb_policy->started_picking) { - start_picking_locked(glb_policy); + AddPendingPing(on_initiate, on_ack); + if (!started_picking_) { + StartPickingLocked(); } } } -static void glb_notify_on_state_change_locked(grpc_lb_policy* pol, - grpc_connectivity_state* current, - grpc_closure* notify) { - glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol); - grpc_connectivity_state_notify_on_state_change(&glb_policy->state_tracker, - current, notify); -} - -static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg); - glb_policy->retry_timer_callback_pending = false; - if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE && - glb_policy->lb_calld == nullptr) { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy); - } - query_for_backends_locked(glb_policy); - } - GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer"); -} - -static void start_lb_call_retry_timer_locked(glb_lb_policy* glb_policy) { - grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime(); - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", - glb_policy); - grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); - if (timeout > 0) { - gpr_log(GPR_DEBUG, - "[grpclb %p] ... 
retry_timer_active in %" PRIuPTR "ms.", - glb_policy, timeout); - } else { - gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.", - glb_policy); - } - } - GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer"); - GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, - lb_call_on_retry_timer_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - glb_policy->retry_timer_callback_pending = true; - grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, - &glb_policy->lb_on_call_retry); +grpc_connectivity_state GrpcLb::CheckConnectivityLocked( + grpc_error** connectivity_error) { + return grpc_connectivity_state_get(&state_tracker_, connectivity_error); } -static void maybe_send_client_load_report_locked(void* arg, grpc_error* error); - -static void schedule_next_client_load_report(glb_lb_call_data* lb_calld) { - const grpc_millis next_client_load_report_time = - grpc_core::ExecCtx::Get()->Now() + lb_calld->client_stats_report_interval; - GRPC_CLOSURE_INIT( - &lb_calld->client_load_report_closure, - maybe_send_client_load_report_locked, lb_calld, - grpc_combiner_scheduler(lb_calld->glb_policy->base.combiner)); - grpc_timer_init(&lb_calld->client_load_report_timer, - next_client_load_report_time, - &lb_calld->client_load_report_closure); - lb_calld->client_load_report_timer_callback_pending = true; +void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, + grpc_closure* notify) { + grpc_connectivity_state_notify_on_state_change(&state_tracker_, current, + notify); } -static void client_load_report_done_locked(void* arg, grpc_error* error) { - glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg); - glb_lb_policy* glb_policy = lb_calld->glb_policy; - grpc_byte_buffer_destroy(lb_calld->send_message_payload); - lb_calld->send_message_payload = nullptr; - if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) { - glb_lb_call_data_unref(lb_calld, "client_load_report"); +void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { + const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); + if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { + // Ignore this update. + gpr_log( + GPR_ERROR, + "[grpclb %p] No valid LB addresses channel arg in update, ignoring.", + this); return; } - schedule_next_client_load_report(lb_calld); -} - -static bool load_report_counters_are_zero(grpc_grpclb_request* request) { - grpc_grpclb_dropped_call_counts* drop_entries = - static_cast<grpc_grpclb_dropped_call_counts*>( - request->client_stats.calls_finished_with_drop.arg); - return request->client_stats.num_calls_started == 0 && - request->client_stats.num_calls_finished == 0 && - request->client_stats.num_calls_finished_with_client_failed_to_send == - 0 && - request->client_stats.num_calls_finished_known_received == 0 && - (drop_entries == nullptr || drop_entries->num_entries == 0); -} - -static void send_client_load_report_locked(glb_lb_call_data* lb_calld) { - glb_lb_policy* glb_policy = lb_calld->glb_policy; - // Construct message payload. - GPR_ASSERT(lb_calld->send_message_payload == nullptr); - grpc_grpclb_request* request = - grpc_grpclb_load_report_request_create_locked(lb_calld->client_stats); - // Skip client load report if the counters were all zero in the last - // report and they are still zero in this one. 
- if (load_report_counters_are_zero(request)) { - if (lb_calld->last_client_load_report_counters_were_zero) { - grpc_grpclb_request_destroy(request); - schedule_next_client_load_report(lb_calld); - return; - } - lb_calld->last_client_load_report_counters_were_zero = true; - } else { - lb_calld->last_client_load_report_counters_were_zero = false; + const grpc_lb_addresses* addresses = + reinterpret_cast<const grpc_lb_addresses*>(arg->value.pointer.p); + // Update fallback address list. + if (fallback_backend_addresses_ != nullptr) { + grpc_lb_addresses_destroy(fallback_backend_addresses_); } - grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); - lb_calld->send_message_payload = - grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_slice_unref_internal(request_payload_slice); - grpc_grpclb_request_destroy(request); - // Send the report. - grpc_op op; - memset(&op, 0, sizeof(op)); - op.op = GRPC_OP_SEND_MESSAGE; - op.data.send_message.send_message = lb_calld->send_message_payload; - GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure, - client_load_report_done_locked, lb_calld, - grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_call_error call_error = grpc_call_start_batch_and_execute( - lb_calld->lb_call, &op, 1, &lb_calld->client_load_report_closure); - if (call_error != GRPC_CALL_OK) { - gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error); - GPR_ASSERT(GRPC_CALL_OK == call_error); + fallback_backend_addresses_ = ExtractBackendAddresses(addresses); + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, + // since we use this to trigger the client_load_reporting filter. + static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; + grpc_arg new_arg = grpc_channel_arg_string_create( + (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb"); + grpc_channel_args_destroy(args_); + args_ = grpc_channel_args_copy_and_add_and_remove( + &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + // Construct args for balancer channel. + grpc_channel_args* lb_channel_args = + BuildBalancerChannelArgs(addresses, response_generator_.get(), &args); + // Create balancer channel if needed. + if (lb_channel_ == nullptr) { + char* uri_str; + gpr_asprintf(&uri_str, "fake:///%s", server_name_); + lb_channel_ = grpc_lb_policy_grpclb_create_lb_channel( + uri_str, client_channel_factory(), lb_channel_args); + GPR_ASSERT(lb_channel_ != nullptr); + gpr_free(uri_str); } + // Propagate updates to the LB channel (pick_first) through the fake + // resolver. + response_generator_->SetResponse(lb_channel_args); + grpc_channel_args_destroy(lb_channel_args); } -static void maybe_send_client_load_report_locked(void* arg, grpc_error* error) { - glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg); - glb_lb_policy* glb_policy = lb_calld->glb_policy; - lb_calld->client_load_report_timer_callback_pending = false; - if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) { - glb_lb_call_data_unref(lb_calld, "client_load_report"); - return; +void GrpcLb::UpdateLocked(const grpc_channel_args& args) { + ProcessChannelArgsLocked(args); + // If fallback is configured and the RR policy already exists, update + // it with the new fallback addresses. + if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) { + CreateOrUpdateRoundRobinPolicyLocked(); } - // If we've already sent the initial request, then we can go ahead and send - // the load report. 
Otherwise, we need to wait until the initial request has - // been sent to send this (see lb_on_sent_initial_request_locked()). - if (lb_calld->send_message_payload == nullptr) { - send_client_load_report_locked(lb_calld); - } else { - lb_calld->client_load_report_is_due = true; + // Start watching the LB channel connectivity for connection, if not + // already doing so. + if (!watching_lb_channel_) { + lb_channel_connectivity_ = grpc_channel_check_connectivity_state( + lb_channel_, true /* try to connect */); + grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(lb_channel_)); + GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + watching_lb_channel_ = true; + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity"); + self.release(); + grpc_client_channel_watch_connectivity_state( + client_channel_elem, + grpc_polling_entity_create_from_pollset_set(interested_parties()), + &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_, + nullptr); } } -static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error); -static void lb_on_server_status_received_locked(void* arg, grpc_error* error); -static void lb_on_response_received_locked(void* arg, grpc_error* error); -static glb_lb_call_data* lb_call_data_create_locked(glb_lb_policy* glb_policy) { - GPR_ASSERT(!glb_policy->shutting_down); - // Init the LB call. Note that the LB call will progress every time there's - // activity in glb_policy->base.interested_parties, which is comprised of the - // polling entities from client_channel. - GPR_ASSERT(glb_policy->server_name != nullptr); - GPR_ASSERT(glb_policy->server_name[0] != '\0'); - grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name); - grpc_millis deadline = - glb_policy->lb_call_timeout_ms == 0 - ? GRPC_MILLIS_INF_FUTURE - : grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms; - glb_lb_call_data* lb_calld = - static_cast<glb_lb_call_data*>(gpr_zalloc(sizeof(*lb_calld))); - lb_calld->lb_call = grpc_channel_create_pollset_set_call( - glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS, - glb_policy->base.interested_parties, - GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, - &host, deadline, nullptr); - grpc_slice_unref_internal(host); - // Init the LB call request payload. - grpc_grpclb_request* request = - grpc_grpclb_request_create(glb_policy->server_name); - grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); - lb_calld->send_message_payload = - grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_slice_unref_internal(request_payload_slice); - grpc_grpclb_request_destroy(request); - // Init other data associated with the LB call. 
- lb_calld->glb_policy = glb_policy; - gpr_ref_init(&lb_calld->refs, 1); - grpc_metadata_array_init(&lb_calld->lb_initial_metadata_recv); - grpc_metadata_array_init(&lb_calld->lb_trailing_metadata_recv); - GRPC_CLOSURE_INIT(&lb_calld->lb_on_sent_initial_request, - lb_on_sent_initial_request_locked, lb_calld, - grpc_combiner_scheduler(glb_policy->base.combiner)); - GRPC_CLOSURE_INIT(&lb_calld->lb_on_response_received, - lb_on_response_received_locked, lb_calld, - grpc_combiner_scheduler(glb_policy->base.combiner)); - GRPC_CLOSURE_INIT(&lb_calld->lb_on_server_status_received, - lb_on_server_status_received_locked, lb_calld, - grpc_combiner_scheduler(glb_policy->base.combiner)); - // Hold a ref to the glb_policy. - GRPC_LB_POLICY_REF(&glb_policy->base, "lb_calld"); - return lb_calld; -} +// +// code for balancer channel and call +// -/* - * Auxiliary functions and LB client callbacks. - */ +void GrpcLb::StartPickingLocked() { + // Start a timer to fall back. + if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && + !fallback_timer_callback_pending_) { + grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = Ref(DEBUG_LOCATION, "on_fallback_timer"); + self.release(); + GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this, + grpc_combiner_scheduler(combiner())); + fallback_timer_callback_pending_ = true; + grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); + } + started_picking_ = true; + StartBalancerCallLocked(); +} -static void query_for_backends_locked(glb_lb_policy* glb_policy) { - GPR_ASSERT(glb_policy->lb_channel != nullptr); - if (glb_policy->shutting_down) return; +void GrpcLb::StartBalancerCallLocked() { + GPR_ASSERT(lb_channel_ != nullptr); + if (shutting_down_) return; // Init the LB call data. - GPR_ASSERT(glb_policy->lb_calld == nullptr); - glb_policy->lb_calld = lb_call_data_create_locked(glb_policy); + GPR_ASSERT(lb_calld_ == nullptr); + lb_calld_ = MakeOrphanable<BalancerCallState>(Ref()); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p, " - "lb_call: %p)", - glb_policy, glb_policy->lb_channel, glb_policy->lb_calld, - glb_policy->lb_calld->lb_call); - } - GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr); - // Create the ops. - grpc_call_error call_error; - grpc_op ops[3]; - memset(ops, 0, sizeof(ops)); - // Op: send initial metadata. - grpc_op* op = ops; - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; - op->flags = 0; - op->reserved = nullptr; - op++; - // Op: send request message. - GPR_ASSERT(glb_policy->lb_calld->send_message_payload != nullptr); - op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message.send_message = - glb_policy->lb_calld->send_message_payload; - op->flags = 0; - op->reserved = nullptr; - op++; - glb_lb_call_data_ref(glb_policy->lb_calld, - "lb_on_sent_initial_request_locked"); - call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_calld->lb_call, ops, static_cast<size_t>(op - ops), - &glb_policy->lb_calld->lb_on_sent_initial_request); - GPR_ASSERT(GRPC_CALL_OK == call_error); - // Op: recv initial metadata. 
- op = ops; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata.recv_initial_metadata = - &glb_policy->lb_calld->lb_initial_metadata_recv; - op->flags = 0; - op->reserved = nullptr; - op++; - // Op: recv response. - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = - &glb_policy->lb_calld->recv_message_payload; - op->flags = 0; - op->reserved = nullptr; - op++; - glb_lb_call_data_ref(glb_policy->lb_calld, "lb_on_response_received_locked"); - call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_calld->lb_call, ops, static_cast<size_t>(op - ops), - &glb_policy->lb_calld->lb_on_response_received); - GPR_ASSERT(GRPC_CALL_OK == call_error); - // Op: recv server status. - op = ops; - op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op->data.recv_status_on_client.trailing_metadata = - &glb_policy->lb_calld->lb_trailing_metadata_recv; - op->data.recv_status_on_client.status = &glb_policy->lb_calld->lb_call_status; - op->data.recv_status_on_client.status_details = - &glb_policy->lb_calld->lb_call_status_details; - op->flags = 0; - op->reserved = nullptr; - op++; - // This callback signals the end of the LB call, so it relies on the initial - // ref instead of a new ref. When it's invoked, it's the initial ref that is - // unreffed. - call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_calld->lb_call, ops, static_cast<size_t>(op - ops), - &glb_policy->lb_calld->lb_on_server_status_received); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) { - glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg); - grpc_byte_buffer_destroy(lb_calld->send_message_payload); - lb_calld->send_message_payload = nullptr; - // If we attempted to send a client load report before the initial request was - // sent (and this lb_calld is still in use), send the load report now. - if (lb_calld->client_load_report_is_due && - lb_calld == lb_calld->glb_policy->lb_calld) { - send_client_load_report_locked(lb_calld); - lb_calld->client_load_report_is_due = false; + "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)", + this, lb_channel_, lb_calld_.get()); } - glb_lb_call_data_unref(lb_calld, "lb_on_sent_initial_request_locked"); + lb_calld_->StartQuery(); } -static void lb_on_response_received_locked(void* arg, grpc_error* error) { - glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg); - glb_lb_policy* glb_policy = lb_calld->glb_policy; - // Empty payload means the LB call was cancelled. - if (lb_calld != glb_policy->lb_calld || - lb_calld->recv_message_payload == nullptr) { - glb_lb_call_data_unref(lb_calld, "lb_on_response_received_locked"); - return; - } - grpc_op ops[2]; - memset(ops, 0, sizeof(ops)); - grpc_op* op = ops; - glb_policy->lb_call_backoff->Reset(); - grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload); - grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); - grpc_byte_buffer_reader_destroy(&bbr); - grpc_byte_buffer_destroy(lb_calld->recv_message_payload); - lb_calld->recv_message_payload = nullptr; - grpc_grpclb_initial_response* initial_response; - grpc_grpclb_serverlist* serverlist; - if (!lb_calld->seen_initial_response && - (initial_response = grpc_grpclb_initial_response_parse(response_slice)) != - nullptr) { - // Have NOT seen initial response, look for initial response. 
- if (initial_response->has_client_stats_report_interval) { - lb_calld->client_stats_report_interval = GPR_MAX( - GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis( - &initial_response->client_stats_report_interval)); - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Received initial LB response message; " - "client load reporting interval = %" PRIdPTR " milliseconds", - glb_policy, lb_calld->client_stats_report_interval); - } - } else if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Received initial LB response message; client load " - "reporting NOT enabled", - glb_policy); - } - grpc_grpclb_initial_response_destroy(initial_response); - lb_calld->seen_initial_response = true; - } else if ((serverlist = grpc_grpclb_response_parse_serverlist( - response_slice)) != nullptr) { - // Have seen initial response, look for serverlist. - GPR_ASSERT(lb_calld->lb_call != nullptr); +void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { + GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); + grpclb_policy->fallback_timer_callback_pending_ = false; + // If we receive a serverlist after the timer fires but before this callback + // actually runs, don't fall back. + if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ && + error == GRPC_ERROR_NONE) { if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, - "[grpclb %p] Serverlist with %" PRIuPTR " servers received", - glb_policy, serverlist->num_servers); - for (size_t i = 0; i < serverlist->num_servers; ++i) { - grpc_resolved_address addr; - parse_server(serverlist->servers[i], &addr); - char* ipport; - grpc_sockaddr_to_string(&ipport, &addr, false); - gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s", - glb_policy, i, ipport); - gpr_free(ipport); - } - } - /* update serverlist */ - if (serverlist->num_servers > 0) { - // Start sending client load report only after we start using the - // serverlist returned from the current LB call. - if (lb_calld->client_stats_report_interval > 0 && - lb_calld->client_stats == nullptr) { - lb_calld->client_stats = grpc_grpclb_client_stats_create(); - glb_lb_call_data_ref(lb_calld, "client_load_report"); - schedule_next_client_load_report(lb_calld); - } - if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Incoming server list identical to current, " - "ignoring.", - glb_policy); - } - grpc_grpclb_destroy_serverlist(serverlist); - } else { /* new serverlist */ - if (glb_policy->serverlist != nullptr) { - /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(glb_policy->serverlist); - } else { - /* or dispose of the fallback */ - grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); - glb_policy->fallback_backend_addresses = nullptr; - if (glb_policy->fallback_timer_callback_pending) { - grpc_timer_cancel(&glb_policy->lb_fallback_timer); - glb_policy->fallback_timer_callback_pending = false; - } - } - /* and update the copy in the glb_lb_policy instance. 
This - * serverlist instance will be destroyed either upon the next - * update or in glb_destroy() */ - glb_policy->serverlist = serverlist; - glb_policy->serverlist_index = 0; - rr_handover_locked(glb_policy); - } - } else { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.", - glb_policy); - } - grpc_grpclb_destroy_serverlist(serverlist); + "[grpclb %p] Falling back to use backends from resolver", + grpclb_policy); } - } else { - // No valid initial response or serverlist found. - gpr_log(GPR_ERROR, - "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", - glb_policy, - grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); - } - grpc_slice_unref_internal(response_slice); - if (!glb_policy->shutting_down) { - // Keep listening for serverlist updates. - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &lb_calld->recv_message_payload; - op->flags = 0; - op->reserved = nullptr; - op++; - // Reuse the "lb_on_response_received_locked" ref taken in - // query_for_backends_locked(). - const grpc_call_error call_error = grpc_call_start_batch_and_execute( - lb_calld->lb_call, ops, static_cast<size_t>(op - ops), - &lb_calld->lb_on_response_received); - GPR_ASSERT(GRPC_CALL_OK == call_error); - } else { - glb_lb_call_data_unref(lb_calld, - "lb_on_response_received_locked+glb_shutdown"); + GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr); + grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked(); } + grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); } -static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { - glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg); - glb_lb_policy* glb_policy = lb_calld->glb_policy; - GPR_ASSERT(lb_calld->lb_call != nullptr); +void GrpcLb::StartBalancerCallRetryTimerLocked() { + grpc_millis next_try = lb_call_backoff_.NextAttemptTime(); if (grpc_lb_glb_trace.enabled()) { - char* status_details = - grpc_slice_to_c_string(lb_calld->lb_call_status_details); - gpr_log(GPR_INFO, - "[grpclb %p] Status from LB server received. Status = %d, details " - "= '%s', (lb_calld: %p, lb_call: %p), error '%s'", - lb_calld->glb_policy, lb_calld->lb_call_status, status_details, - lb_calld, lb_calld->lb_call, grpc_error_string(error)); - gpr_free(status_details); - } - grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace, - GRPC_ERROR_NONE); - // If this lb_calld is still in use, this call ended because of a failure so - // we want to retry connecting. Otherwise, we have deliberately ended this - // call and no further action is required. - if (lb_calld == glb_policy->lb_calld) { - glb_policy->lb_calld = nullptr; - if (lb_calld->client_load_report_timer_callback_pending) { - grpc_timer_cancel(&lb_calld->client_load_report_timer); - } - GPR_ASSERT(!glb_policy->shutting_down); - if (lb_calld->seen_initial_response) { - // If we lose connection to the LB server, reset the backoff and restart - // the LB call immediately. - glb_policy->lb_call_backoff->Reset(); - query_for_backends_locked(glb_policy); + gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", this); + grpc_millis timeout = next_try - ExecCtx::Get()->Now(); + if (timeout > 0) { + gpr_log(GPR_DEBUG, + "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", this, + timeout); } else { - // If this LB call fails establishing any connection to the LB server, - // retry later. 
- start_lb_call_retry_timer_locked(glb_policy); + gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.", + this); } } - glb_lb_call_data_unref(lb_calld, "lb_call_ended"); + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); + self.release(); + GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked, + this, grpc_combiner_scheduler(combiner())); + retry_timer_callback_pending_ = true; + grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_); } -static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg); - glb_policy->fallback_timer_callback_pending = false; - /* If we receive a serverlist after the timer fires but before this callback - * actually runs, don't fall back. */ - if (glb_policy->serverlist == nullptr && !glb_policy->shutting_down && - error == GRPC_ERROR_NONE) { +void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) { + GrpcLb* grpclb_policy = reinterpret_cast<GrpcLb*>(arg); + grpclb_policy->retry_timer_callback_pending_ = false; + if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE && + grpclb_policy->lb_calld_ == nullptr) { if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Falling back to use backends from resolver", - glb_policy); + gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", + grpclb_policy); } - GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr); - rr_handover_locked(glb_policy); - } - GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer"); -} - -static void fallback_update_locked(glb_lb_policy* glb_policy, - const grpc_lb_addresses* addresses) { - GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr); - grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); - glb_policy->fallback_backend_addresses = - extract_backend_addresses_locked(addresses); - if (glb_policy->lb_fallback_timeout_ms > 0 && - glb_policy->rr_policy != nullptr) { - rr_handover_locked(glb_policy); - } -} - -static void glb_update_locked(grpc_lb_policy* policy, - const grpc_lb_policy_args* args) { - glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(policy); - const grpc_arg* arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - if (glb_policy->lb_channel == nullptr) { - // If we don't have a current channel to the LB, go into TRANSIENT - // FAILURE. - grpc_connectivity_state_set( - &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), - "glb_update_missing"); - } else { - // otherwise, keep using the current LB channel (ignore this update). - gpr_log( - GPR_ERROR, - "[grpclb %p] No valid LB addresses channel arg in update, ignoring.", - glb_policy); - } - return; - } - const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); - // If a non-empty serverlist hasn't been received from the balancer, - // propagate the update to fallback_backend_addresses. - if (glb_policy->serverlist == nullptr) { - fallback_update_locked(glb_policy, addresses); - } - GPR_ASSERT(glb_policy->lb_channel != nullptr); - // Propagate updates to the LB channel (pick_first) through the fake - // resolver. 
- grpc_channel_args* lb_channel_args = build_lb_channel_args( - addresses, glb_policy->response_generator.get(), args->args); - glb_policy->response_generator->SetResponse(lb_channel_args); - grpc_channel_args_destroy(lb_channel_args); - // Start watching the LB channel connectivity for connection, if not - // already doing so. - if (!glb_policy->watching_lb_channel) { - glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state( - glb_policy->lb_channel, true /* try to connect */); - grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(glb_policy->lb_channel)); - GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - glb_policy->watching_lb_channel = true; - GRPC_LB_POLICY_REF(&glb_policy->base, "watch_lb_channel_connectivity"); - grpc_client_channel_watch_connectivity_state( - client_channel_elem, - grpc_polling_entity_create_from_pollset_set( - glb_policy->base.interested_parties), - &glb_policy->lb_channel_connectivity, - &glb_policy->lb_channel_on_connectivity_changed, nullptr); + grpclb_policy->StartBalancerCallLocked(); } + grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); } // Invoked as part of the update process. It continues watching the LB channel // until it shuts down or becomes READY. It's invoked even if the LB channel // stayed READY throughout the update (for example if the update is identical). -static void glb_lb_channel_on_connectivity_changed_cb(void* arg, - grpc_error* error) { - glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg); - if (glb_policy->shutting_down) goto done; +void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg, + grpc_error* error) { + GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); + if (grpclb_policy->shutting_down_) goto done; // Re-initialize the lb_call. This should also take care of updating the // embedded RR policy. Note that the current RR policy, if any, will stay in // effect until an update from the new lb_call is received. - switch (glb_policy->lb_channel_connectivity) { + switch (grpclb_policy->lb_channel_connectivity_) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_TRANSIENT_FAILURE: { // Keep watching the LB channel. grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(glb_policy->lb_channel)); + grpc_channel_get_channel_stack(grpclb_policy->lb_channel_)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); grpc_client_channel_watch_connectivity_state( client_channel_elem, grpc_polling_entity_create_from_pollset_set( - glb_policy->base.interested_parties), - &glb_policy->lb_channel_connectivity, - &glb_policy->lb_channel_on_connectivity_changed, nullptr); + grpclb_policy->interested_parties()), + &grpclb_policy->lb_channel_connectivity_, + &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr); break; } // The LB channel may be IDLE because it's shut down before the update. // Restart the LB call to kick the LB channel into gear. 
case GRPC_CHANNEL_IDLE: case GRPC_CHANNEL_READY: - if (glb_policy->lb_calld != nullptr) { - lb_call_data_shutdown(glb_policy); - } - if (glb_policy->started_picking) { - if (glb_policy->retry_timer_callback_pending) { - grpc_timer_cancel(&glb_policy->lb_call_retry_timer); + grpclb_policy->lb_calld_.reset(); + if (grpclb_policy->started_picking_) { + if (grpclb_policy->retry_timer_callback_pending_) { + grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_); } - glb_policy->lb_call_backoff->Reset(); - query_for_backends_locked(glb_policy); + grpclb_policy->lb_call_backoff_.Reset(); + grpclb_policy->StartBalancerCallLocked(); } // Fall through. case GRPC_CHANNEL_SHUTDOWN: done: - glb_policy->watching_lb_channel = false; - GRPC_LB_POLICY_UNREF(&glb_policy->base, + grpclb_policy->watching_lb_channel_ = false; + grpclb_policy->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity_cb_shutdown"); } } -/* Code wiring the policy with the rest of the core */ -static const grpc_lb_policy_vtable glb_lb_policy_vtable = { - glb_destroy, - glb_shutdown_locked, - glb_pick_locked, - glb_cancel_pick_locked, - glb_cancel_picks_locked, - glb_ping_one_locked, - glb_exit_idle_locked, - glb_check_connectivity_locked, - glb_notify_on_state_change_locked, - glb_update_locked}; - -static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory, - grpc_lb_policy_args* args) { - /* Count the number of gRPC-LB addresses. There must be at least one. */ - const grpc_arg* arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - return nullptr; +// +// PendingPick +// + +// Adds lb_token of selected subchannel (address) to the call's initial +// metadata. +grpc_error* AddLbTokenToInitialMetadata( + grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage, + grpc_metadata_batch* initial_metadata) { + GPR_ASSERT(lb_token_mdelem_storage != nullptr); + GPR_ASSERT(!GRPC_MDISNULL(lb_token)); + return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); +} + +// Destroy function used when embedding client stats in call context. +void DestroyClientStats(void* arg) { + grpc_grpclb_client_stats_unref( + reinterpret_cast<grpc_grpclb_client_stats*>(arg)); +} + +void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) { + /* if connected_subchannel is nullptr, no pick has been made by the RR + * policy (e.g., all addresses failed to connect). There won't be any + * user_data/token available */ + if (pp->pick->connected_subchannel != nullptr) { + if (!GRPC_MDISNULL(pp->lb_token)) { + AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token), + &pp->pick->lb_token_mdelem_storage, + pp->pick->initial_metadata); + } else { + gpr_log(GPR_ERROR, + "[grpclb %p] No LB token for connected subchannel pick %p", + pp->grpclb_policy, pp->pick); + abort(); + } + // Pass on client stats via context. Passes ownership of the reference. 
+ if (pp->client_stats != nullptr) { + pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = + pp->client_stats; + pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy = + DestroyClientStats; + } + } else { + if (pp->client_stats != nullptr) { + grpc_grpclb_client_stats_unref(pp->client_stats); + } } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; +} + +/* The \a on_complete closure passed as part of the pick requires keeping a + * reference to its associated round robin instance. We wrap this closure in + * order to unref the round robin instance upon its invocation */ +void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) { + PendingPick* pp = reinterpret_cast<PendingPick*>(arg); + PendingPickSetMetadataAndContext(pp); + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error)); + Delete(pp); +} + +GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) { + PendingPick* pp = New<PendingPick>(); + pp->grpclb_policy = this; + pp->pick = pick; + GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp, + grpc_schedule_on_exec_ctx); + pp->original_on_complete = pick->on_complete; + pick->on_complete = &pp->on_complete; + return pp; +} + +void GrpcLb::AddPendingPick(PendingPick* pp) { + pp->next = pending_picks_; + pending_picks_ = pp; +} + +// +// PendingPing +// + +void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) { + PendingPing* pping = New<PendingPing>(); + pping->on_initiate = on_initiate; + pping->on_ack = on_ack; + pping->next = pending_pings_; + pending_pings_ = pping; +} + +// +// code for interacting with the RR policy +// + +// Performs a pick over \a rr_policy_. Given that a pick can return +// immediately (ignoring its completion callback), we need to perform the +// cleanups this callback would otherwise be responsible for. +// If \a force_async is true, then we will manually schedule the +// completion callback even if the pick is available immediately. +bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) { + // Check for drops if we are not using fallback backend addresses. + if (serverlist_ != nullptr) { + // Look at the index into the serverlist to see if we should drop this call. + grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++]; + if (serverlist_index_ == serverlist_->num_servers) { + serverlist_index_ = 0; // Wrap-around. + } + if (server->drop) { + // Update client load reporting stats to indicate the number of + // dropped calls. Note that we have to do this here instead of in + // the client_load_reporting filter, because we do not create a + // subchannel call (and therefore no client_load_reporting filter) + // for dropped calls. + if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { + grpc_grpclb_client_stats_add_call_dropped_locked( + server->load_balance_token, lb_calld_->client_stats()); + } + if (force_async) { + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); + Delete(pp); + return false; + } + Delete(pp); + return true; + } } - if (num_grpclb_addrs == 0) return nullptr; + // Set client_stats and user_data. 
+ if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { + pp->client_stats = grpc_grpclb_client_stats_ref(lb_calld_->client_stats()); + } + GPR_ASSERT(pp->pick->user_data == nullptr); + pp->pick->user_data = (void**)&pp->lb_token; + // Pick via the RR policy. + bool pick_done = rr_policy_->PickLocked(pp->pick); + if (pick_done) { + PendingPickSetMetadataAndContext(pp); + if (force_async) { + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); + pick_done = false; + } + Delete(pp); + } + // else, the pending pick will be registered and taken care of by the + // pending pick list inside the RR policy. Eventually, + // OnPendingPickComplete() will be called, which will (among other + // things) add the LB token to the call's initial metadata. + return pick_done; +} - glb_lb_policy* glb_policy = - static_cast<glb_lb_policy*>(gpr_zalloc(sizeof(*glb_policy))); +void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) { + GPR_ASSERT(rr_policy_ == nullptr); + rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + "round_robin", args); + if (rr_policy_ == nullptr) { + gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy", + this); + return; + } + // TODO(roth): We currently track this ref manually. Once the new + // ClosureRef API is done, pass the RefCountedPtr<> along with the closure. + auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested"); + self.release(); + rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_); + grpc_error* rr_state_error = nullptr; + rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error); + // Connectivity state is a function of the RR policy updated/created. + UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error); + // Add the gRPC LB's interested_parties pollset_set to that of the newly + // created RR policy. This will make the RR policy progress upon activity on + // gRPC LB, which in turn is tied to the application's call. + grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(), + interested_parties()); + // Subscribe to changes to the connectivity of the new RR. + // TODO(roth): We currently track this ref manually. Once the new + // ClosureRef API is done, pass the RefCountedPtr<> along with the closure. + self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed"); + self.release(); + rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_, + &on_rr_connectivity_changed_); + rr_policy_->ExitIdleLocked(); + // Send pending picks to RR policy. + PendingPick* pp; + while ((pp = pending_picks_)) { + pending_picks_ = pp->next; + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, + "[grpclb %p] Pending pick about to (async) PICK from RR %p", this, + rr_policy_.get()); + } + PickFromRoundRobinPolicyLocked(true /* force_async */, pp); + } + // Send pending pings to RR policy. + PendingPing* pping; + while ((pping = pending_pings_)) { + pending_pings_ = pping->next; + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", + this, rr_policy_.get()); + } + rr_policy_->PingOneLocked(pping->on_initiate, pping->on_ack); + Delete(pping); + } +} - /* Get server name. */ - arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); - const char* server_uri = grpc_channel_arg_get_string(arg); - GPR_ASSERT(server_uri != nullptr); - grpc_uri* uri = grpc_uri_parse(server_uri, true); - GPR_ASSERT(uri->path[0] != '\0'); - glb_policy->server_name = - gpr_strdup(uri->path[0] == '/' ? 
uri->path + 1 : uri->path); - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Will use '%s' as the server name for LB request.", - glb_policy, glb_policy->server_name); +grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() { + grpc_lb_addresses* addresses; + if (serverlist_ != nullptr) { + GPR_ASSERT(serverlist_->num_servers > 0); + addresses = ProcessServerlist(serverlist_); + } else { + // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't + // received any serverlist from the balancer, we use the fallback backends + // returned by the resolver. Note that the fallback backend list may be + // empty, in which case the new round_robin policy will keep the requested + // picks pending. + GPR_ASSERT(fallback_backend_addresses_ != nullptr); + addresses = grpc_lb_addresses_copy(fallback_backend_addresses_); } - grpc_uri_destroy(uri); + GPR_ASSERT(addresses != nullptr); + // Replace the LB addresses in the channel args that we pass down to + // the subchannel. + static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; + const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses); + grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( + args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); + grpc_lb_addresses_destroy(addresses); + return args; +} - glb_policy->cc_factory = args->client_channel_factory; - GPR_ASSERT(glb_policy->cc_factory != nullptr); +void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() { + if (shutting_down_) return; + grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked(); + GPR_ASSERT(args != nullptr); + if (rr_policy_ != nullptr) { + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", this, + rr_policy_.get()); + } + rr_policy_->UpdateLocked(*args); + } else { + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.combiner = combiner(); + lb_policy_args.client_channel_factory = client_channel_factory(); + lb_policy_args.args = args; + CreateRoundRobinPolicyLocked(lb_policy_args); + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", this, + rr_policy_.get()); + } + } + grpc_channel_args_destroy(args); +} - arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); - glb_policy->lb_call_timeout_ms = - grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX}); +void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg, + grpc_error* error) { + GrpcLb* grpclb_policy = reinterpret_cast<GrpcLb*>(arg); + if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) { + grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested"); + return; + } + if (grpc_lb_glb_trace.enabled()) { + gpr_log( + GPR_DEBUG, + "[grpclb %p] Re-resolution requested from the internal RR policy (%p).", + grpclb_policy, grpclb_policy->rr_policy_.get()); + } + // If we are talking to a balancer, we expect to get updated addresses form + // the balancer, so we can ignore the re-resolution request from the RR + // policy. Otherwise, handle the re-resolution request using the + // grpclb policy's original re-resolution closure. + if (grpclb_policy->lb_calld_ == nullptr || + !grpclb_policy->lb_calld_->seen_initial_response()) { + grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE); + } + // Give back the wrapper closure to the RR policy. 
+ grpclb_policy->rr_policy_->SetReresolutionClosureLocked( + &grpclb_policy->on_rr_request_reresolution_); +} - arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); - glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer( - arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}); +void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked( + grpc_error* rr_state_error) { + const grpc_connectivity_state curr_glb_state = + grpc_connectivity_state_check(&state_tracker_); + /* The new connectivity status is a function of the previous one and the new + * input coming from the status of the RR policy. + * + * current state (grpclb's) + * | + * v || I | C | R | TF | SD | <- new state (RR's) + * ===++====+=====+=====+======+======+ + * I || I | C | R | [I] | [I] | + * ---++----+-----+-----+------+------+ + * C || I | C | R | [C] | [C] | + * ---++----+-----+-----+------+------+ + * R || I | C | R | [R] | [R] | + * ---++----+-----+-----+------+------+ + * TF || I | C | R | [TF] | [TF] | + * ---++----+-----+-----+------+------+ + * SD || NA | NA | NA | NA | NA | (*) + * ---++----+-----+-----+------+------+ + * + * A [STATE] indicates that the old RR policy is kept. In those cases, STATE + * is the current state of grpclb, which is left untouched. + * + * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to + * the previous RR instance. + * + * Note that the status is never updated to SHUTDOWN as a result of calling + * this function. Only glb_shutdown() has the power to set that state. + * + * (*) This function mustn't be called during shutting down. */ + GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); + switch (rr_connectivity_state_) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: + GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); + break; + case GRPC_CHANNEL_IDLE: + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_READY: + GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); + } + if (grpc_lb_glb_trace.enabled()) { + gpr_log( + GPR_INFO, + "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.", + this, grpc_connectivity_state_name(rr_connectivity_state_), + rr_policy_.get()); + } + grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_, + rr_state_error, + "update_lb_connectivity_status_locked"); +} - // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, - // since we use this to trigger the client_load_reporting filter. - grpc_arg new_arg = grpc_channel_arg_string_create( - (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb"); - static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; - glb_policy->args = grpc_channel_args_copy_and_add_and_remove( - args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); - - /* Extract the backend addresses (may be empty) from the resolver for - * fallback. 
*/ - glb_policy->fallback_backend_addresses = - extract_backend_addresses_locked(addresses); - - /* Create a client channel over them to communicate with a LB service */ - glb_policy->response_generator = - grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>(); - grpc_channel_args* lb_channel_args = build_lb_channel_args( - addresses, glb_policy->response_generator.get(), args->args); - char* uri_str; - gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); - glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( - uri_str, args->client_channel_factory, lb_channel_args); - - /* Propagate initial resolution */ - glb_policy->response_generator->SetResponse(lb_channel_args); - grpc_channel_args_destroy(lb_channel_args); - gpr_free(uri_str); - if (glb_policy->lb_channel == nullptr) { - gpr_free((void*)glb_policy->server_name); - grpc_channel_args_destroy(glb_policy->args); - gpr_free(glb_policy); - return nullptr; +void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg, + grpc_error* error) { + GrpcLb* grpclb_policy = reinterpret_cast<GrpcLb*>(arg); + if (grpclb_policy->shutting_down_) { + grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed"); + return; } - grpc_subchannel_index_ref(); - GRPC_CLOSURE_INIT(&glb_policy->rr_on_connectivity_changed, - rr_on_connectivity_changed_locked, glb_policy, - grpc_combiner_scheduler(args->combiner)); - GRPC_CLOSURE_INIT(&glb_policy->rr_on_reresolution_requested, - rr_on_reresolution_requested_locked, glb_policy, - grpc_combiner_scheduler(args->combiner)); - GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, - glb_lb_channel_on_connectivity_changed_cb, glb_policy, - grpc_combiner_scheduler(args->combiner)); - grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); - grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, - "grpclb"); - // Init LB call backoff option. - grpc_core::BackOff::Options backoff_options; - backoff_options - .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) - .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) - .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) - .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); - glb_policy->lb_call_backoff.Init(backoff_options); - return &glb_policy->base; + grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked( + GRPC_ERROR_REF(error)); + // Resubscribe. Reuse the "on_rr_connectivity_changed" ref. + grpclb_policy->rr_policy_->NotifyOnStateChangeLocked( + &grpclb_policy->rr_connectivity_state_, + &grpclb_policy->on_rr_connectivity_changed_); } -static void glb_factory_ref(grpc_lb_policy_factory* factory) {} +// +// factory +// -static void glb_factory_unref(grpc_lb_policy_factory* factory) {} +class GrpcLbFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( + const LoadBalancingPolicy::Args& args) const override { + /* Count the number of gRPC-LB addresses. There must be at least one. 
*/ + const grpc_arg* arg = + grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES); + if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { + return nullptr; + } + grpc_lb_addresses* addresses = + reinterpret_cast<grpc_lb_addresses*>(arg->value.pointer.p); + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + } + if (num_grpclb_addrs == 0) return nullptr; + return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args)); + } -static const grpc_lb_policy_factory_vtable glb_factory_vtable = { - glb_factory_ref, glb_factory_unref, glb_create, "grpclb"}; + const char* name() const override { return "grpclb"; } +}; -static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable}; +} // namespace -grpc_lb_policy_factory* grpc_glb_lb_factory_create() { - return &glb_lb_policy_factory; -} +} // namespace grpc_core -/* Plugin registration */ +// +// Plugin registration +// + +namespace { // Only add client_load_reporting filter if the grpclb LB policy is used. -static bool maybe_add_client_load_reporting_filter( - grpc_channel_stack_builder* builder, void* arg) { +bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder, + void* arg) { const grpc_channel_args* args = grpc_channel_stack_builder_get_channel_arguments(builder); const grpc_arg* channel_arg = @@ -1938,14 +1867,18 @@ static bool maybe_add_client_load_reporting_filter( if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING && strcmp(channel_arg->value.string, "grpclb") == 0) { return grpc_channel_stack_builder_append_filter( - builder, static_cast<const grpc_channel_filter*>(arg), nullptr, - nullptr); + builder, (const grpc_channel_filter*)arg, nullptr, nullptr); } return true; } +} // namespace + void grpc_lb_policy_grpclb_init() { - grpc_register_lb_policy(grpc_glb_lb_factory_create()); + grpc_core::LoadBalancingPolicyRegistry::Builder:: + RegisterLoadBalancingPolicyFactory( + grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>( + grpc_core::New<grpc_core::GrpcLbFactory>())); grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_client_load_reporting_filter, diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h deleted file mode 100644 index 0a2edb0e3d..0000000000 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Copyright 2016 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H -#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H - -#include "src/core/ext/filters/client_channel/lb_policy_factory.h" - -/** Returns a load balancing factory for the glb policy, which tries to connect - * to a load balancing server to decide the next successfully connected - * subchannel to pick. */ -grpc_lb_policy_factory* grpc_glb_lb_factory_create(); - -#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_H */ diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 296bdcb247..f1878a594e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -29,194 +29,225 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" -grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); +namespace grpc_core { + +TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); + +namespace { + +// +// pick_first LB policy +// + +class PickFirst : public LoadBalancingPolicy { + public: + explicit PickFirst(const Args& args); + + void UpdateLocked(const grpc_channel_args& args) override; + bool PickLocked(PickState* pick) override; + void CancelPickLocked(PickState* pick, grpc_error* error) override; + void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) override; + void NotifyOnStateChangeLocked(grpc_connectivity_state* state, + grpc_closure* closure) override; + grpc_connectivity_state CheckConnectivityLocked( + grpc_error** connectivity_error) override; + void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; + void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; + void ExitIdleLocked() override; + + private: + ~PickFirst(); + + void ShutdownLocked() override; + + void StartPickingLocked(); + void DestroyUnselectedSubchannelsLocked(); + + static void OnConnectivityChangedLocked(void* arg, grpc_error* error); + + void SubchannelListRefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason); + void SubchannelListUnrefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason); -typedef struct { - /** base policy: must be first */ - grpc_lb_policy base; /** all our subchannels */ - grpc_lb_subchannel_list* subchannel_list; + grpc_lb_subchannel_list* subchannel_list_ = nullptr; /** latest pending subchannel list */ - grpc_lb_subchannel_list* latest_pending_subchannel_list; + grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; /** selected subchannel in \a subchannel_list */ - grpc_lb_subchannel_data* selected; + grpc_lb_subchannel_data* selected_ = nullptr; /** have we started picking? */ - bool started_picking; + bool started_picking_ = false; /** are we shut down? 
*/ - bool shutdown; + bool shutdown_ = false; /** list of picks that are waiting on connectivity */ - grpc_lb_policy_pick_state* pending_picks; + PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ - grpc_connectivity_state_tracker state_tracker; -} pick_first_lb_policy; - -static void pf_destroy(grpc_lb_policy* pol) { - pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol); - GPR_ASSERT(p->subchannel_list == nullptr); - GPR_ASSERT(p->latest_pending_subchannel_list == nullptr); - GPR_ASSERT(p->pending_picks == nullptr); - grpc_connectivity_state_destroy(&p->state_tracker); - gpr_free(p); - grpc_subchannel_index_unref(); + grpc_connectivity_state_tracker state_tracker_; +}; + +PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) { + GPR_ASSERT(args.client_channel_factory != nullptr); + grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, + "pick_first"); if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, "Pick First %p destroyed.", (void*)p); + gpr_log(GPR_DEBUG, "Pick First %p created.", this); } + UpdateLocked(*args.args); + grpc_subchannel_index_ref(); } -static void pf_shutdown_locked(grpc_lb_policy* pol, - grpc_lb_policy* new_policy) { - pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol); - grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); +PickFirst::~PickFirst() { if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); + gpr_log(GPR_DEBUG, "Destroying Pick First %p", this); } - p->shutdown = true; - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks) != nullptr) { - p->pending_picks = pick->next; - if (new_policy != nullptr) { - // Hand off to new LB policy. - if (grpc_lb_policy_pick_locked(new_policy, pick)) { - // Synchronous return, schedule closure. - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); - } - } else { - pick->connected_subchannel.reset(); - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); + GPR_ASSERT(subchannel_list_ == nullptr); + GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); + GPR_ASSERT(pending_picks_ == nullptr); + grpc_connectivity_state_destroy(&state_tracker_); + grpc_subchannel_index_unref(); +} + +void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { + PickState* pick; + while ((pick = pending_picks_) != nullptr) { + pending_picks_ = pick->next; + if (new_policy->PickLocked(pick)) { + // Synchronous return, schedule closure. 
+ GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, +} + +void PickFirst::ShutdownLocked() { + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); + if (grpc_lb_pick_first_trace.enabled()) { + gpr_log(GPR_DEBUG, "Pick First %p Shutting down", this); + } + shutdown_ = true; + PickState* pick; + while ((pick = pending_picks_) != nullptr) { + pending_picks_ = pick->next; + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); + } + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "shutdown"); - if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, - "pf_shutdown"); - p->subchannel_list = nullptr; + if (subchannel_list_ != nullptr) { + grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_shutdown"); + subchannel_list_ = nullptr; } - if (p->latest_pending_subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref( - p->latest_pending_subchannel_list, "pf_shutdown"); - p->latest_pending_subchannel_list = nullptr; + if (latest_pending_subchannel_list_ != nullptr) { + grpc_lb_subchannel_list_shutdown_and_unref(latest_pending_subchannel_list_, + "pf_shutdown"); + latest_pending_subchannel_list_ = nullptr; } - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, - GRPC_ERROR_CANCELLED); + TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void pf_cancel_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick, - grpc_error* error) { - pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol); - grpc_lb_policy_pick_state* pp = p->pending_picks; - p->pending_picks = nullptr; +void PickFirst::CancelPickLocked(PickState* pick, grpc_error* error) { + PickState* pp = pending_picks_; + pending_picks_ = nullptr; while (pp != nullptr) { - grpc_lb_policy_pick_state* next = pp->next; + PickState* next = pp->next; if (pp == pick) { pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { - pp->next = p->pending_picks; - p->pending_picks = pp; + pp->next = pending_picks_; + pending_picks_ = pp; } pp = next; } GRPC_ERROR_UNREF(error); } -static void pf_cancel_picks_locked(grpc_lb_policy* pol, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error* error) { - pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol); - grpc_lb_policy_pick_state* pick = p->pending_picks; - p->pending_picks = nullptr; +void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) { + PickState* pick = pending_picks_; + pending_picks_ = nullptr; while (pick != nullptr) { - grpc_lb_policy_pick_state* next = pick->next; + PickState* next = pick->next; if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { - pick->next = p->pending_picks; - p->pending_picks = pick; + pick->next = pending_picks_; + pending_picks_ = pick; } pick = next; } GRPC_ERROR_UNREF(error); } -static void start_picking_locked(pick_first_lb_policy* p) { - p->started_picking = true; - if (p->subchannel_list != 
nullptr && - p->subchannel_list->num_subchannels > 0) { - p->subchannel_list->checking_subchannel = 0; - for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { - if (p->subchannel_list->subchannels[i].subchannel != nullptr) { - grpc_lb_subchannel_list_ref_for_connectivity_watch( - p->subchannel_list, "connectivity_watch+start_picking"); +void PickFirst::StartPickingLocked() { + started_picking_ = true; + if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels > 0) { + subchannel_list_->checking_subchannel = 0; + for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { + if (subchannel_list_->subchannels[i].subchannel != nullptr) { + SubchannelListRefForConnectivityWatch( + subchannel_list_, "connectivity_watch+start_picking"); grpc_lb_subchannel_data_start_connectivity_watch( - &p->subchannel_list->subchannels[i]); + &subchannel_list_->subchannels[i]); break; } } } } -static void pf_exit_idle_locked(grpc_lb_policy* pol) { - pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol); - if (!p->started_picking) { - start_picking_locked(p); +void PickFirst::ExitIdleLocked() { + if (!started_picking_) { + StartPickingLocked(); } } -static int pf_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick) { - pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol); +bool PickFirst::PickLocked(PickState* pick) { // If we have a selected subchannel already, return synchronously. - if (p->selected != nullptr) { - pick->connected_subchannel = p->selected->connected_subchannel; - return 1; + if (selected_ != nullptr) { + pick->connected_subchannel = selected_->connected_subchannel; + return true; } // No subchannel selected yet, so handle asynchronously. - if (!p->started_picking) { - start_picking_locked(p); + if (!started_picking_) { + StartPickingLocked(); } - pick->next = p->pending_picks; - p->pending_picks = pick; - return 0; + pick->next = pending_picks_; + pending_picks_ = pick; + return false; } -static void destroy_unselected_subchannels_locked(pick_first_lb_policy* p) { - for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { - grpc_lb_subchannel_data* sd = &p->subchannel_list->subchannels[i]; - if (p->selected != sd) { +void PickFirst::DestroyUnselectedSubchannelsLocked() { + for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { + grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[i]; + if (selected_ != sd) { grpc_lb_subchannel_data_unref_subchannel(sd, "selected_different_subchannel"); } } } -static grpc_connectivity_state pf_check_connectivity_locked( - grpc_lb_policy* pol, grpc_error** error) { - pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol); - return grpc_connectivity_state_get(&p->state_tracker, error); +grpc_connectivity_state PickFirst::CheckConnectivityLocked(grpc_error** error) { + return grpc_connectivity_state_get(&state_tracker_, error); } -static void pf_notify_on_state_change_locked(grpc_lb_policy* pol, - grpc_connectivity_state* current, - grpc_closure* notify) { - pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(pol); - grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, +void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current, + grpc_closure* notify) { + grpc_connectivity_state_notify_on_state_change(&state_tracker_, current, notify); } -static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, - grpc_closure* on_ack) { - pick_first_lb_policy* p = 
reinterpret_cast<pick_first_lb_policy*>(pol); - if (p->selected) { - p->selected->connected_subchannel->Ping(on_initiate, on_ack); +void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { + if (selected_ != nullptr) { + selected_->connected_subchannel->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); @@ -225,18 +256,31 @@ static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, } } -static void pf_connectivity_changed_locked(void* arg, grpc_error* error); +void PickFirst::SubchannelListRefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason) { + // TODO(roth): We currently track this ref manually. Once the new + // ClosureRef API is ready and the subchannel_list code has been + // converted to a C++ API, find a way to hold the RefCountedPtr<> + // somewhere (maybe in the subchannel_data object) instead of doing + // this manually. + auto self = Ref(DEBUG_LOCATION, reason); + self.release(); + grpc_lb_subchannel_list_ref(subchannel_list, reason); +} -static void pf_update_locked(grpc_lb_policy* policy, - const grpc_lb_policy_args* args) { - pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(policy); - const grpc_arg* arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); +void PickFirst::SubchannelListUnrefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason) { + Unref(DEBUG_LOCATION, reason); + grpc_lb_subchannel_list_unref(subchannel_list, reason); +} + +void PickFirst::UpdateLocked(const grpc_channel_args& args) { + const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - if (p->subchannel_list == nullptr) { + if (subchannel_list_ == nullptr) { // If we don't have a current subchannel list, go into TRANSIENT FAILURE. grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "pf_update_missing"); } else { @@ -244,77 +288,78 @@ static void pf_update_locked(grpc_lb_policy* policy, gpr_log(GPR_ERROR, "No valid LB addresses channel arg for Pick First %p update, " "ignoring.", - (void*)p); + this); } return; } const grpc_lb_addresses* addresses = - static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); + (const grpc_lb_addresses*)arg->value.pointer.p; if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", - (void*)p, static_cast<unsigned long>(addresses->num_addresses)); + gpr_log(GPR_INFO, + "Pick First %p received update with %" PRIuPTR " addresses", this, + addresses->num_addresses); } grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( - &p->base, &grpc_lb_pick_first_trace, addresses, args, - pf_connectivity_changed_locked); + this, &grpc_lb_pick_first_trace, addresses, combiner(), + client_channel_factory(), args, &PickFirst::OnConnectivityChangedLocked); if (subchannel_list->num_subchannels == 0) { // Empty update or no valid subchannels. Unsubscribe from all current // subchannels and put the channel in TRANSIENT_FAILURE. 
grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "pf_update_empty"); - if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, + if (subchannel_list_ != nullptr) { + grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "sl_shutdown_empty_update"); } - p->subchannel_list = subchannel_list; // Empty list. - p->selected = nullptr; + subchannel_list_ = subchannel_list; // Empty list. + selected_ = nullptr; return; } - if (p->selected == nullptr) { + if (selected_ == nullptr) { // We don't yet have a selected subchannel, so replace the current // subchannel list immediately. - if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, + if (subchannel_list_ != nullptr) { + grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "pf_update_before_selected"); } - p->subchannel_list = subchannel_list; + subchannel_list_ = subchannel_list; } else { // We do have a selected subchannel. // Check if it's present in the new list. If so, we're done. for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; - if (sd->subchannel == p->selected->subchannel) { + if (sd->subchannel == selected_->subchannel) { // The currently selected subchannel is in the update: we are done. if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p found already selected subchannel %p " "at update index %" PRIuPTR " of %" PRIuPTR "; update done", - p, p->selected->subchannel, i, + this, selected_->subchannel, i, subchannel_list->num_subchannels); } - if (p->selected->connected_subchannel != nullptr) { - sd->connected_subchannel = p->selected->connected_subchannel; + if (selected_->connected_subchannel != nullptr) { + sd->connected_subchannel = selected_->connected_subchannel; } - p->selected = sd; - if (p->subchannel_list != nullptr) { + selected_ = sd; + if (subchannel_list_ != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - p->subchannel_list, "pf_update_includes_selected"); + subchannel_list_, "pf_update_includes_selected"); } - p->subchannel_list = subchannel_list; - destroy_unselected_subchannels_locked(p); - grpc_lb_subchannel_list_ref_for_connectivity_watch( + subchannel_list_ = subchannel_list; + DestroyUnselectedSubchannelsLocked(); + SubchannelListRefForConnectivityWatch( subchannel_list, "connectivity_watch+replace_selected"); grpc_lb_subchannel_data_start_connectivity_watch(sd); // If there was a previously pending update (which may or may // not have contained the currently selected subchannel), drop // it, so that it doesn't override what we've done here. - if (p->latest_pending_subchannel_list != nullptr) { + if (latest_pending_subchannel_list_ != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - p->latest_pending_subchannel_list, + latest_pending_subchannel_list_, "pf_update_includes_selected+outdated"); - p->latest_pending_subchannel_list = nullptr; + latest_pending_subchannel_list_ = nullptr; } return; } @@ -323,84 +368,81 @@ static void pf_update_locked(grpc_lb_policy* policy, // pending subchannel list to the new subchannel list. We will wait // for it to report READY before swapping it into the current // subchannel list. 
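The two-phase update described in the comment above is the heart of pick_first's UpdateLocked(): an address list that arrives while a subchannel is already selected is parked as the latest pending list and only swapped in once one of its subchannels reports READY. A minimal standalone sketch of that swap-on-READY idea, using simplified stand-in types rather than the real grpc_core subchannel-list machinery:

#include <memory>
#include <utility>

// Simplified stand-in for the real subchannel list type.
struct SubchannelList {};

class PickFirstSketch {
 public:
  // Resolver pushed a new address list.
  void OnUpdate(std::unique_ptr<SubchannelList> new_list) {
    if (!selected_) {
      current_ = std::move(new_list);  // nothing selected: use it right away
    } else {
      pending_ = std::move(new_list);  // park it; promoted on READY below
    }
  }

  // A subchannel in `list` reported READY.
  void OnSubchannelReady(SubchannelList* list) {
    if (pending_ != nullptr && list == pending_.get()) {
      current_ = std::move(pending_);  // swap the pending list in
    }
    selected_ = true;
  }

 private:
  bool selected_ = false;
  std::unique_ptr<SubchannelList> current_;
  std::unique_ptr<SubchannelList> pending_;
};

int main() {
  PickFirstSketch pf;
  auto first = std::make_unique<SubchannelList>();
  SubchannelList* first_raw = first.get();
  pf.OnUpdate(std::move(first));                    // first list, used immediately
  pf.OnSubchannelReady(first_raw);                  // first list connects
  pf.OnUpdate(std::make_unique<SubchannelList>());  // parked as pending while selected
  return 0;
}

The real code additionally shuts down and unrefs whichever list it discards; the sketch simply lets unique_ptr drop it.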
- if (p->latest_pending_subchannel_list != nullptr) { + if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p Shutting down latest pending subchannel list " "%p, about to be replaced by newer latest %p", - (void*)p, (void*)p->latest_pending_subchannel_list, - (void*)subchannel_list); + this, latest_pending_subchannel_list_, subchannel_list); } grpc_lb_subchannel_list_shutdown_and_unref( - p->latest_pending_subchannel_list, "sl_outdated_dont_smash"); + latest_pending_subchannel_list_, "sl_outdated_dont_smash"); } - p->latest_pending_subchannel_list = subchannel_list; + latest_pending_subchannel_list_ = subchannel_list; } // If we've started picking, start trying to connect to the first // subchannel in the new list. - if (p->started_picking) { - grpc_lb_subchannel_list_ref_for_connectivity_watch( - subchannel_list, "connectivity_watch+update"); + if (started_picking_) { + SubchannelListRefForConnectivityWatch(subchannel_list, + "connectivity_watch+update"); grpc_lb_subchannel_data_start_connectivity_watch( &subchannel_list->subchannels[0]); } } -static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { - grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); - pick_first_lb_policy* p = - reinterpret_cast<pick_first_lb_policy*>(sd->subchannel_list->policy); +void PickFirst::OnConnectivityChangedLocked(void* arg, grpc_error* error) { + grpc_lb_subchannel_data* sd = reinterpret_cast<grpc_lb_subchannel_data*>(arg); + PickFirst* p = reinterpret_cast<PickFirst*>(sd->subchannel_list->policy); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p connectivity changed for subchannel %p (%" PRIuPTR " of %" PRIuPTR - "), subchannel_list %p: state=%s p->shutdown=%d " + "), subchannel_list %p: state=%s p->shutdown_=%d " "sd->subchannel_list->shutting_down=%d error=%s", - (void*)p, (void*)sd->subchannel, - sd->subchannel_list->checking_subchannel, - sd->subchannel_list->num_subchannels, (void*)sd->subchannel_list, + p, sd->subchannel, sd->subchannel_list->checking_subchannel, + sd->subchannel_list->num_subchannels, sd->subchannel_list, grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), - p->shutdown, sd->subchannel_list->shutting_down, + p->shutdown_, sd->subchannel_list->shutting_down, grpc_error_string(error)); } // If the policy is shutting down, unref and return. - if (p->shutdown) { + if (p->shutdown_) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_data_unref_subchannel(sd, "pf_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, - "pf_shutdown"); + p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, + "pf_shutdown"); return; } // If the subchannel list is shutting down, stop watching. if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_data_unref_subchannel(sd, "pf_sl_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, - "pf_sl_shutdown"); + p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, + "pf_sl_shutdown"); return; } // If we're still here, the notification must be for a subchannel in // either the current or latest pending subchannel lists. 
- GPR_ASSERT(sd->subchannel_list == p->subchannel_list || - sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || + sd->subchannel_list == p->latest_pending_subchannel_list_); // Update state. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; // Handle updates for the currently selected subchannel. - if (p->selected == sd) { + if (p->selected_ == sd) { // If the new state is anything other than READY and there is a // pending update, switch to the pending update. if (sd->curr_connectivity_state != GRPC_CHANNEL_READY && - p->latest_pending_subchannel_list != nullptr) { - p->selected = nullptr; + p->latest_pending_subchannel_list_ != nullptr) { + p->selected_ = nullptr; grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_list_unref_for_connectivity_watch( + p->SubchannelListUnrefForConnectivityWatch( sd->subchannel_list, "selected_not_ready+switch_to_update"); grpc_lb_subchannel_list_shutdown_and_unref( - p->subchannel_list, "selected_not_ready+switch_to_update"); - p->subchannel_list = p->latest_pending_subchannel_list; - p->latest_pending_subchannel_list = nullptr; + p->subchannel_list_, "selected_not_ready+switch_to_update"); + p->subchannel_list_ = p->latest_pending_subchannel_list_; + p->latest_pending_subchannel_list_ = nullptr; grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update"); } else { // TODO(juanlishen): we re-resolve when the selected subchannel goes to @@ -411,21 +453,20 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If the selected channel goes bad, request a re-resolution. - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE, "selected_changed+reresolve"); - p->started_picking = false; - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_pick_first_trace, - GRPC_ERROR_NONE); - // in transient failure. Rely on re-resolution to recover. - p->selected = nullptr; + p->started_picking_ = false; + p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE); + // In transient failure. Rely on re-resolution to recover. + p->selected_ = nullptr; grpc_lb_subchannel_data_stop_connectivity_watch(sd); - grpc_lb_subchannel_list_unref_for_connectivity_watch( - sd->subchannel_list, "pf_selected_shutdown"); + p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, + "pf_selected_shutdown"); grpc_lb_subchannel_data_unref_subchannel( sd, "pf_selected_shutdown"); // Unrefs connected subchannel } else { - grpc_connectivity_state_set(&p->state_tracker, + grpc_connectivity_state_set(&p->state_tracker_, sd->curr_connectivity_state, GRPC_ERROR_REF(error), "selected_changed"); // Renew notification. @@ -436,45 +477,45 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { } // If we get here, there are two possible cases: // 1. We do not currently have a selected subchannel, and the update is - // for a subchannel in p->subchannel_list that we're trying to + // for a subchannel in p->subchannel_list_ that we're trying to // connect to. The goal here is to find a subchannel that we can // select. // 2. 
We do currently have a selected subchannel, and the update is - // for a subchannel in p->latest_pending_subchannel_list. The + // for a subchannel in p->latest_pending_subchannel_list_. The // goal here is to find a subchannel from the update that we can // select in place of the current one. switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_READY: { - // Case 2. Promote p->latest_pending_subchannel_list to - // p->subchannel_list. + // Case 2. Promote p->latest_pending_subchannel_list_ to + // p->subchannel_list_. sd->connected_subchannel = grpc_subchannel_get_connected_subchannel(sd->subchannel); - if (sd->subchannel_list == p->latest_pending_subchannel_list) { - GPR_ASSERT(p->subchannel_list != nullptr); - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, + if (sd->subchannel_list == p->latest_pending_subchannel_list_) { + GPR_ASSERT(p->subchannel_list_ != nullptr); + grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, "finish_update"); - p->subchannel_list = p->latest_pending_subchannel_list; - p->latest_pending_subchannel_list = nullptr; + p->subchannel_list_ = p->latest_pending_subchannel_list_; + p->latest_pending_subchannel_list_ = nullptr; } // Cases 1 and 2. - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); - p->selected = sd; + p->selected_ = sd; if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", (void*)p, - (void*)sd->subchannel); + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, + sd->subchannel); } // Drop all other subchannels, since we are now connected. - destroy_unselected_subchannels_locked(p); + p->DestroyUnselectedSubchannelsLocked(); // Update any calls that were waiting for a pick. - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks)) { - p->pending_picks = pick->next; - pick->connected_subchannel = p->selected->connected_subchannel; + PickState* pick; + while ((pick = p->pending_picks_)) { + p->pending_picks_ = pick->next; + pick->connected_subchannel = p->selected_->connected_subchannel; if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", - (void*)p->selected); + p->selected_); } GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } @@ -494,9 +535,9 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Case 1: Only set state to TRANSIENT_FAILURE if we've tried // all subchannels. if (sd->subchannel_list->checking_subchannel == 0 && - sd->subchannel_list == p->subchannel_list) { + sd->subchannel_list == p->subchannel_list_) { grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &p->state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "connecting_transient_failure"); } // Reuses the connectivity refs from the previous watch. @@ -506,8 +547,8 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: { // Only update connectivity state in case 1. 
- if (sd->subchannel_list == p->subchannel_list) { - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, + if (sd->subchannel_list == p->subchannel_list_) { + grpc_connectivity_state_set(&p->state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_REF(error), "connecting_changed"); } @@ -520,51 +561,29 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { } } -static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, - pf_shutdown_locked, - pf_pick_locked, - pf_cancel_pick_locked, - pf_cancel_picks_locked, - pf_ping_one_locked, - pf_exit_idle_locked, - pf_check_connectivity_locked, - pf_notify_on_state_change_locked, - pf_update_locked}; - -static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {} - -static void pick_first_factory_unref(grpc_lb_policy_factory* factory) {} - -static grpc_lb_policy* create_pick_first(grpc_lb_policy_factory* factory, - grpc_lb_policy_args* args) { - GPR_ASSERT(args->client_channel_factory != nullptr); - pick_first_lb_policy* p = - static_cast<pick_first_lb_policy*>(gpr_zalloc(sizeof(*p))); - if (grpc_lb_pick_first_trace.enabled()) { - gpr_log(GPR_DEBUG, "Pick First %p created.", (void*)p); - } - pf_update_locked(&p->base, args); - grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); - grpc_subchannel_index_ref(); - return &p->base; -} +// +// factory +// -static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = { - pick_first_factory_ref, pick_first_factory_unref, create_pick_first, - "pick_first"}; +class PickFirstFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( + const LoadBalancingPolicy::Args& args) const override { + return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args)); + } -static grpc_lb_policy_factory pick_first_lb_policy_factory = { - &pick_first_factory_vtable}; + const char* name() const override { return "pick_first"; } +}; -static grpc_lb_policy_factory* pick_first_lb_factory_create() { - return &pick_first_lb_policy_factory; -} +} // namespace -/* Plugin registration */ +} // namespace grpc_core void grpc_lb_policy_pick_first_init() { - grpc_register_lb_policy(pick_first_lb_factory_create()); + grpc_core::LoadBalancingPolicyRegistry::Builder:: + RegisterLoadBalancingPolicyFactory( + grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>( + grpc_core::New<grpc_core::PickFirstFactory>())); } void grpc_lb_policy_pick_first_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index b5b4c44ef1..178e299b61 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -40,34 +40,94 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" -grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); - -typedef struct round_robin_lb_policy { - /** base policy: must be first */ - grpc_lb_policy base; - - grpc_lb_subchannel_list* subchannel_list; - +namespace grpc_core { + +TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); + +namespace { + +// +// round_robin LB policy +// + +class RoundRobin : public LoadBalancingPolicy { + public: + explicit RoundRobin(const Args& args); + + void UpdateLocked(const grpc_channel_args& args) override; + bool PickLocked(PickState* 
pick) override; + void CancelPickLocked(PickState* pick, grpc_error* error) override; + void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) override; + void NotifyOnStateChangeLocked(grpc_connectivity_state* state, + grpc_closure* closure) override; + grpc_connectivity_state CheckConnectivityLocked( + grpc_error** connectivity_error) override; + void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; + void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; + void ExitIdleLocked() override; + + private: + ~RoundRobin(); + + void ShutdownLocked() override; + + void StartPickingLocked(); + size_t GetNextReadySubchannelIndexLocked(); + void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); + void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, + grpc_error* error); + + static void OnConnectivityChangedLocked(void* arg, grpc_error* error); + + void SubchannelListRefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason); + void SubchannelListUnrefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason); + + /** list of subchannels */ + grpc_lb_subchannel_list* subchannel_list_ = nullptr; /** have we started picking? */ - bool started_picking; + bool started_picking_ = false; /** are we shutting down? */ - bool shutdown; + bool shutdown_ = false; /** List of picks that are waiting on connectivity */ - grpc_lb_policy_pick_state* pending_picks; - + PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ - grpc_connectivity_state_tracker state_tracker; - + grpc_connectivity_state_tracker state_tracker_; /** Index into subchannels for last pick. */ - size_t last_ready_subchannel_index; - + size_t last_ready_subchannel_index_ = 0; /** Latest version of the subchannel list. * Subchannel connectivity callbacks will only promote updated subchannel * lists if they equal \a latest_pending_subchannel_list. In other words, * racing callbacks that reference outdated subchannel lists won't perform any * update. */ - grpc_lb_subchannel_list* latest_pending_subchannel_list; -} round_robin_lb_policy; + grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; +}; + +RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { + GPR_ASSERT(args.client_channel_factory != nullptr); + grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, + "round_robin"); + UpdateLocked(*args.args); + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this, + subchannel_list_->num_subchannels); + } + grpc_subchannel_index_ref(); +} + +RoundRobin::~RoundRobin() { + if (grpc_lb_round_robin_trace.enabled()) { + gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy", this); + } + GPR_ASSERT(subchannel_list_ == nullptr); + GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); + GPR_ASSERT(pending_picks_ == nullptr); + grpc_connectivity_state_destroy(&state_tracker_); + grpc_subchannel_index_unref(); +} /** Returns the index into p->subchannel_list->subchannels of the next * subchannel in READY state, or p->subchannel_list->num_subchannels if no @@ -75,195 +135,190 @@ typedef struct round_robin_lb_policy { * * Note that this function does *not* update p->last_ready_subchannel_index. * The caller must do that if it returns a pick. 
*/ -static size_t get_next_ready_subchannel_index_locked( - const round_robin_lb_policy* p) { - GPR_ASSERT(p->subchannel_list != nullptr); +size_t RoundRobin::GetNextReadySubchannelIndexLocked() { + GPR_ASSERT(subchannel_list_ != nullptr); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, - "[RR %p] getting next ready subchannel (out of %lu), " - "last_ready_subchannel_index=%lu", - (void*)p, - static_cast<unsigned long>(p->subchannel_list->num_subchannels), - static_cast<unsigned long>(p->last_ready_subchannel_index)); - } - for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { - const size_t index = (i + p->last_ready_subchannel_index + 1) % - p->subchannel_list->num_subchannels; + "[RR %p] getting next ready subchannel (out of %" PRIuPTR + "), " + "last_ready_subchannel_index=%" PRIuPTR, + this, subchannel_list_->num_subchannels, + last_ready_subchannel_index_); + } + for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { + const size_t index = (i + last_ready_subchannel_index_ + 1) % + subchannel_list_->num_subchannels; if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, - "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " - "state=%s", - (void*)p, (void*)p->subchannel_list->subchannels[index].subchannel, - (void*)p->subchannel_list, static_cast<unsigned long>(index), + "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR + ": state=%s", + this, subchannel_list_->subchannels[index].subchannel, + subchannel_list_, index, grpc_connectivity_state_name( - p->subchannel_list->subchannels[index].curr_connectivity_state)); + subchannel_list_->subchannels[index].curr_connectivity_state)); } - if (p->subchannel_list->subchannels[index].curr_connectivity_state == + if (subchannel_list_->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, - "[RR %p] found next ready subchannel (%p) at index %lu of " - "subchannel_list %p", - (void*)p, - (void*)p->subchannel_list->subchannels[index].subchannel, - static_cast<unsigned long>(index), (void*)p->subchannel_list); + "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR + " of subchannel_list %p", + this, subchannel_list_->subchannels[index].subchannel, index, + subchannel_list_); } return index; } } if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void*)p); + gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this); } - return p->subchannel_list->num_subchannels; + return subchannel_list_->num_subchannels; } -// Sets p->last_ready_subchannel_index to last_ready_index. -static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, - size_t last_ready_index) { - GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels); - p->last_ready_subchannel_index = last_ready_index; +// Sets last_ready_subchannel_index_ to last_ready_index. 
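GetNextReadySubchannelIndexLocked() above is the rotation step of round_robin: scan starting one slot past the last index handed out, wrap modulo the list size, and use num_subchannels as the "nothing READY" sentinel. A small self-contained sketch of just that index arithmetic, with a std::vector<bool> standing in for per-subchannel READY state (illustrative only, not the real API):

#include <cstddef>
#include <iostream>
#include <vector>

// Returns the index of the next READY entry after `last_ready`, wrapping
// around, or ready.size() if nothing is READY (mirroring the sentinel above).
size_t NextReadyIndex(const std::vector<bool>& ready, size_t last_ready) {
  for (size_t i = 0; i < ready.size(); ++i) {
    const size_t index = (i + last_ready + 1) % ready.size();
    if (ready[index]) return index;
  }
  return ready.size();
}

int main() {
  std::vector<bool> ready = {true, false, true, true};
  size_t last = 0;
  // Picks rotate over the READY entries: 2, 3, 0, 2, 3, 0
  for (int pick = 0; pick < 6; ++pick) {
    const size_t next = NextReadyIndex(ready, last);
    if (next == ready.size()) break;  // nothing READY
    std::cout << next << " ";
    last = next;  // only advanced when the pick is actually served
  }
  std::cout << "\n";
  return 0;
}

Advancing `last` only when a pick is actually served corresponds to UpdateLastReadySubchannelIndexLocked(), which the policy calls only after handing the selected subchannel to a pick.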
+void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) { + GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels); + last_ready_subchannel_index_ = last_ready_index; if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, - "[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", - (void*)p, static_cast<unsigned long>(last_ready_index), - (void*)p->subchannel_list->subchannels[last_ready_index].subchannel, - (void*)p->subchannel_list->subchannels[last_ready_index] + "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR + " (SC %p, CSC %p)", + this, last_ready_index, + subchannel_list_->subchannels[last_ready_index].subchannel, + subchannel_list_->subchannels[last_ready_index] .connected_subchannel.get()); } } -static void rr_destroy(grpc_lb_policy* pol) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", - (void*)pol, (void*)pol); +void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { + PickState* pick; + while ((pick = pending_picks_) != nullptr) { + pending_picks_ = pick->next; + if (new_policy->PickLocked(pick)) { + // Synchronous return, schedule closure. + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + } } - GPR_ASSERT(p->subchannel_list == nullptr); - GPR_ASSERT(p->latest_pending_subchannel_list == nullptr); - grpc_connectivity_state_destroy(&p->state_tracker); - grpc_subchannel_index_unref(); - gpr_free(p); } -static void rr_shutdown_locked(grpc_lb_policy* pol, - grpc_lb_policy* new_policy) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); +void RoundRobin::ShutdownLocked() { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); - } - p->shutdown = true; - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks) != nullptr) { - p->pending_picks = pick->next; - if (new_policy != nullptr) { - // Hand off to new LB policy. - if (grpc_lb_policy_pick_locked(new_policy, pick)) { - // Synchronous return; schedule callback. 
- GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); - } - } else { - pick->connected_subchannel.reset(); - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); - } + gpr_log(GPR_DEBUG, "[RR %p] Shutting down", this); + } + shutdown_ = true; + PickState* pick; + while ((pick = pending_picks_) != nullptr) { + pending_picks_ = pick->next; + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); } - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); - if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, + if (subchannel_list_ != nullptr) { + grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "sl_shutdown_rr_shutdown"); - p->subchannel_list = nullptr; + subchannel_list_ = nullptr; } - if (p->latest_pending_subchannel_list != nullptr) { + if (latest_pending_subchannel_list_ != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown"); - p->latest_pending_subchannel_list = nullptr; + latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown"); + latest_pending_subchannel_list_ = nullptr; } - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, - GRPC_ERROR_CANCELLED); + TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } -static void rr_cancel_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick, - grpc_error* error) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - grpc_lb_policy_pick_state* pp = p->pending_picks; - p->pending_picks = nullptr; +void RoundRobin::CancelPickLocked(PickState* pick, grpc_error* error) { + PickState* pp = pending_picks_; + pending_picks_ = nullptr; while (pp != nullptr) { - grpc_lb_policy_pick_state* next = pp->next; + PickState* next = pp->next; if (pp == pick) { pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); + "Pick Cancelled", &error, 1)); } else { - pp->next = p->pending_picks; - p->pending_picks = pp; + pp->next = pending_picks_; + pending_picks_ = pp; } pp = next; } GRPC_ERROR_UNREF(error); } -static void rr_cancel_picks_locked(grpc_lb_policy* pol, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error* error) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - grpc_lb_policy_pick_state* pick = p->pending_picks; - p->pending_picks = nullptr; +void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) { + PickState* pick = pending_picks_; + pending_picks_ = nullptr; while (pick != nullptr) { - grpc_lb_policy_pick_state* next = pick->next; + PickState* next = pick->next; if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); + "Pick Cancelled", &error, 1)); } else { - pick->next = p->pending_picks; - p->pending_picks = pick; + pick->next = pending_picks_; + pending_picks_ = pick; } pick = next; } GRPC_ERROR_UNREF(error); } -static void start_picking_locked(round_robin_lb_policy* p) { - 
p->started_picking = true; - for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { - if (p->subchannel_list->subchannels[i].subchannel != nullptr) { - grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, - "connectivity_watch"); +void RoundRobin::SubchannelListRefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason) { + // TODO(roth): We currently track this ref manually. Once the new + // ClosureRef API is ready and the subchannel_list code has been + // converted to a C++ API, find a way to hold the RefCountedPtr<> + // somewhere (maybe in the subchannel_data object) instead of doing + // this manually. + auto self = Ref(DEBUG_LOCATION, reason); + self.release(); + grpc_lb_subchannel_list_ref(subchannel_list, reason); +} + +void RoundRobin::SubchannelListUnrefForConnectivityWatch( + grpc_lb_subchannel_list* subchannel_list, const char* reason) { + Unref(DEBUG_LOCATION, reason); + grpc_lb_subchannel_list_unref(subchannel_list, reason); +} + +void RoundRobin::StartPickingLocked() { + started_picking_ = true; + for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) { + if (subchannel_list_->subchannels[i].subchannel != nullptr) { + SubchannelListRefForConnectivityWatch(subchannel_list_, + "connectivity_watch"); grpc_lb_subchannel_data_start_connectivity_watch( - &p->subchannel_list->subchannels[i]); + &subchannel_list_->subchannels[i]); } } } -static void rr_exit_idle_locked(grpc_lb_policy* pol) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - if (!p->started_picking) { - start_picking_locked(p); +void RoundRobin::ExitIdleLocked() { + if (!started_picking_) { + StartPickingLocked(); } } -static int rr_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); +bool RoundRobin::PickLocked(PickState* pick) { if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol, - p->shutdown); + gpr_log(GPR_DEBUG, "[RR %p] Trying to pick (shutdown: %d)", this, + shutdown_); } - GPR_ASSERT(!p->shutdown); - if (p->subchannel_list != nullptr) { - const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->subchannel_list->num_subchannels) { + GPR_ASSERT(!shutdown_); + if (subchannel_list_ != nullptr) { + const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); + if (next_ready_index < subchannel_list_->num_subchannels) { /* readily available, report right away */ grpc_lb_subchannel_data* sd = - &p->subchannel_list->subchannels[next_ready_index]; + &subchannel_list_->subchannels[next_ready_index]; pick->connected_subchannel = sd->connected_subchannel; if (pick->user_data != nullptr) { *pick->user_data = sd->user_data; @@ -273,24 +328,24 @@ static int rr_pick_locked(grpc_lb_policy* pol, GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " "index %" PRIuPTR ")", - p, sd->subchannel, pick->connected_subchannel.get(), + this, sd->subchannel, pick->connected_subchannel.get(), sd->subchannel_list, next_ready_index); } /* only advance the last picked pointer if the selection was used */ - update_last_ready_subchannel_index_locked(p, next_ready_index); - return 1; + UpdateLastReadySubchannelIndexLocked(next_ready_index); + return true; } } /* no pick currently available. 
Save for later in list of pending picks */ - if (!p->started_picking) { - start_picking_locked(p); + if (!started_picking_) { + StartPickingLocked(); } - pick->next = p->pending_picks; - p->pending_picks = pick; - return 0; + pick->next = pending_picks_; + pending_picks_ = pick; + return false; } -static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { +void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) { grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); @@ -318,8 +373,8 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { * (the grpc_lb_subchannel_data associated with the updated subchannel) and the * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used * only if the policy transitions to state TRANSIENT_FAILURE. */ -static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, - grpc_error* error) { +void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, + grpc_error* error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * @@ -335,64 +390,61 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, * subchannel_list->num_subchannels. */ grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; - round_robin_lb_policy* p = - reinterpret_cast<round_robin_lb_policy*>(subchannel_list->policy); GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); if (subchannel_list->num_ready > 0) { /* 1) READY */ - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); } else if (subchannel_list->num_transient_failures == subchannel_list->num_subchannels) { /* 3) TRANSIENT_FAILURE */ - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "rr_transient_failure"); + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), + "rr_exhausted_subchannels"); } GRPC_ERROR_UNREF(error); } -static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { - grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); - round_robin_lb_policy* p = - reinterpret_cast<round_robin_lb_policy*>(sd->subchannel_list->policy); +void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) { + grpc_lb_subchannel_data* sd = reinterpret_cast<grpc_lb_subchannel_data*>(arg); + RoundRobin* p = reinterpret_cast<RoundRobin*>(sd->subchannel_list->policy); if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " "prev_state=%s new_state=%s p->shutdown=%d " "sd->subchannel_list->shutting_down=%d error=%s", - (void*)p, (void*)sd->subchannel, (void*)sd->subchannel_list, + p, sd->subchannel, sd->subchannel_list, grpc_connectivity_state_name(sd->prev_connectivity_state), grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), - p->shutdown, sd->subchannel_list->shutting_down, 
+ p->shutdown_, sd->subchannel_list->shutting_down, grpc_error_string(error)); } GPR_ASSERT(sd->subchannel != nullptr); // If the policy is shutting down, unref and return. - if (p->shutdown) { + if (p->shutdown_) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, - "rr_shutdown"); + p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, + "rr_shutdown"); return; } // If the subchannel list is shutting down, stop watching. if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown"); - grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, - "rr_sl_shutdown"); + p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, + "rr_sl_shutdown"); return; } // If we're still here, the notification must be for a subchannel in // either the current or latest pending subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->subchannel_list || - sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || + sd->subchannel_list == p->latest_pending_subchannel_list_); GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to @@ -409,8 +461,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { "Requesting re-resolution", p, sd->subchannel); } - grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, - GRPC_ERROR_NONE); + p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); break; } case GRPC_CHANNEL_READY: { @@ -418,49 +469,47 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { sd->connected_subchannel = grpc_subchannel_get_connected_subchannel(sd->subchannel); } - if (sd->subchannel_list != p->subchannel_list) { - // promote sd->subchannel_list to p->subchannel_list. + if (sd->subchannel_list != p->subchannel_list_) { + // promote sd->subchannel_list to p->subchannel_list_. // sd->subchannel_list must be equal to - // p->latest_pending_subchannel_list because we have already filtered + // p->latest_pending_subchannel_list_ because we have already filtered // for sds belonging to outdated subchannel lists. - GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list); + GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_); GPR_ASSERT(!sd->subchannel_list->shutting_down); if (grpc_lb_round_robin_trace.enabled()) { - const unsigned long num_subchannels = - p->subchannel_list != nullptr - ? static_cast<unsigned long>( - p->subchannel_list->num_subchannels) + const size_t num_subchannels = + p->subchannel_list_ != nullptr + ? 
p->subchannel_list_->num_subchannels : 0; gpr_log(GPR_DEBUG, - "[RR %p] phasing out subchannel list %p (size %lu) in favor " - "of %p (size %lu)", - p, p->subchannel_list, num_subchannels, sd->subchannel_list, + "[RR %p] phasing out subchannel list %p (size %" PRIuPTR + ") in favor of %p (size %" PRIuPTR ")", + p, p->subchannel_list_, num_subchannels, sd->subchannel_list, num_subchannels); } - if (p->subchannel_list != nullptr) { + if (p->subchannel_list_ != nullptr) { // dispose of the current subchannel_list - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, + grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, "sl_phase_out_shutdown"); } - p->subchannel_list = p->latest_pending_subchannel_list; - p->latest_pending_subchannel_list = nullptr; + p->subchannel_list_ = p->latest_pending_subchannel_list_; + p->latest_pending_subchannel_list_ = nullptr; } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemptively replicates rr_pick()'s actions. - */ - const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); + * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ + const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked(); + GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels); grpc_lb_subchannel_data* selected = - &p->subchannel_list->subchannels[next_ready_index]; - if (p->pending_picks != nullptr) { + &p->subchannel_list_->subchannels[next_ready_index]; + if (p->pending_picks_ != nullptr) { // if the selected subchannel is going to be used for the pending // picks, update the last picked pointer - update_last_ready_subchannel_index_locked(p, next_ready_index); + p->UpdateLastReadySubchannelIndexLocked(next_ready_index); } - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks)) { - p->pending_picks = pick->next; + PickState* pick; + while ((pick = p->pending_picks_)) { + p->pending_picks_ = pick->next; pick->connected_subchannel = selected->connected_subchannel; if (pick->user_data != nullptr) { *pick->user_data = selected->user_data; @@ -468,10 +517,9 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Fulfilling pending pick. Target <-- subchannel %p " - "(subchannel_list %p, index %lu)", - (void*)p, (void*)selected->subchannel, - (void*)p->subchannel_list, - static_cast<unsigned long>(next_ready_index)); + "(subchannel_list %p, index %" PRIuPTR ")", + p, selected->subchannel, p->subchannel_list_, + next_ready_index); } GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } @@ -482,40 +530,34 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE:; // fallthrough } - // Update state counters and new overall state. - update_state_counters_locked(sd); + // Update state counters. + UpdateStateCountersLocked(sd); // Only update connectivity based on the selected subchannel list. - if (sd->subchannel_list == p->subchannel_list) { - update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); + if (sd->subchannel_list == p->subchannel_list_) { + p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error)); } // Renew notification. 
grpc_lb_subchannel_data_start_connectivity_watch(sd); } -static grpc_connectivity_state rr_check_connectivity_locked( - grpc_lb_policy* pol, grpc_error** error) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - return grpc_connectivity_state_get(&p->state_tracker, error); +grpc_connectivity_state RoundRobin::CheckConnectivityLocked( + grpc_error** error) { + return grpc_connectivity_state_get(&state_tracker_, error); } -static void rr_notify_on_state_change_locked(grpc_lb_policy* pol, - grpc_connectivity_state* current, - grpc_closure* notify) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, +void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, + grpc_closure* notify) { + grpc_connectivity_state_notify_on_state_change(&state_tracker_, current, notify); } -static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, +void RoundRobin::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); - const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); - if (next_ready_index < p->subchannel_list->num_subchannels) { + const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); + if (next_ready_index < subchannel_list_->num_subchannels) { grpc_lb_subchannel_data* selected = - &p->subchannel_list->subchannels[next_ready_index]; - grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target = - selected->connected_subchannel; - target->Ping(on_initiate, on_ack); + &subchannel_list_->subchannels[next_ready_index]; + selected->connected_subchannel->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); @@ -524,45 +566,41 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, } } -static void rr_update_locked(grpc_lb_policy* policy, - const grpc_lb_policy_args* args) { - round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(policy); - const grpc_arg* arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); +void RoundRobin::UpdateLocked(const grpc_channel_args& args) { + const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { - gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", p); + gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. // Otherwise, keep using the current subchannel list (ignore this update). 
- if (p->subchannel_list == nullptr) { + if (subchannel_list_ == nullptr) { grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "rr_update_missing"); } return; } - grpc_lb_addresses* addresses = - static_cast<grpc_lb_addresses*>(arg->value.pointer.p); + grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p; if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p, - addresses->num_addresses); + gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", + this, addresses->num_addresses); } grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( - &p->base, &grpc_lb_round_robin_trace, addresses, args, - rr_connectivity_changed_locked); + this, &grpc_lb_round_robin_trace, addresses, combiner(), + client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked); if (subchannel_list->num_subchannels == 0) { grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "rr_update_empty"); - if (p->subchannel_list != nullptr) { - grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, + if (subchannel_list_ != nullptr) { + grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "sl_shutdown_empty_update"); } - p->subchannel_list = subchannel_list; // empty list + subchannel_list_ = subchannel_list; // empty list return; } - if (p->started_picking) { + if (started_picking_) { for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { const grpc_connectivity_state subchannel_state = grpc_subchannel_check_connectivity( @@ -587,87 +625,61 @@ static void rr_update_locked(grpc_lb_policy* policy, ++subchannel_list->num_transient_failures; } } - if (p->latest_pending_subchannel_list != nullptr) { + if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down latest pending subchannel list %p, " "about to be replaced by newer latest %p", - (void*)p, (void*)p->latest_pending_subchannel_list, - (void*)subchannel_list); + this, latest_pending_subchannel_list_, subchannel_list); } grpc_lb_subchannel_list_shutdown_and_unref( - p->latest_pending_subchannel_list, "sl_outdated"); + latest_pending_subchannel_list_, "sl_outdated"); } - p->latest_pending_subchannel_list = subchannel_list; + latest_pending_subchannel_list_ = subchannel_list; for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { /* Watch every new subchannel. A subchannel list becomes active the * moment one of its subchannels is READY. At that moment, we swap * p->subchannel_list for sd->subchannel_list, provided the subchannel * list is still valid (ie, isn't shutting down) */ - grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list, - "connectivity_watch"); + SubchannelListRefForConnectivityWatch(subchannel_list, + "connectivity_watch"); grpc_lb_subchannel_data_start_connectivity_watch( &subchannel_list->subchannels[i]); } } else { // The policy isn't picking yet. Save the update for later, disposing of // previous version if any. 
- if (p->subchannel_list != nullptr) { + if (subchannel_list_ != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( - p->subchannel_list, "rr_update_before_started_picking"); + subchannel_list_, "rr_update_before_started_picking"); } - p->subchannel_list = subchannel_list; + subchannel_list_ = subchannel_list; } } -static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown_locked, - rr_pick_locked, - rr_cancel_pick_locked, - rr_cancel_picks_locked, - rr_ping_one_locked, - rr_exit_idle_locked, - rr_check_connectivity_locked, - rr_notify_on_state_change_locked, - rr_update_locked}; - -static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {} - -static void round_robin_factory_unref(grpc_lb_policy_factory* factory) {} - -static grpc_lb_policy* round_robin_create(grpc_lb_policy_factory* factory, - grpc_lb_policy_args* args) { - GPR_ASSERT(args->client_channel_factory != nullptr); - round_robin_lb_policy* p = - static_cast<round_robin_lb_policy*>(gpr_zalloc(sizeof(*p))); - grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); - grpc_subchannel_index_ref(); - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "round_robin"); - rr_update_locked(&p->base, args); - if (grpc_lb_round_robin_trace.enabled()) { - gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p, - static_cast<unsigned long>(p->subchannel_list->num_subchannels)); - } - return &p->base; -} +// +// factory +// -static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { - round_robin_factory_ref, round_robin_factory_unref, round_robin_create, - "round_robin"}; +class RoundRobinFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( + const LoadBalancingPolicy::Args& args) const override { + return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args)); + } -static grpc_lb_policy_factory round_robin_lb_policy_factory = { - &round_robin_factory_vtable}; + const char* name() const override { return "round_robin"; } +}; -static grpc_lb_policy_factory* round_robin_lb_factory_create() { - return &round_robin_lb_policy_factory; -} +} // namespace -/* Plugin registration */ +} // namespace grpc_core void grpc_lb_policy_round_robin_init() { - grpc_register_lb_policy(round_robin_lb_factory_create()); + grpc_core::LoadBalancingPolicyRegistry::Builder:: + RegisterLoadBalancingPolicyFactory( + grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>( + grpc_core::New<grpc_core::RoundRobinFactory>())); } void grpc_lb_policy_round_robin_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index e35c5e8db3..f1580e8b91 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -67,7 +67,7 @@ void grpc_lb_subchannel_data_start_connectivity_watch( } sd->connectivity_notification_pending = true; grpc_subchannel_notify_on_state_change( - sd->subchannel, sd->subchannel_list->policy->interested_parties, + sd->subchannel, sd->subchannel_list->policy->interested_parties(), &sd->pending_connectivity_state_unsafe, &sd->connectivity_changed_closure); } @@ -88,9 +88,10 @@ void grpc_lb_subchannel_data_stop_connectivity_watch( } grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( - grpc_lb_policy* p, grpc_core::TraceFlag* tracer, - const grpc_lb_addresses* addresses, const 
grpc_lb_policy_args* args, - grpc_iomgr_cb_func connectivity_changed_cb) { + grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer, + const grpc_lb_addresses* addresses, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb) { grpc_lb_subchannel_list* subchannel_list = static_cast<grpc_lb_subchannel_list*>( gpr_zalloc(sizeof(*subchannel_list))); @@ -118,12 +119,11 @@ grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( grpc_arg addr_arg = grpc_create_subchannel_address_arg(&addresses->addresses[i].address); grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, - 1); + &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1); gpr_free(addr_arg.value.string); sc_args.args = new_args; grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( - args->client_channel_factory, &sc_args); + client_channel_factory, &sc_args); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) { // Subchannel could not be created. @@ -154,7 +154,7 @@ grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( sd->subchannel = subchannel; GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, connectivity_changed_cb, sd, - grpc_combiner_scheduler(args->combiner)); + grpc_combiner_scheduler(combiner)); // We assume that the current state is IDLE. If not, we'll get a // callback telling us that. sd->prev_connectivity_state = GRPC_CHANNEL_IDLE; @@ -212,18 +212,6 @@ void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, } } -void grpc_lb_subchannel_list_ref_for_connectivity_watch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - GRPC_LB_POLICY_REF(subchannel_list->policy, reason); - grpc_lb_subchannel_list_ref(subchannel_list, reason); -} - -void grpc_lb_subchannel_list_unref_for_connectivity_watch( - grpc_lb_subchannel_list* subchannel_list, const char* reason) { - GRPC_LB_POLICY_UNREF(subchannel_list->policy, reason); - grpc_lb_subchannel_list_unref(subchannel_list, reason); -} - static void subchannel_data_cancel_connectivity_watch( grpc_lb_subchannel_data* sd, const char* reason) { if (sd->subchannel_list->tracer->enabled()) { diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 91537f3afe..7a8f5f1029 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -82,7 +82,7 @@ void grpc_lb_subchannel_data_stop_connectivity_watch( struct grpc_lb_subchannel_list { /** backpointer to owning policy */ - grpc_lb_policy* policy; + grpc_core::LoadBalancingPolicy* policy; grpc_core::TraceFlag* tracer; @@ -115,9 +115,10 @@ struct grpc_lb_subchannel_list { }; grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( - grpc_lb_policy* p, grpc_core::TraceFlag* tracer, - const grpc_lb_addresses* addresses, const grpc_lb_policy_args* args, - grpc_iomgr_cb_func connectivity_changed_cb); + grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer, + const grpc_lb_addresses* addresses, grpc_combiner* combiner, + grpc_client_channel_factory* client_channel_factory, + const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb); void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list, const char* reason); @@ -125,13 +126,6 @@ void 
grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list, void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list, const char* reason); -/// Takes and releases refs needed for a connectivity notification. -/// This includes a ref to subchannel_list and a weak ref to the LB policy. -void grpc_lb_subchannel_list_ref_for_connectivity_watch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); -void grpc_lb_subchannel_list_unref_for_connectivity_watch( - grpc_lb_subchannel_list* subchannel_list, const char* reason); - /// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The /// connectivity state notification callback will ultimately unref it. void grpc_lb_subchannel_list_shutdown_and_unref( diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc index f2a800b221..2018aabb91 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.cc +++ b/src/core/ext/filters/client_channel/lb_policy_factory.cc @@ -151,17 +151,3 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( return nullptr; return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p); } - -void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) { - factory->vtable->ref(factory); -} - -void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory) { - factory->vtable->unref(factory); -} - -grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy( - grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) { - if (factory == nullptr) return nullptr; - return factory->vtable->create_lb_policy(factory, args); -} diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index 9da231b657..e0e7d8bf5c 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -26,21 +26,20 @@ #include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/uri_parser.h" +// +// representation of an LB address +// + // Channel arg key for grpc_lb_addresses. #define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses" -typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; -typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; - -struct grpc_lb_policy_factory { - const grpc_lb_policy_factory_vtable* vtable; -}; - /** A resolved address alongside any LB related information associated with it. * \a user_data, if not NULL, contains opaque data meant to be consumed by the * gRPC LB policy. Note that no all LB policies support \a user_data as input. * Those who don't will simply ignore it and will correspondingly return NULL in * their namesake pick() output argument. */ +// TODO(roth): Once we figure out a better way of handling user_data in +// LB policies, convert these structs to C++ classes. typedef struct grpc_lb_address { grpc_resolved_address address; bool is_balancer; @@ -101,30 +100,27 @@ grpc_arg grpc_lb_addresses_create_channel_arg( grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( const grpc_channel_args* channel_args); -/** Arguments passed to LB policies. 
*/ -struct grpc_lb_policy_args { - grpc_client_channel_factory* client_channel_factory; - grpc_channel_args* args; - grpc_combiner* combiner; -}; +// +// LB policy factory +// -struct grpc_lb_policy_factory_vtable { - void (*ref)(grpc_lb_policy_factory* factory); - void (*unref)(grpc_lb_policy_factory* factory); +namespace grpc_core { - /** Implementation of grpc_lb_policy_factory_create_lb_policy */ - grpc_lb_policy* (*create_lb_policy)(grpc_lb_policy_factory* factory, - grpc_lb_policy_args* args); +class LoadBalancingPolicyFactory { + public: + /// Returns a new LB policy instance. + virtual OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( + const LoadBalancingPolicy::Args& args) const GRPC_ABSTRACT; - /** Name for the LB policy this factory implements */ - const char* name; -}; + /// Returns the LB policy name that this factory provides. + /// Caller does NOT take ownership of result. + virtual const char* name() const GRPC_ABSTRACT; -void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory); -void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory); + virtual ~LoadBalancingPolicyFactory() {} + + GRPC_ABSTRACT_BASE_CLASS +}; -/** Create a lb_policy instance. */ -grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy( - grpc_lb_policy_factory* factory, grpc_lb_policy_args* args); +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_FACTORY_H */ diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.cc b/src/core/ext/filters/client_channel/lb_policy_registry.cc index 8414504e8f..f495cdb3c2 100644 --- a/src/core/ext/filters/client_channel/lb_policy_registry.cc +++ b/src/core/ext/filters/client_channel/lb_policy_registry.cc @@ -21,50 +21,75 @@ #include <string.h> #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/inlined_vector.h" -#define MAX_POLICIES 10 +namespace grpc_core { -static grpc_lb_policy_factory* g_all_of_the_lb_policies[MAX_POLICIES]; -static int g_number_of_lb_policies = 0; +namespace { -void grpc_lb_policy_registry_init(void) { g_number_of_lb_policies = 0; } +class RegistryState { + public: + RegistryState() {} -void grpc_lb_policy_registry_shutdown(void) { - int i; - for (i = 0; i < g_number_of_lb_policies; i++) { - grpc_lb_policy_factory_unref(g_all_of_the_lb_policies[i]); + void RegisterLoadBalancingPolicyFactory( + UniquePtr<LoadBalancingPolicyFactory> factory) { + for (size_t i = 0; i < factories_.size(); ++i) { + GPR_ASSERT(strcmp(factories_[i]->name(), factory->name()) != 0); + } + factories_.push_back(std::move(factory)); } -} -void grpc_register_lb_policy(grpc_lb_policy_factory* factory) { - int i; - for (i = 0; i < g_number_of_lb_policies; i++) { - GPR_ASSERT(0 != gpr_stricmp(factory->vtable->name, - g_all_of_the_lb_policies[i]->vtable->name)); + LoadBalancingPolicyFactory* GetLoadBalancingPolicyFactory( + const char* name) const { + for (size_t i = 0; i < factories_.size(); ++i) { + if (strcmp(name, factories_[i]->name()) == 0) { + return factories_[i].get(); + } + } + return nullptr; } - GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES); - grpc_lb_policy_factory_ref(factory); - g_all_of_the_lb_policies[g_number_of_lb_policies++] = factory; -} -static grpc_lb_policy_factory* lookup_factory(const char* name) { - int i; + private: + InlinedVector<UniquePtr<LoadBalancingPolicyFactory>, 10> factories_; +}; - if (name == nullptr) return nullptr; +RegistryState* g_state = nullptr; - for (i = 0; i < g_number_of_lb_policies; i++) { - if (0 == gpr_stricmp(name, 
g_all_of_the_lb_policies[i]->vtable->name)) { - return g_all_of_the_lb_policies[i]; - } - } +} // namespace + +// +// LoadBalancingPolicyRegistry::Builder +// - return nullptr; +void LoadBalancingPolicyRegistry::Builder::InitRegistry() { + if (g_state == nullptr) g_state = New<RegistryState>(); } -grpc_lb_policy* grpc_lb_policy_create(const char* name, - grpc_lb_policy_args* args) { - grpc_lb_policy_factory* factory = lookup_factory(name); - grpc_lb_policy* lb_policy = - grpc_lb_policy_factory_create_lb_policy(factory, args); - return lb_policy; +void LoadBalancingPolicyRegistry::Builder::ShutdownRegistry() { + Delete(g_state); + g_state = nullptr; } + +void LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory( + UniquePtr<LoadBalancingPolicyFactory> factory) { + InitRegistry(); + g_state->RegisterLoadBalancingPolicyFactory(std::move(factory)); +} + +// +// LoadBalancingPolicyRegistry +// + +OrphanablePtr<LoadBalancingPolicy> +LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + const char* name, const LoadBalancingPolicy::Args& args) { + GPR_ASSERT(g_state != nullptr); + // Find factory. + LoadBalancingPolicyFactory* factory = + g_state->GetLoadBalancingPolicyFactory(name); + if (factory == nullptr) return nullptr; // Specified name not found. + // Create policy via factory. + return factory->CreateLoadBalancingPolicy(args); +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.h b/src/core/ext/filters/client_channel/lb_policy_registry.h index 5aff79376b..14c21dfe63 100644 --- a/src/core/ext/filters/client_channel/lb_policy_registry.h +++ b/src/core/ext/filters/client_channel/lb_policy_registry.h @@ -20,21 +20,34 @@ #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H #include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/exec_ctx.h" -/** Initialize the registry and set \a default_factory as the factory to be - * returned when no name is provided in a lookup */ -void grpc_lb_policy_registry_init(void); -void grpc_lb_policy_registry_shutdown(void); +namespace grpc_core { -/** Register a LB policy factory. */ -void grpc_register_lb_policy(grpc_lb_policy_factory* factory); +class LoadBalancingPolicyRegistry { + public: + /// Methods used to create and populate the LoadBalancingPolicyRegistry. + /// NOT THREAD SAFE -- to be used only during global gRPC + /// initialization and shutdown. + class Builder { + public: + /// Global initialization and shutdown hooks. + static void InitRegistry(); + static void ShutdownRegistry(); -/** Create a \a grpc_lb_policy instance. - * - * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init - * will be returned. */ -grpc_lb_policy* grpc_lb_policy_create(const char* name, - grpc_lb_policy_args* args); + /// Registers an LB policy factory. The factory will be used to create an + /// LB policy whose name matches that of the factory. + static void RegisterLoadBalancingPolicyFactory( + UniquePtr<LoadBalancingPolicyFactory> factory); + }; + + /// Creates an LB policy of the type specified by \a name. 
+ static OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( + const char* name, const LoadBalancingPolicy::Args& args); +}; + +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H */ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index c095b5bed9..e00dc3a4ee 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -885,7 +885,6 @@ src/core/ext/filters/client_channel/lb_policy.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc \ -src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 3efaa6e686..0e15ab47ce 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -9002,7 +9002,6 @@ ], "headers": [ "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", - "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", @@ -9015,7 +9014,6 @@ "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc", "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc", - "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.cc", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc", @@ -9039,7 +9037,6 @@ ], "headers": [ "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", - "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", @@ -9052,7 +9049,6 @@ "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc", "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc", - "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc", |
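
For illustration, a minimal sketch of how an LB policy would plug into the LoadBalancingPolicyFactory / LoadBalancingPolicyRegistry interfaces introduced above, mirroring the RoundRobinFactory registration done in grpc_lb_policy_round_robin_init(). The MyPolicy / "my_policy" names and the grpc_lb_policy_my_policy_init() hook are hypothetical and not part of this commit; only the factory and registry signatures come from the patch.

    // Sketch only: uses the interfaces added in lb_policy_factory.h and
    // lb_policy_registry.h by this change.
    #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
    #include "src/core/ext/filters/client_channel/lb_policy_registry.h"

    namespace grpc_core {
    namespace {

    class MyPolicyFactory : public LoadBalancingPolicyFactory {
     public:
      OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
          const LoadBalancingPolicy::Args& args) const override {
        // A real factory would do: return OrphanablePtr<LoadBalancingPolicy>(
        //     New<MyPolicy>(args));
        // where MyPolicy is a LoadBalancingPolicy subclass analogous to
        // RoundRobin above. MyPolicy is hypothetical, so return null here.
        return OrphanablePtr<LoadBalancingPolicy>(nullptr);
      }

      // Name under which the registry will look this factory up.
      const char* name() const override { return "my_policy"; }
    };

    }  // namespace
    }  // namespace grpc_core

    // Registered during plugin init, exactly as RoundRobinFactory is
    // registered in grpc_lb_policy_round_robin_init() above.
    void grpc_lb_policy_my_policy_init() {
      grpc_core::LoadBalancingPolicyRegistry::Builder::
          RegisterLoadBalancingPolicyFactory(
              grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
                  grpc_core::New<grpc_core::MyPolicyFactory>()));
    }

The client channel would then obtain an instance via LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy("my_policy", args), which looks the factory up by name() and delegates to CreateLoadBalancingPolicy(), replacing the old grpc_lb_policy_create()/vtable path removed by this patch.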