diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
3 files changed, 69 insertions, 39 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index eadeea0368..3c64213fb9 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -274,18 +274,30 @@ static void add_pending_pick(pending_pick** root, typedef struct pending_ping { struct pending_ping* next; - /* args for wrapped_notify */ - wrapped_rr_closure_arg wrapped_notify_arg; + /* args for sending the ping */ + wrapped_rr_closure_arg* on_initiate; + wrapped_rr_closure_arg* on_ack; } pending_ping; -static void add_pending_ping(pending_ping** root, grpc_closure* notify) { +static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate, + grpc_closure* on_ack) { pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); - pping->wrapped_notify_arg.wrapped_closure = notify; - pping->wrapped_notify_arg.free_when_done = pping; + if (on_initiate != nullptr) { + pping->on_initiate = + (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate)); + pping->on_initiate->wrapped_closure = on_initiate; + pping->on_initiate->free_when_done = pping->on_initiate; + GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure, + &pping->on_initiate, grpc_schedule_on_exec_ctx); + } + if (on_ack != nullptr) { + pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack)); + pping->on_ack->wrapped_closure = on_ack; + pping->on_ack->free_when_done = pping->on_ack; + GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure, + &pping->on_ack, grpc_schedule_on_exec_ctx); + } pping->next = *root; - GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure, - wrapped_rr_closure, &pping->wrapped_notify_arg, - grpc_schedule_on_exec_ctx); *root = pping; } @@ -815,14 +827,24 @@ static void create_rr_locked(glb_lb_policy* glb_policy, pending_ping* pping; while ((pping = glb_policy->pending_pings)) { glb_policy->pending_pings = pping->next; - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); - pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; + grpc_closure* on_initiate = nullptr; + grpc_closure* on_ack = nullptr; + if (pping->on_initiate != nullptr) { + GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); + pping->on_initiate->rr_policy = glb_policy->rr_policy; + on_initiate = &pping->on_initiate->wrapper_closure; + } + if (pping->on_ack != nullptr) { + GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); + pping->on_ack->rr_policy = glb_policy->rr_policy; + on_ack = &pping->on_ack->wrapper_closure; + } if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", glb_policy, glb_policy->rr_policy); } - grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, - &pping->wrapped_notify_arg.wrapper_closure); + grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); + gpr_free(pping); } } @@ -1037,8 +1059,16 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) { while (pping != nullptr) { pending_ping* next = pping->next; - GRPC_CLOSURE_SCHED(&pping->wrapped_notify_arg.wrapper_closure, - GRPC_ERROR_REF(error)); + if (pping->on_initiate != nullptr) { + GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure, + GRPC_ERROR_REF(error)); + gpr_free(pping->on_initiate); + } + if (pping->on_ack != nullptr) { + GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure, + GRPC_ERROR_REF(error)); + gpr_free(pping->on_ack); + } gpr_free(pping); pping = next; } @@ -1229,12 +1259,13 @@ static grpc_connectivity_state glb_check_connectivity_locked( connectivity_error); } -static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* closure) { +static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, + grpc_closure* on_ack) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; if (glb_policy->rr_policy) { - grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, closure); + grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack); } else { - add_pending_ping(&glb_policy->pending_pings, closure); + add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack); if (!glb_policy->started_picking) { start_picking_locked(glb_policy); } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 5e75b64843..0861261359 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -221,12 +221,16 @@ static void pf_notify_on_state_change_locked(grpc_lb_policy* pol, notify); } -static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* closure) { +static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, + grpc_closure* on_ack) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; if (p->selected) { - grpc_connected_subchannel_ping(p->selected->connected_subchannel, closure); + grpc_connected_subchannel_ping(p->selected->connected_subchannel, + on_initiate, on_ack); } else { - GRPC_CLOSURE_SCHED(closure, + GRPC_CLOSURE_SCHED(on_initiate, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); + GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 6958b72693..b0c84017df 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -361,21 +361,16 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, * CHECK: subchannel_list->num_shutdown == * subchannel_list->num_subchannels. * - * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is + * 4) RULE: ALL subchannels are SHUTDOWN or TRANSIENT_FAILURE => policy is * TRANSIENT_FAILURE. - * CHECK: subchannel_list->num_transient_failures == + * CHECK: subchannel_list->num_shutdown + + * subchannel_list->num_transient_failures == * subchannel_list->num_subchannels. - * - * 5) RULE: ALL subchannels are IDLE => policy is IDLE. - * CHECK: subchannel_list->num_idle == subchannel_list->num_subchannels. - * (Note that all the subchannels will transition from IDLE to CONNECTING - * in batch when we start trying to connect.) */ - // TODO(juanlishen): if the subchannel states are mixed by {SHUTDOWN, - // TRANSIENT_FAILURE}, we don't change the state. We may want to improve on - // this. + // TODO(juanlishen): For rule 4, we may want to re-resolve instead. grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy; + GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); if (subchannel_list->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, @@ -393,16 +388,13 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, p->started_picking = false; grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, GRPC_ERROR_NONE); - } else if (subchannel_list->num_transient_failures == + } else if (subchannel_list->num_shutdown + + subchannel_list->num_transient_failures == subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "rr_transient_failure"); - } else if (subchannel_list->num_idle == subchannel_list->num_subchannels) { - /* 5) IDLE */ - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE, - GRPC_ERROR_NONE, "rr_idle"); } GRPC_ERROR_UNREF(error); } @@ -539,7 +531,8 @@ static void rr_notify_on_state_change_locked(grpc_lb_policy* pol, notify); } -static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* closure) { +static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, + grpc_closure* on_ack) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); if (next_ready_index < p->subchannel_list->num_subchannels) { @@ -547,11 +540,13 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* closure) { &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( selected->connected_subchannel, "rr_ping"); - grpc_connected_subchannel_ping(target, closure); + grpc_connected_subchannel_ping(target, on_initiate, on_ack); GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping"); } else { - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Round Robin not connected")); + GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Round Robin not connected")); + GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Round Robin not connected")); } } |