 src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc |  64
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 865
 test/cpp/end2end/grpclb_end2end_test.cc |  47
 3 files changed, 512 insertions(+), 464 deletions(-)
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
index 1708d81e61..4596f90745 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
@@ -69,11 +69,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
call_data* calld = (call_data*)elem->call_data;
// Get stats object from context and take a ref.
GPR_ASSERT(args->context != nullptr);
- GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr);
- calld->client_stats = grpc_grpclb_client_stats_ref(
- (grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS].value);
- // Record call started.
- grpc_grpclb_client_stats_add_call_started(calld->client_stats);
+ if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
+ calld->client_stats = grpc_grpclb_client_stats_ref(
+ (grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS]
+ .value);
+ // Record call started.
+ grpc_grpclb_client_stats_add_call_started(calld->client_stats);
+ }
return GRPC_ERROR_NONE;
}
@@ -81,36 +83,40 @@ static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = (call_data*)elem->call_data;
- // Record call finished, optionally setting client_failed_to_send and
- // received.
- grpc_grpclb_client_stats_add_call_finished(
- !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
- calld->recv_initial_metadata_succeeded /* known_received */,
- calld->client_stats);
- // All done, so unref the stats object.
- grpc_grpclb_client_stats_unref(calld->client_stats);
+ if (calld->client_stats != nullptr) {
+ // Record call finished, optionally setting client_failed_to_send and
+ // received.
+ grpc_grpclb_client_stats_add_call_finished(
+ !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
+ calld->recv_initial_metadata_succeeded /* known_received */,
+ calld->client_stats);
+ // All done, so unref the stats object.
+ grpc_grpclb_client_stats_unref(calld->client_stats);
+ }
}
static void start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = (call_data*)elem->call_data;
GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0);
- // Intercept send_initial_metadata.
- if (batch->send_initial_metadata) {
- calld->original_on_complete_for_send = batch->on_complete;
- GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, calld,
- grpc_schedule_on_exec_ctx);
- batch->on_complete = &calld->on_complete_for_send;
- }
- // Intercept recv_initial_metadata.
- if (batch->recv_initial_metadata) {
- calld->original_recv_initial_metadata_ready =
- batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
- GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
- recv_initial_metadata_ready, calld,
- grpc_schedule_on_exec_ctx);
- batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
- &calld->recv_initial_metadata_ready;
+ if (calld->client_stats != nullptr) {
+ // Intercept send_initial_metadata.
+ if (batch->send_initial_metadata) {
+ calld->original_on_complete_for_send = batch->on_complete;
+ GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send,
+ calld, grpc_schedule_on_exec_ctx);
+ batch->on_complete = &calld->on_complete_for_send;
+ }
+ // Intercept recv_initial_metadata.
+ if (batch->recv_initial_metadata) {
+ calld->original_recv_initial_metadata_ready =
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
+ recv_initial_metadata_ready, calld,
+ grpc_schedule_on_exec_ctx);
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->recv_initial_metadata_ready;
+ }
}
// Chain to next filter.
grpc_call_next_op(elem, batch);
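
Note on the change above: the GRPC_GRPCLB_CLIENT_STATS context entry is now optional, because the stats object is only created once load reporting is known to be enabled, so every access in this filter is guarded by a null check. The interception itself is the usual wrap-and-chain closure pattern. A minimal standalone sketch of that pattern, with std::function standing in for grpc_closure (illustrative, not the real filter API):

    #include <functional>

    // Sketch: intercept a batch's completion callback, record the outcome,
    // then chain to the original callback, mirroring on_complete_for_send.
    struct CallData {
      bool send_initial_metadata_succeeded = false;
      std::function<void(bool)> original_on_complete;
    };

    void on_complete_for_send(CallData* calld, bool ok) {
      if (ok) calld->send_initial_metadata_succeeded = true;
      calld->original_on_complete(ok);  // chain to the next filter's callback
    }

    void intercept_send(CallData* calld, std::function<void(bool)>* on_complete) {
      calld->original_on_complete = *on_complete;
      *on_complete = [calld](bool ok) { on_complete_for_send(calld, ok); };
    }
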
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 06ae79041e..6393884127 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -169,25 +169,78 @@ struct pending_ping {
} // namespace
-struct glb_lb_policy {
- /** base policy: must be first */
+typedef struct glb_lb_call_data {
+ struct glb_lb_policy* glb_policy;
+ // TODO: refactor.
+ gpr_refcount refs;
+
+ /** The streaming call to the LB server. Always non-NULL. */
+ grpc_call* lb_call;
+
+ /** The initial metadata received from the LB server. */
+ grpc_metadata_array lb_initial_metadata_recv;
+
+ /** The message sent to the LB server. It's used to query for backends (the
+ * value may vary if the LB server indicates a redirect) or to send a client
+ * load report. */
+ grpc_byte_buffer* send_message_payload;
+ /** The callback after the initial request is sent. */
+ grpc_closure lb_on_sent_initial_request;
+
+ /** The response received from the LB server, if any. */
+ grpc_byte_buffer* recv_message_payload;
+ /** The callback to process the response received from the LB server. */
+ grpc_closure lb_on_response_received;
+ bool seen_initial_response;
+
+ /** The callback to process the status received from the LB server, which
+ * signals the end of the LB call. */
+ grpc_closure lb_on_server_status_received;
+ /** The trailing metadata from the LB server. */
+ grpc_metadata_array lb_trailing_metadata_recv;
+ /** The call status code and details. */
+ grpc_status_code lb_call_status;
+ grpc_slice lb_call_status_details;
+
+ /** The stats for client-side load reporting associated with this LB call.
+ * Created after the first serverlist is received. */
+ grpc_grpclb_client_stats* client_stats;
+ /** The interval and timer for next client load report. */
+ grpc_millis client_stats_report_interval;
+ grpc_timer client_load_report_timer;
+ bool client_load_report_timer_callback_pending;
+ bool last_client_load_report_counters_were_zero;
+ bool client_load_report_is_due;
+ /** The closure used for either the load report timer or the callback for
+ * completion of sending the load report. */
+ grpc_closure client_load_report_closure;
+} glb_lb_call_data;
+
+typedef struct glb_lb_policy {
+ /** Base policy: must be first. */
grpc_lb_policy base;
- /** who the client is trying to communicate with */
+ /** Who the client is trying to communicate with. */
const char* server_name;
+
+ /** Channel related data that will be propagated to the internal RR policy. */
grpc_client_channel_factory* cc_factory;
grpc_channel_args* args;
- /** timeout in milliseconds for the LB call. 0 means no deadline. */
- int lb_call_timeout_ms;
-
- /** timeout in milliseconds for before using fallback backend addresses.
+ /** Timeout in milliseconds before using fallback backend addresses.
* 0 means not using fallback. */
int lb_fallback_timeout_ms;
- /** for communicating with the LB server */
+ /** The channel for communicating with the LB server. */
grpc_channel* lb_channel;
+ /** The data associated with the current LB call. It holds a ref to this LB
+ * policy. It's initialized every time we query for backends. It's reset to
+ * NULL whenever the current LB call is no longer needed (e.g., the LB policy
+ * is shutting down, or the LB call has ended). A non-NULL lb_calld always
+ * contains a non-NULL lb_call. */
+ glb_lb_call_data* lb_calld;
+
/** response generator to inject address updates into \a lb_channel */
grpc_fake_resolver_response_generator* response_generator;
@@ -225,9 +278,6 @@ struct glb_lb_policy {
bool shutting_down;
- /** are we currently updating lb_call? */
- bool updating_lb_call;
-
/** are we already watching the LB channel's connectivity? */
bool watching_lb_channel;
@@ -243,65 +293,70 @@ struct glb_lb_policy {
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
- /* Finished sending initial request. */
- grpc_closure lb_on_sent_initial_request;
-
- /* Status from the LB server has been received. This signals the end of the LB
- * call. */
- grpc_closure lb_on_server_status_received;
-
- /* A response from the LB server has been received. Process it */
- grpc_closure lb_on_response_received;
-
- /* LB call retry timer callback. */
- grpc_closure lb_on_call_retry;
-
- /* LB fallback timer callback. */
- grpc_closure lb_on_fallback;
-
- grpc_call* lb_call; /* streaming call to the LB server, */
-
- grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
- grpc_metadata_array
- lb_trailing_metadata_recv; /* trailing MD from LB server */
-
- /* what's being sent to the LB server. Note that its value may vary if the LB
- * server indicates a redirect. */
- grpc_byte_buffer* lb_request_payload;
-
- /* response the LB server, if any. Processed in lb_on_response_received() */
- grpc_byte_buffer* lb_response_payload;
-
- /* call status code and details, set in lb_on_server_status_received() */
- grpc_status_code lb_call_status;
- grpc_slice lb_call_status_details;
/** LB call retry backoff state */
grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff;
+ /** timeout in milliseconds for the LB call. 0 means no deadline. */
+ int lb_call_timeout_ms;
+
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
+ /** LB call retry timer callback */
+ grpc_closure lb_on_call_retry;
/** LB fallback timer */
grpc_timer lb_fallback_timer;
+ /** LB fallback timer callback */
+ grpc_closure lb_on_fallback;
+} glb_lb_policy;
- bool initial_request_sent;
- bool seen_initial_response;
+static void glb_lb_call_data_ref(glb_lb_call_data* lb_calld,
+ const char* reason) {
+ gpr_ref_non_zero(&lb_calld->refs);
+ if (grpc_lb_glb_trace.enabled()) {
+ const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
+ gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p REF %lu->%lu (%s)",
+ grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
+ (unsigned long)(count - 1), (unsigned long)count, reason);
+ }
+}
- /* Stats for client-side load reporting. Should be unreffed and
- * recreated whenever lb_call is replaced. */
- grpc_grpclb_client_stats* client_stats;
- /* Interval and timer for next client load report. */
- grpc_millis client_stats_report_interval;
- grpc_timer client_load_report_timer;
- bool client_load_report_timer_callback_pending;
- bool last_client_load_report_counters_were_zero;
- /* Closure used for either the load report timer or the callback for
- * completion of sending the load report. */
- grpc_closure client_load_report_closure;
- /* Client load report message payload. */
- grpc_byte_buffer* client_load_report_payload;
-};
+static void glb_lb_call_data_unref(glb_lb_call_data* lb_calld,
+ const char* reason) {
+ const bool done = gpr_unref(&lb_calld->refs);
+ if (grpc_lb_glb_trace.enabled()) {
+ const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
+ gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p UNREF %lu->%lu (%s)",
+ grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
+ (unsigned long)(count + 1), (unsigned long)count, reason);
+ }
+ if (done) {
+ GPR_ASSERT(lb_calld->lb_call != nullptr);
+ grpc_call_unref(lb_calld->lb_call);
+ grpc_metadata_array_destroy(&lb_calld->lb_initial_metadata_recv);
+ grpc_metadata_array_destroy(&lb_calld->lb_trailing_metadata_recv);
+ grpc_byte_buffer_destroy(lb_calld->send_message_payload);
+ grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
+ grpc_slice_unref_internal(lb_calld->lb_call_status_details);
+ if (lb_calld->client_stats != nullptr) {
+ grpc_grpclb_client_stats_unref(lb_calld->client_stats);
+ }
+ GRPC_LB_POLICY_UNREF(&lb_calld->glb_policy->base, "lb_calld");
+ gpr_free(lb_calld);
+ }
+}
+
+static void lb_call_data_shutdown(glb_lb_policy* glb_policy) {
+ GPR_ASSERT(glb_policy->lb_calld != nullptr);
+ GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
+ // lb_on_server_status_received will complete the cancellation and clean up.
+ grpc_call_cancel(glb_policy->lb_calld->lb_call, nullptr);
+ if (glb_policy->lb_calld->client_load_report_timer_callback_pending) {
+ grpc_timer_cancel(&glb_policy->lb_calld->client_load_report_timer);
+ }
+ glb_policy->lb_calld = nullptr;
+}
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
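
The ref/unref helpers above wrap a plain gpr_refcount with trace logging; the logged old/new counts are reconstructed from a value read after the operation, so they are best-effort debugging output rather than an atomic snapshot. A self-contained sketch of the same idea using std::atomic (illustrative only, not the gRPC API):

    #include <atomic>
    #include <cstdio>

    struct TracedRef {
      std::atomic<long> count{1};  // starts at 1, like gpr_ref_init(&refs, 1)
    };

    void traced_ref(TracedRef* r, const char* reason) {
      long newval = r->count.fetch_add(1) + 1;  // fetch_add returns the old value
      std::fprintf(stderr, "REF %ld->%ld (%s)\n", newval - 1, newval, reason);
    }

    bool traced_unref(TracedRef* r, const char* reason) {
      long newval = r->count.fetch_sub(1) - 1;
      std::fprintf(stderr, "UNREF %ld->%ld (%s)\n", newval + 1, newval, reason);
      return newval == 0;  // true: caller holds the last ref and must clean up
    }

A true return plays the role of the done branch above: the caller destroys the call, metadata arrays, and payloads, and releases the ref held on the policy.
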
@@ -334,11 +389,12 @@ static void pending_pick_set_metadata_and_context(pending_pick* pp) {
abort();
}
// Pass on client stats via context. Passes ownership of the reference.
- GPR_ASSERT(pp->client_stats != nullptr);
- pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
- pp->client_stats;
- pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
- destroy_client_stats;
+ if (pp->client_stats != nullptr) {
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
+ pp->client_stats;
+ pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
+ destroy_client_stats;
+ }
} else {
if (pp->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(pp->client_stats);
@@ -605,9 +661,11 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
- GPR_ASSERT(glb_policy->client_stats != nullptr);
- grpc_grpclb_client_stats_add_call_dropped_locked(
- server->load_balance_token, glb_policy->client_stats);
+ if (glb_policy->lb_calld != nullptr &&
+ glb_policy->lb_calld->client_stats != nullptr) {
+ grpc_grpclb_client_stats_add_call_dropped_locked(
+ server->load_balance_token, glb_policy->lb_calld->client_stats);
+ }
if (force_async) {
GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
@@ -618,7 +676,11 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
}
}
// Set client_stats and user_data.
- pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats);
+ if (glb_policy->lb_calld != nullptr &&
+ glb_policy->lb_calld->client_stats != nullptr) {
+ pp->client_stats =
+ grpc_grpclb_client_stats_ref(glb_policy->lb_calld->client_stats);
+ }
GPR_ASSERT(pp->pick->user_data == nullptr);
pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy.
@@ -872,9 +934,6 @@ static void glb_destroy(grpc_lb_policy* pol) {
GPR_ASSERT(glb_policy->pending_pings == nullptr);
gpr_free((void*)glb_policy->server_name);
grpc_channel_args_destroy(glb_policy->args);
- if (glb_policy->client_stats != nullptr) {
- grpc_grpclb_client_stats_unref(glb_policy->client_stats);
- }
grpc_connectivity_state_destroy(&glb_policy->state_tracker);
if (glb_policy->serverlist != nullptr) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
@@ -892,13 +951,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true;
- /* glb_policy->lb_call and this local lb_call must be consistent at this point
- * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
- * of query_for_backends_locked, which can only be invoked while
- * glb_policy->shutting_down is false. */
- if (glb_policy->lb_call != nullptr) {
- grpc_call_cancel(glb_policy->lb_call, nullptr);
- /* lb_on_server_status_received will pick up the cancel and clean up */
+ if (glb_policy->lb_calld != nullptr) {
+ lb_call_data_shutdown(glb_policy);
}
if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
@@ -1048,7 +1102,6 @@ static void start_picking_locked(glb_lb_policy* glb_policy) {
grpc_timer_init(&glb_policy->lb_fallback_timer, deadline,
&glb_policy->lb_on_fallback);
}
-
glb_policy->started_picking = true;
glb_policy->lb_call_backoff->Reset();
query_for_backends_locked(glb_policy);
@@ -1089,7 +1142,6 @@ static int glb_pick_locked(grpc_lb_policy* pol,
gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
glb_policy->rr_policy);
}
- GPR_ASSERT(glb_policy->client_stats != nullptr);
pick_done =
pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp);
}
@@ -1139,8 +1191,8 @@ static void glb_notify_on_state_change_locked(grpc_lb_policy* pol,
static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
glb_policy->retry_timer_callback_pending = false;
- if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr &&
- error == GRPC_ERROR_NONE) {
+ if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE &&
+ glb_policy->lb_calld == nullptr) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy);
}
@@ -1149,84 +1201,55 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer");
}
-static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
- if (glb_policy->started_picking && glb_policy->updating_lb_call) {
- if (glb_policy->retry_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
- }
- if (!glb_policy->shutting_down) start_picking_locked(glb_policy);
- glb_policy->updating_lb_call = false;
- } else if (!glb_policy->shutting_down) {
- /* if we aren't shutting down, restart the LB client call after some time */
- grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
+static void start_lb_call_retry_timer_locked(glb_lb_policy* glb_policy) {
+ grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
+ glb_policy);
+ grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
+ if (timeout > 0) {
+ gpr_log(GPR_DEBUG,
+ "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
+ glb_policy, timeout);
+ } else {
+ gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
glb_policy);
- grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
- if (timeout > 0) {
- gpr_log(GPR_DEBUG,
- "[grpclb %p] ... retry LB call after %" PRIuPTR "ms.",
- glb_policy, timeout);
- } else {
- gpr_log(GPR_DEBUG, "[grpclb %p] ... retry LB call immediately.",
- glb_policy);
- }
}
- GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer");
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
- lb_call_on_retry_timer_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- glb_policy->retry_timer_callback_pending = true;
- grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
- &glb_policy->lb_on_call_retry);
}
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
- "lb_on_server_status_received_locked");
+ GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer");
+ GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
+ lb_call_on_retry_timer_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ glb_policy->retry_timer_callback_pending = true;
+ grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
+ &glb_policy->lb_on_call_retry);
}
-static void send_client_load_report_locked(void* arg, grpc_error* error);
+static void maybe_send_client_load_report_locked(void* arg, grpc_error* error);
-static void schedule_next_client_load_report(glb_lb_policy* glb_policy) {
+static void schedule_next_client_load_report(glb_lb_call_data* lb_calld) {
const grpc_millis next_client_load_report_time =
- grpc_core::ExecCtx::Get()->Now() +
- glb_policy->client_stats_report_interval;
- GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
- send_client_load_report_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_timer_init(&glb_policy->client_load_report_timer,
+ grpc_core::ExecCtx::Get()->Now() + lb_calld->client_stats_report_interval;
+ GRPC_CLOSURE_INIT(
+ &lb_calld->client_load_report_closure,
+ maybe_send_client_load_report_locked, lb_calld,
+ grpc_combiner_scheduler(lb_calld->glb_policy->base.combiner));
+ grpc_timer_init(&lb_calld->client_load_report_timer,
next_client_load_report_time,
- &glb_policy->client_load_report_closure);
+ &lb_calld->client_load_report_closure);
+ lb_calld->client_load_report_timer_callback_pending = true;
}
static void client_load_report_done_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
- glb_policy->client_load_report_payload = nullptr;
- if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) {
- glb_policy->client_load_report_timer_callback_pending = false;
- GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
- if (glb_policy->lb_call == nullptr) {
- maybe_restart_lb_call(glb_policy);
- }
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+ glb_lb_policy* glb_policy = lb_calld->glb_policy;
+ grpc_byte_buffer_destroy(lb_calld->send_message_payload);
+ lb_calld->send_message_payload = nullptr;
+ if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
+ glb_lb_call_data_unref(lb_calld, "client_load_report");
return;
}
- schedule_next_client_load_report(glb_policy);
-}
-
-static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) {
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = glb_policy->client_load_report_payload;
- GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
- client_load_report_done_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
- if (call_error != GRPC_CALL_OK) {
- gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
- }
+ schedule_next_client_load_report(lb_calld);
}
static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
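
start_lb_call_retry_timer_locked above computes the next attempt time from the policy's grpc_core::BackOff state and arms a timer whose callback (lb_call_on_retry_timer_locked) re-enters query_for_backends_locked. A rough sketch of such a backoff schedule, assuming exponential growth with multiplicative jitter and a cap; the constants are placeholders, not the GRPC_GRPCLB_* values:

    #include <algorithm>
    #include <cstdlib>

    struct Backoff {
      double initial_ms = 1000.0;  // placeholder constants
      double multiplier = 1.6;
      double jitter = 0.2;         // +/-20% randomization
      double max_ms = 120000.0;
      double current_ms = initial_ms;

      double NextAttemptDelayMs() {
        double rnd = std::rand() / (double)RAND_MAX;  // uniform in [0, 1]
        double jittered = current_ms * (1.0 - jitter + 2.0 * jitter * rnd);
        current_ms = std::min(current_ms * multiplier, max_ms);
        return jittered;
      }
      void Reset() { current_ms = initial_ms; }  // cf. lb_call_backoff->Reset()
    };
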
@@ -1241,341 +1264,377 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
(drop_entries == nullptr || drop_entries->num_entries == 0);
}
-static void send_client_load_report_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) {
- glb_policy->client_load_report_timer_callback_pending = false;
- GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
- if (glb_policy->lb_call == nullptr) {
- maybe_restart_lb_call(glb_policy);
- }
- return;
- }
+static void send_client_load_report_locked(glb_lb_call_data* lb_calld) {
+ glb_lb_policy* glb_policy = lb_calld->glb_policy;
// Construct message payload.
- GPR_ASSERT(glb_policy->client_load_report_payload == nullptr);
+ GPR_ASSERT(lb_calld->send_message_payload == nullptr);
grpc_grpclb_request* request =
- grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
+ grpc_grpclb_load_report_request_create_locked(lb_calld->client_stats);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (load_report_counters_are_zero(request)) {
- if (glb_policy->last_client_load_report_counters_were_zero) {
+ if (lb_calld->last_client_load_report_counters_were_zero) {
grpc_grpclb_request_destroy(request);
- schedule_next_client_load_report(glb_policy);
+ schedule_next_client_load_report(lb_calld);
return;
}
- glb_policy->last_client_load_report_counters_were_zero = true;
+ lb_calld->last_client_load_report_counters_were_zero = true;
} else {
- glb_policy->last_client_load_report_counters_were_zero = false;
+ lb_calld->last_client_load_report_counters_were_zero = false;
}
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
- glb_policy->client_load_report_payload =
+ lb_calld->send_message_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
+ // Send the report.
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_SEND_MESSAGE;
+ op.data.send_message.send_message = lb_calld->send_message_payload;
+ GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure,
+ client_load_report_done_locked, lb_calld,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ lb_calld->lb_call, &op, 1, &lb_calld->client_load_report_closure);
+ if (call_error != GRPC_CALL_OK) {
+ gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ }
+}
+
+static void maybe_send_client_load_report_locked(void* arg, grpc_error* error) {
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+ glb_lb_policy* glb_policy = lb_calld->glb_policy;
+ lb_calld->client_load_report_timer_callback_pending = false;
+ if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
+ glb_lb_call_data_unref(lb_calld, "client_load_report");
+ return;
+ }
// If we've already sent the initial request, then we can go ahead and send
// the load report. Otherwise, we need to wait until the initial request has
- // been sent to send this (see lb_on_sent_initial_request_locked() below).
- if (glb_policy->initial_request_sent) {
- do_send_client_load_report_locked(glb_policy);
+ // been sent to send this (see lb_on_sent_initial_request_locked()).
+ if (lb_calld->send_message_payload == nullptr) {
+ send_client_load_report_locked(lb_calld);
+ } else {
+ lb_calld->client_load_report_is_due = true;
}
}
static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error);
static void lb_on_server_status_received_locked(void* arg, grpc_error* error);
static void lb_on_response_received_locked(void* arg, grpc_error* error);
-static void lb_call_init_locked(glb_lb_policy* glb_policy) {
+static glb_lb_call_data* lb_call_data_create_locked(glb_lb_policy* glb_policy) {
+ GPR_ASSERT(!glb_policy->shutting_down);
+ // Init the LB call. Note that the LB call will progress every time there's
+ // activity in glb_policy->base.interested_parties, which is comprised of the
+ // polling entities from client_channel.
GPR_ASSERT(glb_policy->server_name != nullptr);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
- GPR_ASSERT(glb_policy->lb_call == nullptr);
- GPR_ASSERT(!glb_policy->shutting_down);
-
- /* Note the following LB call progresses every time there's activity in \a
- * glb_policy->base.interested_parties, which is comprised of the polling
- * entities from \a client_channel. */
grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name);
grpc_millis deadline =
glb_policy->lb_call_timeout_ms == 0
? GRPC_MILLIS_INF_FUTURE
: grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms;
- glb_policy->lb_call = grpc_channel_create_pollset_set_call(
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)gpr_zalloc(sizeof(*lb_calld));
+ lb_calld->lb_call = grpc_channel_create_pollset_set_call(
glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
&host, deadline, nullptr);
grpc_slice_unref_internal(host);
-
- if (glb_policy->client_stats != nullptr) {
- grpc_grpclb_client_stats_unref(glb_policy->client_stats);
- }
- glb_policy->client_stats = grpc_grpclb_client_stats_create();
-
- grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
- grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
-
+ // Init the LB call request payload.
grpc_grpclb_request* request =
grpc_grpclb_request_create(glb_policy->server_name);
grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
- glb_policy->lb_request_payload =
+ lb_calld->send_message_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
-
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
- lb_on_sent_initial_request_locked, glb_policy,
+ // Init other data associated with the LB call.
+ lb_calld->glb_policy = glb_policy;
+ gpr_ref_init(&lb_calld->refs, 1);
+ grpc_metadata_array_init(&lb_calld->lb_initial_metadata_recv);
+ grpc_metadata_array_init(&lb_calld->lb_trailing_metadata_recv);
+ GRPC_CLOSURE_INIT(&lb_calld->lb_on_sent_initial_request,
+ lb_on_sent_initial_request_locked, lb_calld,
grpc_combiner_scheduler(glb_policy->base.combiner));
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
- lb_on_server_status_received_locked, glb_policy,
+ GRPC_CLOSURE_INIT(&lb_calld->lb_on_response_received,
+ lb_on_response_received_locked, lb_calld,
grpc_combiner_scheduler(glb_policy->base.combiner));
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received,
- lb_on_response_received_locked, glb_policy,
+ GRPC_CLOSURE_INIT(&lb_calld->lb_on_server_status_received,
+ lb_on_server_status_received_locked, lb_calld,
grpc_combiner_scheduler(glb_policy->base.combiner));
-
- grpc_core::BackOff::Options backoff_options;
- backoff_options
- .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
- .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
- .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
- .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
-
- glb_policy->lb_call_backoff.Init(backoff_options);
-
- glb_policy->initial_request_sent = false;
- glb_policy->seen_initial_response = false;
- glb_policy->last_client_load_report_counters_were_zero = false;
-}
-
-static void lb_call_destroy_locked(glb_lb_policy* glb_policy) {
- GPR_ASSERT(glb_policy->lb_call != nullptr);
- grpc_call_unref(glb_policy->lb_call);
- glb_policy->lb_call = nullptr;
-
- grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
- grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
-
- grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
- grpc_slice_unref_internal(glb_policy->lb_call_status_details);
-
- if (glb_policy->client_load_report_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->client_load_report_timer);
- }
+ // Hold a ref to the glb_policy.
+ GRPC_LB_POLICY_REF(&glb_policy->base, "lb_calld");
+ return lb_calld;
}
/*
* Auxiliary functions and LB client callbacks.
*/
+
static void query_for_backends_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(glb_policy->lb_channel != nullptr);
if (glb_policy->shutting_down) return;
-
- lb_call_init_locked(glb_policy);
-
+ // Init the LB call data.
+ GPR_ASSERT(glb_policy->lb_calld == nullptr);
+ glb_policy->lb_calld = lb_call_data_create_locked(glb_policy);
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Query for backends (lb_channel: %p, lb_call: %p)",
- glb_policy, glb_policy->lb_channel, glb_policy->lb_call);
+ "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p, "
+ "lb_call: %p)",
+ glb_policy, glb_policy->lb_channel, glb_policy->lb_calld,
+ glb_policy->lb_calld->lb_call);
}
- GPR_ASSERT(glb_policy->lb_call != nullptr);
-
+ GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
+ // Create the ops.
grpc_call_error call_error;
grpc_op ops[3];
memset(ops, 0, sizeof(ops));
-
+ // Op: send initial metadata.
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
+ // Op: send request message.
+ GPR_ASSERT(glb_policy->lb_calld->send_message_payload != nullptr);
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message.send_message =
+ glb_policy->lb_calld->send_message_payload;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ glb_lb_call_data_ref(glb_policy->lb_calld,
+ "lb_on_sent_initial_request_locked");
+ call_error = grpc_call_start_batch_and_execute(
+ glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_calld->lb_on_sent_initial_request);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ // Op: recv initial metadata.
+ op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
- &glb_policy->lb_initial_metadata_recv;
+ &glb_policy->lb_calld->lb_initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
- GPR_ASSERT(glb_policy->lb_request_payload != nullptr);
- op->op = GRPC_OP_SEND_MESSAGE;
- op->data.send_message.send_message = glb_policy->lb_request_payload;
+ // Op: recv response.
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message =
+ &glb_policy->lb_calld->recv_message_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
- /* take a ref to be released in lb_on_sent_initial_request_locked() */
- GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked");
+ glb_lb_call_data_ref(glb_policy->lb_calld, "lb_on_response_received_locked");
call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, ops, (size_t)(op - ops),
- &glb_policy->lb_on_sent_initial_request);
+ glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_calld->lb_on_response_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
-
+ // Op: recv server status.
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata =
- &glb_policy->lb_trailing_metadata_recv;
- op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
+ &glb_policy->lb_calld->lb_trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &glb_policy->lb_calld->lb_call_status;
op->data.recv_status_on_client.status_details =
- &glb_policy->lb_call_status_details;
+ &glb_policy->lb_calld->lb_call_status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
- /* take a ref to be released in lb_on_server_status_received_locked() */
- GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked");
+ // This callback signals the end of the LB call, so it reuses lb_calld's
+ // initial ref instead of taking a new one. When the callback is invoked,
+ // that initial ref is released.
call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, ops, (size_t)(op - ops),
- &glb_policy->lb_on_server_status_received);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
-
- op = ops;
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- /* take a ref to be unref'd/reused in lb_on_response_received_locked() */
- GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked");
- call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, ops, (size_t)(op - ops),
- &glb_policy->lb_on_response_received);
+ glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_calld->lb_on_server_status_received);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- glb_policy->initial_request_sent = true;
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+ grpc_byte_buffer_destroy(lb_calld->send_message_payload);
+ lb_calld->send_message_payload = nullptr;
// If we attempted to send a client load report before the initial request was
- // sent, send the load report now.
- if (glb_policy->client_load_report_payload != nullptr) {
- do_send_client_load_report_locked(glb_policy);
+ // sent (and this lb_calld is still in use), send the load report now.
+ if (lb_calld->client_load_report_is_due &&
+ lb_calld == lb_calld->glb_policy->lb_calld) {
+ send_client_load_report_locked(lb_calld);
+ lb_calld->client_load_report_is_due = false;
}
- GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked");
+ glb_lb_call_data_unref(lb_calld, "lb_on_sent_initial_request_locked");
}
static void lb_on_response_received_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+ glb_lb_policy* glb_policy = lb_calld->glb_policy;
+ // Empty payload means the LB call was cancelled.
+ if (lb_calld != glb_policy->lb_calld ||
+ lb_calld->recv_message_payload == nullptr) {
+ glb_lb_call_data_unref(lb_calld, "lb_on_response_received_locked");
+ return;
+ }
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op* op = ops;
- if (glb_policy->lb_response_payload != nullptr) {
- glb_policy->lb_call_backoff->Reset();
- /* Received data from the LB server. Look inside
- * glb_policy->lb_response_payload, for a serverlist. */
- grpc_byte_buffer_reader bbr;
- grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
- grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
- grpc_byte_buffer_reader_destroy(&bbr);
- grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
-
- grpc_grpclb_initial_response* response = nullptr;
- if (!glb_policy->seen_initial_response &&
- (response = grpc_grpclb_initial_response_parse(response_slice)) !=
- nullptr) {
- if (response->has_client_stats_report_interval) {
- glb_policy->client_stats_report_interval = GPR_MAX(
- GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
- &response->client_stats_report_interval));
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Received initial LB response message; "
- "client load reporting interval = %" PRIdPTR " milliseconds",
- glb_policy, glb_policy->client_stats_report_interval);
- }
- /* take a ref to be unref'd in send_client_load_report_locked() */
- glb_policy->client_load_report_timer_callback_pending = true;
- GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report");
- schedule_next_client_load_report(glb_policy);
- } else if (grpc_lb_glb_trace.enabled()) {
+ glb_policy->lb_call_backoff->Reset();
+ grpc_byte_buffer_reader bbr;
+ grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload);
+ grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
+ grpc_byte_buffer_reader_destroy(&bbr);
+ grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
+ lb_calld->recv_message_payload = nullptr;
+ grpc_grpclb_initial_response* initial_response;
+ grpc_grpclb_serverlist* serverlist;
+ if (!lb_calld->seen_initial_response &&
+ (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
+ nullptr) {
+ // We have not yet seen an initial response, so try to parse one.
+ if (initial_response->has_client_stats_report_interval) {
+ lb_calld->client_stats_report_interval = GPR_MAX(
+ GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
+ &initial_response->client_stats_report_interval));
+ if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Received initial LB response message; client load "
- "reporting NOT enabled",
- glb_policy);
+ "[grpclb %p] Received initial LB response message; "
+ "client load reporting interval = %" PRIdPTR " milliseconds",
+ glb_policy, lb_calld->client_stats_report_interval);
}
- grpc_grpclb_initial_response_destroy(response);
- glb_policy->seen_initial_response = true;
- } else {
- grpc_grpclb_serverlist* serverlist =
- grpc_grpclb_response_parse_serverlist(response_slice);
- if (serverlist != nullptr) {
- GPR_ASSERT(glb_policy->lb_call != nullptr);
+ } else if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Received initial LB response message; client load "
+ "reporting NOT enabled",
+ glb_policy);
+ }
+ grpc_grpclb_initial_response_destroy(initial_response);
+ lb_calld->seen_initial_response = true;
+ } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
+ response_slice)) != nullptr) {
+ // Otherwise, try to parse the response as a serverlist.
+ GPR_ASSERT(lb_calld->lb_call != nullptr);
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
+ glb_policy, serverlist->num_servers);
+ for (size_t i = 0; i < serverlist->num_servers; ++i) {
+ grpc_resolved_address addr;
+ parse_server(serverlist->servers[i], &addr);
+ char* ipport;
+ grpc_sockaddr_to_string(&ipport, &addr, false);
+ gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
+ glb_policy, i, ipport);
+ gpr_free(ipport);
+ }
+ }
+ /* update serverlist */
+ if (serverlist->num_servers > 0) {
+ // Start sending client load report only after we start using the
+ // serverlist returned from the current LB call.
+ if (lb_calld->client_stats_report_interval > 0 &&
+ lb_calld->client_stats == nullptr) {
+ lb_calld->client_stats = grpc_grpclb_client_stats_create();
+ glb_lb_call_data_ref(lb_calld, "client_load_report");
+ schedule_next_client_load_report(lb_calld);
+ }
+ if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
- "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
- glb_policy, serverlist->num_servers);
- for (size_t i = 0; i < serverlist->num_servers; ++i) {
- grpc_resolved_address addr;
- parse_server(serverlist->servers[i], &addr);
- char* ipport;
- grpc_sockaddr_to_string(&ipport, &addr, false);
- gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
- glb_policy, i, ipport);
- gpr_free(ipport);
- }
+ "[grpclb %p] Incoming server list identical to current, "
+ "ignoring.",
+ glb_policy);
}
- /* update serverlist */
- if (serverlist->num_servers > 0) {
- if (grpc_grpclb_serverlist_equals(glb_policy->serverlist,
- serverlist)) {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Incoming server list identical to current, "
- "ignoring.",
- glb_policy);
- }
- grpc_grpclb_destroy_serverlist(serverlist);
- } else { /* new serverlist */
- if (glb_policy->serverlist != nullptr) {
- /* dispose of the old serverlist */
- grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
- } else {
- /* or dispose of the fallback */
- grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
- glb_policy->fallback_backend_addresses = nullptr;
- if (glb_policy->fallback_timer_callback_pending) {
- grpc_timer_cancel(&glb_policy->lb_fallback_timer);
- }
- }
- /* and update the copy in the glb_lb_policy instance. This
- * serverlist instance will be destroyed either upon the next
- * update or in glb_destroy() */
- glb_policy->serverlist = serverlist;
- glb_policy->serverlist_index = 0;
- rr_handover_locked(glb_policy);
- }
+ grpc_grpclb_destroy_serverlist(serverlist);
+ } else { /* new serverlist */
+ if (glb_policy->serverlist != nullptr) {
+ /* dispose of the old serverlist */
+ grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
} else {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Received empty server list, ignoring.",
- glb_policy);
+ /* or dispose of the fallback */
+ grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
+ glb_policy->fallback_backend_addresses = nullptr;
+ if (glb_policy->fallback_timer_callback_pending) {
+ grpc_timer_cancel(&glb_policy->lb_fallback_timer);
+ glb_policy->fallback_timer_callback_pending = false;
}
- grpc_grpclb_destroy_serverlist(serverlist);
}
- } else { /* serverlist == nullptr */
- gpr_log(GPR_ERROR,
- "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
- glb_policy,
- grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
+ /* and update the copy in the glb_lb_policy instance. This
+ * serverlist instance will be destroyed either upon the next
+ * update or in glb_destroy() */
+ glb_policy->serverlist = serverlist;
+ glb_policy->serverlist_index = 0;
+ rr_handover_locked(glb_policy);
}
+ } else {
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
+ glb_policy);
+ }
+ grpc_grpclb_destroy_serverlist(serverlist);
+ }
+ } else {
+ // No valid initial response or serverlist found.
+ gpr_log(GPR_ERROR,
+ "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
+ glb_policy,
+ grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
+ }
+ grpc_slice_unref_internal(response_slice);
+ if (!glb_policy->shutting_down) {
+ // Keep listening for serverlist updates.
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message = &lb_calld->recv_message_payload;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ // Reuse the "lb_on_response_received_locked" ref taken in
+ // query_for_backends_locked().
+ const grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ lb_calld->lb_call, ops, (size_t)(op - ops),
+ &lb_calld->lb_on_response_received);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ } else {
+ glb_lb_call_data_unref(lb_calld,
+ "lb_on_response_received_locked+glb_shutdown");
+ }
+}
+
+static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
+ glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+ glb_lb_policy* glb_policy = lb_calld->glb_policy;
+ GPR_ASSERT(lb_calld->lb_call != nullptr);
+ if (grpc_lb_glb_trace.enabled()) {
+ char* status_details =
+ grpc_slice_to_c_string(lb_calld->lb_call_status_details);
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Status from LB server received. Status = %d, details "
+ "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
+ lb_calld->glb_policy, lb_calld->lb_call_status, status_details,
+ lb_calld, lb_calld->lb_call, grpc_error_string(error));
+ gpr_free(status_details);
+ }
+ // If this lb_calld is still in use, this call ended because of a failure, so
+ // we want to retry connecting. Otherwise, we have deliberately ended this
+ // call and no further action is required.
+ if (lb_calld == glb_policy->lb_calld) {
+ glb_policy->lb_calld = nullptr;
+ if (lb_calld->client_load_report_timer_callback_pending) {
+ grpc_timer_cancel(&lb_calld->client_load_report_timer);
}
- grpc_slice_unref_internal(response_slice);
- if (!glb_policy->shutting_down) {
- /* keep listening for serverlist updates */
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
- op->flags = 0;
- op->reserved = nullptr;
- op++;
- /* reuse the "lb_on_response_received_locked" ref taken in
- * query_for_backends_locked() */
- const grpc_call_error call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, ops, (size_t)(op - ops),
- &glb_policy->lb_on_response_received); /* loop */
- GPR_ASSERT(GRPC_CALL_OK == call_error);
+ GPR_ASSERT(!glb_policy->shutting_down);
+ if (lb_calld->seen_initial_response) {
+ // If we lose connection to the LB server, reset the backoff and restart
+ // the LB call immediately.
+ glb_policy->lb_call_backoff->Reset();
+ query_for_backends_locked(glb_policy);
} else {
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
- "lb_on_response_received_locked_shutdown");
+ // If this LB call fails establishing any connection to the LB server,
+ // retry later.
+ start_lb_call_retry_timer_locked(glb_policy);
}
- } else { /* empty payload: call cancelled. */
- /* dispose of the "lb_on_response_received_locked" ref taken in
- * query_for_backends_locked() and reused in every reception loop */
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
- "lb_on_response_received_locked_empty_payload");
}
+ glb_lb_call_data_unref(lb_calld, "lb_call_ended");
}
static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
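
The status callback above replaces the old updating_lb_call flag with an identity test: if the finished call is still glb_policy->lb_calld, it ended on its own (a failure) and the policy reconnects, immediately when the stream ever produced a response and via the retry timer otherwise; if the call was already detached by lb_call_data_shutdown, the callback only drops its ref. A minimal sketch of that decision with illustrative stand-in types:

    #include <cstdio>

    struct Call { bool seen_response = false; };
    struct Policy { Call* current_call = nullptr; };

    void restart_immediately(Policy*) { std::puts("restart LB call now"); }
    void schedule_retry_timer(Policy*) { std::puts("retry after backoff"); }
    void unref_call(Call* c) { delete c; }  // stands in for glb_lb_call_data_unref

    void on_call_ended(Policy* p, Call* c) {
      if (c == p->current_call) {
        p->current_call = nullptr;  // detach the failed call
        if (c->seen_response) {
          restart_immediately(p);   // the stream worked once: reset backoff
        } else {
          schedule_retry_timer(p);  // never got a response: back off first
        }
      }
      // A call superseded by an update or shutdown is simply unreffed.
      unref_call(c);
    }
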
@@ -1597,29 +1656,6 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
-static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- GPR_ASSERT(glb_policy->lb_call != nullptr);
- if (grpc_lb_glb_trace.enabled()) {
- char* status_details =
- grpc_slice_to_c_string(glb_policy->lb_call_status_details);
- gpr_log(GPR_INFO,
- "[grpclb %p] Status from LB server received. Status = %d, Details "
- "= '%s', (call: %p), error '%s'",
- glb_policy, glb_policy->lb_call_status, status_details,
- glb_policy->lb_call, grpc_error_string(error));
- gpr_free(status_details);
- }
- /* We need to perform cleanups no matter what. */
- lb_call_destroy_locked(glb_policy);
- // If the load report timer is still pending, we wait for it to be
- // called before restarting the call. Otherwise, we restart the call
- // here.
- if (!glb_policy->client_load_report_timer_callback_pending) {
- maybe_restart_lb_call(glb_policy);
- }
-}
-
static void fallback_update_locked(glb_lb_policy* glb_policy,
const grpc_lb_addresses* addresses) {
GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
@@ -1701,7 +1737,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
switch (glb_policy->lb_channel_connectivity) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- /* resub. */
+ // Keep watching the LB channel.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(glb_policy->lb_channel));
@@ -1714,29 +1750,26 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
&glb_policy->lb_channel_on_connectivity_changed, nullptr);
break;
}
+ // The LB channel may be IDLE because it was shut down before the update.
+ // Restart the LB call to kick the LB channel into gear.
case GRPC_CHANNEL_IDLE:
- // lb channel inactive (probably shutdown prior to update). Restart lb
- // call to kick the lb channel into gear.
- /* fallthrough */
case GRPC_CHANNEL_READY:
- if (glb_policy->lb_call != nullptr) {
- glb_policy->updating_lb_call = true;
- grpc_call_cancel(glb_policy->lb_call, nullptr);
- // lb_on_server_status_received() will pick up the cancel and reinit
- // lb_call.
- } else if (glb_policy->started_picking) {
+ if (glb_policy->lb_calld != nullptr) {
+ lb_call_data_shutdown(glb_policy);
+ }
+ if (glb_policy->started_picking) {
if (glb_policy->retry_timer_callback_pending) {
grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
}
- start_picking_locked(glb_policy);
+ glb_policy->lb_call_backoff->Reset();
+ query_for_backends_locked(glb_policy);
}
- /* fallthrough */
+ // Fall through.
case GRPC_CHANNEL_SHUTDOWN:
done:
glb_policy->watching_lb_channel = false;
GRPC_LB_POLICY_UNREF(&glb_policy->base,
"watch_lb_channel_connectivity_cb_shutdown");
- break;
}
}
@@ -1851,6 +1884,14 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
+ // Init LB call backoff option.
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ glb_policy->lb_call_backoff.Init(backoff_options);
return &glb_policy->base;
}
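
One behavior worth calling out in the rewritten load reporting above: send_client_load_report_locked suppresses a report only when its counters are all zero and the previous report was also all zero, so exactly one trailing all-zero report is sent after traffic stops. A small sketch of that rule (hypothetical names):

    struct LoadReporter {
      bool last_report_was_zero = false;
    };

    // Returns true if the report should be sent to the balancer.
    bool should_send_report(LoadReporter* r, bool counters_are_zero) {
      if (counters_are_zero) {
        if (r->last_report_was_zero) return false;  // two zeros in a row: skip
        r->last_report_was_zero = true;  // still send one trailing zero report
      } else {
        r->last_report_was_zero = false;
      }
      return true;
    }
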
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 815dfd0c4f..78527587cf 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -220,30 +220,31 @@ class BalancerServiceImpl : public BalancerService {
if (client_load_reporting_interval_seconds_ > 0) {
request.Clear();
- stream->Read(&request);
- gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this,
- request.DebugString().c_str());
- GPR_ASSERT(request.has_client_stats());
- // We need to acquire the lock here in order to prevent the notify_one
- // below from firing before its corresponding wait is executed.
- std::lock_guard<std::mutex> lock(mu_);
- client_stats_.num_calls_started +=
- request.client_stats().num_calls_started();
- client_stats_.num_calls_finished +=
- request.client_stats().num_calls_finished();
- client_stats_.num_calls_finished_with_client_failed_to_send +=
- request.client_stats()
- .num_calls_finished_with_client_failed_to_send();
- client_stats_.num_calls_finished_known_received +=
- request.client_stats().num_calls_finished_known_received();
- for (const auto& drop_token_count :
- request.client_stats().calls_finished_with_drop()) {
- client_stats_
- .drop_token_counts[drop_token_count.load_balance_token()] +=
- drop_token_count.num_calls();
+ if (stream->Read(&request)) {
+ gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this,
+ request.DebugString().c_str());
+ GPR_ASSERT(request.has_client_stats());
+ // We need to acquire the lock here in order to prevent the notify_one
+ // below from firing before its corresponding wait is executed.
+ std::lock_guard<std::mutex> lock(mu_);
+ client_stats_.num_calls_started +=
+ request.client_stats().num_calls_started();
+ client_stats_.num_calls_finished +=
+ request.client_stats().num_calls_finished();
+ client_stats_.num_calls_finished_with_client_failed_to_send +=
+ request.client_stats()
+ .num_calls_finished_with_client_failed_to_send();
+ client_stats_.num_calls_finished_known_received +=
+ request.client_stats().num_calls_finished_known_received();
+ for (const auto& drop_token_count :
+ request.client_stats().calls_finished_with_drop()) {
+ client_stats_
+ .drop_token_counts[drop_token_count.load_balance_token()] +=
+ drop_token_count.num_calls();
+ }
+ load_report_ready_ = true;
+ load_report_cond_.notify_one();
}
- load_report_ready_ = true;
- load_report_cond_.notify_one();
}
done:
gpr_log(GPR_INFO, "LB[%p]: done", this);