diff options
author | Yuchen Zeng <zyc@google.com> | 2017-12-06 13:24:27 -0800 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2017-12-06 13:24:27 -0800 |
commit | 625a5c05456a3001ecd4519b133aecd61e5b333b (patch) | |
tree | f0e470a0be1049f751e5f197ea2101c41912cbb7 /src/core | |
parent | c272dd73aad1273b70fbd038efb02d6f0e169b60 (diff) |
Fix pending pings in grpclb
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 49 |
1 files changed, 41 insertions, 8 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index a6972b850f..704cbd8a3b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -276,15 +276,28 @@ typedef struct pending_ping { struct pending_ping* next; /* args for sending the ping */ - grpc_closure* on_initiate; - grpc_closure* on_ack; + wrapped_rr_closure_arg* on_initiate; + wrapped_rr_closure_arg* on_ack; } pending_ping; static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate, grpc_closure* on_ack) { pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); - pping->on_initiate = on_initiate; - pping->on_ack = on_ack; + if (on_initiate != nullptr) { + pping->on_initiate = + (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate)); + pping->on_initiate->wrapped_closure = on_initiate; + pping->on_initiate->free_when_done = pping->on_initiate; + GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure, + &pping->on_initiate, grpc_schedule_on_exec_ctx); + } + if (on_ack != nullptr) { + pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack)); + pping->on_ack->wrapped_closure = on_ack; + pping->on_ack->free_when_done = pping->on_ack; + GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure, + &pping->on_ack, grpc_schedule_on_exec_ctx); + } pping->next = *root; *root = pping; } @@ -821,12 +834,24 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, pending_ping* pping; while ((pping = glb_policy->pending_pings)) { glb_policy->pending_pings = pping->next; + grpc_closure* on_initiate = nullptr; + grpc_closure* on_ack = nullptr; + if (pping->on_initiate != nullptr) { + GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); + pping->on_initiate->rr_policy = glb_policy->rr_policy; + on_initiate = &pping->on_initiate->wrapper_closure; + } + if (pping->on_ack != nullptr) { + GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); + pping->on_ack->rr_policy = glb_policy->rr_policy; + on_ack = &pping->on_ack->wrapper_closure; + } if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", glb_policy, glb_policy->rr_policy); } - grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, - pping->on_initiate, pping->on_ack); + grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, on_initiate, + on_ack); gpr_free(pping); } } @@ -1050,8 +1075,16 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { while (pping != nullptr) { pending_ping* next = pping->next; - GRPC_CLOSURE_SCHED(exec_ctx, pping->on_initiate, GRPC_ERROR_REF(error)); - GRPC_CLOSURE_SCHED(exec_ctx, pping->on_ack, GRPC_ERROR_REF(error)); + if (pping->on_initiate != nullptr) { + GRPC_CLOSURE_SCHED(exec_ctx, &pping->on_initiate->wrapper_closure, + GRPC_ERROR_REF(error)); + gpr_free(pping->on_initiate); + } + if (pping->on_ack != nullptr) { + GRPC_CLOSURE_SCHED(exec_ctx, &pping->on_ack->wrapper_closure, + GRPC_ERROR_REF(error)); + gpr_free(pping->on_ack); + } gpr_free(pping); pping = next; } |