diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-12-10 14:36:02 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-12-10 14:36:02 -0800 |
commit | a75ae6ee8aebd6db002d68f0b8e56f653d3de1f7 (patch) | |
tree | c297ca3c1b2f89c8d14114517f0a3552188301ed /src/core/client_config/lb_policies/round_robin.c | |
parent | 7908f16b4575c98fc0339cdcdd25a0258763c396 (diff) | |
parent | 4b6974ca8cd8cfc967d25980e9285f97409babb7 (diff) |
Merge github.com:grpc/grpc into census_suite
Diffstat (limited to 'src/core/client_config/lb_policies/round_robin.c')
-rw-r--r-- | src/core/client_config/lb_policies/round_robin.c | 240 |
1 files changed, 92 insertions, 148 deletions
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 1ffe32fff2..b86dba20ee 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -38,6 +38,8 @@ #include <grpc/support/alloc.h> #include "src/core/transport/connectivity_state.h" +typedef struct round_robin_lb_policy round_robin_lb_policy; + int grpc_lb_round_robin_trace = 0; /** List of entities waiting for a pick. @@ -46,7 +48,7 @@ int grpc_lb_round_robin_trace = 0; typedef struct pending_pick { struct pending_pick *next; grpc_pollset *pollset; - grpc_subchannel **target; + grpc_connected_subchannel **target; grpc_closure *on_complete; } pending_pick; @@ -58,22 +60,27 @@ typedef struct ready_list { } ready_list; typedef struct { - size_t subchannel_idx; /**< Index over p->subchannels */ - void *p; /**< round_robin_lb_policy instance */ -} connectivity_changed_cb_arg; - -typedef struct { + /** index within policy->subchannels */ + size_t index; + /** backpointer to owning policy */ + round_robin_lb_policy *policy; + /** subchannel itself */ + grpc_subchannel *subchannel; + /** notification that connectivity has changed on subchannel */ + grpc_closure connectivity_changed_closure; + /** this subchannels current position in subchannel->ready_list */ + ready_list *ready_list_node; + /** last observed connectivity */ + grpc_connectivity_state connectivity_state; +} subchannel_data; + +struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; /** all our subchannels */ - grpc_subchannel **subchannels; size_t num_subchannels; - - /** Callbacks, one per subchannel being watched, to be called when their - * respective connectivity changes */ - grpc_closure *connectivity_changed_cbs; - connectivity_changed_cb_arg *cb_args; + subchannel_data **subchannels; /** mutex protecting remaining members */ gpr_mu mu; @@ -81,8 +88,6 @@ typedef struct { int started_picking; /** are we shutting down? */ int shutdown; - /** Connectivity state of the subchannels being watched */ - grpc_connectivity_state *subchannel_connectivity; /** List of picks that are waiting on connectivity */ pending_pick *pending_picks; @@ -93,13 +98,7 @@ typedef struct { ready_list ready_list; /** Last pick from the ready list. */ ready_list *ready_list_last_pick; - - /** Subchannel index to ready_list node. - * - * Kept in order to remove nodes from the ready list associated with a - * subchannel */ - ready_list **subchannel_index_to_readylist_node; -} round_robin_lb_policy; +}; /** Returns the next subchannel from the connected list or NULL if the list is * empty. @@ -144,9 +143,9 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { /** Prepends (relative to the root at p->ready_list) the connected subchannel \a * csc to the list of ready subchannels. */ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - grpc_subchannel *csc) { + grpc_subchannel *sc) { ready_list *new_elem = gpr_malloc(sizeof(ready_list)); - new_elem->subchannel = csc; + new_elem->subchannel = sc; if (p->ready_list.prev == NULL) { /* first element */ new_elem->next = &p->ready_list; @@ -160,7 +159,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, p->ready_list.prev = new_elem; } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); + gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc); } return new_elem; } @@ -200,28 +199,15 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } -static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, - round_robin_lb_policy *p, - const size_t subchannel_idx) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[subchannel_idx], pp->pollset); - } -} - void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; size_t i; ready_list *elem; for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(exec_ctx, p, i); + subchannel_data *sd = p->subchannels[i]; + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); + gpr_free(sd); } - for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin"); - } - gpr_free(p->connectivity_changed_cbs); - gpr_free(p->subchannel_connectivity); grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); @@ -237,20 +223,15 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_free(elem); elem = tmp; } - gpr_free(p->subchannel_index_to_readylist_node); - gpr_free(p->cb_args); gpr_free(p); } void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock(&p->mu); + size_t i; - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(exec_ctx, p, i); - } + gpr_mu_lock(&p->mu); p->shutdown = 1; while ((pp = p->pending_picks)) { @@ -261,24 +242,26 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); + for (i = 0; i < p->num_subchannels; i++) { + subchannel_data *sd = p->subchannels[i]; + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, + &sd->connectivity_changed_closure); + } gpr_mu_unlock(&p->mu); } static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_subchannel **target) { + grpc_connected_subchannel **target) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - size_t i; gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], - pp->pollset); - } + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -295,12 +278,16 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { size_t i; p->started_picking = 1; + gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, + p->num_subchannels); + for (i = 0; i < p->num_subchannels; i++) { - p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i], - &p->subchannel_connectivity[i], - &p->connectivity_changed_cbs[i]); - GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); + subchannel_data *sd = p->subchannels[i]; + sd->connectivity_state = GRPC_CHANNEL_IDLE; + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); + GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); } } @@ -314,18 +301,18 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_closure *on_complete) { - size_t i; + grpc_metadata_batch *initial_metadata, + grpc_connected_subchannel **target, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; ready_list *selected; gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { gpr_mu_unlock(&p->mu); - *target = selected->subchannel; + *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", + gpr_log(GPR_DEBUG, + "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } /* only advance the last picked pointer if the selection was used */ @@ -335,10 +322,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], - pollset); - } + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, + pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -352,33 +337,25 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { - connectivity_changed_cb_arg *cb_arg = arg; - round_robin_lb_policy *p = cb_arg->p; - /* index over p->subchannels of this cb's subchannel */ - const size_t this_idx = cb_arg->subchannel_idx; + subchannel_data *sd = arg; + round_robin_lb_policy *p = sd->policy; pending_pick *pp; ready_list *selected; int unref = 0; - /* connectivity state of this cb's subchannel */ - grpc_connectivity_state *this_connectivity; - gpr_mu_lock(&p->mu); - this_connectivity = &p->subchannel_connectivity[this_idx]; - if (p->shutdown) { unref = 1; } else { - switch (*this_connectivity) { + switch (sd->connectivity_state) { case GRPC_CHANNEL_READY: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); /* add the newly connected subchannel to the list of connected ones. * Note that it goes to the "end of the line". */ - p->subchannel_index_to_readylist_node[this_idx] = - add_connected_sc_locked(p, p->subchannels[this_idx]); + sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel); /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ @@ -390,60 +367,60 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = selected->subchannel; + *pp->target = + grpc_subchannel_get_connected_subchannel(selected->subchannel); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel, - pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, - &p->connectivity_changed_cbs[this_idx]); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - *this_connectivity, "connecting_changed"); + sd->connectivity_state, + "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, - &p->connectivity_changed_cbs[this_idx]); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: - del_interested_parties_locked(exec_ctx, p, this_idx); /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, - &p->connectivity_changed_cbs[this_idx]); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); /* remove from ready list if still present */ - if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { - remove_disconnected_sc_locked( - p, p->subchannel_index_to_readylist_node[this_idx]); - p->subchannel_index_to_readylist_node[this_idx] = NULL; + if (sd->ready_list_node != NULL) { + remove_disconnected_sc_locked(p, sd->ready_list_node); + sd->ready_list_node = NULL; } grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure"); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(exec_ctx, p, this_idx); - if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { - remove_disconnected_sc_locked( - p, p->subchannel_index_to_readylist_node[this_idx]); - p->subchannel_index_to_readylist_node[this_idx] = NULL; + if (sd->ready_list_node != NULL) { + remove_disconnected_sc_locked(p, sd->ready_list_node); + sd->ready_list_node = NULL; } - GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx], - p->subchannels[p->num_subchannels - 1]); p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], - "round_robin"); + GPR_SWAP(subchannel_data *, p->subchannels[sd->index], + p->subchannels[p->num_subchannels]); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); + p->subchannels[sd->index]->index = sd->index; + gpr_free(sd); + unref = 1; if (p->num_subchannels == 0) { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, @@ -454,7 +431,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - unref = 1; } else { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, @@ -466,31 +442,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&p->mu); if (unref) { - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "round_robin_connectivity"); - } -} - -static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_transport_op *op) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - size_t i; - size_t n; - grpc_subchannel **subchannels; - - gpr_mu_lock(&p->mu); - n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity"); } - gpr_mu_unlock(&p->mu); - - for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast"); - } - gpr_free(subchannels); } static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, @@ -516,7 +469,7 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle, - rr_broadcast, rr_check_connectivity, rr_notify_on_state_change}; + rr_check_connectivity, rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} @@ -529,27 +482,22 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, GPR_ASSERT(args->num_subchannels > 0); memset(p, 0, sizeof(*p)); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; + p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels); + memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); - memcpy(p->subchannels, args->subchannels, - sizeof(grpc_subchannel *) * args->num_subchannels); gpr_mu_init(&p->mu); - p->connectivity_changed_cbs = - gpr_malloc(sizeof(grpc_closure) * args->num_subchannels); - p->subchannel_connectivity = - gpr_malloc(sizeof(grpc_connectivity_state) * args->num_subchannels); - - p->cb_args = - gpr_malloc(sizeof(connectivity_changed_cb_arg) * args->num_subchannels); for (i = 0; i < args->num_subchannels; i++) { - p->cb_args[i].subchannel_idx = i; - p->cb_args[i].p = p; - grpc_closure_init(&p->connectivity_changed_cbs[i], rr_connectivity_changed, - &p->cb_args[i]); + subchannel_data *sd = gpr_malloc(sizeof(*sd)); + memset(sd, 0, sizeof(*sd)); + p->subchannels[i] = sd; + sd->policy = p; + sd->index = i; + sd->subchannel = args->subchannels[i]; + grpc_closure_init(&sd->connectivity_changed_closure, + rr_connectivity_changed, sd); } /* The (dummy node) root of the ready list */ @@ -558,10 +506,6 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, p->ready_list.next = NULL; p->ready_list_last_pick = &p->ready_list; - p->subchannel_index_to_readylist_node = - gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); - memset(p->subchannel_index_to_readylist_node, 0, - sizeof(grpc_subchannel *) * args->num_subchannels); return &p->base; } |