diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-22 10:42:19 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-22 10:42:19 -0700 |
commit | 45724b35e411fef7c5da66a74c78428c11d56843 (patch) | |
tree | 9264034aca675c89444e02f72ef58e67d7043604 /src/core/client_config | |
parent | 298751c1195523ef6228595043b583c3a6270e08 (diff) |
indent pass to get logical source lines on one physical line
Diffstat (limited to 'src/core/client_config')
36 files changed, 2460 insertions, 2179 deletions
diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c index 636ab6d1da..69d52be70b 100644 --- a/src/core/client_config/client_config.c +++ b/src/core/client_config/client_config.c @@ -37,37 +37,50 @@ #include <grpc/support/alloc.h> -struct grpc_client_config { +struct grpc_client_config +{ gpr_refcount refs; grpc_lb_policy *lb_policy; }; -grpc_client_config *grpc_client_config_create() { - grpc_client_config *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - gpr_ref_init(&c->refs, 1); +grpc_client_config * +grpc_client_config_create () +{ + grpc_client_config *c = gpr_malloc (sizeof (*c)); + memset (c, 0, sizeof (*c)); + gpr_ref_init (&c->refs, 1); return c; } -void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); } +void +grpc_client_config_ref (grpc_client_config * c) +{ + gpr_ref (&c->refs); +} -void grpc_client_config_unref(grpc_client_config *c, - grpc_closure_list *closure_list) { - if (gpr_unref(&c->refs)) { - GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config", closure_list); - gpr_free(c); - } +void +grpc_client_config_unref (grpc_client_config * c, grpc_closure_list * closure_list) +{ + if (gpr_unref (&c->refs)) + { + GRPC_LB_POLICY_UNREF (c->lb_policy, "client_config", closure_list); + gpr_free (c); + } } -void grpc_client_config_set_lb_policy(grpc_client_config *c, - grpc_lb_policy *lb_policy) { - GPR_ASSERT(c->lb_policy == NULL); - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "client_config"); - } +void +grpc_client_config_set_lb_policy (grpc_client_config * c, grpc_lb_policy * lb_policy) +{ + GPR_ASSERT (c->lb_policy == NULL); + if (lb_policy) + { + GRPC_LB_POLICY_REF (lb_policy, "client_config"); + } c->lb_policy = lb_policy; } -grpc_lb_policy *grpc_client_config_get_lb_policy(grpc_client_config *c) { +grpc_lb_policy * +grpc_client_config_get_lb_policy (grpc_client_config * c) +{ return c->lb_policy; } diff --git a/src/core/client_config/client_config.h b/src/core/client_config/client_config.h index 8281dbe662..49585a1c62 100644 --- a/src/core/client_config/client_config.h +++ b/src/core/client_config/client_config.h @@ -40,14 +40,11 @@ grpc_resolver */ typedef struct grpc_client_config grpc_client_config; -grpc_client_config *grpc_client_config_create(); -void grpc_client_config_ref(grpc_client_config *client_config); -void grpc_client_config_unref(grpc_client_config *client_config, - grpc_closure_list *closure_list); +grpc_client_config *grpc_client_config_create (); +void grpc_client_config_ref (grpc_client_config * client_config); +void grpc_client_config_unref (grpc_client_config * client_config, grpc_closure_list * closure_list); -void grpc_client_config_set_lb_policy(grpc_client_config *client_config, - grpc_lb_policy *lb_policy); -grpc_lb_policy *grpc_client_config_get_lb_policy( - grpc_client_config *client_config); +void grpc_client_config_set_lb_policy (grpc_client_config * client_config, grpc_lb_policy * lb_policy); +grpc_lb_policy *grpc_client_config_get_lb_policy (grpc_client_config * client_config); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_CLIENT_CONFIG_H */ diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index 39581b00fc..2f50ce36fe 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -33,25 +33,26 @@ #include "src/core/client_config/connector.h" -void grpc_connector_ref(grpc_connector *connector) { - connector->vtable->ref(connector); +void +grpc_connector_ref (grpc_connector * connector) +{ + connector->vtable->ref (connector); } -void grpc_connector_unref(grpc_connector *connector, - grpc_closure_list *closure_list) { - connector->vtable->unref(connector, closure_list); +void +grpc_connector_unref (grpc_connector * connector, grpc_closure_list * closure_list) +{ + connector->vtable->unref (connector, closure_list); } -void grpc_connector_connect(grpc_connector *connector, - const grpc_connect_in_args *in_args, - grpc_connect_out_args *out_args, - grpc_closure *notify, - grpc_closure_list *closure_list) { - connector->vtable->connect(connector, in_args, out_args, notify, - closure_list); +void +grpc_connector_connect (grpc_connector * connector, const grpc_connect_in_args * in_args, grpc_connect_out_args * out_args, grpc_closure * notify, grpc_closure_list * closure_list) +{ + connector->vtable->connect (connector, in_args, out_args, notify, closure_list); } -void grpc_connector_shutdown(grpc_connector *connector, - grpc_closure_list *closure_list) { - connector->vtable->shutdown(connector, closure_list); +void +grpc_connector_shutdown (grpc_connector * connector, grpc_closure_list * closure_list) +{ + connector->vtable->shutdown (connector, closure_list); } diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index fd46392dd8..12df36c08e 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -41,11 +41,13 @@ typedef struct grpc_connector grpc_connector; typedef struct grpc_connector_vtable grpc_connector_vtable; -struct grpc_connector { +struct grpc_connector +{ const grpc_connector_vtable *vtable; }; -typedef struct { +typedef struct +{ /** set of pollsets interested in this connection */ grpc_pollset_set *interested_parties; /** address to connect to */ @@ -57,7 +59,8 @@ typedef struct { const grpc_channel_args *channel_args; } grpc_connect_in_args; -typedef struct { +typedef struct +{ /** the connected transport */ grpc_transport *transport; /** any additional filters (owned by the caller of connect) */ @@ -65,29 +68,21 @@ typedef struct { size_t num_filters; } grpc_connect_out_args; -struct grpc_connector_vtable { - void (*ref)(grpc_connector *connector); - void (*unref)(grpc_connector *connector, grpc_closure_list *closure_list); +struct grpc_connector_vtable +{ + void (*ref) (grpc_connector * connector); + void (*unref) (grpc_connector * connector, grpc_closure_list * closure_list); /** Implementation of grpc_connector_shutdown */ - void (*shutdown)(grpc_connector *connector, grpc_closure_list *closure_list); + void (*shutdown) (grpc_connector * connector, grpc_closure_list * closure_list); /** Implementation of grpc_connector_connect */ - void (*connect)(grpc_connector *connector, - const grpc_connect_in_args *in_args, - grpc_connect_out_args *out_args, grpc_closure *notify, - grpc_closure_list *closure_list); + void (*connect) (grpc_connector * connector, const grpc_connect_in_args * in_args, grpc_connect_out_args * out_args, grpc_closure * notify, grpc_closure_list * closure_list); }; -void grpc_connector_ref(grpc_connector *connector); -void grpc_connector_unref(grpc_connector *connector, - grpc_closure_list *closure_list); +void grpc_connector_ref (grpc_connector * connector); +void grpc_connector_unref (grpc_connector * connector, grpc_closure_list * closure_list); /** Connect using the connector: max one outstanding call at a time */ -void grpc_connector_connect(grpc_connector *connector, - const grpc_connect_in_args *in_args, - grpc_connect_out_args *out_args, - grpc_closure *notify, - grpc_closure_list *closure_list); +void grpc_connector_connect (grpc_connector * connector, const grpc_connect_in_args * in_args, grpc_connect_out_args * out_args, grpc_closure * notify, grpc_closure_list * closure_list); /** Cancel any pending connection */ -void grpc_connector_shutdown(grpc_connector *connector, - grpc_closure_list *closure_list); +void grpc_connector_shutdown (grpc_connector * connector, grpc_closure_list * closure_list); #endif diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 7557053711..b8991dfaa7 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -39,14 +39,16 @@ #include <grpc/support/alloc.h> #include "src/core/transport/connectivity_state.h" -typedef struct pending_pick { +typedef struct pending_pick +{ struct pending_pick *next; grpc_pollset *pollset; grpc_subchannel **target; grpc_closure *on_complete; } pending_pick; -typedef struct { +typedef struct +{ /** base policy: must be first */ grpc_lb_policy base; /** all our subchannels */ @@ -76,286 +78,303 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void del_interested_parties_locked(pick_first_lb_policy *p, - grpc_closure_list *closure_list) { +static void +del_interested_parties_locked (pick_first_lb_policy * p, grpc_closure_list * closure_list) +{ pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], - pp->pollset, closure_list); - } + for (pp = p->pending_picks; pp; pp = pp->next) + { + grpc_subchannel_del_interested_party (p->subchannels[p->checking_subchannel], pp->pollset, closure_list); + } } -static void add_interested_parties_locked(pick_first_lb_policy *p, - grpc_closure_list *closure_list) { +static void +add_interested_parties_locked (pick_first_lb_policy * p, grpc_closure_list * closure_list) +{ pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], - pp->pollset, closure_list); - } + for (pp = p->pending_picks; pp; pp = pp->next) + { + grpc_subchannel_add_interested_party (p->subchannels[p->checking_subchannel], pp->pollset, closure_list); + } } -void pf_destroy(grpc_lb_policy *pol, grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; +void +pf_destroy (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; size_t i; - GPR_ASSERT(p->pending_picks == NULL); - for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first", closure_list); - } - grpc_connectivity_state_destroy(&p->state_tracker, closure_list); - gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); - gpr_free(p); + GPR_ASSERT (p->pending_picks == NULL); + for (i = 0; i < p->num_subchannels; i++) + { + GRPC_SUBCHANNEL_UNREF (p->subchannels[i], "pick_first", closure_list); + } + grpc_connectivity_state_destroy (&p->state_tracker, closure_list); + gpr_free (p->subchannels); + gpr_mu_destroy (&p->mu); + gpr_free (p); } -void pf_shutdown(grpc_lb_policy *pol, grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; +void +pf_shutdown (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; pending_pick *pp; - gpr_mu_lock(&p->mu); - del_interested_parties_locked(p, closure_list); + gpr_mu_lock (&p->mu); + del_interested_parties_locked (p, closure_list); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, - "shutdown", closure_list); - gpr_mu_unlock(&p->mu); - while (pp != NULL) { - pending_pick *next = pp->next; - *pp->target = NULL; - grpc_closure_list_add(closure_list, pp->on_complete, 1); - gpr_free(pp); - pp = next; - } + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown", closure_list); + gpr_mu_unlock (&p->mu); + while (pp != NULL) + { + pending_pick *next = pp->next; + *pp->target = NULL; + grpc_closure_list_add (closure_list, pp->on_complete, 1); + gpr_free (pp); + pp = next; + } } -static void start_picking(pick_first_lb_policy *p, - grpc_closure_list *closure_list) { +static void +start_picking (pick_first_lb_policy * p, grpc_closure_list * closure_list) +{ p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed, closure_list); + GRPC_LB_POLICY_REF (&p->base, "pick_first_connectivity"); + grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list); } -void pf_exit_idle(grpc_lb_policy *pol, grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); - if (!p->started_picking) { - start_picking(p, closure_list); - } - gpr_mu_unlock(&p->mu); +void +pf_exit_idle (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; + gpr_mu_lock (&p->mu); + if (!p->started_picking) + { + start_picking (p, closure_list); + } + gpr_mu_unlock (&p->mu); } -void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_closure *on_complete, grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; +void +pf_pick (grpc_lb_policy * pol, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; pending_pick *pp; - gpr_mu_lock(&p->mu); - if (p->selected) { - gpr_mu_unlock(&p->mu); - *target = p->selected; - grpc_closure_list_add(closure_list, on_complete, 1); - } else { - if (!p->started_picking) { - start_picking(p, closure_list); + gpr_mu_lock (&p->mu); + if (p->selected) + { + gpr_mu_unlock (&p->mu); + *target = p->selected; + grpc_closure_list_add (closure_list, on_complete, 1); + } + else + { + if (!p->started_picking) + { + start_picking (p, closure_list); + } + grpc_subchannel_add_interested_party (p->subchannels[p->checking_subchannel], pollset, closure_list); + pp = gpr_malloc (sizeof (*pp)); + pp->next = p->pending_picks; + pp->pollset = pollset; + pp->target = target; + pp->on_complete = on_complete; + p->pending_picks = pp; + gpr_mu_unlock (&p->mu); } - grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], - pollset, closure_list); - pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->pollset = pollset; - pp->target = target; - pp->on_complete = on_complete; - p->pending_picks = pp; - gpr_mu_unlock(&p->mu); - } } -static void pf_connectivity_changed(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +pf_connectivity_changed (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ pick_first_lb_policy *p = arg; pending_pick *pp; - gpr_mu_lock(&p->mu); + gpr_mu_lock (&p->mu); - if (p->shutdown) { - gpr_mu_unlock(&p->mu); - GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", closure_list); - return; - } else if (p->selected != NULL) { - grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, - "selected_changed", closure_list); - if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_subchannel_notify_on_state_change( - p->selected, &p->checking_connectivity, &p->connectivity_changed, - closure_list); - } else { - GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", closure_list); + if (p->shutdown) + { + gpr_mu_unlock (&p->mu); + GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list); + return; } - } else { - loop: - switch (p->checking_connectivity) { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, - "connecting_ready", closure_list); - p->selected = p->subchannels[p->checking_subchannel]; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = p->selected; - grpc_subchannel_del_interested_party(p->selected, pp->pollset, - closure_list); - grpc_closure_list_add(closure_list, pp->on_complete, 1); - gpr_free(pp); - } - grpc_subchannel_notify_on_state_change( - p->selected, &p->checking_connectivity, &p->connectivity_changed, - closure_list); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure", closure_list); - del_interested_parties_locked(p, closure_list); - p->checking_subchannel = - (p->checking_subchannel + 1) % p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(p, closure_list); - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed, closure_list); - } else { - goto loop; - } - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, - "connecting_changed", closure_list); - grpc_subchannel_notify_on_state_change( - p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed, closure_list); - break; - case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(p, closure_list); - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels - 1]); - p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first", - closure_list); - if (p->num_subchannels == 0) { - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, - "no_more_channels", closure_list); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_closure_list_add(closure_list, pp->on_complete, 1); - gpr_free(pp); - } - GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", - closure_list); - } else { - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "subchannel_failed", closure_list); - p->checking_subchannel %= p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(p, closure_list); - goto loop; - } + else if (p->selected != NULL) + { + grpc_connectivity_state_set (&p->state_tracker, p->checking_connectivity, "selected_changed", closure_list); + if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) + { + grpc_subchannel_notify_on_state_change (p->selected, &p->checking_connectivity, &p->connectivity_changed, closure_list); + } + else + { + GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list); + } + } + else + { + loop: + switch (p->checking_connectivity) + { + case GRPC_CHANNEL_READY: + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready", closure_list); + p->selected = p->subchannels[p->checking_subchannel]; + while ((pp = p->pending_picks)) + { + p->pending_picks = pp->next; + *pp->target = p->selected; + grpc_subchannel_del_interested_party (p->selected, pp->pollset, closure_list); + grpc_closure_list_add (closure_list, pp->on_complete, 1); + gpr_free (pp); + } + grpc_subchannel_notify_on_state_change (p->selected, &p->checking_connectivity, &p->connectivity_changed, closure_list); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure", closure_list); + del_interested_parties_locked (p, closure_list); + p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity (p->subchannels[p->checking_subchannel]); + add_interested_parties_locked (p, closure_list); + if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) + { + grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list); + } + else + { + goto loop; + } + break; + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_CONNECTING, "connecting_changed", closure_list); + grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list); + break; + case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked (p, closure_list); + GPR_SWAP (grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); + p->num_subchannels--; + GRPC_SUBCHANNEL_UNREF (p->subchannels[p->num_subchannels], "pick_first", closure_list); + if (p->num_subchannels == 0) + { + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels", closure_list); + while ((pp = p->pending_picks)) + { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_closure_list_add (closure_list, pp->on_complete, 1); + gpr_free (pp); + } + GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list); + } + else + { + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed", closure_list); + p->checking_subchannel %= p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity (p->subchannels[p->checking_subchannel]); + add_interested_parties_locked (p, closure_list); + goto loop; + } + } } - } - gpr_mu_unlock(&p->mu); + gpr_mu_unlock (&p->mu); } -static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, - grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; +static void +pf_broadcast (grpc_lb_policy * pol, grpc_transport_op * op, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; size_t i; size_t n; grpc_subchannel **subchannels; - gpr_mu_lock(&p->mu); + gpr_mu_lock (&p->mu); n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast"); - } - gpr_mu_unlock(&p->mu); + subchannels = gpr_malloc (n * sizeof (*subchannels)); + for (i = 0; i < n; i++) + { + subchannels[i] = p->subchannels[i]; + GRPC_SUBCHANNEL_REF (subchannels[i], "pf_broadcast"); + } + gpr_mu_unlock (&p->mu); - for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op(subchannels[i], op, closure_list); - GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast", closure_list); - } - gpr_free(subchannels); + for (i = 0; i < n; i++) + { + grpc_subchannel_process_transport_op (subchannels[i], op, closure_list); + GRPC_SUBCHANNEL_UNREF (subchannels[i], "pf_broadcast", closure_list); + } + gpr_free (subchannels); } -static grpc_connectivity_state pf_check_connectivity( - grpc_lb_policy *pol, grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; +static grpc_connectivity_state +pf_check_connectivity (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; grpc_connectivity_state st; - gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker); - gpr_mu_unlock(&p->mu); + gpr_mu_lock (&p->mu); + st = grpc_connectivity_state_check (&p->state_tracker); + gpr_mu_unlock (&p->mu); return st; } -void pf_notify_on_state_change(grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify, - grpc_closure_list *closure_list) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); - grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, - notify, closure_list); - gpr_mu_unlock(&p->mu); +void +pf_notify_on_state_change (grpc_lb_policy * pol, grpc_connectivity_state * current, grpc_closure * notify, grpc_closure_list * closure_list) +{ + pick_first_lb_policy *p = (pick_first_lb_policy *) pol; + gpr_mu_lock (&p->mu); + grpc_connectivity_state_notify_on_state_change (&p->state_tracker, current, notify, closure_list); + gpr_mu_unlock (&p->mu); } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, - pf_shutdown, - pf_pick, - pf_exit_idle, - pf_broadcast, - pf_check_connectivity, - pf_notify_on_state_change}; + pf_destroy, + pf_shutdown, + pf_pick, + pf_exit_idle, + pf_broadcast, + pf_check_connectivity, + pf_notify_on_state_change +}; -static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} +static void +pick_first_factory_ref (grpc_lb_policy_factory * factory) +{ +} -static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} +static void +pick_first_factory_unref (grpc_lb_policy_factory * factory) +{ +} -static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(args->num_subchannels > 0); - memset(p, 0, sizeof(*p)); - grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); +static grpc_lb_policy * +create_pick_first (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args) +{ + pick_first_lb_policy *p = gpr_malloc (sizeof (*p)); + GPR_ASSERT (args->num_subchannels > 0); + memset (p, 0, sizeof (*p)); + grpc_lb_policy_init (&p->base, &pick_first_lb_policy_vtable); + p->subchannels = gpr_malloc (sizeof (grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "pick_first"); - memcpy(p->subchannels, args->subchannels, - sizeof(grpc_subchannel *) * args->num_subchannels); - grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); - gpr_mu_init(&p->mu); + grpc_connectivity_state_init (&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); + memcpy (p->subchannels, args->subchannels, sizeof (grpc_subchannel *) * args->num_subchannels); + grpc_closure_init (&p->connectivity_changed, pf_connectivity_changed, p); + gpr_mu_init (&p->mu); return &p->base; } static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = { - pick_first_factory_ref, pick_first_factory_unref, create_pick_first, - "pick_first"}; + pick_first_factory_ref, pick_first_factory_unref, create_pick_first, + "pick_first" +}; static grpc_lb_policy_factory pick_first_lb_policy_factory = { - &pick_first_factory_vtable}; + &pick_first_factory_vtable +}; -grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() { +grpc_lb_policy_factory * +grpc_pick_first_lb_factory_create () +{ return &pick_first_lb_policy_factory; } diff --git a/src/core/client_config/lb_policies/pick_first.h b/src/core/client_config/lb_policies/pick_first.h index 3ca53ad42a..72a4e7c32c 100644 --- a/src/core/client_config/lb_policies/pick_first.h +++ b/src/core/client_config/lb_policies/pick_first.h @@ -38,6 +38,6 @@ /** Returns a load balancing factory for the pick first policy, which picks up * the first subchannel from \a subchannels to succesfully connect */ -grpc_lb_policy_factory *grpc_pick_first_lb_factory_create(); +grpc_lb_policy_factory *grpc_pick_first_lb_factory_create (); #endif diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index b4139719f4..2e64da9e10 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -43,7 +43,8 @@ int grpc_lb_round_robin_trace = 0; /** List of entities waiting for a pick. * * Once a pick is available, \a target is updated and \a on_complete called. */ -typedef struct pending_pick { +typedef struct pending_pick +{ struct pending_pick *next; grpc_pollset *pollset; grpc_subchannel **target; @@ -51,18 +52,21 @@ typedef struct pending_pick { } pending_pick; /** List of subchannels in a connectivity READY state */ -typedef struct ready_list { +typedef struct ready_list +{ grpc_subchannel *subchannel; struct ready_list *next; struct ready_list *prev; } ready_list; -typedef struct { +typedef struct +{ size_t subchannel_idx; /**< Index over p->subchannels */ - void *p; /**< round_robin_lb_policy instance */ + void *p; /**< round_robin_lb_policy instance */ } connectivity_changed_cb_arg; -typedef struct { +typedef struct +{ /** base policy: must be first */ grpc_lb_policy base; @@ -106,225 +110,264 @@ typedef struct { * * Note that this function does *not* advance p->ready_list_last_pick. Use \a * advance_last_picked_locked() for that. */ -static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { +static ready_list * +peek_next_connected_locked (const round_robin_lb_policy * p) +{ ready_list *selected; selected = p->ready_list_last_pick->next; - while (selected != NULL) { - if (selected == &p->ready_list) { - GPR_ASSERT(selected->subchannel == NULL); - /* skip dummy root */ - selected = selected->next; - } else { - GPR_ASSERT(selected->subchannel != NULL); - return selected; + while (selected != NULL) + { + if (selected == &p->ready_list) + { + GPR_ASSERT (selected->subchannel == NULL); + /* skip dummy root */ + selected = selected->next; + } + else + { + GPR_ASSERT (selected->subchannel != NULL); + return selected; + } } - } return NULL; } /** Advance the \a ready_list picking head. */ -static void advance_last_picked_locked(round_robin_lb_policy *p) { - if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ - p->ready_list_last_pick = p->ready_list_last_pick->next; - if (p->ready_list_last_pick == &p->ready_list) { - /* skip dummy root */ +static void +advance_last_picked_locked (round_robin_lb_policy * p) +{ + if (p->ready_list_last_pick->next != NULL) + { /* non-empty list */ p->ready_list_last_pick = p->ready_list_last_pick->next; + if (p->ready_list_last_pick == &p->ready_list) + { + /* skip dummy root */ + p->ready_list_last_pick = p->ready_list_last_pick->next; + } + } + else + { /* should be an empty list */ + GPR_ASSERT (p->ready_list_last_pick == &p->ready_list); + } + + if (grpc_lb_round_robin_trace) + { + gpr_log (GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", p->ready_list_last_pick, p->ready_list_last_pick->subchannel); } - } else { /* should be an empty list */ - GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); - } - - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", - p->ready_list_last_pick, p->ready_list_last_pick->subchannel); - } } /** Prepends (relative to the root at p->ready_list) the connected subchannel \a * csc to the list of ready subchannels. */ -static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - grpc_subchannel *csc) { - ready_list *new_elem = gpr_malloc(sizeof(ready_list)); +static ready_list * +add_connected_sc_locked (round_robin_lb_policy * p, grpc_subchannel * csc) +{ + ready_list *new_elem = gpr_malloc (sizeof (ready_list)); new_elem->subchannel = csc; - if (p->ready_list.prev == NULL) { - /* first element */ - new_elem->next = &p->ready_list; - new_elem->prev = &p->ready_list; - p->ready_list.next = new_elem; - p->ready_list.prev = new_elem; - } else { - new_elem->next = &p->ready_list; - new_elem->prev = p->ready_list.prev; - p->ready_list.prev->next = new_elem; - p->ready_list.prev = new_elem; - } - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); - } + if (p->ready_list.prev == NULL) + { + /* first element */ + new_elem->next = &p->ready_list; + new_elem->prev = &p->ready_list; + p->ready_list.next = new_elem; + p->ready_list.prev = new_elem; + } + else + { + new_elem->next = &p->ready_list; + new_elem->prev = p->ready_list.prev; + p->ready_list.prev->next = new_elem; + p->ready_list.prev = new_elem; + } + if (grpc_lb_round_robin_trace) + { + gpr_log (GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); + } return new_elem; } /** Removes \a node from the list of connected subchannels */ -static void remove_disconnected_sc_locked(round_robin_lb_policy *p, - ready_list *node) { - if (node == NULL) { - return; - } - if (node == p->ready_list_last_pick) { - /* If removing the lastly picked node, reset the last pick pointer to the - * dummy root of the list */ - p->ready_list_last_pick = &p->ready_list; - } +static void +remove_disconnected_sc_locked (round_robin_lb_policy * p, ready_list * node) +{ + if (node == NULL) + { + return; + } + if (node == p->ready_list_last_pick) + { + /* If removing the lastly picked node, reset the last pick pointer to the + * dummy root of the list */ + p->ready_list_last_pick = &p->ready_list; + } /* removing last item */ - if (node->next == &p->ready_list && node->prev == &p->ready_list) { - GPR_ASSERT(p->ready_list.next == node); - GPR_ASSERT(p->ready_list.prev == node); - p->ready_list.next = NULL; - p->ready_list.prev = NULL; - } else { - node->prev->next = node->next; - node->next->prev = node->prev; - } - - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, - node->subchannel); - } + if (node->next == &p->ready_list && node->prev == &p->ready_list) + { + GPR_ASSERT (p->ready_list.next == node); + GPR_ASSERT (p->ready_list.prev == node); + p->ready_list.next = NULL; + p->ready_list.prev = NULL; + } + else + { + node->prev->next = node->next; + node->next->prev = node->prev; + } + + if (grpc_lb_round_robin_trace) + { + gpr_log (GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, node->subchannel); + } node->next = NULL; node->prev = NULL; node->subchannel = NULL; - gpr_free(node); + gpr_free (node); } -static void del_interested_parties_locked(round_robin_lb_policy *p, - const size_t subchannel_idx, - grpc_closure_list *closure_list) { +static void +del_interested_parties_locked (round_robin_lb_policy * p, const size_t subchannel_idx, grpc_closure_list * closure_list) +{ pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party(p->subchannels[subchannel_idx], - pp->pollset, closure_list); - } + for (pp = p->pending_picks; pp; pp = pp->next) + { + grpc_subchannel_del_interested_party (p->subchannels[subchannel_idx], pp->pollset, closure_list); + } } -void rr_destroy(grpc_lb_policy *pol, grpc_closure_list *closure_list) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; +void +rr_destroy (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; size_t i; ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(p, i, closure_list); - } - for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "round_robin", closure_list); - } - gpr_free(p->connectivity_changed_cbs); - gpr_free(p->subchannel_connectivity); - - grpc_connectivity_state_destroy(&p->state_tracker, closure_list); - gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); + for (i = 0; i < p->num_subchannels; i++) + { + del_interested_parties_locked (p, i, closure_list); + } + for (i = 0; i < p->num_subchannels; i++) + { + GRPC_SUBCHANNEL_UNREF (p->subchannels[i], "round_robin", closure_list); + } + gpr_free (p->connectivity_changed_cbs); + gpr_free (p->subchannel_connectivity); + + grpc_connectivity_state_destroy (&p->state_tracker, closure_list); + gpr_free (p->subchannels); + gpr_mu_destroy (&p->mu); elem = p->ready_list.next; - while (elem != NULL && elem != &p->ready_list) { - ready_list *tmp; - tmp = elem->next; - elem->next = NULL; - elem->prev = NULL; - elem->subchannel = NULL; - gpr_free(elem); - elem = tmp; - } - gpr_free(p->subchannel_index_to_readylist_node); - gpr_free(p->cb_args); - gpr_free(p); + while (elem != NULL && elem != &p->ready_list) + { + ready_list *tmp; + tmp = elem->next; + elem->next = NULL; + elem->prev = NULL; + elem->subchannel = NULL; + gpr_free (elem); + elem = tmp; + } + gpr_free (p->subchannel_index_to_readylist_node); + gpr_free (p->cb_args); + gpr_free (p); } -void rr_shutdown(grpc_lb_policy *pol, grpc_closure_list *closure_list) { +void +rr_shutdown (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ size_t i; - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; pending_pick *pp; - gpr_mu_lock(&p->mu); + gpr_mu_lock (&p->mu); - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(p, i, closure_list); - } + for (i = 0; i < p->num_subchannels; i++) + { + del_interested_parties_locked (p, i, closure_list); + } p->shutdown = 1; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_closure_list_add(closure_list, pp->on_complete, 0); - gpr_free(pp); - } - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, - "shutdown", closure_list); - gpr_mu_unlock(&p->mu); + while ((pp = p->pending_picks)) + { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_closure_list_add (closure_list, pp->on_complete, 0); + gpr_free (pp); + } + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown", closure_list); + gpr_mu_unlock (&p->mu); } -static void start_picking(round_robin_lb_policy *p, - grpc_closure_list *closure_list) { +static void +start_picking (round_robin_lb_policy * p, grpc_closure_list * closure_list) +{ size_t i; p->started_picking = 1; - for (i = 0; i < p->num_subchannels; i++) { - p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change( - p->subchannels[i], &p->subchannel_connectivity[i], - &p->connectivity_changed_cbs[i], closure_list); - GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); - } + for (i = 0; i < p->num_subchannels; i++) + { + p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; + grpc_subchannel_notify_on_state_change (p->subchannels[i], &p->subchannel_connectivity[i], &p->connectivity_changed_cbs[i], closure_list); + GRPC_LB_POLICY_REF (&p->base, "round_robin_connectivity"); + } } -void rr_exit_idle(grpc_lb_policy *pol, grpc_closure_list *closure_list) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - gpr_mu_lock(&p->mu); - if (!p->started_picking) { - start_picking(p, closure_list); - } - gpr_mu_unlock(&p->mu); +void +rr_exit_idle (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; + gpr_mu_lock (&p->mu); + if (!p->started_picking) + { + start_picking (p, closure_list); + } + gpr_mu_unlock (&p->mu); } -void rr_pick(grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_closure *on_complete, grpc_closure_list *closure_list) { +void +rr_pick (grpc_lb_policy * pol, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete, grpc_closure_list * closure_list) +{ size_t i; - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; pending_pick *pp; ready_list *selected; - gpr_mu_lock(&p->mu); - if ((selected = peek_next_connected_locked(p))) { - gpr_mu_unlock(&p->mu); - *target = selected->subchannel; - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", - selected->subchannel, selected); - } - /* only advance the last picked pointer if the selection was used */ - advance_last_picked_locked(p); - on_complete->cb(on_complete->cb_arg, 1, closure_list); - } else { - if (!p->started_picking) { - start_picking(p, closure_list); + gpr_mu_lock (&p->mu); + if ((selected = peek_next_connected_locked (p))) + { + gpr_mu_unlock (&p->mu); + *target = selected->subchannel; + if (grpc_lb_round_robin_trace) + { + gpr_log (GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); + } + /* only advance the last picked pointer if the selection was used */ + advance_last_picked_locked (p); + on_complete->cb (on_complete->cb_arg, 1, closure_list); } - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(p->subchannels[i], pollset, - closure_list); + else + { + if (!p->started_picking) + { + start_picking (p, closure_list); + } + for (i = 0; i < p->num_subchannels; i++) + { + grpc_subchannel_add_interested_party (p->subchannels[i], pollset, closure_list); + } + pp = gpr_malloc (sizeof (*pp)); + pp->next = p->pending_picks; + pp->pollset = pollset; + pp->target = target; + pp->on_complete = on_complete; + p->pending_picks = pp; + gpr_mu_unlock (&p->mu); } - pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->pollset = pollset; - pp->target = target; - pp->on_complete = on_complete; - p->pending_picks = pp; - gpr_mu_unlock(&p->mu); - } } -static void rr_connectivity_changed(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +rr_connectivity_changed (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ connectivity_changed_cb_arg *cb_arg = arg; round_robin_lb_policy *p = cb_arg->p; /* index over p->subchannels of this cb's subchannel */ @@ -337,198 +380,194 @@ static void rr_connectivity_changed(void *arg, int iomgr_success, /* connectivity state of this cb's subchannel */ grpc_connectivity_state *this_connectivity; - gpr_mu_lock(&p->mu); + gpr_mu_lock (&p->mu); this_connectivity = &p->subchannel_connectivity[this_idx]; - if (p->shutdown) { - unref = 1; - } else { - switch (*this_connectivity) { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, - "connecting_ready", closure_list); - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - p->subchannel_index_to_readylist_node[this_idx] = - add_connected_sc_locked(p, p->subchannels[this_idx]); - /* at this point we know there's at least one suitable subchannel. Go - * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - selected = peek_next_connected_locked(p); - if (p->pending_picks != NULL) { - /* if the selected subchannel is going to be used for the pending - * picks, update the last picked pointer */ - advance_last_picked_locked(p); - } - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = selected->subchannel; - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - selected->subchannel, selected); - } - grpc_subchannel_del_interested_party(selected->subchannel, - pp->pollset, closure_list); - grpc_closure_list_add(closure_list, pp->on_complete, 1); - gpr_free(pp); - } - grpc_subchannel_notify_on_state_change( - p->subchannels[this_idx], this_connectivity, - &p->connectivity_changed_cbs[this_idx], closure_list); - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set(&p->state_tracker, *this_connectivity, - "connecting_changed", closure_list); - grpc_subchannel_notify_on_state_change( - p->subchannels[this_idx], this_connectivity, - &p->connectivity_changed_cbs[this_idx], closure_list); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - del_interested_parties_locked(p, this_idx, closure_list); - /* renew state notification */ - grpc_subchannel_notify_on_state_change( - p->subchannels[this_idx], this_connectivity, - &p->connectivity_changed_cbs[this_idx], closure_list); - - /* remove from ready list if still present */ - if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { - remove_disconnected_sc_locked( - p, p->subchannel_index_to_readylist_node[this_idx]); - p->subchannel_index_to_readylist_node[this_idx] = NULL; - } - grpc_connectivity_state_set( - &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure", closure_list); - break; - case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(p, this_idx, closure_list); - if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { - remove_disconnected_sc_locked( - p, p->subchannel_index_to_readylist_node[this_idx]); - p->subchannel_index_to_readylist_node[this_idx] = NULL; - } - - GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx], - p->subchannels[p->num_subchannels - 1]); - p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "round_robin", - closure_list); - - if (p->num_subchannels == 0) { - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, - "no_more_channels", closure_list); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_closure_list_add(closure_list, pp->on_complete, 1); - gpr_free(pp); - } - unref = 1; - } else { - grpc_connectivity_state_set(&p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "subchannel_failed", closure_list); - } - } /* switch */ - } /* !unref */ - - gpr_mu_unlock(&p->mu); - - if (unref) { - GRPC_LB_POLICY_UNREF(&p->base, "round_robin_connectivity", closure_list); - } + if (p->shutdown) + { + unref = 1; + } + else + { + switch (*this_connectivity) + { + case GRPC_CHANNEL_READY: + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready", closure_list); + /* add the newly connected subchannel to the list of connected ones. + * Note that it goes to the "end of the line". */ + p->subchannel_index_to_readylist_node[this_idx] = add_connected_sc_locked (p, p->subchannels[this_idx]); + /* at this point we know there's at least one suitable subchannel. Go + * ahead and pick one and notify the pending suitors in + * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + selected = peek_next_connected_locked (p); + if (p->pending_picks != NULL) + { + /* if the selected subchannel is going to be used for the pending + * picks, update the last picked pointer */ + advance_last_picked_locked (p); + } + while ((pp = p->pending_picks)) + { + p->pending_picks = pp->next; + *pp->target = selected->subchannel; + if (grpc_lb_round_robin_trace) + { + gpr_log (GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); + } + grpc_subchannel_del_interested_party (selected->subchannel, pp->pollset, closure_list); + grpc_closure_list_add (closure_list, pp->on_complete, 1); + gpr_free (pp); + } + grpc_subchannel_notify_on_state_change (p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx], closure_list); + break; + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: + grpc_connectivity_state_set (&p->state_tracker, *this_connectivity, "connecting_changed", closure_list); + grpc_subchannel_notify_on_state_change (p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx], closure_list); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + del_interested_parties_locked (p, this_idx, closure_list); + /* renew state notification */ + grpc_subchannel_notify_on_state_change (p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx], closure_list); + + /* remove from ready list if still present */ + if (p->subchannel_index_to_readylist_node[this_idx] != NULL) + { + remove_disconnected_sc_locked (p, p->subchannel_index_to_readylist_node[this_idx]); + p->subchannel_index_to_readylist_node[this_idx] = NULL; + } + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure", closure_list); + break; + case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked (p, this_idx, closure_list); + if (p->subchannel_index_to_readylist_node[this_idx] != NULL) + { + remove_disconnected_sc_locked (p, p->subchannel_index_to_readylist_node[this_idx]); + p->subchannel_index_to_readylist_node[this_idx] = NULL; + } + + GPR_SWAP (grpc_subchannel *, p->subchannels[this_idx], p->subchannels[p->num_subchannels - 1]); + p->num_subchannels--; + GRPC_SUBCHANNEL_UNREF (p->subchannels[p->num_subchannels], "round_robin", closure_list); + + if (p->num_subchannels == 0) + { + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels", closure_list); + while ((pp = p->pending_picks)) + { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_closure_list_add (closure_list, pp->on_complete, 1); + gpr_free (pp); + } + unref = 1; + } + else + { + grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed", closure_list); + } + } /* switch */ + } /* !unref */ + + gpr_mu_unlock (&p->mu); + + if (unref) + { + GRPC_LB_POLICY_UNREF (&p->base, "round_robin_connectivity", closure_list); + } } -static void rr_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, - grpc_closure_list *closure_list) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; +static void +rr_broadcast (grpc_lb_policy * pol, grpc_transport_op * op, grpc_closure_list * closure_list) +{ + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; size_t i; size_t n; grpc_subchannel **subchannels; - gpr_mu_lock(&p->mu); + gpr_mu_lock (&p->mu); n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); - } - gpr_mu_unlock(&p->mu); - - for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op(subchannels[i], op, closure_list); - GRPC_SUBCHANNEL_UNREF(subchannels[i], "rr_broadcast", closure_list); - } - gpr_free(subchannels); + subchannels = gpr_malloc (n * sizeof (*subchannels)); + for (i = 0; i < n; i++) + { + subchannels[i] = p->subchannels[i]; + GRPC_SUBCHANNEL_REF (subchannels[i], "rr_broadcast"); + } + gpr_mu_unlock (&p->mu); + + for (i = 0; i < n; i++) + { + grpc_subchannel_process_transport_op (subchannels[i], op, closure_list); + GRPC_SUBCHANNEL_UNREF (subchannels[i], "rr_broadcast", closure_list); + } + gpr_free (subchannels); } -static grpc_connectivity_state rr_check_connectivity( - grpc_lb_policy *pol, grpc_closure_list *closure_list) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; +static grpc_connectivity_state +rr_check_connectivity (grpc_lb_policy * pol, grpc_closure_list * closure_list) +{ + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; grpc_connectivity_state st; - gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker); - gpr_mu_unlock(&p->mu); + gpr_mu_lock (&p->mu); + st = grpc_connectivity_state_check (&p->state_tracker); + gpr_mu_unlock (&p->mu); return st; } -static void rr_notify_on_state_change(grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify, - grpc_closure_list *closure_list) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - gpr_mu_lock(&p->mu); - grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, - notify, closure_list); - gpr_mu_unlock(&p->mu); +static void +rr_notify_on_state_change (grpc_lb_policy * pol, grpc_connectivity_state * current, grpc_closure * notify, grpc_closure_list * closure_list) +{ + round_robin_lb_policy *p = (round_robin_lb_policy *) pol; + gpr_mu_lock (&p->mu); + grpc_connectivity_state_notify_on_state_change (&p->state_tracker, current, notify, closure_list); + gpr_mu_unlock (&p->mu); } static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown, - rr_pick, - rr_exit_idle, - rr_broadcast, - rr_check_connectivity, - rr_notify_on_state_change}; - -static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} + rr_destroy, + rr_shutdown, + rr_pick, + rr_exit_idle, + rr_broadcast, + rr_check_connectivity, + rr_notify_on_state_change +}; + +static void +round_robin_factory_ref (grpc_lb_policy_factory * factory) +{ +} -static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} +static void +round_robin_factory_unref (grpc_lb_policy_factory * factory) +{ +} -static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { +static grpc_lb_policy * +create_round_robin (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args) +{ size_t i; - round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(args->num_subchannels > 0); - memset(p, 0, sizeof(*p)); - grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); + round_robin_lb_policy *p = gpr_malloc (sizeof (*p)); + GPR_ASSERT (args->num_subchannels > 0); + memset (p, 0, sizeof (*p)); + grpc_lb_policy_init (&p->base, &round_robin_lb_policy_vtable); + p->subchannels = gpr_malloc (sizeof (grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "round_robin"); - memcpy(p->subchannels, args->subchannels, - sizeof(grpc_subchannel *) * args->num_subchannels); - - gpr_mu_init(&p->mu); - p->connectivity_changed_cbs = - gpr_malloc(sizeof(grpc_closure) * args->num_subchannels); - p->subchannel_connectivity = - gpr_malloc(sizeof(grpc_connectivity_state) * args->num_subchannels); - - p->cb_args = - gpr_malloc(sizeof(connectivity_changed_cb_arg) * args->num_subchannels); - for (i = 0; i < args->num_subchannels; i++) { - p->cb_args[i].subchannel_idx = i; - p->cb_args[i].p = p; - grpc_closure_init(&p->connectivity_changed_cbs[i], rr_connectivity_changed, - &p->cb_args[i]); - } + grpc_connectivity_state_init (&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); + memcpy (p->subchannels, args->subchannels, sizeof (grpc_subchannel *) * args->num_subchannels); + + gpr_mu_init (&p->mu); + p->connectivity_changed_cbs = gpr_malloc (sizeof (grpc_closure) * args->num_subchannels); + p->subchannel_connectivity = gpr_malloc (sizeof (grpc_connectivity_state) * args->num_subchannels); + + p->cb_args = gpr_malloc (sizeof (connectivity_changed_cb_arg) * args->num_subchannels); + for (i = 0; i < args->num_subchannels; i++) + { + p->cb_args[i].subchannel_idx = i; + p->cb_args[i].p = p; + grpc_closure_init (&p->connectivity_changed_cbs[i], rr_connectivity_changed, &p->cb_args[i]); + } /* The (dummy node) root of the ready list */ p->ready_list.subchannel = NULL; @@ -536,20 +575,22 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, p->ready_list.next = NULL; p->ready_list_last_pick = &p->ready_list; - p->subchannel_index_to_readylist_node = - gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); - memset(p->subchannel_index_to_readylist_node, 0, - sizeof(grpc_subchannel *) * args->num_subchannels); + p->subchannel_index_to_readylist_node = gpr_malloc (sizeof (grpc_subchannel *) * args->num_subchannels); + memset (p->subchannel_index_to_readylist_node, 0, sizeof (grpc_subchannel *) * args->num_subchannels); return &p->base; } static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { - round_robin_factory_ref, round_robin_factory_unref, create_round_robin, - "round_robin"}; + round_robin_factory_ref, round_robin_factory_unref, create_round_robin, + "round_robin" +}; static grpc_lb_policy_factory round_robin_lb_policy_factory = { - &round_robin_factory_vtable}; + &round_robin_factory_vtable +}; -grpc_lb_policy_factory *grpc_round_robin_lb_factory_create() { +grpc_lb_policy_factory * +grpc_round_robin_lb_factory_create () +{ return &round_robin_lb_policy_factory; } diff --git a/src/core/client_config/lb_policies/round_robin.h b/src/core/client_config/lb_policies/round_robin.h index cf1f69c85f..213995aa3f 100644 --- a/src/core/client_config/lb_policies/round_robin.h +++ b/src/core/client_config/lb_policies/round_robin.h @@ -41,6 +41,6 @@ extern int grpc_lb_round_robin_trace; #include "src/core/client_config/lb_policy_factory.h" /** Returns a load balancing factory for the round robin policy */ -grpc_lb_policy_factory *grpc_round_robin_lb_factory_create(); +grpc_lb_policy_factory *grpc_round_robin_lb_factory_create (); #endif diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 99eb69613b..40d9079f34 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -33,69 +33,74 @@ #include "src/core/client_config/lb_policy.h" -void grpc_lb_policy_init(grpc_lb_policy *policy, - const grpc_lb_policy_vtable *vtable) { +void +grpc_lb_policy_init (grpc_lb_policy * policy, const grpc_lb_policy_vtable * vtable) +{ policy->vtable = vtable; - gpr_ref_init(&policy->refs, 1); + gpr_ref_init (&policy->refs, 1); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, - const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", - policy, (int)policy->refs.count, (int)policy->refs.count + 1, reason); +void +grpc_lb_policy_ref (grpc_lb_policy * policy, const char *file, int line, const char *reason) +{ + gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", policy, (int) policy->refs.count, (int) policy->refs.count + 1, reason); #else -void grpc_lb_policy_ref(grpc_lb_policy *policy) { +void +grpc_lb_policy_ref (grpc_lb_policy * policy) +{ #endif - gpr_ref(&policy->refs); + gpr_ref (&policy->refs); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_unref(grpc_lb_policy *policy, - grpc_closure_list *closure_list, const char *file, - int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", - policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason); +void +grpc_lb_policy_unref (grpc_lb_policy * policy, grpc_closure_list * closure_list, const char *file, int line, const char *reason) +{ + gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", policy, (int) policy->refs.count, (int) policy->refs.count - 1, reason); #else -void grpc_lb_policy_unref(grpc_lb_policy *policy, - grpc_closure_list *closure_list) { +void +grpc_lb_policy_unref (grpc_lb_policy * policy, grpc_closure_list * closure_list) +{ #endif - if (gpr_unref(&policy->refs)) { - policy->vtable->destroy(policy, closure_list); - } + if (gpr_unref (&policy->refs)) + { + policy->vtable->destroy (policy, closure_list); + } } -void grpc_lb_policy_shutdown(grpc_lb_policy *policy, - grpc_closure_list *closure_list) { - policy->vtable->shutdown(policy, closure_list); +void +grpc_lb_policy_shutdown (grpc_lb_policy * policy, grpc_closure_list * closure_list) +{ + policy->vtable->shutdown (policy, closure_list); } -void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete, - grpc_closure_list *closure_list) { - policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete, - closure_list); +void +grpc_lb_policy_pick (grpc_lb_policy * policy, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete, grpc_closure_list * closure_list) +{ + policy->vtable->pick (policy, pollset, initial_metadata, target, on_complete, closure_list); } -void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op, - grpc_closure_list *closure_list) { - policy->vtable->broadcast(policy, op, closure_list); +void +grpc_lb_policy_broadcast (grpc_lb_policy * policy, grpc_transport_op * op, grpc_closure_list * closure_list) +{ + policy->vtable->broadcast (policy, op, closure_list); } -void grpc_lb_policy_exit_idle(grpc_lb_policy *policy, - grpc_closure_list *closure_list) { - policy->vtable->exit_idle(policy, closure_list); +void +grpc_lb_policy_exit_idle (grpc_lb_policy * policy, grpc_closure_list * closure_list) +{ + policy->vtable->exit_idle (policy, closure_list); } -void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_closure *closure, - grpc_closure_list *closure_list) { - policy->vtable->notify_on_state_change(policy, state, closure, closure_list); +void +grpc_lb_policy_notify_on_state_change (grpc_lb_policy * policy, grpc_connectivity_state * state, grpc_closure * closure, grpc_closure_list * closure_list) +{ + policy->vtable->notify_on_state_change (policy, state, closure, closure_list); } -grpc_connectivity_state grpc_lb_policy_check_connectivity( - grpc_lb_policy *policy, grpc_closure_list *closure_list) { - return policy->vtable->check_connectivity(policy, closure_list); +grpc_connectivity_state +grpc_lb_policy_check_connectivity (grpc_lb_policy * policy, grpc_closure_list * closure_list) +{ + return policy->vtable->check_connectivity (policy, closure_list); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 5e3c016eb2..d530e5b7e1 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -42,41 +42,35 @@ typedef struct grpc_lb_policy grpc_lb_policy; typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable; -typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, - grpc_status_code status, const char *errmsg); +typedef void (*grpc_lb_completion) (void *cb_arg, grpc_subchannel * subchannel, grpc_status_code status, const char *errmsg); -struct grpc_lb_policy { +struct grpc_lb_policy +{ const grpc_lb_policy_vtable *vtable; gpr_refcount refs; }; -struct grpc_lb_policy_vtable { - void (*destroy)(grpc_lb_policy *policy, grpc_closure_list *closure_list); +struct grpc_lb_policy_vtable +{ + void (*destroy) (grpc_lb_policy * policy, grpc_closure_list * closure_list); - void (*shutdown)(grpc_lb_policy *policy, grpc_closure_list *closure_list); + void (*shutdown) (grpc_lb_policy * policy, grpc_closure_list * closure_list); /** implement grpc_lb_policy_pick */ - void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_closure *on_complete, grpc_closure_list *closure_list); + void (*pick) (grpc_lb_policy * policy, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete, grpc_closure_list * closure_list); /** try to enter a READY connectivity state */ - void (*exit_idle)(grpc_lb_policy *policy, grpc_closure_list *closure_list); + void (*exit_idle) (grpc_lb_policy * policy, grpc_closure_list * closure_list); /** broadcast a transport op to all subchannels */ - void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op, - grpc_closure_list *closure_list); + void (*broadcast) (grpc_lb_policy * policy, grpc_transport_op * op, grpc_closure_list * closure_list); /** check the current connectivity of the lb_policy */ - grpc_connectivity_state (*check_connectivity)( - grpc_lb_policy *policy, grpc_closure_list *closure_list); + grpc_connectivity_state (*check_connectivity) (grpc_lb_policy * policy, grpc_closure_list * closure_list); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy */ - void (*notify_on_state_change)(grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_closure *closure, - grpc_closure_list *closure_list); + void (*notify_on_state_change) (grpc_lb_policy * policy, grpc_connectivity_state * state, grpc_closure * closure, grpc_closure_list * closure_list); }; #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -84,48 +78,33 @@ struct grpc_lb_policy_vtable { grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_UNREF(p, r, cl) \ grpc_lb_policy_unref((p), (cl), __FILE__, __LINE__, (r)) -void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, - const char *reason); -void grpc_lb_policy_unref(grpc_lb_policy *policy, - grpc_closure_list *closure_list, const char *file, - int line, const char *reason); +void grpc_lb_policy_ref (grpc_lb_policy * policy, const char *file, int line, const char *reason); +void grpc_lb_policy_unref (grpc_lb_policy * policy, grpc_closure_list * closure_list, const char *file, int line, const char *reason); #else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(p, r, cl) grpc_lb_policy_unref((p), (cl)) -void grpc_lb_policy_ref(grpc_lb_policy *policy); -void grpc_lb_policy_unref(grpc_lb_policy *policy, - grpc_closure_list *closure_list); +void grpc_lb_policy_ref (grpc_lb_policy * policy); +void grpc_lb_policy_unref (grpc_lb_policy * policy, grpc_closure_list * closure_list); #endif /** called by concrete implementations to initialize the base struct */ -void grpc_lb_policy_init(grpc_lb_policy *policy, - const grpc_lb_policy_vtable *vtable); +void grpc_lb_policy_init (grpc_lb_policy * policy, const grpc_lb_policy_vtable * vtable); /** Start shutting down (fail any pending picks) */ -void grpc_lb_policy_shutdown(grpc_lb_policy *policy, - grpc_closure_list *closure_list); +void grpc_lb_policy_shutdown (grpc_lb_policy * policy, grpc_closure_list * closure_list); /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting \a target. Picking can be asynchronous. Any IO should be done under \a pollset. */ -void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete, - grpc_closure_list *closure_list); +void grpc_lb_policy_pick (grpc_lb_policy * policy, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete, grpc_closure_list * closure_list); -void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op, - grpc_closure_list *closure_list); +void grpc_lb_policy_broadcast (grpc_lb_policy * policy, grpc_transport_op * op, grpc_closure_list * closure_list); -void grpc_lb_policy_exit_idle(grpc_lb_policy *policy, - grpc_closure_list *closure_list); +void grpc_lb_policy_exit_idle (grpc_lb_policy * policy, grpc_closure_list * closure_list); -void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_closure *closure, - grpc_closure_list *closure_list); +void grpc_lb_policy_notify_on_state_change (grpc_lb_policy * policy, grpc_connectivity_state * state, grpc_closure * closure, grpc_closure_list * closure_list); -grpc_connectivity_state grpc_lb_policy_check_connectivity( - grpc_lb_policy *policy, grpc_closure_list *closure_list); +grpc_connectivity_state grpc_lb_policy_check_connectivity (grpc_lb_policy * policy, grpc_closure_list * closure_list); #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */ diff --git a/src/core/client_config/lb_policy_factory.c b/src/core/client_config/lb_policy_factory.c index 0c097e0542..9d202d0c36 100644 --- a/src/core/client_config/lb_policy_factory.c +++ b/src/core/client_config/lb_policy_factory.c @@ -33,15 +33,22 @@ #include "src/core/client_config/lb_policy_factory.h" -void grpc_lb_policy_factory_ref(grpc_lb_policy_factory *factory) { - factory->vtable->ref(factory); +void +grpc_lb_policy_factory_ref (grpc_lb_policy_factory * factory) +{ + factory->vtable->ref (factory); } -void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory) { - factory->vtable->unref(factory); + +void +grpc_lb_policy_factory_unref (grpc_lb_policy_factory * factory) +{ + factory->vtable->unref (factory); } -grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( - grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - if (factory == NULL) return NULL; - return factory->vtable->create_lb_policy(factory, args); +grpc_lb_policy * +grpc_lb_policy_factory_create_lb_policy (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args) +{ + if (factory == NULL) + return NULL; + return factory->vtable->create_lb_policy (factory, args); } diff --git a/src/core/client_config/lb_policy_factory.h b/src/core/client_config/lb_policy_factory.h index 04610316ee..a00b5142dc 100644 --- a/src/core/client_config/lb_policy_factory.h +++ b/src/core/client_config/lb_policy_factory.h @@ -42,32 +42,33 @@ typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; /** grpc_lb_policy provides grpc_client_config objects to grpc_channel objects */ -struct grpc_lb_policy_factory { +struct grpc_lb_policy_factory +{ const grpc_lb_policy_factory_vtable *vtable; }; -typedef struct grpc_lb_policy_args { +typedef struct grpc_lb_policy_args +{ grpc_subchannel **subchannels; size_t num_subchannels; } grpc_lb_policy_args; -struct grpc_lb_policy_factory_vtable { - void (*ref)(grpc_lb_policy_factory *factory); - void (*unref)(grpc_lb_policy_factory *factory); +struct grpc_lb_policy_factory_vtable +{ + void (*ref) (grpc_lb_policy_factory * factory); + void (*unref) (grpc_lb_policy_factory * factory); /** Implementation of grpc_lb_policy_factory_create_lb_policy */ - grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args); + grpc_lb_policy *(*create_lb_policy) (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args); /** Name for the LB policy this factory implements */ const char *name; }; -void grpc_lb_policy_factory_ref(grpc_lb_policy_factory *factory); -void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory); +void grpc_lb_policy_factory_ref (grpc_lb_policy_factory * factory); +void grpc_lb_policy_factory_unref (grpc_lb_policy_factory * factory); /** Create a lb_policy instance. */ -grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( - grpc_lb_policy_factory *factory, grpc_lb_policy_args *args); +grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args); #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_FACTORY_H */ diff --git a/src/core/client_config/lb_policy_registry.c b/src/core/client_config/lb_policy_registry.c index fc302e82d7..4d7cc7c128 100644 --- a/src/core/client_config/lb_policy_registry.c +++ b/src/core/client_config/lb_policy_registry.c @@ -42,47 +42,59 @@ static int g_number_of_lb_policies = 0; static grpc_lb_policy_factory *g_default_lb_policy_factory; -void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory) { +void +grpc_lb_policy_registry_init (grpc_lb_policy_factory * default_factory) +{ g_number_of_lb_policies = 0; g_default_lb_policy_factory = default_factory; } -void grpc_lb_policy_registry_shutdown(void) { +void +grpc_lb_policy_registry_shutdown (void) +{ int i; - for (i = 0; i < g_number_of_lb_policies; i++) { - grpc_lb_policy_factory_unref(g_all_of_the_lb_policies[i]); - } + for (i = 0; i < g_number_of_lb_policies; i++) + { + grpc_lb_policy_factory_unref (g_all_of_the_lb_policies[i]); + } } -void grpc_register_lb_policy(grpc_lb_policy_factory *factory) { +void +grpc_register_lb_policy (grpc_lb_policy_factory * factory) +{ int i; - for (i = 0; i < g_number_of_lb_policies; i++) { - GPR_ASSERT(0 != strcmp(factory->vtable->name, - g_all_of_the_lb_policies[i]->vtable->name)); - } - GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES); - grpc_lb_policy_factory_ref(factory); + for (i = 0; i < g_number_of_lb_policies; i++) + { + GPR_ASSERT (0 != strcmp (factory->vtable->name, g_all_of_the_lb_policies[i]->vtable->name)); + } + GPR_ASSERT (g_number_of_lb_policies != MAX_POLICIES); + grpc_lb_policy_factory_ref (factory); g_all_of_the_lb_policies[g_number_of_lb_policies++] = factory; } -static grpc_lb_policy_factory *lookup_factory(const char *name) { +static grpc_lb_policy_factory * +lookup_factory (const char *name) +{ int i; - if (name == NULL) return NULL; + if (name == NULL) + return NULL; - for (i = 0; i < g_number_of_lb_policies; i++) { - if (0 == strcmp(name, g_all_of_the_lb_policies[i]->vtable->name)) { - return g_all_of_the_lb_policies[i]; + for (i = 0; i < g_number_of_lb_policies; i++) + { + if (0 == strcmp (name, g_all_of_the_lb_policies[i]->vtable->name)) + { + return g_all_of_the_lb_policies[i]; + } } - } return NULL; } -grpc_lb_policy *grpc_lb_policy_create(const char *name, - grpc_lb_policy_args *args) { - grpc_lb_policy_factory *factory = lookup_factory(name); - grpc_lb_policy *lb_policy = - grpc_lb_policy_factory_create_lb_policy(factory, args); +grpc_lb_policy * +grpc_lb_policy_create (const char *name, grpc_lb_policy_args * args) +{ + grpc_lb_policy_factory *factory = lookup_factory (name); + grpc_lb_policy *lb_policy = grpc_lb_policy_factory_create_lb_policy (factory, args); return lb_policy; } diff --git a/src/core/client_config/lb_policy_registry.h b/src/core/client_config/lb_policy_registry.h index 96fc2a1628..dea0cfc0fd 100644 --- a/src/core/client_config/lb_policy_registry.h +++ b/src/core/client_config/lb_policy_registry.h @@ -38,17 +38,16 @@ /** Initialize the registry and set \a default_factory as the factory to be * returned when no name is provided in a lookup */ -void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory); -void grpc_lb_policy_registry_shutdown(void); +void grpc_lb_policy_registry_init (grpc_lb_policy_factory * default_factory); +void grpc_lb_policy_registry_shutdown (void); /** Register a LB policy factory. */ -void grpc_register_lb_policy(grpc_lb_policy_factory *factory); +void grpc_register_lb_policy (grpc_lb_policy_factory * factory); /** Create a \a grpc_lb_policy instance. * * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init * will be returned. */ -grpc_lb_policy *grpc_lb_policy_create(const char *name, - grpc_lb_policy_args *args); +grpc_lb_policy *grpc_lb_policy_create (const char *name, grpc_lb_policy_args * args); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */ diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c index 21186d7aa1..089ce6cd18 100644 --- a/src/core/client_config/resolver.c +++ b/src/core/client_config/resolver.c @@ -33,56 +33,56 @@ #include "src/core/client_config/resolver.h" -void grpc_resolver_init(grpc_resolver *resolver, - const grpc_resolver_vtable *vtable) { +void +grpc_resolver_init (grpc_resolver * resolver, const grpc_resolver_vtable * vtable) +{ resolver->vtable = vtable; - gpr_ref_init(&resolver->refs, 1); + gpr_ref_init (&resolver->refs, 1); } #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG -void grpc_resolver_ref(grpc_resolver *resolver, grpc_closure_list *closure_list, - const char *file, int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s", - resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1, - reason); +void +grpc_resolver_ref (grpc_resolver * resolver, grpc_closure_list * closure_list, const char *file, int line, const char *reason) +{ + gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s", resolver, (int) resolver->refs.count, (int) resolver->refs.count + 1, reason); #else -void grpc_resolver_ref(grpc_resolver *resolver) { +void +grpc_resolver_ref (grpc_resolver * resolver) +{ #endif - gpr_ref(&resolver->refs); + gpr_ref (&resolver->refs); } #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG -void grpc_resolver_unref(grpc_resolver *resolver, - grpc_closure_list *closure_list, const char *file, - int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s", - resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1, - reason); +void +grpc_resolver_unref (grpc_resolver * resolver, grpc_closure_list * closure_list, const char *file, int line, const char *reason) +{ + gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s", resolver, (int) resolver->refs.count, (int) resolver->refs.count - 1, reason); #else -void grpc_resolver_unref(grpc_resolver *resolver, - grpc_closure_list *closure_list) { +void +grpc_resolver_unref (grpc_resolver * resolver, grpc_closure_list * closure_list) +{ #endif - if (gpr_unref(&resolver->refs)) { - resolver->vtable->destroy(resolver, closure_list); - } + if (gpr_unref (&resolver->refs)) + { + resolver->vtable->destroy (resolver, closure_list); + } } -void grpc_resolver_shutdown(grpc_resolver *resolver, - grpc_closure_list *closure_list) { - resolver->vtable->shutdown(resolver, closure_list); +void +grpc_resolver_shutdown (grpc_resolver * resolver, grpc_closure_list * closure_list) +{ + resolver->vtable->shutdown (resolver, closure_list); } -void grpc_resolver_channel_saw_error(grpc_resolver *resolver, - struct sockaddr *failing_address, - int failing_address_len, - grpc_closure_list *closure_list) { - resolver->vtable->channel_saw_error(resolver, failing_address, - failing_address_len, closure_list); +void +grpc_resolver_channel_saw_error (grpc_resolver * resolver, struct sockaddr *failing_address, int failing_address_len, grpc_closure_list * closure_list) +{ + resolver->vtable->channel_saw_error (resolver, failing_address, failing_address_len, closure_list); } -void grpc_resolver_next(grpc_resolver *resolver, - grpc_client_config **target_config, - grpc_closure *on_complete, - grpc_closure_list *closure_list) { - resolver->vtable->next(resolver, target_config, on_complete, closure_list); +void +grpc_resolver_next (grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete, grpc_closure_list * closure_list) +{ + resolver->vtable->next (resolver, target_config, on_complete, closure_list); } diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h index 0c39d37018..b51ef89bd5 100644 --- a/src/core/client_config/resolver.h +++ b/src/core/client_config/resolver.h @@ -43,50 +43,40 @@ typedef struct grpc_resolver_vtable grpc_resolver_vtable; /** grpc_resolver provides grpc_client_config objects to grpc_channel objects */ -struct grpc_resolver { +struct grpc_resolver +{ const grpc_resolver_vtable *vtable; gpr_refcount refs; }; -struct grpc_resolver_vtable { - void (*destroy)(grpc_resolver *resolver, grpc_closure_list *closure_list); - void (*shutdown)(grpc_resolver *resolver, grpc_closure_list *closure_list); - void (*channel_saw_error)(grpc_resolver *resolver, - struct sockaddr *failing_address, - int failing_address_len, - grpc_closure_list *closure_list); - void (*next)(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_closure *on_complete, grpc_closure_list *closure_list); +struct grpc_resolver_vtable +{ + void (*destroy) (grpc_resolver * resolver, grpc_closure_list * closure_list); + void (*shutdown) (grpc_resolver * resolver, grpc_closure_list * closure_list); + void (*channel_saw_error) (grpc_resolver * resolver, struct sockaddr * failing_address, int failing_address_len, grpc_closure_list * closure_list); + void (*next) (grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete, grpc_closure_list * closure_list); }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG #define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p), __FILE__, __LINE__, (r)) #define GRPC_RESOLVER_UNREF(p, r, cl) \ grpc_resolver_unref((p), (cl), __FILE__, __LINE__, (r)) -void grpc_resolver_ref(grpc_resolver *policy, const char *file, int line, - const char *reason); -void grpc_resolver_unref(grpc_resolver *policy, grpc_closure_list *closure_list, - const char *file, int line, const char *reason); +void grpc_resolver_ref (grpc_resolver * policy, const char *file, int line, const char *reason); +void grpc_resolver_unref (grpc_resolver * policy, grpc_closure_list * closure_list, const char *file, int line, const char *reason); #else #define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p)) #define GRPC_RESOLVER_UNREF(p, r, cl) grpc_resolver_unref((p), (cl)) -void grpc_resolver_ref(grpc_resolver *policy); -void grpc_resolver_unref(grpc_resolver *policy, - grpc_closure_list *closure_list); +void grpc_resolver_ref (grpc_resolver * policy); +void grpc_resolver_unref (grpc_resolver * policy, grpc_closure_list * closure_list); #endif -void grpc_resolver_init(grpc_resolver *resolver, - const grpc_resolver_vtable *vtable); +void grpc_resolver_init (grpc_resolver * resolver, const grpc_resolver_vtable * vtable); -void grpc_resolver_shutdown(grpc_resolver *resolver, - grpc_closure_list *closure_list); +void grpc_resolver_shutdown (grpc_resolver * resolver, grpc_closure_list * closure_list); /** Notification that the channel has seen an error on some address. Can be used as a hint that re-resolution is desirable soon. */ -void grpc_resolver_channel_saw_error(grpc_resolver *resolver, - struct sockaddr *failing_address, - int failing_address_len, - grpc_closure_list *closure_list); +void grpc_resolver_channel_saw_error (grpc_resolver * resolver, struct sockaddr *failing_address, int failing_address_len, grpc_closure_list * closure_list); /** Get the next client config. Called by the channel to fetch a new configuration. Expected to set *target_config with a new configuration, @@ -94,9 +84,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver, If resolution is fatally broken, set *target_config to NULL and schedule on_complete. */ -void grpc_resolver_next(grpc_resolver *resolver, - grpc_client_config **target_config, - grpc_closure *on_complete, - grpc_closure_list *closure_list); +void grpc_resolver_next (grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete, grpc_closure_list * closure_list); #endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_H */ diff --git a/src/core/client_config/resolver_factory.c b/src/core/client_config/resolver_factory.c index af04f7d9c7..d4f8201918 100644 --- a/src/core/client_config/resolver_factory.c +++ b/src/core/client_config/resolver_factory.c @@ -33,23 +33,31 @@ #include "src/core/client_config/resolver_factory.h" -void grpc_resolver_factory_ref(grpc_resolver_factory *factory) { - factory->vtable->ref(factory); +void +grpc_resolver_factory_ref (grpc_resolver_factory * factory) +{ + factory->vtable->ref (factory); } -void grpc_resolver_factory_unref(grpc_resolver_factory *factory) { - factory->vtable->unref(factory); +void +grpc_resolver_factory_unref (grpc_resolver_factory * factory) +{ + factory->vtable->unref (factory); } /** Create a resolver instance for a name */ -grpc_resolver *grpc_resolver_factory_create_resolver( - grpc_resolver_factory *factory, grpc_resolver_args *args) { - if (factory == NULL) return NULL; - return factory->vtable->create_resolver(factory, args); +grpc_resolver * +grpc_resolver_factory_create_resolver (grpc_resolver_factory * factory, grpc_resolver_args * args) +{ + if (factory == NULL) + return NULL; + return factory->vtable->create_resolver (factory, args); } -char *grpc_resolver_factory_get_default_authority( - grpc_resolver_factory *factory, grpc_uri *uri) { - if (factory == NULL) return NULL; - return factory->vtable->get_default_authority(factory, uri); +char * +grpc_resolver_factory_get_default_authority (grpc_resolver_factory * factory, grpc_uri * uri) +{ + if (factory == NULL) + return NULL; + return factory->vtable->get_default_authority (factory, uri); } diff --git a/src/core/client_config/resolver_factory.h b/src/core/client_config/resolver_factory.h index 4c4df353f7..920776886a 100644 --- a/src/core/client_config/resolver_factory.h +++ b/src/core/client_config/resolver_factory.h @@ -43,40 +43,40 @@ typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable; /** grpc_resolver provides grpc_client_config objects to grpc_channel objects */ -struct grpc_resolver_factory { +struct grpc_resolver_factory +{ const grpc_resolver_factory_vtable *vtable; }; -typedef struct grpc_resolver_args { +typedef struct grpc_resolver_args +{ grpc_uri *uri; grpc_subchannel_factory *subchannel_factory; } grpc_resolver_args; -struct grpc_resolver_factory_vtable { - void (*ref)(grpc_resolver_factory *factory); - void (*unref)(grpc_resolver_factory *factory); +struct grpc_resolver_factory_vtable +{ + void (*ref) (grpc_resolver_factory * factory); + void (*unref) (grpc_resolver_factory * factory); /** Implementation of grpc_resolver_factory_create_resolver */ - grpc_resolver *(*create_resolver)(grpc_resolver_factory *factory, - grpc_resolver_args *args); + grpc_resolver *(*create_resolver) (grpc_resolver_factory * factory, grpc_resolver_args * args); /** Implementation of grpc_resolver_factory_get_default_authority */ - char *(*get_default_authority)(grpc_resolver_factory *factory, grpc_uri *uri); + char *(*get_default_authority) (grpc_resolver_factory * factory, grpc_uri * uri); /** URI scheme that this factory implements */ const char *scheme; }; -void grpc_resolver_factory_ref(grpc_resolver_factory *resolver); -void grpc_resolver_factory_unref(grpc_resolver_factory *resolver); +void grpc_resolver_factory_ref (grpc_resolver_factory * resolver); +void grpc_resolver_factory_unref (grpc_resolver_factory * resolver); /** Create a resolver instance for a name */ -grpc_resolver *grpc_resolver_factory_create_resolver( - grpc_resolver_factory *factory, grpc_resolver_args *args); +grpc_resolver *grpc_resolver_factory_create_resolver (grpc_resolver_factory * factory, grpc_resolver_args * args); /** Return a (freshly allocated with gpr_malloc) string representing the default authority to use for this scheme. */ -char *grpc_resolver_factory_get_default_authority( - grpc_resolver_factory *factory, grpc_uri *uri); +char *grpc_resolver_factory_get_default_authority (grpc_resolver_factory * factory, grpc_uri * uri); #endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_FACTORY_H */ diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c index 89a945c2d3..1ebf43048e 100644 --- a/src/core/client_config/resolver_registry.c +++ b/src/core/client_config/resolver_registry.c @@ -46,92 +46,112 @@ static int g_number_of_resolvers = 0; static char *g_default_resolver_prefix; -void grpc_resolver_registry_init(const char *default_resolver_prefix) { +void +grpc_resolver_registry_init (const char *default_resolver_prefix) +{ g_number_of_resolvers = 0; - g_default_resolver_prefix = gpr_strdup(default_resolver_prefix); + g_default_resolver_prefix = gpr_strdup (default_resolver_prefix); } -void grpc_resolver_registry_shutdown(void) { +void +grpc_resolver_registry_shutdown (void) +{ int i; - for (i = 0; i < g_number_of_resolvers; i++) { - grpc_resolver_factory_unref(g_all_of_the_resolvers[i]); - } - gpr_free(g_default_resolver_prefix); + for (i = 0; i < g_number_of_resolvers; i++) + { + grpc_resolver_factory_unref (g_all_of_the_resolvers[i]); + } + gpr_free (g_default_resolver_prefix); } -void grpc_register_resolver_type(grpc_resolver_factory *factory) { +void +grpc_register_resolver_type (grpc_resolver_factory * factory) +{ int i; - for (i = 0; i < g_number_of_resolvers; i++) { - GPR_ASSERT(0 != strcmp(factory->vtable->scheme, - g_all_of_the_resolvers[i]->vtable->scheme)); - } - GPR_ASSERT(g_number_of_resolvers != MAX_RESOLVERS); - grpc_resolver_factory_ref(factory); + for (i = 0; i < g_number_of_resolvers; i++) + { + GPR_ASSERT (0 != strcmp (factory->vtable->scheme, g_all_of_the_resolvers[i]->vtable->scheme)); + } + GPR_ASSERT (g_number_of_resolvers != MAX_RESOLVERS); + grpc_resolver_factory_ref (factory); g_all_of_the_resolvers[g_number_of_resolvers++] = factory; } -static grpc_resolver_factory *lookup_factory(grpc_uri *uri) { +static grpc_resolver_factory * +lookup_factory (grpc_uri * uri) +{ int i; /* handling NULL uri's here simplifies grpc_resolver_create */ - if (!uri) return NULL; - - for (i = 0; i < g_number_of_resolvers; i++) { - if (0 == strcmp(uri->scheme, g_all_of_the_resolvers[i]->vtable->scheme)) { - return g_all_of_the_resolvers[i]; + if (!uri) + return NULL; + + for (i = 0; i < g_number_of_resolvers; i++) + { + if (0 == strcmp (uri->scheme, g_all_of_the_resolvers[i]->vtable->scheme)) + { + return g_all_of_the_resolvers[i]; + } } - } return NULL; } -static grpc_resolver_factory *resolve_factory(const char *target, - grpc_uri **uri) { +static grpc_resolver_factory * +resolve_factory (const char *target, grpc_uri ** uri) +{ char *tmp; grpc_resolver_factory *factory = NULL; - GPR_ASSERT(uri != NULL); - *uri = grpc_uri_parse(target, 1); - factory = lookup_factory(*uri); - if (factory == NULL) { - if (g_default_resolver_prefix != NULL) { - grpc_uri_destroy(*uri); - gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target); - *uri = grpc_uri_parse(tmp, 1); - factory = lookup_factory(*uri); - if (factory == NULL) { - grpc_uri_destroy(grpc_uri_parse(target, 0)); - grpc_uri_destroy(grpc_uri_parse(tmp, 0)); - gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, - tmp); - } - gpr_free(tmp); - } else { - grpc_uri_destroy(grpc_uri_parse(target, 0)); - gpr_log(GPR_ERROR, "don't know how to resolve '%s'", target); + GPR_ASSERT (uri != NULL); + *uri = grpc_uri_parse (target, 1); + factory = lookup_factory (*uri); + if (factory == NULL) + { + if (g_default_resolver_prefix != NULL) + { + grpc_uri_destroy (*uri); + gpr_asprintf (&tmp, "%s%s", g_default_resolver_prefix, target); + *uri = grpc_uri_parse (tmp, 1); + factory = lookup_factory (*uri); + if (factory == NULL) + { + grpc_uri_destroy (grpc_uri_parse (target, 0)); + grpc_uri_destroy (grpc_uri_parse (tmp, 0)); + gpr_log (GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, tmp); + } + gpr_free (tmp); + } + else + { + grpc_uri_destroy (grpc_uri_parse (target, 0)); + gpr_log (GPR_ERROR, "don't know how to resolve '%s'", target); + } } - } return factory; } -grpc_resolver *grpc_resolver_create( - const char *target, grpc_subchannel_factory *subchannel_factory) { +grpc_resolver * +grpc_resolver_create (const char *target, grpc_subchannel_factory * subchannel_factory) +{ grpc_uri *uri = NULL; - grpc_resolver_factory *factory = resolve_factory(target, &uri); + grpc_resolver_factory *factory = resolve_factory (target, &uri); grpc_resolver *resolver; grpc_resolver_args args; - memset(&args, 0, sizeof(args)); + memset (&args, 0, sizeof (args)); args.uri = uri; args.subchannel_factory = subchannel_factory; - resolver = grpc_resolver_factory_create_resolver(factory, &args); - grpc_uri_destroy(uri); + resolver = grpc_resolver_factory_create_resolver (factory, &args); + grpc_uri_destroy (uri); return resolver; } -char *grpc_get_default_authority(const char *target) { +char * +grpc_get_default_authority (const char *target) +{ grpc_uri *uri = NULL; - grpc_resolver_factory *factory = resolve_factory(target, &uri); - char *authority = grpc_resolver_factory_get_default_authority(factory, uri); - grpc_uri_destroy(uri); + grpc_resolver_factory *factory = resolve_factory (target, &uri); + char *authority = grpc_resolver_factory_get_default_authority (factory, uri); + grpc_uri_destroy (uri); return authority; } diff --git a/src/core/client_config/resolver_registry.h b/src/core/client_config/resolver_registry.h index 5a7193b7ae..d52e50fd13 100644 --- a/src/core/client_config/resolver_registry.h +++ b/src/core/client_config/resolver_registry.h @@ -36,15 +36,15 @@ #include "src/core/client_config/resolver_factory.h" -void grpc_resolver_registry_init(const char *default_prefix); -void grpc_resolver_registry_shutdown(void); +void grpc_resolver_registry_init (const char *default_prefix); +void grpc_resolver_registry_shutdown (void); /** Register a resolver type. URI's of \a scheme will be resolved with the given resolver. If \a priority is greater than zero, then the resolver will be eligible to resolve names that are passed in with no scheme. Higher priority resolvers will be tried before lower priority schemes. */ -void grpc_register_resolver_type(grpc_resolver_factory *factory); +void grpc_register_resolver_type (grpc_resolver_factory * factory); /** Create a resolver given \a target. First tries to parse \a target as a URI. If this succeeds, tries @@ -55,11 +55,10 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); If a resolver factory was found, use it to instantiate a resolver and return it. If a resolver factory was not found, return NULL. */ -grpc_resolver *grpc_resolver_create( - const char *target, grpc_subchannel_factory *subchannel_factory); +grpc_resolver *grpc_resolver_create (const char *target, grpc_subchannel_factory * subchannel_factory); /** Given a target, return a (freshly allocated with gpr_malloc) string representing the default authority to pass from a client. */ -char *grpc_get_default_authority(const char *target); +char *grpc_get_default_authority (const char *target); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVER_REGISTRY_H */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 73723350cb..10d6cfa13e 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -44,7 +44,8 @@ #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" -typedef struct { +typedef struct +{ /** base class: must be first */ grpc_resolver base; /** refcount */ @@ -74,162 +75,175 @@ typedef struct { grpc_client_config *resolved_config; } dns_resolver; -static void dns_destroy(grpc_resolver *r, grpc_closure_list *closure_list); +static void dns_destroy (grpc_resolver * r, grpc_closure_list * closure_list); -static void dns_start_resolving_locked(dns_resolver *r); -static void dns_maybe_finish_next_locked(dns_resolver *r, - grpc_closure_list *closure_list); +static void dns_start_resolving_locked (dns_resolver * r); +static void dns_maybe_finish_next_locked (dns_resolver * r, grpc_closure_list * closure_list); -static void dns_shutdown(grpc_resolver *r, grpc_closure_list *closure_list); -static void dns_channel_saw_error(grpc_resolver *r, - struct sockaddr *failing_address, - int failing_address_len, - grpc_closure_list *closure_list); -static void dns_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_closure *on_complete, - grpc_closure_list *closure_list); +static void dns_shutdown (grpc_resolver * r, grpc_closure_list * closure_list); +static void dns_channel_saw_error (grpc_resolver * r, struct sockaddr *failing_address, int failing_address_len, grpc_closure_list * closure_list); +static void dns_next (grpc_resolver * r, grpc_client_config ** target_config, grpc_closure * on_complete, grpc_closure_list * closure_list); static const grpc_resolver_vtable dns_resolver_vtable = { - dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; + dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next +}; -static void dns_shutdown(grpc_resolver *resolver, - grpc_closure_list *closure_list) { - dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); - if (r->next_completion != NULL) { - *r->target_config = NULL; - grpc_closure_list_add(closure_list, r->next_completion, 1); - r->next_completion = NULL; - } - gpr_mu_unlock(&r->mu); +static void +dns_shutdown (grpc_resolver * resolver, grpc_closure_list * closure_list) +{ + dns_resolver *r = (dns_resolver *) resolver; + gpr_mu_lock (&r->mu); + if (r->next_completion != NULL) + { + *r->target_config = NULL; + grpc_closure_list_add (closure_list, r->next_completion, 1); + r->next_completion = NULL; + } + gpr_mu_unlock (&r->mu); } -static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa, - int len, grpc_closure_list *closure_list) { - dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); - if (!r->resolving) { - dns_start_resolving_locked(r); - } - gpr_mu_unlock(&r->mu); +static void +dns_channel_saw_error (grpc_resolver * resolver, struct sockaddr *sa, int len, grpc_closure_list * closure_list) +{ + dns_resolver *r = (dns_resolver *) resolver; + gpr_mu_lock (&r->mu); + if (!r->resolving) + { + dns_start_resolving_locked (r); + } + gpr_mu_unlock (&r->mu); } -static void dns_next(grpc_resolver *resolver, - grpc_client_config **target_config, - grpc_closure *on_complete, - grpc_closure_list *closure_list) { - dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); - GPR_ASSERT(!r->next_completion); +static void +dns_next (grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete, grpc_closure_list * closure_list) +{ + dns_resolver *r = (dns_resolver *) resolver; + gpr_mu_lock (&r->mu); + GPR_ASSERT (!r->next_completion); r->next_completion = on_complete; r->target_config = target_config; - if (r->resolved_version == 0 && !r->resolving) { - dns_start_resolving_locked(r); - } else { - dns_maybe_finish_next_locked(r, closure_list); - } - gpr_mu_unlock(&r->mu); + if (r->resolved_version == 0 && !r->resolving) + { + dns_start_resolving_locked (r); + } + else + { + dns_maybe_finish_next_locked (r, closure_list); + } + gpr_mu_unlock (&r->mu); } -static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses, - grpc_closure_list *closure_list) { +static void +dns_on_resolved (void *arg, grpc_resolved_addresses * addresses, grpc_closure_list * closure_list) +{ dns_resolver *r = arg; grpc_client_config *config = NULL; grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; size_t i; - if (addresses) { - grpc_lb_policy_args lb_policy_args; - config = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); - for (i = 0; i < addresses->naddrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)(addresses->addrs[i].addr); - args.addr_len = (size_t)addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel( - r->subchannel_factory, &args, closure_list); + if (addresses) + { + grpc_lb_policy_args lb_policy_args; + config = grpc_client_config_create (); + subchannels = gpr_malloc (sizeof (grpc_subchannel *) * addresses->naddrs); + for (i = 0; i < addresses->naddrs; i++) + { + memset (&args, 0, sizeof (args)); + args.addr = (struct sockaddr *) (addresses->addrs[i].addr); + args.addr_len = (size_t) addresses->addrs[i].len; + subchannels[i] = grpc_subchannel_factory_create_subchannel (r->subchannel_factory, &args, closure_list); + } + memset (&lb_policy_args, 0, sizeof (lb_policy_args)); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy = grpc_lb_policy_create (r->lb_policy_name, &lb_policy_args); + grpc_client_config_set_lb_policy (config, lb_policy); + GRPC_LB_POLICY_UNREF (lb_policy, "construction", closure_list); + grpc_resolved_addresses_destroy (addresses); + gpr_free (subchannels); } - memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = addresses->naddrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(lb_policy, "construction", closure_list); - grpc_resolved_addresses_destroy(addresses); - gpr_free(subchannels); - } - gpr_mu_lock(&r->mu); - GPR_ASSERT(r->resolving); + gpr_mu_lock (&r->mu); + GPR_ASSERT (r->resolving); r->resolving = 0; - if (r->resolved_config) { - grpc_client_config_unref(r->resolved_config, closure_list); - } + if (r->resolved_config) + { + grpc_client_config_unref (r->resolved_config, closure_list); + } r->resolved_config = config; r->resolved_version++; - dns_maybe_finish_next_locked(r, closure_list); - gpr_mu_unlock(&r->mu); + dns_maybe_finish_next_locked (r, closure_list); + gpr_mu_unlock (&r->mu); - GRPC_RESOLVER_UNREF(&r->base, "dns-resolving", closure_list); + GRPC_RESOLVER_UNREF (&r->base, "dns-resolving", closure_list); } -static void dns_start_resolving_locked(dns_resolver *r) { - GRPC_RESOLVER_REF(&r->base, "dns-resolving"); - GPR_ASSERT(!r->resolving); +static void +dns_start_resolving_locked (dns_resolver * r) +{ + GRPC_RESOLVER_REF (&r->base, "dns-resolving"); + GPR_ASSERT (!r->resolving); r->resolving = 1; - grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); + grpc_resolve_address (r->name, r->default_port, dns_on_resolved, r); } -static void dns_maybe_finish_next_locked(dns_resolver *r, - grpc_closure_list *closure_list) { - if (r->next_completion != NULL && - r->resolved_version != r->published_version) { - *r->target_config = r->resolved_config; - if (r->resolved_config) { - grpc_client_config_ref(r->resolved_config); +static void +dns_maybe_finish_next_locked (dns_resolver * r, grpc_closure_list * closure_list) +{ + if (r->next_completion != NULL && r->resolved_version != r->published_version) + { + *r->target_config = r->resolved_config; + if (r->resolved_config) + { + grpc_client_config_ref (r->resolved_config); + } + grpc_closure_list_add (closure_list, r->next_completion, 1); + r->next_completion = NULL; + r->published_version = r->resolved_version; } - grpc_closure_list_add(closure_list, r->next_completion, 1); - r->next_completion = NULL; - r->published_version = r->resolved_version; - } } -static void dns_destroy(grpc_resolver *gr, grpc_closure_list *closure_list) { - dns_resolver *r = (dns_resolver *)gr; - gpr_mu_destroy(&r->mu); - if (r->resolved_config) { - grpc_client_config_unref(r->resolved_config, closure_list); - } - grpc_subchannel_factory_unref(r->subchannel_factory, closure_list); - gpr_free(r->name); - gpr_free(r->default_port); - gpr_free(r->lb_policy_name); - gpr_free(r); +static void +dns_destroy (grpc_resolver * gr, grpc_closure_list * closure_list) +{ + dns_resolver *r = (dns_resolver *) gr; + gpr_mu_destroy (&r->mu); + if (r->resolved_config) + { + grpc_client_config_unref (r->resolved_config, closure_list); + } + grpc_subchannel_factory_unref (r->subchannel_factory, closure_list); + gpr_free (r->name); + gpr_free (r->default_port); + gpr_free (r->lb_policy_name); + gpr_free (r); } -static grpc_resolver *dns_create(grpc_resolver_args *args, - const char *default_port, - const char *lb_policy_name) { +static grpc_resolver * +dns_create (grpc_resolver_args * args, const char *default_port, const char *lb_policy_name) +{ dns_resolver *r; const char *path = args->uri->path; - if (0 != strcmp(args->uri->authority, "")) { - gpr_log(GPR_ERROR, "authority based dns uri's not supported"); - return NULL; - } + if (0 != strcmp (args->uri->authority, "")) + { + gpr_log (GPR_ERROR, "authority based dns uri's not supported"); + return NULL; + } - if (path[0] == '/') ++path; + if (path[0] == '/') + ++path; - r = gpr_malloc(sizeof(dns_resolver)); - memset(r, 0, sizeof(*r)); - gpr_ref_init(&r->refs, 1); - gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &dns_resolver_vtable); - r->name = gpr_strdup(path); - r->default_port = gpr_strdup(default_port); + r = gpr_malloc (sizeof (dns_resolver)); + memset (r, 0, sizeof (*r)); + gpr_ref_init (&r->refs, 1); + gpr_mu_init (&r->mu); + grpc_resolver_init (&r->base, &dns_resolver_vtable); + r->name = gpr_strdup (path); + r->default_port = gpr_strdup (default_port); r->subchannel_factory = args->subchannel_factory; - grpc_subchannel_factory_ref(r->subchannel_factory); - r->lb_policy_name = gpr_strdup(lb_policy_name); + grpc_subchannel_factory_ref (r->subchannel_factory); + r->lb_policy_name = gpr_strdup (lb_policy_name); return &r->base; } @@ -237,27 +251,39 @@ static grpc_resolver *dns_create(grpc_resolver_args *args, * FACTORY */ -static void dns_factory_ref(grpc_resolver_factory *factory) {} +static void +dns_factory_ref (grpc_resolver_factory * factory) +{ +} -static void dns_factory_unref(grpc_resolver_factory *factory) {} +static void +dns_factory_unref (grpc_resolver_factory * factory) +{ +} -static grpc_resolver *dns_factory_create_resolver( - grpc_resolver_factory *factory, grpc_resolver_args *args) { - return dns_create(args, "https", "pick_first"); +static grpc_resolver * +dns_factory_create_resolver (grpc_resolver_factory * factory, grpc_resolver_args * args) +{ + return dns_create (args, "https", "pick_first"); } -char *dns_factory_get_default_host_name(grpc_resolver_factory *factory, - grpc_uri *uri) { +char * +dns_factory_get_default_host_name (grpc_resolver_factory * factory, grpc_uri * uri) +{ const char *path = uri->path; - if (path[0] == '/') ++path; - return gpr_strdup(path); + if (path[0] == '/') + ++path; + return gpr_strdup (path); } static const grpc_resolver_factory_vtable dns_factory_vtable = { - dns_factory_ref, dns_factory_unref, dns_factory_create_resolver, - dns_factory_get_default_host_name, "dns"}; -static grpc_resolver_factory dns_resolver_factory = {&dns_factory_vtable}; + dns_factory_ref, dns_factory_unref, dns_factory_create_resolver, + dns_factory_get_default_host_name, "dns" +}; +static grpc_resolver_factory dns_resolver_factory = { &dns_factory_vtable }; -grpc_resolver_factory *grpc_dns_resolver_factory_create() { +grpc_resolver_factory * +grpc_dns_resolver_factory_create () +{ return &dns_resolver_factory; } diff --git a/src/core/client_config/resolvers/dns_resolver.h b/src/core/client_config/resolvers/dns_resolver.h index a3ef3161a6..bb43499149 100644 --- a/src/core/client_config/resolvers/dns_resolver.h +++ b/src/core/client_config/resolvers/dns_resolver.h @@ -37,6 +37,6 @@ #include "src/core/client_config/resolver_factory.h" /** Create a dns resolver factory */ -grpc_resolver_factory *grpc_dns_resolver_factory_create(void); +grpc_resolver_factory *grpc_dns_resolver_factory_create (void); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_DNS_RESOLVER_H */ diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 378813f45b..5358caf3bd 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -49,7 +49,8 @@ #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" -typedef struct { +typedef struct +{ /** base class: must be first */ grpc_resolver base; /** refcount */ @@ -76,285 +77,314 @@ typedef struct { grpc_client_config **target_config; } sockaddr_resolver; -static void sockaddr_destroy(grpc_resolver *r, grpc_closure_list *closure_list); +static void sockaddr_destroy (grpc_resolver * r, grpc_closure_list * closure_list); -static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r, - grpc_closure_list *closure_list); +static void sockaddr_maybe_finish_next_locked (sockaddr_resolver * r, grpc_closure_list * closure_list); -static void sockaddr_shutdown(grpc_resolver *r, - grpc_closure_list *closure_list); -static void sockaddr_channel_saw_error(grpc_resolver *r, - struct sockaddr *failing_address, - int failing_address_len, - grpc_closure_list *closure_list); -static void sockaddr_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_closure *on_complete, - grpc_closure_list *closure_list); +static void sockaddr_shutdown (grpc_resolver * r, grpc_closure_list * closure_list); +static void sockaddr_channel_saw_error (grpc_resolver * r, struct sockaddr *failing_address, int failing_address_len, grpc_closure_list * closure_list); +static void sockaddr_next (grpc_resolver * r, grpc_client_config ** target_config, grpc_closure * on_complete, grpc_closure_list * closure_list); static const grpc_resolver_vtable sockaddr_resolver_vtable = { - sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, - sockaddr_next}; - -static void sockaddr_shutdown(grpc_resolver *resolver, - grpc_closure_list *closure_list) { - sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); - if (r->next_completion != NULL) { - *r->target_config = NULL; - grpc_closure_list_add(closure_list, r->next_completion, 1); - r->next_completion = NULL; - } - gpr_mu_unlock(&r->mu); + sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, + sockaddr_next +}; + +static void +sockaddr_shutdown (grpc_resolver * resolver, grpc_closure_list * closure_list) +{ + sockaddr_resolver *r = (sockaddr_resolver *) resolver; + gpr_mu_lock (&r->mu); + if (r->next_completion != NULL) + { + *r->target_config = NULL; + grpc_closure_list_add (closure_list, r->next_completion, 1); + r->next_completion = NULL; + } + gpr_mu_unlock (&r->mu); +} + +static void +sockaddr_channel_saw_error (grpc_resolver * resolver, struct sockaddr *sa, int len, grpc_closure_list * closure_list) +{ } -static void sockaddr_channel_saw_error(grpc_resolver *resolver, - struct sockaddr *sa, int len, - grpc_closure_list *closure_list) {} - -static void sockaddr_next(grpc_resolver *resolver, - grpc_client_config **target_config, - grpc_closure *on_complete, - grpc_closure_list *closure_list) { - sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); - GPR_ASSERT(!r->next_completion); +static void +sockaddr_next (grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete, grpc_closure_list * closure_list) +{ + sockaddr_resolver *r = (sockaddr_resolver *) resolver; + gpr_mu_lock (&r->mu); + GPR_ASSERT (!r->next_completion); r->next_completion = on_complete; r->target_config = target_config; - sockaddr_maybe_finish_next_locked(r, closure_list); - gpr_mu_unlock(&r->mu); + sockaddr_maybe_finish_next_locked (r, closure_list); + gpr_mu_unlock (&r->mu); } -static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r, - grpc_closure_list *closure_list) { +static void +sockaddr_maybe_finish_next_locked (sockaddr_resolver * r, grpc_closure_list * closure_list) +{ grpc_client_config *cfg; grpc_lb_policy *lb_policy; grpc_lb_policy_args lb_policy_args; grpc_subchannel **subchannels; grpc_subchannel_args args; - if (r->next_completion != NULL && !r->published) { - size_t i; - cfg = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs); - for (i = 0; i < r->num_addrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)&r->addrs[i]; - args.addr_len = r->addrs_len[i]; - subchannels[i] = grpc_subchannel_factory_create_subchannel( - r->subchannel_factory, &args, closure_list); + if (r->next_completion != NULL && !r->published) + { + size_t i; + cfg = grpc_client_config_create (); + subchannels = gpr_malloc (sizeof (grpc_subchannel *) * r->num_addrs); + for (i = 0; i < r->num_addrs; i++) + { + memset (&args, 0, sizeof (args)); + args.addr = (struct sockaddr *) &r->addrs[i]; + args.addr_len = r->addrs_len[i]; + subchannels[i] = grpc_subchannel_factory_create_subchannel (r->subchannel_factory, &args, closure_list); + } + memset (&lb_policy_args, 0, sizeof (lb_policy_args)); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = r->num_addrs; + lb_policy = grpc_lb_policy_create (r->lb_policy_name, &lb_policy_args); + gpr_free (subchannels); + grpc_client_config_set_lb_policy (cfg, lb_policy); + GRPC_LB_POLICY_UNREF (lb_policy, "sockaddr", closure_list); + r->published = 1; + *r->target_config = cfg; + grpc_closure_list_add (closure_list, r->next_completion, 1); + r->next_completion = NULL; } - memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = r->num_addrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - gpr_free(subchannels); - grpc_client_config_set_lb_policy(cfg, lb_policy); - GRPC_LB_POLICY_UNREF(lb_policy, "sockaddr", closure_list); - r->published = 1; - *r->target_config = cfg; - grpc_closure_list_add(closure_list, r->next_completion, 1); - r->next_completion = NULL; - } } -static void sockaddr_destroy(grpc_resolver *gr, - grpc_closure_list *closure_list) { - sockaddr_resolver *r = (sockaddr_resolver *)gr; - gpr_mu_destroy(&r->mu); - grpc_subchannel_factory_unref(r->subchannel_factory, closure_list); - gpr_free(r->addrs); - gpr_free(r->addrs_len); - gpr_free(r->lb_policy_name); - gpr_free(r); +static void +sockaddr_destroy (grpc_resolver * gr, grpc_closure_list * closure_list) +{ + sockaddr_resolver *r = (sockaddr_resolver *) gr; + gpr_mu_destroy (&r->mu); + grpc_subchannel_factory_unref (r->subchannel_factory, closure_list); + gpr_free (r->addrs); + gpr_free (r->addrs_len); + gpr_free (r->lb_policy_name); + gpr_free (r); } #ifdef GPR_POSIX_SOCKET -static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, - size_t *len) { - struct sockaddr_un *un = (struct sockaddr_un *)addr; +static int +parse_unix (grpc_uri * uri, struct sockaddr_storage *addr, size_t * len) +{ + struct sockaddr_un *un = (struct sockaddr_un *) addr; un->sun_family = AF_UNIX; - strcpy(un->sun_path, uri->path); - *len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; + strcpy (un->sun_path, uri->path); + *len = strlen (un->sun_path) + sizeof (un->sun_family) + 1; return 1; } -static char *unix_get_default_authority(grpc_resolver_factory *factory, - grpc_uri *uri) { - return gpr_strdup("localhost"); +static char * +unix_get_default_authority (grpc_resolver_factory * factory, grpc_uri * uri) +{ + return gpr_strdup ("localhost"); } #endif -static char *ip_get_default_authority(grpc_uri *uri) { +static char * +ip_get_default_authority (grpc_uri * uri) +{ const char *path = uri->path; - if (path[0] == '/') ++path; - return gpr_strdup(path); + if (path[0] == '/') + ++path; + return gpr_strdup (path); } -static char *ipv4_get_default_authority(grpc_resolver_factory *factory, - grpc_uri *uri) { - return ip_get_default_authority(uri); +static char * +ipv4_get_default_authority (grpc_resolver_factory * factory, grpc_uri * uri) +{ + return ip_get_default_authority (uri); } -static char *ipv6_get_default_authority(grpc_resolver_factory *factory, - grpc_uri *uri) { - return ip_get_default_authority(uri); +static char * +ipv6_get_default_authority (grpc_resolver_factory * factory, grpc_uri * uri) +{ + return ip_get_default_authority (uri); } -static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, - size_t *len) { +static int +parse_ipv4 (grpc_uri * uri, struct sockaddr_storage *addr, size_t * len) +{ const char *host_port = uri->path; char *host; char *port; int port_num; int result = 0; - struct sockaddr_in *in = (struct sockaddr_in *)addr; + struct sockaddr_in *in = (struct sockaddr_in *) addr; - if (*host_port == '/') ++host_port; - if (!gpr_split_host_port(host_port, &host, &port)) { - return 0; - } + if (*host_port == '/') + ++host_port; + if (!gpr_split_host_port (host_port, &host, &port)) + { + return 0; + } - memset(in, 0, sizeof(*in)); - *len = sizeof(*in); + memset (in, 0, sizeof (*in)); + *len = sizeof (*in); in->sin_family = AF_INET; - if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); - goto done; - } + if (inet_pton (AF_INET, host, &in->sin_addr) == 0) + { + gpr_log (GPR_ERROR, "invalid ipv4 address: '%s'", host); + goto done; + } - if (port != NULL) { - if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || - port_num > 65535) { - gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); + if (port != NULL) + { + if (sscanf (port, "%d", &port_num) != 1 || port_num < 0 || port_num > 65535) + { + gpr_log (GPR_ERROR, "invalid ipv4 port: '%s'", port); + goto done; + } + in->sin_port = htons ((gpr_uint16) port_num); + } + else + { + gpr_log (GPR_ERROR, "no port given for ipv4 scheme"); goto done; } - in->sin_port = htons((gpr_uint16)port_num); - } else { - gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); - goto done; - } result = 1; done: - gpr_free(host); - gpr_free(port); + gpr_free (host); + gpr_free (port); return result; } -static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, - size_t *len) { +static int +parse_ipv6 (grpc_uri * uri, struct sockaddr_storage *addr, size_t * len) +{ const char *host_port = uri->path; char *host; char *port; int port_num; int result = 0; - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) addr; - if (*host_port == '/') ++host_port; - if (!gpr_split_host_port(host_port, &host, &port)) { - return 0; - } + if (*host_port == '/') + ++host_port; + if (!gpr_split_host_port (host_port, &host, &port)) + { + return 0; + } - memset(in6, 0, sizeof(*in6)); - *len = sizeof(*in6); + memset (in6, 0, sizeof (*in6)); + *len = sizeof (*in6); in6->sin6_family = AF_INET6; - if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); - goto done; - } + if (inet_pton (AF_INET6, host, &in6->sin6_addr) == 0) + { + gpr_log (GPR_ERROR, "invalid ipv6 address: '%s'", host); + goto done; + } - if (port != NULL) { - if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || - port_num > 65535) { - gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); + if (port != NULL) + { + if (sscanf (port, "%d", &port_num) != 1 || port_num < 0 || port_num > 65535) + { + gpr_log (GPR_ERROR, "invalid ipv6 port: '%s'", port); + goto done; + } + in6->sin6_port = htons ((gpr_uint16) port_num); + } + else + { + gpr_log (GPR_ERROR, "no port given for ipv6 scheme"); goto done; } - in6->sin6_port = htons((gpr_uint16)port_num); - } else { - gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); - goto done; - } result = 1; done: - gpr_free(host); - gpr_free(port); + gpr_free (host); + gpr_free (port); return result; } -static void do_nothing(void *ignored) {} -static grpc_resolver *sockaddr_create( - grpc_resolver_args *args, const char *default_lb_policy_name, - int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { +static void +do_nothing (void *ignored) +{ +} + +static grpc_resolver * +sockaddr_create (grpc_resolver_args * args, const char *default_lb_policy_name, int parse (grpc_uri * uri, struct sockaddr_storage *dst, size_t * len)) +{ size_t i; - int errors_found = 0; /* GPR_FALSE */ + int errors_found = 0; /* GPR_FALSE */ sockaddr_resolver *r; gpr_slice path_slice; gpr_slice_buffer path_parts; - if (0 != strcmp(args->uri->authority, "")) { - gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme", - args->uri->scheme); - return NULL; - } + if (0 != strcmp (args->uri->authority, "")) + { + gpr_log (GPR_ERROR, "authority based uri's not supported by the %s scheme", args->uri->scheme); + return NULL; + } - r = gpr_malloc(sizeof(sockaddr_resolver)); - memset(r, 0, sizeof(*r)); + r = gpr_malloc (sizeof (sockaddr_resolver)); + memset (r, 0, sizeof (*r)); r->lb_policy_name = NULL; - if (0 != strcmp(args->uri->query, "")) { - gpr_slice query_slice; - gpr_slice_buffer query_parts; - - query_slice = - gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing); - gpr_slice_buffer_init(&query_parts); - gpr_slice_split(query_slice, "=", &query_parts); - GPR_ASSERT(query_parts.count == 2); - if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) { - r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII); + if (0 != strcmp (args->uri->query, "")) + { + gpr_slice query_slice; + gpr_slice_buffer query_parts; + + query_slice = gpr_slice_new (args->uri->query, strlen (args->uri->query), do_nothing); + gpr_slice_buffer_init (&query_parts); + gpr_slice_split (query_slice, "=", &query_parts); + GPR_ASSERT (query_parts.count == 2); + if (0 == gpr_slice_str_cmp (query_parts.slices[0], "lb_policy")) + { + r->lb_policy_name = gpr_dump_slice (query_parts.slices[1], GPR_DUMP_ASCII); + } + gpr_slice_buffer_destroy (&query_parts); + gpr_slice_unref (query_slice); + } + if (r->lb_policy_name == NULL) + { + r->lb_policy_name = gpr_strdup (default_lb_policy_name); } - gpr_slice_buffer_destroy(&query_parts); - gpr_slice_unref(query_slice); - } - if (r->lb_policy_name == NULL) { - r->lb_policy_name = gpr_strdup(default_lb_policy_name); - } - path_slice = - gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing); - gpr_slice_buffer_init(&path_parts); + path_slice = gpr_slice_new (args->uri->path, strlen (args->uri->path), do_nothing); + gpr_slice_buffer_init (&path_parts); - gpr_slice_split(path_slice, ",", &path_parts); + gpr_slice_split (path_slice, ",", &path_parts); r->num_addrs = path_parts.count; - r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs); - r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs); - - for (i = 0; i < r->num_addrs; i++) { - grpc_uri ith_uri = *args->uri; - char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); - ith_uri.path = part_str; - if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) { - errors_found = 1; /* GPR_TRUE */ + r->addrs = gpr_malloc (sizeof (struct sockaddr_storage) * r->num_addrs); + r->addrs_len = gpr_malloc (sizeof (*r->addrs_len) * r->num_addrs); + + for (i = 0; i < r->num_addrs; i++) + { + grpc_uri ith_uri = *args->uri; + char *part_str = gpr_dump_slice (path_parts.slices[i], GPR_DUMP_ASCII); + ith_uri.path = part_str; + if (!parse (&ith_uri, &r->addrs[i], &r->addrs_len[i])) + { + errors_found = 1; /* GPR_TRUE */ + } + gpr_free (part_str); + if (errors_found) + break; } - gpr_free(part_str); - if (errors_found) break; - } - gpr_slice_buffer_destroy(&path_parts); - gpr_slice_unref(path_slice); - if (errors_found) { - gpr_free(r); - return NULL; - } + gpr_slice_buffer_destroy (&path_parts); + gpr_slice_unref (path_slice); + if (errors_found) + { + gpr_free (r); + return NULL; + } - gpr_ref_init(&r->refs, 1); - gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); + gpr_ref_init (&r->refs, 1); + gpr_mu_init (&r->mu); + grpc_resolver_init (&r->base, &sockaddr_resolver_vtable); r->subchannel_factory = args->subchannel_factory; - grpc_subchannel_factory_ref(r->subchannel_factory); + grpc_subchannel_factory_ref (r->subchannel_factory); return &r->base; } @@ -363,9 +393,15 @@ static grpc_resolver *sockaddr_create( * FACTORY */ -static void sockaddr_factory_ref(grpc_resolver_factory *factory) {} +static void +sockaddr_factory_ref (grpc_resolver_factory * factory) +{ +} -static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} +static void +sockaddr_factory_unref (grpc_resolver_factory * factory) +{ +} #define DECL_FACTORY(name) \ static grpc_resolver *name##_factory_create_resolver( \ @@ -382,7 +418,6 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} } #ifdef GPR_POSIX_SOCKET -DECL_FACTORY(unix) +DECL_FACTORY (unix) #endif -DECL_FACTORY(ipv4) -DECL_FACTORY(ipv6) + DECL_FACTORY (ipv4) DECL_FACTORY (ipv6) diff --git a/src/core/client_config/resolvers/sockaddr_resolver.h b/src/core/client_config/resolvers/sockaddr_resolver.h index 1b7a18f9c2..c778812e49 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.h +++ b/src/core/client_config/resolvers/sockaddr_resolver.h @@ -38,13 +38,13 @@ #include "src/core/client_config/resolver_factory.h" -grpc_resolver_factory *grpc_ipv4_resolver_factory_create(void); +grpc_resolver_factory *grpc_ipv4_resolver_factory_create (void); -grpc_resolver_factory *grpc_ipv6_resolver_factory_create(void); +grpc_resolver_factory *grpc_ipv6_resolver_factory_create (void); #ifdef GPR_POSIX_SOCKET /** Create a unix resolver factory */ -grpc_resolver_factory *grpc_unix_resolver_factory_create(void); +grpc_resolver_factory *grpc_unix_resolver_factory_create (void); #endif #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_UNIX_RESOLVER_H */ diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index db2bf7552f..3e2040c412 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -50,7 +50,8 @@ /** Zookeeper session expiration time in milliseconds */ #define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000 -typedef struct { +typedef struct +{ /** base class: must be first */ grpc_resolver base; /** refcount */ @@ -87,101 +88,119 @@ typedef struct { int resolved_num; } zookeeper_resolver; -static void zookeeper_destroy(grpc_resolver *r); +static void zookeeper_destroy (grpc_resolver * r); -static void zookeeper_start_resolving_locked(zookeeper_resolver *r); -static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) - GRPC_MUST_USE_RESULT; +static void zookeeper_start_resolving_locked (zookeeper_resolver * r); +static grpc_closure * +zookeeper_maybe_finish_next_locked (zookeeper_resolver * r) + GRPC_MUST_USE_RESULT; -static void zookeeper_shutdown(grpc_resolver *r); -static void zookeeper_channel_saw_error(grpc_resolver *r, - struct sockaddr *failing_address, - int failing_address_len); -static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_closure *on_complete); + static void zookeeper_shutdown (grpc_resolver * r); + static void zookeeper_channel_saw_error (grpc_resolver * r, struct sockaddr *failing_address, int failing_address_len); + static void zookeeper_next (grpc_resolver * r, grpc_client_config ** target_config, grpc_closure * on_complete); -static const grpc_resolver_vtable zookeeper_resolver_vtable = { - zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, - zookeeper_next}; + static const grpc_resolver_vtable zookeeper_resolver_vtable = { + zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, + zookeeper_next + }; -static void zookeeper_shutdown(grpc_resolver *resolver) { - zookeeper_resolver *r = (zookeeper_resolver *)resolver; +static void +zookeeper_shutdown (grpc_resolver * resolver) +{ + zookeeper_resolver *r = (zookeeper_resolver *) resolver; grpc_closure *call = NULL; - gpr_mu_lock(&r->mu); - if (r->next_completion != NULL) { - *r->target_config = NULL; - call = r->next_completion; - r->next_completion = NULL; - } - zookeeper_close(r->zookeeper_handle); - gpr_mu_unlock(&r->mu); - if (call != NULL) { - call->cb(call->cb_arg, 1); - } + gpr_mu_lock (&r->mu); + if (r->next_completion != NULL) + { + *r->target_config = NULL; + call = r->next_completion; + r->next_completion = NULL; + } + zookeeper_close (r->zookeeper_handle); + gpr_mu_unlock (&r->mu); + if (call != NULL) + { + call->cb (call->cb_arg, 1); + } } -static void zookeeper_channel_saw_error(grpc_resolver *resolver, - struct sockaddr *sa, int len) { - zookeeper_resolver *r = (zookeeper_resolver *)resolver; - gpr_mu_lock(&r->mu); - if (r->resolving == 0) { - zookeeper_start_resolving_locked(r); - } - gpr_mu_unlock(&r->mu); +static void +zookeeper_channel_saw_error (grpc_resolver * resolver, struct sockaddr *sa, int len) +{ + zookeeper_resolver *r = (zookeeper_resolver *) resolver; + gpr_mu_lock (&r->mu); + if (r->resolving == 0) + { + zookeeper_start_resolving_locked (r); + } + gpr_mu_unlock (&r->mu); } -static void zookeeper_next(grpc_resolver *resolver, - grpc_client_config **target_config, - grpc_closure *on_complete) { - zookeeper_resolver *r = (zookeeper_resolver *)resolver; +static void +zookeeper_next (grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete) +{ + zookeeper_resolver *r = (zookeeper_resolver *) resolver; grpc_closure *call; - gpr_mu_lock(&r->mu); - GPR_ASSERT(r->next_completion == NULL); + gpr_mu_lock (&r->mu); + GPR_ASSERT (r->next_completion == NULL); r->next_completion = on_complete; r->target_config = target_config; - if (r->resolved_version == 0 && r->resolving == 0) { - zookeeper_start_resolving_locked(r); - } else { - call = zookeeper_maybe_finish_next_locked(r); - } - gpr_mu_unlock(&r->mu); - if (call) call->cb(call->cb_arg, 1); + if (r->resolved_version == 0 && r->resolving == 0) + { + zookeeper_start_resolving_locked (r); + } + else + { + call = zookeeper_maybe_finish_next_locked (r); + } + gpr_mu_unlock (&r->mu); + if (call) + call->cb (call->cb_arg, 1); } /** Zookeeper global watcher for connection management TODO: better connection management besides logs */ -static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type, - int state, const char *path, - void *watcher_ctx) { - if (type == ZOO_SESSION_EVENT) { - if (state == ZOO_EXPIRED_SESSION_STATE) { - gpr_log(GPR_ERROR, "Zookeeper session expired"); - } else if (state == ZOO_AUTH_FAILED_STATE) { - gpr_log(GPR_ERROR, "Zookeeper authentication failed"); +static void +zookeeper_global_watcher (zhandle_t * zookeeper_handle, int type, int state, const char *path, void *watcher_ctx) +{ + if (type == ZOO_SESSION_EVENT) + { + if (state == ZOO_EXPIRED_SESSION_STATE) + { + gpr_log (GPR_ERROR, "Zookeeper session expired"); + } + else if (state == ZOO_AUTH_FAILED_STATE) + { + gpr_log (GPR_ERROR, "Zookeeper authentication failed"); + } } - } } /** Zookeeper watcher triggered by changes to watched nodes Once triggered, it tries to resolve again to get updated addresses */ -static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, - const char *path, void *watcher_ctx) { - if (watcher_ctx != NULL) { - zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx; - if (state == ZOO_CONNECTED_STATE) { - gpr_mu_lock(&r->mu); - if (r->resolving == 0) { - zookeeper_start_resolving_locked(r); - } - gpr_mu_unlock(&r->mu); +static void +zookeeper_watcher (zhandle_t * zookeeper_handle, int type, int state, const char *path, void *watcher_ctx) +{ + if (watcher_ctx != NULL) + { + zookeeper_resolver *r = (zookeeper_resolver *) watcher_ctx; + if (state == ZOO_CONNECTED_STATE) + { + gpr_mu_lock (&r->mu); + if (r->resolving == 0) + { + zookeeper_start_resolving_locked (r); + } + gpr_mu_unlock (&r->mu); + } } - } } /** Callback function after getting all resolved addresses Creates a subchannel for each address */ -static void zookeeper_on_resolved(void *arg, - grpc_resolved_addresses *addresses) { +static void +zookeeper_on_resolved (void *arg, grpc_resolved_addresses * addresses) +{ zookeeper_resolver *r = arg; grpc_client_config *config = NULL; grpc_subchannel **subchannels; @@ -189,74 +208,77 @@ static void zookeeper_on_resolved(void *arg, grpc_lb_policy *lb_policy; grpc_closure *call; size_t i; - if (addresses != NULL) { - grpc_lb_policy_args lb_policy_args; - config = grpc_client_config_create(); - subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); - for (i = 0; i < addresses->naddrs; i++) { - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)(addresses->addrs[i].addr); - args.addr_len = addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel( - r->subchannel_factory, &args); + if (addresses != NULL) + { + grpc_lb_policy_args lb_policy_args; + config = grpc_client_config_create (); + subchannels = gpr_malloc (sizeof (grpc_subchannel *) * addresses->naddrs); + for (i = 0; i < addresses->naddrs; i++) + { + memset (&args, 0, sizeof (args)); + args.addr = (struct sockaddr *) (addresses->addrs[i].addr); + args.addr_len = addresses->addrs[i].len; + subchannels[i] = grpc_subchannel_factory_create_subchannel (r->subchannel_factory, &args); + } + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy = grpc_lb_policy_create (r->lb_policy_name, &lb_policy_args); + grpc_client_config_set_lb_policy (config, lb_policy); + GRPC_LB_POLICY_UNREF (lb_policy, "construction"); + grpc_resolved_addresses_destroy (addresses); + gpr_free (subchannels); } - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = addresses->naddrs; - lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(lb_policy, "construction"); - grpc_resolved_addresses_destroy(addresses); - gpr_free(subchannels); - } - gpr_mu_lock(&r->mu); - GPR_ASSERT(r->resolving == 1); + gpr_mu_lock (&r->mu); + GPR_ASSERT (r->resolving == 1); r->resolving = 0; - if (r->resolved_config != NULL) { - grpc_client_config_unref(r->resolved_config); - } + if (r->resolved_config != NULL) + { + grpc_client_config_unref (r->resolved_config); + } r->resolved_config = config; r->resolved_version++; - call = zookeeper_maybe_finish_next_locked(r); - gpr_mu_unlock(&r->mu); + call = zookeeper_maybe_finish_next_locked (r); + gpr_mu_unlock (&r->mu); - if (call) call->cb(call->cb_arg, 1); + if (call) + call->cb (call->cb_arg, 1); - GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving"); + GRPC_RESOLVER_UNREF (&r->base, "zookeeper-resolving"); } /** Callback function for each DNS resolved address */ -static void zookeeper_dns_resolved(void *arg, - grpc_resolved_addresses *addresses) { +static void +zookeeper_dns_resolved (void *arg, grpc_resolved_addresses * addresses) +{ size_t i; zookeeper_resolver *r = arg; int resolve_done = 0; - gpr_mu_lock(&r->mu); + gpr_mu_lock (&r->mu); r->resolved_num++; - r->resolved_addrs->addrs = - gpr_realloc(r->resolved_addrs->addrs, - sizeof(grpc_resolved_address) * - (r->resolved_addrs->naddrs + addresses->naddrs)); - for (i = 0; i < addresses->naddrs; i++) { - memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr, - addresses->addrs[i].addr, addresses->addrs[i].len); - r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len = - addresses->addrs[i].len; - } + r->resolved_addrs->addrs = gpr_realloc (r->resolved_addrs->addrs, sizeof (grpc_resolved_address) * (r->resolved_addrs->naddrs + addresses->naddrs)); + for (i = 0; i < addresses->naddrs; i++) + { + memcpy (r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr, addresses->addrs[i].addr, addresses->addrs[i].len); + r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len = addresses->addrs[i].len; + } r->resolved_addrs->naddrs += addresses->naddrs; - grpc_resolved_addresses_destroy(addresses); + grpc_resolved_addresses_destroy (addresses); /** Wait for all addresses to be resolved */ resolve_done = (r->resolved_num == r->resolved_total); - gpr_mu_unlock(&r->mu); - if (resolve_done) { - zookeeper_on_resolved(r, r->resolved_addrs); - } + gpr_mu_unlock (&r->mu); + if (resolve_done) + { + zookeeper_on_resolved (r, r->resolved_addrs); + } } /** Parses JSON format address of a zookeeper node */ -static char *zookeeper_parse_address(const char *value, size_t value_len) { +static char * +zookeeper_parse_address (const char *value, size_t value_len) +{ grpc_json *json; grpc_json *cur; const char *host; @@ -264,255 +286,301 @@ static char *zookeeper_parse_address(const char *value, size_t value_len) { char *buffer; char *address = NULL; - buffer = gpr_malloc(value_len); - memcpy(buffer, value, value_len); - json = grpc_json_parse_string_with_len(buffer, value_len); - if (json != NULL) { - host = NULL; - port = NULL; - for (cur = json->child; cur != NULL; cur = cur->next) { - if (!strcmp(cur->key, "host")) { - host = cur->value; - if (port != NULL) { - break; - } - } else if (!strcmp(cur->key, "port")) { - port = cur->value; - if (host != NULL) { - break; - } - } - } - if (host != NULL && port != NULL) { - gpr_asprintf(&address, "%s:%s", host, port); + buffer = gpr_malloc (value_len); + memcpy (buffer, value, value_len); + json = grpc_json_parse_string_with_len (buffer, value_len); + if (json != NULL) + { + host = NULL; + port = NULL; + for (cur = json->child; cur != NULL; cur = cur->next) + { + if (!strcmp (cur->key, "host")) + { + host = cur->value; + if (port != NULL) + { + break; + } + } + else if (!strcmp (cur->key, "port")) + { + port = cur->value; + if (host != NULL) + { + break; + } + } + } + if (host != NULL && port != NULL) + { + gpr_asprintf (&address, "%s:%s", host, port); + } + grpc_json_destroy (json); } - grpc_json_destroy(json); - } - gpr_free(buffer); + gpr_free (buffer); return address; } -static void zookeeper_get_children_node_completion(int rc, const char *value, - int value_len, - const struct Stat *stat, - const void *arg) { +static void +zookeeper_get_children_node_completion (int rc, const char *value, int value_len, const struct Stat *stat, const void *arg) +{ char *address = NULL; - zookeeper_resolver *r = (zookeeper_resolver *)arg; + zookeeper_resolver *r = (zookeeper_resolver *) arg; int resolve_done = 0; - if (rc != 0) { - gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name); - return; - } + if (rc != 0) + { + gpr_log (GPR_ERROR, "Error in getting a child node of %s", r->name); + return; + } - address = zookeeper_parse_address(value, (size_t)value_len); - if (address != NULL) { + address = zookeeper_parse_address (value, (size_t) value_len); + if (address != NULL) + { /** Further resolves address by DNS */ - grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); - gpr_free(address); - } else { - gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name); - gpr_mu_lock(&r->mu); - r->resolved_total--; - resolve_done = (r->resolved_num == r->resolved_total); - gpr_mu_unlock(&r->mu); - if (resolve_done) { - zookeeper_on_resolved(r, r->resolved_addrs); + grpc_resolve_address (address, NULL, zookeeper_dns_resolved, r); + gpr_free (address); + } + else + { + gpr_log (GPR_ERROR, "Error in resolving a child node of %s", r->name); + gpr_mu_lock (&r->mu); + r->resolved_total--; + resolve_done = (r->resolved_num == r->resolved_total); + gpr_mu_unlock (&r->mu); + if (resolve_done) + { + zookeeper_on_resolved (r, r->resolved_addrs); + } } - } } -static void zookeeper_get_children_completion( - int rc, const struct String_vector *children, const void *arg) { +static void +zookeeper_get_children_completion (int rc, const struct String_vector *children, const void *arg) +{ char *path; int status; int i; - zookeeper_resolver *r = (zookeeper_resolver *)arg; + zookeeper_resolver *r = (zookeeper_resolver *) arg; - if (rc != 0) { - gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); - return; - } + if (rc != 0) + { + gpr_log (GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + return; + } - if (children->count == 0) { - gpr_log(GPR_ERROR, "Error in resolving zookeeper address %s", r->name); - return; - } + if (children->count == 0) + { + gpr_log (GPR_ERROR, "Error in resolving zookeeper address %s", r->name); + return; + } - r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->resolved_addrs = gpr_malloc (sizeof (grpc_resolved_addresses)); r->resolved_addrs->addrs = NULL; r->resolved_addrs->naddrs = 0; r->resolved_total = children->count; /** TODO: Replace expensive heap allocation with stack if we can get maximum length of zookeeper path */ - for (i = 0; i < children->count; i++) { - gpr_asprintf(&path, "%s/%s", r->name, children->data[i]); - status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r, - zookeeper_get_children_node_completion, r); - gpr_free(path); - if (status != 0) { - gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path); + for (i = 0; i < children->count; i++) + { + gpr_asprintf (&path, "%s/%s", r->name, children->data[i]); + status = zoo_awget (r->zookeeper_handle, path, zookeeper_watcher, r, zookeeper_get_children_node_completion, r); + gpr_free (path); + if (status != 0) + { + gpr_log (GPR_ERROR, "Error in getting zookeeper node %s", path); + } } - } } -static void zookeeper_get_node_completion(int rc, const char *value, - int value_len, - const struct Stat *stat, - const void *arg) { +static void +zookeeper_get_node_completion (int rc, const char *value, int value_len, const struct Stat *stat, const void *arg) +{ int status; char *address = NULL; - zookeeper_resolver *r = (zookeeper_resolver *)arg; + zookeeper_resolver *r = (zookeeper_resolver *) arg; r->resolved_addrs = NULL; r->resolved_total = 0; r->resolved_num = 0; - if (rc != 0) { - gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); - return; - } + if (rc != 0) + { + gpr_log (GPR_ERROR, "Error in getting zookeeper node %s", r->name); + return; + } /** If zookeeper node of path r->name does not have address (i.e. service node), get its children */ - address = zookeeper_parse_address(value, (size_t)value_len); - if (address != NULL) { - r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); - r->resolved_addrs->addrs = NULL; - r->resolved_addrs->naddrs = 0; - r->resolved_total = 1; + address = zookeeper_parse_address (value, (size_t) value_len); + if (address != NULL) + { + r->resolved_addrs = gpr_malloc (sizeof (grpc_resolved_addresses)); + r->resolved_addrs->addrs = NULL; + r->resolved_addrs->naddrs = 0; + r->resolved_total = 1; /** Further resolves address by DNS */ - grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); - gpr_free(address); - return; - } - - status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher, - r, zookeeper_get_children_completion, r); - if (status != 0) { - gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); - } + grpc_resolve_address (address, NULL, zookeeper_dns_resolved, r); + gpr_free (address); + return; + } + + status = zoo_awget_children (r->zookeeper_handle, r->name, zookeeper_watcher, r, zookeeper_get_children_completion, r); + if (status != 0) + { + gpr_log (GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + } } -static void zookeeper_resolve_address(zookeeper_resolver *r) { +static void +zookeeper_resolve_address (zookeeper_resolver * r) +{ int status; - status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r, - zookeeper_get_node_completion, r); - if (status != 0) { - gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); - } + status = zoo_awget (r->zookeeper_handle, r->name, zookeeper_watcher, r, zookeeper_get_node_completion, r); + if (status != 0) + { + gpr_log (GPR_ERROR, "Error in getting zookeeper node %s", r->name); + } } -static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { - GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving"); - GPR_ASSERT(r->resolving == 0); +static void +zookeeper_start_resolving_locked (zookeeper_resolver * r) +{ + GRPC_RESOLVER_REF (&r->base, "zookeeper-resolving"); + GPR_ASSERT (r->resolving == 0); r->resolving = 1; - zookeeper_resolve_address(r); + zookeeper_resolve_address (r); } -static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { +static grpc_closure * +zookeeper_maybe_finish_next_locked (zookeeper_resolver * r) +{ grpc_closure *call = NULL; - if (r->next_completion != NULL && - r->resolved_version != r->published_version) { - *r->target_config = r->resolved_config; - if (r->resolved_config != NULL) { - grpc_client_config_ref(r->resolved_config); + if (r->next_completion != NULL && r->resolved_version != r->published_version) + { + *r->target_config = r->resolved_config; + if (r->resolved_config != NULL) + { + grpc_client_config_ref (r->resolved_config); + } + call = r->next_completion; + r->next_completion = NULL; + r->published_version = r->resolved_version; } - call = r->next_completion; - r->next_completion = NULL; - r->published_version = r->resolved_version; - } return call; } -static void zookeeper_destroy(grpc_resolver *gr) { - zookeeper_resolver *r = (zookeeper_resolver *)gr; - gpr_mu_destroy(&r->mu); - if (r->resolved_config != NULL) { - grpc_client_config_unref(r->resolved_config); - } - grpc_subchannel_factory_unref(r->subchannel_factory); - gpr_free(r->name); - gpr_free(r->lb_policy_name); - gpr_free(r); +static void +zookeeper_destroy (grpc_resolver * gr) +{ + zookeeper_resolver *r = (zookeeper_resolver *) gr; + gpr_mu_destroy (&r->mu); + if (r->resolved_config != NULL) + { + grpc_client_config_unref (r->resolved_config); + } + grpc_subchannel_factory_unref (r->subchannel_factory); + gpr_free (r->name); + gpr_free (r->lb_policy_name); + gpr_free (r); } -static grpc_resolver *zookeeper_create(grpc_resolver_args *args, - const char *lb_policy_name) { +static grpc_resolver * +zookeeper_create (grpc_resolver_args * args, const char *lb_policy_name) +{ zookeeper_resolver *r; size_t length; char *path = args->uri->path; - if (0 == strcmp(args->uri->authority, "")) { - gpr_log(GPR_ERROR, "No authority specified in zookeeper uri"); - return NULL; - } + if (0 == strcmp (args->uri->authority, "")) + { + gpr_log (GPR_ERROR, "No authority specified in zookeeper uri"); + return NULL; + } /** Removes the trailing slash if exists */ - length = strlen(path); - if (length > 1 && path[length - 1] == '/') { - path[length - 1] = 0; - } - - r = gpr_malloc(sizeof(zookeeper_resolver)); - memset(r, 0, sizeof(*r)); - gpr_ref_init(&r->refs, 1); - gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &zookeeper_resolver_vtable); - r->name = gpr_strdup(path); + length = strlen (path); + if (length > 1 && path[length - 1] == '/') + { + path[length - 1] = 0; + } + + r = gpr_malloc (sizeof (zookeeper_resolver)); + memset (r, 0, sizeof (*r)); + gpr_ref_init (&r->refs, 1); + gpr_mu_init (&r->mu); + grpc_resolver_init (&r->base, &zookeeper_resolver_vtable); + r->name = gpr_strdup (path); r->subchannel_factory = args->subchannel_factory; - grpc_subchannel_factory_ref(r->subchannel_factory); + grpc_subchannel_factory_ref (r->subchannel_factory); - r->lb_policy_name = gpr_strdup(lb_policy_name); + r->lb_policy_name = gpr_strdup (lb_policy_name); /** Initializes zookeeper client */ - zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); - r->zookeeper_handle = - zookeeper_init(args->uri->authority, zookeeper_global_watcher, - GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); - if (r->zookeeper_handle == NULL) { - gpr_log(GPR_ERROR, "Unable to connect to zookeeper server"); - return NULL; - } + zoo_set_debug_level (ZOO_LOG_LEVEL_WARN); + r->zookeeper_handle = zookeeper_init (args->uri->authority, zookeeper_global_watcher, GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); + if (r->zookeeper_handle == NULL) + { + gpr_log (GPR_ERROR, "Unable to connect to zookeeper server"); + return NULL; + } return &r->base; } -static void zookeeper_plugin_init() { - grpc_register_resolver_type(grpc_zookeeper_resolver_factory_create()); +static void +zookeeper_plugin_init () +{ + grpc_register_resolver_type (grpc_zookeeper_resolver_factory_create ()); } -void grpc_zookeeper_register() { - grpc_register_plugin(zookeeper_plugin_init, NULL); +void +grpc_zookeeper_register () +{ + grpc_register_plugin (zookeeper_plugin_init, NULL); } /* * FACTORY */ -static void zookeeper_factory_ref(grpc_resolver_factory *factory) {} +static void +zookeeper_factory_ref (grpc_resolver_factory * factory) +{ +} -static void zookeeper_factory_unref(grpc_resolver_factory *factory) {} +static void +zookeeper_factory_unref (grpc_resolver_factory * factory) +{ +} -static char *zookeeper_factory_get_default_hostname( - grpc_resolver_factory *factory, grpc_uri *uri) { +static char * +zookeeper_factory_get_default_hostname (grpc_resolver_factory * factory, grpc_uri * uri) +{ return NULL; } -static grpc_resolver *zookeeper_factory_create_resolver( - grpc_resolver_factory *factory, grpc_resolver_args *args) { - return zookeeper_create(args, "pick_first"); +static grpc_resolver * +zookeeper_factory_create_resolver (grpc_resolver_factory * factory, grpc_resolver_args * args) +{ + return zookeeper_create (args, "pick_first"); } static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { - zookeeper_factory_ref, zookeeper_factory_unref, - zookeeper_factory_create_resolver, zookeeper_factory_get_default_hostname, - "zookeeper"}; + zookeeper_factory_ref, zookeeper_factory_unref, + zookeeper_factory_create_resolver, zookeeper_factory_get_default_hostname, + "zookeeper" +}; + static grpc_resolver_factory zookeeper_resolver_factory = { - &zookeeper_factory_vtable}; + &zookeeper_factory_vtable +}; -grpc_resolver_factory *grpc_zookeeper_resolver_factory_create() { +grpc_resolver_factory * +grpc_zookeeper_resolver_factory_create () +{ return &zookeeper_resolver_factory; } diff --git a/src/core/client_config/resolvers/zookeeper_resolver.h b/src/core/client_config/resolvers/zookeeper_resolver.h index a6f002dd6d..86b4480b15 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.h +++ b/src/core/client_config/resolvers/zookeeper_resolver.h @@ -37,6 +37,6 @@ #include "src/core/client_config/resolver_factory.h" /** Create a zookeeper resolver factory */ -grpc_resolver_factory *grpc_zookeeper_resolver_factory_create(void); +grpc_resolver_factory *grpc_zookeeper_resolver_factory_create (void); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H */ diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index d41bf8f566..778d1e8b50 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -50,7 +50,8 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -typedef struct { +typedef struct +{ /* all fields protected by subchannel->mu */ /** refcount */ int refs; @@ -58,14 +59,16 @@ typedef struct { grpc_subchannel *subchannel; } connection; -typedef struct { +typedef struct +{ grpc_closure closure; size_t version; grpc_subchannel *subchannel; grpc_connectivity_state connectivity_state; } state_watcher; -typedef struct waiting_for_connect { +typedef struct waiting_for_connect +{ struct waiting_for_connect *next; grpc_closure *notify; grpc_pollset *pollset; @@ -74,7 +77,8 @@ typedef struct waiting_for_connect { grpc_closure continuation; } waiting_for_connect; -struct grpc_subchannel { +struct grpc_subchannel +{ grpc_connector *connector; /** non-transport related channel filters */ @@ -135,7 +139,8 @@ struct grpc_subchannel { gpr_uint32 random; }; -struct grpc_subchannel_call { +struct grpc_subchannel_call +{ connection *connection; gpr_refcount refs; }; @@ -143,26 +148,19 @@ struct grpc_subchannel_call { #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) -static grpc_subchannel_call *create_call(connection *con, - grpc_closure_list *closure_list); -static void connectivity_state_changed_locked(grpc_subchannel *c, - const char *reason, - grpc_closure_list *closure_list); -static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); -static gpr_timespec compute_connect_deadline(grpc_subchannel *c); -static void subchannel_connected(void *subchannel, int iomgr_success, - grpc_closure_list *closure_list); - -static void subchannel_ref_locked( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -static int subchannel_unref_locked( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; -static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -static grpc_subchannel *connection_unref_locked( - connection *c, grpc_closure_list *closure_list - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; -static void subchannel_destroy(grpc_subchannel *c, - grpc_closure_list *closure_list); +static grpc_subchannel_call *create_call (connection * con, grpc_closure_list * closure_list); +static void connectivity_state_changed_locked (grpc_subchannel * c, const char *reason, grpc_closure_list * closure_list); +static grpc_connectivity_state compute_connectivity_locked (grpc_subchannel * c); +static gpr_timespec compute_connect_deadline (grpc_subchannel * c); +static void subchannel_connected (void *subchannel, int iomgr_success, grpc_closure_list * closure_list); + +static void subchannel_ref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +static int +subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) + GRPC_MUST_USE_RESULT; + static void connection_ref_locked (connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + static grpc_subchannel *connection_unref_locked (connection * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; + static void subchannel_destroy (grpc_subchannel * c, grpc_closure_list * closure_list); #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG #define SUBCHANNEL_REF_LOCKED(p, r) \ @@ -198,30 +196,34 @@ static void subchannel_destroy(grpc_subchannel *c, * connection implementation */ -static void connection_destroy(connection *c, grpc_closure_list *closure_list) { - GPR_ASSERT(c->refs == 0); - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c), closure_list); - gpr_free(c); + static void connection_destroy (connection * c, grpc_closure_list * closure_list) +{ + GPR_ASSERT (c->refs == 0); + grpc_channel_stack_destroy (CHANNEL_STACK_FROM_CONNECTION (c), closure_list); + gpr_free (c); } -static void connection_ref_locked( - connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - REF_LOG("CONNECTION", c); - subchannel_ref_locked(c->subchannel REF_PASS_ARGS); +static void +connection_ref_locked (connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + REF_LOG ("CONNECTION", c); + subchannel_ref_locked (c->subchannel REF_PASS_ARGS); ++c->refs; } -static grpc_subchannel *connection_unref_locked( - connection *c, - grpc_closure_list *closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +static grpc_subchannel * +connection_unref_locked (connection * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ grpc_subchannel *destroy = NULL; - UNREF_LOG("CONNECTION", c); - if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) { - destroy = c->subchannel; - } - if (--c->refs == 0 && c->subchannel->active != c) { - connection_destroy(c, closure_list); - } + UNREF_LOG ("CONNECTION", c); + if (subchannel_unref_locked (c->subchannel REF_PASS_ARGS)) + { + destroy = c->subchannel; + } + if (--c->refs == 0 && c->subchannel->active != c) + { + connection_destroy (c, closure_list); + } return destroy; } @@ -229,241 +231,261 @@ static grpc_subchannel *connection_unref_locked( * grpc_subchannel implementation */ -static void subchannel_ref_locked( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - REF_LOG("SUBCHANNEL", c); +static void +subchannel_ref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + REF_LOG ("SUBCHANNEL", c); ++c->refs; } -static int subchannel_unref_locked( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - UNREF_LOG("SUBCHANNEL", c); +static int +subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + UNREF_LOG ("SUBCHANNEL", c); return --c->refs == 0; } -void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_mu_lock(&c->mu); - subchannel_ref_locked(c REF_PASS_ARGS); - gpr_mu_unlock(&c->mu); +void +grpc_subchannel_ref (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + gpr_mu_lock (&c->mu); + subchannel_ref_locked (c REF_PASS_ARGS); + gpr_mu_unlock (&c->mu); } -void grpc_subchannel_unref(grpc_subchannel *c, - grpc_closure_list *closure_list - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void +grpc_subchannel_unref (grpc_subchannel * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ int destroy; - gpr_mu_lock(&c->mu); - destroy = subchannel_unref_locked(c REF_PASS_ARGS); - gpr_mu_unlock(&c->mu); - if (destroy) subchannel_destroy(c, closure_list); -} - -static void subchannel_destroy(grpc_subchannel *c, - grpc_closure_list *closure_list) { - if (c->active != NULL) { - connection_destroy(c->active, closure_list); - } - gpr_free(c->filters); - grpc_channel_args_destroy(c->args); - gpr_free(c->addr); - grpc_mdctx_unref(c->mdctx); - grpc_connectivity_state_destroy(&c->state_tracker, closure_list); - grpc_connector_unref(c->connector, closure_list); - gpr_free(c); + gpr_mu_lock (&c->mu); + destroy = subchannel_unref_locked (c REF_PASS_ARGS); + gpr_mu_unlock (&c->mu); + if (destroy) + subchannel_destroy (c, closure_list); +} + +static void +subchannel_destroy (grpc_subchannel * c, grpc_closure_list * closure_list) +{ + if (c->active != NULL) + { + connection_destroy (c->active, closure_list); + } + gpr_free (c->filters); + grpc_channel_args_destroy (c->args); + gpr_free (c->addr); + grpc_mdctx_unref (c->mdctx); + grpc_connectivity_state_destroy (&c->state_tracker, closure_list); + grpc_connector_unref (c->connector, closure_list); + gpr_free (c); } -void grpc_subchannel_add_interested_party(grpc_subchannel *c, - grpc_pollset *pollset, - grpc_closure_list *closure_list) { - grpc_pollset_set_add_pollset(c->pollset_set, pollset, closure_list); +void +grpc_subchannel_add_interested_party (grpc_subchannel * c, grpc_pollset * pollset, grpc_closure_list * closure_list) +{ + grpc_pollset_set_add_pollset (c->pollset_set, pollset, closure_list); } -void grpc_subchannel_del_interested_party(grpc_subchannel *c, - grpc_pollset *pollset, - grpc_closure_list *closure_list) { - grpc_pollset_set_del_pollset(c->pollset_set, pollset, closure_list); +void +grpc_subchannel_del_interested_party (grpc_subchannel * c, grpc_pollset * pollset, grpc_closure_list * closure_list) +{ + grpc_pollset_set_del_pollset (c->pollset_set, pollset, closure_list); } -static gpr_uint32 random_seed() { - return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); +static gpr_uint32 +random_seed () +{ + return (gpr_uint32) (gpr_time_to_millis (gpr_now (GPR_CLOCK_MONOTONIC))); } -grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, - grpc_subchannel_args *args) { - grpc_subchannel *c = gpr_malloc(sizeof(*c)); - grpc_channel_element *parent_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(args->master)); - memset(c, 0, sizeof(*c)); +grpc_subchannel * +grpc_subchannel_create (grpc_connector * connector, grpc_subchannel_args * args) +{ + grpc_subchannel *c = gpr_malloc (sizeof (*c)); + grpc_channel_element *parent_elem = grpc_channel_stack_last_element (grpc_channel_get_channel_stack (args->master)); + memset (c, 0, sizeof (*c)); c->refs = 1; c->connector = connector; - grpc_connector_ref(c->connector); + grpc_connector_ref (c->connector); c->num_filters = args->filter_count; - c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters); - memcpy(c->filters, args->filters, - sizeof(grpc_channel_filter *) * c->num_filters); - c->addr = gpr_malloc(args->addr_len); - memcpy(c->addr, args->addr, args->addr_len); + c->filters = gpr_malloc (sizeof (grpc_channel_filter *) * c->num_filters); + memcpy (c->filters, args->filters, sizeof (grpc_channel_filter *) * c->num_filters); + c->addr = gpr_malloc (args->addr_len); + memcpy (c->addr, args->addr, args->addr_len); c->addr_len = args->addr_len; - c->args = grpc_channel_args_copy(args->args); + c->args = grpc_channel_args_copy (args->args); c->mdctx = args->mdctx; c->master = args->master; - c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); - c->random = random_seed(); - grpc_mdctx_ref(c->mdctx); - grpc_closure_init(&c->connected, subchannel_connected, c); - grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, - "subchannel"); - gpr_mu_init(&c->mu); + c->pollset_set = grpc_client_channel_get_connecting_pollset_set (parent_elem); + c->random = random_seed (); + grpc_mdctx_ref (c->mdctx); + grpc_closure_init (&c->connected, subchannel_connected, c); + grpc_connectivity_state_init (&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); + gpr_mu_init (&c->mu); return c; } -static void continue_connect(grpc_subchannel *c, - grpc_closure_list *closure_list) { +static void +continue_connect (grpc_subchannel * c, grpc_closure_list * closure_list) +{ grpc_connect_in_args args; args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; - args.deadline = compute_connect_deadline(c); + args.deadline = compute_connect_deadline (c); args.channel_args = c->args; - grpc_connector_connect(c->connector, &args, &c->connecting_result, - &c->connected, closure_list); + grpc_connector_connect (c->connector, &args, &c->connecting_result, &c->connected, closure_list); } -static void start_connect(grpc_subchannel *c, grpc_closure_list *closure_list) { - c->backoff_delta = gpr_time_from_seconds( - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); - c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); - continue_connect(c, closure_list); +static void +start_connect (grpc_subchannel * c, grpc_closure_list * closure_list) +{ + c->backoff_delta = gpr_time_from_seconds (GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); + c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta); + continue_connect (c, closure_list); } -static void continue_creating_call(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +continue_creating_call (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ waiting_for_connect *w4c = arg; - grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset, - closure_list); - grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target, - w4c->notify, closure_list); - GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect", closure_list); - gpr_free(w4c); -} - -void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify, - grpc_closure_list *closure_list) { - connection *con; - gpr_mu_lock(&c->mu); - if (c->active != NULL) { - con = c->active; - CONNECTION_REF_LOCKED(con, "call"); - gpr_mu_unlock(&c->mu); - - *target = create_call(con, closure_list); - notify->cb(notify->cb_arg, 1, closure_list); - } else { - waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); - w4c->next = c->waiting; - w4c->notify = notify; - w4c->pollset = pollset; - w4c->target = target; - w4c->subchannel = c; - /* released when clearing w4c */ - SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect"); - grpc_closure_init(&w4c->continuation, continue_creating_call, w4c); - c->waiting = w4c; - grpc_subchannel_add_interested_party(c, pollset, closure_list); - if (!c->connecting) { - c->connecting = 1; - connectivity_state_changed_locked(c, "create_call", closure_list); - /* released by connection */ - SUBCHANNEL_REF_LOCKED(c, "connecting"); - GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - gpr_mu_unlock(&c->mu); + grpc_subchannel_del_interested_party (w4c->subchannel, w4c->pollset, closure_list); + grpc_subchannel_create_call (w4c->subchannel, w4c->pollset, w4c->target, w4c->notify, closure_list); + GRPC_SUBCHANNEL_UNREF (w4c->subchannel, "waiting_for_connect", closure_list); + gpr_free (w4c); +} - start_connect(c, closure_list); - } else { - gpr_mu_unlock(&c->mu); +void +grpc_subchannel_create_call (grpc_subchannel * c, grpc_pollset * pollset, grpc_subchannel_call ** target, grpc_closure * notify, grpc_closure_list * closure_list) +{ + connection *con; + gpr_mu_lock (&c->mu); + if (c->active != NULL) + { + con = c->active; + CONNECTION_REF_LOCKED (con, "call"); + gpr_mu_unlock (&c->mu); + + *target = create_call (con, closure_list); + notify->cb (notify->cb_arg, 1, closure_list); + } + else + { + waiting_for_connect *w4c = gpr_malloc (sizeof (*w4c)); + w4c->next = c->waiting; + w4c->notify = notify; + w4c->pollset = pollset; + w4c->target = target; + w4c->subchannel = c; + /* released when clearing w4c */ + SUBCHANNEL_REF_LOCKED (c, "waiting_for_connect"); + grpc_closure_init (&w4c->continuation, continue_creating_call, w4c); + c->waiting = w4c; + grpc_subchannel_add_interested_party (c, pollset, closure_list); + if (!c->connecting) + { + c->connecting = 1; + connectivity_state_changed_locked (c, "create_call", closure_list); + /* released by connection */ + SUBCHANNEL_REF_LOCKED (c, "connecting"); + GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting"); + gpr_mu_unlock (&c->mu); + + start_connect (c, closure_list); + } + else + { + gpr_mu_unlock (&c->mu); + } } - } } -grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { +grpc_connectivity_state +grpc_subchannel_check_connectivity (grpc_subchannel * c) +{ grpc_connectivity_state state; - gpr_mu_lock(&c->mu); - state = grpc_connectivity_state_check(&c->state_tracker); - gpr_mu_unlock(&c->mu); + gpr_mu_lock (&c->mu); + state = grpc_connectivity_state_check (&c->state_tracker); + gpr_mu_unlock (&c->mu); return state; } -void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, - grpc_connectivity_state *state, - grpc_closure *notify, - grpc_closure_list *closure_list) { +void +grpc_subchannel_notify_on_state_change (grpc_subchannel * c, grpc_connectivity_state * state, grpc_closure * notify, grpc_closure_list * closure_list) +{ int do_connect = 0; - gpr_mu_lock(&c->mu); - if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, - notify, closure_list)) { - do_connect = 1; - c->connecting = 1; - /* released by connection */ - SUBCHANNEL_REF_LOCKED(c, "connecting"); - GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - connectivity_state_changed_locked(c, "state_change", closure_list); - } - gpr_mu_unlock(&c->mu); - - if (do_connect) { - start_connect(c, closure_list); - } -} - -void grpc_subchannel_process_transport_op(grpc_subchannel *c, - grpc_transport_op *op, - grpc_closure_list *closure_list) { + gpr_mu_lock (&c->mu); + if (grpc_connectivity_state_notify_on_state_change (&c->state_tracker, state, notify, closure_list)) + { + do_connect = 1; + c->connecting = 1; + /* released by connection */ + SUBCHANNEL_REF_LOCKED (c, "connecting"); + GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting"); + connectivity_state_changed_locked (c, "state_change", closure_list); + } + gpr_mu_unlock (&c->mu); + + if (do_connect) + { + start_connect (c, closure_list); + } +} + +void +grpc_subchannel_process_transport_op (grpc_subchannel * c, grpc_transport_op * op, grpc_closure_list * closure_list) +{ connection *con = NULL; grpc_subchannel *destroy; int cancel_alarm = 0; - gpr_mu_lock(&c->mu); - if (c->active != NULL) { - con = c->active; - CONNECTION_REF_LOCKED(con, "transport-op"); - } - if (op->disconnect) { - c->disconnected = 1; - connectivity_state_changed_locked(c, "disconnect", closure_list); - if (c->have_alarm) { - cancel_alarm = 1; + gpr_mu_lock (&c->mu); + if (c->active != NULL) + { + con = c->active; + CONNECTION_REF_LOCKED (con, "transport-op"); + } + if (op->disconnect) + { + c->disconnected = 1; + connectivity_state_changed_locked (c, "disconnect", closure_list); + if (c->have_alarm) + { + cancel_alarm = 1; + } } - } - gpr_mu_unlock(&c->mu); - - if (con != NULL) { - grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element *top_elem = - grpc_channel_stack_element(channel_stack, 0); - top_elem->filter->start_transport_op(top_elem, op, closure_list); - - gpr_mu_lock(&c->mu); - destroy = CONNECTION_UNREF_LOCKED(con, "transport-op", closure_list); - gpr_mu_unlock(&c->mu); - if (destroy) { - subchannel_destroy(destroy, closure_list); + gpr_mu_unlock (&c->mu); + + if (con != NULL) + { + grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION (con); + grpc_channel_element *top_elem = grpc_channel_stack_element (channel_stack, 0); + top_elem->filter->start_transport_op (top_elem, op, closure_list); + + gpr_mu_lock (&c->mu); + destroy = CONNECTION_UNREF_LOCKED (con, "transport-op", closure_list); + gpr_mu_unlock (&c->mu); + if (destroy) + { + subchannel_destroy (destroy, closure_list); + } } - } - if (cancel_alarm) { - grpc_alarm_cancel(&c->alarm, closure_list); - } + if (cancel_alarm) + { + grpc_alarm_cancel (&c->alarm, closure_list); + } - if (op->disconnect) { - grpc_connector_shutdown(c->connector, closure_list); - } + if (op->disconnect) + { + grpc_connector_shutdown (c->connector, closure_list); + } } -static void on_state_changed(void *p, int iomgr_success, - grpc_closure_list *closure_list) { +static void +on_state_changed (void *p, int iomgr_success, grpc_closure_list * closure_list) +{ state_watcher *sw = p; grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; @@ -472,57 +494,59 @@ static void on_state_changed(void *p, int iomgr_success, grpc_channel_element *elem; connection *destroy_connection = NULL; - gpr_mu_lock(mu); + gpr_mu_lock (mu); /* if we failed or there is a version number mismatch, just leave this closure */ - if (!iomgr_success || sw->subchannel->active_version != sw->version) { - goto done; - } + if (!iomgr_success || sw->subchannel->active_version != sw->version) + { + goto done; + } - switch (sw->connectivity_state) { + switch (sw->connectivity_state) + { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_READY: case GRPC_CHANNEL_IDLE: /* all is still good: keep watching */ - memset(&op, 0, sizeof(op)); + memset (&op, 0, sizeof (op)); op.connectivity_state = &sw->connectivity_state; op.on_connectivity_state_change = &sw->closure; - elem = grpc_channel_stack_element( - CHANNEL_STACK_FROM_CONNECTION(c->active), 0); - elem->filter->start_transport_op(elem, &op, closure_list); + elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0); + elem->filter->start_transport_op (elem, &op, closure_list); /* early out */ - gpr_mu_unlock(mu); + gpr_mu_unlock (mu); return; case GRPC_CHANNEL_FATAL_FAILURE: case GRPC_CHANNEL_TRANSIENT_FAILURE: /* things have gone wrong, deactivate and enter idle */ - if (sw->subchannel->active->refs == 0) { - destroy_connection = sw->subchannel->active; - } + if (sw->subchannel->active->refs == 0) + { + destroy_connection = sw->subchannel->active; + } sw->subchannel->active = NULL; - grpc_connectivity_state_set( - &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE - : GRPC_CHANNEL_TRANSIENT_FAILURE, - "connection_failed", closure_list); + grpc_connectivity_state_set (&c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, "connection_failed", closure_list); break; - } + } done: - connectivity_state_changed_locked(c, "transport_state_changed", closure_list); - destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); - gpr_free(sw); - gpr_mu_unlock(mu); - if (destroy) { - subchannel_destroy(c, closure_list); - } - if (destroy_connection != NULL) { - connection_destroy(destroy_connection, closure_list); - } -} - -static void publish_transport(grpc_subchannel *c, - grpc_closure_list *closure_list) { + connectivity_state_changed_locked (c, "transport_state_changed", closure_list); + destroy = SUBCHANNEL_UNREF_LOCKED (c, "state_watcher"); + gpr_free (sw); + gpr_mu_unlock (mu); + if (destroy) + { + subchannel_destroy (c, closure_list); + } + if (destroy_connection != NULL) + { + connection_destroy (destroy_connection, closure_list); + } +} + +static void +publish_transport (grpc_subchannel * c, grpc_closure_list * closure_list) +{ size_t channel_stack_size; connection *con; grpc_channel_stack *stk; @@ -536,46 +560,46 @@ static void publish_transport(grpc_subchannel *c, /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; - filters = gpr_malloc(sizeof(*filters) * num_filters); - memcpy(filters, c->filters, sizeof(*filters) * c->num_filters); - memcpy(filters + c->num_filters, c->connecting_result.filters, - sizeof(*filters) * c->connecting_result.num_filters); + filters = gpr_malloc (sizeof (*filters) * num_filters); + memcpy (filters, c->filters, sizeof (*filters) * c->num_filters); + memcpy (filters + c->num_filters, c->connecting_result.filters, sizeof (*filters) * c->connecting_result.num_filters); filters[num_filters - 1] = &grpc_connected_channel_filter; /* construct channel stack */ - channel_stack_size = grpc_channel_stack_size(filters, num_filters); - con = gpr_malloc(sizeof(connection) + channel_stack_size); - stk = (grpc_channel_stack *)(con + 1); + channel_stack_size = grpc_channel_stack_size (filters, num_filters); + con = gpr_malloc (sizeof (connection) + channel_stack_size); + stk = (grpc_channel_stack *) (con + 1); con->refs = 0; con->subchannel = c; - grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx, - stk, closure_list); - grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); - gpr_free(c->connecting_result.filters); - memset(&c->connecting_result, 0, sizeof(c->connecting_result)); + grpc_channel_stack_init (filters, num_filters, c->master, c->args, c->mdctx, stk, closure_list); + grpc_connected_channel_bind_transport (stk, c->connecting_result.transport); + gpr_free (c->connecting_result.filters); + memset (&c->connecting_result, 0, sizeof (c->connecting_result)); /* initialize state watcher */ - sw = gpr_malloc(sizeof(*sw)); - grpc_closure_init(&sw->closure, on_state_changed, sw); + sw = gpr_malloc (sizeof (*sw)); + grpc_closure_init (&sw->closure, on_state_changed, sw); sw->subchannel = c; sw->connectivity_state = GRPC_CHANNEL_READY; - gpr_mu_lock(&c->mu); + gpr_mu_lock (&c->mu); - if (c->disconnected) { - gpr_mu_unlock(&c->mu); - gpr_free(sw); - gpr_free(filters); - grpc_channel_stack_destroy(stk, closure_list); - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list); - GRPC_SUBCHANNEL_UNREF(c, "connecting", closure_list); - return; - } + if (c->disconnected) + { + gpr_mu_unlock (&c->mu); + gpr_free (sw); + gpr_free (filters); + grpc_channel_stack_destroy (stk, closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list); + GRPC_SUBCHANNEL_UNREF (c, "connecting", closure_list); + return; + } /* publish */ - if (c->active != NULL && c->active->refs == 0) { - destroy_connection = c->active; - } + if (c->active != NULL && c->active->refs == 0) + { + destroy_connection = c->active; + } c->active = con; c->active_version++; sw->version = c->active_version; @@ -583,184 +607,202 @@ static void publish_transport(grpc_subchannel *c, /* watch for changes; subchannel ref for connecting is donated to the state watcher */ - memset(&op, 0, sizeof(op)); + memset (&op, 0, sizeof (op)); op.connectivity_state = &sw->connectivity_state; op.on_connectivity_state_change = &sw->closure; op.bind_pollset_set = c->pollset_set; - SUBCHANNEL_REF_LOCKED(c, "state_watcher"); - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list); - GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting")); - elem = - grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); - elem->filter->start_transport_op(elem, &op, closure_list); + SUBCHANNEL_REF_LOCKED (c, "state_watcher"); + GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list); + GPR_ASSERT (!SUBCHANNEL_UNREF_LOCKED (c, "connecting")); + elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0); + elem->filter->start_transport_op (elem, &op, closure_list); /* signal completion */ - connectivity_state_changed_locked(c, "connected", closure_list); + connectivity_state_changed_locked (c, "connected", closure_list); w4c = c->waiting; c->waiting = NULL; - gpr_mu_unlock(&c->mu); + gpr_mu_unlock (&c->mu); - while (w4c != NULL) { - waiting_for_connect *next = w4c->next; - grpc_closure_list_add(closure_list, &w4c->continuation, 1); - w4c = next; - } + while (w4c != NULL) + { + waiting_for_connect *next = w4c->next; + grpc_closure_list_add (closure_list, &w4c->continuation, 1); + w4c = next; + } - gpr_free(filters); + gpr_free (filters); - if (destroy_connection != NULL) { - connection_destroy(destroy_connection, closure_list); - } + if (destroy_connection != NULL) + { + connection_destroy (destroy_connection, closure_list); + } } /* Generate a random number between 0 and 1. */ -static double generate_uniform_random_number(grpc_subchannel *c) { - c->random = (1103515245 * c->random + 12345) % ((gpr_uint32)1 << 31); - return c->random / (double)((gpr_uint32)1 << 31); +static double +generate_uniform_random_number (grpc_subchannel * c) +{ + c->random = (1103515245 * c->random + 12345) % ((gpr_uint32) 1 << 31); + return c->random / (double) ((gpr_uint32) 1 << 31); } /* Update backoff_delta and next_attempt in subchannel */ -static void update_reconnect_parameters(grpc_subchannel *c) { +static void +update_reconnect_parameters (grpc_subchannel * c) +{ gpr_int32 backoff_delta_millis, jitter; - gpr_int32 max_backoff_millis = - GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + gpr_int32 max_backoff_millis = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; double jitter_range; - backoff_delta_millis = - (gpr_int32)(gpr_time_to_millis(c->backoff_delta) * - GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); - if (backoff_delta_millis > max_backoff_millis) { - backoff_delta_millis = max_backoff_millis; - } - c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN); - c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); + backoff_delta_millis = (gpr_int32) (gpr_time_to_millis (c->backoff_delta) * GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); + if (backoff_delta_millis > max_backoff_millis) + { + backoff_delta_millis = max_backoff_millis; + } + c->backoff_delta = gpr_time_from_millis (backoff_delta_millis, GPR_TIMESPAN); + c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta); jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis; - jitter = - (gpr_int32)((2 * generate_uniform_random_number(c) - 1) * jitter_range); - c->next_attempt = - gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); + jitter = (gpr_int32) ((2 * generate_uniform_random_number (c) - 1) * jitter_range); + c->next_attempt = gpr_time_add (c->next_attempt, gpr_time_from_millis (jitter, GPR_TIMESPAN)); } -static void on_alarm(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +on_alarm (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ grpc_subchannel *c = arg; - gpr_mu_lock(&c->mu); + gpr_mu_lock (&c->mu); c->have_alarm = 0; - if (c->disconnected) { - iomgr_success = 0; - } - connectivity_state_changed_locked(c, "alarm", closure_list); - gpr_mu_unlock(&c->mu); - if (iomgr_success) { - update_reconnect_parameters(c); - continue_connect(c, closure_list); - } else { - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list); - GRPC_SUBCHANNEL_UNREF(c, "connecting", closure_list); - } -} - -static void subchannel_connected(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { + if (c->disconnected) + { + iomgr_success = 0; + } + connectivity_state_changed_locked (c, "alarm", closure_list); + gpr_mu_unlock (&c->mu); + if (iomgr_success) + { + update_reconnect_parameters (c); + continue_connect (c, closure_list); + } + else + { + GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list); + GRPC_SUBCHANNEL_UNREF (c, "connecting", closure_list); + } +} + +static void +subchannel_connected (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ grpc_subchannel *c = arg; - if (c->connecting_result.transport != NULL) { - publish_transport(c, closure_list); - } else { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_mu_lock(&c->mu); - GPR_ASSERT(!c->have_alarm); - c->have_alarm = 1; - connectivity_state_changed_locked(c, "connect_failed", closure_list); - grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now, closure_list); - gpr_mu_unlock(&c->mu); - } -} - -static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { - gpr_timespec current_deadline = - gpr_time_add(c->next_attempt, c->backoff_delta); - gpr_timespec min_deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, - GPR_TIMESPAN)); - return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline - : min_deadline; -} - -static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { - if (c->disconnected) { - return GRPC_CHANNEL_FATAL_FAILURE; - } - if (c->connecting) { - if (c->have_alarm) { - return GRPC_CHANNEL_TRANSIENT_FAILURE; + if (c->connecting_result.transport != NULL) + { + publish_transport (c, closure_list); + } + else + { + gpr_timespec now = gpr_now (GPR_CLOCK_MONOTONIC); + gpr_mu_lock (&c->mu); + GPR_ASSERT (!c->have_alarm); + c->have_alarm = 1; + connectivity_state_changed_locked (c, "connect_failed", closure_list); + grpc_alarm_init (&c->alarm, c->next_attempt, on_alarm, c, now, closure_list); + gpr_mu_unlock (&c->mu); + } +} + +static gpr_timespec +compute_connect_deadline (grpc_subchannel * c) +{ + gpr_timespec current_deadline = gpr_time_add (c->next_attempt, c->backoff_delta); + gpr_timespec min_deadline = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds (GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, + GPR_TIMESPAN)); + return gpr_time_cmp (current_deadline, min_deadline) > 0 ? current_deadline : min_deadline; +} + +static grpc_connectivity_state +compute_connectivity_locked (grpc_subchannel * c) +{ + if (c->disconnected) + { + return GRPC_CHANNEL_FATAL_FAILURE; + } + if (c->connecting) + { + if (c->have_alarm) + { + return GRPC_CHANNEL_TRANSIENT_FAILURE; + } + return GRPC_CHANNEL_CONNECTING; + } + if (c->active) + { + return GRPC_CHANNEL_READY; } - return GRPC_CHANNEL_CONNECTING; - } - if (c->active) { - return GRPC_CHANNEL_READY; - } return GRPC_CHANNEL_IDLE; } -static void connectivity_state_changed_locked(grpc_subchannel *c, - const char *reason, - grpc_closure_list *closure_list) { - grpc_connectivity_state current = compute_connectivity_locked(c); - grpc_connectivity_state_set(&c->state_tracker, current, reason, closure_list); +static void +connectivity_state_changed_locked (grpc_subchannel * c, const char *reason, grpc_closure_list * closure_list) +{ + grpc_connectivity_state current = compute_connectivity_locked (c); + grpc_connectivity_state_set (&c->state_tracker, current, reason, closure_list); } /* * grpc_subchannel_call implementation */ -void grpc_subchannel_call_ref( - grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_ref(&c->refs); -} - -void grpc_subchannel_call_unref(grpc_subchannel_call *c, - grpc_closure_list *closure_list - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - if (gpr_unref(&c->refs)) { - gpr_mu *mu = &c->connection->subchannel->mu; - grpc_subchannel *destroy; - grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), closure_list); - gpr_mu_lock(mu); - destroy = CONNECTION_UNREF_LOCKED(c->connection, "call", closure_list); - gpr_mu_unlock(mu); - gpr_free(c); - if (destroy != NULL) { - subchannel_destroy(destroy, closure_list); +void +grpc_subchannel_call_ref (grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + gpr_ref (&c->refs); +} + +void +grpc_subchannel_call_unref (grpc_subchannel_call * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + if (gpr_unref (&c->refs)) + { + gpr_mu *mu = &c->connection->subchannel->mu; + grpc_subchannel *destroy; + grpc_call_stack_destroy (SUBCHANNEL_CALL_TO_CALL_STACK (c), closure_list); + gpr_mu_lock (mu); + destroy = CONNECTION_UNREF_LOCKED (c->connection, "call", closure_list); + gpr_mu_unlock (mu); + gpr_free (c); + if (destroy != NULL) + { + subchannel_destroy (destroy, closure_list); + } } - } } -char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call, - grpc_closure_list *closure_list) { - grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); - grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); - return top_elem->filter->get_peer(top_elem, closure_list); +char * +grpc_subchannel_call_get_peer (grpc_subchannel_call * call, grpc_closure_list * closure_list) +{ + grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call); + grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0); + return top_elem->filter->get_peer (top_elem, closure_list); } -void grpc_subchannel_call_process_op(grpc_subchannel_call *call, - grpc_transport_stream_op *op, - grpc_closure_list *closure_list) { - grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); - grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); - top_elem->filter->start_transport_stream_op(top_elem, op, closure_list); +void +grpc_subchannel_call_process_op (grpc_subchannel_call * call, grpc_transport_stream_op * op, grpc_closure_list * closure_list) +{ + grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call); + grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0); + top_elem->filter->start_transport_stream_op (top_elem, op, closure_list); } -static grpc_subchannel_call *create_call(connection *con, - grpc_closure_list *closure_list) { - grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_subchannel_call *call = - gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); - grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); +static grpc_subchannel_call * +create_call (connection * con, grpc_closure_list * closure_list) +{ + grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION (con); + grpc_subchannel_call *call = gpr_malloc (sizeof (grpc_subchannel_call) + chanstk->call_stack_size); + grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK (call); call->connection = con; - gpr_ref_init(&call->refs, 1); - grpc_call_stack_init(chanstk, NULL, NULL, callstk, closure_list); + gpr_ref_init (&call->refs, 1); + grpc_call_stack_init (chanstk, NULL, NULL, callstk, closure_list); return call; } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 57cc248fec..69cf07f626 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -64,59 +64,37 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif -void grpc_subchannel_ref( - grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_unref(grpc_subchannel *channel, - grpc_closure_list *closure_list - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_ref( - grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_unref(grpc_subchannel_call *call, - grpc_closure_list *closure_list - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_ref (grpc_subchannel * channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_unref (grpc_subchannel * channel, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_ref (grpc_subchannel_call * call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_unref (grpc_subchannel_call * call, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a call (possibly asynchronously) */ -void grpc_subchannel_create_call(grpc_subchannel *subchannel, - grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify, - grpc_closure_list *closure_list); +void grpc_subchannel_create_call (grpc_subchannel * subchannel, grpc_pollset * pollset, grpc_subchannel_call ** target, grpc_closure * notify, grpc_closure_list * closure_list); /** process a transport level op */ -void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, - grpc_transport_op *op, - grpc_closure_list *closure_list); +void grpc_subchannel_process_transport_op (grpc_subchannel * subchannel, grpc_transport_op * op, grpc_closure_list * closure_list); /** poll the current connectivity state of a channel */ -grpc_connectivity_state grpc_subchannel_check_connectivity( - grpc_subchannel *channel); +grpc_connectivity_state grpc_subchannel_check_connectivity (grpc_subchannel * channel); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the channel */ -void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, - grpc_connectivity_state *state, - grpc_closure *notify, - grpc_closure_list *closure_list); +void grpc_subchannel_notify_on_state_change (grpc_subchannel * channel, grpc_connectivity_state * state, grpc_closure * notify, grpc_closure_list * closure_list); /** express interest in \a channel's activities through \a pollset. */ -void grpc_subchannel_add_interested_party(grpc_subchannel *channel, - grpc_pollset *pollset, - grpc_closure_list *closure_list); +void grpc_subchannel_add_interested_party (grpc_subchannel * channel, grpc_pollset * pollset, grpc_closure_list * closure_list); /** stop following \a channel's activity through \a pollset. */ -void grpc_subchannel_del_interested_party(grpc_subchannel *channel, - grpc_pollset *pollset, - grpc_closure_list *closure_list); +void grpc_subchannel_del_interested_party (grpc_subchannel * channel, grpc_pollset * pollset, grpc_closure_list * closure_list); /** continue processing a transport op */ -void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, - grpc_transport_stream_op *op, - grpc_closure_list *closure_list); +void grpc_subchannel_call_process_op (grpc_subchannel_call * subchannel_call, grpc_transport_stream_op * op, grpc_closure_list * closure_list); /** continue querying for peer */ -char *grpc_subchannel_call_get_peer(grpc_subchannel_call *subchannel_call, - grpc_closure_list *closure_list); +char *grpc_subchannel_call_get_peer (grpc_subchannel_call * subchannel_call, grpc_closure_list * closure_list); -struct grpc_subchannel_args { +struct grpc_subchannel_args +{ /** Channel filters for this channel - wrapped factories will likely want to mutate this */ const grpc_channel_filter **filters; @@ -134,7 +112,6 @@ struct grpc_subchannel_args { }; /** create a subchannel given a connector */ -grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, - grpc_subchannel_args *args); +grpc_subchannel *grpc_subchannel_create (grpc_connector * connector, grpc_subchannel_args * args); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */ diff --git a/src/core/client_config/subchannel_factory.c b/src/core/client_config/subchannel_factory.c index 51b805f2f5..29e854e2ed 100644 --- a/src/core/client_config/subchannel_factory.c +++ b/src/core/client_config/subchannel_factory.c @@ -33,17 +33,20 @@ #include "src/core/client_config/subchannel_factory.h" -void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory) { - factory->vtable->ref(factory); +void +grpc_subchannel_factory_ref (grpc_subchannel_factory * factory) +{ + factory->vtable->ref (factory); } -void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory, - grpc_closure_list *closure_list) { - factory->vtable->unref(factory, closure_list); +void +grpc_subchannel_factory_unref (grpc_subchannel_factory * factory, grpc_closure_list * closure_list) +{ + factory->vtable->unref (factory, closure_list); } -grpc_subchannel *grpc_subchannel_factory_create_subchannel( - grpc_subchannel_factory *factory, grpc_subchannel_args *args, - grpc_closure_list *closure_list) { - return factory->vtable->create_subchannel(factory, args, closure_list); +grpc_subchannel * +grpc_subchannel_factory_create_subchannel (grpc_subchannel_factory * factory, grpc_subchannel_args * args, grpc_closure_list * closure_list) +{ + return factory->vtable->create_subchannel (factory, args, closure_list); } diff --git a/src/core/client_config/subchannel_factory.h b/src/core/client_config/subchannel_factory.h index ea19ab8944..771638e0fb 100644 --- a/src/core/client_config/subchannel_factory.h +++ b/src/core/client_config/subchannel_factory.h @@ -42,26 +42,22 @@ typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable; /** Constructor for new configured channels. Creating decorators around this type is encouraged to adapt behavior. */ -struct grpc_subchannel_factory { +struct grpc_subchannel_factory +{ const grpc_subchannel_factory_vtable *vtable; }; -struct grpc_subchannel_factory_vtable { - void (*ref)(grpc_subchannel_factory *factory); - void (*unref)(grpc_subchannel_factory *factory, - grpc_closure_list *closure_list); - grpc_subchannel *(*create_subchannel)(grpc_subchannel_factory *factory, - grpc_subchannel_args *args, - grpc_closure_list *closure_list); +struct grpc_subchannel_factory_vtable +{ + void (*ref) (grpc_subchannel_factory * factory); + void (*unref) (grpc_subchannel_factory * factory, grpc_closure_list * closure_list); + grpc_subchannel *(*create_subchannel) (grpc_subchannel_factory * factory, grpc_subchannel_args * args, grpc_closure_list * closure_list); }; -void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory); -void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory, - grpc_closure_list *closure_list); +void grpc_subchannel_factory_ref (grpc_subchannel_factory * factory); +void grpc_subchannel_factory_unref (grpc_subchannel_factory * factory, grpc_closure_list * closure_list); /** Create a new grpc_subchannel */ -grpc_subchannel *grpc_subchannel_factory_create_subchannel( - grpc_subchannel_factory *factory, grpc_subchannel_args *args, - grpc_closure_list *closure_list); +grpc_subchannel *grpc_subchannel_factory_create_subchannel (grpc_subchannel_factory * factory, grpc_subchannel_args * args, grpc_closure_list * closure_list); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */ diff --git a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c index 585e465fa4..220f00106b 100644 --- a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c +++ b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c @@ -34,10 +34,11 @@ #include "src/core/client_config/subchannel_factory_decorators/add_channel_arg.h" #include "src/core/client_config/subchannel_factory_decorators/merge_channel_args.h" -grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg( - grpc_subchannel_factory *input, const grpc_arg *arg) { +grpc_subchannel_factory * +grpc_subchannel_factory_add_channel_arg (grpc_subchannel_factory * input, const grpc_arg * arg) +{ grpc_channel_args args; args.num_args = 1; - args.args = (grpc_arg *)arg; - return grpc_subchannel_factory_merge_channel_args(input, &args); + args.args = (grpc_arg *) arg; + return grpc_subchannel_factory_merge_channel_args (input, &args); } diff --git a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h index 8457294000..38f9c48d7a 100644 --- a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h +++ b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h @@ -39,8 +39,7 @@ /** Takes a subchannel factory, returns a new one that mutates incoming channel_args by adding a new argument; ownership of input, arg is retained by the caller. */ -grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg( - grpc_subchannel_factory *input, const grpc_arg *arg); +grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg (grpc_subchannel_factory * input, const grpc_arg * arg); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H \ - */ + */ diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c index cf5fb00cdf..993b95296b 100644 --- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c +++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c @@ -35,52 +35,58 @@ #include <grpc/support/alloc.h> #include "src/core/channel/channel_args.h" -typedef struct { +typedef struct +{ grpc_subchannel_factory base; gpr_refcount refs; grpc_subchannel_factory *wrapped; grpc_channel_args *merge_args; } merge_args_factory; -static void merge_args_factory_ref(grpc_subchannel_factory *scf) { - merge_args_factory *f = (merge_args_factory *)scf; - gpr_ref(&f->refs); +static void +merge_args_factory_ref (grpc_subchannel_factory * scf) +{ + merge_args_factory *f = (merge_args_factory *) scf; + gpr_ref (&f->refs); } -static void merge_args_factory_unref(grpc_subchannel_factory *scf, - grpc_closure_list *closure_list) { - merge_args_factory *f = (merge_args_factory *)scf; - if (gpr_unref(&f->refs)) { - grpc_subchannel_factory_unref(f->wrapped, closure_list); - grpc_channel_args_destroy(f->merge_args); - gpr_free(f); - } +static void +merge_args_factory_unref (grpc_subchannel_factory * scf, grpc_closure_list * closure_list) +{ + merge_args_factory *f = (merge_args_factory *) scf; + if (gpr_unref (&f->refs)) + { + grpc_subchannel_factory_unref (f->wrapped, closure_list); + grpc_channel_args_destroy (f->merge_args); + gpr_free (f); + } } -static grpc_subchannel *merge_args_factory_create_subchannel( - grpc_subchannel_factory *scf, grpc_subchannel_args *args, - grpc_closure_list *closure_list) { - merge_args_factory *f = (merge_args_factory *)scf; - grpc_channel_args *final_args = - grpc_channel_args_merge(args->args, f->merge_args); +static grpc_subchannel * +merge_args_factory_create_subchannel (grpc_subchannel_factory * scf, grpc_subchannel_args * args, grpc_closure_list * closure_list) +{ + merge_args_factory *f = (merge_args_factory *) scf; + grpc_channel_args *final_args = grpc_channel_args_merge (args->args, f->merge_args); grpc_subchannel *s; args->args = final_args; - s = grpc_subchannel_factory_create_subchannel(f->wrapped, args, closure_list); - grpc_channel_args_destroy(final_args); + s = grpc_subchannel_factory_create_subchannel (f->wrapped, args, closure_list); + grpc_channel_args_destroy (final_args); return s; } static const grpc_subchannel_factory_vtable merge_args_factory_vtable = { - merge_args_factory_ref, merge_args_factory_unref, - merge_args_factory_create_subchannel}; + merge_args_factory_ref, merge_args_factory_unref, + merge_args_factory_create_subchannel +}; -grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args( - grpc_subchannel_factory *input, const grpc_channel_args *args) { - merge_args_factory *f = gpr_malloc(sizeof(*f)); +grpc_subchannel_factory * +grpc_subchannel_factory_merge_channel_args (grpc_subchannel_factory * input, const grpc_channel_args * args) +{ + merge_args_factory *f = gpr_malloc (sizeof (*f)); f->base.vtable = &merge_args_factory_vtable; - gpr_ref_init(&f->refs, 1); - grpc_subchannel_factory_ref(input); + gpr_ref_init (&f->refs, 1); + grpc_subchannel_factory_ref (input); f->wrapped = input; - f->merge_args = grpc_channel_args_copy(args); + f->merge_args = grpc_channel_args_copy (args); return &f->base; } diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h index f4757f0650..5155509a47 100644 --- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h +++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h @@ -39,8 +39,7 @@ /** Takes a subchannel factory, returns a new one that mutates incoming channel_args by adding a new argument; ownership of input, args is retained by the caller. */ -grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args( - grpc_subchannel_factory *input, const grpc_channel_args *args); +grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args (grpc_subchannel_factory * input, const grpc_channel_args * args); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H \ - */ + */ diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c index 2738e2df57..4f4cc5b9c4 100644 --- a/src/core/client_config/uri_parser.c +++ b/src/core/client_config/uri_parser.c @@ -42,31 +42,35 @@ /** a size_t default value... maps to all 1's */ #define NOT_SET (~(size_t)0) -static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section, - int suppress_errors) { +static grpc_uri * +bad_uri (const char *uri_text, size_t pos, const char *section, int suppress_errors) +{ char *line_prefix; size_t pfx_len; - if (!suppress_errors) { - gpr_asprintf(&line_prefix, "bad uri.%s: '", section); - pfx_len = strlen(line_prefix) + pos; - gpr_log(GPR_ERROR, "%s%s'", line_prefix, uri_text); - gpr_free(line_prefix); + if (!suppress_errors) + { + gpr_asprintf (&line_prefix, "bad uri.%s: '", section); + pfx_len = strlen (line_prefix) + pos; + gpr_log (GPR_ERROR, "%s%s'", line_prefix, uri_text); + gpr_free (line_prefix); - line_prefix = gpr_malloc(pfx_len + 1); - memset(line_prefix, ' ', pfx_len); - line_prefix[pfx_len] = 0; - gpr_log(GPR_ERROR, "%s^ here", line_prefix); - gpr_free(line_prefix); - } + line_prefix = gpr_malloc (pfx_len + 1); + memset (line_prefix, ' ', pfx_len); + line_prefix[pfx_len] = 0; + gpr_log (GPR_ERROR, "%s^ here", line_prefix); + gpr_free (line_prefix); + } return NULL; } /** Returns a copy of \a src[begin, end) */ -static char *copy_component(const char *src, size_t begin, size_t end) { - char *out = gpr_malloc(end - begin + 1); - memcpy(out, src + begin, end - begin); +static char * +copy_component (const char *src, size_t begin, size_t end) +{ + char *out = gpr_malloc (end - begin + 1); + memcpy (out, src + begin, end - begin); out[end - begin] = 0; return out; } @@ -74,67 +78,77 @@ static char *copy_component(const char *src, size_t begin, size_t end) { /** Returns how many chars to advance if \a uri_text[i] begins a valid \a pchar * production. If \a uri_text[i] introduces an invalid \a pchar (such as percent * sign not followed by two hex digits), NOT_SET is returned. */ -static size_t parse_pchar(const char *uri_text, size_t i) { +static size_t +parse_pchar (const char *uri_text, size_t i) +{ /* pchar = unreserved / pct-encoded / sub-delims / ":" / "@" * unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~" * pct-encoded = "%" HEXDIG HEXDIG * sub-delims = "!" / "$" / "&" / "'" / "(" / ")" - / "*" / "+" / "," / ";" / "=" */ + / "*" / "+" / "," / ";" / "=" */ char c = uri_text[i]; - if (((c >= 'A') && (c <= 'Z')) || ((c >= 'a') && (c <= 'z')) || - ((c >= '0') && (c <= '9')) || - (c == '-' || c == '.' || c == '_' || c == '~') || /* unreserved */ - - (c == '!' || c == '$' || c == '&' || c == '\'' || c == '$' || c == '&' || - c == '(' || c == ')' || c == '*' || c == '+' || c == ',' || c == ';' || - c == '=') /* sub-delims */) { - return 1; - } - if (c == '%') { /* pct-encoded */ - size_t j; - if (uri_text[i + 1] == 0 || uri_text[i + 2] == 0) { - return NOT_SET; + if (((c >= 'A') && (c <= 'Z')) || ((c >= 'a') && (c <= 'z')) || ((c >= '0') && (c <= '9')) || (c == '-' || c == '.' || c == '_' || c == '~') || /* unreserved */ + (c == '!' || c == '$' || c == '&' || c == '\'' || c == '$' || c == '&' || c == '(' || c == ')' || c == '*' || c == '+' || c == ',' || c == ';' || c == '=') /* sub-delims */ ) + { + return 1; } - for (j = i + 1; j < 2; j++) { - c = uri_text[j]; - if (!(((c >= '0') && (c <= '9')) || ((c >= 'a') && (c <= 'f')) || - ((c >= 'A') && (c <= 'F')))) { - return NOT_SET; - } + if (c == '%') + { /* pct-encoded */ + size_t j; + if (uri_text[i + 1] == 0 || uri_text[i + 2] == 0) + { + return NOT_SET; + } + for (j = i + 1; j < 2; j++) + { + c = uri_text[j]; + if (!(((c >= '0') && (c <= '9')) || ((c >= 'a') && (c <= 'f')) || ((c >= 'A') && (c <= 'F')))) + { + return NOT_SET; + } + } + return 2; } - return 2; - } return 0; } /* *( pchar / "?" / "/" ) */ -static int parse_fragment_or_query(const char *uri_text, size_t *i) { +static int +parse_fragment_or_query (const char *uri_text, size_t * i) +{ char c; - while ((c = uri_text[*i]) != 0) { - const size_t advance = parse_pchar(uri_text, *i); /* pchar */ - switch (advance) { - case 0: /* uri_text[i] isn't in pchar */ - /* maybe it's ? or / */ - if (uri_text[*i] == '?' || uri_text[*i] == '/') { - (*i)++; - break; - } else { - return 1; - } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - default: - (*i) += advance; - break; - case NOT_SET: /* uri_text[i] introduces an invalid URI */ - return 0; + while ((c = uri_text[*i]) != 0) + { + const size_t advance = parse_pchar (uri_text, *i); /* pchar */ + switch (advance) + { + case 0: /* uri_text[i] isn't in pchar */ + /* maybe it's ? or / */ + if (uri_text[*i] == '?' || uri_text[*i] == '/') + { + (*i)++; + break; + } + else + { + return 1; + } + gpr_log (GPR_ERROR, "should never reach here"); + abort (); + default: + (*i) += advance; + break; + case NOT_SET: /* uri_text[i] introduces an invalid URI */ + return 0; + } } - } /* *i is the first uri_text position past the \a query production, maybe \0 */ return 1; } -grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { +grpc_uri * +grpc_uri_parse (const char *uri_text, int suppress_errors) +{ grpc_uri *uri; size_t scheme_begin = 0; size_t scheme_end = NOT_SET; @@ -148,96 +162,127 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { size_t fragment_end = NOT_SET; size_t i; - for (i = scheme_begin; uri_text[i] != 0; i++) { - if (uri_text[i] == ':') { - scheme_end = i; + for (i = scheme_begin; uri_text[i] != 0; i++) + { + if (uri_text[i] == ':') + { + scheme_end = i; + break; + } + if (uri_text[i] >= 'a' && uri_text[i] <= 'z') + continue; + if (uri_text[i] >= 'A' && uri_text[i] <= 'Z') + continue; + if (i != scheme_begin) + { + if (uri_text[i] >= '0' && uri_text[i] <= '9') + continue; + if (uri_text[i] == '+') + continue; + if (uri_text[i] == '-') + continue; + if (uri_text[i] == '.') + continue; + } break; } - if (uri_text[i] >= 'a' && uri_text[i] <= 'z') continue; - if (uri_text[i] >= 'A' && uri_text[i] <= 'Z') continue; - if (i != scheme_begin) { - if (uri_text[i] >= '0' && uri_text[i] <= '9') continue; - if (uri_text[i] == '+') continue; - if (uri_text[i] == '-') continue; - if (uri_text[i] == '.') continue; + if (scheme_end == NOT_SET) + { + return bad_uri (uri_text, i, "scheme", suppress_errors); } - break; - } - if (scheme_end == NOT_SET) { - return bad_uri(uri_text, i, "scheme", suppress_errors); - } - - if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') { - authority_begin = scheme_end + 3; - for (i = authority_begin; uri_text[i] != 0 && authority_end == NOT_SET; - i++) { - if (uri_text[i] == '/' || uri_text[i] == '?' || uri_text[i] == '#') { - authority_end = i; - } + + if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') + { + authority_begin = scheme_end + 3; + for (i = authority_begin; uri_text[i] != 0 && authority_end == NOT_SET; i++) + { + if (uri_text[i] == '/' || uri_text[i] == '?' || uri_text[i] == '#') + { + authority_end = i; + } + } + if (authority_end == NOT_SET && uri_text[i] == 0) + { + authority_end = i; + } + if (authority_end == NOT_SET) + { + return bad_uri (uri_text, i, "authority", suppress_errors); + } + /* TODO(ctiller): parse the authority correctly */ + path_begin = authority_end; } - if (authority_end == NOT_SET && uri_text[i] == 0) { - authority_end = i; + else + { + path_begin = scheme_end + 1; } - if (authority_end == NOT_SET) { - return bad_uri(uri_text, i, "authority", suppress_errors); + + for (i = path_begin; uri_text[i] != 0; i++) + { + if (uri_text[i] == '?' || uri_text[i] == '#') + { + path_end = i; + break; + } } - /* TODO(ctiller): parse the authority correctly */ - path_begin = authority_end; - } else { - path_begin = scheme_end + 1; - } - - for (i = path_begin; uri_text[i] != 0; i++) { - if (uri_text[i] == '?' || uri_text[i] == '#') { + if (path_end == NOT_SET && uri_text[i] == 0) + { path_end = i; - break; } - } - if (path_end == NOT_SET && uri_text[i] == 0) { - path_end = i; - } - if (path_end == NOT_SET) { - return bad_uri(uri_text, i, "path", suppress_errors); - } - - if (uri_text[i] == '?') { - query_begin = ++i; - if (!parse_fragment_or_query(uri_text, &i)) { - return bad_uri(uri_text, i, "query", suppress_errors); - } else if (uri_text[i] != 0 && uri_text[i] != '#') { - /* We must be at the end or at the beginning of a fragment */ - return bad_uri(uri_text, i, "query", suppress_errors); + if (path_end == NOT_SET) + { + return bad_uri (uri_text, i, "path", suppress_errors); + } + + if (uri_text[i] == '?') + { + query_begin = ++i; + if (!parse_fragment_or_query (uri_text, &i)) + { + return bad_uri (uri_text, i, "query", suppress_errors); + } + else if (uri_text[i] != 0 && uri_text[i] != '#') + { + /* We must be at the end or at the beginning of a fragment */ + return bad_uri (uri_text, i, "query", suppress_errors); + } + query_end = i; } - query_end = i; - } - if (uri_text[i] == '#') { - fragment_begin = ++i; - if (!parse_fragment_or_query(uri_text, &i)) { - return bad_uri(uri_text, i - fragment_end, "fragment", suppress_errors); - } else if (uri_text[i] != 0) { - /* We must be at the end */ - return bad_uri(uri_text, i, "fragment", suppress_errors); + if (uri_text[i] == '#') + { + fragment_begin = ++i; + if (!parse_fragment_or_query (uri_text, &i)) + { + return bad_uri (uri_text, i - fragment_end, "fragment", suppress_errors); + } + else if (uri_text[i] != 0) + { + /* We must be at the end */ + return bad_uri (uri_text, i, "fragment", suppress_errors); + } + fragment_end = i; } - fragment_end = i; - } - uri = gpr_malloc(sizeof(*uri)); - memset(uri, 0, sizeof(*uri)); - uri->scheme = copy_component(uri_text, scheme_begin, scheme_end); - uri->authority = copy_component(uri_text, authority_begin, authority_end); - uri->path = copy_component(uri_text, path_begin, path_end); - uri->query = copy_component(uri_text, query_begin, query_end); - uri->fragment = copy_component(uri_text, fragment_begin, fragment_end); + uri = gpr_malloc (sizeof (*uri)); + memset (uri, 0, sizeof (*uri)); + uri->scheme = copy_component (uri_text, scheme_begin, scheme_end); + uri->authority = copy_component (uri_text, authority_begin, authority_end); + uri->path = copy_component (uri_text, path_begin, path_end); + uri->query = copy_component (uri_text, query_begin, query_end); + uri->fragment = copy_component (uri_text, fragment_begin, fragment_end); return uri; } -void grpc_uri_destroy(grpc_uri *uri) { - if (!uri) return; - gpr_free(uri->scheme); - gpr_free(uri->authority); - gpr_free(uri->path); - gpr_free(uri->query); - gpr_free(uri->fragment); - gpr_free(uri); +void +grpc_uri_destroy (grpc_uri * uri) +{ + if (!uri) + return; + gpr_free (uri->scheme); + gpr_free (uri->authority); + gpr_free (uri->path); + gpr_free (uri->query); + gpr_free (uri->fragment); + gpr_free (uri); } diff --git a/src/core/client_config/uri_parser.h b/src/core/client_config/uri_parser.h index b8daa13bd4..1eb26e7ca6 100644 --- a/src/core/client_config/uri_parser.h +++ b/src/core/client_config/uri_parser.h @@ -34,7 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_URI_PARSER_H #define GRPC_INTERNAL_CORE_CLIENT_CONFIG_URI_PARSER_H -typedef struct { +typedef struct +{ char *scheme; char *authority; char *path; @@ -43,9 +44,9 @@ typedef struct { } grpc_uri; /** parse a uri, return NULL on failure */ -grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors); +grpc_uri *grpc_uri_parse (const char *uri_text, int suppress_errors); /** destroy a uri */ -void grpc_uri_destroy(grpc_uri *uri); +void grpc_uri_destroy (grpc_uri * uri); #endif |