diff options
author | Craig Tiller <ctiller@google.com> | 2016-11-16 15:25:00 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-11-16 15:25:00 -0800 |
commit | c586666cbdfc5b5f2a3306892f8b4491862c5aba (patch) | |
tree | 924f3f760a489ccf7c6c70307b72d1753aedbdf3 /src/core/ext/lb_policy/grpclb | |
parent | 7cdad96fc49090eb5e3a12a7cca5a9f257d3f301 (diff) | |
parent | 1dc9ad33273e090a1c7ffa05991dc8ccc2badee6 (diff) |
Merge github.com:grpc/grpc into slice_with_exec_ctx
Diffstat (limited to 'src/core/ext/lb_policy/grpclb')
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 38 |
1 files changed, 25 insertions, 13 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index a85c16b881..4bca8def39 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -187,6 +187,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, * addresses failed to connect). There won't be any user_data/token * available */ if (wc_arg->target != NULL) { + GPR_ASSERT(wc_arg->lb_token != NULL); initial_metadata_add_lb_token(wc_arg->initial_metadata, wc_arg->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); @@ -606,10 +607,10 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, * right grpclb status. */ rr_connectivity_data *rr_conn_data = arg; glb_lb_policy *glb_policy = rr_conn_data->glb_policy; + gpr_mu_lock(&glb_policy->mu); if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN && !glb_policy->shutting_down) { - gpr_mu_lock(&glb_policy->mu); /* RR not shutting down. Mimic the RR's policy state */ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_conn_data->state, GRPC_ERROR_REF(error), @@ -618,12 +619,12 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_conn_data->state, &rr_conn_data->on_change); - gpr_mu_unlock(&glb_policy->mu); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "rr_connectivity_cb"); gpr_free(rr_conn_data); } + gpr_mu_unlock(&glb_policy->mu); } static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, @@ -761,17 +762,24 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->rr_policy) { GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); } - if (glb_policy->started_picking) { - if (glb_policy->lb_call != NULL) { - grpc_call_cancel(glb_policy->lb_call, NULL); - /* lb_on_server_status_received will pick up the cancel and clean up */ - } - } grpc_connectivity_state_set( exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown"); + /* We need a copy of the lb_call pointer because we can't cancell the call + * while holding glb_policy->mu: lb_on_server_status_received, invoked due to + * the cancel, needs to acquire that same lock */ + grpc_call *lb_call = glb_policy->lb_call; + glb_policy->lb_call = NULL; gpr_mu_unlock(&glb_policy->mu); + /* glb_policy->lb_call and this local lb_call must be consistent at this point + * because glb_policy->lb_call is only assigned in lb_call_init_locked as part + * of query_for_backends_locked, which can only be invoked while + * glb_policy->shutting_down is false. */ + if (lb_call != NULL) { + grpc_call_cancel(lb_call, NULL); + /* lb_on_server_status_received will pick up the cancel and clean up */ + } while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; @@ -955,9 +963,11 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void lb_call_init(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { +static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); + GPR_ASSERT(!glb_policy->shutting_down); /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling @@ -1010,7 +1020,9 @@ static void lb_call_destroy_locked(glb_lb_policy *glb_policy) { static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); - lb_call_init(exec_ctx, glb_policy); + if (glb_policy->shutting_down) return; + + lb_call_init_locked(exec_ctx, glb_policy); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", @@ -1082,6 +1094,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; + gpr_mu_lock(&glb_policy->mu); if (glb_policy->lb_response_payload != NULL) { gpr_backoff_reset(&glb_policy->lb_call_backoff_state); /* Received data from the LB server. Look inside @@ -1110,7 +1123,6 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, /* update serverlist */ if (serverlist->num_servers > 0) { - gpr_mu_lock(&glb_policy->mu); if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, @@ -1126,7 +1138,6 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, rr_handover_locked(exec_ctx, glb_policy, error); } - gpr_mu_unlock(&glb_policy->mu); } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, @@ -1154,9 +1165,11 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } + gpr_mu_unlock(&glb_policy->mu); } else { /* empty payload: call cancelled. */ /* dispose of the "lb_on_response_received" weak ref taken in * query_for_backends_locked() and reused in every reception loop */ + gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "lb_on_response_received_empty_payload"); } @@ -1176,7 +1189,6 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, query_for_backends_locked(exec_ctx, glb_policy); } gpr_mu_unlock(&glb_policy->mu); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_on_retry_timer"); } |