diff options
Diffstat (limited to 'src/core/ext')
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 4 | ||||
-rw-r--r-- | src/core/ext/client_channel/subchannel.c | 6 | ||||
-rw-r--r-- | src/core/ext/client_channel/subchannel.h | 4 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 26 | ||||
-rw-r--r-- | src/core/ext/lb_policy/pick_first/pick_first.c | 2 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 313 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 8 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/parsing.c | 3 |
8 files changed, 230 insertions, 136 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 1cce3bc329..b66fed4b88 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -641,7 +641,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->deadline, &subchannel_call); + calld->call_start_time, calld->deadline, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -894,7 +894,7 @@ retry: grpc_subchannel_call *subchannel_call = NULL; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->deadline, &subchannel_call); + calld->call_start_time, calld->deadline, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index d08c22278a..7364daf020 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -700,15 +700,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline, - grpc_subchannel_call **call) { + grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time, + gpr_timespec deadline, grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. grpc_error *error = grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, path, deadline, callstk); + NULL, NULL, path, start_time, deadline, callstk); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 93bd72d20d..10bae620df 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -111,8 +111,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, /** construct a subchannel call */ grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline, - grpc_subchannel_call **subchannel_call); + grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time, + gpr_timespec deadline, grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 44ac9a017e..d8ef0c8098 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -761,17 +761,24 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->rr_policy) { GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); } - if (glb_policy->started_picking) { - if (glb_policy->lb_call != NULL) { - grpc_call_cancel(glb_policy->lb_call, NULL); - /* lb_on_server_status_received will pick up the cancel and clean up */ - } - } grpc_connectivity_state_set( exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown"); + /* We need a copy of the lb_call pointer because we can't cancell the call + * while holding glb_policy->mu: lb_on_server_status_received, invoked due to + * the cancel, needs to acquire that same lock */ + grpc_call *lb_call = glb_policy->lb_call; + glb_policy->lb_call = NULL; gpr_mu_unlock(&glb_policy->mu); + /* glb_policy->lb_call and this local lb_call must be consistent at this point + * because glb_policy->lb_call is only assigned in lb_call_init_locked as part + * of query_for_backends_locked, which can only be invoked while + * glb_policy->shutting_down is false. */ + if (lb_call != NULL) { + grpc_call_cancel(lb_call, NULL); + /* lb_on_server_status_received will pick up the cancel and clean up */ + } while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; @@ -955,9 +962,10 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void lb_call_init(glb_lb_policy *glb_policy) { +static void lb_call_init_locked(glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); + GPR_ASSERT(!glb_policy->shutting_down); /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling @@ -1010,7 +1018,9 @@ static void lb_call_destroy_locked(glb_lb_policy *glb_policy) { static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); - lb_call_init(glb_policy); + if (glb_policy->shutting_down) return; + + lb_call_init_locked(glb_policy); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index ac3c6a305a..c69f773e78 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -292,6 +292,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } else { loop: switch (p->checking_connectivity) { + case GRPC_CHANNEL_INIT: + GPR_UNREACHABLE_CODE(return ); case GRPC_CHANNEL_READY: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 0fd3abe099..59f84054c4 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -116,8 +116,13 @@ typedef struct { grpc_closure connectivity_changed_closure; /** this subchannels current position in subchannel->ready_list */ ready_list *ready_list_node; - /** last observed connectivity */ - grpc_connectivity_state connectivity_state; + /** last observed connectivity. Not updated by + * \a grpc_subchannel_notify_on_state_change. Used to determine the previous + * state while processing the new state in \a rr_connectivity_changed */ + grpc_connectivity_state prev_connectivity_state; + /** current connectivity state. Updated by \a + * grpc_subchannel_notify_on_state_change */ + grpc_connectivity_state curr_connectivity_state; /** the subchannel's target user data */ void *user_data; /** vtable to operate over \a user_data */ @@ -127,6 +132,7 @@ typedef struct { struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; + gpr_mu mu; /** total number of addresses received at creation time */ size_t num_addresses; @@ -135,8 +141,11 @@ struct round_robin_lb_policy { size_t num_subchannels; subchannel_data **subchannels; - /** mutex protecting remaining members */ - gpr_mu mu; + /** how many subchannels are in TRANSIENT_FAILURE */ + size_t num_transient_failures; + /** how many subchannels are IDLE */ + size_t num_idle; + /** have we started picking? */ int started_picking; /** are we shutting down? */ @@ -258,6 +267,10 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } +static bool is_ready_list_empty(round_robin_lb_policy *p) { + return p->ready_list.prev == NULL; +} + static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; ready_list *elem; @@ -268,7 +281,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); if (sd->user_data != NULL) { GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(sd->user_data); @@ -381,18 +394,18 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { size_t i; p->started_picking = 1; - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p, - p->num_subchannels); - } - for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - sd->connectivity_state = GRPC_CHANNEL_IDLE; + /* use some sentinel value outside of the range of grpc_connectivity_state + * to signal an undefined previous state. We won't be referring to this + * value again and it'll be overwritten after the first call to + * rr_connectivity_changed */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); } } @@ -422,7 +435,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* readily available, report right away */ *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), - "picked"); + "rr_picked"); if (user_data != NULL) { *user_data = selected->user_data; @@ -453,125 +466,184 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } +static void update_state_counters(subchannel_data *sd) { + round_robin_lb_policy *p = sd->policy; + + /* update p->num_transient_failures (resp. p->num_idle): if the previous + * state was TRANSIENT_FAILURE (resp. IDLE), decrement + * p->num_transient_failures (resp. p->num_idle). */ + if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(p->num_transient_failures > 0); + --p->num_transient_failures; + } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { + GPR_ASSERT(p->num_idle > 0); + --p->num_idle; + } +} + +/* sd is the subchannel_data associted with the updated subchannel. + * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE + * or SHUTDOWN */ +static grpc_connectivity_state update_lb_connectivity_status( + grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { + /* In priority order. The first rule to match terminates the search (ie, if we + * are on rule n, all previous rules were unfulfilled). + * + * 1) RULE: ANY subchannel is READY => policy is READY. + * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty. + * + * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. + * CHECK: sd->curr_connectivity_state == CONNECTING. + * + * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN. + * CHECK: p->num_subchannels = 0. + * + * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is + * TRANSIENT_FAILURE. + * CHECK: p->num_transient_failures == p->num_subchannels. + * + * 5) RULE: ALL subchannels are IDLE => policy is IDLE. + * CHECK: p->num_idle == p->num_subchannels. + */ + round_robin_lb_policy *p = sd->policy; + if (!is_ready_list_empty(p)) { /* 1) READY */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, + GRPC_ERROR_NONE, "rr_ready"); + return GRPC_CHANNEL_READY; + } else if (sd->curr_connectivity_state == + GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, + "rr_connecting"); + return GRPC_CHANNEL_CONNECTING; + } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "rr_shutdown"); + return GRPC_CHANNEL_SHUTDOWN; + } else if (p->num_transient_failures == + p->num_subchannels) { /* 4) TRANSIENT_FAILURE */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "rr_transient_failure"); + return GRPC_CHANNEL_TRANSIENT_FAILURE; + } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */ + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE, + GRPC_ERROR_NONE, "rr_idle"); + return GRPC_CHANNEL_IDLE; + } + /* no change */ + return sd->curr_connectivity_state; +} + static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; - int unref = 0; - GRPC_ERROR_REF(error); gpr_mu_lock(&p->mu); if (p->shutdown) { - unref = 1; - } else { - switch (sd->connectivity_state) { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_READY, GRPC_ERROR_REF(error), - "connecting_ready"); - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd); - /* at this point we know there's at least one suitable subchannel. Go - * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - ready_list *selected = peek_next_connected_locked(p); - GPR_ASSERT(selected != NULL); - if (p->pending_picks != NULL) { - /* if the selected subchannel is going to be used for the pending - * picks, update the last picked pointer */ - advance_last_picked_locked(p); + gpr_mu_unlock(&p->mu); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); + GRPC_ERROR_UNREF(error); + return; + } + switch (sd->curr_connectivity_state) { + case GRPC_CHANNEL_INIT: + GPR_UNREACHABLE_CODE(return ); + case GRPC_CHANNEL_READY: + /* add the newly connected subchannel to the list of connected ones. + * Note that it goes to the "end of the line". */ + sd->ready_list_node = add_connected_sc_locked(p, sd); + /* at this point we know there's at least one suitable subchannel. Go + * ahead and pick one and notify the pending suitors in + * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + ready_list *selected = peek_next_connected_locked(p); + GPR_ASSERT(selected != NULL); + if (p->pending_picks != NULL) { + /* if the selected subchannel is going to be used for the pending + * picks, update the last picked pointer */ + advance_last_picked_locked(p); + } + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(selected->subchannel), + "rr_picked"); + if (pp->user_data != NULL) { + *pp->user_data = selected->user_data; } - + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, + "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", + (void *)selected->subchannel, (void *)selected); + } + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + gpr_free(pp); + } + update_lb_connectivity_status(exec_ctx, sd, error); + sd->prev_connectivity_state = sd->curr_connectivity_state; + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + break; + case GRPC_CHANNEL_IDLE: + ++p->num_idle; + /* fallthrough */ + case GRPC_CHANNEL_CONNECTING: + update_state_counters(sd); + update_lb_connectivity_status(exec_ctx, sd, error); + sd->prev_connectivity_state = sd->curr_connectivity_state; + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + ++p->num_transient_failures; + /* remove from ready list if still present */ + if (sd->ready_list_node != NULL) { + remove_disconnected_sc_locked(p, sd->ready_list_node); + sd->ready_list_node = NULL; + } + update_lb_connectivity_status(exec_ctx, sd, error); + sd->prev_connectivity_state = sd->curr_connectivity_state; + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + break; + case GRPC_CHANNEL_SHUTDOWN: + update_state_counters(sd); + if (sd->ready_list_node != NULL) { + remove_disconnected_sc_locked(p, sd->ready_list_node); + sd->ready_list_node = NULL; + } + --p->num_subchannels; + GPR_SWAP(subchannel_data *, p->subchannels[sd->index], + p->subchannels[p->num_subchannels]); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); + p->subchannels[sd->index]->index = sd->index; + if (update_lb_connectivity_status(exec_ctx, sd, error) == + GRPC_CHANNEL_SHUTDOWN) { + /* the policy is shutting down. Flush all the pending picks... */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "picked"); - if (pp->user_data != NULL) { - *pp->user_data = selected->user_data; - } - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - (void *)selected->subchannel, (void *)selected); - } + *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, sd->connectivity_state, - GRPC_ERROR_REF(error), "connecting_changed"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - /* renew state notification */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - - /* remove from ready list if still present */ - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "connecting_transient_failure"); - break; - case GRPC_CHANNEL_SHUTDOWN: - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - - p->num_subchannels--; - GPR_SWAP(subchannel_data *, p->subchannels[sd->index], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); - p->subchannels[sd->index]->index = sd->index; - gpr_free(sd); - - unref = 1; - if (p->num_subchannels == 0) { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE_REFERENCING("Round Robin Channels Exhausted", - &error, 1), - "no_more_channels"); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, - NULL); - gpr_free(pp); - } - } else { - grpc_connectivity_state_set( - exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "subchannel_failed"); - } - } /* switch */ - } /* !unref */ - - gpr_mu_unlock(&p->mu); - - if (unref) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity"); + } + gpr_free(sd); + /* unref the "rr_connectivity" weak ref from start_picking */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); + break; } - + gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } @@ -607,9 +679,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), - "picked"); + "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "picked"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); } else { gpr_mu_unlock(&p->mu); grpc_exec_ctx_sched(exec_ctx, closure, @@ -705,6 +777,11 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", + (void *)p, (unsigned long)p->num_subchannels); + } gpr_mu_init(&p->mu); return &p->base; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index fb8fbae0ab..127e1cdc13 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1037,7 +1037,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, "op.send_initial_metadata"); } } else { - s->send_trailing_metadata = NULL; + s->send_initial_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_CREATE( @@ -1523,13 +1523,17 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_error *error) { error = removal_error(error, s, "Pending writes failed due to stream closure"); - s->fetching_send_message = NULL; + s->send_initial_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error), "send_initial_metadata_finished"); + + s->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->send_trailing_metadata_finished, GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); + + s->fetching_send_message = NULL; grpc_chttp2_complete_closure_step( exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index b9c405158f..5efb49751c 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -471,7 +471,8 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_mdstr_as_c_string(md->value)); *cached_timeout = gpr_inf_future(GPR_TIMESPAN); } - grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); + cached_timeout = + grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } grpc_chttp2_incoming_metadata_buffer_set_deadline( &s->metadata_buffer[0], |