aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc670
1 files changed, 256 insertions, 414 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index ba4e90d4c2..06ae79041e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -54,7 +54,7 @@
* operations in progress over the old RR instance. This is done by
* decreasing the reference count on the old policy. The moment no more
* references are held on the old RR policy, it'll be destroyed and \a
- * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
+ * on_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
* state. At this point we can transition to a new RR instance safely, which
* is done once again via \a rr_handover_locked().
*
@@ -106,6 +106,8 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -113,8 +115,6 @@
#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/support/manual_constructor.h"
-#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
@@ -128,185 +128,48 @@
grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb");
-/* add lb_token of selected subchannel (address) to the call's initial
- * metadata */
-static grpc_error* initial_metadata_add_lb_token(
- grpc_metadata_batch* initial_metadata,
- grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
- GPR_ASSERT(lb_token_mdelem_storage != nullptr);
- GPR_ASSERT(!GRPC_MDISNULL(lb_token));
- return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
- lb_token);
-}
-
-static void destroy_client_stats(void* arg) {
- grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg);
-}
-
-typedef struct wrapped_rr_closure_arg {
- /* the closure instance using this struct as argument */
- grpc_closure wrapper_closure;
-
- /* the original closure. Usually a on_complete/notify cb for pick() and ping()
- * calls against the internal RR instance, respectively. */
- grpc_closure* wrapped_closure;
-
- /* the pick's initial metadata, kept in order to append the LB token for the
- * pick */
- grpc_metadata_batch* initial_metadata;
-
- /* the picked target, used to determine which LB token to add to the pick's
- * initial metadata */
- grpc_connected_subchannel** target;
-
- /* the context to be populated for the subchannel call */
- grpc_call_context_element* context;
-
- /* Stats for client-side load reporting. Note that this holds a
- * reference, which must be either passed on via context or unreffed. */
+struct glb_lb_policy;
+
+namespace {
+
+/// Linked list of pending pick requests. It stores all information needed to
+/// eventually call (Round Robin's) pick() on them. They mainly stay pending
+/// waiting for the RR policy to be created.
+///
+/// Note that when a pick is sent to the RR policy, we inject our own
+/// on_complete callback, so that we can intercept the result before
+/// invoking the original on_complete callback. This allows us to set the
+/// LB token metadata and add client_stats to the call context.
+/// See \a pending_pick_complete() for details.
+struct pending_pick {
+ // Our on_complete closure and the original one.
+ grpc_closure on_complete;
+ grpc_closure* original_on_complete;
+ // The original pick.
+ grpc_lb_policy_pick_state* pick;
+ // Stats for client-side load reporting. Note that this holds a
+ // reference, which must be either passed on via context or unreffed.
grpc_grpclb_client_stats* client_stats;
-
- /* the LB token associated with the pick */
+ // The LB token associated with the pick. This is set via user_data in
+ // the pick.
grpc_mdelem lb_token;
-
- /* storage for the lb token initial metadata mdelem */
- grpc_linked_mdelem* lb_token_mdelem_storage;
-
- /* The RR instance related to the closure */
- grpc_lb_policy* rr_policy;
-
- /* The grpclb instance that created the wrapping. This instance is not owned,
- * reference counts are untouched. It's used only for logging purposes. */
- grpc_lb_policy* glb_policy;
-
- /* heap memory to be freed upon closure execution. */
- void* free_when_done;
-} wrapped_rr_closure_arg;
-
-/* The \a on_complete closure passed as part of the pick requires keeping a
- * reference to its associated round robin instance. We wrap this closure in
- * order to unref the round robin instance upon its invocation */
-static void wrapped_rr_closure(void* arg, grpc_error* error) {
- wrapped_rr_closure_arg* wc_arg = (wrapped_rr_closure_arg*)arg;
-
- GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
- GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
-
- if (wc_arg->rr_policy != nullptr) {
- /* if *target is nullptr, no pick has been made by the RR policy (eg, all
- * addresses failed to connect). There won't be any user_data/token
- * available */
- if (*wc_arg->target != nullptr) {
- if (!GRPC_MDISNULL(wc_arg->lb_token)) {
- initial_metadata_add_lb_token(wc_arg->initial_metadata,
- wc_arg->lb_token_mdelem_storage,
- GRPC_MDELEM_REF(wc_arg->lb_token));
- } else {
- gpr_log(
- GPR_ERROR,
- "[grpclb %p] No LB token for connected subchannel pick %p (from RR "
- "instance %p).",
- wc_arg->glb_policy, *wc_arg->target, wc_arg->rr_policy);
- abort();
- }
- // Pass on client stats via context. Passes ownership of the reference.
- GPR_ASSERT(wc_arg->client_stats != nullptr);
- wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
- wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
- } else {
- grpc_grpclb_client_stats_unref(wc_arg->client_stats);
- }
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", wc_arg->glb_policy,
- wc_arg->rr_policy);
- }
- GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "wrapped_rr_closure");
- }
- GPR_ASSERT(wc_arg->free_when_done != nullptr);
- gpr_free(wc_arg->free_when_done);
-}
-
-/* Linked list of pending pick requests. It stores all information needed to
- * eventually call (Round Robin's) pick() on them. They mainly stay pending
- * waiting for the RR policy to be created/updated.
- *
- * One particularity is the wrapping of the user-provided \a on_complete closure
- * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
- * order to correctly unref the RR policy instance upon completion of the pick.
- * See \a wrapped_rr_closure for details. */
-typedef struct pending_pick {
+ // The grpclb instance that created the wrapping. This instance is not owned,
+ // reference counts are untouched. It's used only for logging purposes.
+ glb_lb_policy* glb_policy;
+ // Next pending pick.
struct pending_pick* next;
+};
- /* original pick()'s arguments */
- grpc_lb_policy_pick_args pick_args;
-
- /* output argument where to store the pick()ed connected subchannel, or
- * nullptr upon error. */
- grpc_connected_subchannel** target;
-
- /* args for wrapped_on_complete */
- wrapped_rr_closure_arg wrapped_on_complete_arg;
-} pending_pick;
-
-static void add_pending_pick(pending_pick** root,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context,
- grpc_closure* on_complete) {
- pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
- pp->next = *root;
- pp->pick_args = *pick_args;
- pp->target = target;
- pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
- pp->wrapped_on_complete_arg.target = target;
- pp->wrapped_on_complete_arg.context = context;
- pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
- pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
- pick_args->lb_token_mdelem_storage;
- pp->wrapped_on_complete_arg.free_when_done = pp;
- GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure,
- wrapped_rr_closure, &pp->wrapped_on_complete_arg,
- grpc_schedule_on_exec_ctx);
- *root = pp;
-}
-
-/* Same as the \a pending_pick struct but for ping operations */
-typedef struct pending_ping {
+/// A linked list of pending pings waiting for the RR policy to be created.
+struct pending_ping {
+ grpc_closure* on_initiate;
+ grpc_closure* on_ack;
struct pending_ping* next;
+};
- /* args for sending the ping */
- wrapped_rr_closure_arg* on_initiate;
- wrapped_rr_closure_arg* on_ack;
-} pending_ping;
-
-static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate,
- grpc_closure* on_ack) {
- pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
- if (on_initiate != nullptr) {
- pping->on_initiate =
- (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate));
- pping->on_initiate->wrapped_closure = on_initiate;
- pping->on_initiate->free_when_done = pping->on_initiate;
- GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure,
- &pping->on_initiate, grpc_schedule_on_exec_ctx);
- }
- if (on_ack != nullptr) {
- pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack));
- pping->on_ack->wrapped_closure = on_ack;
- pping->on_ack->free_when_done = pping->on_ack;
- GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure,
- &pping->on_ack, grpc_schedule_on_exec_ctx);
- }
- pping->next = *root;
- *root = pping;
-}
-
-/*
- * glb_lb_policy
- */
-typedef struct rr_connectivity_data rr_connectivity_data;
+} // namespace
-typedef struct glb_lb_policy {
+struct glb_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
@@ -331,6 +194,9 @@ typedef struct glb_lb_policy {
/** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy* rr_policy;
+ grpc_closure on_rr_connectivity_changed;
+ grpc_connectivity_state rr_connectivity_state;
+
bool started_picking;
/** our connectivity state tracker */
@@ -365,11 +231,11 @@ typedef struct glb_lb_policy {
/** are we already watching the LB channel's connectivity? */
bool watching_lb_channel;
- /** is \a lb_call_retry_timer active? */
- bool retry_timer_active;
+ /** is the callback associated with \a lb_call_retry_timer pending? */
+ bool retry_timer_callback_pending;
- /** is \a lb_fallback_timer active? */
- bool fallback_timer_active;
+ /** is the callback associated with \a lb_fallback_timer pending? */
+ bool fallback_timer_callback_pending;
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
@@ -428,22 +294,94 @@ typedef struct glb_lb_policy {
/* Interval and timer for next client load report. */
grpc_millis client_stats_report_interval;
grpc_timer client_load_report_timer;
- bool client_load_report_timer_pending;
+ bool client_load_report_timer_callback_pending;
bool last_client_load_report_counters_were_zero;
/* Closure used for either the load report timer or the callback for
* completion of sending the load report. */
grpc_closure client_load_report_closure;
/* Client load report message payload. */
grpc_byte_buffer* client_load_report_payload;
-} glb_lb_policy;
-
-/* Keeps track and reacts to changes in connectivity of the RR instance */
-struct rr_connectivity_data {
- grpc_closure on_change;
- grpc_connectivity_state state;
- glb_lb_policy* glb_policy;
};
+/* add lb_token of selected subchannel (address) to the call's initial
+ * metadata */
+static grpc_error* initial_metadata_add_lb_token(
+ grpc_metadata_batch* initial_metadata,
+ grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
+ GPR_ASSERT(lb_token_mdelem_storage != nullptr);
+ GPR_ASSERT(!GRPC_MDISNULL(lb_token));
+ return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+ lb_token);
+}
+
+static void destroy_client_stats(void* arg) {
+ grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats*)arg);
+}
+
+static void pending_pick_set_metadata_and_context(pending_pick* pp) {
+ /* if connected_subchannel is nullptr, no pick has been made by the RR
+ * policy (e.g., all addresses failed to connect). There won't be any
+ * user_data/token available */
+ if (pp->pick->connected_subchannel != nullptr) {
+ if (!GRPC_MDISNULL(pp->lb_token)) {
+ initial_metadata_add_lb_token(pp->pick->initial_metadata,
+ &pp->pick->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(pp->lb_token));
+ } else {
+ gpr_log(GPR_ERROR,
+ "[grpclb %p] No LB token for connected subchannel pick %p",
+ pp->glb_policy, pp->pick);
+ abort();
+ }
+ // Pass on client stats via context. Passes ownership of the reference.
+ GPR_ASSERT(pp->client_stats != nullptr);
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
+ pp->client_stats;
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
+ destroy_client_stats;
+ } else {
+ if (pp->client_stats != nullptr) {
+ grpc_grpclb_client_stats_unref(pp->client_stats);
+ }
+ }
+}
+
+/* The \a on_complete closure passed as part of the pick requires keeping a
+ * reference to its associated round robin instance. We wrap this closure in
+ * order to unref the round robin instance upon its invocation */
+static void pending_pick_complete(void* arg, grpc_error* error) {
+ pending_pick* pp = (pending_pick*)arg;
+ pending_pick_set_metadata_and_context(pp);
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
+ gpr_free(pp);
+}
+
+static pending_pick* pending_pick_create(glb_lb_policy* glb_policy,
+ grpc_lb_policy_pick_state* pick) {
+ pending_pick* pp = (pending_pick*)gpr_zalloc(sizeof(*pp));
+ pp->pick = pick;
+ pp->glb_policy = glb_policy;
+ GRPC_CLOSURE_INIT(&pp->on_complete, pending_pick_complete, pp,
+ grpc_schedule_on_exec_ctx);
+ pp->original_on_complete = pick->on_complete;
+ pp->pick->on_complete = &pp->on_complete;
+ return pp;
+}
+
+static void pending_pick_add(pending_pick** root, pending_pick* new_pp) {
+ new_pp->next = *root;
+ *root = new_pp;
+}
+
+static void pending_ping_add(pending_ping** root, grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
+ pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
+ pping->on_initiate = on_initiate;
+ pping->on_ack = on_ack;
+ pping->next = *root;
+ *root = pping;
+}
+
static bool is_server_valid(const grpc_grpclb_server* server, size_t idx,
bool log) {
if (server->drop) return false;
@@ -555,7 +493,6 @@ static grpc_lb_addresses* process_serverlist_locked(
gpr_free(uri);
user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
}
-
grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
false /* is_balancer */,
nullptr /* balancer_name */, user_data);
@@ -596,7 +533,6 @@ static void update_lb_connectivity_status_locked(
grpc_error* rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
-
/* The new connectivity status is a function of the previous one and the new
* input coming from the status of the RR policy.
*
@@ -626,7 +562,6 @@ static void update_lb_connectivity_status_locked(
*
* (*) This function mustn't be called during shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
-
switch (rr_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN:
@@ -637,7 +572,6 @@ static void update_lb_connectivity_status_locked(
case GRPC_CHANNEL_READY:
GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
}
-
if (grpc_lb_glb_trace.enabled()) {
gpr_log(
GPR_INFO,
@@ -655,10 +589,8 @@ static void update_lb_connectivity_status_locked(
* cleanups this callback would otherwise be responsible for.
* If \a force_async is true, then we will manually schedule the
* completion callback even if the pick is available immediately. */
-static bool pick_from_internal_rr_locked(
- glb_lb_policy* glb_policy, const grpc_lb_policy_pick_args* pick_args,
- bool force_async, grpc_connected_subchannel** target,
- wrapped_rr_closure_arg* wc_arg) {
+static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
+ bool force_async, pending_pick* pp) {
// Check for drops if we are not using fallback backend addresses.
if (glb_policy->serverlist != nullptr) {
// Look at the index into the serverlist to see if we should drop this call.
@@ -668,57 +600,36 @@ static bool pick_from_internal_rr_locked(
glb_policy->serverlist_index = 0; // Wrap-around.
}
if (server->drop) {
- // Not using the RR policy, so unref it.
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p for drop", glb_policy,
- wc_arg->rr_policy);
- }
- GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
// Update client load reporting stats to indicate the number of
// dropped calls. Note that we have to do this here instead of in
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
- GPR_ASSERT(wc_arg->client_stats != nullptr);
+ GPR_ASSERT(glb_policy->client_stats != nullptr);
grpc_grpclb_client_stats_add_call_dropped_locked(
- server->load_balance_token, wc_arg->client_stats);
- grpc_grpclb_client_stats_unref(wc_arg->client_stats);
+ server->load_balance_token, glb_policy->client_stats);
if (force_async) {
- GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
- GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE);
- gpr_free(wc_arg->free_when_done);
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+ gpr_free(pp);
return false;
}
- gpr_free(wc_arg->free_when_done);
+ gpr_free(pp);
return true;
}
}
+ // Set client_stats and user_data.
+ pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats);
+ GPR_ASSERT(pp->pick->user_data == nullptr);
+ pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy.
- const bool pick_done = grpc_lb_policy_pick_locked(
- wc_arg->rr_policy, pick_args, target, wc_arg->context,
- (void**)&wc_arg->lb_token, &wc_arg->wrapper_closure);
+ bool pick_done = grpc_lb_policy_pick_locked(glb_policy->rr_policy, pp->pick);
if (pick_done) {
- /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Unreffing RR %p", glb_policy,
- wc_arg->rr_policy);
- }
- GRPC_LB_POLICY_UNREF(wc_arg->rr_policy, "glb_pick_sync");
- /* add the load reporting initial metadata */
- initial_metadata_add_lb_token(pick_args->initial_metadata,
- pick_args->lb_token_mdelem_storage,
- GRPC_MDELEM_REF(wc_arg->lb_token));
- // Pass on client stats via context. Passes ownership of the reference.
- GPR_ASSERT(wc_arg->client_stats != nullptr);
- wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
- wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
+ pending_pick_set_metadata_and_context(pp);
if (force_async) {
- GPR_ASSERT(wc_arg->wrapped_closure != nullptr);
- GRPC_CLOSURE_SCHED(wc_arg->wrapped_closure, GRPC_ERROR_NONE);
- gpr_free(wc_arg->free_when_done);
- return false;
+ GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+ pick_done = false;
}
- gpr_free(wc_arg->free_when_done);
+ gpr_free(pp);
}
/* else, the pending pick will be registered and taken care of by the
* pending pick list inside the RR policy (glb_policy->rr_policy).
@@ -760,7 +671,7 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
gpr_free(args);
}
-static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error);
+static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error);
static void create_rr_locked(glb_lb_policy* glb_policy,
grpc_lb_policy_args* args) {
GPR_ASSERT(glb_policy->rr_policy == nullptr);
@@ -782,72 +693,46 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
glb_policy->base.request_reresolution = nullptr;
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
- const grpc_connectivity_state rr_state =
- grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy,
- &rr_state_error);
+ glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked(
+ glb_policy->rr_policy, &rr_state_error);
/* Connectivity state is a function of the RR policy updated/created */
- update_lb_connectivity_status_locked(glb_policy, rr_state, rr_state_error);
+ update_lb_connectivity_status_locked(
+ glb_policy, glb_policy->rr_connectivity_state, rr_state_error);
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
-
- /* Allocate the data for the tracking of the new RR policy's connectivity.
- * It'll be deallocated in glb_rr_connectivity_changed() */
- rr_connectivity_data* rr_connectivity =
- (rr_connectivity_data*)gpr_zalloc(sizeof(rr_connectivity_data));
- GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
- glb_rr_connectivity_changed_locked, rr_connectivity,
+ GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed,
+ on_rr_connectivity_changed_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- rr_connectivity->glb_policy = glb_policy;
- rr_connectivity->state = rr_state;
-
/* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
- grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy,
- &rr_connectivity->state,
- &rr_connectivity->on_change);
+ GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb");
+ grpc_lb_policy_notify_on_state_change_locked(
+ glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
+ &glb_policy->on_rr_connectivity_changed);
grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
-
- /* Update picks and pings in wait */
+ // Send pending picks to RR policy.
pending_pick* pp;
while ((pp = glb_policy->pending_picks)) {
glb_policy->pending_picks = pp->next;
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
- pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
- pp->wrapped_on_complete_arg.client_stats =
- grpc_grpclb_client_stats_ref(glb_policy->client_stats);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] Pending pick about to (async) PICK from RR %p",
glb_policy, glb_policy->rr_policy);
}
- pick_from_internal_rr_locked(glb_policy, &pp->pick_args,
- true /* force_async */, pp->target,
- &pp->wrapped_on_complete_arg);
+ pick_from_internal_rr_locked(glb_policy, true /* force_async */, pp);
}
-
+ // Send pending pings to RR policy.
pending_ping* pping;
while ((pping = glb_policy->pending_pings)) {
glb_policy->pending_pings = pping->next;
- grpc_closure* on_initiate = nullptr;
- grpc_closure* on_ack = nullptr;
- if (pping->on_initiate != nullptr) {
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
- pping->on_initiate->rr_policy = glb_policy->rr_policy;
- on_initiate = &pping->on_initiate->wrapper_closure;
- }
- if (pping->on_ack != nullptr) {
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
- pping->on_ack->rr_policy = glb_policy->rr_policy;
- on_ack = &pping->on_ack->wrapper_closure;
- }
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
glb_policy, glb_policy->rr_policy);
}
- grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
+ grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, pping->on_initiate,
+ pping->on_ack);
gpr_free(pping);
}
}
@@ -873,31 +758,28 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) {
lb_policy_args_destroy(args);
}
-static void glb_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
- rr_connectivity_data* rr_connectivity = (rr_connectivity_data*)arg;
- glb_lb_policy* glb_policy = rr_connectivity->glb_policy;
+static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (glb_policy->shutting_down) {
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
- gpr_free(rr_connectivity);
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
return;
}
- if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
+ if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
/* An RR policy that has transitioned into the SHUTDOWN connectivity state
* should not be considered for picks or updates: the SHUTDOWN state is a
* sink, policies can't transition back from it. .*/
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown");
glb_policy->rr_policy = nullptr;
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
- gpr_free(rr_connectivity);
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
return;
}
/* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
- update_lb_connectivity_status_locked(glb_policy, rr_connectivity->state,
- GRPC_ERROR_REF(error));
- /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
- grpc_lb_policy_notify_on_state_change_locked(glb_policy->rr_policy,
- &rr_connectivity->state,
- &rr_connectivity->on_change);
+ update_lb_connectivity_status_locked(
+ glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error));
+ /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */
+ grpc_lb_policy_notify_on_state_change_locked(
+ glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
+ &glb_policy->on_rr_connectivity_changed);
}
static void destroy_balancer_name(void* balancer_name) {
@@ -1005,38 +887,27 @@ static void glb_destroy(grpc_lb_policy* pol) {
gpr_free(glb_policy);
}
-static void glb_shutdown_locked(grpc_lb_policy* pol) {
+static void glb_shutdown_locked(grpc_lb_policy* pol,
+ grpc_lb_policy* new_policy) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true;
-
- /* We need a copy of the lb_call pointer because we can't cancell the call
- * while holding glb_policy->mu: lb_on_server_status_received, invoked due to
- * the cancel, needs to acquire that same lock */
- grpc_call* lb_call = glb_policy->lb_call;
-
/* glb_policy->lb_call and this local lb_call must be consistent at this point
* because glb_policy->lb_call is only assigned in lb_call_init_locked as part
* of query_for_backends_locked, which can only be invoked while
* glb_policy->shutting_down is false. */
- if (lb_call != nullptr) {
- grpc_call_cancel(lb_call, nullptr);
+ if (glb_policy->lb_call != nullptr) {
+ grpc_call_cancel(glb_policy->lb_call, nullptr);
/* lb_on_server_status_received will pick up the cancel and clean up */
}
- if (glb_policy->retry_timer_active) {
+ if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
- glb_policy->retry_timer_active = false;
}
- if (glb_policy->fallback_timer_active) {
+ if (glb_policy->fallback_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_fallback_timer);
- glb_policy->fallback_timer_active = false;
}
-
- pending_pick* pp = glb_policy->pending_picks;
- glb_policy->pending_picks = nullptr;
- pending_ping* pping = glb_policy->pending_pings;
- glb_policy->pending_pings = nullptr;
if (glb_policy->rr_policy != nullptr) {
+ grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr);
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
} else {
grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
@@ -1051,28 +922,35 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
}
grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "glb_shutdown");
-
+ // Clear pending picks.
+ pending_pick* pp = glb_policy->pending_picks;
+ glb_policy->pending_picks = nullptr;
while (pp != nullptr) {
pending_pick* next = pp->next;
- *pp->target = nullptr;
- GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
- GRPC_ERROR_REF(error));
- gpr_free(pp);
+ if (new_policy != nullptr) {
+ // Hand pick over to new policy.
+ if (pp->client_stats != nullptr) {
+ grpc_grpclb_client_stats_unref(pp->client_stats);
+ }
+ pp->pick->on_complete = pp->original_on_complete;
+ if (grpc_lb_policy_pick_locked(new_policy, pp->pick)) {
+ // Synchronous return; schedule callback.
+ GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
+ }
+ gpr_free(pp);
+ } else {
+ pp->pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
+ }
pp = next;
}
-
+ // Clear pending pings.
+ pending_ping* pping = glb_policy->pending_pings;
+ glb_policy->pending_pings = nullptr;
while (pping != nullptr) {
pending_ping* next = pping->next;
- if (pping->on_initiate != nullptr) {
- GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure,
- GRPC_ERROR_REF(error));
- gpr_free(pping->on_initiate);
- }
- if (pping->on_ack != nullptr) {
- GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure,
- GRPC_ERROR_REF(error));
- gpr_free(pping->on_ack);
- }
+ GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
gpr_free(pping);
pping = next;
}
@@ -1090,16 +968,16 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
// we invoke the completion closure and set *target to nullptr right here.
static void glb_cancel_pick_locked(grpc_lb_policy* pol,
- grpc_connected_subchannel** target,
+ grpc_lb_policy_pick_state* pick,
grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = nullptr;
while (pp != nullptr) {
pending_pick* next = pp->next;
- if (pp->target == target) {
- *target = nullptr;
- GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
+ if (pp->pick == pick) {
+ pick->connected_subchannel.reset();
+ GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
@@ -1109,7 +987,7 @@ static void glb_cancel_pick_locked(grpc_lb_policy* pol,
pp = next;
}
if (glb_policy->rr_policy != nullptr) {
- grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, target,
+ grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, pick,
GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
@@ -1134,9 +1012,9 @@ static void glb_cancel_picks_locked(grpc_lb_policy* pol,
glb_policy->pending_picks = nullptr;
while (pp != nullptr) {
pending_pick* next = pp->next;
- if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
+ if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- GRPC_CLOSURE_SCHED(&pp->wrapped_on_complete_arg.wrapper_closure,
+ GRPC_CLOSURE_SCHED(&pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick Cancelled", &error, 1));
} else {
@@ -1158,14 +1036,15 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy);
static void start_picking_locked(glb_lb_policy* glb_policy) {
/* start a timer to fall back */
if (glb_policy->lb_fallback_timeout_ms > 0 &&
- glb_policy->serverlist == nullptr && !glb_policy->fallback_timer_active) {
+ glb_policy->serverlist == nullptr &&
+ !glb_policy->fallback_timer_callback_pending) {
grpc_millis deadline =
grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms;
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer");
+ GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_fallback_timer");
GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- glb_policy->fallback_timer_active = true;
+ glb_policy->fallback_timer_callback_pending = true;
grpc_timer_init(&glb_policy->lb_fallback_timer, deadline,
&glb_policy->lb_on_fallback);
}
@@ -1183,19 +1062,9 @@ static void glb_exit_idle_locked(grpc_lb_policy* pol) {
}
static int glb_pick_locked(grpc_lb_policy* pol,
- const grpc_lb_policy_pick_args* pick_args,
- grpc_connected_subchannel** target,
- grpc_call_context_element* context, void** user_data,
- grpc_closure* on_complete) {
- if (pick_args->lb_token_mdelem_storage == nullptr) {
- *target = nullptr;
- GRPC_CLOSURE_SCHED(on_complete,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "No mdelem storage for the LB token. Load reporting "
- "won't work without it. Failing"));
- return 0;
- }
+ grpc_lb_policy_pick_state* pick) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
+ pending_pick* pp = pending_pick_create(glb_policy, pick);
bool pick_done = false;
if (glb_policy->rr_policy != nullptr) {
const grpc_connectivity_state rr_connectivity_state =
@@ -1203,7 +1072,7 @@ static int glb_pick_locked(grpc_lb_policy* pol,
nullptr);
// The glb_policy->rr_policy may have transitioned to SHUTDOWN but the
// callback registered to capture this event
- // (glb_rr_connectivity_changed_locked) may not have been invoked yet. We
+ // (on_rr_connectivity_changed_locked) may not have been invoked yet. We
// need to make sure we aren't trying to pick from a RR policy instance
// that's in shutdown.
if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
@@ -1213,32 +1082,16 @@ static int glb_pick_locked(grpc_lb_policy* pol,
glb_policy, glb_policy->rr_policy,
grpc_connectivity_state_name(rr_connectivity_state));
}
- add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
- on_complete);
+ pending_pick_add(&glb_policy->pending_picks, pp);
pick_done = false;
} else { // RR not in shutdown
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
glb_policy->rr_policy);
}
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
- wrapped_rr_closure_arg* wc_arg =
- (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(wrapped_rr_closure_arg));
- GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
- grpc_schedule_on_exec_ctx);
- wc_arg->rr_policy = glb_policy->rr_policy;
- wc_arg->target = target;
- wc_arg->context = context;
GPR_ASSERT(glb_policy->client_stats != nullptr);
- wc_arg->client_stats =
- grpc_grpclb_client_stats_ref(glb_policy->client_stats);
- wc_arg->wrapped_closure = on_complete;
- wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
- wc_arg->initial_metadata = pick_args->initial_metadata;
- wc_arg->free_when_done = wc_arg;
- wc_arg->glb_policy = pol;
- pick_done = pick_from_internal_rr_locked(
- glb_policy, pick_args, false /* force_async */, target, wc_arg);
+ pick_done =
+ pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp);
}
} else { // glb_policy->rr_policy == NULL
if (grpc_lb_glb_trace.enabled()) {
@@ -1246,8 +1099,7 @@ static int glb_pick_locked(grpc_lb_policy* pol,
"[grpclb %p] No RR policy. Adding to grpclb's pending picks",
glb_policy);
}
- add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
- on_complete);
+ pending_pick_add(&glb_policy->pending_picks, pp);
if (!glb_policy->started_picking) {
start_picking_locked(glb_policy);
}
@@ -1269,7 +1121,7 @@ static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
if (glb_policy->rr_policy) {
grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
} else {
- add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack);
+ pending_ping_add(&glb_policy->pending_pings, on_initiate, on_ack);
if (!glb_policy->started_picking) {
start_picking_locked(glb_policy);
}
@@ -1286,7 +1138,7 @@ static void glb_notify_on_state_change_locked(grpc_lb_policy* pol,
static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- glb_policy->retry_timer_active = false;
+ glb_policy->retry_timer_callback_pending = false;
if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr &&
error == GRPC_ERROR_NONE) {
if (grpc_lb_glb_trace.enabled()) {
@@ -1294,42 +1146,42 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
}
query_for_backends_locked(glb_policy);
}
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_retry_timer");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer");
}
static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
if (glb_policy->started_picking && glb_policy->updating_lb_call) {
- if (glb_policy->retry_timer_active) {
+ if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
}
if (!glb_policy->shutting_down) start_picking_locked(glb_policy);
glb_policy->updating_lb_call = false;
} else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
- grpc_millis next_try = glb_policy->lb_call_backoff->Step();
+ grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
glb_policy);
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
if (timeout > 0) {
gpr_log(GPR_DEBUG,
- "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
+ "[grpclb %p] ... retry LB call after %" PRIuPTR "ms.",
glb_policy, timeout);
} else {
- gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
+ gpr_log(GPR_DEBUG, "[grpclb %p] ... retry LB call immediately.",
glb_policy);
}
}
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
+ GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer");
GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
lb_call_on_retry_timer_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- glb_policy->retry_timer_active = true;
+ glb_policy->retry_timer_callback_pending = true;
grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry);
}
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
- "lb_on_server_status_received_locked");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ "lb_on_server_status_received_locked");
}
static void send_client_load_report_locked(void* arg, grpc_error* error);
@@ -1351,8 +1203,8 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) {
grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
glb_policy->client_load_report_payload = nullptr;
if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) {
- glb_policy->client_load_report_timer_pending = false;
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report");
+ glb_policy->client_load_report_timer_callback_pending = false;
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == nullptr) {
maybe_restart_lb_call(glb_policy);
}
@@ -1392,8 +1244,8 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
static void send_client_load_report_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) {
- glb_policy->client_load_report_timer_pending = false;
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "client_load_report");
+ glb_policy->client_load_report_timer_callback_pending = false;
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
if (glb_policy->lb_call == nullptr) {
maybe_restart_lb_call(glb_policy);
}
@@ -1503,7 +1355,7 @@ static void lb_call_destroy_locked(glb_lb_policy* glb_policy) {
grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
grpc_slice_unref_internal(glb_policy->lb_call_status_details);
- if (glb_policy->client_load_report_timer_pending) {
+ if (glb_policy->client_load_report_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->client_load_report_timer);
}
}
@@ -1546,10 +1398,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0;
op->reserved = nullptr;
op++;
- /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
- * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
- "lb_on_sent_initial_request_locked");
+ /* take a ref to be released in lb_on_sent_initial_request_locked() */
+ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked");
call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_sent_initial_request);
@@ -1565,10 +1415,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0;
op->reserved = nullptr;
op++;
- /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
- * count goes to zero) to be unref'd in lb_on_server_status_received_locked */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
- "lb_on_server_status_received_locked");
+ /* take a ref to be released in lb_on_server_status_received_locked() */
+ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked");
call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_server_status_received);
@@ -1580,9 +1428,8 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0;
op->reserved = nullptr;
op++;
- /* take another weak ref to be unref'd/reused in
- * lb_on_response_received_locked */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked");
+ /* take a ref to be unref'd/reused in lb_on_response_received_locked() */
+ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked");
call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received);
@@ -1597,8 +1444,7 @@ static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
if (glb_policy->client_load_report_payload != nullptr) {
do_send_client_load_report_locked(glb_policy);
}
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
- "lb_on_sent_initial_request_locked");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked");
}
static void lb_on_response_received_locked(void* arg, grpc_error* error) {
@@ -1630,11 +1476,9 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
"client load reporting interval = %" PRIdPTR " milliseconds",
glb_policy, glb_policy->client_stats_report_interval);
}
- /* take a weak ref (won't prevent calling of \a glb_shutdown() if the
- * strong ref count goes to zero) to be unref'd in
- * send_client_load_report_locked() */
- glb_policy->client_load_report_timer_pending = true;
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
+ /* take a ref to be unref'd in send_client_load_report_locked() */
+ glb_policy->client_load_report_timer_callback_pending = true;
+ GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report");
schedule_next_client_load_report(glb_policy);
} else if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
@@ -1682,9 +1526,8 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
/* or dispose of the fallback */
grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses = nullptr;
- if (glb_policy->fallback_timer_active) {
+ if (glb_policy->fallback_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_fallback_timer);
- glb_policy->fallback_timer_active = false;
}
}
/* and update the copy in the glb_lb_policy instance. This
@@ -1717,27 +1560,27 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
op->flags = 0;
op->reserved = nullptr;
op++;
- /* reuse the "lb_on_response_received_locked" weak ref taken in
+ /* reuse the "lb_on_response_received_locked" ref taken in
* query_for_backends_locked() */
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
glb_policy->lb_call, ops, (size_t)(op - ops),
&glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error);
} else {
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
- "lb_on_response_received_locked_shutdown");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ "lb_on_response_received_locked_shutdown");
}
} else { /* empty payload: call cancelled. */
- /* dispose of the "lb_on_response_received_locked" weak ref taken in
+ /* dispose of the "lb_on_response_received_locked" ref taken in
* query_for_backends_locked() and reused in every reception loop */
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
- "lb_on_response_received_locked_empty_payload");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ "lb_on_response_received_locked_empty_payload");
}
}
static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- glb_policy->fallback_timer_active = false;
+ glb_policy->fallback_timer_callback_pending = false;
/* If we receive a serverlist after the timer fires but before this callback
* actually runs, don't fall back. */
if (glb_policy->serverlist == nullptr) {
@@ -1751,7 +1594,7 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
rr_handover_locked(glb_policy);
}
}
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, "grpclb_fallback_timer");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
@@ -1772,7 +1615,7 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
// If the load report timer is still pending, we wait for it to be
// called before restarting the call. Otherwise, we restart the call
// here.
- if (!glb_policy->client_load_report_timer_pending) {
+ if (!glb_policy->client_load_report_timer_callback_pending) {
maybe_restart_lb_call(glb_policy);
}
}
@@ -1835,7 +1678,7 @@ static void glb_update_locked(grpc_lb_policy* policy,
grpc_channel_get_channel_stack(glb_policy->lb_channel));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
glb_policy->watching_lb_channel = true;
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
+ GRPC_LB_POLICY_REF(&glb_policy->base, "watch_lb_channel_connectivity");
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
@@ -1882,9 +1725,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
// lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
} else if (glb_policy->started_picking) {
- if (glb_policy->retry_timer_active) {
+ if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
- glb_policy->retry_timer_active = false;
}
start_picking_locked(glb_policy);
}
@@ -1892,8 +1734,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
case GRPC_CHANNEL_SHUTDOWN:
done:
glb_policy->watching_lb_channel = false;
- GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
- "watch_lb_channel_connectivity_cb_shutdown");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base,
+ "watch_lb_channel_connectivity_cb_shutdown");
break;
}
}