aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
diff options
context:
space:
mode:
authorGravatar Juanli Shen <aspirinsjl@gmail.com>2018-01-03 10:14:38 -0800
committerGravatar GitHub <noreply@github.com>2018-01-03 10:14:38 -0800
commit30bd91e66381d652fadc9c64784f850f3960e473 (patch)
tree77ebadd04b06ef60c07e0320a87fce87d849160c /src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
parent7542ba6e0caee740f124dc25706287fd62f546d8 (diff)
parentf2a0ae7e3bca636a3a2a95e960a76cc266f75dd4 (diff)
Merge pull request #13879 from AspirinSJL/client_load_report_bug
Restore checking initial request sent
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc64
1 files changed, 49 insertions, 15 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index dd6fc602ab..ba4e90d4c2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -377,6 +377,9 @@ typedef struct glb_lb_policy {
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
+ /* Finished sending initial request. */
+ grpc_closure lb_on_sent_initial_request;
+
/* Status from the LB server has been received. This signals the end of the LB
* call. */
grpc_closure lb_on_server_status_received;
@@ -416,6 +419,7 @@ typedef struct glb_lb_policy {
/** LB fallback timer */
grpc_timer lb_fallback_timer;
+ bool initial_request_sent;
bool seen_initial_response;
/* Stats for client-side load reporting. Should be unreffed and
@@ -1357,6 +1361,22 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) {
schedule_next_client_load_report(glb_policy);
}
+static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) {
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_SEND_MESSAGE;
+ op.data.send_message.send_message = glb_policy->client_load_report_payload;
+ GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
+ client_load_report_done_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
+ if (call_error != GRPC_CALL_OK) {
+ gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ }
+}
+
static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
grpc_grpclb_dropped_call_counts* drop_entries =
(grpc_grpclb_dropped_call_counts*)
@@ -1400,22 +1420,15 @@ static void send_client_load_report_locked(void* arg, grpc_error* error) {
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
- // Send load report message.
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = glb_policy->client_load_report_payload;
- GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
- client_load_report_done_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
- if (call_error != GRPC_CALL_OK) {
- gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
+ // If we've already sent the initial request, then we can go ahead and send
+ // the load report. Otherwise, we need to wait until the initial request has
+ // been sent to send this (see lb_on_sent_initial_request_locked() below).
+ if (glb_policy->initial_request_sent) {
+ do_send_client_load_report_locked(glb_policy);
}
}
+static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error);
static void lb_on_server_status_received_locked(void* arg, grpc_error* error);
static void lb_on_response_received_locked(void* arg, grpc_error* error);
static void lb_call_init_locked(glb_lb_policy* glb_policy) {
@@ -1455,6 +1468,9 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) {
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
+ GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
+ lb_on_sent_initial_request_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
@@ -1471,6 +1487,7 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) {
glb_policy->lb_call_backoff.Init(backoff_options);
+ glb_policy->initial_request_sent = false;
glb_policy->seen_initial_response = false;
glb_policy->last_client_load_report_counters_were_zero = false;
}
@@ -1529,8 +1546,13 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0;
op->reserved = nullptr;
op++;
- call_error = grpc_call_start_batch_and_execute(glb_policy->lb_call, ops,
- (size_t)(op - ops), nullptr);
+ /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
+ * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
+ "lb_on_sent_initial_request_locked");
+ call_error = grpc_call_start_batch_and_execute(
+ glb_policy->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_on_sent_initial_request);
GPR_ASSERT(GRPC_CALL_OK == call_error);
op = ops;
@@ -1567,6 +1589,18 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
+static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
+ glb_policy->initial_request_sent = true;
+ // If we attempted to send a client load report before the initial request was
+ // sent, send the load report now.
+ if (glb_policy->client_load_report_payload != nullptr) {
+ do_send_client_load_report_locked(glb_policy);
+ }
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
+ "lb_on_sent_initial_request_locked");
+}
+
static void lb_on_response_received_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
grpc_op ops[2];