diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc | 91 |
1 files changed, 51 insertions, 40 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 60385272cf..9ff40aa53c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -31,6 +31,15 @@ grpc_core::TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); +namespace { +struct pending_pick { + struct pending_pick* next; + uint32_t initial_metadata_flags; + grpc_connected_subchannel** target; + grpc_closure* on_complete; +}; +} // namespace + typedef struct { /** base policy: must be first */ grpc_lb_policy base; @@ -45,7 +54,7 @@ typedef struct { /** are we shut down? */ bool shutdown; /** list of picks that are waiting on connectivity */ - grpc_lb_policy_pick_state* pending_picks; + pending_pick* pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; @@ -63,27 +72,19 @@ static void pf_destroy(grpc_lb_policy* pol) { } } -static void pf_shutdown_locked(grpc_lb_policy* pol, - grpc_lb_policy* new_policy) { +static void pf_shutdown_locked(grpc_lb_policy* pol) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p); } p->shutdown = true; - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks) != nullptr) { - p->pending_picks = pick->next; - if (new_policy != nullptr) { - // Hand off to new LB policy. - if (grpc_lb_policy_pick_locked(new_policy, pick)) { - // Synchronous return, schedule closure. - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); - } - } else { - pick->connected_subchannel = nullptr; - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); - } + pending_pick* pp; + while ((pp = p->pending_picks) != nullptr) { + p->pending_picks = pp->next; + *pp->target = nullptr; + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_REF(error)); + gpr_free(pp); } grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "shutdown"); @@ -103,18 +104,19 @@ static void pf_shutdown_locked(grpc_lb_policy* pol, } static void pf_cancel_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick, + grpc_connected_subchannel** target, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; - grpc_lb_policy_pick_state* pp = p->pending_picks; + pending_pick* pp = p->pending_picks; p->pending_picks = nullptr; while (pp != nullptr) { - grpc_lb_policy_pick_state* next = pp->next; - if (pp == pick) { - pick->connected_subchannel = nullptr; - GRPC_CLOSURE_SCHED(pick->on_complete, + pending_pick* next = pp->next; + if (pp->target == target) { + *target = nullptr; + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); + gpr_free(pp); } else { pp->next = p->pending_picks; p->pending_picks = pp; @@ -129,20 +131,21 @@ static void pf_cancel_picks_locked(grpc_lb_policy* pol, uint32_t initial_metadata_flags_eq, grpc_error* error) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; - grpc_lb_policy_pick_state* pick = p->pending_picks; + pending_pick* pp = p->pending_picks; p->pending_picks = nullptr; - while (pick != nullptr) { - grpc_lb_policy_pick_state* next = pick->next; - if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == + while (pp != nullptr) { + pending_pick* next = pp->next; + if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - GRPC_CLOSURE_SCHED(pick->on_complete, + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); + gpr_free(pp); } else { - pick->next = p->pending_picks; - p->pending_picks = pick; + pp->next = p->pending_picks; + p->pending_picks = pp; } - pick = next; + pp = next; } GRPC_ERROR_UNREF(error); } @@ -172,20 +175,27 @@ static void pf_exit_idle_locked(grpc_lb_policy* pol) { } static int pf_pick_locked(grpc_lb_policy* pol, - grpc_lb_policy_pick_state* pick) { + const grpc_lb_policy_pick_args* pick_args, + grpc_connected_subchannel** target, + grpc_call_context_element* context, void** user_data, + grpc_closure* on_complete) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; // If we have a selected subchannel already, return synchronously. if (p->selected != nullptr) { - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( - p->selected->connected_subchannel, "picked"); + *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected->connected_subchannel, + "picked"); return 1; } // No subchannel selected yet, so handle asynchronously. if (!p->started_picking) { start_picking_locked(p); } - pick->next = p->pending_picks; - p->pending_picks = pick; + pending_pick* pp = (pending_pick*)gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->target = target; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->on_complete = on_complete; + p->pending_picks = pp; return 0; } @@ -471,17 +481,18 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { // Drop all other subchannels, since we are now connected. destroy_unselected_subchannels_locked(p); // Update any calls that were waiting for a pick. - grpc_lb_policy_pick_state* pick; - while ((pick = p->pending_picks)) { - p->pending_picks = pick->next; - pick->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF( + pending_pick* pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( p->selected->connected_subchannel, "picked"); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Servicing pending pick with selected subchannel %p", (void*)p->selected); } - GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pp->on_complete, GRPC_ERROR_NONE); + gpr_free(pp); } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); |