aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/lb_policy
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-07 19:38:43 -0800
committerGravatar Yash Tibrewal <yashkt@google.com>2017-12-07 19:38:43 -0800
commitd6c292f17779771e665816f8f3a4cf366fc7012e (patch)
treeae779349708ad4896f8a3b5c074e45e581172cba /src/core/ext/filters/client_channel/lb_policy
parent8cf1470a51ea276ca84825e7495d4ee24743540d (diff)
parentc01a91da2d43e858cace8b34119fa35148818458 (diff)
Merge master
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc65
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc10
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc33
3 files changed, 69 insertions, 39 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index eadeea0368..3c64213fb9 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -274,18 +274,30 @@ static void add_pending_pick(pending_pick** root,
typedef struct pending_ping {
struct pending_ping* next;
- /* args for wrapped_notify */
- wrapped_rr_closure_arg wrapped_notify_arg;
+ /* args for sending the ping */
+ wrapped_rr_closure_arg* on_initiate;
+ wrapped_rr_closure_arg* on_ack;
} pending_ping;
-static void add_pending_ping(pending_ping** root, grpc_closure* notify) {
+static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
- pping->wrapped_notify_arg.wrapped_closure = notify;
- pping->wrapped_notify_arg.free_when_done = pping;
+ if (on_initiate != nullptr) {
+ pping->on_initiate =
+ (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate));
+ pping->on_initiate->wrapped_closure = on_initiate;
+ pping->on_initiate->free_when_done = pping->on_initiate;
+ GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure,
+ &pping->on_initiate, grpc_schedule_on_exec_ctx);
+ }
+ if (on_ack != nullptr) {
+ pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack));
+ pping->on_ack->wrapped_closure = on_ack;
+ pping->on_ack->free_when_done = pping->on_ack;
+ GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure,
+ &pping->on_ack, grpc_schedule_on_exec_ctx);
+ }
pping->next = *root;
- GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure,
- wrapped_rr_closure, &pping->wrapped_notify_arg,
- grpc_schedule_on_exec_ctx);
*root = pping;
}
@@ -815,14 +827,24 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
pending_ping* pping;
while ((pping = glb_policy->pending_pings)) {
glb_policy->pending_pings = pping->next;
- GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
- pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
+ grpc_closure* on_initiate = nullptr;
+ grpc_closure* on_ack = nullptr;
+ if (pping->on_initiate != nullptr) {
+ GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
+ pping->on_initiate->rr_policy = glb_policy->rr_policy;
+ on_initiate = &pping->on_initiate->wrapper_closure;
+ }
+ if (pping->on_ack != nullptr) {
+ GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
+ pping->on_ack->rr_policy = glb_policy->rr_policy;
+ on_ack = &pping->on_ack->wrapper_closure;
+ }
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
glb_policy, glb_policy->rr_policy);
}
- grpc_lb_policy_ping_one_locked(glb_policy->rr_policy,
- &pping->wrapped_notify_arg.wrapper_closure);
+ grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
+ gpr_free(pping);
}
}
@@ -1037,8 +1059,16 @@ static void glb_shutdown_locked(grpc_lb_policy* pol) {
while (pping != nullptr) {
pending_ping* next = pping->next;
- GRPC_CLOSURE_SCHED(&pping->wrapped_notify_arg.wrapper_closure,
- GRPC_ERROR_REF(error));
+ if (pping->on_initiate != nullptr) {
+ GRPC_CLOSURE_SCHED(&pping->on_initiate->wrapper_closure,
+ GRPC_ERROR_REF(error));
+ gpr_free(pping->on_initiate);
+ }
+ if (pping->on_ack != nullptr) {
+ GRPC_CLOSURE_SCHED(&pping->on_ack->wrapper_closure,
+ GRPC_ERROR_REF(error));
+ gpr_free(pping->on_ack);
+ }
gpr_free(pping);
pping = next;
}
@@ -1229,12 +1259,13 @@ static grpc_connectivity_state glb_check_connectivity_locked(
connectivity_error);
}
-static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* closure) {
+static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
if (glb_policy->rr_policy) {
- grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, closure);
+ grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
} else {
- add_pending_ping(&glb_policy->pending_pings, closure);
+ add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack);
if (!glb_policy->started_picking) {
start_picking_locked(glb_policy);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 5e75b64843..0861261359 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -221,12 +221,16 @@ static void pf_notify_on_state_change_locked(grpc_lb_policy* pol,
notify);
}
-static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* closure) {
+static void pf_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
if (p->selected) {
- grpc_connected_subchannel_ping(p->selected->connected_subchannel, closure);
+ grpc_connected_subchannel_ping(p->selected->connected_subchannel,
+ on_initiate, on_ack);
} else {
- GRPC_CLOSURE_SCHED(closure,
+ GRPC_CLOSURE_SCHED(on_initiate,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
+ GRPC_CLOSURE_SCHED(on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 6958b72693..b0c84017df 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -361,21 +361,16 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
* CHECK: subchannel_list->num_shutdown ==
* subchannel_list->num_subchannels.
*
- * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
+ * 4) RULE: ALL subchannels are SHUTDOWN or TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
- * CHECK: subchannel_list->num_transient_failures ==
+ * CHECK: subchannel_list->num_shutdown +
+ * subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
- *
- * 5) RULE: ALL subchannels are IDLE => policy is IDLE.
- * CHECK: subchannel_list->num_idle == subchannel_list->num_subchannels.
- * (Note that all the subchannels will transition from IDLE to CONNECTING
- * in batch when we start trying to connect.)
*/
- // TODO(juanlishen): if the subchannel states are mixed by {SHUTDOWN,
- // TRANSIENT_FAILURE}, we don't change the state. We may want to improve on
- // this.
+ // TODO(juanlishen): For rule 4, we may want to re-resolve instead.
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy;
+ GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE);
if (subchannel_list->num_ready > 0) {
/* 1) READY */
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
@@ -393,16 +388,13 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
p->started_picking = false;
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_NONE);
- } else if (subchannel_list->num_transient_failures ==
+ } else if (subchannel_list->num_shutdown +
+ subchannel_list->num_transient_failures ==
subchannel_list->num_subchannels) {
/* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
- } else if (subchannel_list->num_idle == subchannel_list->num_subchannels) {
- /* 5) IDLE */
- grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_IDLE,
- GRPC_ERROR_NONE, "rr_idle");
}
GRPC_ERROR_UNREF(error);
}
@@ -539,7 +531,8 @@ static void rr_notify_on_state_change_locked(grpc_lb_policy* pol,
notify);
}
-static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* closure) {
+static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
+ grpc_closure* on_ack) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if (next_ready_index < p->subchannel_list->num_subchannels) {
@@ -547,11 +540,13 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* closure) {
&p->subchannel_list->subchannels[next_ready_index];
grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
selected->connected_subchannel, "rr_ping");
- grpc_connected_subchannel_ping(target, closure);
+ grpc_connected_subchannel_ping(target, on_initiate, on_ack);
GRPC_CONNECTED_SUBCHANNEL_UNREF(target, "rr_ping");
} else {
- GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Round Robin not connected"));
+ GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Round Robin not connected"));
+ GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Round Robin not connected"));
}
}