From f2a0ae7e3bca636a3a2a95e960a76cc266f75dd4 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Wed, 27 Dec 2017 16:08:12 -0800 Subject: Restore checking initial request sent --- .../client_channel/lb_policy/grpclb/grpclb.cc | 64 +++++++++++++++++----- 1 file changed, 49 insertions(+), 15 deletions(-) (limited to 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc') diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 3c64213fb9..21a705ab37 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -377,6 +377,9 @@ typedef struct glb_lb_policy { /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ + /* Finished sending initial request. */ + grpc_closure lb_on_sent_initial_request; + /* Status from the LB server has been received. This signals the end of the LB * call. */ grpc_closure lb_on_server_status_received; @@ -416,6 +419,7 @@ typedef struct glb_lb_policy { /** LB fallback timer */ grpc_timer lb_fallback_timer; + bool initial_request_sent; bool seen_initial_response; /* Stats for client-side load reporting. Should be unreffed and @@ -1358,6 +1362,22 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) { schedule_next_client_load_report(glb_policy); } +static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) { + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = glb_policy->client_load_report_payload; + GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, + client_load_report_done_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); + grpc_call_error call_error = grpc_call_start_batch_and_execute( + glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); + if (call_error != GRPC_CALL_OK) { + gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } +} + static bool load_report_counters_are_zero(grpc_grpclb_request* request) { grpc_grpclb_dropped_call_counts* drop_entries = (grpc_grpclb_dropped_call_counts*) @@ -1401,22 +1421,15 @@ static void send_client_load_report_locked(void* arg, grpc_error* error) { grpc_raw_byte_buffer_create(&request_payload_slice, 1); grpc_slice_unref_internal(request_payload_slice); grpc_grpclb_request_destroy(request); - // Send load report message. - grpc_op op; - memset(&op, 0, sizeof(op)); - op.op = GRPC_OP_SEND_MESSAGE; - op.data.send_message.send_message = glb_policy->client_load_report_payload; - GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, - client_load_report_done_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_call_error call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); - if (call_error != GRPC_CALL_OK) { - gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error); - GPR_ASSERT(GRPC_CALL_OK == call_error); + // If we've already sent the initial request, then we can go ahead and send + // the load report. Otherwise, we need to wait until the initial request has + // been sent to send this (see lb_on_sent_initial_request_locked() below). + if (glb_policy->initial_request_sent) { + do_send_client_load_report_locked(glb_policy); } } +static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error); static void lb_on_server_status_received_locked(void* arg, grpc_error* error); static void lb_on_response_received_locked(void* arg, grpc_error* error); static void lb_call_init_locked(glb_lb_policy* glb_policy) { @@ -1456,6 +1469,9 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) { grpc_slice_unref_internal(request_payload_slice); grpc_grpclb_request_destroy(request); + GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request, + lb_on_sent_initial_request_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); @@ -1470,6 +1486,7 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) { GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + glb_policy->initial_request_sent = false; glb_policy->seen_initial_response = false; glb_policy->last_client_load_report_counters_were_zero = false; } @@ -1528,8 +1545,13 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { op->flags = 0; op->reserved = nullptr; op++; - call_error = grpc_call_start_batch_and_execute(glb_policy->lb_call, ops, - (size_t)(op - ops), nullptr); + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, + "lb_on_sent_initial_request_locked"); + call_error = grpc_call_start_batch_and_execute( + glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_sent_initial_request); GPR_ASSERT(GRPC_CALL_OK == call_error); op = ops; @@ -1566,6 +1588,18 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) { GPR_ASSERT(GRPC_CALL_OK == call_error); } +static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) { + glb_lb_policy* glb_policy = (glb_lb_policy*)arg; + glb_policy->initial_request_sent = true; + // If we attempted to send a client load report before the initial request was + // sent, send the load report now. + if (glb_policy->client_load_report_payload != nullptr) { + do_send_client_load_report_locked(glb_policy); + } + GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base, + "lb_on_sent_initial_request_locked"); +} + static void lb_on_response_received_locked(void* arg, grpc_error* error) { glb_lb_policy* glb_policy = (glb_lb_policy*)arg; grpc_op ops[2]; -- cgit v1.2.3