aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2017-09-26 13:29:06 -0700
committerGravatar GitHub <noreply@github.com>2017-09-26 13:29:06 -0700
commit5490cdd17233f3aaeb5f5fac2ad65a13c8ac75c5 (patch)
tree8ec2ad93e9c3df3f2bddc4c18abd7d3358f805d9
parent873b55cfebfc726b06ae2f47bf60ed9255c80807 (diff)
parenta4792f5ad13dec52dfde7dca8e996b7075a2fdc3 (diff)
Merge pull request #12709 from markdroth/grpclb_client_load_report_timer_fix
grpclb client load report timer fix
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c178
1 files changed, 78 insertions, 100 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index 85ef7894ea..05a106f214 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -354,9 +354,6 @@ typedef struct glb_lb_policy {
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
- /* Finished sending initial request. */
- grpc_closure lb_on_sent_initial_request;
-
/* Status from the LB server has been received. This signals the end of the LB
* call. */
grpc_closure lb_on_server_status_received;
@@ -390,7 +387,6 @@ typedef struct glb_lb_policy {
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
- bool initial_request_sent;
bool seen_initial_response;
/* Stats for client-side load reporting. Should be unreffed and
@@ -1173,6 +1169,58 @@ static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
exec_ctx, &glb_policy->state_tracker, current, notify);
}
+static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
+ glb_policy->retry_timer_active = false;
+ if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
+ (void *)glb_policy);
+ }
+ GPR_ASSERT(glb_policy->lb_call == NULL);
+ query_for_backends_locked(exec_ctx, glb_policy);
+ }
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
+}
+
+static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
+ if (glb_policy->started_picking && glb_policy->updating_lb_call) {
+ if (glb_policy->retry_timer_active) {
+ grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
+ }
+ if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy);
+ glb_policy->updating_lb_call = false;
+ } else if (!glb_policy->shutting_down) {
+ /* if we aren't shutting down, restart the LB client call after some time */
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec next_try =
+ gpr_backoff_step(&glb_policy->lb_call_backoff_state, now);
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
+ (void *)glb_policy);
+ gpr_timespec timeout = gpr_time_sub(next_try, now);
+ if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
+ gpr_log(GPR_DEBUG,
+ "... retry_timer_active in %" PRId64 ".%09d seconds.",
+ timeout.tv_sec, timeout.tv_nsec);
+ } else {
+ gpr_log(GPR_DEBUG, "... retry_timer_active immediately.");
+ }
+ }
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
+ GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
+ lb_call_on_retry_timer_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ glb_policy->retry_timer_active = true;
+ grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
+ &glb_policy->lb_on_call_retry, now);
+ }
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+ "lb_on_server_status_received_locked");
+}
+
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
@@ -1203,21 +1251,6 @@ static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
schedule_next_client_load_report(exec_ctx, glb_policy);
}
-static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy) {
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = glb_policy->client_load_report_payload;
- GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
- client_load_report_done_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, &op, 1,
- &glb_policy->client_load_report_closure);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
-}
-
static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
grpc_grpclb_dropped_call_counts *drop_entries =
(grpc_grpclb_dropped_call_counts *)
@@ -1237,6 +1270,9 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
glb_policy->client_load_report_timer_pending = false;
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"client_load_report");
+ if (glb_policy->lb_call == NULL) {
+ maybe_restart_lb_call(exec_ctx, glb_policy);
+ }
return;
}
// Construct message payload.
@@ -1260,17 +1296,23 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(exec_ctx, request_payload_slice);
grpc_grpclb_request_destroy(request);
- // If we've already sent the initial request, then we can go ahead and
- // sent the load report. Otherwise, we need to wait until the initial
- // request has been sent to send this
- // (see lb_on_sent_initial_request_locked() below).
- if (glb_policy->initial_request_sent) {
- do_send_client_load_report_locked(exec_ctx, glb_policy);
+ // Send load report message.
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_SEND_MESSAGE;
+ op.data.send_message.send_message = glb_policy->client_load_report_payload;
+ GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
+ client_load_report_done_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ exec_ctx, glb_policy->lb_call, &op, 1,
+ &glb_policy->client_load_report_closure);
+ if (call_error != GRPC_CALL_OK) {
+ gpr_log(GPR_ERROR, "call_error=%d", call_error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
-static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error);
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error);
static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1315,9 +1357,6 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_slice_unref_internal(exec_ctx, request_payload_slice);
grpc_grpclb_request_destroy(request);
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
- lb_on_sent_initial_request_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
@@ -1332,7 +1371,6 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
- glb_policy->initial_request_sent = false;
glb_policy->seen_initial_response = false;
glb_policy->last_client_load_report_counters_were_zero = false;
}
@@ -1349,7 +1387,7 @@ static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details);
- if (!glb_policy->client_load_report_timer_pending) {
+ if (glb_policy->client_load_report_timer_pending) {
grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer);
}
}
@@ -1373,7 +1411,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(glb_policy->lb_call != NULL);
grpc_call_error call_error;
- grpc_op ops[4];
+ grpc_op ops[3];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
@@ -1394,13 +1432,8 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
op->flags = 0;
op->reserved = NULL;
op++;
- /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
- * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
- "lb_on_sent_initial_request_locked");
- call_error = grpc_call_start_batch_and_execute(
- exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
- &glb_policy->lb_on_sent_initial_request);
+ call_error = grpc_call_start_batch_and_execute(exec_ctx, glb_policy->lb_call,
+ ops, (size_t)(op - ops), NULL);
GPR_ASSERT(GRPC_CALL_OK == call_error);
op = ops;
@@ -1437,19 +1470,6 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
-static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
- glb_policy->initial_request_sent = true;
- // If we attempted to send a client load report before the initial
- // request was sent, send the load report now.
- if (glb_policy->client_load_report_payload != NULL) {
- do_send_client_load_report_locked(exec_ctx, glb_policy);
- }
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "lb_on_sent_initial_request_locked");
-}
-
static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
@@ -1572,21 +1592,6 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
}
-static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
- glb_policy->retry_timer_active = false;
- if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
- (void *)glb_policy);
- }
- GPR_ASSERT(glb_policy->lb_call == NULL);
- query_for_backends_locked(exec_ctx, glb_policy);
- }
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
-}
-
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
@@ -1603,39 +1608,12 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
}
/* We need to perform cleanups no matter what. */
lb_call_destroy_locked(exec_ctx, glb_policy);
- if (glb_policy->started_picking && glb_policy->updating_lb_call) {
- if (glb_policy->retry_timer_active) {
- grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
- }
- if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy);
- glb_policy->updating_lb_call = false;
- } else if (!glb_policy->shutting_down) {
- /* if we aren't shutting down, restart the LB client call after some time */
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- gpr_timespec next_try =
- gpr_backoff_step(&glb_policy->lb_call_backoff_state, now);
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
- (void *)glb_policy);
- gpr_timespec timeout = gpr_time_sub(next_try, now);
- if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
- gpr_log(GPR_DEBUG,
- "... retry_timer_active in %" PRId64 ".%09d seconds.",
- timeout.tv_sec, timeout.tv_nsec);
- } else {
- gpr_log(GPR_DEBUG, "... retry_timer_active immediately.");
- }
- }
- GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
- GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
- lb_call_on_retry_timer_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- glb_policy->retry_timer_active = true;
- grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
- &glb_policy->lb_on_call_retry, now);
+ // If the load report timer is still pending, we wait for it to be
+ // called before restarting the call. Otherwise, we restart the call
+ // here.
+ if (!glb_policy->client_load_report_timer_pending) {
+ maybe_restart_lb_call(exec_ctx, glb_policy);
}
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
- "lb_on_server_status_received_locked");
}
static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,