diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-22 10:42:19 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-22 10:42:19 -0700 |
commit | 45724b35e411fef7c5da66a74c78428c11d56843 (patch) | |
tree | 9264034aca675c89444e02f72ef58e67d7043604 /src/core/client_config/lb_policies | |
parent | 298751c1195523ef6228595043b583c3a6270e08 (diff) |
indent pass to get logical source lines on one physical line
Diffstat (limited to 'src/core/client_config/lb_policies')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 463 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.h | 2 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/round_robin.c | 727 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/round_robin.h | 2 |
4 files changed, 627 insertions, 567 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 7557053711..b8991dfaa7 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -39,14 +39,16 @@ #include <grpc/support/alloc.h> #include "src/core/transport/connectivity_state.h" -typedef struct pending_pick { +typedef struct pending_pick +{ struct pending_pick *next; grpc_pollset *pollset; grpc_subchannel **target; grpc_closure *on_complete; } pending_pick; -typedef struct { +typedef struct +{ /** base policy: must be first */ grpc_lb_policy base; /** all our subchannels */ @@ -76,286 +78,303 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void del_interested_parties_locked(pick_first_lb_policy *p, - grpc_closure_list *closure_list) { +static void +del_interested_parties_locked (pick_first_lb_policy * p, grpc_closure_list * closure_list) +{ pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], - pp->pollset, closure_list); - } + for (pp = p->pending_picks; pp; pp = pp->next) + { + grpc_subchannel_del_interested_party (p->subchannels[p->checking_subchannel], pp->pollset, closure_list); + } } -static void add_interested_parties_locked(pick_first_lb_policy *p, - grpc_closure_list *closure_list) { +static void +add_interested_parties_locked (pick_first_lb_policy * p, grpc_closure_list * closure_list) +{ pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], - pp->pollset, closure_list); - } + for (pp = p->pending_picks; pp; pp = pp->next) + { + grpc_subchannel_add_interested_party (p->subchannels[p->checking_subchannel], pp->pollset, closure_list); + } } -void pf_destroy(grpc_lb_policy *pol, grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; +void +pf_destroy (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; size_t i; - GPR_ASSERT(p->pending_picks == NULL); - for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first", closure_list); - } - grpc_connectivity_state_destroy(&p->state_tracker, closure_list); - gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); - gpr_free(p); + GPR_ASSERT (p->pending_picks == NULL); + for (i = 0; i < p->num_subchannels; i++) + { + GRPC_SUBCHANNEL_UNREF (p->subchannels[i], "pick_first", closure_list); + } + grpc_connectivity_state_destroy (&p->state_tracker, closure_list); + gpr_free (p->subchannels); + gpr_mu_destroy (&p->mu); + gpr_free (p); } -void pf_shutdown(grpc_lb_policy *pol, grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; +void +pf_shutdown (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; pending_pick *pp; - gpr_mu_lock(&p->mu); - del_interested_parties_locked(p, closure_list); + gpr_mu_lock (&p->mu); + del_interested_parties_locked (p, closure_list); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, - "shutdown", closure_list); - gpr_mu_unlock(&p->mu); - while (pp != NULL) { - pending_pick *next = pp->next; - *pp->target = NULL; - grpc_closure_list_add(closure_list, pp->on_complete, 1); - gpr_free(pp); - pp = next; - } + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown", closure_list); + gpr_mu_unlock (&p->mu); + while (pp != NULL) + { + pending_pick *next = pp->next; + *pp->target = NULL; + grpc_closure_list_add (closure_list, pp->on_complete, 1); + gpr_free (pp); + pp = next; + } } -static void start_picking(pick_first_lb_policy *p, - grpc_closure_list *closure_list) { +static void +start_picking (pick_first_lb_policy * p, grpc_closure_list * closure_list) +{ p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed, closure_list); + GRPC_LB_POLICY_REF (&p->base, "pick_first_connectivity"); + grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list); } -void pf_exit_idle(grpc_lb_policy *pol, grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); - if (!p->started_picking) { - start_picking(p, closure_list); - } - gpr_mu_unlock(&p->mu); +void +pf_exit_idle (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; + gpr_mu_lock (&p->mu); + if (!p->started_picking) + { + start_picking (p, closure_list); + } + gpr_mu_unlock (&p->mu); } -void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_closure *on_complete, grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; +void +pf_pick (grpc_lb_policy * pol, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; pending_pick *pp; - gpr_mu_lock(&p->mu); - if (p->selected) { - gpr_mu_unlock(&p->mu); - *target = p->selected; - grpc_closure_list_add(closure_list, on_complete, 1); - } else { - if (!p->started_picking) { - start_picking(p, closure_list); + gpr_mu_lock (&p->mu); + if (p->selected) + { + gpr_mu_unlock (&p->mu); + *target = p->selected; + grpc_closure_list_add (closure_list, on_complete, 1); + } + else + { + if (!p->started_picking) + { + start_picking (p, closure_list); + } + grpc_subchannel_add_interested_party (p->subchannels[p->checking_subchannel], pollset, closure_list); + pp = gpr_malloc (sizeof (*pp)); + pp->next = p->pending_picks; + pp->pollset = pollset; + pp->target = target; + pp->on_complete = on_complete; + p->pending_picks = pp; + gpr_mu_unlock (&p->mu); } - grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], - pollset, closure_list); - pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->pollset = pollset; - pp->target = target; - pp->on_complete = on_complete; - p->pending_picks = pp; - gpr_mu_unlock(&p->mu); - } } -static void pf_connectivity_changed(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +pf_connectivity_changed (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ pick_first_lb_policy *p = arg; pending_pick *pp; - gpr_mu_lock(&p->mu); + gpr_mu_lock (&p->mu); - if (p->shutdown) { - gpr_mu_unlock(&p->mu); - GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", closure_list); - return; - } else if (p->selected != NULL) { - grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, - "selected_changed", closure_list); - if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_subchannel_notify_on_state_change( - p->selected, &p->checking_connectivity, &p->connectivity_changed, - closure_list); - } else { - GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", closure_list); + if (p->shutdown) + { + gpr_mu_unlock (&p->mu); + GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list); + return; } - } else { - loop: - switch (p->checking_connectivity) { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, - "connecting_ready", closure_list); - p->selected = p->subchannels[p->checking_subchannel]; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = p->selected; - grpc_subchannel_del_interested_party(p->selected, pp->pollset, - closure_list); - grpc_closure_list_add(closure_list, pp->on_complete, 1); - gpr_free(pp); - } - grpc_subchannel_notify_on_state_change( - p->selected, &p->checking_connectivity, &p->connectivity_changed, - closure_list); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure", closure_list); - del_interested_parties_locked(p, closure_list); - p->checking_subchannel = - (p->checking_subchannel + 1) % p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(p, closure_list); - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed, closure_list); - } else { - goto loop; - } - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, - "connecting_changed", closure_list); - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed, closure_list); - break; - case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(p, closure_list); - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels - 1]); - p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first", - closure_list); - if (p->num_subchannels == 0) { - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, - "no_more_channels", closure_list); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_closure_list_add(closure_list, pp->on_complete, 1); - gpr_free(pp); - } - GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", - closure_list); - } else { - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "subchannel_failed", closure_list); - p->checking_subchannel %= p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(p, closure_list); - goto loop; - } + else if (p->selected != NULL) + { + grpc_connectivity_state_set (&p->state_tracker, p->checking_connectivity, "selected_changed", closure_list); + if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) + { + grpc_subchannel_notify_on_state_change (p->selected, &p->checking_connectivity, &p->connectivity_changed, closure_list); + } + else + { + GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list); + } + } + else + { + loop: + switch (p->checking_connectivity) + { + case GRPC_CHANNEL_READY: + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready", closure_list); + p->selected = p->subchannels[p->checking_subchannel]; + while ((pp = p->pending_picks)) + { + p->pending_picks = pp->next; + *pp->target = p->selected; + grpc_subchannel_del_interested_party (p->selected, pp->pollset, closure_list); + grpc_closure_list_add (closure_list, pp->on_complete, 1); + gpr_free (pp); + } + grpc_subchannel_notify_on_state_change (p->selected, &p->checking_connectivity, &p->connectivity_changed, closure_list); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure", closure_list); + del_interested_parties_locked (p, closure_list); + p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity (p->subchannels[p->checking_subchannel]); + add_interested_parties_locked (p, closure_list); + if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) + { + grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list); + } + else + { + goto loop; + } + break; + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_CONNECTING, "connecting_changed", closure_list); + grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list); + break; + case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked (p, closure_list); + GPR_SWAP (grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); + p->num_subchannels--; + GRPC_SUBCHANNEL_UNREF (p->subchannels[p->num_subchannels], "pick_first", closure_list); + if (p->num_subchannels == 0) + { + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels", closure_list); + while ((pp = p->pending_picks)) + { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_closure_list_add (closure_list, pp->on_complete, 1); + gpr_free (pp); + } + GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list); + } + else + { + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed", closure_list); + p->checking_subchannel %= p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity (p->subchannels[p->checking_subchannel]); + add_interested_parties_locked (p, closure_list); + goto loop; + } + } } - } - gpr_mu_unlock(&p->mu); + gpr_mu_unlock (&p->mu); } -static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, - grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; +static void +pf_broadcast (grpc_lb_policy * pol, grpc_transport_op * op, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; size_t i; size_t n; grpc_subchannel **subchannels; - gpr_mu_lock(&p->mu); + gpr_mu_lock (&p->mu); n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast"); - } - gpr_mu_unlock(&p->mu); + subchannels = gpr_malloc (n * sizeof (*subchannels)); + for (i = 0; i < n; i++) + { + subchannels[i] = p->subchannels[i]; + GRPC_SUBCHANNEL_REF (subchannels[i], "pf_broadcast"); + } + gpr_mu_unlock (&p->mu); - for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op(subchannels[i], op, closure_list); - GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast", closure_list); - } - gpr_free(subchannels); + for (i = 0; i < n; i++) + { + grpc_subchannel_process_transport_op (subchannels[i], op, closure_list); + GRPC_SUBCHANNEL_UNREF (subchannels[i], "pf_broadcast", closure_list); + } + gpr_free (subchannels); } -static grpc_connectivity_state pf_check_connectivity( - grpc_lb_policy *pol, grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; +static grpc_connectivity_state +pf_check_connectivity (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; grpc_connectivity_state st; - gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker); - gpr_mu_unlock(&p->mu); + gpr_mu_lock (&p->mu); + st = grpc_connectivity_state_check (&p->state_tracker); + gpr_mu_unlock (&p->mu); return st; } -void pf_notify_on_state_change(grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify, - grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); - grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, - notify, closure_list); - gpr_mu_unlock(&p->mu); +void +pf_notify_on_state_change (grpc_lb_policy * pol, grpc_connectivity_state * current, grpc_closure * notify, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; + gpr_mu_lock (&p->mu); + grpc_connectivity_state_notify_on_state_change (&p->state_tracker, current, notify, closure_list); + gpr_mu_unlock (&p->mu); } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, - pf_shutdown, - pf_pick, - pf_exit_idle, - pf_broadcast, - pf_check_connectivity, - pf_notify_on_state_change}; + pf_destroy, + pf_shutdown, + pf_pick, + pf_exit_idle, + pf_broadcast, + pf_check_connectivity, + pf_notify_on_state_change +}; -static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} +static void +pick_first_factory_ref (grpc_lb_policy_factory * factory) +{ +} -static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} +static void +pick_first_factory_unref (grpc_lb_policy_factory * factory) +{ +} -static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(args->num_subchannels > 0); - memset(p, 0, sizeof(*p)); - grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); +static grpc_lb_policy * +create_pick_first (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args) +{ + pick_first_lb_policy *p = gpr_malloc (sizeof (*p)); + GPR_ASSERT (args->num_subchannels > 0); + memset (p, 0, sizeof (*p)); + grpc_lb_policy_init (&p->base, &pick_first_lb_policy_vtable); + p->subchannels = gpr_malloc (sizeof (grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "pick_first"); - memcpy(p->subchannels, args->subchannels, - sizeof(grpc_subchannel *) * args->num_subchannels); - grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); - gpr_mu_init(&p->mu); + grpc_connectivity_state_init (&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); + memcpy (p->subchannels, args->subchannels, sizeof (grpc_subchannel *) * args->num_subchannels); + grpc_closure_init (&p->connectivity_changed, pf_connectivity_changed, p); + gpr_mu_init (&p->mu); return &p->base; } static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = { - pick_first_factory_ref, pick_first_factory_unref, create_pick_first, - "pick_first"}; + pick_first_factory_ref, pick_first_factory_unref, create_pick_first, + "pick_first" +}; static grpc_lb_policy_factory pick_first_lb_policy_factory = { - &pick_first_factory_vtable}; + &pick_first_factory_vtable +}; -grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() { +grpc_lb_policy_factory * +grpc_pick_first_lb_factory_create () +{ return &pick_first_lb_policy_factory; } diff --git a/src/core/client_config/lb_policies/pick_first.h b/src/core/client_config/lb_policies/pick_first.h index 3ca53ad42a..72a4e7c32c 100644 --- a/src/core/client_config/lb_policies/pick_first.h +++ b/src/core/client_config/lb_policies/pick_first.h @@ -38,6 +38,6 @@ /** Returns a load balancing factory for the pick first policy, which picks up * the first subchannel from \a subchannels to succesfully connect */ -grpc_lb_policy_factory *grpc_pick_first_lb_factory_create(); +grpc_lb_policy_factory *grpc_pick_first_lb_factory_create (); #endif diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index b4139719f4..2e64da9e10 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -43,7 +43,8 @@ int grpc_lb_round_robin_trace = 0; /** List of entities waiting for a pick. * * Once a pick is available, \a target is updated and \a on_complete called. */ -typedef struct pending_pick { +typedef struct pending_pick +{ struct pending_pick *next; grpc_pollset *pollset; grpc_subchannel **target; @@ -51,18 +52,21 @@ typedef struct pending_pick { } pending_pick; /** List of subchannels in a connectivity READY state */ -typedef struct ready_list { +typedef struct ready_list +{ grpc_subchannel *subchannel; struct ready_list *next; struct ready_list *prev; } ready_list; -typedef struct { +typedef struct +{ size_t subchannel_idx; /**< Index over p->subchannels */ - void *p; /**< round_robin_lb_policy instance */ + void *p; /**< round_robin_lb_policy instance */ } connectivity_changed_cb_arg; -typedef struct { +typedef struct +{ /** base policy: must be first */ grpc_lb_policy base; @@ -106,225 +110,264 @@ typedef struct { * * Note that this function does *not* advance p->ready_list_last_pick. Use \a * advance_last_picked_locked() for that. */ -static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { +static ready_list * +peek_next_connected_locked (const round_robin_lb_policy * p) +{ ready_list *selected; selected = p->ready_list_last_pick->next; - while (selected != NULL) { - if (selected == &p->ready_list) { - GPR_ASSERT(selected->subchannel == NULL); - /* skip dummy root */ - selected = selected->next; - } else { - GPR_ASSERT(selected->subchannel != NULL); - return selected; + while (selected != NULL) + { + if (selected == &p->ready_list) + { + GPR_ASSERT (selected->subchannel == NULL); + /* skip dummy root */ + selected = selected->next; + } + else + { + GPR_ASSERT (selected->subchannel != NULL); + return selected; + } } - } return NULL; } /** Advance the \a ready_list picking head. */ -static void advance_last_picked_locked(round_robin_lb_policy *p) { - if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ - p->ready_list_last_pick = p->ready_list_last_pick->next; - if (p->ready_list_last_pick == &p->ready_list) { - /* skip dummy root */ +static void +advance_last_picked_locked (round_robin_lb_policy * p) +{ + if (p->ready_list_last_pick->next != NULL) + { /* non-empty list */ p->ready_list_last_pick = p->ready_list_last_pick->next; + if (p->ready_list_last_pick == &p->ready_list) + { + /* skip dummy root */ + p->ready_list_last_pick = p->ready_list_last_pick->next; + } + } + else + { /* should be an empty list */ + GPR_ASSERT (p->ready_list_last_pick == &p->ready_list); + } + + if (grpc_lb_round_robin_trace) + { + gpr_log (GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", p->ready_list_last_pick, p->ready_list_last_pick->subchannel); } - } else { /* should be an empty list */ - GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); - } - - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", - p->ready_list_last_pick, p->ready_list_last_pick->subchannel); - } } /** Prepends (relative to the root at p->ready_list) the connected subchannel \a * csc to the list of ready subchannels. */ -static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - grpc_subchannel *csc) { - ready_list *new_elem = gpr_malloc(sizeof(ready_list)); +static ready_list * +add_connected_sc_locked (round_robin_lb_policy * p, grpc_subchannel * csc) +{ + ready_list *new_elem = gpr_malloc (sizeof (ready_list)); new_elem->subchannel = csc; - if (p->ready_list.prev == NULL) { - /* first element */ - new_elem->next = &p->ready_list; - new_elem->prev = &p->ready_list; - p->ready_list.next = new_elem; - p->ready_list.prev = new_elem; - } else { - new_elem->next = &p->ready_list; - new_elem->prev = p->ready_list.prev; - p->ready_list.prev->next = new_elem; - p->ready_list.prev = new_elem; - } - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); - } + if (p->ready_list.prev == NULL) + { + /* first element */ + new_elem->next = &p->ready_list; + new_elem->prev = &p->ready_list; + p->ready_list.next = new_elem; + p->ready_list.prev = new_elem; + } + else + { + new_elem->next = &p->ready_list; + new_elem->prev = p->ready_list.prev; + p->ready_list.prev->next = new_elem; + p->ready_list.prev = new_elem; + } + if (grpc_lb_round_robin_trace) + { + gpr_log (GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); + } return new_elem; } /** Removes \a node from the list of connected subchannels */ -static void remove_disconnected_sc_locked(round_robin_lb_policy *p, - ready_list *node) { - if (node == NULL) { - return; - } - if (node == p->ready_list_last_pick) { - /* If removing the lastly picked node, reset the last pick pointer to the - * dummy root of the list */ - p->ready_list_last_pick = &p->ready_list; - } +static void +remove_disconnected_sc_locked (round_robin_lb_policy * p, ready_list * node) +{ + if (node == NULL) + { + return; + } + if (node == p->ready_list_last_pick) + { + /* If removing the lastly picked node, reset the last pick pointer to the + * dummy root of the list */ + p->ready_list_last_pick = &p->ready_list; + } /* removing last item */ - if (node->next == &p->ready_list && node->prev == &p->ready_list) { - GPR_ASSERT(p->ready_list.next == node); - GPR_ASSERT(p->ready_list.prev == node); - p->ready_list.next = NULL; - p->ready_list.prev = NULL; - } else { - node->prev->next = node->next; - node->next->prev = node->prev; - } - - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, - node->subchannel); - } + if (node->next == &p->ready_list && node->prev == &p->ready_list) + { + GPR_ASSERT (p->ready_list.next == node); + GPR_ASSERT (p->ready_list.prev == node); + p->ready_list.next = NULL; + p->ready_list.prev = NULL; + } + else + { + node->prev->next = node->next; + node->next->prev = node->prev; + } + + if (grpc_lb_round_robin_trace) + { + gpr_log (GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, node->subchannel); + } node->next = NULL; node->prev = NULL; node->subchannel = NULL; - gpr_free(node); + gpr_free (node); } -static void del_interested_parties_locked(round_robin_lb_policy *p, - const size_t subchannel_idx, - grpc_closure_list *closure_list) { +static void +del_interested_parties_locked (round_robin_lb_policy * p, const size_t subchannel_idx, grpc_closure_list * closure_list) +{ pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party(p->subchannels[subchannel_idx], - pp->pollset, closure_list); - } + for (pp = p->pending_picks; pp; pp = pp->next) + { + grpc_subchannel_del_interested_party (p->subchannels[subchannel_idx], pp->pollset, closure_list); + } } -void rr_destroy(grpc_lb_policy *pol, grpc_closure_list *closure_list) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; +void +rr_destroy (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; size_t i; ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(p, i, closure_list); - } - for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "round_robin", closure_list); - } - gpr_free(p->connectivity_changed_cbs); - gpr_free(p->subchannel_connectivity); - - grpc_connectivity_state_destroy(&p->state_tracker, closure_list); - gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); + for (i = 0; i < p->num_subchannels; i++) + { + del_interested_parties_locked (p, i, closure_list); + } + for (i = 0; i < p->num_subchannels; i++) + { + GRPC_SUBCHANNEL_UNREF (p->subchannels[i], "round_robin", closure_list); + } + gpr_free (p->connectivity_changed_cbs); + gpr_free (p->subchannel_connectivity); + + grpc_connectivity_state_destroy (&p->state_tracker, closure_list); + gpr_free (p->subchannels); + gpr_mu_destroy (&p->mu); elem = p->ready_list.next; - while (elem != NULL && elem != &p->ready_list) { - ready_list *tmp; - tmp = elem->next; - elem->next = NULL; - elem->prev = NULL; - elem->subchannel = NULL; - gpr_free(elem); - elem = tmp; - } - gpr_free(p->subchannel_index_to_readylist_node); - gpr_free(p->cb_args); - gpr_free(p); + while (elem != NULL && elem != &p->ready_list) + { + ready_list *tmp; + tmp = elem->next; + elem->next = NULL; + elem->prev = NULL; + elem->subchannel = NULL; + gpr_free (elem); + elem = tmp; + } + gpr_free (p->subchannel_index_to_readylist_node); + gpr_free (p->cb_args); + gpr_free (p); } -void rr_shutdown(grpc_lb_policy *pol, grpc_closure_list *closure_list) { +void +rr_shutdown (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ size_t i; - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; pending_pick *pp; - gpr_mu_lock(&p->mu); + gpr_mu_lock (&p->mu); - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(p, i, closure_list); - } + for (i = 0; i < p->num_subchannels; i++) + { + del_interested_parties_locked (p, i, closure_list); + } p->shutdown = 1; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_closure_list_add(closure_list, pp->on_complete, 0); - gpr_free(pp); - } - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, - "shutdown", closure_list); - gpr_mu_unlock(&p->mu); + while ((pp = p->pending_picks)) + { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_closure_list_add (closure_list, pp->on_complete, 0); + gpr_free (pp); + } + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown", closure_list); + gpr_mu_unlock (&p->mu); } -static void start_picking(round_robin_lb_policy *p, - grpc_closure_list *closure_list) { +static void +start_picking (round_robin_lb_policy * p, grpc_closure_list * closure_list) +{ size_t i; p->started_picking = 1; - for (i = 0; i < p->num_subchannels; i++) { - p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change( - p->subchannels[i], &p->subchannel_connectivity[i], - &p->connectivity_changed_cbs[i], closure_list); - GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); - } + for (i = 0; i < p->num_subchannels; i++) + { + p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; + grpc_subchannel_notify_on_state_change (p->subchannels[i], &p->subchannel_connectivity[i], &p->connectivity_changed_cbs[i], closure_list); + GRPC_LB_POLICY_REF (&p->base, "round_robin_connectivity"); + } } -void rr_exit_idle(grpc_lb_policy *pol, grpc_closure_list *closure_list) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - gpr_mu_lock(&p->mu); - if (!p->started_picking) { - start_picking(p, closure_list); - } - gpr_mu_unlock(&p->mu); +void +rr_exit_idle (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; + gpr_mu_lock (&p->mu); + if (!p->started_picking) + { + start_picking (p, closure_list); + } + gpr_mu_unlock (&p->mu); } -void rr_pick(grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_closure *on_complete, grpc_closure_list *closure_list) { +void +rr_pick (grpc_lb_policy * pol, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete, grpc_closure_list * closure_list) +{ size_t i; - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; pending_pick *pp; ready_list *selected; - gpr_mu_lock(&p->mu); - if ((selected = peek_next_connected_locked(p))) { - gpr_mu_unlock(&p->mu); - *target = selected->subchannel; - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", - selected->subchannel, selected); - } - /* only advance the last picked pointer if the selection was used */ - advance_last_picked_locked(p); - on_complete->cb(on_complete->cb_arg, 1, closure_list); - } else { - if (!p->started_picking) { - start_picking(p, closure_list); + gpr_mu_lock (&p->mu); + if ((selected = peek_next_connected_locked (p))) + { + gpr_mu_unlock (&p->mu); + *target = selected->subchannel; + if (grpc_lb_round_robin_trace) + { + gpr_log (GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); + } + /* only advance the last picked pointer if the selection was used */ + advance_last_picked_locked (p); + on_complete->cb (on_complete->cb_arg, 1, closure_list); } - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(p->subchannels[i], pollset, - closure_list); + else + { + if (!p->started_picking) + { + start_picking (p, closure_list); + } + for (i = 0; i < p->num_subchannels; i++) + { + grpc_subchannel_add_interested_party (p->subchannels[i], pollset, closure_list); + } + pp = gpr_malloc (sizeof (*pp)); + pp->next = p->pending_picks; + pp->pollset = pollset; + pp->target = target; + pp->on_complete = on_complete; + p->pending_picks = pp; + gpr_mu_unlock (&p->mu); } - pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->pollset = pollset; - pp->target = target; - pp->on_complete = on_complete; - p->pending_picks = pp; - gpr_mu_unlock(&p->mu); - } } -static void rr_connectivity_changed(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +rr_connectivity_changed (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ connectivity_changed_cb_arg *cb_arg = arg; round_robin_lb_policy *p = cb_arg->p; /* index over p->subchannels of this cb's subchannel */ @@ -337,198 +380,194 @@ static void rr_connectivity_changed(void *arg, int iomgr_success, /* connectivity state of this cb's subchannel */ grpc_connectivity_state *this_connectivity; - gpr_mu_lock(&p->mu); + gpr_mu_lock (&p->mu); this_connectivity = &p->subchannel_connectivity[this_idx]; - if (p->shutdown) { - unref = 1; - } else { - switch (*this_connectivity) { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, - "connecting_ready", closure_list); - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - p->subchannel_index_to_readylist_node[this_idx] = - add_connected_sc_locked(p, p->subchannels[this_idx]); - /* at this point we know there's at least one suitable subchannel. Go - * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - selected = peek_next_connected_locked(p); - if (p->pending_picks != NULL) { - /* if the selected subchannel is going to be used for the pending - * picks, update the last picked pointer */ - advance_last_picked_locked(p); - } - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = selected->subchannel; - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - selected->subchannel, selected); - } - grpc_subchannel_del_interested_party(selected->subchannel, - pp->pollset, closure_list); - grpc_closure_list_add(closure_list, pp->on_complete, 1); - gpr_free(pp); - } - grpc_subchannel_notify_on_state_change( - p->subchannels[this_idx], this_connectivity, - &p->connectivity_changed_cbs[this_idx], closure_list); - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set(&p->state_tracker, *this_connectivity, - "connecting_changed", closure_list); - grpc_subchannel_notify_on_state_change( - p->subchannels[this_idx], this_connectivity, - &p->connectivity_changed_cbs[this_idx], closure_list); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - del_interested_parties_locked(p, this_idx, closure_list); - /* renew state notification */ - grpc_subchannel_notify_on_state_change( - p->subchannels[this_idx], this_connectivity, - &p->connectivity_changed_cbs[this_idx], closure_list); - - /* remove from ready list if still present */ - if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { - remove_disconnected_sc_locked( - p, p->subchannel_index_to_readylist_node[this_idx]); - p->subchannel_index_to_readylist_node[this_idx] = NULL; - } - grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure", closure_list); - break; - case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(p, this_idx, closure_list); - if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { - remove_disconnected_sc_locked( - p, p->subchannel_index_to_readylist_node[this_idx]); - p->subchannel_index_to_readylist_node[this_idx] = NULL; - } - - GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx], - p->subchannels[p->num_subchannels - 1]); - p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "round_robin", - closure_list); - - if (p->num_subchannels == 0) { - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, - "no_more_channels", closure_list); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_closure_list_add(closure_list, pp->on_complete, 1); - gpr_free(pp); - } - unref = 1; - } else { - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "subchannel_failed", closure_list); - } - } /* switch */ - } /* !unref */ - - gpr_mu_unlock(&p->mu); - - if (unref) { - GRPC_LB_POLICY_UNREF(&p->base, "round_robin_connectivity", closure_list); - } + if (p->shutdown) + { + unref = 1; + } + else + { + switch (*this_connectivity) + { + case GRPC_CHANNEL_READY: + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready", closure_list); + /* add the newly connected subchannel to the list of connected ones. + * Note that it goes to the "end of the line". */ + p->subchannel_index_to_readylist_node[this_idx] = add_connected_sc_locked (p, p->subchannels[this_idx]); + /* at this point we know there's at least one suitable subchannel. Go + * ahead and pick one and notify the pending suitors in + * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + selected = peek_next_connected_locked (p); + if (p->pending_picks != NULL) + { + /* if the selected subchannel is going to be used for the pending + * picks, update the last picked pointer */ + advance_last_picked_locked (p); + } + while ((pp = p->pending_picks)) + { + p->pending_picks = pp->next; + *pp->target = selected->subchannel; + if (grpc_lb_round_robin_trace) + { + gpr_log (GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); + } + grpc_subchannel_del_interested_party (selected->subchannel, pp->pollset, closure_list); + grpc_closure_list_add (closure_list, pp->on_complete, 1); + gpr_free (pp); + } + grpc_subchannel_notify_on_state_change (p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx], closure_list); + break; + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: + grpc_connectivity_state_set (&p->state_tracker, *this_connectivity, "connecting_changed", closure_list); + grpc_subchannel_notify_on_state_change (p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx], closure_list); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + del_interested_parties_locked (p, this_idx, closure_list); + /* renew state notification */ + grpc_subchannel_notify_on_state_change (p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx], closure_list); + + /* remove from ready list if still present */ + if (p->subchannel_index_to_readylist_node[this_idx] != NULL) + { + remove_disconnected_sc_locked (p, p->subchannel_index_to_readylist_node[this_idx]); + p->subchannel_index_to_readylist_node[this_idx] = NULL; + } + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure", closure_list); + break; + case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked (p, this_idx, closure_list); + if (p->subchannel_index_to_readylist_node[this_idx] != NULL) + { + remove_disconnected_sc_locked (p, p->subchannel_index_to_readylist_node[this_idx]); + p->subchannel_index_to_readylist_node[this_idx] = NULL; + } + + GPR_SWAP (grpc_subchannel *, p->subchannels[this_idx], p->subchannels[p->num_subchannels - 1]); + p->num_subchannels--; + GRPC_SUBCHANNEL_UNREF (p->subchannels[p->num_subchannels], "round_robin", closure_list); + + if (p->num_subchannels == 0) + { + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels", closure_list); + while ((pp = p->pending_picks)) + { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_closure_list_add (closure_list, pp->on_complete, 1); + gpr_free (pp); + } + unref = 1; + } + else + { + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed", closure_list); + } + } /* switch */ + } /* !unref */ + + gpr_mu_unlock (&p->mu); + + if (unref) + { + GRPC_LB_POLICY_UNREF (&p->base, "round_robin_connectivity", closure_list); + } } -static void rr_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, - grpc_closure_list *closure_list) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; +static void +rr_broadcast (grpc_lb_policy * pol, grpc_transport_op * op, grpc_closure_list * closure_list) +{ + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; size_t i; size_t n; grpc_subchannel **subchannels; - gpr_mu_lock(&p->mu); + gpr_mu_lock (&p->mu); n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); - } - gpr_mu_unlock(&p->mu); - - for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op(subchannels[i], op, closure_list); - GRPC_SUBCHANNEL_UNREF(subchannels[i], "rr_broadcast", closure_list); - } - gpr_free(subchannels); + subchannels = gpr_malloc (n * sizeof (*subchannels)); + for (i = 0; i < n; i++) + { + subchannels[i] = p->subchannels[i]; + GRPC_SUBCHANNEL_REF (subchannels[i], "rr_broadcast"); + } + gpr_mu_unlock (&p->mu); + + for (i = 0; i < n; i++) + { + grpc_subchannel_process_transport_op (subchannels[i], op, closure_list); + GRPC_SUBCHANNEL_UNREF (subchannels[i], "rr_broadcast", closure_list); + } + gpr_free (subchannels); } -static grpc_connectivity_state rr_check_connectivity( - grpc_lb_policy *pol, grpc_closure_list *closure_list) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; +static grpc_connectivity_state +rr_check_connectivity (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; grpc_connectivity_state st; - gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker); - gpr_mu_unlock(&p->mu); + gpr_mu_lock (&p->mu); + st = grpc_connectivity_state_check (&p->state_tracker); + gpr_mu_unlock (&p->mu); return st; } -static void rr_notify_on_state_change(grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify, - grpc_closure_list *closure_list) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - gpr_mu_lock(&p->mu); - grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, - notify, closure_list); - gpr_mu_unlock(&p->mu); +static void +rr_notify_on_state_change (grpc_lb_policy * pol, grpc_connectivity_state * current, grpc_closure * notify, grpc_closure_list * closure_list) +{ + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; + gpr_mu_lock (&p->mu); + grpc_connectivity_state_notify_on_state_change (&p->state_tracker, current, notify, closure_list); + gpr_mu_unlock (&p->mu); } static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown, - rr_pick, - rr_exit_idle, - rr_broadcast, - rr_check_connectivity, - rr_notify_on_state_change}; - -static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} + rr_destroy, + rr_shutdown, + rr_pick, + rr_exit_idle, + rr_broadcast, + rr_check_connectivity, + rr_notify_on_state_change +}; + +static void +round_robin_factory_ref (grpc_lb_policy_factory * factory) +{ +} -static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} +static void +round_robin_factory_unref (grpc_lb_policy_factory * factory) +{ +} -static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { +static grpc_lb_policy * +create_round_robin (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args) +{ size_t i; - round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(args->num_subchannels > 0); - memset(p, 0, sizeof(*p)); - grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); + round_robin_lb_policy *p = gpr_malloc (sizeof (*p)); + GPR_ASSERT (args->num_subchannels > 0); + memset (p, 0, sizeof (*p)); + grpc_lb_policy_init (&p->base, &round_robin_lb_policy_vtable); + p->subchannels = gpr_malloc (sizeof (grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "round_robin"); - memcpy(p->subchannels, args->subchannels, - sizeof(grpc_subchannel *) * args->num_subchannels); - - gpr_mu_init(&p->mu); - p->connectivity_changed_cbs = - gpr_malloc(sizeof(grpc_closure) * args->num_subchannels); - p->subchannel_connectivity = - gpr_malloc(sizeof(grpc_connectivity_state) * args->num_subchannels); - - p->cb_args = - gpr_malloc(sizeof(connectivity_changed_cb_arg) * args->num_subchannels); - for (i = 0; i < args->num_subchannels; i++) { - p->cb_args[i].subchannel_idx = i; - p->cb_args[i].p = p; - grpc_closure_init(&p->connectivity_changed_cbs[i], rr_connectivity_changed, - &p->cb_args[i]); - } + grpc_connectivity_state_init (&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); + memcpy (p->subchannels, args->subchannels, sizeof (grpc_subchannel *) * args->num_subchannels); + + gpr_mu_init (&p->mu); + p->connectivity_changed_cbs = gpr_malloc (sizeof (grpc_closure) * args->num_subchannels); + p->subchannel_connectivity = gpr_malloc (sizeof (grpc_connectivity_state) * args->num_subchannels); + + p->cb_args = gpr_malloc (sizeof (connectivity_changed_cb_arg) * args->num_subchannels); + for (i = 0; i < args->num_subchannels; i++) + { + p->cb_args[i].subchannel_idx = i; + p->cb_args[i].p = p; + grpc_closure_init (&p->connectivity_changed_cbs[i], rr_connectivity_changed, &p->cb_args[i]); + } /* The (dummy node) root of the ready list */ p->ready_list.subchannel = NULL; @@ -536,20 +575,22 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, p->ready_list.next = NULL; p->ready_list_last_pick = &p->ready_list; - p->subchannel_index_to_readylist_node = - gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); - memset(p->subchannel_index_to_readylist_node, 0, - sizeof(grpc_subchannel *) * args->num_subchannels); + p->subchannel_index_to_readylist_node = gpr_malloc (sizeof (grpc_subchannel *) * args->num_subchannels); + memset (p->subchannel_index_to_readylist_node, 0, sizeof (grpc_subchannel *) * args->num_subchannels); return &p->base; } static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { - round_robin_factory_ref, round_robin_factory_unref, create_round_robin, - "round_robin"}; + round_robin_factory_ref, round_robin_factory_unref, create_round_robin, + "round_robin" +}; static grpc_lb_policy_factory round_robin_lb_policy_factory = { - &round_robin_factory_vtable}; + &round_robin_factory_vtable +}; -grpc_lb_policy_factory *grpc_round_robin_lb_factory_create() { +grpc_lb_policy_factory * +grpc_round_robin_lb_factory_create () +{ return &round_robin_lb_policy_factory; } diff --git a/src/core/client_config/lb_policies/round_robin.h b/src/core/client_config/lb_policies/round_robin.h index cf1f69c85f..213995aa3f 100644 --- a/src/core/client_config/lb_policies/round_robin.h +++ b/src/core/client_config/lb_policies/round_robin.h @@ -41,6 +41,6 @@ extern int grpc_lb_round_robin_trace; #include "src/core/client_config/lb_policy_factory.h" /** Returns a load balancing factory for the round robin policy */ -grpc_lb_policy_factory *grpc_round_robin_lb_factory_create(); +grpc_lb_policy_factory *grpc_round_robin_lb_factory_create (); #endif |