diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 670 |
1 files changed, 256 insertions, 414 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index ba4e90d4c2..06ae79041e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -54,7 +54,7 @@ * operations in progress over the old RR instance. This is done by * decreasing the reference count on the old policy. The moment no more * references are held on the old RR policy, it'll be destroyed and \a - * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN + * on_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN * state. At this point we can transition to a new RR instance safely, which * is done once again via \a rr_handover_locked(). * @@ -106,6 +106,8 @@ #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -113,8 +115,6 @@ #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/manual_constructor.h" -#include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" @@ -128,185 +128,48 @@ grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb"); -/* add lb_token of selected subchannel (address) to the call's initial - * metadata */ -static grpc_error* initial_metadata_add_lb_token( - grpc_metadata_batch* initial_metadata, - grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) { - GPR_ASSERT(lb_token_mdelem_storage != nullptr); - GPR_ASSERT(!GRPC_MDISNULL(lb_token)); - return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, - lb_token); -} - -static void destroy_client_stats(void* arg) { - grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg); -} - -typedef struct wrapped_rr_closure_arg { - /* the closure instance using this struct as argument */ - grpc_closure wrapper_closure; - - /* the original closure. Usually a on_complete/notify cb for pick() and ping() - * calls against the internal RR instance, respectively. */ - grpc_closure* wrapped_closure; - - /* the pick's initial metadata, kept in order to append the LB token for the - * pick */ - grpc_metadata_batch* initial_metadata; - - /* the picked target, used to determine which LB token to add to the pick's - * initial metadata */ - grpc_connected_subchannel** target; - - /* the context to be populated for the subchannel call */ - grpc_call_context_element* context; - - /* Stats for client-side load reporting. Note that this holds a - * reference, which must be either passed on via context or unreffed. */ +struct glb_lb_policy; + +namespace { + +/// Linked list of pending pick requests. It stores all information needed to +/// eventually call (Round Robin's) pick() on them. They mainly stay pending +/// waiting for the RR policy to be created. +/// +/// Note that when a pick is sent to the RR policy, we inject our own +/// on_complete callback, so that we can intercept the result before +/// invoking the original on_complete callback. This allows us to set the +/// LB token metadata and add client_stats to the call context. +/// See \a pending_pick_complete() for details. +struct pending_pick { + // Our on_complete closure and the original one. + grpc_closure on_complete; + grpc_closure* original_on_complete; + // The original pick. + grpc_lb_policy_pick_state* pick; + // Stats for client-side load reporting. Note that this holds a + // reference, which must be either passed on via context or unreffed. grpc_grpclb_client_stats* client_stats; - - /* the LB token associated with the pick */ + // The LB token associated with the pick. This is set via user_data in + // the pick. grpc_mdelem lb_token; - - /* storage for the lb token initial metadata mdelem */ - grpc_linked_mdelem* lb_token_mdelem_storage; - - /* The RR instance related to the closure */ - grpc_lb_policy* rr_policy; - - /* The grpclb instance that created the wrapping. This instance is not owned, - * reference counts are untouched. It's used only for logging purposes. */ - grpc_lb_policy* glb_policy; - - /* heap memory to be freed upon closure execution. */ - void* free_when_done; -} wrapped_rr_closure_arg; - -/* The \a on_complete closure passed as part of the pick requires keeping a - * reference to its associated round robin instance. We wrap this closure in - * order to unref the round robin instance upon its invocation */ -static void wrapped_rr_closure(void* arg, grpc_error* error) { - wrapped_rr_closure_arg* wc_arg = (wrapped_rr_closure_arg*)arg; - - GPR_ASSERT(wc_arg->wrapped_closure != nullptr); - GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); - - if (wc_arg->rr_policy != nullptr) { - /* if *target is nullptr, no pick has been made by the RR policy (eg, all - * addresses failed to connect). There won't be any user_data/token - * available */ - if (*wc_arg->target != nullptr) { - if (!GRPC_MDISNULL(wc_arg->lb_token)) { - initial_metadata_add_lb_token(wc_arg->initial_metadata, - wc_arg->lb_token_mdelem_storage, - GRPC_MDELEM_REF(wc_arg->lb_token)); - } else { - gpr_log( - GPR_ERROR, - "[grpclb %p] No LB token for connected subchannel pick %p (from RR " - "instance %p).", - wc_arg->glb_policy, *wc_arg->target, wc_arg->rr_policy); - abort(); - } - // Pass on client stats via context. Passes ownership of the reference. - GPR_ASSERT(wc_arg->client_stats != nullptr); - wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; - wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; - } else { - grpc_grpclb_client_stats_unref(wc_arg->client_stats); - } - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy, - wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure"); - } - GPR_ASSERT(wc_arg->free_when_done != nullptr); - gpr_free(wc_arg->free_when_done); -} - -/* Linked list of pending pick requests. It stores all information needed to - * eventually call (Round Robin's) pick() on them. They mainly stay pending - * waiting for the RR policy to be created/updated. - * - * One particularity is the wrapping of the user-provided \a on_complete closure - * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in - * order to correctly unref the RR policy instance upon completion of the pick. - * See \a wrapped_rr_closure for details. */ -typedef struct pending_pick { + // The grpclb instance that created the wrapping. This instance is not owned, + // reference counts are untouched. It's used only for logging purposes. + glb_lb_policy* glb_policy; + // Next pending pick. struct pending_pick* next; +}; - /* original pick()'s arguments */ - grpc_lb_policy_pick_args pick_args; - - /* output argument where to store the pick()ed connected subchannel, or - * nullptr upon error. */ - grpc_connected_subchannel** target; - - /* args for wrapped_on_complete */ - wrapped_rr_closure_arg wrapped_on_complete_arg; -} pending_pick; - -static void add_pending_pick(pending_pick** root, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, - grpc_closure* on_complete) { - pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp)); - pp->next = *root; - pp->pick_args = *pick_args; - pp->target = target; - pp->wrapped_on_complete_arg.wrapped_closure = on_complete; - pp->wrapped_on_complete_arg.target = target; - pp->wrapped_on_complete_arg.context = context; - pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; - pp->wrapped_on_complete_arg.lb_token_mdelem_storage = - pick_args->lb_token_mdelem_storage; - pp->wrapped_on_complete_arg.free_when_done = pp; - GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure, - wrapped_rr_closure, &pp->wrapped_on_complete_arg, - grpc_schedule_on_exec_ctx); - *root = pp; -} - -/* Same as the \a pending_pick struct but for ping operations */ -typedef struct pending_ping { +/// A linked list of pending pings waiting for the RR policy to be created. +struct pending_ping { + grpc_closure* on_initiate; + grpc_closure* on_ack; struct pending_ping* next; +}; - /* args for sending the ping */ - wrapped_rr_closure_arg* on_initiate; - wrapped_rr_closure_arg* on_ack; -} pending_ping; - -static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate, - grpc_closure* on_ack) { - pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); - if (on_initiate != nullptr) { - pping->on_initiate = - (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate)); - pping->on_initiate->wrapped_closure = on_initiate; - pping->on_initiate->free_when_done = pping->on_initiate; - GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure, - &pping->on_initiate, grpc_schedule_on_exec_ctx); - } - if (on_ack != nullptr) { - pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack)); - pping->on_ack->wrapped_closure = on_ack; - pping->on_ack->free_when_done = pping->on_ack; - GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure, - &pping->on_ack, grpc_schedule_on_exec_ctx); - } - pping->next = *root; - *root = pping; -} - -/* - * glb_lb_policy - */ -typedef struct rr_connectivity_data rr_connectivity_data; +} // namespace -typedef struct glb_lb_policy { +struct glb_lb_policy { /** base policy: must be first */ grpc_lb_policy base; @@ -331,6 +194,9 @@ typedef struct glb_lb_policy { /** the RR policy to use of the backend servers returned by the LB server */ grpc_lb_policy* rr_policy; + grpc_closure on_rr_connectivity_changed; + grpc_connectivity_state rr_connectivity_state; + bool started_picking; /** our connectivity state tracker */ @@ -365,11 +231,11 @@ typedef struct glb_lb_policy { /** are we already watching the LB channel's connectivity? */ bool watching_lb_channel; - /** is \a lb_call_retry_timer active? */ - bool retry_timer_active; + /** is the callback associated with \a lb_call_retry_timer pending? */ + bool retry_timer_callback_pending; - /** is \a lb_fallback_timer active? */ - bool fallback_timer_active; + /** is the callback associated with \a lb_fallback_timer pending? */ + bool fallback_timer_callback_pending; /** called upon changes to the LB channel's connectivity. */ grpc_closure lb_channel_on_connectivity_changed; @@ -428,22 +294,94 @@ typedef struct glb_lb_policy { /* Interval and timer for next client load report. */ grpc_millis client_stats_report_interval; grpc_timer client_load_report_timer; - bool client_load_report_timer_pending; + bool client_load_report_timer_callback_pending; bool last_client_load_report_counters_were_zero; /* Closure used for either the load report timer or the callback for * completion of sending the load report. */ grpc_closure client_load_report_closure; /* Client load report message payload. */ grpc_byte_buffer* client_load_report_payload; -} glb_lb_policy; - -/* Keeps track and reacts to changes in connectivity of the RR instance */ -struct rr_connectivity_data { - grpc_closure on_change; - grpc_connectivity_state state; - glb_lb_policy* glb_policy; }; +/* add lb_token of selected subchannel (address) to the call's initial + * metadata */ +static grpc_error* initial_metadata_add_lb_token( + grpc_metadata_batch* initial_metadata, + grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) { + GPR_ASSERT(lb_token_mdelem_storage != nullptr); + GPR_ASSERT(!GRPC_MDISNULL(lb_token)); + return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); +} + +static void destroy_client_stats(void* arg) { + grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg); +} + +static void pending_pick_set_metadata_and_context(pending_pick* pp) { + /* if connected_subchannel is nullptr, no pick has been made by the RR + * policy (e.g., all addresses failed to connect). There won't be any + * user_data/token available */ + if (pp->pick->connected_subchannel != nullptr) { + if (!GRPC_MDISNULL(pp->lb_token)) { + initial_metadata_add_lb_token(pp->pick->initial_metadata, + &pp->pick->lb_token_mdelem_storage, + GRPC_MDELEM_REF(pp->lb_token)); + } else { + gpr_log(GPR_ERROR, + "[grpclb %p] No LB token for connected subchannel pick %p", + pp->glb_policy, pp->pick); + abort(); + } + // Pass on client stats via context. Passes ownership of the reference. + GPR_ASSERT(pp->client_stats != nullptr); + pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = + pp->client_stats; + pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy = + destroy_client_stats; + } else { + if (pp->client_stats != nullptr) { + grpc_grpclb_client_stats_unref(pp->client_stats); + } + } +} + +/* The \a on_complete closure passed as part of the pick requires keeping a + * reference to its associated round robin instance. We wrap this closure in + * order to unref the round robin instance upon its invocation */ +static void pending_pick_complete(void* arg, grpc_error* error) { + pending_pick* pp = (pending_pick*)arg; + pending_pick_set_metadata_and_context(pp); + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error)); + gpr_free(pp); +} + +static pending_pick* pending_pick_create(glb_lb_policy* glb_policy, + grpc_lb_policy_pick_state* pick) { + pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp)); + pp->pick = pick; + pp->glb_policy = glb_policy; + GRPC_CLOSURE_INIT(&pp->on_complete, pending_pick_complete, pp, + grpc_schedule_on_exec_ctx); + pp->original_on_complete = pick->on_complete; + pp->pick->on_complete = &pp->on_complete; + return pp; +} + +static void pending_pick_add(pending_pick** root, pending_pick* new_pp) { + new_pp->next = *root; + *root = new_pp; +} + +static void pending_ping_add(pending_ping** root, grpc_closure* on_initiate, + grpc_closure* on_ack) { + pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); + pping->on_initiate = on_initiate; + pping->on_ack = on_ack; + pping->next = *root; + *root = pping; +} + static bool is_server_valid(const grpc_grpclb_server* server, size_t idx, bool log) { if (server->drop) return false; @@ -555,7 +493,6 @@ static grpc_lb_addresses* process_serverlist_locked( gpr_free(uri); user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; } - grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, false /* is_balancer */, nullptr /* balancer_name */, user_data); @@ -596,7 +533,6 @@ static void update_lb_connectivity_status_locked( grpc_error* rr_state_error) { const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(&glb_policy->state_tracker); - /* The new connectivity status is a function of the previous one and the new * input coming from the status of the RR policy. * @@ -626,7 +562,6 @@ static void update_lb_connectivity_status_locked( * * (*) This function mustn't be called during shutting down. */ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); - switch (rr_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: @@ -637,7 +572,6 @@ static void update_lb_connectivity_status_locked( case GRPC_CHANNEL_READY: GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); } - if (grpc_lb_glb_trace.enabled()) { gpr_log( GPR_INFO, @@ -655,10 +589,8 @@ static void update_lb_connectivity_status_locked( * cleanups this callback would otherwise be responsible for. * If \a force_async is true, then we will manually schedule the * completion callback even if the pick is available immediately. */ -static bool pick_from_internal_rr_locked( - glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args, - bool force_async, grpc_connected_subchannel** target, - wrapped_rr_closure_arg* wc_arg) { +static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy, + bool force_async, pending_pick* pp) { // Check for drops if we are not using fallback backend addresses. if (glb_policy->serverlist != nullptr) { // Look at the index into the serverlist to see if we should drop this call. @@ -668,57 +600,36 @@ static bool pick_from_internal_rr_locked( glb_policy->serverlist_index = 0; // Wrap-around. } if (server->drop) { - // Not using the RR policy, so unref it. - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy, - wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); // Update client load reporting stats to indicate the number of // dropped calls. Note that we have to do this here instead of in // the client_load_reporting filter, because we do not create a // subchannel call (and therefore no client_load_reporting filter) // for dropped calls. - GPR_ASSERT(wc_arg->client_stats != nullptr); + GPR_ASSERT(glb_policy->client_stats != nullptr); grpc_grpclb_client_stats_add_call_dropped_locked( - server->load_balance_token, wc_arg->client_stats); - grpc_grpclb_client_stats_unref(wc_arg->client_stats); + server->load_balance_token, glb_policy->client_stats); if (force_async) { - GPR_ASSERT(wc_arg->wrapped_closure != nullptr); - GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); - gpr_free(wc_arg->free_when_done); + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); + gpr_free(pp); return false; } - gpr_free(wc_arg->free_when_done); + gpr_free(pp); return true; } } + // Set client_stats and user_data. + pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats); + GPR_ASSERT(pp->pick->user_data == nullptr); + pp->pick->user_data = (void**)&pp->lb_token; // Pick via the RR policy. - const bool pick_done = grpc_lb_policy_pick_locked( - wc_arg->rr_policy, pick_args, target, wc_arg->context, - (void**)&wc_arg->lb_token, &wc_arg->wrapper_closure); + bool pick_done = grpc_lb_policy_pick_locked(glb_policy->rr_policy, pp->pick); if (pick_done) { - /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy, - wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync"); - /* add the load reporting initial metadata */ - initial_metadata_add_lb_token(pick_args->initial_metadata, - pick_args->lb_token_mdelem_storage, - GRPC_MDELEM_REF(wc_arg->lb_token)); - // Pass on client stats via context. Passes ownership of the reference. - GPR_ASSERT(wc_arg->client_stats != nullptr); - wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; - wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + pending_pick_set_metadata_and_context(pp); if (force_async) { - GPR_ASSERT(wc_arg->wrapped_closure != nullptr); - GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE); - gpr_free(wc_arg->free_when_done); - return false; + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); + pick_done = false; } - gpr_free(wc_arg->free_when_done); + gpr_free(pp); } /* else, the pending pick will be registered and taken care of by the * pending pick list inside the RR policy (glb_policy->rr_policy). @@ -760,7 +671,7 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) { gpr_free(args); } -static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error); +static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error); static void create_rr_locked(glb_lb_policy* glb_policy, grpc_lb_policy_args* args) { GPR_ASSERT(glb_policy->rr_policy == nullptr); @@ -782,72 +693,46 @@ static void create_rr_locked(glb_lb_policy* glb_policy, glb_policy->base.request_reresolution = nullptr; glb_policy->rr_policy = new_rr_policy; grpc_error* rr_state_error = nullptr; - const grpc_connectivity_state rr_state = - grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy, - &rr_state_error); + glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked( + glb_policy->rr_policy, &rr_state_error); /* Connectivity state is a function of the RR policy updated/created */ - update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error); + update_lb_connectivity_status_locked( + glb_policy, glb_policy->rr_connectivity_state, rr_state_error); /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); - - /* Allocate the data for the tracking of the new RR policy's connectivity. - * It'll be deallocated in glb_rr_connectivity_changed() */ - rr_connectivity_data* rr_connectivity = - (rr_connectivity_data*)gpr_zalloc(sizeof(rr_connectivity_data)); - GRPC_CLOSURE_INIT(&rr_connectivity->on_change, - glb_rr_connectivity_changed_locked, rr_connectivity, + GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed, + on_rr_connectivity_changed_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - rr_connectivity->glb_policy = glb_policy; - rr_connectivity->state = rr_state; - /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb"); - grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, - &rr_connectivity->state, - &rr_connectivity->on_change); + GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb"); + grpc_lb_policy_notify_on_state_change_locked( + glb_policy->rr_policy, &glb_policy->rr_connectivity_state, + &glb_policy->on_rr_connectivity_changed); grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy); - - /* Update picks and pings in wait */ + // Send pending picks to RR policy. pending_pick* pp; while ((pp = glb_policy->pending_picks)) { glb_policy->pending_picks = pp->next; - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); - pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; - pp->wrapped_on_complete_arg.client_stats = - grpc_grpclb_client_stats_ref(glb_policy->client_stats); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending pick about to (async) PICK from RR %p", glb_policy, glb_policy->rr_policy); } - pick_from_internal_rr_locked(glb_policy, &pp->pick_args, - true /* force_async */, pp->target, - &pp->wrapped_on_complete_arg); + pick_from_internal_rr_locked(glb_policy, true /* force_async */, pp); } - + // Send pending pings to RR policy. pending_ping* pping; while ((pping = glb_policy->pending_pings)) { glb_policy->pending_pings = pping->next; - grpc_closure* on_initiate = nullptr; - grpc_closure* on_ack = nullptr; - if (pping->on_initiate != nullptr) { - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); - pping->on_initiate->rr_policy = glb_policy->rr_policy; - on_initiate = &pping->on_initiate->wrapper_closure; - } - if (pping->on_ack != nullptr) { - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); - pping->on_ack->rr_policy = glb_policy->rr_policy; - on_ack = &pping->on_ack->wrapper_closure; - } if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", glb_policy, glb_policy->rr_policy); } - grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); + grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, pping->on_initiate, + pping->on_ack); gpr_free(pping); } } @@ -873,31 +758,28 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) { lb_policy_args_destroy(args); } -static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error) { - rr_connectivity_data* rr_connectivity = (rr_connectivity_data*)arg; - glb_lb_policy* glb_policy = rr_connectivity->glb_policy; +static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) { + glb_lb_policy* glb_policy = (glb_lb_policy*)arg; if (glb_policy->shutting_down) { - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); - gpr_free(rr_connectivity); + GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); return; } - if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) { + if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { /* An RR policy that has transitioned into the SHUTDOWN connectivity state * should not be considered for picks or updates: the SHUTDOWN state is a * sink, policies can't transition back from it. .*/ GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown"); glb_policy->rr_policy = nullptr; - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); - gpr_free(rr_connectivity); + GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); return; } /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */ - update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state, - GRPC_ERROR_REF(error)); - /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */ - grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy, - &rr_connectivity->state, - &rr_connectivity->on_change); + update_lb_connectivity_status_locked( + glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error)); + /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */ + grpc_lb_policy_notify_on_state_change_locked( + glb_policy->rr_policy, &glb_policy->rr_connectivity_state, + &glb_policy->on_rr_connectivity_changed); } static void destroy_balancer_name(void* balancer_name) { @@ -1005,38 +887,27 @@ static void glb_destroy(grpc_lb_policy* pol) { gpr_free(glb_policy); } -static void glb_shutdown_locked(grpc_lb_policy* pol) { +static void glb_shutdown_locked(grpc_lb_policy* pol, + grpc_lb_policy* new_policy) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); glb_policy->shutting_down = true; - - /* We need a copy of the lb_call pointer because we can't cancell the call - * while holding glb_policy->mu: lb_on_server_status_received, invoked due to - * the cancel, needs to acquire that same lock */ - grpc_call* lb_call = glb_policy->lb_call; - /* glb_policy->lb_call and this local lb_call must be consistent at this point * because glb_policy->lb_call is only assigned in lb_call_init_locked as part * of query_for_backends_locked, which can only be invoked while * glb_policy->shutting_down is false. */ - if (lb_call != nullptr) { - grpc_call_cancel(lb_call, nullptr); + if (glb_policy->lb_call != nullptr) { + grpc_call_cancel(glb_policy->lb_call, nullptr); /* lb_on_server_status_received will pick up the cancel and clean up */ } - if (glb_policy->retry_timer_active) { + if (glb_policy->retry_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_call_retry_timer); - glb_policy->retry_timer_active = false; } - if (glb_policy->fallback_timer_active) { + if (glb_policy->fallback_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_fallback_timer); - glb_policy->fallback_timer_active = false; } - - pending_pick* pp = glb_policy->pending_picks; - glb_policy->pending_picks = nullptr; - pending_ping* pping = glb_policy->pending_pings; - glb_policy->pending_pings = nullptr; if (glb_policy->rr_policy != nullptr) { + grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr); GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown"); } else { grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); @@ -1051,28 +922,35 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) { } grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "glb_shutdown"); - + // Clear pending picks. + pending_pick* pp = glb_policy->pending_picks; + glb_policy->pending_picks = nullptr; while (pp != nullptr) { pending_pick* next = pp->next; - *pp->target = nullptr; - GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_REF(error)); - gpr_free(pp); + if (new_policy != nullptr) { + // Hand pick over to new policy. + if (pp->client_stats != nullptr) { + grpc_grpclb_client_stats_unref(pp->client_stats); + } + pp->pick->on_complete = pp->original_on_complete; + if (grpc_lb_policy_pick_locked(new_policy, pp->pick)) { + // Synchronous return; schedule callback. + GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE); + } + gpr_free(pp); + } else { + pp->pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); + } pp = next; } - + // Clear pending pings. + pending_ping* pping = glb_policy->pending_pings; + glb_policy->pending_pings = nullptr; while (pping != nullptr) { pending_ping* next = pping->next; - if (pping->on_initiate != nullptr) { - GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure, - GRPC_ERROR_REF(error)); - gpr_free(pping->on_initiate); - } - if (pping->on_ack != nullptr) { - GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure, - GRPC_ERROR_REF(error)); - gpr_free(pping->on_ack); - } + GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error)); gpr_free(pping); pping = next; } @@ -1090,16 +968,16 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) { // level (grpclb), inside the glb_policy->pending_picks list. To cancel these, // we invoke the completion closure and set *target to nullptr right here. static void glb_cancel_pick_locked(grpc_lb_policy* pol, - grpc_connected_subchannel** target, + grpc_lb_policy_pick_state* pick, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; pending_pick* pp = glb_policy->pending_picks; glb_policy->pending_picks = nullptr; while (pp != nullptr) { pending_pick* next = pp->next; - if (pp->target == target) { - *target = nullptr; - GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, + if (pp->pick == pick) { + pick->connected_subchannel.reset(); + GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1109,7 +987,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol, pp = next; } if (glb_policy->rr_policy != nullptr) { - grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target, + grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, pick, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); @@ -1134,9 +1012,9 @@ static void glb_cancel_picks_locked(grpc_lb_policy* pol, glb_policy->pending_picks = nullptr; while (pp != nullptr) { pending_pick* next = pp->next; - if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == + if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { @@ -1158,14 +1036,15 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy); static void start_picking_locked(glb_lb_policy* glb_policy) { /* start a timer to fall back */ if (glb_policy->lb_fallback_timeout_ms > 0 && - glb_policy->serverlist == nullptr && !glb_policy->fallback_timer_active) { + glb_policy->serverlist == nullptr && + !glb_policy->fallback_timer_callback_pending) { grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms; - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); + GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_fallback_timer"); GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - glb_policy->fallback_timer_active = true; + glb_policy->fallback_timer_callback_pending = true; grpc_timer_init(&glb_policy->lb_fallback_timer, deadline, &glb_policy->lb_on_fallback); } @@ -1183,19 +1062,9 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) { } static int glb_pick_locked(grpc_lb_policy* pol, - const grpc_lb_policy_pick_args* pick_args, - grpc_connected_subchannel** target, - grpc_call_context_element* context, void** user_data, - grpc_closure* on_complete) { - if (pick_args->lb_token_mdelem_storage == nullptr) { - *target = nullptr; - GRPC_CLOSURE_SCHED(on_complete, - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "No mdelem storage for the LB token. Load reporting " - "won't work without it. Failing")); - return 0; - } + grpc_lb_policy_pick_state* pick) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; + pending_pick* pp = pending_pick_create(glb_policy, pick); bool pick_done = false; if (glb_policy->rr_policy != nullptr) { const grpc_connectivity_state rr_connectivity_state = @@ -1203,7 +1072,7 @@ static int glb_pick_locked(grpc_lb_policy* pol, nullptr); // The glb_policy->rr_policy may have transitioned to SHUTDOWN but the // callback registered to capture this event - // (glb_rr_connectivity_changed_locked) may not have been invoked yet. We + // (on_rr_connectivity_changed_locked) may not have been invoked yet. We // need to make sure we aren't trying to pick from a RR policy instance // that's in shutdown. if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { @@ -1213,32 +1082,16 @@ static int glb_pick_locked(grpc_lb_policy* pol, glb_policy, glb_policy->rr_policy, grpc_connectivity_state_name(rr_connectivity_state)); } - add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, - on_complete); + pending_pick_add(&glb_policy->pending_picks, pp); pick_done = false; } else { // RR not in shutdown if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy, glb_policy->rr_policy); } - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); - wrapped_rr_closure_arg* wc_arg = - (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(wrapped_rr_closure_arg)); - GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg, - grpc_schedule_on_exec_ctx); - wc_arg->rr_policy = glb_policy->rr_policy; - wc_arg->target = target; - wc_arg->context = context; GPR_ASSERT(glb_policy->client_stats != nullptr); - wc_arg->client_stats = - grpc_grpclb_client_stats_ref(glb_policy->client_stats); - wc_arg->wrapped_closure = on_complete; - wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; - wc_arg->initial_metadata = pick_args->initial_metadata; - wc_arg->free_when_done = wc_arg; - wc_arg->glb_policy = pol; - pick_done = pick_from_internal_rr_locked( - glb_policy, pick_args, false /* force_async */, target, wc_arg); + pick_done = + pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp); } } else { // glb_policy->rr_policy == NULL if (grpc_lb_glb_trace.enabled()) { @@ -1246,8 +1099,7 @@ static int glb_pick_locked(grpc_lb_policy* pol, "[grpclb %p] No RR policy. Adding to grpclb's pending picks", glb_policy); } - add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, - on_complete); + pending_pick_add(&glb_policy->pending_picks, pp); if (!glb_policy->started_picking) { start_picking_locked(glb_policy); } @@ -1269,7 +1121,7 @@ static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, if (glb_policy->rr_policy) { grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); } else { - add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack); + pending_ping_add(&glb_policy->pending_pings, on_initiate, on_ack); if (!glb_policy->started_picking) { start_picking_locked(glb_policy); } @@ -1286,7 +1138,7 @@ static void glb_notify_on_state_change_locked(grpc_lb_policy* pol, static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - glb_policy->retry_timer_active = false; + glb_policy->retry_timer_callback_pending = false; if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr && error == GRPC_ERROR_NONE) { if (grpc_lb_glb_trace.enabled()) { @@ -1294,42 +1146,42 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) { } query_for_backends_locked(glb_policy); } - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer"); } static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { if (glb_policy->started_picking && glb_policy->updating_lb_call) { - if (glb_policy->retry_timer_active) { + if (glb_policy->retry_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_call_retry_timer); } if (!glb_policy->shutting_down) start_picking_locked(glb_policy); glb_policy->updating_lb_call = false; } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ - grpc_millis next_try = glb_policy->lb_call_backoff->Step(); + grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime(); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); if (timeout > 0) { gpr_log(GPR_DEBUG, - "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", + "[grpclb %p] ... retry LB call after %" PRIuPTR "ms.", glb_policy, timeout); } else { - gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.", + gpr_log(GPR_DEBUG, "[grpclb %p] ... retry LB call immediately.", glb_policy); } } - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); + GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer"); GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - glb_policy->retry_timer_active = true; + glb_policy->retry_timer_callback_pending = true; grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry); } - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, - "lb_on_server_status_received_locked"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, + "lb_on_server_status_received_locked"); } static void send_client_load_report_locked(void* arg, grpc_error* error); @@ -1351,8 +1203,8 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) { grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); glb_policy->client_load_report_payload = nullptr; if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) { - glb_policy->client_load_report_timer_pending = false; - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); + glb_policy->client_load_report_timer_callback_pending = false; + GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report"); if (glb_policy->lb_call == nullptr) { maybe_restart_lb_call(glb_policy); } @@ -1392,8 +1244,8 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) { static void send_client_load_report_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) { - glb_policy->client_load_report_timer_pending = false; - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report"); + glb_policy->client_load_report_timer_callback_pending = false; + GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report"); if (glb_policy->lb_call == nullptr) { maybe_restart_lb_call(glb_policy); } @@ -1503,7 +1355,7 @@ static void lb_call_destroy_locked(glb_lb_policy* glb_policy) { grpc_byte_buffer_destroy(glb_policy->lb_request_payload); grpc_slice_unref_internal(glb_policy->lb_call_status_details); - if (glb_policy->client_load_report_timer_pending) { + if (glb_policy->client_load_report_timer_callback_pending) { grpc_timer_cancel(&glb_policy->client_load_report_timer); } } @@ -1546,10 +1398,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { op->flags = 0; op->reserved = nullptr; op++; - /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref - * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, - "lb_on_sent_initial_request_locked"); + /* take a ref to be released in lb_on_sent_initial_request_locked() */ + GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked"); call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_sent_initial_request); @@ -1565,10 +1415,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { op->flags = 0; op->reserved = nullptr; op++; - /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref - * count goes to zero) to be unref'd in lb_on_server_status_received_locked */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, - "lb_on_server_status_received_locked"); + /* take a ref to be released in lb_on_server_status_received_locked() */ + GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked"); call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_server_status_received); @@ -1580,9 +1428,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { op->flags = 0; op->reserved = nullptr; op++; - /* take another weak ref to be unref'd/reused in - * lb_on_response_received_locked */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked"); + /* take a ref to be unref'd/reused in lb_on_response_received_locked() */ + GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked"); call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); @@ -1597,8 +1444,7 @@ static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) { if (glb_policy->client_load_report_payload != nullptr) { do_send_client_load_report_locked(glb_policy); } - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, - "lb_on_sent_initial_request_locked"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked"); } static void lb_on_response_received_locked(void* arg, grpc_error* error) { @@ -1630,11 +1476,9 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { "client load reporting interval = %" PRIdPTR " milliseconds", glb_policy, glb_policy->client_stats_report_interval); } - /* take a weak ref (won't prevent calling of \a glb_shutdown() if the - * strong ref count goes to zero) to be unref'd in - * send_client_load_report_locked() */ - glb_policy->client_load_report_timer_pending = true; - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); + /* take a ref to be unref'd in send_client_load_report_locked() */ + glb_policy->client_load_report_timer_callback_pending = true; + GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report"); schedule_next_client_load_report(glb_policy); } else if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, @@ -1682,9 +1526,8 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { /* or dispose of the fallback */ grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses); glb_policy->fallback_backend_addresses = nullptr; - if (glb_policy->fallback_timer_active) { + if (glb_policy->fallback_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_fallback_timer); - glb_policy->fallback_timer_active = false; } } /* and update the copy in the glb_lb_policy instance. This @@ -1717,27 +1560,27 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { op->flags = 0; op->reserved = nullptr; op++; - /* reuse the "lb_on_response_received_locked" weak ref taken in + /* reuse the "lb_on_response_received_locked" ref taken in * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } else { - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, - "lb_on_response_received_locked_shutdown"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, + "lb_on_response_received_locked_shutdown"); } } else { /* empty payload: call cancelled. */ - /* dispose of the "lb_on_response_received_locked" weak ref taken in + /* dispose of the "lb_on_response_received_locked" ref taken in * query_for_backends_locked() and reused in every reception loop */ - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, - "lb_on_response_received_locked_empty_payload"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, + "lb_on_response_received_locked_empty_payload"); } } static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - glb_policy->fallback_timer_active = false; + glb_policy->fallback_timer_callback_pending = false; /* If we receive a serverlist after the timer fires but before this callback * actually runs, don't fall back. */ if (glb_policy->serverlist == nullptr) { @@ -1751,7 +1594,7 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { rr_handover_locked(glb_policy); } } - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer"); } static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { @@ -1772,7 +1615,7 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { // If the load report timer is still pending, we wait for it to be // called before restarting the call. Otherwise, we restart the call // here. - if (!glb_policy->client_load_report_timer_pending) { + if (!glb_policy->client_load_report_timer_callback_pending) { maybe_restart_lb_call(glb_policy); } } @@ -1835,7 +1678,7 @@ static void glb_update_locked(grpc_lb_policy* policy, grpc_channel_get_channel_stack(glb_policy->lb_channel)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); glb_policy->watching_lb_channel = true; - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity"); + GRPC_LB_POLICY_REF(&glb_policy->base, "watch_lb_channel_connectivity"); grpc_client_channel_watch_connectivity_state( client_channel_elem, grpc_polling_entity_create_from_pollset_set( @@ -1882,9 +1725,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, // lb_on_server_status_received() will pick up the cancel and reinit // lb_call. } else if (glb_policy->started_picking) { - if (glb_policy->retry_timer_active) { + if (glb_policy->retry_timer_callback_pending) { grpc_timer_cancel(&glb_policy->lb_call_retry_timer); - glb_policy->retry_timer_active = false; } start_picking_locked(glb_policy); } @@ -1892,8 +1734,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, case GRPC_CHANNEL_SHUTDOWN: done: glb_policy->watching_lb_channel = false; - GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, - "watch_lb_channel_connectivity_cb_shutdown"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, + "watch_lb_channel_connectivity_cb_shutdown"); break; } } |