diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-22 12:33:20 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-22 12:33:20 -0700 |
commit | a82950e68318a6aab6fe894fa39f7fa616c4647b (patch) | |
tree | 7d02bd1e9e1cbae1f14ad4ad1e06d3ae81a96dfe /src/core/client_config | |
parent | 8af4c337181322cc4fb396199c90f574cfb4163f (diff) |
clang-format all core files
Diffstat (limited to 'src/core/client_config')
36 files changed, 2134 insertions, 2451 deletions
diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c index fc6448201f..6ecffb3854 100644 --- a/src/core/client_config/client_config.c +++ b/src/core/client_config/client_config.c @@ -37,50 +37,36 @@ #include <grpc/support/alloc.h> -struct grpc_client_config -{ +struct grpc_client_config { gpr_refcount refs; grpc_lb_policy *lb_policy; }; -grpc_client_config * -grpc_client_config_create () -{ - grpc_client_config *c = gpr_malloc (sizeof (*c)); - memset (c, 0, sizeof (*c)); - gpr_ref_init (&c->refs, 1); +grpc_client_config *grpc_client_config_create() { + grpc_client_config *c = gpr_malloc(sizeof(*c)); + memset(c, 0, sizeof(*c)); + gpr_ref_init(&c->refs, 1); return c; } -void -grpc_client_config_ref (grpc_client_config * c) -{ - gpr_ref (&c->refs); -} +void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); } -void -grpc_client_config_unref (grpc_exec_ctx * exec_ctx, grpc_client_config * c) -{ - if (gpr_unref (&c->refs)) - { - GRPC_LB_POLICY_UNREF (exec_ctx, c->lb_policy, "client_config"); - gpr_free (c); - } +void grpc_client_config_unref(grpc_exec_ctx *exec_ctx, grpc_client_config *c) { + if (gpr_unref(&c->refs)) { + GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config"); + gpr_free(c); + } } -void -grpc_client_config_set_lb_policy (grpc_client_config * c, grpc_lb_policy * lb_policy) -{ - GPR_ASSERT (c->lb_policy == NULL); - if (lb_policy) - { - GRPC_LB_POLICY_REF (lb_policy, "client_config"); - } +void grpc_client_config_set_lb_policy(grpc_client_config *c, + grpc_lb_policy *lb_policy) { + GPR_ASSERT(c->lb_policy == NULL); + if (lb_policy) { + GRPC_LB_POLICY_REF(lb_policy, "client_config"); + } c->lb_policy = lb_policy; } -grpc_lb_policy * -grpc_client_config_get_lb_policy (grpc_client_config * c) -{ +grpc_lb_policy *grpc_client_config_get_lb_policy(grpc_client_config *c) { return c->lb_policy; } diff --git a/src/core/client_config/client_config.h b/src/core/client_config/client_config.h index c6afafe023..04bf036b00 100644 --- a/src/core/client_config/client_config.h +++ b/src/core/client_config/client_config.h @@ -40,11 +40,14 @@ grpc_resolver */ typedef struct grpc_client_config grpc_client_config; -grpc_client_config *grpc_client_config_create (); -void grpc_client_config_ref (grpc_client_config * client_config); -void grpc_client_config_unref (grpc_exec_ctx * exec_ctx, grpc_client_config * client_config); +grpc_client_config *grpc_client_config_create(); +void grpc_client_config_ref(grpc_client_config *client_config); +void grpc_client_config_unref(grpc_exec_ctx *exec_ctx, + grpc_client_config *client_config); -void grpc_client_config_set_lb_policy (grpc_client_config * client_config, grpc_lb_policy * lb_policy); -grpc_lb_policy *grpc_client_config_get_lb_policy (grpc_client_config * client_config); +void grpc_client_config_set_lb_policy(grpc_client_config *client_config, + grpc_lb_policy *lb_policy); +grpc_lb_policy *grpc_client_config_get_lb_policy( + grpc_client_config *client_config); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_CLIENT_CONFIG_H */ diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index 209c3f1767..1603ffb8be 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -33,26 +33,22 @@ #include "src/core/client_config/connector.h" -void -grpc_connector_ref (grpc_connector * connector) -{ - connector->vtable->ref (connector); +void grpc_connector_ref(grpc_connector* connector) { + connector->vtable->ref(connector); } -void -grpc_connector_unref (grpc_exec_ctx * exec_ctx, grpc_connector * connector) -{ - connector->vtable->unref (exec_ctx, connector); +void grpc_connector_unref(grpc_exec_ctx* exec_ctx, grpc_connector* connector) { + connector->vtable->unref(exec_ctx, connector); } -void -grpc_connector_connect (grpc_exec_ctx * exec_ctx, grpc_connector * connector, const grpc_connect_in_args * in_args, grpc_connect_out_args * out_args, grpc_closure * notify) -{ - connector->vtable->connect (exec_ctx, connector, in_args, out_args, notify); +void grpc_connector_connect(grpc_exec_ctx* exec_ctx, grpc_connector* connector, + const grpc_connect_in_args* in_args, + grpc_connect_out_args* out_args, + grpc_closure* notify) { + connector->vtable->connect(exec_ctx, connector, in_args, out_args, notify); } -void -grpc_connector_shutdown (grpc_exec_ctx * exec_ctx, grpc_connector * connector) -{ - connector->vtable->shutdown (exec_ctx, connector); +void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx, + grpc_connector* connector) { + connector->vtable->shutdown(exec_ctx, connector); } diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index 3abee64008..e9b8be4b53 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -41,13 +41,11 @@ typedef struct grpc_connector grpc_connector; typedef struct grpc_connector_vtable grpc_connector_vtable; -struct grpc_connector -{ +struct grpc_connector { const grpc_connector_vtable *vtable; }; -typedef struct -{ +typedef struct { /** set of pollsets interested in this connection */ grpc_pollset_set *interested_parties; /** address to connect to */ @@ -59,8 +57,7 @@ typedef struct const grpc_channel_args *channel_args; } grpc_connect_in_args; -typedef struct -{ +typedef struct { /** the connected transport */ grpc_transport *transport; /** any additional filters (owned by the caller of connect) */ @@ -68,21 +65,26 @@ typedef struct size_t num_filters; } grpc_connect_out_args; -struct grpc_connector_vtable -{ - void (*ref) (grpc_connector * connector); - void (*unref) (grpc_exec_ctx * exec_ctx, grpc_connector * connector); +struct grpc_connector_vtable { + void (*ref)(grpc_connector *connector); + void (*unref)(grpc_exec_ctx *exec_ctx, grpc_connector *connector); /** Implementation of grpc_connector_shutdown */ - void (*shutdown) (grpc_exec_ctx * exec_ctx, grpc_connector * connector); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector); /** Implementation of grpc_connector_connect */ - void (*connect) (grpc_exec_ctx * exec_ctx, grpc_connector * connector, const grpc_connect_in_args * in_args, grpc_connect_out_args * out_args, grpc_closure * notify); + void (*connect)(grpc_exec_ctx *exec_ctx, grpc_connector *connector, + const grpc_connect_in_args *in_args, + grpc_connect_out_args *out_args, grpc_closure *notify); }; -void grpc_connector_ref (grpc_connector * connector); -void grpc_connector_unref (grpc_exec_ctx * exec_ctx, grpc_connector * connector); +void grpc_connector_ref(grpc_connector *connector); +void grpc_connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *connector); /** Connect using the connector: max one outstanding call at a time */ -void grpc_connector_connect (grpc_exec_ctx * exec_ctx, grpc_connector * connector, const grpc_connect_in_args * in_args, grpc_connect_out_args * out_args, grpc_closure * notify); +void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector, + const grpc_connect_in_args *in_args, + grpc_connect_out_args *out_args, + grpc_closure *notify); /** Cancel any pending connection */ -void grpc_connector_shutdown (grpc_exec_ctx * exec_ctx, grpc_connector * connector); +void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, + grpc_connector *connector); #endif diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 75e6c01839..5fa1ee4418 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -39,16 +39,14 @@ #include <grpc/support/alloc.h> #include "src/core/transport/connectivity_state.h" -typedef struct pending_pick -{ +typedef struct pending_pick { struct pending_pick *next; grpc_pollset *pollset; grpc_subchannel **target; grpc_closure *on_complete; } pending_pick; -typedef struct -{ +typedef struct { /** base policy: must be first */ grpc_lb_policy base; /** all our subchannels */ @@ -78,303 +76,284 @@ typedef struct grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void -del_interested_parties_locked (grpc_exec_ctx * exec_ctx, pick_first_lb_policy * p) -{ +static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) - { - grpc_subchannel_del_interested_party (exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); - } + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_del_interested_party( + exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); + } } -static void -add_interested_parties_locked (grpc_exec_ctx * exec_ctx, pick_first_lb_policy * p) -{ +static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) - { - grpc_subchannel_add_interested_party (exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); - } + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_add_interested_party( + exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); + } } -void -pf_destroy (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) -{ - pick_first_lb_policy *p = (pick_first_lb_policy *) pol; +void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; - GPR_ASSERT (p->pending_picks == NULL); - for (i = 0; i < p->num_subchannels; i++) - { - GRPC_SUBCHANNEL_UNREF (exec_ctx, p->subchannels[i], "pick_first"); - } - grpc_connectivity_state_destroy (exec_ctx, &p->state_tracker); - gpr_free (p->subchannels); - gpr_mu_destroy (&p->mu); - gpr_free (p); + GPR_ASSERT(p->pending_picks == NULL); + for (i = 0; i < p->num_subchannels; i++) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); + } + grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + gpr_free(p->subchannels); + gpr_mu_destroy(&p->mu); + gpr_free(p); } -void -pf_shutdown (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) -{ - pick_first_lb_policy *p = (pick_first_lb_policy *) pol; +void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock (&p->mu); - del_interested_parties_locked (exec_ctx, p); + gpr_mu_lock(&p->mu); + del_interested_parties_locked(exec_ctx, p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); - gpr_mu_unlock (&p->mu); - while (pp != NULL) - { - pending_pick *next = pp->next; - *pp->target = NULL; - grpc_exec_ctx_enqueue (exec_ctx, pp->on_complete, 1); - gpr_free (pp); - pp = next; - } + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); + gpr_mu_unlock(&p->mu); + while (pp != NULL) { + pending_pick *next = pp->next; + *pp->target = NULL; + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + gpr_free(pp); + pp = next; + } } -static void -start_picking (grpc_exec_ctx * exec_ctx, pick_first_lb_policy * p) -{ +static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_REF (&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); + GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + &p->checking_connectivity, &p->connectivity_changed); } -void -pf_exit_idle (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) -{ - pick_first_lb_policy *p = (pick_first_lb_policy *) pol; - gpr_mu_lock (&p->mu); - if (!p->started_picking) - { - start_picking (exec_ctx, p); - } - gpr_mu_unlock (&p->mu); +void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (!p->started_picking) { + start_picking(exec_ctx, p); + } + gpr_mu_unlock(&p->mu); } -void -pf_pick (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete) -{ - pick_first_lb_policy *p = (pick_first_lb_policy *) pol; +void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock (&p->mu); - if (p->selected) - { - gpr_mu_unlock (&p->mu); - *target = p->selected; - grpc_exec_ctx_enqueue (exec_ctx, on_complete, 1); - } - else - { - if (!p->started_picking) - { - start_picking (exec_ctx, p); - } - grpc_subchannel_add_interested_party (exec_ctx, p->subchannels[p->checking_subchannel], pollset); - pp = gpr_malloc (sizeof (*pp)); - pp->next = p->pending_picks; - pp->pollset = pollset; - pp->target = target; - pp->on_complete = on_complete; - p->pending_picks = pp; - gpr_mu_unlock (&p->mu); + gpr_mu_lock(&p->mu); + if (p->selected) { + gpr_mu_unlock(&p->mu); + *target = p->selected; + grpc_exec_ctx_enqueue(exec_ctx, on_complete, 1); + } else { + if (!p->started_picking) { + start_picking(exec_ctx, p); } + grpc_subchannel_add_interested_party( + exec_ctx, p->subchannels[p->checking_subchannel], pollset); + pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->pollset = pollset; + pp->target = target; + pp->on_complete = on_complete; + p->pending_picks = pp; + gpr_mu_unlock(&p->mu); + } } -static void -pf_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { pick_first_lb_policy *p = arg; pending_pick *pp; - gpr_mu_lock (&p->mu); + gpr_mu_lock(&p->mu); - if (p->shutdown) - { - gpr_mu_unlock (&p->mu); - GRPC_LB_POLICY_UNREF (exec_ctx, &p->base, "pick_first_connectivity"); - return; + if (p->shutdown) { + gpr_mu_unlock(&p->mu); + GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + return; + } else if (p->selected != NULL) { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + p->checking_connectivity, "selected_changed"); + if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { + grpc_subchannel_notify_on_state_change(exec_ctx, p->selected, + &p->checking_connectivity, + &p->connectivity_changed); + } else { + GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } - else if (p->selected != NULL) - { - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, p->checking_connectivity, "selected_changed"); - if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) - { - grpc_subchannel_notify_on_state_change (exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); - } - else - { - GRPC_LB_POLICY_UNREF (exec_ctx, &p->base, "pick_first_connectivity"); - } - } - else - { - loop: - switch (p->checking_connectivity) - { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); - p->selected = p->subchannels[p->checking_subchannel]; - while ((pp = p->pending_picks)) - { - p->pending_picks = pp->next; - *pp->target = p->selected; - grpc_subchannel_del_interested_party (exec_ctx, p->selected, pp->pollset); - grpc_exec_ctx_enqueue (exec_ctx, pp->on_complete, 1); - gpr_free (pp); - } - grpc_subchannel_notify_on_state_change (exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure"); - del_interested_parties_locked (exec_ctx, p); - p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity (p->subchannels[p->checking_subchannel]); - add_interested_parties_locked (exec_ctx, p); - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) - { - grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); - } - else - { - goto loop; - } - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING, "connecting_changed"); - grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); - break; - case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked (exec_ctx, p); - GPR_SWAP (grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); - p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF (exec_ctx, p->subchannels[p->num_subchannels], "pick_first"); - if (p->num_subchannels == 0) - { - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels"); - while ((pp = p->pending_picks)) - { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_exec_ctx_enqueue (exec_ctx, pp->on_complete, 1); - gpr_free (pp); - } - GRPC_LB_POLICY_UNREF (exec_ctx, &p->base, "pick_first_connectivity"); - } - else - { - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed"); - p->checking_subchannel %= p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity (p->subchannels[p->checking_subchannel]); - add_interested_parties_locked (exec_ctx, p); - goto loop; - } - } + } else { + loop: + switch (p->checking_connectivity) { + case GRPC_CHANNEL_READY: + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_READY, "connecting_ready"); + p->selected = p->subchannels[p->checking_subchannel]; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = p->selected; + grpc_subchannel_del_interested_party(exec_ctx, p->selected, + pp->pollset); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + gpr_free(pp); + } + grpc_subchannel_notify_on_state_change(exec_ctx, p->selected, + &p->checking_connectivity, + &p->connectivity_changed); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connecting_transient_failure"); + del_interested_parties_locked(exec_ctx, p); + p->checking_subchannel = + (p->checking_subchannel + 1) % p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity( + p->subchannels[p->checking_subchannel]); + add_interested_parties_locked(exec_ctx, p); + if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + &p->checking_connectivity, &p->connectivity_changed); + } else { + goto loop; + } + break; + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_CONNECTING, + "connecting_changed"); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], + &p->checking_connectivity, &p->connectivity_changed); + break; + case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked(exec_ctx, p); + GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], + p->subchannels[p->num_subchannels - 1]); + p->num_subchannels--; + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], + "pick_first"); + if (p->num_subchannels == 0) { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, + "no_more_channels"); + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + gpr_free(pp); + } + GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + } else { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "subchannel_failed"); + p->checking_subchannel %= p->num_subchannels; + p->checking_connectivity = grpc_subchannel_check_connectivity( + p->subchannels[p->checking_subchannel]); + add_interested_parties_locked(exec_ctx, p); + goto loop; + } } + } - gpr_mu_unlock (&p->mu); + gpr_mu_unlock(&p->mu); } -static void -pf_broadcast (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_transport_op * op) -{ - pick_first_lb_policy *p = (pick_first_lb_policy *) pol; +static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_transport_op *op) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; size_t n; grpc_subchannel **subchannels; - gpr_mu_lock (&p->mu); + gpr_mu_lock(&p->mu); n = p->num_subchannels; - subchannels = gpr_malloc (n * sizeof (*subchannels)); - for (i = 0; i < n; i++) - { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF (subchannels[i], "pf_broadcast"); - } - gpr_mu_unlock (&p->mu); + subchannels = gpr_malloc(n * sizeof(*subchannels)); + for (i = 0; i < n; i++) { + subchannels[i] = p->subchannels[i]; + GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast"); + } + gpr_mu_unlock(&p->mu); - for (i = 0; i < n; i++) - { - grpc_subchannel_process_transport_op (exec_ctx, subchannels[i], op); - GRPC_SUBCHANNEL_UNREF (exec_ctx, subchannels[i], "pf_broadcast"); - } - gpr_free (subchannels); + for (i = 0; i < n; i++) { + grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); + GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); + } + gpr_free(subchannels); } -static grpc_connectivity_state -pf_check_connectivity (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) -{ - pick_first_lb_policy *p = (pick_first_lb_policy *) pol; +static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connectivity_state st; - gpr_mu_lock (&p->mu); - st = grpc_connectivity_state_check (&p->state_tracker); - gpr_mu_unlock (&p->mu); + gpr_mu_lock(&p->mu); + st = grpc_connectivity_state_check(&p->state_tracker); + gpr_mu_unlock(&p->mu); return st; } -void -pf_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_connectivity_state * current, grpc_closure * notify) -{ - pick_first_lb_policy *p = (pick_first_lb_policy *) pol; - gpr_mu_lock (&p->mu); - grpc_connectivity_state_notify_on_state_change (exec_ctx, &p->state_tracker, current, notify); - gpr_mu_unlock (&p->mu); +void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + gpr_mu_lock(&p->mu); + grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, + current, notify); + gpr_mu_unlock(&p->mu); } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, - pf_shutdown, - pf_pick, - pf_exit_idle, - pf_broadcast, - pf_check_connectivity, - pf_notify_on_state_change -}; + pf_destroy, + pf_shutdown, + pf_pick, + pf_exit_idle, + pf_broadcast, + pf_check_connectivity, + pf_notify_on_state_change}; -static void -pick_first_factory_ref (grpc_lb_policy_factory * factory) -{ -} +static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} -static void -pick_first_factory_unref (grpc_lb_policy_factory * factory) -{ -} +static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} -static grpc_lb_policy * -create_pick_first (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args) -{ - pick_first_lb_policy *p = gpr_malloc (sizeof (*p)); - GPR_ASSERT (args->num_subchannels > 0); - memset (p, 0, sizeof (*p)); - grpc_lb_policy_init (&p->base, &pick_first_lb_policy_vtable); - p->subchannels = gpr_malloc (sizeof (grpc_subchannel *) * args->num_subchannels); +static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); + GPR_ASSERT(args->num_subchannels > 0); + memset(p, 0, sizeof(*p)); + grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); + p->subchannels = + gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; - grpc_connectivity_state_init (&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); - memcpy (p->subchannels, args->subchannels, sizeof (grpc_subchannel *) * args->num_subchannels); - grpc_closure_init (&p->connectivity_changed, pf_connectivity_changed, p); - gpr_mu_init (&p->mu); + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, + "pick_first"); + memcpy(p->subchannels, args->subchannels, + sizeof(grpc_subchannel *) * args->num_subchannels); + grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); + gpr_mu_init(&p->mu); return &p->base; } static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = { - pick_first_factory_ref, pick_first_factory_unref, create_pick_first, - "pick_first" -}; + pick_first_factory_ref, pick_first_factory_unref, create_pick_first, + "pick_first"}; static grpc_lb_policy_factory pick_first_lb_policy_factory = { - &pick_first_factory_vtable -}; + &pick_first_factory_vtable}; -grpc_lb_policy_factory * -grpc_pick_first_lb_factory_create () -{ +grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() { return &pick_first_lb_policy_factory; } diff --git a/src/core/client_config/lb_policies/pick_first.h b/src/core/client_config/lb_policies/pick_first.h index 72a4e7c32c..3ca53ad42a 100644 --- a/src/core/client_config/lb_policies/pick_first.h +++ b/src/core/client_config/lb_policies/pick_first.h @@ -38,6 +38,6 @@ /** Returns a load balancing factory for the pick first policy, which picks up * the first subchannel from \a subchannels to succesfully connect */ -grpc_lb_policy_factory *grpc_pick_first_lb_factory_create (); +grpc_lb_policy_factory *grpc_pick_first_lb_factory_create(); #endif diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 5f8976396b..479c376724 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -43,8 +43,7 @@ int grpc_lb_round_robin_trace = 0; /** List of entities waiting for a pick. * * Once a pick is available, \a target is updated and \a on_complete called. */ -typedef struct pending_pick -{ +typedef struct pending_pick { struct pending_pick *next; grpc_pollset *pollset; grpc_subchannel **target; @@ -52,21 +51,18 @@ typedef struct pending_pick } pending_pick; /** List of subchannels in a connectivity READY state */ -typedef struct ready_list -{ +typedef struct ready_list { grpc_subchannel *subchannel; struct ready_list *next; struct ready_list *prev; } ready_list; -typedef struct -{ +typedef struct { size_t subchannel_idx; /**< Index over p->subchannels */ - void *p; /**< round_robin_lb_policy instance */ + void *p; /**< round_robin_lb_policy instance */ } connectivity_changed_cb_arg; -typedef struct -{ +typedef struct { /** base policy: must be first */ grpc_lb_policy base; @@ -110,264 +106,224 @@ typedef struct * * Note that this function does *not* advance p->ready_list_last_pick. Use \a * advance_last_picked_locked() for that. */ -static ready_list * -peek_next_connected_locked (const round_robin_lb_policy * p) -{ +static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { ready_list *selected; selected = p->ready_list_last_pick->next; - while (selected != NULL) - { - if (selected == &p->ready_list) - { - GPR_ASSERT (selected->subchannel == NULL); - /* skip dummy root */ - selected = selected->next; - } - else - { - GPR_ASSERT (selected->subchannel != NULL); - return selected; - } + while (selected != NULL) { + if (selected == &p->ready_list) { + GPR_ASSERT(selected->subchannel == NULL); + /* skip dummy root */ + selected = selected->next; + } else { + GPR_ASSERT(selected->subchannel != NULL); + return selected; } + } return NULL; } /** Advance the \a ready_list picking head. */ -static void -advance_last_picked_locked (round_robin_lb_policy * p) -{ - if (p->ready_list_last_pick->next != NULL) - { /* non-empty list */ +static void advance_last_picked_locked(round_robin_lb_policy *p) { + if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ + p->ready_list_last_pick = p->ready_list_last_pick->next; + if (p->ready_list_last_pick == &p->ready_list) { + /* skip dummy root */ p->ready_list_last_pick = p->ready_list_last_pick->next; - if (p->ready_list_last_pick == &p->ready_list) - { - /* skip dummy root */ - p->ready_list_last_pick = p->ready_list_last_pick->next; - } - } - else - { /* should be an empty list */ - GPR_ASSERT (p->ready_list_last_pick == &p->ready_list); - } - - if (grpc_lb_round_robin_trace) - { - gpr_log (GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", p->ready_list_last_pick, p->ready_list_last_pick->subchannel); } + } else { /* should be an empty list */ + GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); + } + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", + p->ready_list_last_pick, p->ready_list_last_pick->subchannel); + } } /** Prepends (relative to the root at p->ready_list) the connected subchannel \a * csc to the list of ready subchannels. */ -static ready_list * -add_connected_sc_locked (round_robin_lb_policy * p, grpc_subchannel * csc) -{ - ready_list *new_elem = gpr_malloc (sizeof (ready_list)); +static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, + grpc_subchannel *csc) { + ready_list *new_elem = gpr_malloc(sizeof(ready_list)); new_elem->subchannel = csc; - if (p->ready_list.prev == NULL) - { - /* first element */ - new_elem->next = &p->ready_list; - new_elem->prev = &p->ready_list; - p->ready_list.next = new_elem; - p->ready_list.prev = new_elem; - } - else - { - new_elem->next = &p->ready_list; - new_elem->prev = p->ready_list.prev; - p->ready_list.prev->next = new_elem; - p->ready_list.prev = new_elem; - } - if (grpc_lb_round_robin_trace) - { - gpr_log (GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); - } + if (p->ready_list.prev == NULL) { + /* first element */ + new_elem->next = &p->ready_list; + new_elem->prev = &p->ready_list; + p->ready_list.next = new_elem; + p->ready_list.prev = new_elem; + } else { + new_elem->next = &p->ready_list; + new_elem->prev = p->ready_list.prev; + p->ready_list.prev->next = new_elem; + p->ready_list.prev = new_elem; + } + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); + } return new_elem; } /** Removes \a node from the list of connected subchannels */ -static void -remove_disconnected_sc_locked (round_robin_lb_policy * p, ready_list * node) -{ - if (node == NULL) - { - return; - } - if (node == p->ready_list_last_pick) - { - /* If removing the lastly picked node, reset the last pick pointer to the - * dummy root of the list */ - p->ready_list_last_pick = &p->ready_list; - } +static void remove_disconnected_sc_locked(round_robin_lb_policy *p, + ready_list *node) { + if (node == NULL) { + return; + } + if (node == p->ready_list_last_pick) { + /* If removing the lastly picked node, reset the last pick pointer to the + * dummy root of the list */ + p->ready_list_last_pick = &p->ready_list; + } /* removing last item */ - if (node->next == &p->ready_list && node->prev == &p->ready_list) - { - GPR_ASSERT (p->ready_list.next == node); - GPR_ASSERT (p->ready_list.prev == node); - p->ready_list.next = NULL; - p->ready_list.prev = NULL; - } - else - { - node->prev->next = node->next; - node->next->prev = node->prev; - } - - if (grpc_lb_round_robin_trace) - { - gpr_log (GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, node->subchannel); - } + if (node->next == &p->ready_list && node->prev == &p->ready_list) { + GPR_ASSERT(p->ready_list.next == node); + GPR_ASSERT(p->ready_list.prev == node); + p->ready_list.next = NULL; + p->ready_list.prev = NULL; + } else { + node->prev->next = node->next; + node->next->prev = node->prev; + } + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, + node->subchannel); + } node->next = NULL; node->prev = NULL; node->subchannel = NULL; - gpr_free (node); + gpr_free(node); } -static void -del_interested_parties_locked (grpc_exec_ctx * exec_ctx, round_robin_lb_policy * p, const size_t subchannel_idx) -{ +static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, + round_robin_lb_policy *p, + const size_t subchannel_idx) { pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) - { - grpc_subchannel_del_interested_party (exec_ctx, p->subchannels[subchannel_idx], pp->pollset); - } + for (pp = p->pending_picks; pp; pp = pp->next) { + grpc_subchannel_del_interested_party( + exec_ctx, p->subchannels[subchannel_idx], pp->pollset); + } } -void -rr_destroy (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) -{ - round_robin_lb_policy *p = (round_robin_lb_policy *) pol; +void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; size_t i; ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) - { - del_interested_parties_locked (exec_ctx, p, i); - } - for (i = 0; i < p->num_subchannels; i++) - { - GRPC_SUBCHANNEL_UNREF (exec_ctx, p->subchannels[i], "round_robin"); - } - gpr_free (p->connectivity_changed_cbs); - gpr_free (p->subchannel_connectivity); - - grpc_connectivity_state_destroy (exec_ctx, &p->state_tracker); - gpr_free (p->subchannels); - gpr_mu_destroy (&p->mu); + for (i = 0; i < p->num_subchannels; i++) { + del_interested_parties_locked(exec_ctx, p, i); + } + for (i = 0; i < p->num_subchannels; i++) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin"); + } + gpr_free(p->connectivity_changed_cbs); + gpr_free(p->subchannel_connectivity); + + grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + gpr_free(p->subchannels); + gpr_mu_destroy(&p->mu); elem = p->ready_list.next; - while (elem != NULL && elem != &p->ready_list) - { - ready_list *tmp; - tmp = elem->next; - elem->next = NULL; - elem->prev = NULL; - elem->subchannel = NULL; - gpr_free (elem); - elem = tmp; - } - gpr_free (p->subchannel_index_to_readylist_node); - gpr_free (p->cb_args); - gpr_free (p); + while (elem != NULL && elem != &p->ready_list) { + ready_list *tmp; + tmp = elem->next; + elem->next = NULL; + elem->prev = NULL; + elem->subchannel = NULL; + gpr_free(elem); + elem = tmp; + } + gpr_free(p->subchannel_index_to_readylist_node); + gpr_free(p->cb_args); + gpr_free(p); } -void -rr_shutdown (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) -{ +void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { size_t i; - round_robin_lb_policy *p = (round_robin_lb_policy *) pol; + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock (&p->mu); + gpr_mu_lock(&p->mu); - for (i = 0; i < p->num_subchannels; i++) - { - del_interested_parties_locked (exec_ctx, p, i); - } + for (i = 0; i < p->num_subchannels; i++) { + del_interested_parties_locked(exec_ctx, p, i); + } p->shutdown = 1; - while ((pp = p->pending_picks)) - { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_exec_ctx_enqueue (exec_ctx, pp->on_complete, 0); - gpr_free (pp); - } - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); - gpr_mu_unlock (&p->mu); + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + gpr_free(pp); + } + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); + gpr_mu_unlock(&p->mu); } -static void -start_picking (grpc_exec_ctx * exec_ctx, round_robin_lb_policy * p) -{ +static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { size_t i; p->started_picking = 1; - for (i = 0; i < p->num_subchannels; i++) - { - p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[i], &p->subchannel_connectivity[i], &p->connectivity_changed_cbs[i]); - GRPC_LB_POLICY_REF (&p->base, "round_robin_connectivity"); - } + for (i = 0; i < p->num_subchannels; i++) { + p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; + grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i], + &p->subchannel_connectivity[i], + &p->connectivity_changed_cbs[i]); + GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); + } } -void -rr_exit_idle (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) -{ - round_robin_lb_policy *p = (round_robin_lb_policy *) pol; - gpr_mu_lock (&p->mu); - if (!p->started_picking) - { - start_picking (exec_ctx, p); - } - gpr_mu_unlock (&p->mu); +void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (!p->started_picking) { + start_picking(exec_ctx, p); + } + gpr_mu_unlock(&p->mu); } -void -rr_pick (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete) -{ +void rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete) { size_t i; - round_robin_lb_policy *p = (round_robin_lb_policy *) pol; + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; ready_list *selected; - gpr_mu_lock (&p->mu); - if ((selected = peek_next_connected_locked (p))) - { - gpr_mu_unlock (&p->mu); - *target = selected->subchannel; - if (grpc_lb_round_robin_trace) - { - gpr_log (GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); - } - /* only advance the last picked pointer if the selection was used */ - advance_last_picked_locked (p); - on_complete->cb (exec_ctx, on_complete->cb_arg, 1); + gpr_mu_lock(&p->mu); + if ((selected = peek_next_connected_locked(p))) { + gpr_mu_unlock(&p->mu); + *target = selected->subchannel; + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", + selected->subchannel, selected); + } + /* only advance the last picked pointer if the selection was used */ + advance_last_picked_locked(p); + on_complete->cb(exec_ctx, on_complete->cb_arg, 1); + } else { + if (!p->started_picking) { + start_picking(exec_ctx, p); } - else - { - if (!p->started_picking) - { - start_picking (exec_ctx, p); - } - for (i = 0; i < p->num_subchannels; i++) - { - grpc_subchannel_add_interested_party (exec_ctx, p->subchannels[i], pollset); - } - pp = gpr_malloc (sizeof (*pp)); - pp->next = p->pending_picks; - pp->pollset = pollset; - pp->target = target; - pp->on_complete = on_complete; - p->pending_picks = pp; - gpr_mu_unlock (&p->mu); + for (i = 0; i < p->num_subchannels; i++) { + grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], + pollset); } + pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->pollset = pollset; + pp->target = target; + pp->on_complete = on_complete; + p->pending_picks = pp; + gpr_mu_unlock(&p->mu); + } } -static void -rr_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { connectivity_changed_cb_arg *cb_arg = arg; round_robin_lb_policy *p = cb_arg->p; /* index over p->subchannels of this cb's subchannel */ @@ -380,194 +336,198 @@ rr_connectivity_changed (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) /* connectivity state of this cb's subchannel */ grpc_connectivity_state *this_connectivity; - gpr_mu_lock (&p->mu); + gpr_mu_lock(&p->mu); this_connectivity = &p->subchannel_connectivity[this_idx]; - if (p->shutdown) - { - unref = 1; - } - else - { - switch (*this_connectivity) - { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - p->subchannel_index_to_readylist_node[this_idx] = add_connected_sc_locked (p, p->subchannels[this_idx]); - /* at this point we know there's at least one suitable subchannel. Go - * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - selected = peek_next_connected_locked (p); - if (p->pending_picks != NULL) - { - /* if the selected subchannel is going to be used for the pending - * picks, update the last picked pointer */ - advance_last_picked_locked (p); - } - while ((pp = p->pending_picks)) - { - p->pending_picks = pp->next; - *pp->target = selected->subchannel; - if (grpc_lb_round_robin_trace) - { - gpr_log (GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); - } - grpc_subchannel_del_interested_party (exec_ctx, selected->subchannel, pp->pollset); - grpc_exec_ctx_enqueue (exec_ctx, pp->on_complete, 1); - gpr_free (pp); - } - grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx]); - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, *this_connectivity, "connecting_changed"); - grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx]); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - del_interested_parties_locked (exec_ctx, p, this_idx); - /* renew state notification */ - grpc_subchannel_notify_on_state_change (exec_ctx, p->subchannels[this_idx], this_connectivity, &p->connectivity_changed_cbs[this_idx]); - - /* remove from ready list if still present */ - if (p->subchannel_index_to_readylist_node[this_idx] != NULL) - { - remove_disconnected_sc_locked (p, p->subchannel_index_to_readylist_node[this_idx]); - p->subchannel_index_to_readylist_node[this_idx] = NULL; - } - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure"); - break; - case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked (exec_ctx, p, this_idx); - if (p->subchannel_index_to_readylist_node[this_idx] != NULL) - { - remove_disconnected_sc_locked (p, p->subchannel_index_to_readylist_node[this_idx]); - p->subchannel_index_to_readylist_node[this_idx] = NULL; - } - - GPR_SWAP (grpc_subchannel *, p->subchannels[this_idx], p->subchannels[p->num_subchannels - 1]); - p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF (exec_ctx, p->subchannels[p->num_subchannels], "round_robin"); - - if (p->num_subchannels == 0) - { - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels"); - while ((pp = p->pending_picks)) - { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_exec_ctx_enqueue (exec_ctx, pp->on_complete, 1); - gpr_free (pp); - } - unref = 1; - } - else - { - grpc_connectivity_state_set (exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed"); - } - } /* switch */ - } /* !unref */ - - gpr_mu_unlock (&p->mu); - - if (unref) - { - GRPC_LB_POLICY_UNREF (exec_ctx, &p->base, "round_robin_connectivity"); - } + if (p->shutdown) { + unref = 1; + } else { + switch (*this_connectivity) { + case GRPC_CHANNEL_READY: + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_READY, "connecting_ready"); + /* add the newly connected subchannel to the list of connected ones. + * Note that it goes to the "end of the line". */ + p->subchannel_index_to_readylist_node[this_idx] = + add_connected_sc_locked(p, p->subchannels[this_idx]); + /* at this point we know there's at least one suitable subchannel. Go + * ahead and pick one and notify the pending suitors in + * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ + selected = peek_next_connected_locked(p); + if (p->pending_picks != NULL) { + /* if the selected subchannel is going to be used for the pending + * picks, update the last picked pointer */ + advance_last_picked_locked(p); + } + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = selected->subchannel; + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, + "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", + selected->subchannel, selected); + } + grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel, + pp->pollset); + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + gpr_free(pp); + } + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[this_idx], this_connectivity, + &p->connectivity_changed_cbs[this_idx]); + break; + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_IDLE: + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + *this_connectivity, "connecting_changed"); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[this_idx], this_connectivity, + &p->connectivity_changed_cbs[this_idx]); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + del_interested_parties_locked(exec_ctx, p, this_idx); + /* renew state notification */ + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[this_idx], this_connectivity, + &p->connectivity_changed_cbs[this_idx]); + + /* remove from ready list if still present */ + if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { + remove_disconnected_sc_locked( + p, p->subchannel_index_to_readylist_node[this_idx]); + p->subchannel_index_to_readylist_node[this_idx] = NULL; + } + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connecting_transient_failure"); + break; + case GRPC_CHANNEL_FATAL_FAILURE: + del_interested_parties_locked(exec_ctx, p, this_idx); + if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { + remove_disconnected_sc_locked( + p, p->subchannel_index_to_readylist_node[this_idx]); + p->subchannel_index_to_readylist_node[this_idx] = NULL; + } + + GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx], + p->subchannels[p->num_subchannels - 1]); + p->num_subchannels--; + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], + "round_robin"); + + if (p->num_subchannels == 0) { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, + "no_more_channels"); + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); + gpr_free(pp); + } + unref = 1; + } else { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "subchannel_failed"); + } + } /* switch */ + } /* !unref */ + + gpr_mu_unlock(&p->mu); + + if (unref) { + GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "round_robin_connectivity"); + } } -static void -rr_broadcast (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_transport_op * op) -{ - round_robin_lb_policy *p = (round_robin_lb_policy *) pol; +static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_transport_op *op) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; size_t i; size_t n; grpc_subchannel **subchannels; - gpr_mu_lock (&p->mu); + gpr_mu_lock(&p->mu); n = p->num_subchannels; - subchannels = gpr_malloc (n * sizeof (*subchannels)); - for (i = 0; i < n; i++) - { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF (subchannels[i], "rr_broadcast"); - } - gpr_mu_unlock (&p->mu); - - for (i = 0; i < n; i++) - { - grpc_subchannel_process_transport_op (exec_ctx, subchannels[i], op); - GRPC_SUBCHANNEL_UNREF (exec_ctx, subchannels[i], "rr_broadcast"); - } - gpr_free (subchannels); + subchannels = gpr_malloc(n * sizeof(*subchannels)); + for (i = 0; i < n; i++) { + subchannels[i] = p->subchannels[i]; + GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); + } + gpr_mu_unlock(&p->mu); + + for (i = 0; i < n; i++) { + grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); + GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast"); + } + gpr_free(subchannels); } -static grpc_connectivity_state -rr_check_connectivity (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol) -{ - round_robin_lb_policy *p = (round_robin_lb_policy *) pol; +static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; grpc_connectivity_state st; - gpr_mu_lock (&p->mu); - st = grpc_connectivity_state_check (&p->state_tracker); - gpr_mu_unlock (&p->mu); + gpr_mu_lock(&p->mu); + st = grpc_connectivity_state_check(&p->state_tracker); + gpr_mu_unlock(&p->mu); return st; } -static void -rr_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_lb_policy * pol, grpc_connectivity_state * current, grpc_closure * notify) -{ - round_robin_lb_policy *p = (round_robin_lb_policy *) pol; - gpr_mu_lock (&p->mu); - grpc_connectivity_state_notify_on_state_change (exec_ctx, &p->state_tracker, current, notify); - gpr_mu_unlock (&p->mu); +static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + gpr_mu_lock(&p->mu); + grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, + current, notify); + gpr_mu_unlock(&p->mu); } static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown, - rr_pick, - rr_exit_idle, - rr_broadcast, - rr_check_connectivity, - rr_notify_on_state_change -}; - -static void -round_robin_factory_ref (grpc_lb_policy_factory * factory) -{ -} + rr_destroy, + rr_shutdown, + rr_pick, + rr_exit_idle, + rr_broadcast, + rr_check_connectivity, + rr_notify_on_state_change}; -static void -round_robin_factory_unref (grpc_lb_policy_factory * factory) -{ -} +static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} + +static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} -static grpc_lb_policy * -create_round_robin (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args) -{ +static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { size_t i; - round_robin_lb_policy *p = gpr_malloc (sizeof (*p)); - GPR_ASSERT (args->num_subchannels > 0); - memset (p, 0, sizeof (*p)); - grpc_lb_policy_init (&p->base, &round_robin_lb_policy_vtable); - p->subchannels = gpr_malloc (sizeof (grpc_subchannel *) * args->num_subchannels); + round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); + GPR_ASSERT(args->num_subchannels > 0); + memset(p, 0, sizeof(*p)); + grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); + p->subchannels = + gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; - grpc_connectivity_state_init (&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); - memcpy (p->subchannels, args->subchannels, sizeof (grpc_subchannel *) * args->num_subchannels); - - gpr_mu_init (&p->mu); - p->connectivity_changed_cbs = gpr_malloc (sizeof (grpc_closure) * args->num_subchannels); - p->subchannel_connectivity = gpr_malloc (sizeof (grpc_connectivity_state) * args->num_subchannels); - - p->cb_args = gpr_malloc (sizeof (connectivity_changed_cb_arg) * args->num_subchannels); - for (i = 0; i < args->num_subchannels; i++) - { - p->cb_args[i].subchannel_idx = i; - p->cb_args[i].p = p; - grpc_closure_init (&p->connectivity_changed_cbs[i], rr_connectivity_changed, &p->cb_args[i]); - } + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, + "round_robin"); + memcpy(p->subchannels, args->subchannels, + sizeof(grpc_subchannel *) * args->num_subchannels); + + gpr_mu_init(&p->mu); + p->connectivity_changed_cbs = + gpr_malloc(sizeof(grpc_closure) * args->num_subchannels); + p->subchannel_connectivity = + gpr_malloc(sizeof(grpc_connectivity_state) * args->num_subchannels); + + p->cb_args = + gpr_malloc(sizeof(connectivity_changed_cb_arg) * args->num_subchannels); + for (i = 0; i < args->num_subchannels; i++) { + p->cb_args[i].subchannel_idx = i; + p->cb_args[i].p = p; + grpc_closure_init(&p->connectivity_changed_cbs[i], rr_connectivity_changed, + &p->cb_args[i]); + } /* The (dummy node) root of the ready list */ p->ready_list.subchannel = NULL; @@ -575,22 +535,20 @@ create_round_robin (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args p->ready_list.next = NULL; p->ready_list_last_pick = &p->ready_list; - p->subchannel_index_to_readylist_node = gpr_malloc (sizeof (grpc_subchannel *) * args->num_subchannels); - memset (p->subchannel_index_to_readylist_node, 0, sizeof (grpc_subchannel *) * args->num_subchannels); + p->subchannel_index_to_readylist_node = + gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); + memset(p->subchannel_index_to_readylist_node, 0, + sizeof(grpc_subchannel *) * args->num_subchannels); return &p->base; } static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { - round_robin_factory_ref, round_robin_factory_unref, create_round_robin, - "round_robin" -}; + round_robin_factory_ref, round_robin_factory_unref, create_round_robin, + "round_robin"}; static grpc_lb_policy_factory round_robin_lb_policy_factory = { - &round_robin_factory_vtable -}; + &round_robin_factory_vtable}; -grpc_lb_policy_factory * -grpc_round_robin_lb_factory_create () -{ +grpc_lb_policy_factory *grpc_round_robin_lb_factory_create() { return &round_robin_lb_policy_factory; } diff --git a/src/core/client_config/lb_policies/round_robin.h b/src/core/client_config/lb_policies/round_robin.h index 213995aa3f..cf1f69c85f 100644 --- a/src/core/client_config/lb_policies/round_robin.h +++ b/src/core/client_config/lb_policies/round_robin.h @@ -41,6 +41,6 @@ extern int grpc_lb_round_robin_trace; #include "src/core/client_config/lb_policy_factory.h" /** Returns a load balancing factory for the round robin policy */ -grpc_lb_policy_factory *grpc_round_robin_lb_factory_create (); +grpc_lb_policy_factory *grpc_round_robin_lb_factory_create(); #endif diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 6a9cf66887..c955186f7f 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -33,74 +33,66 @@ #include "src/core/client_config/lb_policy.h" -void -grpc_lb_policy_init (grpc_lb_policy * policy, const grpc_lb_policy_vtable * vtable) -{ +void grpc_lb_policy_init(grpc_lb_policy *policy, + const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; - gpr_ref_init (&policy->refs, 1); + gpr_ref_init(&policy->refs, 1); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void -grpc_lb_policy_ref (grpc_lb_policy * policy, const char *file, int line, const char *reason) -{ - gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", policy, (int) policy->refs.count, (int) policy->refs.count + 1, reason); +void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, + const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", + policy, (int)policy->refs.count, (int)policy->refs.count + 1, reason); #else -void -grpc_lb_policy_ref (grpc_lb_policy * policy) -{ +void grpc_lb_policy_ref(grpc_lb_policy *policy) { #endif - gpr_ref (&policy->refs); + gpr_ref(&policy->refs); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void -grpc_lb_policy_unref (grpc_lb_policy * policy, grpc_closure_list * closure_list, const char *file, int line, const char *reason) -{ - gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", policy, (int) policy->refs.count, (int) policy->refs.count - 1, reason); +void grpc_lb_policy_unref(grpc_lb_policy *policy, + grpc_closure_list *closure_list, const char *file, + int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", + policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason); #else -void -grpc_lb_policy_unref (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy) -{ +void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { #endif - if (gpr_unref (&policy->refs)) - { - policy->vtable->destroy (exec_ctx, policy); - } + if (gpr_unref(&policy->refs)) { + policy->vtable->destroy(exec_ctx, policy); + } } -void -grpc_lb_policy_shutdown (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy) -{ - policy->vtable->shutdown (exec_ctx, policy); +void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { + policy->vtable->shutdown(exec_ctx, policy); } -void -grpc_lb_policy_pick (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete) -{ - policy->vtable->pick (exec_ctx, policy, pollset, initial_metadata, target, on_complete); +void grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete) { + policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata, target, + on_complete); } -void -grpc_lb_policy_broadcast (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_transport_op * op) -{ - policy->vtable->broadcast (exec_ctx, policy, op); +void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_transport_op *op) { + policy->vtable->broadcast(exec_ctx, policy, op); } -void -grpc_lb_policy_exit_idle (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy) -{ - policy->vtable->exit_idle (exec_ctx, policy); +void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { + policy->vtable->exit_idle(exec_ctx, policy); } -void -grpc_lb_policy_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_connectivity_state * state, grpc_closure * closure) -{ - policy->vtable->notify_on_state_change (exec_ctx, policy, state, closure); +void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_closure *closure) { + policy->vtable->notify_on_state_change(exec_ctx, policy, state, closure); } -grpc_connectivity_state -grpc_lb_policy_check_connectivity (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy) -{ - return policy->vtable->check_connectivity (exec_ctx, policy); +grpc_connectivity_state grpc_lb_policy_check_connectivity( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { + return policy->vtable->check_connectivity(exec_ctx, policy); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index e66dac15b0..0eefe64991 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -42,35 +42,41 @@ typedef struct grpc_lb_policy grpc_lb_policy; typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable; -typedef void (*grpc_lb_completion) (void *cb_arg, grpc_subchannel * subchannel, grpc_status_code status, const char *errmsg); +typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, + grpc_status_code status, const char *errmsg); -struct grpc_lb_policy -{ +struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_refcount refs; }; -struct grpc_lb_policy_vtable -{ - void (*destroy) (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy); +struct grpc_lb_policy_vtable { + void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - void (*shutdown) (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** implement grpc_lb_policy_pick */ - void (*pick) (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete); + void (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete); /** try to enter a READY connectivity state */ - void (*exit_idle) (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy); + void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** broadcast a transport op to all subchannels */ - void (*broadcast) (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_transport_op * op); + void (*broadcast)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_transport_op *op); /** check the current connectivity of the lb_policy */ - grpc_connectivity_state (*check_connectivity) (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy); + grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy */ - void (*notify_on_state_change) (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_connectivity_state * state, grpc_closure * closure); + void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_closure *closure); }; #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -78,33 +84,44 @@ struct grpc_lb_policy_vtable grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \ grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) -void grpc_lb_policy_ref (grpc_lb_policy * policy, const char *file, int line, const char *reason); -void grpc_lb_policy_unref (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, const char *file, int line, const char *reason); +void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, + const char *reason); +void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const char *file, int line, const char *reason); #else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p)) -void grpc_lb_policy_ref (grpc_lb_policy * policy); -void grpc_lb_policy_unref (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy); +void grpc_lb_policy_ref(grpc_lb_policy *policy); +void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); #endif /** called by concrete implementations to initialize the base struct */ -void grpc_lb_policy_init (grpc_lb_policy * policy, const grpc_lb_policy_vtable * vtable); +void grpc_lb_policy_init(grpc_lb_policy *policy, + const grpc_lb_policy_vtable *vtable); /** Start shutting down (fail any pending picks) */ -void grpc_lb_policy_shutdown (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy); +void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting \a target. Picking can be asynchronous. Any IO should be done under \a pollset. */ -void grpc_lb_policy_pick (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete); +void grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete); -void grpc_lb_policy_broadcast (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_transport_op * op); +void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_transport_op *op); -void grpc_lb_policy_exit_idle (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy); +void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); -void grpc_lb_policy_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy, grpc_connectivity_state * state, grpc_closure * closure); +void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_closure *closure); -grpc_connectivity_state grpc_lb_policy_check_connectivity (grpc_exec_ctx * exec_ctx, grpc_lb_policy * policy); +grpc_connectivity_state grpc_lb_policy_check_connectivity( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */ diff --git a/src/core/client_config/lb_policy_factory.c b/src/core/client_config/lb_policy_factory.c index 9d202d0c36..e49de544e3 100644 --- a/src/core/client_config/lb_policy_factory.c +++ b/src/core/client_config/lb_policy_factory.c @@ -33,22 +33,16 @@ #include "src/core/client_config/lb_policy_factory.h" -void -grpc_lb_policy_factory_ref (grpc_lb_policy_factory * factory) -{ - factory->vtable->ref (factory); +void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) { + factory->vtable->ref(factory); } -void -grpc_lb_policy_factory_unref (grpc_lb_policy_factory * factory) -{ - factory->vtable->unref (factory); +void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory) { + factory->vtable->unref(factory); } -grpc_lb_policy * -grpc_lb_policy_factory_create_lb_policy (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args) -{ - if (factory == NULL) - return NULL; - return factory->vtable->create_lb_policy (factory, args); +grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy( + grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) { + if (factory == NULL) return NULL; + return factory->vtable->create_lb_policy(factory, args); } diff --git a/src/core/client_config/lb_policy_factory.h b/src/core/client_config/lb_policy_factory.h index a00b5142dc..04610316ee 100644 --- a/src/core/client_config/lb_policy_factory.h +++ b/src/core/client_config/lb_policy_factory.h @@ -42,33 +42,32 @@ typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; /** grpc_lb_policy provides grpc_client_config objects to grpc_channel objects */ -struct grpc_lb_policy_factory -{ +struct grpc_lb_policy_factory { const grpc_lb_policy_factory_vtable *vtable; }; -typedef struct grpc_lb_policy_args -{ +typedef struct grpc_lb_policy_args { grpc_subchannel **subchannels; size_t num_subchannels; } grpc_lb_policy_args; -struct grpc_lb_policy_factory_vtable -{ - void (*ref) (grpc_lb_policy_factory * factory); - void (*unref) (grpc_lb_policy_factory * factory); +struct grpc_lb_policy_factory_vtable { + void (*ref)(grpc_lb_policy_factory *factory); + void (*unref)(grpc_lb_policy_factory *factory); /** Implementation of grpc_lb_policy_factory_create_lb_policy */ - grpc_lb_policy *(*create_lb_policy) (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args); + grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args); /** Name for the LB policy this factory implements */ const char *name; }; -void grpc_lb_policy_factory_ref (grpc_lb_policy_factory * factory); -void grpc_lb_policy_factory_unref (grpc_lb_policy_factory * factory); +void grpc_lb_policy_factory_ref(grpc_lb_policy_factory *factory); +void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory); /** Create a lb_policy instance. */ -grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args); +grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args); #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_FACTORY_H */ diff --git a/src/core/client_config/lb_policy_registry.c b/src/core/client_config/lb_policy_registry.c index 4d7cc7c128..fc302e82d7 100644 --- a/src/core/client_config/lb_policy_registry.c +++ b/src/core/client_config/lb_policy_registry.c @@ -42,59 +42,47 @@ static int g_number_of_lb_policies = 0; static grpc_lb_policy_factory *g_default_lb_policy_factory; -void -grpc_lb_policy_registry_init (grpc_lb_policy_factory * default_factory) -{ +void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory) { g_number_of_lb_policies = 0; g_default_lb_policy_factory = default_factory; } -void -grpc_lb_policy_registry_shutdown (void) -{ +void grpc_lb_policy_registry_shutdown(void) { int i; - for (i = 0; i < g_number_of_lb_policies; i++) - { - grpc_lb_policy_factory_unref (g_all_of_the_lb_policies[i]); - } + for (i = 0; i < g_number_of_lb_policies; i++) { + grpc_lb_policy_factory_unref(g_all_of_the_lb_policies[i]); + } } -void -grpc_register_lb_policy (grpc_lb_policy_factory * factory) -{ +void grpc_register_lb_policy(grpc_lb_policy_factory *factory) { int i; - for (i = 0; i < g_number_of_lb_policies; i++) - { - GPR_ASSERT (0 != strcmp (factory->vtable->name, g_all_of_the_lb_policies[i]->vtable->name)); - } - GPR_ASSERT (g_number_of_lb_policies != MAX_POLICIES); - grpc_lb_policy_factory_ref (factory); + for (i = 0; i < g_number_of_lb_policies; i++) { + GPR_ASSERT(0 != strcmp(factory->vtable->name, + g_all_of_the_lb_policies[i]->vtable->name)); + } + GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES); + grpc_lb_policy_factory_ref(factory); g_all_of_the_lb_policies[g_number_of_lb_policies++] = factory; } -static grpc_lb_policy_factory * -lookup_factory (const char *name) -{ +static grpc_lb_policy_factory *lookup_factory(const char *name) { int i; - if (name == NULL) - return NULL; + if (name == NULL) return NULL; - for (i = 0; i < g_number_of_lb_policies; i++) - { - if (0 == strcmp (name, g_all_of_the_lb_policies[i]->vtable->name)) - { - return g_all_of_the_lb_policies[i]; - } + for (i = 0; i < g_number_of_lb_policies; i++) { + if (0 == strcmp(name, g_all_of_the_lb_policies[i]->vtable->name)) { + return g_all_of_the_lb_policies[i]; } + } return NULL; } -grpc_lb_policy * -grpc_lb_policy_create (const char *name, grpc_lb_policy_args * args) -{ - grpc_lb_policy_factory *factory = lookup_factory (name); - grpc_lb_policy *lb_policy = grpc_lb_policy_factory_create_lb_policy (factory, args); +grpc_lb_policy *grpc_lb_policy_create(const char *name, + grpc_lb_policy_args *args) { + grpc_lb_policy_factory *factory = lookup_factory(name); + grpc_lb_policy *lb_policy = + grpc_lb_policy_factory_create_lb_policy(factory, args); return lb_policy; } diff --git a/src/core/client_config/lb_policy_registry.h b/src/core/client_config/lb_policy_registry.h index dea0cfc0fd..96fc2a1628 100644 --- a/src/core/client_config/lb_policy_registry.h +++ b/src/core/client_config/lb_policy_registry.h @@ -38,16 +38,17 @@ /** Initialize the registry and set \a default_factory as the factory to be * returned when no name is provided in a lookup */ -void grpc_lb_policy_registry_init (grpc_lb_policy_factory * default_factory); -void grpc_lb_policy_registry_shutdown (void); +void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory); +void grpc_lb_policy_registry_shutdown(void); /** Register a LB policy factory. */ -void grpc_register_lb_policy (grpc_lb_policy_factory * factory); +void grpc_register_lb_policy(grpc_lb_policy_factory *factory); /** Create a \a grpc_lb_policy instance. * * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init * will be returned. */ -grpc_lb_policy *grpc_lb_policy_create (const char *name, grpc_lb_policy_args * args); +grpc_lb_policy *grpc_lb_policy_create(const char *name, + grpc_lb_policy_args *args); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */ diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c index a55daad5a2..081097eb19 100644 --- a/src/core/client_config/resolver.c +++ b/src/core/client_config/resolver.c @@ -33,56 +33,53 @@ #include "src/core/client_config/resolver.h" -void -grpc_resolver_init (grpc_resolver * resolver, const grpc_resolver_vtable * vtable) -{ +void grpc_resolver_init(grpc_resolver *resolver, + const grpc_resolver_vtable *vtable) { resolver->vtable = vtable; - gpr_ref_init (&resolver->refs, 1); + gpr_ref_init(&resolver->refs, 1); } #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG -void -grpc_resolver_ref (grpc_resolver * resolver, grpc_closure_list * closure_list, const char *file, int line, const char *reason) -{ - gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s", resolver, (int) resolver->refs.count, (int) resolver->refs.count + 1, reason); +void grpc_resolver_ref(grpc_resolver *resolver, grpc_closure_list *closure_list, + const char *file, int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s", + resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1, + reason); #else -void -grpc_resolver_ref (grpc_resolver * resolver) -{ +void grpc_resolver_ref(grpc_resolver *resolver) { #endif - gpr_ref (&resolver->refs); + gpr_ref(&resolver->refs); } #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG -void -grpc_resolver_unref (grpc_resolver * resolver, grpc_closure_list * closure_list, const char *file, int line, const char *reason) -{ - gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s", resolver, (int) resolver->refs.count, (int) resolver->refs.count - 1, reason); +void grpc_resolver_unref(grpc_resolver *resolver, + grpc_closure_list *closure_list, const char *file, + int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s", + resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1, + reason); #else -void -grpc_resolver_unref (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver) -{ +void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { #endif - if (gpr_unref (&resolver->refs)) - { - resolver->vtable->destroy (exec_ctx, resolver); - } + if (gpr_unref(&resolver->refs)) { + resolver->vtable->destroy(exec_ctx, resolver); + } } -void -grpc_resolver_shutdown (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver) -{ - resolver->vtable->shutdown (exec_ctx, resolver); +void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { + resolver->vtable->shutdown(exec_ctx, resolver); } -void -grpc_resolver_channel_saw_error (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, struct sockaddr *failing_address, int failing_address_len) -{ - resolver->vtable->channel_saw_error (exec_ctx, resolver, failing_address, failing_address_len); +void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, + struct sockaddr *failing_address, + int failing_address_len) { + resolver->vtable->channel_saw_error(exec_ctx, resolver, failing_address, + failing_address_len); } -void -grpc_resolver_next (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete) -{ - resolver->vtable->next (exec_ctx, resolver, target_config, on_complete); +void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_closure *on_complete) { + resolver->vtable->next(exec_ctx, resolver, target_config, on_complete); } diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h index b7de00a143..7ba0cd5bd4 100644 --- a/src/core/client_config/resolver.h +++ b/src/core/client_config/resolver.h @@ -43,40 +43,47 @@ typedef struct grpc_resolver_vtable grpc_resolver_vtable; /** grpc_resolver provides grpc_client_config objects to grpc_channel objects */ -struct grpc_resolver -{ +struct grpc_resolver { const grpc_resolver_vtable *vtable; gpr_refcount refs; }; -struct grpc_resolver_vtable -{ - void (*destroy) (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver); - void (*shutdown) (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver); - void (*channel_saw_error) (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, struct sockaddr * failing_address, int failing_address_len); - void (*next) (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete); +struct grpc_resolver_vtable { + void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); + void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + struct sockaddr *failing_address, + int failing_address_len); + void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_client_config **target_config, grpc_closure *on_complete); }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG #define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p), __FILE__, __LINE__, (r)) #define GRPC_RESOLVER_UNREF(cl, p, r) \ grpc_resolver_unref((cl), (p), __FILE__, __LINE__, (r)) -void grpc_resolver_ref (grpc_resolver * policy, const char *file, int line, const char *reason); -void grpc_resolver_unref (grpc_resolver * policy, grpc_closure_list * closure_list, const char *file, int line, const char *reason); +void grpc_resolver_ref(grpc_resolver *policy, const char *file, int line, + const char *reason); +void grpc_resolver_unref(grpc_resolver *policy, grpc_closure_list *closure_list, + const char *file, int line, const char *reason); #else #define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p)) #define GRPC_RESOLVER_UNREF(cl, p, r) grpc_resolver_unref((cl), (p)) -void grpc_resolver_ref (grpc_resolver * policy); -void grpc_resolver_unref (grpc_exec_ctx * exec_ctx, grpc_resolver * policy); +void grpc_resolver_ref(grpc_resolver *policy); +void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy); #endif -void grpc_resolver_init (grpc_resolver * resolver, const grpc_resolver_vtable * vtable); +void grpc_resolver_init(grpc_resolver *resolver, + const grpc_resolver_vtable *vtable); -void grpc_resolver_shutdown (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver); +void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); /** Notification that the channel has seen an error on some address. Can be used as a hint that re-resolution is desirable soon. */ -void grpc_resolver_channel_saw_error (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, struct sockaddr *failing_address, int failing_address_len); +void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, + struct sockaddr *failing_address, + int failing_address_len); /** Get the next client config. Called by the channel to fetch a new configuration. Expected to set *target_config with a new configuration, @@ -84,6 +91,8 @@ void grpc_resolver_channel_saw_error (grpc_exec_ctx * exec_ctx, grpc_resolver * If resolution is fatally broken, set *target_config to NULL and schedule on_complete. */ -void grpc_resolver_next (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete); +void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_closure *on_complete); #endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_H */ diff --git a/src/core/client_config/resolver_factory.c b/src/core/client_config/resolver_factory.c index d4f8201918..e7e9196ac4 100644 --- a/src/core/client_config/resolver_factory.c +++ b/src/core/client_config/resolver_factory.c @@ -33,31 +33,23 @@ #include "src/core/client_config/resolver_factory.h" -void -grpc_resolver_factory_ref (grpc_resolver_factory * factory) -{ - factory->vtable->ref (factory); +void grpc_resolver_factory_ref(grpc_resolver_factory* factory) { + factory->vtable->ref(factory); } -void -grpc_resolver_factory_unref (grpc_resolver_factory * factory) -{ - factory->vtable->unref (factory); +void grpc_resolver_factory_unref(grpc_resolver_factory* factory) { + factory->vtable->unref(factory); } /** Create a resolver instance for a name */ -grpc_resolver * -grpc_resolver_factory_create_resolver (grpc_resolver_factory * factory, grpc_resolver_args * args) -{ - if (factory == NULL) - return NULL; - return factory->vtable->create_resolver (factory, args); +grpc_resolver* grpc_resolver_factory_create_resolver( + grpc_resolver_factory* factory, grpc_resolver_args* args) { + if (factory == NULL) return NULL; + return factory->vtable->create_resolver(factory, args); } -char * -grpc_resolver_factory_get_default_authority (grpc_resolver_factory * factory, grpc_uri * uri) -{ - if (factory == NULL) - return NULL; - return factory->vtable->get_default_authority (factory, uri); +char* grpc_resolver_factory_get_default_authority( + grpc_resolver_factory* factory, grpc_uri* uri) { + if (factory == NULL) return NULL; + return factory->vtable->get_default_authority(factory, uri); } diff --git a/src/core/client_config/resolver_factory.h b/src/core/client_config/resolver_factory.h index 920776886a..4c4df353f7 100644 --- a/src/core/client_config/resolver_factory.h +++ b/src/core/client_config/resolver_factory.h @@ -43,40 +43,40 @@ typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable; /** grpc_resolver provides grpc_client_config objects to grpc_channel objects */ -struct grpc_resolver_factory -{ +struct grpc_resolver_factory { const grpc_resolver_factory_vtable *vtable; }; -typedef struct grpc_resolver_args -{ +typedef struct grpc_resolver_args { grpc_uri *uri; grpc_subchannel_factory *subchannel_factory; } grpc_resolver_args; -struct grpc_resolver_factory_vtable -{ - void (*ref) (grpc_resolver_factory * factory); - void (*unref) (grpc_resolver_factory * factory); +struct grpc_resolver_factory_vtable { + void (*ref)(grpc_resolver_factory *factory); + void (*unref)(grpc_resolver_factory *factory); /** Implementation of grpc_resolver_factory_create_resolver */ - grpc_resolver *(*create_resolver) (grpc_resolver_factory * factory, grpc_resolver_args * args); + grpc_resolver *(*create_resolver)(grpc_resolver_factory *factory, + grpc_resolver_args *args); /** Implementation of grpc_resolver_factory_get_default_authority */ - char *(*get_default_authority) (grpc_resolver_factory * factory, grpc_uri * uri); + char *(*get_default_authority)(grpc_resolver_factory *factory, grpc_uri *uri); /** URI scheme that this factory implements */ const char *scheme; }; -void grpc_resolver_factory_ref (grpc_resolver_factory * resolver); -void grpc_resolver_factory_unref (grpc_resolver_factory * resolver); +void grpc_resolver_factory_ref(grpc_resolver_factory *resolver); +void grpc_resolver_factory_unref(grpc_resolver_factory *resolver); /** Create a resolver instance for a name */ -grpc_resolver *grpc_resolver_factory_create_resolver (grpc_resolver_factory * factory, grpc_resolver_args * args); +grpc_resolver *grpc_resolver_factory_create_resolver( + grpc_resolver_factory *factory, grpc_resolver_args *args); /** Return a (freshly allocated with gpr_malloc) string representing the default authority to use for this scheme. */ -char *grpc_resolver_factory_get_default_authority (grpc_resolver_factory * factory, grpc_uri * uri); +char *grpc_resolver_factory_get_default_authority( + grpc_resolver_factory *factory, grpc_uri *uri); #endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_FACTORY_H */ diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c index 1ebf43048e..89a945c2d3 100644 --- a/src/core/client_config/resolver_registry.c +++ b/src/core/client_config/resolver_registry.c @@ -46,112 +46,92 @@ static int g_number_of_resolvers = 0; static char *g_default_resolver_prefix; -void -grpc_resolver_registry_init (const char *default_resolver_prefix) -{ +void grpc_resolver_registry_init(const char *default_resolver_prefix) { g_number_of_resolvers = 0; - g_default_resolver_prefix = gpr_strdup (default_resolver_prefix); + g_default_resolver_prefix = gpr_strdup(default_resolver_prefix); } -void -grpc_resolver_registry_shutdown (void) -{ +void grpc_resolver_registry_shutdown(void) { int i; - for (i = 0; i < g_number_of_resolvers; i++) - { - grpc_resolver_factory_unref (g_all_of_the_resolvers[i]); - } - gpr_free (g_default_resolver_prefix); + for (i = 0; i < g_number_of_resolvers; i++) { + grpc_resolver_factory_unref(g_all_of_the_resolvers[i]); + } + gpr_free(g_default_resolver_prefix); } -void -grpc_register_resolver_type (grpc_resolver_factory * factory) -{ +void grpc_register_resolver_type(grpc_resolver_factory *factory) { int i; - for (i = 0; i < g_number_of_resolvers; i++) - { - GPR_ASSERT (0 != strcmp (factory->vtable->scheme, g_all_of_the_resolvers[i]->vtable->scheme)); - } - GPR_ASSERT (g_number_of_resolvers != MAX_RESOLVERS); - grpc_resolver_factory_ref (factory); + for (i = 0; i < g_number_of_resolvers; i++) { + GPR_ASSERT(0 != strcmp(factory->vtable->scheme, + g_all_of_the_resolvers[i]->vtable->scheme)); + } + GPR_ASSERT(g_number_of_resolvers != MAX_RESOLVERS); + grpc_resolver_factory_ref(factory); g_all_of_the_resolvers[g_number_of_resolvers++] = factory; } -static grpc_resolver_factory * -lookup_factory (grpc_uri * uri) -{ +static grpc_resolver_factory *lookup_factory(grpc_uri *uri) { int i; /* handling NULL uri's here simplifies grpc_resolver_create */ - if (!uri) - return NULL; - - for (i = 0; i < g_number_of_resolvers; i++) - { - if (0 == strcmp (uri->scheme, g_all_of_the_resolvers[i]->vtable->scheme)) - { - return g_all_of_the_resolvers[i]; - } + if (!uri) return NULL; + + for (i = 0; i < g_number_of_resolvers; i++) { + if (0 == strcmp(uri->scheme, g_all_of_the_resolvers[i]->vtable->scheme)) { + return g_all_of_the_resolvers[i]; } + } return NULL; } -static grpc_resolver_factory * -resolve_factory (const char *target, grpc_uri ** uri) -{ +static grpc_resolver_factory *resolve_factory(const char *target, + grpc_uri **uri) { char *tmp; grpc_resolver_factory *factory = NULL; - GPR_ASSERT (uri != NULL); - *uri = grpc_uri_parse (target, 1); - factory = lookup_factory (*uri); - if (factory == NULL) - { - if (g_default_resolver_prefix != NULL) - { - grpc_uri_destroy (*uri); - gpr_asprintf (&tmp, "%s%s", g_default_resolver_prefix, target); - *uri = grpc_uri_parse (tmp, 1); - factory = lookup_factory (*uri); - if (factory == NULL) - { - grpc_uri_destroy (grpc_uri_parse (target, 0)); - grpc_uri_destroy (grpc_uri_parse (tmp, 0)); - gpr_log (GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, tmp); - } - gpr_free (tmp); - } - else - { - grpc_uri_destroy (grpc_uri_parse (target, 0)); - gpr_log (GPR_ERROR, "don't know how to resolve '%s'", target); - } + GPR_ASSERT(uri != NULL); + *uri = grpc_uri_parse(target, 1); + factory = lookup_factory(*uri); + if (factory == NULL) { + if (g_default_resolver_prefix != NULL) { + grpc_uri_destroy(*uri); + gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target); + *uri = grpc_uri_parse(tmp, 1); + factory = lookup_factory(*uri); + if (factory == NULL) { + grpc_uri_destroy(grpc_uri_parse(target, 0)); + grpc_uri_destroy(grpc_uri_parse(tmp, 0)); + gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, + tmp); + } + gpr_free(tmp); + } else { + grpc_uri_destroy(grpc_uri_parse(target, 0)); + gpr_log(GPR_ERROR, "don't know how to resolve '%s'", target); } + } return factory; } -grpc_resolver * -grpc_resolver_create (const char *target, grpc_subchannel_factory * subchannel_factory) -{ +grpc_resolver *grpc_resolver_create( + const char *target, grpc_subchannel_factory *subchannel_factory) { grpc_uri *uri = NULL; - grpc_resolver_factory *factory = resolve_factory (target, &uri); + grpc_resolver_factory *factory = resolve_factory(target, &uri); grpc_resolver *resolver; grpc_resolver_args args; - memset (&args, 0, sizeof (args)); + memset(&args, 0, sizeof(args)); args.uri = uri; args.subchannel_factory = subchannel_factory; - resolver = grpc_resolver_factory_create_resolver (factory, &args); - grpc_uri_destroy (uri); + resolver = grpc_resolver_factory_create_resolver(factory, &args); + grpc_uri_destroy(uri); return resolver; } -char * -grpc_get_default_authority (const char *target) -{ +char *grpc_get_default_authority(const char *target) { grpc_uri *uri = NULL; - grpc_resolver_factory *factory = resolve_factory (target, &uri); - char *authority = grpc_resolver_factory_get_default_authority (factory, uri); - grpc_uri_destroy (uri); + grpc_resolver_factory *factory = resolve_factory(target, &uri); + char *authority = grpc_resolver_factory_get_default_authority(factory, uri); + grpc_uri_destroy(uri); return authority; } diff --git a/src/core/client_config/resolver_registry.h b/src/core/client_config/resolver_registry.h index d52e50fd13..5a7193b7ae 100644 --- a/src/core/client_config/resolver_registry.h +++ b/src/core/client_config/resolver_registry.h @@ -36,15 +36,15 @@ #include "src/core/client_config/resolver_factory.h" -void grpc_resolver_registry_init (const char *default_prefix); -void grpc_resolver_registry_shutdown (void); +void grpc_resolver_registry_init(const char *default_prefix); +void grpc_resolver_registry_shutdown(void); /** Register a resolver type. URI's of \a scheme will be resolved with the given resolver. If \a priority is greater than zero, then the resolver will be eligible to resolve names that are passed in with no scheme. Higher priority resolvers will be tried before lower priority schemes. */ -void grpc_register_resolver_type (grpc_resolver_factory * factory); +void grpc_register_resolver_type(grpc_resolver_factory *factory); /** Create a resolver given \a target. First tries to parse \a target as a URI. If this succeeds, tries @@ -55,10 +55,11 @@ void grpc_register_resolver_type (grpc_resolver_factory * factory); If a resolver factory was found, use it to instantiate a resolver and return it. If a resolver factory was not found, return NULL. */ -grpc_resolver *grpc_resolver_create (const char *target, grpc_subchannel_factory * subchannel_factory); +grpc_resolver *grpc_resolver_create( + const char *target, grpc_subchannel_factory *subchannel_factory); /** Given a target, return a (freshly allocated with gpr_malloc) string representing the default authority to pass from a client. */ -char *grpc_get_default_authority (const char *target); +char *grpc_get_default_authority(const char *target); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVER_REGISTRY_H */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index dc8151311a..7f9dd2543f 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -44,8 +44,7 @@ #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" -typedef struct -{ +typedef struct { /** base class: must be first */ grpc_resolver base; /** refcount */ @@ -75,175 +74,160 @@ typedef struct grpc_client_config *resolved_config; } dns_resolver; -static void dns_destroy (grpc_exec_ctx * exec_ctx, grpc_resolver * r); +static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_start_resolving_locked (dns_resolver * r); -static void dns_maybe_finish_next_locked (grpc_exec_ctx * exec_ctx, dns_resolver * r); +static void dns_start_resolving_locked(dns_resolver *r); +static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + dns_resolver *r); -static void dns_shutdown (grpc_exec_ctx * exec_ctx, grpc_resolver * r); -static void dns_channel_saw_error (grpc_exec_ctx * exec_ctx, grpc_resolver * r, struct sockaddr *failing_address, int failing_address_len); -static void dns_next (grpc_exec_ctx * exec_ctx, grpc_resolver * r, grpc_client_config ** target_config, grpc_closure * on_complete); +static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + struct sockaddr *failing_address, + int failing_address_len); +static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_client_config **target_config, + grpc_closure *on_complete); static const grpc_resolver_vtable dns_resolver_vtable = { - dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next -}; + dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; -static void -dns_shutdown (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver) -{ - dns_resolver *r = (dns_resolver *) resolver; - gpr_mu_lock (&r->mu); - if (r->next_completion != NULL) - { - *r->target_config = NULL; - grpc_exec_ctx_enqueue (exec_ctx, r->next_completion, 1); - r->next_completion = NULL; - } - gpr_mu_unlock (&r->mu); +static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { + dns_resolver *r = (dns_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->next_completion != NULL) { + *r->target_config = NULL; + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + r->next_completion = NULL; + } + gpr_mu_unlock(&r->mu); } -static void -dns_channel_saw_error (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, struct sockaddr *sa, int len) -{ - dns_resolver *r = (dns_resolver *) resolver; - gpr_mu_lock (&r->mu); - if (!r->resolving) - { - dns_start_resolving_locked (r); - } - gpr_mu_unlock (&r->mu); +static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, struct sockaddr *sa, + int len) { + dns_resolver *r = (dns_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (!r->resolving) { + dns_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); } -static void -dns_next (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete) -{ - dns_resolver *r = (dns_resolver *) resolver; - gpr_mu_lock (&r->mu); - GPR_ASSERT (!r->next_completion); +static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_closure *on_complete) { + dns_resolver *r = (dns_resolver *)resolver; + gpr_mu_lock(&r->mu); + GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_config = target_config; - if (r->resolved_version == 0 && !r->resolving) - { - dns_start_resolving_locked (r); - } - else - { - dns_maybe_finish_next_locked (exec_ctx, r); - } - gpr_mu_unlock (&r->mu); + if (r->resolved_version == 0 && !r->resolving) { + dns_start_resolving_locked(r); + } else { + dns_maybe_finish_next_locked(exec_ctx, r); + } + gpr_mu_unlock(&r->mu); } -static void -dns_on_resolved (grpc_exec_ctx * exec_ctx, void *arg, grpc_resolved_addresses * addresses) -{ +static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, + grpc_resolved_addresses *addresses) { dns_resolver *r = arg; grpc_client_config *config = NULL; grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; size_t i; - if (addresses) - { - grpc_lb_policy_args lb_policy_args; - config = grpc_client_config_create (); - subchannels = gpr_malloc (sizeof (grpc_subchannel *) * addresses->naddrs); - for (i = 0; i < addresses->naddrs; i++) - { - memset (&args, 0, sizeof (args)); - args.addr = (struct sockaddr *) (addresses->addrs[i].addr); - args.addr_len = (size_t) addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel (exec_ctx, r->subchannel_factory, &args); - } - memset (&lb_policy_args, 0, sizeof (lb_policy_args)); - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = addresses->naddrs; - lb_policy = grpc_lb_policy_create (r->lb_policy_name, &lb_policy_args); - grpc_client_config_set_lb_policy (config, lb_policy); - GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "construction"); - grpc_resolved_addresses_destroy (addresses); - gpr_free (subchannels); + if (addresses) { + grpc_lb_policy_args lb_policy_args; + config = grpc_client_config_create(); + subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); + for (i = 0; i < addresses->naddrs; i++) { + memset(&args, 0, sizeof(args)); + args.addr = (struct sockaddr *)(addresses->addrs[i].addr); + args.addr_len = (size_t)addresses->addrs[i].len; + subchannels[i] = grpc_subchannel_factory_create_subchannel( + exec_ctx, r->subchannel_factory, &args); } - gpr_mu_lock (&r->mu); - GPR_ASSERT (r->resolving); + memset(&lb_policy_args, 0, sizeof(lb_policy_args)); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); + grpc_resolved_addresses_destroy(addresses); + gpr_free(subchannels); + } + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->resolving); r->resolving = 0; - if (r->resolved_config) - { - grpc_client_config_unref (exec_ctx, r->resolved_config); - } + if (r->resolved_config) { + grpc_client_config_unref(exec_ctx, r->resolved_config); + } r->resolved_config = config; r->resolved_version++; - dns_maybe_finish_next_locked (exec_ctx, r); - gpr_mu_unlock (&r->mu); + dns_maybe_finish_next_locked(exec_ctx, r); + gpr_mu_unlock(&r->mu); - GRPC_RESOLVER_UNREF (exec_ctx, &r->base, "dns-resolving"); + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } -static void -dns_start_resolving_locked (dns_resolver * r) -{ - GRPC_RESOLVER_REF (&r->base, "dns-resolving"); - GPR_ASSERT (!r->resolving); +static void dns_start_resolving_locked(dns_resolver *r) { + GRPC_RESOLVER_REF(&r->base, "dns-resolving"); + GPR_ASSERT(!r->resolving); r->resolving = 1; - grpc_resolve_address (r->name, r->default_port, dns_on_resolved, r); + grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); } -static void -dns_maybe_finish_next_locked (grpc_exec_ctx * exec_ctx, dns_resolver * r) -{ - if (r->next_completion != NULL && r->resolved_version != r->published_version) - { - *r->target_config = r->resolved_config; - if (r->resolved_config) - { - grpc_client_config_ref (r->resolved_config); - } - grpc_exec_ctx_enqueue (exec_ctx, r->next_completion, 1); - r->next_completion = NULL; - r->published_version = r->resolved_version; +static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + dns_resolver *r) { + if (r->next_completion != NULL && + r->resolved_version != r->published_version) { + *r->target_config = r->resolved_config; + if (r->resolved_config) { + grpc_client_config_ref(r->resolved_config); } + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + r->next_completion = NULL; + r->published_version = r->resolved_version; + } } -static void -dns_destroy (grpc_exec_ctx * exec_ctx, grpc_resolver * gr) -{ - dns_resolver *r = (dns_resolver *) gr; - gpr_mu_destroy (&r->mu); - if (r->resolved_config) - { - grpc_client_config_unref (exec_ctx, r->resolved_config); - } - grpc_subchannel_factory_unref (exec_ctx, r->subchannel_factory); - gpr_free (r->name); - gpr_free (r->default_port); - gpr_free (r->lb_policy_name); - gpr_free (r); +static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { + dns_resolver *r = (dns_resolver *)gr; + gpr_mu_destroy(&r->mu); + if (r->resolved_config) { + grpc_client_config_unref(exec_ctx, r->resolved_config); + } + grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory); + gpr_free(r->name); + gpr_free(r->default_port); + gpr_free(r->lb_policy_name); + gpr_free(r); } -static grpc_resolver * -dns_create (grpc_resolver_args * args, const char *default_port, const char *lb_policy_name) -{ +static grpc_resolver *dns_create(grpc_resolver_args *args, + const char *default_port, + const char *lb_policy_name) { dns_resolver *r; const char *path = args->uri->path; - if (0 != strcmp (args->uri->authority, "")) - { - gpr_log (GPR_ERROR, "authority based dns uri's not supported"); - return NULL; - } + if (0 != strcmp(args->uri->authority, "")) { + gpr_log(GPR_ERROR, "authority based dns uri's not supported"); + return NULL; + } - if (path[0] == '/') - ++path; + if (path[0] == '/') ++path; - r = gpr_malloc (sizeof (dns_resolver)); - memset (r, 0, sizeof (*r)); - gpr_ref_init (&r->refs, 1); - gpr_mu_init (&r->mu); - grpc_resolver_init (&r->base, &dns_resolver_vtable); - r->name = gpr_strdup (path); - r->default_port = gpr_strdup (default_port); + r = gpr_malloc(sizeof(dns_resolver)); + memset(r, 0, sizeof(*r)); + gpr_ref_init(&r->refs, 1); + gpr_mu_init(&r->mu); + grpc_resolver_init(&r->base, &dns_resolver_vtable); + r->name = gpr_strdup(path); + r->default_port = gpr_strdup(default_port); r->subchannel_factory = args->subchannel_factory; - grpc_subchannel_factory_ref (r->subchannel_factory); - r->lb_policy_name = gpr_strdup (lb_policy_name); + grpc_subchannel_factory_ref(r->subchannel_factory); + r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } @@ -251,39 +235,27 @@ dns_create (grpc_resolver_args * args, const char *default_port, const char *lb_ * FACTORY */ -static void -dns_factory_ref (grpc_resolver_factory * factory) -{ -} +static void dns_factory_ref(grpc_resolver_factory *factory) {} -static void -dns_factory_unref (grpc_resolver_factory * factory) -{ -} +static void dns_factory_unref(grpc_resolver_factory *factory) {} -static grpc_resolver * -dns_factory_create_resolver (grpc_resolver_factory * factory, grpc_resolver_args * args) -{ - return dns_create (args, "https", "pick_first"); +static grpc_resolver *dns_factory_create_resolver( + grpc_resolver_factory *factory, grpc_resolver_args *args) { + return dns_create(args, "https", "pick_first"); } -char * -dns_factory_get_default_host_name (grpc_resolver_factory * factory, grpc_uri * uri) -{ +char *dns_factory_get_default_host_name(grpc_resolver_factory *factory, + grpc_uri *uri) { const char *path = uri->path; - if (path[0] == '/') - ++path; - return gpr_strdup (path); + if (path[0] == '/') ++path; + return gpr_strdup(path); } static const grpc_resolver_factory_vtable dns_factory_vtable = { - dns_factory_ref, dns_factory_unref, dns_factory_create_resolver, - dns_factory_get_default_host_name, "dns" -}; -static grpc_resolver_factory dns_resolver_factory = { &dns_factory_vtable }; + dns_factory_ref, dns_factory_unref, dns_factory_create_resolver, + dns_factory_get_default_host_name, "dns"}; +static grpc_resolver_factory dns_resolver_factory = {&dns_factory_vtable}; -grpc_resolver_factory * -grpc_dns_resolver_factory_create () -{ +grpc_resolver_factory *grpc_dns_resolver_factory_create() { return &dns_resolver_factory; } diff --git a/src/core/client_config/resolvers/dns_resolver.h b/src/core/client_config/resolvers/dns_resolver.h index bb43499149..a3ef3161a6 100644 --- a/src/core/client_config/resolvers/dns_resolver.h +++ b/src/core/client_config/resolvers/dns_resolver.h @@ -37,6 +37,6 @@ #include "src/core/client_config/resolver_factory.h" /** Create a dns resolver factory */ -grpc_resolver_factory *grpc_dns_resolver_factory_create (void); +grpc_resolver_factory *grpc_dns_resolver_factory_create(void); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_DNS_RESOLVER_H */ diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index f4c958c424..0b017f06c7 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -49,8 +49,7 @@ #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" -typedef struct -{ +typedef struct { /** base class: must be first */ grpc_resolver base; /** refcount */ @@ -77,314 +76,283 @@ typedef struct grpc_client_config **target_config; } sockaddr_resolver; -static void sockaddr_destroy (grpc_exec_ctx * exec_ctx, grpc_resolver * r); +static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void sockaddr_maybe_finish_next_locked (grpc_exec_ctx * exec_ctx, sockaddr_resolver * r); +static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + sockaddr_resolver *r); -static void sockaddr_shutdown (grpc_exec_ctx * exec_ctx, grpc_resolver * r); -static void sockaddr_channel_saw_error (grpc_exec_ctx * exec_ctx, grpc_resolver * r, struct sockaddr *failing_address, int failing_address_len); -static void sockaddr_next (grpc_exec_ctx * exec_ctx, grpc_resolver * r, grpc_client_config ** target_config, grpc_closure * on_complete); +static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *r, + struct sockaddr *failing_address, + int failing_address_len); +static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_client_config **target_config, + grpc_closure *on_complete); static const grpc_resolver_vtable sockaddr_resolver_vtable = { - sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, - sockaddr_next -}; - -static void -sockaddr_shutdown (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver) -{ - sockaddr_resolver *r = (sockaddr_resolver *) resolver; - gpr_mu_lock (&r->mu); - if (r->next_completion != NULL) - { - *r->target_config = NULL; - grpc_exec_ctx_enqueue (exec_ctx, r->next_completion, 1); - r->next_completion = NULL; - } - gpr_mu_unlock (&r->mu); + sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, + sockaddr_next}; + +static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + sockaddr_resolver *r = (sockaddr_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->next_completion != NULL) { + *r->target_config = NULL; + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + r->next_completion = NULL; + } + gpr_mu_unlock(&r->mu); } -static void -sockaddr_channel_saw_error (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, struct sockaddr *sa, int len) -{ -} +static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, + struct sockaddr *sa, int len) {} -static void -sockaddr_next (grpc_exec_ctx * exec_ctx, grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete) -{ - sockaddr_resolver *r = (sockaddr_resolver *) resolver; - gpr_mu_lock (&r->mu); - GPR_ASSERT (!r->next_completion); +static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_closure *on_complete) { + sockaddr_resolver *r = (sockaddr_resolver *)resolver; + gpr_mu_lock(&r->mu); + GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_config = target_config; - sockaddr_maybe_finish_next_locked (exec_ctx, r); - gpr_mu_unlock (&r->mu); + sockaddr_maybe_finish_next_locked(exec_ctx, r); + gpr_mu_unlock(&r->mu); } -static void -sockaddr_maybe_finish_next_locked (grpc_exec_ctx * exec_ctx, sockaddr_resolver * r) -{ +static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + sockaddr_resolver *r) { grpc_client_config *cfg; grpc_lb_policy *lb_policy; grpc_lb_policy_args lb_policy_args; grpc_subchannel **subchannels; grpc_subchannel_args args; - if (r->next_completion != NULL && !r->published) - { - size_t i; - cfg = grpc_client_config_create (); - subchannels = gpr_malloc (sizeof (grpc_subchannel *) * r->num_addrs); - for (i = 0; i < r->num_addrs; i++) - { - memset (&args, 0, sizeof (args)); - args.addr = (struct sockaddr *) &r->addrs[i]; - args.addr_len = r->addrs_len[i]; - subchannels[i] = grpc_subchannel_factory_create_subchannel (exec_ctx, r->subchannel_factory, &args); - } - memset (&lb_policy_args, 0, sizeof (lb_policy_args)); - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = r->num_addrs; - lb_policy = grpc_lb_policy_create (r->lb_policy_name, &lb_policy_args); - gpr_free (subchannels); - grpc_client_config_set_lb_policy (cfg, lb_policy); - GRPC_LB_POLICY_UNREF (exec_ctx, lb_policy, "sockaddr"); - r->published = 1; - *r->target_config = cfg; - grpc_exec_ctx_enqueue (exec_ctx, r->next_completion, 1); - r->next_completion = NULL; + if (r->next_completion != NULL && !r->published) { + size_t i; + cfg = grpc_client_config_create(); + subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs); + for (i = 0; i < r->num_addrs; i++) { + memset(&args, 0, sizeof(args)); + args.addr = (struct sockaddr *)&r->addrs[i]; + args.addr_len = r->addrs_len[i]; + subchannels[i] = grpc_subchannel_factory_create_subchannel( + exec_ctx, r->subchannel_factory, &args); } + memset(&lb_policy_args, 0, sizeof(lb_policy_args)); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = r->num_addrs; + lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); + gpr_free(subchannels); + grpc_client_config_set_lb_policy(cfg, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); + r->published = 1; + *r->target_config = cfg; + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); + r->next_completion = NULL; + } } -static void -sockaddr_destroy (grpc_exec_ctx * exec_ctx, grpc_resolver * gr) -{ - sockaddr_resolver *r = (sockaddr_resolver *) gr; - gpr_mu_destroy (&r->mu); - grpc_subchannel_factory_unref (exec_ctx, r->subchannel_factory); - gpr_free (r->addrs); - gpr_free (r->addrs_len); - gpr_free (r->lb_policy_name); - gpr_free (r); +static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { + sockaddr_resolver *r = (sockaddr_resolver *)gr; + gpr_mu_destroy(&r->mu); + grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory); + gpr_free(r->addrs); + gpr_free(r->addrs_len); + gpr_free(r->lb_policy_name); + gpr_free(r); } #ifdef GPR_POSIX_SOCKET -static int -parse_unix (grpc_uri * uri, struct sockaddr_storage *addr, size_t * len) -{ - struct sockaddr_un *un = (struct sockaddr_un *) addr; +static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { + struct sockaddr_un *un = (struct sockaddr_un *)addr; un->sun_family = AF_UNIX; - strcpy (un->sun_path, uri->path); - *len = strlen (un->sun_path) + sizeof (un->sun_family) + 1; + strcpy(un->sun_path, uri->path); + *len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; return 1; } -static char * -unix_get_default_authority (grpc_resolver_factory * factory, grpc_uri * uri) -{ - return gpr_strdup ("localhost"); +static char *unix_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return gpr_strdup("localhost"); } #endif -static char * -ip_get_default_authority (grpc_uri * uri) -{ +static char *ip_get_default_authority(grpc_uri *uri) { const char *path = uri->path; - if (path[0] == '/') - ++path; - return gpr_strdup (path); + if (path[0] == '/') ++path; + return gpr_strdup(path); } -static char * -ipv4_get_default_authority (grpc_resolver_factory * factory, grpc_uri * uri) -{ - return ip_get_default_authority (uri); +static char *ipv4_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return ip_get_default_authority(uri); } -static char * -ipv6_get_default_authority (grpc_resolver_factory * factory, grpc_uri * uri) -{ - return ip_get_default_authority (uri); +static char *ipv6_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return ip_get_default_authority(uri); } -static int -parse_ipv4 (grpc_uri * uri, struct sockaddr_storage *addr, size_t * len) -{ +static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { const char *host_port = uri->path; char *host; char *port; int port_num; int result = 0; - struct sockaddr_in *in = (struct sockaddr_in *) addr; + struct sockaddr_in *in = (struct sockaddr_in *)addr; - if (*host_port == '/') - ++host_port; - if (!gpr_split_host_port (host_port, &host, &port)) - { - return 0; - } + if (*host_port == '/') ++host_port; + if (!gpr_split_host_port(host_port, &host, &port)) { + return 0; + } - memset (in, 0, sizeof (*in)); - *len = sizeof (*in); + memset(in, 0, sizeof(*in)); + *len = sizeof(*in); in->sin_family = AF_INET; - if (inet_pton (AF_INET, host, &in->sin_addr) == 0) - { - gpr_log (GPR_ERROR, "invalid ipv4 address: '%s'", host); - goto done; - } + if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); + goto done; + } - if (port != NULL) - { - if (sscanf (port, "%d", &port_num) != 1 || port_num < 0 || port_num > 65535) - { - gpr_log (GPR_ERROR, "invalid ipv4 port: '%s'", port); - goto done; - } - in->sin_port = htons ((gpr_uint16) port_num); - } - else - { - gpr_log (GPR_ERROR, "no port given for ipv4 scheme"); + if (port != NULL) { + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || + port_num > 65535) { + gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); goto done; } + in->sin_port = htons((gpr_uint16)port_num); + } else { + gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); + goto done; + } result = 1; done: - gpr_free (host); - gpr_free (port); + gpr_free(host); + gpr_free(port); return result; } -static int -parse_ipv6 (grpc_uri * uri, struct sockaddr_storage *addr, size_t * len) -{ +static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { const char *host_port = uri->path; char *host; char *port; int port_num; int result = 0; - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) addr; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr; - if (*host_port == '/') - ++host_port; - if (!gpr_split_host_port (host_port, &host, &port)) - { - return 0; - } + if (*host_port == '/') ++host_port; + if (!gpr_split_host_port(host_port, &host, &port)) { + return 0; + } - memset (in6, 0, sizeof (*in6)); - *len = sizeof (*in6); + memset(in6, 0, sizeof(*in6)); + *len = sizeof(*in6); in6->sin6_family = AF_INET6; - if (inet_pton (AF_INET6, host, &in6->sin6_addr) == 0) - { - gpr_log (GPR_ERROR, "invalid ipv6 address: '%s'", host); - goto done; - } + if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); + goto done; + } - if (port != NULL) - { - if (sscanf (port, "%d", &port_num) != 1 || port_num < 0 || port_num > 65535) - { - gpr_log (GPR_ERROR, "invalid ipv6 port: '%s'", port); - goto done; - } - in6->sin6_port = htons ((gpr_uint16) port_num); - } - else - { - gpr_log (GPR_ERROR, "no port given for ipv6 scheme"); + if (port != NULL) { + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || + port_num > 65535) { + gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); goto done; } + in6->sin6_port = htons((gpr_uint16)port_num); + } else { + gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); + goto done; + } result = 1; done: - gpr_free (host); - gpr_free (port); + gpr_free(host); + gpr_free(port); return result; } -static void -do_nothing (void *ignored) -{ -} +static void do_nothing(void *ignored) {} -static grpc_resolver * -sockaddr_create (grpc_resolver_args * args, const char *default_lb_policy_name, int parse (grpc_uri * uri, struct sockaddr_storage *dst, size_t * len)) -{ +static grpc_resolver *sockaddr_create( + grpc_resolver_args *args, const char *default_lb_policy_name, + int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { size_t i; - int errors_found = 0; /* GPR_FALSE */ + int errors_found = 0; /* GPR_FALSE */ sockaddr_resolver *r; gpr_slice path_slice; gpr_slice_buffer path_parts; - if (0 != strcmp (args->uri->authority, "")) - { - gpr_log (GPR_ERROR, "authority based uri's not supported by the %s scheme", args->uri->scheme); - return NULL; - } + if (0 != strcmp(args->uri->authority, "")) { + gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme", + args->uri->scheme); + return NULL; + } - r = gpr_malloc (sizeof (sockaddr_resolver)); - memset (r, 0, sizeof (*r)); + r = gpr_malloc(sizeof(sockaddr_resolver)); + memset(r, 0, sizeof(*r)); r->lb_policy_name = NULL; - if (0 != strcmp (args->uri->query, "")) - { - gpr_slice query_slice; - gpr_slice_buffer query_parts; - - query_slice = gpr_slice_new (args->uri->query, strlen (args->uri->query), do_nothing); - gpr_slice_buffer_init (&query_parts); - gpr_slice_split (query_slice, "=", &query_parts); - GPR_ASSERT (query_parts.count == 2); - if (0 == gpr_slice_str_cmp (query_parts.slices[0], "lb_policy")) - { - r->lb_policy_name = gpr_dump_slice (query_parts.slices[1], GPR_DUMP_ASCII); - } - gpr_slice_buffer_destroy (&query_parts); - gpr_slice_unref (query_slice); - } - if (r->lb_policy_name == NULL) - { - r->lb_policy_name = gpr_strdup (default_lb_policy_name); + if (0 != strcmp(args->uri->query, "")) { + gpr_slice query_slice; + gpr_slice_buffer query_parts; + + query_slice = + gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing); + gpr_slice_buffer_init(&query_parts); + gpr_slice_split(query_slice, "=", &query_parts); + GPR_ASSERT(query_parts.count == 2); + if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) { + r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII); } + gpr_slice_buffer_destroy(&query_parts); + gpr_slice_unref(query_slice); + } + if (r->lb_policy_name == NULL) { + r->lb_policy_name = gpr_strdup(default_lb_policy_name); + } - path_slice = gpr_slice_new (args->uri->path, strlen (args->uri->path), do_nothing); - gpr_slice_buffer_init (&path_parts); + path_slice = + gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing); + gpr_slice_buffer_init(&path_parts); - gpr_slice_split (path_slice, ",", &path_parts); + gpr_slice_split(path_slice, ",", &path_parts); r->num_addrs = path_parts.count; - r->addrs = gpr_malloc (sizeof (struct sockaddr_storage) * r->num_addrs); - r->addrs_len = gpr_malloc (sizeof (*r->addrs_len) * r->num_addrs); - - for (i = 0; i < r->num_addrs; i++) - { - grpc_uri ith_uri = *args->uri; - char *part_str = gpr_dump_slice (path_parts.slices[i], GPR_DUMP_ASCII); - ith_uri.path = part_str; - if (!parse (&ith_uri, &r->addrs[i], &r->addrs_len[i])) - { - errors_found = 1; /* GPR_TRUE */ - } - gpr_free (part_str); - if (errors_found) - break; + r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs); + r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs); + + for (i = 0; i < r->num_addrs; i++) { + grpc_uri ith_uri = *args->uri; + char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); + ith_uri.path = part_str; + if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) { + errors_found = 1; /* GPR_TRUE */ } + gpr_free(part_str); + if (errors_found) break; + } - gpr_slice_buffer_destroy (&path_parts); - gpr_slice_unref (path_slice); - if (errors_found) - { - gpr_free (r); - return NULL; - } + gpr_slice_buffer_destroy(&path_parts); + gpr_slice_unref(path_slice); + if (errors_found) { + gpr_free(r); + return NULL; + } - gpr_ref_init (&r->refs, 1); - gpr_mu_init (&r->mu); - grpc_resolver_init (&r->base, &sockaddr_resolver_vtable); + gpr_ref_init(&r->refs, 1); + gpr_mu_init(&r->mu); + grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); r->subchannel_factory = args->subchannel_factory; - grpc_subchannel_factory_ref (r->subchannel_factory); + grpc_subchannel_factory_ref(r->subchannel_factory); return &r->base; } @@ -393,15 +361,9 @@ sockaddr_create (grpc_resolver_args * args, const char *default_lb_policy_name, * FACTORY */ -static void -sockaddr_factory_ref (grpc_resolver_factory * factory) -{ -} +static void sockaddr_factory_ref(grpc_resolver_factory *factory) {} -static void -sockaddr_factory_unref (grpc_resolver_factory * factory) -{ -} +static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} #define DECL_FACTORY(name) \ static grpc_resolver *name##_factory_create_resolver( \ @@ -418,6 +380,6 @@ sockaddr_factory_unref (grpc_resolver_factory * factory) } #ifdef GPR_POSIX_SOCKET -DECL_FACTORY (unix) +DECL_FACTORY(unix) #endif - DECL_FACTORY (ipv4) DECL_FACTORY (ipv6) +DECL_FACTORY(ipv4) DECL_FACTORY(ipv6) diff --git a/src/core/client_config/resolvers/sockaddr_resolver.h b/src/core/client_config/resolvers/sockaddr_resolver.h index c778812e49..1b7a18f9c2 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.h +++ b/src/core/client_config/resolvers/sockaddr_resolver.h @@ -38,13 +38,13 @@ #include "src/core/client_config/resolver_factory.h" -grpc_resolver_factory *grpc_ipv4_resolver_factory_create (void); +grpc_resolver_factory *grpc_ipv4_resolver_factory_create(void); -grpc_resolver_factory *grpc_ipv6_resolver_factory_create (void); +grpc_resolver_factory *grpc_ipv6_resolver_factory_create(void); #ifdef GPR_POSIX_SOCKET /** Create a unix resolver factory */ -grpc_resolver_factory *grpc_unix_resolver_factory_create (void); +grpc_resolver_factory *grpc_unix_resolver_factory_create(void); #endif #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_UNIX_RESOLVER_H */ diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 3e2040c412..213d5a172f 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -50,8 +50,7 @@ /** Zookeeper session expiration time in milliseconds */ #define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000 -typedef struct -{ +typedef struct { /** base class: must be first */ grpc_resolver base; /** refcount */ @@ -88,119 +87,101 @@ typedef struct int resolved_num; } zookeeper_resolver; -static void zookeeper_destroy (grpc_resolver * r); +static void zookeeper_destroy(grpc_resolver *r); -static void zookeeper_start_resolving_locked (zookeeper_resolver * r); -static grpc_closure * -zookeeper_maybe_finish_next_locked (zookeeper_resolver * r) - GRPC_MUST_USE_RESULT; +static void zookeeper_start_resolving_locked(zookeeper_resolver *r); +static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) + GRPC_MUST_USE_RESULT; - static void zookeeper_shutdown (grpc_resolver * r); - static void zookeeper_channel_saw_error (grpc_resolver * r, struct sockaddr *failing_address, int failing_address_len); - static void zookeeper_next (grpc_resolver * r, grpc_client_config ** target_config, grpc_closure * on_complete); +static void zookeeper_shutdown(grpc_resolver *r); +static void zookeeper_channel_saw_error(grpc_resolver *r, + struct sockaddr *failing_address, + int failing_address_len); +static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config, + grpc_closure *on_complete); - static const grpc_resolver_vtable zookeeper_resolver_vtable = { - zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, - zookeeper_next - }; +static const grpc_resolver_vtable zookeeper_resolver_vtable = { + zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, + zookeeper_next}; -static void -zookeeper_shutdown (grpc_resolver * resolver) -{ - zookeeper_resolver *r = (zookeeper_resolver *) resolver; +static void zookeeper_shutdown(grpc_resolver *resolver) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; grpc_closure *call = NULL; - gpr_mu_lock (&r->mu); - if (r->next_completion != NULL) - { - *r->target_config = NULL; - call = r->next_completion; - r->next_completion = NULL; - } - zookeeper_close (r->zookeeper_handle); - gpr_mu_unlock (&r->mu); - if (call != NULL) - { - call->cb (call->cb_arg, 1); - } + gpr_mu_lock(&r->mu); + if (r->next_completion != NULL) { + *r->target_config = NULL; + call = r->next_completion; + r->next_completion = NULL; + } + zookeeper_close(r->zookeeper_handle); + gpr_mu_unlock(&r->mu); + if (call != NULL) { + call->cb(call->cb_arg, 1); + } } -static void -zookeeper_channel_saw_error (grpc_resolver * resolver, struct sockaddr *sa, int len) -{ - zookeeper_resolver *r = (zookeeper_resolver *) resolver; - gpr_mu_lock (&r->mu); - if (r->resolving == 0) - { - zookeeper_start_resolving_locked (r); - } - gpr_mu_unlock (&r->mu); +static void zookeeper_channel_saw_error(grpc_resolver *resolver, + struct sockaddr *sa, int len) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); } -static void -zookeeper_next (grpc_resolver * resolver, grpc_client_config ** target_config, grpc_closure * on_complete) -{ - zookeeper_resolver *r = (zookeeper_resolver *) resolver; +static void zookeeper_next(grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_closure *on_complete) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; grpc_closure *call; - gpr_mu_lock (&r->mu); - GPR_ASSERT (r->next_completion == NULL); + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->next_completion == NULL); r->next_completion = on_complete; r->target_config = target_config; - if (r->resolved_version == 0 && r->resolving == 0) - { - zookeeper_start_resolving_locked (r); - } - else - { - call = zookeeper_maybe_finish_next_locked (r); - } - gpr_mu_unlock (&r->mu); - if (call) - call->cb (call->cb_arg, 1); + if (r->resolved_version == 0 && r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } else { + call = zookeeper_maybe_finish_next_locked(r); + } + gpr_mu_unlock(&r->mu); + if (call) call->cb(call->cb_arg, 1); } /** Zookeeper global watcher for connection management TODO: better connection management besides logs */ -static void -zookeeper_global_watcher (zhandle_t * zookeeper_handle, int type, int state, const char *path, void *watcher_ctx) -{ - if (type == ZOO_SESSION_EVENT) - { - if (state == ZOO_EXPIRED_SESSION_STATE) - { - gpr_log (GPR_ERROR, "Zookeeper session expired"); - } - else if (state == ZOO_AUTH_FAILED_STATE) - { - gpr_log (GPR_ERROR, "Zookeeper authentication failed"); - } +static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type, + int state, const char *path, + void *watcher_ctx) { + if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_EXPIRED_SESSION_STATE) { + gpr_log(GPR_ERROR, "Zookeeper session expired"); + } else if (state == ZOO_AUTH_FAILED_STATE) { + gpr_log(GPR_ERROR, "Zookeeper authentication failed"); } + } } /** Zookeeper watcher triggered by changes to watched nodes Once triggered, it tries to resolve again to get updated addresses */ -static void -zookeeper_watcher (zhandle_t * zookeeper_handle, int type, int state, const char *path, void *watcher_ctx) -{ - if (watcher_ctx != NULL) - { - zookeeper_resolver *r = (zookeeper_resolver *) watcher_ctx; - if (state == ZOO_CONNECTED_STATE) - { - gpr_mu_lock (&r->mu); - if (r->resolving == 0) - { - zookeeper_start_resolving_locked (r); - } - gpr_mu_unlock (&r->mu); - } +static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, + const char *path, void *watcher_ctx) { + if (watcher_ctx != NULL) { + zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx; + if (state == ZOO_CONNECTED_STATE) { + gpr_mu_lock(&r->mu); + if (r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); } + } } /** Callback function after getting all resolved addresses Creates a subchannel for each address */ -static void -zookeeper_on_resolved (void *arg, grpc_resolved_addresses * addresses) -{ +static void zookeeper_on_resolved(void *arg, + grpc_resolved_addresses *addresses) { zookeeper_resolver *r = arg; grpc_client_config *config = NULL; grpc_subchannel **subchannels; @@ -208,77 +189,74 @@ zookeeper_on_resolved (void *arg, grpc_resolved_addresses * addresses) grpc_lb_policy *lb_policy; grpc_closure *call; size_t i; - if (addresses != NULL) - { - grpc_lb_policy_args lb_policy_args; - config = grpc_client_config_create (); - subchannels = gpr_malloc (sizeof (grpc_subchannel *) * addresses->naddrs); - for (i = 0; i < addresses->naddrs; i++) - { - memset (&args, 0, sizeof (args)); - args.addr = (struct sockaddr *) (addresses->addrs[i].addr); - args.addr_len = addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel (r->subchannel_factory, &args); - } - lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = addresses->naddrs; - lb_policy = grpc_lb_policy_create (r->lb_policy_name, &lb_policy_args); - grpc_client_config_set_lb_policy (config, lb_policy); - GRPC_LB_POLICY_UNREF (lb_policy, "construction"); - grpc_resolved_addresses_destroy (addresses); - gpr_free (subchannels); + if (addresses != NULL) { + grpc_lb_policy_args lb_policy_args; + config = grpc_client_config_create(); + subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); + for (i = 0; i < addresses->naddrs; i++) { + memset(&args, 0, sizeof(args)); + args.addr = (struct sockaddr *)(addresses->addrs[i].addr); + args.addr_len = addresses->addrs[i].len; + subchannels[i] = grpc_subchannel_factory_create_subchannel( + r->subchannel_factory, &args); } - gpr_mu_lock (&r->mu); - GPR_ASSERT (r->resolving == 1); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(lb_policy, "construction"); + grpc_resolved_addresses_destroy(addresses); + gpr_free(subchannels); + } + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->resolving == 1); r->resolving = 0; - if (r->resolved_config != NULL) - { - grpc_client_config_unref (r->resolved_config); - } + if (r->resolved_config != NULL) { + grpc_client_config_unref(r->resolved_config); + } r->resolved_config = config; r->resolved_version++; - call = zookeeper_maybe_finish_next_locked (r); - gpr_mu_unlock (&r->mu); + call = zookeeper_maybe_finish_next_locked(r); + gpr_mu_unlock(&r->mu); - if (call) - call->cb (call->cb_arg, 1); + if (call) call->cb(call->cb_arg, 1); - GRPC_RESOLVER_UNREF (&r->base, "zookeeper-resolving"); + GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving"); } /** Callback function for each DNS resolved address */ -static void -zookeeper_dns_resolved (void *arg, grpc_resolved_addresses * addresses) -{ +static void zookeeper_dns_resolved(void *arg, + grpc_resolved_addresses *addresses) { size_t i; zookeeper_resolver *r = arg; int resolve_done = 0; - gpr_mu_lock (&r->mu); + gpr_mu_lock(&r->mu); r->resolved_num++; - r->resolved_addrs->addrs = gpr_realloc (r->resolved_addrs->addrs, sizeof (grpc_resolved_address) * (r->resolved_addrs->naddrs + addresses->naddrs)); - for (i = 0; i < addresses->naddrs; i++) - { - memcpy (r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr, addresses->addrs[i].addr, addresses->addrs[i].len); - r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len = addresses->addrs[i].len; - } + r->resolved_addrs->addrs = + gpr_realloc(r->resolved_addrs->addrs, + sizeof(grpc_resolved_address) * + (r->resolved_addrs->naddrs + addresses->naddrs)); + for (i = 0; i < addresses->naddrs; i++) { + memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr, + addresses->addrs[i].addr, addresses->addrs[i].len); + r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len = + addresses->addrs[i].len; + } r->resolved_addrs->naddrs += addresses->naddrs; - grpc_resolved_addresses_destroy (addresses); + grpc_resolved_addresses_destroy(addresses); /** Wait for all addresses to be resolved */ resolve_done = (r->resolved_num == r->resolved_total); - gpr_mu_unlock (&r->mu); - if (resolve_done) - { - zookeeper_on_resolved (r, r->resolved_addrs); - } + gpr_mu_unlock(&r->mu); + if (resolve_done) { + zookeeper_on_resolved(r, r->resolved_addrs); + } } /** Parses JSON format address of a zookeeper node */ -static char * -zookeeper_parse_address (const char *value, size_t value_len) -{ +static char *zookeeper_parse_address(const char *value, size_t value_len) { grpc_json *json; grpc_json *cur; const char *host; @@ -286,301 +264,256 @@ zookeeper_parse_address (const char *value, size_t value_len) char *buffer; char *address = NULL; - buffer = gpr_malloc (value_len); - memcpy (buffer, value, value_len); - json = grpc_json_parse_string_with_len (buffer, value_len); - if (json != NULL) - { - host = NULL; - port = NULL; - for (cur = json->child; cur != NULL; cur = cur->next) - { - if (!strcmp (cur->key, "host")) - { - host = cur->value; - if (port != NULL) - { - break; - } - } - else if (!strcmp (cur->key, "port")) - { - port = cur->value; - if (host != NULL) - { - break; - } - } - } - if (host != NULL && port != NULL) - { - gpr_asprintf (&address, "%s:%s", host, port); - } - grpc_json_destroy (json); + buffer = gpr_malloc(value_len); + memcpy(buffer, value, value_len); + json = grpc_json_parse_string_with_len(buffer, value_len); + if (json != NULL) { + host = NULL; + port = NULL; + for (cur = json->child; cur != NULL; cur = cur->next) { + if (!strcmp(cur->key, "host")) { + host = cur->value; + if (port != NULL) { + break; + } + } else if (!strcmp(cur->key, "port")) { + port = cur->value; + if (host != NULL) { + break; + } + } + } + if (host != NULL && port != NULL) { + gpr_asprintf(&address, "%s:%s", host, port); } - gpr_free (buffer); + grpc_json_destroy(json); + } + gpr_free(buffer); return address; } -static void -zookeeper_get_children_node_completion (int rc, const char *value, int value_len, const struct Stat *stat, const void *arg) -{ +static void zookeeper_get_children_node_completion(int rc, const char *value, + int value_len, + const struct Stat *stat, + const void *arg) { char *address = NULL; - zookeeper_resolver *r = (zookeeper_resolver *) arg; + zookeeper_resolver *r = (zookeeper_resolver *)arg; int resolve_done = 0; - if (rc != 0) - { - gpr_log (GPR_ERROR, "Error in getting a child node of %s", r->name); - return; - } + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name); + return; + } - address = zookeeper_parse_address (value, (size_t) value_len); - if (address != NULL) - { + address = zookeeper_parse_address(value, (size_t)value_len); + if (address != NULL) { /** Further resolves address by DNS */ - grpc_resolve_address (address, NULL, zookeeper_dns_resolved, r); - gpr_free (address); - } - else - { - gpr_log (GPR_ERROR, "Error in resolving a child node of %s", r->name); - gpr_mu_lock (&r->mu); - r->resolved_total--; - resolve_done = (r->resolved_num == r->resolved_total); - gpr_mu_unlock (&r->mu); - if (resolve_done) - { - zookeeper_on_resolved (r, r->resolved_addrs); - } + grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + gpr_free(address); + } else { + gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name); + gpr_mu_lock(&r->mu); + r->resolved_total--; + resolve_done = (r->resolved_num == r->resolved_total); + gpr_mu_unlock(&r->mu); + if (resolve_done) { + zookeeper_on_resolved(r, r->resolved_addrs); } + } } -static void -zookeeper_get_children_completion (int rc, const struct String_vector *children, const void *arg) -{ +static void zookeeper_get_children_completion( + int rc, const struct String_vector *children, const void *arg) { char *path; int status; int i; - zookeeper_resolver *r = (zookeeper_resolver *) arg; + zookeeper_resolver *r = (zookeeper_resolver *)arg; - if (rc != 0) - { - gpr_log (GPR_ERROR, "Error in getting zookeeper children of %s", r->name); - return; - } + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + return; + } - if (children->count == 0) - { - gpr_log (GPR_ERROR, "Error in resolving zookeeper address %s", r->name); - return; - } + if (children->count == 0) { + gpr_log(GPR_ERROR, "Error in resolving zookeeper address %s", r->name); + return; + } - r->resolved_addrs = gpr_malloc (sizeof (grpc_resolved_addresses)); + r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); r->resolved_addrs->addrs = NULL; r->resolved_addrs->naddrs = 0; r->resolved_total = children->count; /** TODO: Replace expensive heap allocation with stack if we can get maximum length of zookeeper path */ - for (i = 0; i < children->count; i++) - { - gpr_asprintf (&path, "%s/%s", r->name, children->data[i]); - status = zoo_awget (r->zookeeper_handle, path, zookeeper_watcher, r, zookeeper_get_children_node_completion, r); - gpr_free (path); - if (status != 0) - { - gpr_log (GPR_ERROR, "Error in getting zookeeper node %s", path); - } + for (i = 0; i < children->count; i++) { + gpr_asprintf(&path, "%s/%s", r->name, children->data[i]); + status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r, + zookeeper_get_children_node_completion, r); + gpr_free(path); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path); } + } } -static void -zookeeper_get_node_completion (int rc, const char *value, int value_len, const struct Stat *stat, const void *arg) -{ +static void zookeeper_get_node_completion(int rc, const char *value, + int value_len, + const struct Stat *stat, + const void *arg) { int status; char *address = NULL; - zookeeper_resolver *r = (zookeeper_resolver *) arg; + zookeeper_resolver *r = (zookeeper_resolver *)arg; r->resolved_addrs = NULL; r->resolved_total = 0; r->resolved_num = 0; - if (rc != 0) - { - gpr_log (GPR_ERROR, "Error in getting zookeeper node %s", r->name); - return; - } + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); + return; + } /** If zookeeper node of path r->name does not have address (i.e. service node), get its children */ - address = zookeeper_parse_address (value, (size_t) value_len); - if (address != NULL) - { - r->resolved_addrs = gpr_malloc (sizeof (grpc_resolved_addresses)); - r->resolved_addrs->addrs = NULL; - r->resolved_addrs->naddrs = 0; - r->resolved_total = 1; + address = zookeeper_parse_address(value, (size_t)value_len); + if (address != NULL) { + r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->resolved_addrs->addrs = NULL; + r->resolved_addrs->naddrs = 0; + r->resolved_total = 1; /** Further resolves address by DNS */ - grpc_resolve_address (address, NULL, zookeeper_dns_resolved, r); - gpr_free (address); - return; - } - - status = zoo_awget_children (r->zookeeper_handle, r->name, zookeeper_watcher, r, zookeeper_get_children_completion, r); - if (status != 0) - { - gpr_log (GPR_ERROR, "Error in getting zookeeper children of %s", r->name); - } + grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + gpr_free(address); + return; + } + + status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher, + r, zookeeper_get_children_completion, r); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + } } -static void -zookeeper_resolve_address (zookeeper_resolver * r) -{ +static void zookeeper_resolve_address(zookeeper_resolver *r) { int status; - status = zoo_awget (r->zookeeper_handle, r->name, zookeeper_watcher, r, zookeeper_get_node_completion, r); - if (status != 0) - { - gpr_log (GPR_ERROR, "Error in getting zookeeper node %s", r->name); - } + status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r, + zookeeper_get_node_completion, r); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); + } } -static void -zookeeper_start_resolving_locked (zookeeper_resolver * r) -{ - GRPC_RESOLVER_REF (&r->base, "zookeeper-resolving"); - GPR_ASSERT (r->resolving == 0); +static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { + GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving"); + GPR_ASSERT(r->resolving == 0); r->resolving = 1; - zookeeper_resolve_address (r); + zookeeper_resolve_address(r); } -static grpc_closure * -zookeeper_maybe_finish_next_locked (zookeeper_resolver * r) -{ +static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { grpc_closure *call = NULL; - if (r->next_completion != NULL && r->resolved_version != r->published_version) - { - *r->target_config = r->resolved_config; - if (r->resolved_config != NULL) - { - grpc_client_config_ref (r->resolved_config); - } - call = r->next_completion; - r->next_completion = NULL; - r->published_version = r->resolved_version; + if (r->next_completion != NULL && + r->resolved_version != r->published_version) { + *r->target_config = r->resolved_config; + if (r->resolved_config != NULL) { + grpc_client_config_ref(r->resolved_config); } + call = r->next_completion; + r->next_completion = NULL; + r->published_version = r->resolved_version; + } return call; } -static void -zookeeper_destroy (grpc_resolver * gr) -{ - zookeeper_resolver *r = (zookeeper_resolver *) gr; - gpr_mu_destroy (&r->mu); - if (r->resolved_config != NULL) - { - grpc_client_config_unref (r->resolved_config); - } - grpc_subchannel_factory_unref (r->subchannel_factory); - gpr_free (r->name); - gpr_free (r->lb_policy_name); - gpr_free (r); +static void zookeeper_destroy(grpc_resolver *gr) { + zookeeper_resolver *r = (zookeeper_resolver *)gr; + gpr_mu_destroy(&r->mu); + if (r->resolved_config != NULL) { + grpc_client_config_unref(r->resolved_config); + } + grpc_subchannel_factory_unref(r->subchannel_factory); + gpr_free(r->name); + gpr_free(r->lb_policy_name); + gpr_free(r); } -static grpc_resolver * -zookeeper_create (grpc_resolver_args * args, const char *lb_policy_name) -{ +static grpc_resolver *zookeeper_create(grpc_resolver_args *args, + const char *lb_policy_name) { zookeeper_resolver *r; size_t length; char *path = args->uri->path; - if (0 == strcmp (args->uri->authority, "")) - { - gpr_log (GPR_ERROR, "No authority specified in zookeeper uri"); - return NULL; - } + if (0 == strcmp(args->uri->authority, "")) { + gpr_log(GPR_ERROR, "No authority specified in zookeeper uri"); + return NULL; + } /** Removes the trailing slash if exists */ - length = strlen (path); - if (length > 1 && path[length - 1] == '/') - { - path[length - 1] = 0; - } - - r = gpr_malloc (sizeof (zookeeper_resolver)); - memset (r, 0, sizeof (*r)); - gpr_ref_init (&r->refs, 1); - gpr_mu_init (&r->mu); - grpc_resolver_init (&r->base, &zookeeper_resolver_vtable); - r->name = gpr_strdup (path); + length = strlen(path); + if (length > 1 && path[length - 1] == '/') { + path[length - 1] = 0; + } + + r = gpr_malloc(sizeof(zookeeper_resolver)); + memset(r, 0, sizeof(*r)); + gpr_ref_init(&r->refs, 1); + gpr_mu_init(&r->mu); + grpc_resolver_init(&r->base, &zookeeper_resolver_vtable); + r->name = gpr_strdup(path); r->subchannel_factory = args->subchannel_factory; - grpc_subchannel_factory_ref (r->subchannel_factory); + grpc_subchannel_factory_ref(r->subchannel_factory); - r->lb_policy_name = gpr_strdup (lb_policy_name); + r->lb_policy_name = gpr_strdup(lb_policy_name); /** Initializes zookeeper client */ - zoo_set_debug_level (ZOO_LOG_LEVEL_WARN); - r->zookeeper_handle = zookeeper_init (args->uri->authority, zookeeper_global_watcher, GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); - if (r->zookeeper_handle == NULL) - { - gpr_log (GPR_ERROR, "Unable to connect to zookeeper server"); - return NULL; - } + zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); + r->zookeeper_handle = + zookeeper_init(args->uri->authority, zookeeper_global_watcher, + GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); + if (r->zookeeper_handle == NULL) { + gpr_log(GPR_ERROR, "Unable to connect to zookeeper server"); + return NULL; + } return &r->base; } -static void -zookeeper_plugin_init () -{ - grpc_register_resolver_type (grpc_zookeeper_resolver_factory_create ()); +static void zookeeper_plugin_init() { + grpc_register_resolver_type(grpc_zookeeper_resolver_factory_create()); } -void -grpc_zookeeper_register () -{ - grpc_register_plugin (zookeeper_plugin_init, NULL); +void grpc_zookeeper_register() { + grpc_register_plugin(zookeeper_plugin_init, NULL); } /* * FACTORY */ -static void -zookeeper_factory_ref (grpc_resolver_factory * factory) -{ -} +static void zookeeper_factory_ref(grpc_resolver_factory *factory) {} -static void -zookeeper_factory_unref (grpc_resolver_factory * factory) -{ -} +static void zookeeper_factory_unref(grpc_resolver_factory *factory) {} -static char * -zookeeper_factory_get_default_hostname (grpc_resolver_factory * factory, grpc_uri * uri) -{ +static char *zookeeper_factory_get_default_hostname( + grpc_resolver_factory *factory, grpc_uri *uri) { return NULL; } -static grpc_resolver * -zookeeper_factory_create_resolver (grpc_resolver_factory * factory, grpc_resolver_args * args) -{ - return zookeeper_create (args, "pick_first"); +static grpc_resolver *zookeeper_factory_create_resolver( + grpc_resolver_factory *factory, grpc_resolver_args *args) { + return zookeeper_create(args, "pick_first"); } static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { - zookeeper_factory_ref, zookeeper_factory_unref, - zookeeper_factory_create_resolver, zookeeper_factory_get_default_hostname, - "zookeeper" -}; + zookeeper_factory_ref, zookeeper_factory_unref, + zookeeper_factory_create_resolver, zookeeper_factory_get_default_hostname, + "zookeeper"}; static grpc_resolver_factory zookeeper_resolver_factory = { - &zookeeper_factory_vtable -}; + &zookeeper_factory_vtable}; -grpc_resolver_factory * -grpc_zookeeper_resolver_factory_create () -{ +grpc_resolver_factory *grpc_zookeeper_resolver_factory_create() { return &zookeeper_resolver_factory; } diff --git a/src/core/client_config/resolvers/zookeeper_resolver.h b/src/core/client_config/resolvers/zookeeper_resolver.h index 86b4480b15..a6f002dd6d 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.h +++ b/src/core/client_config/resolvers/zookeeper_resolver.h @@ -37,6 +37,6 @@ #include "src/core/client_config/resolver_factory.h" /** Create a zookeeper resolver factory */ -grpc_resolver_factory *grpc_zookeeper_resolver_factory_create (void); +grpc_resolver_factory *grpc_zookeeper_resolver_factory_create(void); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H */ diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 8058c9508b..c7dfba9a4b 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -50,8 +50,7 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -typedef struct -{ +typedef struct { /* all fields protected by subchannel->mu */ /** refcount */ int refs; @@ -59,16 +58,14 @@ typedef struct grpc_subchannel *subchannel; } connection; -typedef struct -{ +typedef struct { grpc_closure closure; size_t version; grpc_subchannel *subchannel; grpc_connectivity_state connectivity_state; } state_watcher; -typedef struct waiting_for_connect -{ +typedef struct waiting_for_connect { struct waiting_for_connect *next; grpc_closure *notify; grpc_pollset *pollset; @@ -77,8 +74,7 @@ typedef struct waiting_for_connect grpc_closure continuation; } waiting_for_connect; -struct grpc_subchannel -{ +struct grpc_subchannel { grpc_connector *connector; /** non-transport related channel filters */ @@ -139,8 +135,7 @@ struct grpc_subchannel gpr_uint32 random; }; -struct grpc_subchannel_call -{ +struct grpc_subchannel_call { connection *connection; gpr_refcount refs; }; @@ -148,19 +143,25 @@ struct grpc_subchannel_call #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) -static grpc_subchannel_call *create_call (grpc_exec_ctx * exec_ctx, connection * con); -static void connectivity_state_changed_locked (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, const char *reason); -static grpc_connectivity_state compute_connectivity_locked (grpc_subchannel * c); -static gpr_timespec compute_connect_deadline (grpc_subchannel * c); -static void subchannel_connected (grpc_exec_ctx * exec_ctx, void *subchannel, int iomgr_success); - -static void subchannel_ref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -static int -subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) - GRPC_MUST_USE_RESULT; - static void connection_ref_locked (connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - static grpc_subchannel *connection_unref_locked (grpc_exec_ctx *exec_ctx, connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; - static void subchannel_destroy (grpc_exec_ctx * exec_ctx, grpc_subchannel * c); +static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, + connection *con); +static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c, + const char *reason); +static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); +static gpr_timespec compute_connect_deadline(grpc_subchannel *c); +static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, + int iomgr_success); + +static void subchannel_ref_locked( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +static int subchannel_unref_locked( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; +static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +static grpc_subchannel *connection_unref_locked( + grpc_exec_ctx *exec_ctx, + connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; +static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG #define SUBCHANNEL_REF_LOCKED(p, r) \ @@ -196,34 +197,29 @@ subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) * connection implementation */ - static void connection_destroy (grpc_exec_ctx * exec_ctx, connection * c) -{ - GPR_ASSERT (c->refs == 0); - grpc_channel_stack_destroy (exec_ctx, CHANNEL_STACK_FROM_CONNECTION (c)); - gpr_free (c); +static void connection_destroy(grpc_exec_ctx *exec_ctx, connection *c) { + GPR_ASSERT(c->refs == 0); + grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); + gpr_free(c); } -static void -connection_ref_locked (connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) -{ - REF_LOG ("CONNECTION", c); - subchannel_ref_locked (c->subchannel REF_PASS_ARGS); +static void connection_ref_locked( + connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + REF_LOG("CONNECTION", c); + subchannel_ref_locked(c->subchannel REF_PASS_ARGS); ++c->refs; } -static grpc_subchannel * -connection_unref_locked (grpc_exec_ctx *exec_ctx, connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) -{ +static grpc_subchannel *connection_unref_locked( + grpc_exec_ctx *exec_ctx, connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel *destroy = NULL; - UNREF_LOG ("CONNECTION", c); - if (subchannel_unref_locked (c->subchannel REF_PASS_ARGS)) - { - destroy = c->subchannel; - } - if (--c->refs == 0 && c->subchannel->active != c) - { - connection_destroy (exec_ctx, c); - } + UNREF_LOG("CONNECTION", c); + if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) { + destroy = c->subchannel; + } + if (--c->refs == 0 && c->subchannel->active != c) { + connection_destroy(exec_ctx, c); + } return destroy; } @@ -231,261 +227,237 @@ connection_unref_locked (grpc_exec_ctx *exec_ctx, connection * c GRPC_SUBCHANNEL * grpc_subchannel implementation */ -static void -subchannel_ref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) -{ - REF_LOG ("SUBCHANNEL", c); +static void subchannel_ref_locked( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + REF_LOG("SUBCHANNEL", c); ++c->refs; } -static int -subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) -{ - UNREF_LOG ("SUBCHANNEL", c); +static int subchannel_unref_locked( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + UNREF_LOG("SUBCHANNEL", c); return --c->refs == 0; } -void -grpc_subchannel_ref (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) -{ - gpr_mu_lock (&c->mu); - subchannel_ref_locked (c REF_PASS_ARGS); - gpr_mu_unlock (&c->mu); +void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_mu_lock(&c->mu); + subchannel_ref_locked(c REF_PASS_ARGS); + gpr_mu_unlock(&c->mu); } -void -grpc_subchannel_unref (grpc_exec_ctx *exec_ctx, grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) -{ +void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { int destroy; - gpr_mu_lock (&c->mu); - destroy = subchannel_unref_locked (c REF_PASS_ARGS); - gpr_mu_unlock (&c->mu); - if (destroy) - subchannel_destroy (exec_ctx, c); -} - -static void -subchannel_destroy (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) -{ - if (c->active != NULL) - { - connection_destroy (exec_ctx, c->active); - } - gpr_free (c->filters); - grpc_channel_args_destroy (c->args); - gpr_free (c->addr); - grpc_mdctx_unref (c->mdctx); - grpc_connectivity_state_destroy (exec_ctx, &c->state_tracker); - grpc_connector_unref (exec_ctx, c->connector); - gpr_free (c); + gpr_mu_lock(&c->mu); + destroy = subchannel_unref_locked(c REF_PASS_ARGS); + gpr_mu_unlock(&c->mu); + if (destroy) subchannel_destroy(exec_ctx, c); +} + +static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { + if (c->active != NULL) { + connection_destroy(exec_ctx, c->active); + } + gpr_free(c->filters); + grpc_channel_args_destroy(c->args); + gpr_free(c->addr); + grpc_mdctx_unref(c->mdctx); + grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); + grpc_connector_unref(exec_ctx, c->connector); + gpr_free(c); } -void -grpc_subchannel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_pollset * pollset) -{ - grpc_pollset_set_add_pollset (exec_ctx, c->pollset_set, pollset); +void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c, + grpc_pollset *pollset) { + grpc_pollset_set_add_pollset(exec_ctx, c->pollset_set, pollset); } -void -grpc_subchannel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_pollset * pollset) -{ - grpc_pollset_set_del_pollset (exec_ctx, c->pollset_set, pollset); +void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c, + grpc_pollset *pollset) { + grpc_pollset_set_del_pollset(exec_ctx, c->pollset_set, pollset); } -static gpr_uint32 -random_seed () -{ - return (gpr_uint32) (gpr_time_to_millis (gpr_now (GPR_CLOCK_MONOTONIC))); +static gpr_uint32 random_seed() { + return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); } -grpc_subchannel * -grpc_subchannel_create (grpc_connector * connector, grpc_subchannel_args * args) -{ - grpc_subchannel *c = gpr_malloc (sizeof (*c)); - grpc_channel_element *parent_elem = grpc_channel_stack_last_element (grpc_channel_get_channel_stack (args->master)); - memset (c, 0, sizeof (*c)); +grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, + grpc_subchannel_args *args) { + grpc_subchannel *c = gpr_malloc(sizeof(*c)); + grpc_channel_element *parent_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(args->master)); + memset(c, 0, sizeof(*c)); c->refs = 1; c->connector = connector; - grpc_connector_ref (c->connector); + grpc_connector_ref(c->connector); c->num_filters = args->filter_count; - c->filters = gpr_malloc (sizeof (grpc_channel_filter *) * c->num_filters); - memcpy (c->filters, args->filters, sizeof (grpc_channel_filter *) * c->num_filters); - c->addr = gpr_malloc (args->addr_len); - memcpy (c->addr, args->addr, args->addr_len); + c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters); + memcpy(c->filters, args->filters, + sizeof(grpc_channel_filter *) * c->num_filters); + c->addr = gpr_malloc(args->addr_len); + memcpy(c->addr, args->addr, args->addr_len); c->addr_len = args->addr_len; - c->args = grpc_channel_args_copy (args->args); + c->args = grpc_channel_args_copy(args->args); c->mdctx = args->mdctx; c->master = args->master; - c->pollset_set = grpc_client_channel_get_connecting_pollset_set (parent_elem); - c->random = random_seed (); - grpc_mdctx_ref (c->mdctx); - grpc_closure_init (&c->connected, subchannel_connected, c); - grpc_connectivity_state_init (&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); - gpr_mu_init (&c->mu); + c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); + c->random = random_seed(); + grpc_mdctx_ref(c->mdctx); + grpc_closure_init(&c->connected, subchannel_connected, c); + grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, + "subchannel"); + gpr_mu_init(&c->mu); return c; } -static void -continue_connect (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) -{ +static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; - args.deadline = compute_connect_deadline (c); + args.deadline = compute_connect_deadline(c); args.channel_args = c->args; - grpc_connector_connect (exec_ctx, c->connector, &args, &c->connecting_result, &c->connected); + grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result, + &c->connected); } -static void -start_connect (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) -{ - c->backoff_delta = gpr_time_from_seconds (GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); - c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta); - continue_connect (exec_ctx, c); +static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { + c->backoff_delta = gpr_time_from_seconds( + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); + c->next_attempt = + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); + continue_connect(exec_ctx, c); } -static void -continue_creating_call (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { waiting_for_connect *w4c = arg; - grpc_subchannel_del_interested_party (exec_ctx, w4c->subchannel, w4c->pollset); - grpc_subchannel_create_call (exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); - GRPC_SUBCHANNEL_UNREF (exec_ctx, w4c->subchannel, "waiting_for_connect"); - gpr_free (w4c); + grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); + grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset, + w4c->target, w4c->notify); + GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); + gpr_free(w4c); } -void -grpc_subchannel_create_call (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_pollset * pollset, grpc_subchannel_call ** target, grpc_closure * notify) -{ +void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset *pollset, + grpc_subchannel_call **target, + grpc_closure *notify) { connection *con; - gpr_mu_lock (&c->mu); - if (c->active != NULL) - { - con = c->active; - CONNECTION_REF_LOCKED (con, "call"); - gpr_mu_unlock (&c->mu); - - *target = create_call (exec_ctx, con); - notify->cb (exec_ctx, notify->cb_arg, 1); - } - else - { - waiting_for_connect *w4c = gpr_malloc (sizeof (*w4c)); - w4c->next = c->waiting; - w4c->notify = notify; - w4c->pollset = pollset; - w4c->target = target; - w4c->subchannel = c; - /* released when clearing w4c */ - SUBCHANNEL_REF_LOCKED (c, "waiting_for_connect"); - grpc_closure_init (&w4c->continuation, continue_creating_call, w4c); - c->waiting = w4c; - grpc_subchannel_add_interested_party (exec_ctx, c, pollset); - if (!c->connecting) - { - c->connecting = 1; - connectivity_state_changed_locked (exec_ctx, c, "create_call"); - /* released by connection */ - SUBCHANNEL_REF_LOCKED (c, "connecting"); - GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting"); - gpr_mu_unlock (&c->mu); - - start_connect (exec_ctx, c); - } - else - { - gpr_mu_unlock (&c->mu); - } + gpr_mu_lock(&c->mu); + if (c->active != NULL) { + con = c->active; + CONNECTION_REF_LOCKED(con, "call"); + gpr_mu_unlock(&c->mu); + + *target = create_call(exec_ctx, con); + notify->cb(exec_ctx, notify->cb_arg, 1); + } else { + waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); + w4c->next = c->waiting; + w4c->notify = notify; + w4c->pollset = pollset; + w4c->target = target; + w4c->subchannel = c; + /* released when clearing w4c */ + SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect"); + grpc_closure_init(&w4c->continuation, continue_creating_call, w4c); + c->waiting = w4c; + grpc_subchannel_add_interested_party(exec_ctx, c, pollset); + if (!c->connecting) { + c->connecting = 1; + connectivity_state_changed_locked(exec_ctx, c, "create_call"); + /* released by connection */ + SUBCHANNEL_REF_LOCKED(c, "connecting"); + GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); + gpr_mu_unlock(&c->mu); + + start_connect(exec_ctx, c); + } else { + gpr_mu_unlock(&c->mu); } + } } -grpc_connectivity_state -grpc_subchannel_check_connectivity (grpc_subchannel * c) -{ +grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { grpc_connectivity_state state; - gpr_mu_lock (&c->mu); - state = grpc_connectivity_state_check (&c->state_tracker); - gpr_mu_unlock (&c->mu); + gpr_mu_lock(&c->mu); + state = grpc_connectivity_state_check(&c->state_tracker); + gpr_mu_unlock(&c->mu); return state; } -void -grpc_subchannel_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_connectivity_state * state, grpc_closure * notify) -{ +void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c, + grpc_connectivity_state *state, + grpc_closure *notify) { int do_connect = 0; - gpr_mu_lock (&c->mu); - if (grpc_connectivity_state_notify_on_state_change (exec_ctx, &c->state_tracker, state, notify)) - { - do_connect = 1; - c->connecting = 1; - /* released by connection */ - SUBCHANNEL_REF_LOCKED (c, "connecting"); - GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting"); - connectivity_state_changed_locked (exec_ctx, c, "state_change"); - } - gpr_mu_unlock (&c->mu); - - if (do_connect) - { - start_connect (exec_ctx, c); - } -} - -void -grpc_subchannel_process_transport_op (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, grpc_transport_op * op) -{ + gpr_mu_lock(&c->mu); + if (grpc_connectivity_state_notify_on_state_change( + exec_ctx, &c->state_tracker, state, notify)) { + do_connect = 1; + c->connecting = 1; + /* released by connection */ + SUBCHANNEL_REF_LOCKED(c, "connecting"); + GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); + connectivity_state_changed_locked(exec_ctx, c, "state_change"); + } + gpr_mu_unlock(&c->mu); + + if (do_connect) { + start_connect(exec_ctx, c); + } +} + +void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c, + grpc_transport_op *op) { connection *con = NULL; grpc_subchannel *destroy; int cancel_alarm = 0; - gpr_mu_lock (&c->mu); - if (c->active != NULL) - { - con = c->active; - CONNECTION_REF_LOCKED (con, "transport-op"); - } - if (op->disconnect) - { - c->disconnected = 1; - connectivity_state_changed_locked (exec_ctx, c, "disconnect"); - if (c->have_alarm) - { - cancel_alarm = 1; - } + gpr_mu_lock(&c->mu); + if (c->active != NULL) { + con = c->active; + CONNECTION_REF_LOCKED(con, "transport-op"); + } + if (op->disconnect) { + c->disconnected = 1; + connectivity_state_changed_locked(exec_ctx, c, "disconnect"); + if (c->have_alarm) { + cancel_alarm = 1; } - gpr_mu_unlock (&c->mu); - - if (con != NULL) - { - grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION (con); - grpc_channel_element *top_elem = grpc_channel_stack_element (channel_stack, 0); - top_elem->filter->start_transport_op (exec_ctx, top_elem, op); - - gpr_mu_lock (&c->mu); - destroy = CONNECTION_UNREF_LOCKED (exec_ctx, con, "transport-op"); - gpr_mu_unlock (&c->mu); - if (destroy) - { - subchannel_destroy (exec_ctx, destroy); - } + } + gpr_mu_unlock(&c->mu); + + if (con != NULL) { + grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_channel_element *top_elem = + grpc_channel_stack_element(channel_stack, 0); + top_elem->filter->start_transport_op(exec_ctx, top_elem, op); + + gpr_mu_lock(&c->mu); + destroy = CONNECTION_UNREF_LOCKED(exec_ctx, con, "transport-op"); + gpr_mu_unlock(&c->mu); + if (destroy) { + subchannel_destroy(exec_ctx, destroy); } + } - if (cancel_alarm) - { - grpc_alarm_cancel (exec_ctx, &c->alarm); - } + if (cancel_alarm) { + grpc_alarm_cancel(exec_ctx, &c->alarm); + } - if (op->disconnect) - { - grpc_connector_shutdown (exec_ctx, c->connector); - } + if (op->disconnect) { + grpc_connector_shutdown(exec_ctx, c->connector); + } } -static void -on_state_changed (grpc_exec_ctx * exec_ctx, void *p, int iomgr_success) -{ +static void on_state_changed(grpc_exec_ctx *exec_ctx, void *p, + int iomgr_success) { state_watcher *sw = p; grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; @@ -494,59 +466,57 @@ on_state_changed (grpc_exec_ctx * exec_ctx, void *p, int iomgr_success) grpc_channel_element *elem; connection *destroy_connection = NULL; - gpr_mu_lock (mu); + gpr_mu_lock(mu); /* if we failed or there is a version number mismatch, just leave this closure */ - if (!iomgr_success || sw->subchannel->active_version != sw->version) - { - goto done; - } + if (!iomgr_success || sw->subchannel->active_version != sw->version) { + goto done; + } - switch (sw->connectivity_state) - { + switch (sw->connectivity_state) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_READY: case GRPC_CHANNEL_IDLE: /* all is still good: keep watching */ - memset (&op, 0, sizeof (op)); + memset(&op, 0, sizeof(op)); op.connectivity_state = &sw->connectivity_state; op.on_connectivity_state_change = &sw->closure; - elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0); - elem->filter->start_transport_op (exec_ctx, elem, &op); + elem = grpc_channel_stack_element( + CHANNEL_STACK_FROM_CONNECTION(c->active), 0); + elem->filter->start_transport_op(exec_ctx, elem, &op); /* early out */ - gpr_mu_unlock (mu); + gpr_mu_unlock(mu); return; case GRPC_CHANNEL_FATAL_FAILURE: case GRPC_CHANNEL_TRANSIENT_FAILURE: /* things have gone wrong, deactivate and enter idle */ - if (sw->subchannel->active->refs == 0) - { - destroy_connection = sw->subchannel->active; - } + if (sw->subchannel->active->refs == 0) { + destroy_connection = sw->subchannel->active; + } sw->subchannel->active = NULL; - grpc_connectivity_state_set (exec_ctx, &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, "connection_failed"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + c->disconnected + ? GRPC_CHANNEL_FATAL_FAILURE + : GRPC_CHANNEL_TRANSIENT_FAILURE, + "connection_failed"); break; - } + } done: - connectivity_state_changed_locked (exec_ctx, c, "transport_state_changed"); - destroy = SUBCHANNEL_UNREF_LOCKED (c, "state_watcher"); - gpr_free (sw); - gpr_mu_unlock (mu); - if (destroy) - { - subchannel_destroy (exec_ctx, c); - } - if (destroy_connection != NULL) - { - connection_destroy (exec_ctx, destroy_connection); - } -} - -static void -publish_transport (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) -{ + connectivity_state_changed_locked(exec_ctx, c, "transport_state_changed"); + destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); + gpr_free(sw); + gpr_mu_unlock(mu); + if (destroy) { + subchannel_destroy(exec_ctx, c); + } + if (destroy_connection != NULL) { + connection_destroy(exec_ctx, destroy_connection); + } +} + +static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { size_t channel_stack_size; connection *con; grpc_channel_stack *stk; @@ -560,46 +530,46 @@ publish_transport (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; - filters = gpr_malloc (sizeof (*filters) * num_filters); - memcpy (filters, c->filters, sizeof (*filters) * c->num_filters); - memcpy (filters + c->num_filters, c->connecting_result.filters, sizeof (*filters) * c->connecting_result.num_filters); + filters = gpr_malloc(sizeof(*filters) * num_filters); + memcpy(filters, c->filters, sizeof(*filters) * c->num_filters); + memcpy(filters + c->num_filters, c->connecting_result.filters, + sizeof(*filters) * c->connecting_result.num_filters); filters[num_filters - 1] = &grpc_connected_channel_filter; /* construct channel stack */ - channel_stack_size = grpc_channel_stack_size (filters, num_filters); - con = gpr_malloc (sizeof (connection) + channel_stack_size); - stk = (grpc_channel_stack *) (con + 1); + channel_stack_size = grpc_channel_stack_size(filters, num_filters); + con = gpr_malloc(sizeof(connection) + channel_stack_size); + stk = (grpc_channel_stack *)(con + 1); con->refs = 0; con->subchannel = c; - grpc_channel_stack_init (exec_ctx, filters, num_filters, c->master, c->args, c->mdctx, stk); - grpc_connected_channel_bind_transport (stk, c->connecting_result.transport); - gpr_free (c->connecting_result.filters); - memset (&c->connecting_result, 0, sizeof (c->connecting_result)); + grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args, + c->mdctx, stk); + grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); + gpr_free(c->connecting_result.filters); + memset(&c->connecting_result, 0, sizeof(c->connecting_result)); /* initialize state watcher */ - sw = gpr_malloc (sizeof (*sw)); - grpc_closure_init (&sw->closure, on_state_changed, sw); + sw = gpr_malloc(sizeof(*sw)); + grpc_closure_init(&sw->closure, on_state_changed, sw); sw->subchannel = c; sw->connectivity_state = GRPC_CHANNEL_READY; - gpr_mu_lock (&c->mu); + gpr_mu_lock(&c->mu); - if (c->disconnected) - { - gpr_mu_unlock (&c->mu); - gpr_free (sw); - gpr_free (filters); - grpc_channel_stack_destroy (exec_ctx, stk); - GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, c->master, "connecting"); - GRPC_SUBCHANNEL_UNREF (exec_ctx, c, "connecting"); - return; - } + if (c->disconnected) { + gpr_mu_unlock(&c->mu); + gpr_free(sw); + gpr_free(filters); + grpc_channel_stack_destroy(exec_ctx, stk); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + return; + } /* publish */ - if (c->active != NULL && c->active->refs == 0) - { - destroy_connection = c->active; - } + if (c->active != NULL && c->active->refs == 0) { + destroy_connection = c->active; + } c->active = con; c->active_version++; sw->version = c->active_version; @@ -607,202 +577,183 @@ publish_transport (grpc_exec_ctx * exec_ctx, grpc_subchannel * c) /* watch for changes; subchannel ref for connecting is donated to the state watcher */ - memset (&op, 0, sizeof (op)); + memset(&op, 0, sizeof(op)); op.connectivity_state = &sw->connectivity_state; op.on_connectivity_state_change = &sw->closure; op.bind_pollset_set = c->pollset_set; - SUBCHANNEL_REF_LOCKED (c, "state_watcher"); - GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, c->master, "connecting"); - GPR_ASSERT (!SUBCHANNEL_UNREF_LOCKED (c, "connecting")); - elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0); - elem->filter->start_transport_op (exec_ctx, elem, &op); + SUBCHANNEL_REF_LOCKED(c, "state_watcher"); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); + GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting")); + elem = + grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); + elem->filter->start_transport_op(exec_ctx, elem, &op); /* signal completion */ - connectivity_state_changed_locked (exec_ctx, c, "connected"); + connectivity_state_changed_locked(exec_ctx, c, "connected"); w4c = c->waiting; c->waiting = NULL; - gpr_mu_unlock (&c->mu); + gpr_mu_unlock(&c->mu); - while (w4c != NULL) - { - waiting_for_connect *next = w4c->next; - grpc_exec_ctx_enqueue (exec_ctx, &w4c->continuation, 1); - w4c = next; - } + while (w4c != NULL) { + waiting_for_connect *next = w4c->next; + grpc_exec_ctx_enqueue(exec_ctx, &w4c->continuation, 1); + w4c = next; + } - gpr_free (filters); + gpr_free(filters); - if (destroy_connection != NULL) - { - connection_destroy (exec_ctx, destroy_connection); - } + if (destroy_connection != NULL) { + connection_destroy(exec_ctx, destroy_connection); + } } /* Generate a random number between 0 and 1. */ -static double -generate_uniform_random_number (grpc_subchannel * c) -{ - c->random = (1103515245 * c->random + 12345) % ((gpr_uint32) 1 << 31); - return c->random / (double) ((gpr_uint32) 1 << 31); +static double generate_uniform_random_number(grpc_subchannel *c) { + c->random = (1103515245 * c->random + 12345) % ((gpr_uint32)1 << 31); + return c->random / (double)((gpr_uint32)1 << 31); } /* Update backoff_delta and next_attempt in subchannel */ -static void -update_reconnect_parameters (grpc_subchannel * c) -{ +static void update_reconnect_parameters(grpc_subchannel *c) { gpr_int32 backoff_delta_millis, jitter; - gpr_int32 max_backoff_millis = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + gpr_int32 max_backoff_millis = + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; double jitter_range; - backoff_delta_millis = (gpr_int32) (gpr_time_to_millis (c->backoff_delta) * GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); - if (backoff_delta_millis > max_backoff_millis) - { - backoff_delta_millis = max_backoff_millis; - } - c->backoff_delta = gpr_time_from_millis (backoff_delta_millis, GPR_TIMESPAN); - c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta); + backoff_delta_millis = + (gpr_int32)(gpr_time_to_millis(c->backoff_delta) * + GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); + if (backoff_delta_millis > max_backoff_millis) { + backoff_delta_millis = max_backoff_millis; + } + c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN); + c->next_attempt = + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis; - jitter = (gpr_int32) ((2 * generate_uniform_random_number (c) - 1) * jitter_range); - c->next_attempt = gpr_time_add (c->next_attempt, gpr_time_from_millis (jitter, GPR_TIMESPAN)); + jitter = + (gpr_int32)((2 * generate_uniform_random_number(c) - 1) * jitter_range); + c->next_attempt = + gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); } -static void -on_alarm (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ +static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { grpc_subchannel *c = arg; - gpr_mu_lock (&c->mu); + gpr_mu_lock(&c->mu); c->have_alarm = 0; - if (c->disconnected) - { - iomgr_success = 0; - } - connectivity_state_changed_locked (exec_ctx, c, "alarm"); - gpr_mu_unlock (&c->mu); - if (iomgr_success) - { - update_reconnect_parameters (c); - continue_connect (exec_ctx, c); - } - else - { - GRPC_CHANNEL_INTERNAL_UNREF (exec_ctx, c->master, "connecting"); - GRPC_SUBCHANNEL_UNREF (exec_ctx, c, "connecting"); - } -} - -static void -subchannel_connected (grpc_exec_ctx * exec_ctx, void *arg, int iomgr_success) -{ + if (c->disconnected) { + iomgr_success = 0; + } + connectivity_state_changed_locked(exec_ctx, c, "alarm"); + gpr_mu_unlock(&c->mu); + if (iomgr_success) { + update_reconnect_parameters(c); + continue_connect(exec_ctx, c); + } else { + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + } +} + +static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { grpc_subchannel *c = arg; - if (c->connecting_result.transport != NULL) - { - publish_transport (exec_ctx, c); - } - else - { - gpr_timespec now = gpr_now (GPR_CLOCK_MONOTONIC); - gpr_mu_lock (&c->mu); - GPR_ASSERT (!c->have_alarm); - c->have_alarm = 1; - connectivity_state_changed_locked (exec_ctx, c, "connect_failed"); - grpc_alarm_init (exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); - gpr_mu_unlock (&c->mu); - } -} - -static gpr_timespec -compute_connect_deadline (grpc_subchannel * c) -{ - gpr_timespec current_deadline = gpr_time_add (c->next_attempt, c->backoff_delta); - gpr_timespec min_deadline = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds (GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, - GPR_TIMESPAN)); - return gpr_time_cmp (current_deadline, min_deadline) > 0 ? current_deadline : min_deadline; -} - -static grpc_connectivity_state -compute_connectivity_locked (grpc_subchannel * c) -{ - if (c->disconnected) - { - return GRPC_CHANNEL_FATAL_FAILURE; - } - if (c->connecting) - { - if (c->have_alarm) - { - return GRPC_CHANNEL_TRANSIENT_FAILURE; - } - return GRPC_CHANNEL_CONNECTING; - } - if (c->active) - { - return GRPC_CHANNEL_READY; + if (c->connecting_result.transport != NULL) { + publish_transport(exec_ctx, c); + } else { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_mu_lock(&c->mu); + GPR_ASSERT(!c->have_alarm); + c->have_alarm = 1; + connectivity_state_changed_locked(exec_ctx, c, "connect_failed"); + grpc_alarm_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); + gpr_mu_unlock(&c->mu); + } +} + +static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { + gpr_timespec current_deadline = + gpr_time_add(c->next_attempt, c->backoff_delta); + gpr_timespec min_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, + GPR_TIMESPAN)); + return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline + : min_deadline; +} + +static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { + if (c->disconnected) { + return GRPC_CHANNEL_FATAL_FAILURE; + } + if (c->connecting) { + if (c->have_alarm) { + return GRPC_CHANNEL_TRANSIENT_FAILURE; } + return GRPC_CHANNEL_CONNECTING; + } + if (c->active) { + return GRPC_CHANNEL_READY; + } return GRPC_CHANNEL_IDLE; } -static void -connectivity_state_changed_locked (grpc_exec_ctx * exec_ctx, grpc_subchannel * c, const char *reason) -{ - grpc_connectivity_state current = compute_connectivity_locked (c); - grpc_connectivity_state_set (exec_ctx, &c->state_tracker, current, reason); +static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c, + const char *reason) { + grpc_connectivity_state current = compute_connectivity_locked(c); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, current, reason); } /* * grpc_subchannel_call implementation */ -void -grpc_subchannel_call_ref (grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) -{ - gpr_ref (&c->refs); -} - -void -grpc_subchannel_call_unref (grpc_exec_ctx *exec_ctx, grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) -{ - if (gpr_unref (&c->refs)) - { - gpr_mu *mu = &c->connection->subchannel->mu; - grpc_subchannel *destroy; - grpc_call_stack_destroy (exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK (c)); - gpr_mu_lock (mu); - destroy = CONNECTION_UNREF_LOCKED (exec_ctx, c->connection, "call"); - gpr_mu_unlock (mu); - gpr_free (c); - if (destroy != NULL) - { - subchannel_destroy (exec_ctx, destroy); - } +void grpc_subchannel_call_ref( + grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_ref(&c->refs); +} + +void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + if (gpr_unref(&c->refs)) { + gpr_mu *mu = &c->connection->subchannel->mu; + grpc_subchannel *destroy; + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); + gpr_mu_lock(mu); + destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call"); + gpr_mu_unlock(mu); + gpr_free(c); + if (destroy != NULL) { + subchannel_destroy(exec_ctx, destroy); } + } } -char * -grpc_subchannel_call_get_peer (grpc_exec_ctx * exec_ctx, grpc_subchannel_call * call) -{ - grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call); - grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0); - return top_elem->filter->get_peer (exec_ctx, top_elem); +char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call *call) { + grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); + grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); + return top_elem->filter->get_peer(exec_ctx, top_elem); } -void -grpc_subchannel_call_process_op (grpc_exec_ctx * exec_ctx, grpc_subchannel_call * call, grpc_transport_stream_op * op) -{ - grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call); - grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0); - top_elem->filter->start_transport_stream_op (exec_ctx, top_elem, op); +void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call *call, + grpc_transport_stream_op *op) { + grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); + grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); + top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op); } -static grpc_subchannel_call * -create_call (grpc_exec_ctx * exec_ctx, connection * con) -{ - grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION (con); - grpc_subchannel_call *call = gpr_malloc (sizeof (grpc_subchannel_call) + chanstk->call_stack_size); - grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK (call); +static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, + connection *con) { + grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_subchannel_call *call = + gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); + grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); call->connection = con; - gpr_ref_init (&call->refs, 1); - grpc_call_stack_init (exec_ctx, chanstk, NULL, NULL, callstk); + gpr_ref_init(&call->refs, 1); + grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk); return call; } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b87d6f55ff..c9e5861d9c 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -64,37 +64,59 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif -void grpc_subchannel_ref (grpc_subchannel * channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_unref (grpc_exec_ctx *exec_ctx, grpc_subchannel * channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_ref (grpc_subchannel_call * call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_unref (grpc_exec_ctx *exec_ctx, grpc_subchannel_call * call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_ref( + grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, + grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_ref( + grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call *call + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a call (possibly asynchronously) */ -void grpc_subchannel_create_call (grpc_exec_ctx * exec_ctx, grpc_subchannel * subchannel, grpc_pollset * pollset, grpc_subchannel_call ** target, grpc_closure * notify); +void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + grpc_pollset *pollset, + grpc_subchannel_call **target, + grpc_closure *notify); /** process a transport level op */ -void grpc_subchannel_process_transport_op (grpc_exec_ctx * exec_ctx, grpc_subchannel * subchannel, grpc_transport_op * op); +void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + grpc_transport_op *op); /** poll the current connectivity state of a channel */ -grpc_connectivity_state grpc_subchannel_check_connectivity (grpc_subchannel * channel); +grpc_connectivity_state grpc_subchannel_check_connectivity( + grpc_subchannel *channel); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the channel */ -void grpc_subchannel_notify_on_state_change (grpc_exec_ctx * exec_ctx, grpc_subchannel * channel, grpc_connectivity_state * state, grpc_closure * notify); +void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, + grpc_subchannel *channel, + grpc_connectivity_state *state, + grpc_closure *notify); /** express interest in \a channel's activities through \a pollset. */ -void grpc_subchannel_add_interested_party (grpc_exec_ctx * exec_ctx, grpc_subchannel * channel, grpc_pollset * pollset); +void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, + grpc_subchannel *channel, + grpc_pollset *pollset); /** stop following \a channel's activity through \a pollset. */ -void grpc_subchannel_del_interested_party (grpc_exec_ctx * exec_ctx, grpc_subchannel * channel, grpc_pollset * pollset); +void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, + grpc_subchannel *channel, + grpc_pollset *pollset); /** continue processing a transport op */ -void grpc_subchannel_call_process_op (grpc_exec_ctx * exec_ctx, grpc_subchannel_call * subchannel_call, grpc_transport_stream_op * op); +void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call *subchannel_call, + grpc_transport_stream_op *op); /** continue querying for peer */ -char *grpc_subchannel_call_get_peer (grpc_exec_ctx * exec_ctx, grpc_subchannel_call * subchannel_call); +char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call *subchannel_call); -struct grpc_subchannel_args -{ +struct grpc_subchannel_args { /** Channel filters for this channel - wrapped factories will likely want to mutate this */ const grpc_channel_filter **filters; @@ -112,6 +134,7 @@ struct grpc_subchannel_args }; /** create a subchannel given a connector */ -grpc_subchannel *grpc_subchannel_create (grpc_connector * connector, grpc_subchannel_args * args); +grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, + grpc_subchannel_args *args); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */ diff --git a/src/core/client_config/subchannel_factory.c b/src/core/client_config/subchannel_factory.c index f60cd02421..2c64219e8b 100644 --- a/src/core/client_config/subchannel_factory.c +++ b/src/core/client_config/subchannel_factory.c @@ -33,20 +33,17 @@ #include "src/core/client_config/subchannel_factory.h" -void -grpc_subchannel_factory_ref (grpc_subchannel_factory * factory) -{ - factory->vtable->ref (factory); +void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) { + factory->vtable->ref(factory); } -void -grpc_subchannel_factory_unref (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * factory) -{ - factory->vtable->unref (exec_ctx, factory); +void grpc_subchannel_factory_unref(grpc_exec_ctx* exec_ctx, + grpc_subchannel_factory* factory) { + factory->vtable->unref(exec_ctx, factory); } -grpc_subchannel * -grpc_subchannel_factory_create_subchannel (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * factory, grpc_subchannel_args * args) -{ - return factory->vtable->create_subchannel (exec_ctx, factory, args); +grpc_subchannel* grpc_subchannel_factory_create_subchannel( + grpc_exec_ctx* exec_ctx, grpc_subchannel_factory* factory, + grpc_subchannel_args* args) { + return factory->vtable->create_subchannel(exec_ctx, factory, args); } diff --git a/src/core/client_config/subchannel_factory.h b/src/core/client_config/subchannel_factory.h index 7586a74754..c6d8cc90be 100644 --- a/src/core/client_config/subchannel_factory.h +++ b/src/core/client_config/subchannel_factory.h @@ -42,22 +42,25 @@ typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable; /** Constructor for new configured channels. Creating decorators around this type is encouraged to adapt behavior. */ -struct grpc_subchannel_factory -{ +struct grpc_subchannel_factory { const grpc_subchannel_factory_vtable *vtable; }; -struct grpc_subchannel_factory_vtable -{ - void (*ref) (grpc_subchannel_factory * factory); - void (*unref) (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * factory); - grpc_subchannel *(*create_subchannel) (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * factory, grpc_subchannel_args * args); +struct grpc_subchannel_factory_vtable { + void (*ref)(grpc_subchannel_factory *factory); + void (*unref)(grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory); + grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx, + grpc_subchannel_factory *factory, + grpc_subchannel_args *args); }; -void grpc_subchannel_factory_ref (grpc_subchannel_factory * factory); -void grpc_subchannel_factory_unref (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * factory); +void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory); +void grpc_subchannel_factory_unref(grpc_exec_ctx *exec_ctx, + grpc_subchannel_factory *factory); /** Create a new grpc_subchannel */ -grpc_subchannel *grpc_subchannel_factory_create_subchannel (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * factory, grpc_subchannel_args * args); +grpc_subchannel *grpc_subchannel_factory_create_subchannel( + grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory, + grpc_subchannel_args *args); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */ diff --git a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c index 220f00106b..585e465fa4 100644 --- a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c +++ b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.c @@ -34,11 +34,10 @@ #include "src/core/client_config/subchannel_factory_decorators/add_channel_arg.h" #include "src/core/client_config/subchannel_factory_decorators/merge_channel_args.h" -grpc_subchannel_factory * -grpc_subchannel_factory_add_channel_arg (grpc_subchannel_factory * input, const grpc_arg * arg) -{ +grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg( + grpc_subchannel_factory *input, const grpc_arg *arg) { grpc_channel_args args; args.num_args = 1; - args.args = (grpc_arg *) arg; - return grpc_subchannel_factory_merge_channel_args (input, &args); + args.args = (grpc_arg *)arg; + return grpc_subchannel_factory_merge_channel_args(input, &args); } diff --git a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h index 38f9c48d7a..76a535ebed 100644 --- a/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h +++ b/src/core/client_config/subchannel_factory_decorators/add_channel_arg.h @@ -39,7 +39,8 @@ /** Takes a subchannel factory, returns a new one that mutates incoming channel_args by adding a new argument; ownership of input, arg is retained by the caller. */ -grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg (grpc_subchannel_factory * input, const grpc_arg * arg); +grpc_subchannel_factory *grpc_subchannel_factory_add_channel_arg( + grpc_subchannel_factory *input, const grpc_arg *arg); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_ADD_CHANNEL_ARG_H \ */ diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c index 442862a4b8..cd25fdcf0f 100644 --- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c +++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.c @@ -35,58 +35,52 @@ #include <grpc/support/alloc.h> #include "src/core/channel/channel_args.h" -typedef struct -{ +typedef struct { grpc_subchannel_factory base; gpr_refcount refs; grpc_subchannel_factory *wrapped; grpc_channel_args *merge_args; } merge_args_factory; -static void -merge_args_factory_ref (grpc_subchannel_factory * scf) -{ - merge_args_factory *f = (merge_args_factory *) scf; - gpr_ref (&f->refs); +static void merge_args_factory_ref(grpc_subchannel_factory *scf) { + merge_args_factory *f = (merge_args_factory *)scf; + gpr_ref(&f->refs); } -static void -merge_args_factory_unref (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * scf) -{ - merge_args_factory *f = (merge_args_factory *) scf; - if (gpr_unref (&f->refs)) - { - grpc_subchannel_factory_unref (exec_ctx, f->wrapped); - grpc_channel_args_destroy (f->merge_args); - gpr_free (f); - } +static void merge_args_factory_unref(grpc_exec_ctx *exec_ctx, + grpc_subchannel_factory *scf) { + merge_args_factory *f = (merge_args_factory *)scf; + if (gpr_unref(&f->refs)) { + grpc_subchannel_factory_unref(exec_ctx, f->wrapped); + grpc_channel_args_destroy(f->merge_args); + gpr_free(f); + } } -static grpc_subchannel * -merge_args_factory_create_subchannel (grpc_exec_ctx * exec_ctx, grpc_subchannel_factory * scf, grpc_subchannel_args * args) -{ - merge_args_factory *f = (merge_args_factory *) scf; - grpc_channel_args *final_args = grpc_channel_args_merge (args->args, f->merge_args); +static grpc_subchannel *merge_args_factory_create_subchannel( + grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *scf, + grpc_subchannel_args *args) { + merge_args_factory *f = (merge_args_factory *)scf; + grpc_channel_args *final_args = + grpc_channel_args_merge(args->args, f->merge_args); grpc_subchannel *s; args->args = final_args; - s = grpc_subchannel_factory_create_subchannel (exec_ctx, f->wrapped, args); - grpc_channel_args_destroy (final_args); + s = grpc_subchannel_factory_create_subchannel(exec_ctx, f->wrapped, args); + grpc_channel_args_destroy(final_args); return s; } static const grpc_subchannel_factory_vtable merge_args_factory_vtable = { - merge_args_factory_ref, merge_args_factory_unref, - merge_args_factory_create_subchannel -}; + merge_args_factory_ref, merge_args_factory_unref, + merge_args_factory_create_subchannel}; -grpc_subchannel_factory * -grpc_subchannel_factory_merge_channel_args (grpc_subchannel_factory * input, const grpc_channel_args * args) -{ - merge_args_factory *f = gpr_malloc (sizeof (*f)); +grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args( + grpc_subchannel_factory *input, const grpc_channel_args *args) { + merge_args_factory *f = gpr_malloc(sizeof(*f)); f->base.vtable = &merge_args_factory_vtable; - gpr_ref_init (&f->refs, 1); - grpc_subchannel_factory_ref (input); + gpr_ref_init(&f->refs, 1); + grpc_subchannel_factory_ref(input); f->wrapped = input; - f->merge_args = grpc_channel_args_copy (args); + f->merge_args = grpc_channel_args_copy(args); return &f->base; } diff --git a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h index 5155509a47..a9e1691871 100644 --- a/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h +++ b/src/core/client_config/subchannel_factory_decorators/merge_channel_args.h @@ -39,7 +39,8 @@ /** Takes a subchannel factory, returns a new one that mutates incoming channel_args by adding a new argument; ownership of input, args is retained by the caller. */ -grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args (grpc_subchannel_factory * input, const grpc_channel_args * args); +grpc_subchannel_factory *grpc_subchannel_factory_merge_channel_args( + grpc_subchannel_factory *input, const grpc_channel_args *args); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_DECORATORS_MERGE_CHANNEL_ARGS_H \ */ diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c index 4f4cc5b9c4..df9f32d403 100644 --- a/src/core/client_config/uri_parser.c +++ b/src/core/client_config/uri_parser.c @@ -42,35 +42,31 @@ /** a size_t default value... maps to all 1's */ #define NOT_SET (~(size_t)0) -static grpc_uri * -bad_uri (const char *uri_text, size_t pos, const char *section, int suppress_errors) -{ +static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section, + int suppress_errors) { char *line_prefix; size_t pfx_len; - if (!suppress_errors) - { - gpr_asprintf (&line_prefix, "bad uri.%s: '", section); - pfx_len = strlen (line_prefix) + pos; - gpr_log (GPR_ERROR, "%s%s'", line_prefix, uri_text); - gpr_free (line_prefix); + if (!suppress_errors) { + gpr_asprintf(&line_prefix, "bad uri.%s: '", section); + pfx_len = strlen(line_prefix) + pos; + gpr_log(GPR_ERROR, "%s%s'", line_prefix, uri_text); + gpr_free(line_prefix); - line_prefix = gpr_malloc (pfx_len + 1); - memset (line_prefix, ' ', pfx_len); - line_prefix[pfx_len] = 0; - gpr_log (GPR_ERROR, "%s^ here", line_prefix); - gpr_free (line_prefix); - } + line_prefix = gpr_malloc(pfx_len + 1); + memset(line_prefix, ' ', pfx_len); + line_prefix[pfx_len] = 0; + gpr_log(GPR_ERROR, "%s^ here", line_prefix); + gpr_free(line_prefix); + } return NULL; } /** Returns a copy of \a src[begin, end) */ -static char * -copy_component (const char *src, size_t begin, size_t end) -{ - char *out = gpr_malloc (end - begin + 1); - memcpy (out, src + begin, end - begin); +static char *copy_component(const char *src, size_t begin, size_t end) { + char *out = gpr_malloc(end - begin + 1); + memcpy(out, src + begin, end - begin); out[end - begin] = 0; return out; } @@ -78,77 +74,66 @@ copy_component (const char *src, size_t begin, size_t end) /** Returns how many chars to advance if \a uri_text[i] begins a valid \a pchar * production. If \a uri_text[i] introduces an invalid \a pchar (such as percent * sign not followed by two hex digits), NOT_SET is returned. */ -static size_t -parse_pchar (const char *uri_text, size_t i) -{ +static size_t parse_pchar(const char *uri_text, size_t i) { /* pchar = unreserved / pct-encoded / sub-delims / ":" / "@" * unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~" * pct-encoded = "%" HEXDIG HEXDIG * sub-delims = "!" / "$" / "&" / "'" / "(" / ")" / "*" / "+" / "," / ";" / "=" */ char c = uri_text[i]; - if (((c >= 'A') && (c <= 'Z')) || ((c >= 'a') && (c <= 'z')) || ((c >= '0') && (c <= '9')) || (c == '-' || c == '.' || c == '_' || c == '~') || /* unreserved */ - (c == '!' || c == '$' || c == '&' || c == '\'' || c == '$' || c == '&' || c == '(' || c == ')' || c == '*' || c == '+' || c == ',' || c == ';' || c == '=') /* sub-delims */ ) - { - return 1; + if (((c >= 'A') && (c <= 'Z')) || ((c >= 'a') && (c <= 'z')) || + ((c >= '0') && (c <= '9')) || + (c == '-' || c == '.' || c == '_' || c == '~') || /* unreserved */ + (c == '!' || c == '$' || c == '&' || c == '\'' || c == '$' || c == '&' || + c == '(' || c == ')' || c == '*' || c == '+' || c == ',' || c == ';' || + c == '=') /* sub-delims */) { + return 1; + } + if (c == '%') { /* pct-encoded */ + size_t j; + if (uri_text[i + 1] == 0 || uri_text[i + 2] == 0) { + return NOT_SET; } - if (c == '%') - { /* pct-encoded */ - size_t j; - if (uri_text[i + 1] == 0 || uri_text[i + 2] == 0) - { - return NOT_SET; - } - for (j = i + 1; j < 2; j++) - { - c = uri_text[j]; - if (!(((c >= '0') && (c <= '9')) || ((c >= 'a') && (c <= 'f')) || ((c >= 'A') && (c <= 'F')))) - { - return NOT_SET; - } - } - return 2; + for (j = i + 1; j < 2; j++) { + c = uri_text[j]; + if (!(((c >= '0') && (c <= '9')) || ((c >= 'a') && (c <= 'f')) || + ((c >= 'A') && (c <= 'F')))) { + return NOT_SET; + } } + return 2; + } return 0; } /* *( pchar / "?" / "/" ) */ -static int -parse_fragment_or_query (const char *uri_text, size_t * i) -{ +static int parse_fragment_or_query(const char *uri_text, size_t *i) { char c; - while ((c = uri_text[*i]) != 0) - { - const size_t advance = parse_pchar (uri_text, *i); /* pchar */ - switch (advance) - { - case 0: /* uri_text[i] isn't in pchar */ - /* maybe it's ? or / */ - if (uri_text[*i] == '?' || uri_text[*i] == '/') - { - (*i)++; - break; - } - else - { - return 1; - } - gpr_log (GPR_ERROR, "should never reach here"); - abort (); - default: - (*i) += advance; - break; - case NOT_SET: /* uri_text[i] introduces an invalid URI */ - return 0; - } + while ((c = uri_text[*i]) != 0) { + const size_t advance = parse_pchar(uri_text, *i); /* pchar */ + switch (advance) { + case 0: /* uri_text[i] isn't in pchar */ + /* maybe it's ? or / */ + if (uri_text[*i] == '?' || uri_text[*i] == '/') { + (*i)++; + break; + } else { + return 1; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + default: + (*i) += advance; + break; + case NOT_SET: /* uri_text[i] introduces an invalid URI */ + return 0; } + } /* *i is the first uri_text position past the \a query production, maybe \0 */ return 1; } -grpc_uri * -grpc_uri_parse (const char *uri_text, int suppress_errors) -{ +grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { grpc_uri *uri; size_t scheme_begin = 0; size_t scheme_end = NOT_SET; @@ -162,127 +147,96 @@ grpc_uri_parse (const char *uri_text, int suppress_errors) size_t fragment_end = NOT_SET; size_t i; - for (i = scheme_begin; uri_text[i] != 0; i++) - { - if (uri_text[i] == ':') - { - scheme_end = i; - break; - } - if (uri_text[i] >= 'a' && uri_text[i] <= 'z') - continue; - if (uri_text[i] >= 'A' && uri_text[i] <= 'Z') - continue; - if (i != scheme_begin) - { - if (uri_text[i] >= '0' && uri_text[i] <= '9') - continue; - if (uri_text[i] == '+') - continue; - if (uri_text[i] == '-') - continue; - if (uri_text[i] == '.') - continue; - } + for (i = scheme_begin; uri_text[i] != 0; i++) { + if (uri_text[i] == ':') { + scheme_end = i; break; } - if (scheme_end == NOT_SET) - { - return bad_uri (uri_text, i, "scheme", suppress_errors); + if (uri_text[i] >= 'a' && uri_text[i] <= 'z') continue; + if (uri_text[i] >= 'A' && uri_text[i] <= 'Z') continue; + if (i != scheme_begin) { + if (uri_text[i] >= '0' && uri_text[i] <= '9') continue; + if (uri_text[i] == '+') continue; + if (uri_text[i] == '-') continue; + if (uri_text[i] == '.') continue; } - - if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') - { - authority_begin = scheme_end + 3; - for (i = authority_begin; uri_text[i] != 0 && authority_end == NOT_SET; i++) - { - if (uri_text[i] == '/' || uri_text[i] == '?' || uri_text[i] == '#') - { - authority_end = i; - } - } - if (authority_end == NOT_SET && uri_text[i] == 0) - { - authority_end = i; - } - if (authority_end == NOT_SET) - { - return bad_uri (uri_text, i, "authority", suppress_errors); - } - /* TODO(ctiller): parse the authority correctly */ - path_begin = authority_end; + break; + } + if (scheme_end == NOT_SET) { + return bad_uri(uri_text, i, "scheme", suppress_errors); + } + + if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') { + authority_begin = scheme_end + 3; + for (i = authority_begin; uri_text[i] != 0 && authority_end == NOT_SET; + i++) { + if (uri_text[i] == '/' || uri_text[i] == '?' || uri_text[i] == '#') { + authority_end = i; + } } - else - { - path_begin = scheme_end + 1; + if (authority_end == NOT_SET && uri_text[i] == 0) { + authority_end = i; } - - for (i = path_begin; uri_text[i] != 0; i++) - { - if (uri_text[i] == '?' || uri_text[i] == '#') - { - path_end = i; - break; - } + if (authority_end == NOT_SET) { + return bad_uri(uri_text, i, "authority", suppress_errors); } - if (path_end == NOT_SET && uri_text[i] == 0) - { + /* TODO(ctiller): parse the authority correctly */ + path_begin = authority_end; + } else { + path_begin = scheme_end + 1; + } + + for (i = path_begin; uri_text[i] != 0; i++) { + if (uri_text[i] == '?' || uri_text[i] == '#') { path_end = i; + break; } - if (path_end == NOT_SET) - { - return bad_uri (uri_text, i, "path", suppress_errors); - } - - if (uri_text[i] == '?') - { - query_begin = ++i; - if (!parse_fragment_or_query (uri_text, &i)) - { - return bad_uri (uri_text, i, "query", suppress_errors); - } - else if (uri_text[i] != 0 && uri_text[i] != '#') - { - /* We must be at the end or at the beginning of a fragment */ - return bad_uri (uri_text, i, "query", suppress_errors); - } - query_end = i; + } + if (path_end == NOT_SET && uri_text[i] == 0) { + path_end = i; + } + if (path_end == NOT_SET) { + return bad_uri(uri_text, i, "path", suppress_errors); + } + + if (uri_text[i] == '?') { + query_begin = ++i; + if (!parse_fragment_or_query(uri_text, &i)) { + return bad_uri(uri_text, i, "query", suppress_errors); + } else if (uri_text[i] != 0 && uri_text[i] != '#') { + /* We must be at the end or at the beginning of a fragment */ + return bad_uri(uri_text, i, "query", suppress_errors); } - if (uri_text[i] == '#') - { - fragment_begin = ++i; - if (!parse_fragment_or_query (uri_text, &i)) - { - return bad_uri (uri_text, i - fragment_end, "fragment", suppress_errors); - } - else if (uri_text[i] != 0) - { - /* We must be at the end */ - return bad_uri (uri_text, i, "fragment", suppress_errors); - } - fragment_end = i; + query_end = i; + } + if (uri_text[i] == '#') { + fragment_begin = ++i; + if (!parse_fragment_or_query(uri_text, &i)) { + return bad_uri(uri_text, i - fragment_end, "fragment", suppress_errors); + } else if (uri_text[i] != 0) { + /* We must be at the end */ + return bad_uri(uri_text, i, "fragment", suppress_errors); } + fragment_end = i; + } - uri = gpr_malloc (sizeof (*uri)); - memset (uri, 0, sizeof (*uri)); - uri->scheme = copy_component (uri_text, scheme_begin, scheme_end); - uri->authority = copy_component (uri_text, authority_begin, authority_end); - uri->path = copy_component (uri_text, path_begin, path_end); - uri->query = copy_component (uri_text, query_begin, query_end); - uri->fragment = copy_component (uri_text, fragment_begin, fragment_end); + uri = gpr_malloc(sizeof(*uri)); + memset(uri, 0, sizeof(*uri)); + uri->scheme = copy_component(uri_text, scheme_begin, scheme_end); + uri->authority = copy_component(uri_text, authority_begin, authority_end); + uri->path = copy_component(uri_text, path_begin, path_end); + uri->query = copy_component(uri_text, query_begin, query_end); + uri->fragment = copy_component(uri_text, fragment_begin, fragment_end); return uri; } -void -grpc_uri_destroy (grpc_uri * uri) -{ - if (!uri) - return; - gpr_free (uri->scheme); - gpr_free (uri->authority); - gpr_free (uri->path); - gpr_free (uri->query); - gpr_free (uri->fragment); - gpr_free (uri); +void grpc_uri_destroy(grpc_uri *uri) { + if (!uri) return; + gpr_free(uri->scheme); + gpr_free(uri->authority); + gpr_free(uri->path); + gpr_free(uri->query); + gpr_free(uri->fragment); + gpr_free(uri); } diff --git a/src/core/client_config/uri_parser.h b/src/core/client_config/uri_parser.h index 1eb26e7ca6..b8daa13bd4 100644 --- a/src/core/client_config/uri_parser.h +++ b/src/core/client_config/uri_parser.h @@ -34,8 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_URI_PARSER_H #define GRPC_INTERNAL_CORE_CLIENT_CONFIG_URI_PARSER_H -typedef struct -{ +typedef struct { char *scheme; char *authority; char *path; @@ -44,9 +43,9 @@ typedef struct } grpc_uri; /** parse a uri, return NULL on failure */ -grpc_uri *grpc_uri_parse (const char *uri_text, int suppress_errors); +grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors); /** destroy a uri */ -void grpc_uri_destroy (grpc_uri * uri); +void grpc_uri_destroy(grpc_uri *uri); #endif |