aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2017-12-06 13:24:27 -0800
committerGravatar Yuchen Zeng <zyc@google.com>2017-12-06 13:24:27 -0800
commit625a5c05456a3001ecd4519b133aecd61e5b333b (patch)
treef0e470a0be1049f751e5f197ea2101c41912cbb7 /src/core
parentc272dd73aad1273b70fbd038efb02d6f0e169b60 (diff)
Fix pending pings in grpclb
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc49
1 files changed, 41 insertions, 8 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index a6972b850f..704cbd8a3b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -276,15 +276,28 @@ typedef struct pending_ping {
struct pending_ping* next;
/* args for sending the ping */
- grpc_closure* on_initiate;
- grpc_closure* on_ack;
+ wrapped_rr_closure_arg* on_initiate;
+ wrapped_rr_closure_arg* on_ack;
} pending_ping;
static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate,
grpc_closure* on_ack) {
pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
- pping->on_initiate = on_initiate;
- pping->on_ack = on_ack;
+ if (on_initiate != nullptr) {
+ pping->on_initiate =
+ (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate));
+ pping->on_initiate->wrapped_closure = on_initiate;
+ pping->on_initiate->free_when_done = pping->on_initiate;
+ GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure,
+ &pping->on_initiate, grpc_schedule_on_exec_ctx);
+ }
+ if (on_ack != nullptr) {
+ pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack));
+ pping->on_ack->wrapped_closure = on_ack;
+ pping->on_ack->free_when_done = pping->on_ack;
+ GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure,
+ &pping->on_ack, grpc_schedule_on_exec_ctx);
+ }
pping->next = *root;
*root = pping;
}
@@ -821,12 +834,24 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
pending_ping* pping;
while ((pping = glb_policy->pending_pings)) {
glb_policy->pending_pings = pping->next;
+ grpc_closure* on_initiate = nullptr;
+ grpc_closure* on_ack = nullptr;
+ if (pping->on_initiate != nullptr) {
+ GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
+ pping->on_initiate->rr_policy = glb_policy->rr_policy;
+ on_initiate = &pping->on_initiate->wrapper_closure;
+ }
+ if (pping->on_ack != nullptr) {
+ GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
+ pping->on_ack->rr_policy = glb_policy->rr_policy;
+ on_ack = &pping->on_ack->wrapper_closure;
+ }
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
glb_policy, glb_policy->rr_policy);
}
- grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
- pping->on_initiate, pping->on_ack);
+ grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, on_initiate,
+ on_ack);
gpr_free(pping);
}
}
@@ -1050,8 +1075,16 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
while (pping != nullptr) {
pending_ping* next = pping->next;
- GRPC_CLOSURE_SCHED(exec_ctx, pping->on_initiate, GRPC_ERROR_REF(error));
- GRPC_CLOSURE_SCHED(exec_ctx, pping->on_ack, GRPC_ERROR_REF(error));
+ if (pping->on_initiate != nullptr) {
+ GRPC_CLOSURE_SCHED(exec_ctx, &pping->on_initiate->wrapper_closure,
+ GRPC_ERROR_REF(error));
+ gpr_free(pping->on_initiate);
+ }
+ if (pping->on_ack != nullptr) {
+ GRPC_CLOSURE_SCHED(exec_ctx, &pping->on_ack->wrapper_closure,
+ GRPC_ERROR_REF(error));
+ gpr_free(pping->on_ack);
+ }
gpr_free(pping);
pping = next;
}