diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-22 10:42:19 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-22 10:42:19 -0700 |
commit | 45724b35e411fef7c5da66a74c78428c11d56843 (patch) | |
tree | 9264034aca675c89444e02f72ef58e67d7043604 /src/core/client_config/subchannel.c | |
parent | 298751c1195523ef6228595043b583c3a6270e08 (diff) |
indent pass to get logical source lines on one physical line
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r-- | src/core/client_config/subchannel.c | 874 |
1 files changed, 458 insertions, 416 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index d41bf8f566..778d1e8b50 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -50,7 +50,8 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -typedef struct { +typedef struct +{ /* all fields protected by subchannel->mu */ /** refcount */ int refs; @@ -58,14 +59,16 @@ typedef struct { grpc_subchannel *subchannel; } connection; -typedef struct { +typedef struct +{ grpc_closure closure; size_t version; grpc_subchannel *subchannel; grpc_connectivity_state connectivity_state; } state_watcher; -typedef struct waiting_for_connect { +typedef struct waiting_for_connect +{ struct waiting_for_connect *next; grpc_closure *notify; grpc_pollset *pollset; @@ -74,7 +77,8 @@ typedef struct waiting_for_connect { grpc_closure continuation; } waiting_for_connect; -struct grpc_subchannel { +struct grpc_subchannel +{ grpc_connector *connector; /** non-transport related channel filters */ @@ -135,7 +139,8 @@ struct grpc_subchannel { gpr_uint32 random; }; -struct grpc_subchannel_call { +struct grpc_subchannel_call +{ connection *connection; gpr_refcount refs; }; @@ -143,26 +148,19 @@ struct grpc_subchannel_call { #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) -static grpc_subchannel_call *create_call(connection *con, - grpc_closure_list *closure_list); -static void connectivity_state_changed_locked(grpc_subchannel *c, - const char *reason, - grpc_closure_list *closure_list); -static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); -static gpr_timespec compute_connect_deadline(grpc_subchannel *c); -static void subchannel_connected(void *subchannel, int iomgr_success, - grpc_closure_list *closure_list); - -static void subchannel_ref_locked( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -static int subchannel_unref_locked( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; -static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -static grpc_subchannel *connection_unref_locked( - connection *c, grpc_closure_list *closure_list - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; -static void subchannel_destroy(grpc_subchannel *c, - grpc_closure_list *closure_list); +static grpc_subchannel_call *create_call (connection * con, grpc_closure_list * closure_list); +static void connectivity_state_changed_locked (grpc_subchannel * c, const char *reason, grpc_closure_list * closure_list); +static grpc_connectivity_state compute_connectivity_locked (grpc_subchannel * c); +static gpr_timespec compute_connect_deadline (grpc_subchannel * c); +static void subchannel_connected (void *subchannel, int iomgr_success, grpc_closure_list * closure_list); + +static void subchannel_ref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +static int +subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) + GRPC_MUST_USE_RESULT; + static void connection_ref_locked (connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + static grpc_subchannel *connection_unref_locked (connection * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; + static void subchannel_destroy (grpc_subchannel * c, grpc_closure_list * closure_list); #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG #define SUBCHANNEL_REF_LOCKED(p, r) \ @@ -198,30 +196,34 @@ static void subchannel_destroy(grpc_subchannel *c, * connection implementation */ -static void connection_destroy(connection *c, grpc_closure_list *closure_list) { - GPR_ASSERT(c->refs == 0); - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c), closure_list); - gpr_free(c); + static void connection_destroy (connection * c, grpc_closure_list * closure_list) +{ + GPR_ASSERT (c->refs == 0); + grpc_channel_stack_destroy (CHANNEL_STACK_FROM_CONNECTION (c), closure_list); + gpr_free (c); } -static void connection_ref_locked( - connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - REF_LOG("CONNECTION", c); - subchannel_ref_locked(c->subchannel REF_PASS_ARGS); +static void +connection_ref_locked (connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + REF_LOG ("CONNECTION", c); + subchannel_ref_locked (c->subchannel REF_PASS_ARGS); ++c->refs; } -static grpc_subchannel *connection_unref_locked( - connection *c, - grpc_closure_list *closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +static grpc_subchannel * +connection_unref_locked (connection * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ grpc_subchannel *destroy = NULL; - UNREF_LOG("CONNECTION", c); - if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) { - destroy = c->subchannel; - } - if (--c->refs == 0 && c->subchannel->active != c) { - connection_destroy(c, closure_list); - } + UNREF_LOG ("CONNECTION", c); + if (subchannel_unref_locked (c->subchannel REF_PASS_ARGS)) + { + destroy = c->subchannel; + } + if (--c->refs == 0 && c->subchannel->active != c) + { + connection_destroy (c, closure_list); + } return destroy; } @@ -229,241 +231,261 @@ static grpc_subchannel *connection_unref_locked( * grpc_subchannel implementation */ -static void subchannel_ref_locked( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - REF_LOG("SUBCHANNEL", c); +static void +subchannel_ref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + REF_LOG ("SUBCHANNEL", c); ++c->refs; } -static int subchannel_unref_locked( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - UNREF_LOG("SUBCHANNEL", c); +static int +subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + UNREF_LOG ("SUBCHANNEL", c); return --c->refs == 0; } -void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_mu_lock(&c->mu); - subchannel_ref_locked(c REF_PASS_ARGS); - gpr_mu_unlock(&c->mu); +void +grpc_subchannel_ref (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + gpr_mu_lock (&c->mu); + subchannel_ref_locked (c REF_PASS_ARGS); + gpr_mu_unlock (&c->mu); } -void grpc_subchannel_unref(grpc_subchannel *c, - grpc_closure_list *closure_list - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void +grpc_subchannel_unref (grpc_subchannel * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ int destroy; - gpr_mu_lock(&c->mu); - destroy = subchannel_unref_locked(c REF_PASS_ARGS); - gpr_mu_unlock(&c->mu); - if (destroy) subchannel_destroy(c, closure_list); -} - -static void subchannel_destroy(grpc_subchannel *c, - grpc_closure_list *closure_list) { - if (c->active != NULL) { - connection_destroy(c->active, closure_list); - } - gpr_free(c->filters); - grpc_channel_args_destroy(c->args); - gpr_free(c->addr); - grpc_mdctx_unref(c->mdctx); - grpc_connectivity_state_destroy(&c->state_tracker, closure_list); - grpc_connector_unref(c->connector, closure_list); - gpr_free(c); + gpr_mu_lock (&c->mu); + destroy = subchannel_unref_locked (c REF_PASS_ARGS); + gpr_mu_unlock (&c->mu); + if (destroy) + subchannel_destroy (c, closure_list); +} + +static void +subchannel_destroy (grpc_subchannel * c, grpc_closure_list * closure_list) +{ + if (c->active != NULL) + { + connection_destroy (c->active, closure_list); + } + gpr_free (c->filters); + grpc_channel_args_destroy (c->args); + gpr_free (c->addr); + grpc_mdctx_unref (c->mdctx); + grpc_connectivity_state_destroy (&c->state_tracker, closure_list); + grpc_connector_unref (c->connector, closure_list); + gpr_free (c); } -void grpc_subchannel_add_interested_party(grpc_subchannel *c, - grpc_pollset *pollset, - grpc_closure_list *closure_list) { - grpc_pollset_set_add_pollset(c->pollset_set, pollset, closure_list); +void +grpc_subchannel_add_interested_party (grpc_subchannel * c, grpc_pollset * pollset, grpc_closure_list * closure_list) +{ + grpc_pollset_set_add_pollset (c->pollset_set, pollset, closure_list); } -void grpc_subchannel_del_interested_party(grpc_subchannel *c, - grpc_pollset *pollset, - grpc_closure_list *closure_list) { - grpc_pollset_set_del_pollset(c->pollset_set, pollset, closure_list); +void +grpc_subchannel_del_interested_party (grpc_subchannel * c, grpc_pollset * pollset, grpc_closure_list * closure_list) +{ + grpc_pollset_set_del_pollset (c->pollset_set, pollset, closure_list); } -static gpr_uint32 random_seed() { - return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); +static gpr_uint32 +random_seed () +{ + return (gpr_uint32) (gpr_time_to_millis (gpr_now (GPR_CLOCK_MONOTONIC))); } -grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, - grpc_subchannel_args *args) { - grpc_subchannel *c = gpr_malloc(sizeof(*c)); - grpc_channel_element *parent_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(args->master)); - memset(c, 0, sizeof(*c)); +grpc_subchannel * +grpc_subchannel_create (grpc_connector * connector, grpc_subchannel_args * args) +{ + grpc_subchannel *c = gpr_malloc (sizeof (*c)); + grpc_channel_element *parent_elem = grpc_channel_stack_last_element (grpc_channel_get_channel_stack (args->master)); + memset (c, 0, sizeof (*c)); c->refs = 1; c->connector = connector; - grpc_connector_ref(c->connector); + grpc_connector_ref (c->connector); c->num_filters = args->filter_count; - c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters); - memcpy(c->filters, args->filters, - sizeof(grpc_channel_filter *) * c->num_filters); - c->addr = gpr_malloc(args->addr_len); - memcpy(c->addr, args->addr, args->addr_len); + c->filters = gpr_malloc (sizeof (grpc_channel_filter *) * c->num_filters); + memcpy (c->filters, args->filters, sizeof (grpc_channel_filter *) * c->num_filters); + c->addr = gpr_malloc (args->addr_len); + memcpy (c->addr, args->addr, args->addr_len); c->addr_len = args->addr_len; - c->args = grpc_channel_args_copy(args->args); + c->args = grpc_channel_args_copy (args->args); c->mdctx = args->mdctx; c->master = args->master; - c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); - c->random = random_seed(); - grpc_mdctx_ref(c->mdctx); - grpc_closure_init(&c->connected, subchannel_connected, c); - grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, - "subchannel"); - gpr_mu_init(&c->mu); + c->pollset_set = grpc_client_channel_get_connecting_pollset_set (parent_elem); + c->random = random_seed (); + grpc_mdctx_ref (c->mdctx); + grpc_closure_init (&c->connected, subchannel_connected, c); + grpc_connectivity_state_init (&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); + gpr_mu_init (&c->mu); return c; } -static void continue_connect(grpc_subchannel *c, - grpc_closure_list *closure_list) { +static void +continue_connect (grpc_subchannel * c, grpc_closure_list * closure_list) +{ grpc_connect_in_args args; args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; - args.deadline = compute_connect_deadline(c); + args.deadline = compute_connect_deadline (c); args.channel_args = c->args; - grpc_connector_connect(c->connector, &args, &c->connecting_result, - &c->connected, closure_list); + grpc_connector_connect (c->connector, &args, &c->connecting_result, &c->connected, closure_list); } -static void start_connect(grpc_subchannel *c, grpc_closure_list *closure_list) { - c->backoff_delta = gpr_time_from_seconds( - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); - c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); - continue_connect(c, closure_list); +static void +start_connect (grpc_subchannel * c, grpc_closure_list * closure_list) +{ + c->backoff_delta = gpr_time_from_seconds (GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); + c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta); + continue_connect (c, closure_list); } -static void continue_creating_call(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +continue_creating_call (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ waiting_for_connect *w4c = arg; - grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset, - closure_list); - grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target, - w4c->notify, closure_list); - GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect", closure_list); - gpr_free(w4c); -} - -void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify, - grpc_closure_list *closure_list) { - connection *con; - gpr_mu_lock(&c->mu); - if (c->active != NULL) { - con = c->active; - CONNECTION_REF_LOCKED(con, "call"); - gpr_mu_unlock(&c->mu); - - *target = create_call(con, closure_list); - notify->cb(notify->cb_arg, 1, closure_list); - } else { - waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); - w4c->next = c->waiting; - w4c->notify = notify; - w4c->pollset = pollset; - w4c->target = target; - w4c->subchannel = c; - /* released when clearing w4c */ - SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect"); - grpc_closure_init(&w4c->continuation, continue_creating_call, w4c); - c->waiting = w4c; - grpc_subchannel_add_interested_party(c, pollset, closure_list); - if (!c->connecting) { - c->connecting = 1; - connectivity_state_changed_locked(c, "create_call", closure_list); - /* released by connection */ - SUBCHANNEL_REF_LOCKED(c, "connecting"); - GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - gpr_mu_unlock(&c->mu); + grpc_subchannel_del_interested_party (w4c->subchannel, w4c->pollset, closure_list); + grpc_subchannel_create_call (w4c->subchannel, w4c->pollset, w4c->target, w4c->notify, closure_list); + GRPC_SUBCHANNEL_UNREF (w4c->subchannel, "waiting_for_connect", closure_list); + gpr_free (w4c); +} - start_connect(c, closure_list); - } else { - gpr_mu_unlock(&c->mu); +void +grpc_subchannel_create_call (grpc_subchannel * c, grpc_pollset * pollset, grpc_subchannel_call ** target, grpc_closure * notify, grpc_closure_list * closure_list) +{ + connection *con; + gpr_mu_lock (&c->mu); + if (c->active != NULL) + { + con = c->active; + CONNECTION_REF_LOCKED (con, "call"); + gpr_mu_unlock (&c->mu); + + *target = create_call (con, closure_list); + notify->cb (notify->cb_arg, 1, closure_list); + } + else + { + waiting_for_connect *w4c = gpr_malloc (sizeof (*w4c)); + w4c->next = c->waiting; + w4c->notify = notify; + w4c->pollset = pollset; + w4c->target = target; + w4c->subchannel = c; + /* released when clearing w4c */ + SUBCHANNEL_REF_LOCKED (c, "waiting_for_connect"); + grpc_closure_init (&w4c->continuation, continue_creating_call, w4c); + c->waiting = w4c; + grpc_subchannel_add_interested_party (c, pollset, closure_list); + if (!c->connecting) + { + c->connecting = 1; + connectivity_state_changed_locked (c, "create_call", closure_list); + /* released by connection */ + SUBCHANNEL_REF_LOCKED (c, "connecting"); + GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting"); + gpr_mu_unlock (&c->mu); + + start_connect (c, closure_list); + } + else + { + gpr_mu_unlock (&c->mu); + } } - } } -grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { +grpc_connectivity_state +grpc_subchannel_check_connectivity (grpc_subchannel * c) +{ grpc_connectivity_state state; - gpr_mu_lock(&c->mu); - state = grpc_connectivity_state_check(&c->state_tracker); - gpr_mu_unlock(&c->mu); + gpr_mu_lock (&c->mu); + state = grpc_connectivity_state_check (&c->state_tracker); + gpr_mu_unlock (&c->mu); return state; } -void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, - grpc_connectivity_state *state, - grpc_closure *notify, - grpc_closure_list *closure_list) { +void +grpc_subchannel_notify_on_state_change (grpc_subchannel * c, grpc_connectivity_state * state, grpc_closure * notify, grpc_closure_list * closure_list) +{ int do_connect = 0; - gpr_mu_lock(&c->mu); - if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, - notify, closure_list)) { - do_connect = 1; - c->connecting = 1; - /* released by connection */ - SUBCHANNEL_REF_LOCKED(c, "connecting"); - GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - connectivity_state_changed_locked(c, "state_change", closure_list); - } - gpr_mu_unlock(&c->mu); - - if (do_connect) { - start_connect(c, closure_list); - } -} - -void grpc_subchannel_process_transport_op(grpc_subchannel *c, - grpc_transport_op *op, - grpc_closure_list *closure_list) { + gpr_mu_lock (&c->mu); + if (grpc_connectivity_state_notify_on_state_change (&c->state_tracker, state, notify, closure_list)) + { + do_connect = 1; + c->connecting = 1; + /* released by connection */ + SUBCHANNEL_REF_LOCKED (c, "connecting"); + GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting"); + connectivity_state_changed_locked (c, "state_change", closure_list); + } + gpr_mu_unlock (&c->mu); + + if (do_connect) + { + start_connect (c, closure_list); + } +} + +void +grpc_subchannel_process_transport_op (grpc_subchannel * c, grpc_transport_op * op, grpc_closure_list * closure_list) +{ connection *con = NULL; grpc_subchannel *destroy; int cancel_alarm = 0; - gpr_mu_lock(&c->mu); - if (c->active != NULL) { - con = c->active; - CONNECTION_REF_LOCKED(con, "transport-op"); - } - if (op->disconnect) { - c->disconnected = 1; - connectivity_state_changed_locked(c, "disconnect", closure_list); - if (c->have_alarm) { - cancel_alarm = 1; + gpr_mu_lock (&c->mu); + if (c->active != NULL) + { + con = c->active; + CONNECTION_REF_LOCKED (con, "transport-op"); + } + if (op->disconnect) + { + c->disconnected = 1; + connectivity_state_changed_locked (c, "disconnect", closure_list); + if (c->have_alarm) + { + cancel_alarm = 1; + } } - } - gpr_mu_unlock(&c->mu); - - if (con != NULL) { - grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element *top_elem = - grpc_channel_stack_element(channel_stack, 0); - top_elem->filter->start_transport_op(top_elem, op, closure_list); - - gpr_mu_lock(&c->mu); - destroy = CONNECTION_UNREF_LOCKED(con, "transport-op", closure_list); - gpr_mu_unlock(&c->mu); - if (destroy) { - subchannel_destroy(destroy, closure_list); + gpr_mu_unlock (&c->mu); + + if (con != NULL) + { + grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION (con); + grpc_channel_element *top_elem = grpc_channel_stack_element (channel_stack, 0); + top_elem->filter->start_transport_op (top_elem, op, closure_list); + + gpr_mu_lock (&c->mu); + destroy = CONNECTION_UNREF_LOCKED (con, "transport-op", closure_list); + gpr_mu_unlock (&c->mu); + if (destroy) + { + subchannel_destroy (destroy, closure_list); + } } - } - if (cancel_alarm) { - grpc_alarm_cancel(&c->alarm, closure_list); - } + if (cancel_alarm) + { + grpc_alarm_cancel (&c->alarm, closure_list); + } - if (op->disconnect) { - grpc_connector_shutdown(c->connector, closure_list); - } + if (op->disconnect) + { + grpc_connector_shutdown (c->connector, closure_list); + } } -static void on_state_changed(void *p, int iomgr_success, - grpc_closure_list *closure_list) { +static void +on_state_changed (void *p, int iomgr_success, grpc_closure_list * closure_list) +{ state_watcher *sw = p; grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; @@ -472,57 +494,59 @@ static void on_state_changed(void *p, int iomgr_success, grpc_channel_element *elem; connection *destroy_connection = NULL; - gpr_mu_lock(mu); + gpr_mu_lock (mu); /* if we failed or there is a version number mismatch, just leave this closure */ - if (!iomgr_success || sw->subchannel->active_version != sw->version) { - goto done; - } + if (!iomgr_success || sw->subchannel->active_version != sw->version) + { + goto done; + } - switch (sw->connectivity_state) { + switch (sw->connectivity_state) + { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_READY: case GRPC_CHANNEL_IDLE: /* all is still good: keep watching */ - memset(&op, 0, sizeof(op)); + memset (&op, 0, sizeof (op)); op.connectivity_state = &sw->connectivity_state; op.on_connectivity_state_change = &sw->closure; - elem = grpc_channel_stack_element( - CHANNEL_STACK_FROM_CONNECTION(c->active), 0); - elem->filter->start_transport_op(elem, &op, closure_list); + elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0); + elem->filter->start_transport_op (elem, &op, closure_list); /* early out */ - gpr_mu_unlock(mu); + gpr_mu_unlock (mu); return; case GRPC_CHANNEL_FATAL_FAILURE: case GRPC_CHANNEL_TRANSIENT_FAILURE: /* things have gone wrong, deactivate and enter idle */ - if (sw->subchannel->active->refs == 0) { - destroy_connection = sw->subchannel->active; - } + if (sw->subchannel->active->refs == 0) + { + destroy_connection = sw->subchannel->active; + } sw->subchannel->active = NULL; - grpc_connectivity_state_set( - &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE - : GRPC_CHANNEL_TRANSIENT_FAILURE, - "connection_failed", closure_list); + grpc_connectivity_state_set (&c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, "connection_failed", closure_list); break; - } + } done: - connectivity_state_changed_locked(c, "transport_state_changed", closure_list); - destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); - gpr_free(sw); - gpr_mu_unlock(mu); - if (destroy) { - subchannel_destroy(c, closure_list); - } - if (destroy_connection != NULL) { - connection_destroy(destroy_connection, closure_list); - } -} - -static void publish_transport(grpc_subchannel *c, - grpc_closure_list *closure_list) { + connectivity_state_changed_locked (c, "transport_state_changed", closure_list); + destroy = SUBCHANNEL_UNREF_LOCKED (c, "state_watcher"); + gpr_free (sw); + gpr_mu_unlock (mu); + if (destroy) + { + subchannel_destroy (c, closure_list); + } + if (destroy_connection != NULL) + { + connection_destroy (destroy_connection, closure_list); + } +} + +static void +publish_transport (grpc_subchannel * c, grpc_closure_list * closure_list) +{ size_t channel_stack_size; connection *con; grpc_channel_stack *stk; @@ -536,46 +560,46 @@ static void publish_transport(grpc_subchannel *c, /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; - filters = gpr_malloc(sizeof(*filters) * num_filters); - memcpy(filters, c->filters, sizeof(*filters) * c->num_filters); - memcpy(filters + c->num_filters, c->connecting_result.filters, - sizeof(*filters) * c->connecting_result.num_filters); + filters = gpr_malloc (sizeof (*filters) * num_filters); + memcpy (filters, c->filters, sizeof (*filters) * c->num_filters); + memcpy (filters + c->num_filters, c->connecting_result.filters, sizeof (*filters) * c->connecting_result.num_filters); filters[num_filters - 1] = &grpc_connected_channel_filter; /* construct channel stack */ - channel_stack_size = grpc_channel_stack_size(filters, num_filters); - con = gpr_malloc(sizeof(connection) + channel_stack_size); - stk = (grpc_channel_stack *)(con + 1); + channel_stack_size = grpc_channel_stack_size (filters, num_filters); + con = gpr_malloc (sizeof (connection) + channel_stack_size); + stk = (grpc_channel_stack *) (con + 1); con->refs = 0; con->subchannel = c; - grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx, - stk, closure_list); - grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); - gpr_free(c->connecting_result.filters); - memset(&c->connecting_result, 0, sizeof(c->connecting_result)); + grpc_channel_stack_init (filters, num_filters, c->master, c->args, c->mdctx, stk, closure_list); + grpc_connected_channel_bind_transport (stk, c->connecting_result.transport); + gpr_free (c->connecting_result.filters); + memset (&c->connecting_result, 0, sizeof (c->connecting_result)); /* initialize state watcher */ - sw = gpr_malloc(sizeof(*sw)); - grpc_closure_init(&sw->closure, on_state_changed, sw); + sw = gpr_malloc (sizeof (*sw)); + grpc_closure_init (&sw->closure, on_state_changed, sw); sw->subchannel = c; sw->connectivity_state = GRPC_CHANNEL_READY; - gpr_mu_lock(&c->mu); + gpr_mu_lock (&c->mu); - if (c->disconnected) { - gpr_mu_unlock(&c->mu); - gpr_free(sw); - gpr_free(filters); - grpc_channel_stack_destroy(stk, closure_list); - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list); - GRPC_SUBCHANNEL_UNREF(c, "connecting", closure_list); - return; - } + if (c->disconnected) + { + gpr_mu_unlock (&c->mu); + gpr_free (sw); + gpr_free (filters); + grpc_channel_stack_destroy (stk, closure_list); + GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list); + GRPC_SUBCHANNEL_UNREF (c, "connecting", closure_list); + return; + } /* publish */ - if (c->active != NULL && c->active->refs == 0) { - destroy_connection = c->active; - } + if (c->active != NULL && c->active->refs == 0) + { + destroy_connection = c->active; + } c->active = con; c->active_version++; sw->version = c->active_version; @@ -583,184 +607,202 @@ static void publish_transport(grpc_subchannel *c, /* watch for changes; subchannel ref for connecting is donated to the state watcher */ - memset(&op, 0, sizeof(op)); + memset (&op, 0, sizeof (op)); op.connectivity_state = &sw->connectivity_state; op.on_connectivity_state_change = &sw->closure; op.bind_pollset_set = c->pollset_set; - SUBCHANNEL_REF_LOCKED(c, "state_watcher"); - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list); - GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting")); - elem = - grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); - elem->filter->start_transport_op(elem, &op, closure_list); + SUBCHANNEL_REF_LOCKED (c, "state_watcher"); + GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list); + GPR_ASSERT (!SUBCHANNEL_UNREF_LOCKED (c, "connecting")); + elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0); + elem->filter->start_transport_op (elem, &op, closure_list); /* signal completion */ - connectivity_state_changed_locked(c, "connected", closure_list); + connectivity_state_changed_locked (c, "connected", closure_list); w4c = c->waiting; c->waiting = NULL; - gpr_mu_unlock(&c->mu); + gpr_mu_unlock (&c->mu); - while (w4c != NULL) { - waiting_for_connect *next = w4c->next; - grpc_closure_list_add(closure_list, &w4c->continuation, 1); - w4c = next; - } + while (w4c != NULL) + { + waiting_for_connect *next = w4c->next; + grpc_closure_list_add (closure_list, &w4c->continuation, 1); + w4c = next; + } - gpr_free(filters); + gpr_free (filters); - if (destroy_connection != NULL) { - connection_destroy(destroy_connection, closure_list); - } + if (destroy_connection != NULL) + { + connection_destroy (destroy_connection, closure_list); + } } /* Generate a random number between 0 and 1. */ -static double generate_uniform_random_number(grpc_subchannel *c) { - c->random = (1103515245 * c->random + 12345) % ((gpr_uint32)1 << 31); - return c->random / (double)((gpr_uint32)1 << 31); +static double +generate_uniform_random_number (grpc_subchannel * c) +{ + c->random = (1103515245 * c->random + 12345) % ((gpr_uint32) 1 << 31); + return c->random / (double) ((gpr_uint32) 1 << 31); } /* Update backoff_delta and next_attempt in subchannel */ -static void update_reconnect_parameters(grpc_subchannel *c) { +static void +update_reconnect_parameters (grpc_subchannel * c) +{ gpr_int32 backoff_delta_millis, jitter; - gpr_int32 max_backoff_millis = - GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + gpr_int32 max_backoff_millis = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; double jitter_range; - backoff_delta_millis = - (gpr_int32)(gpr_time_to_millis(c->backoff_delta) * - GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); - if (backoff_delta_millis > max_backoff_millis) { - backoff_delta_millis = max_backoff_millis; - } - c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN); - c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); + backoff_delta_millis = (gpr_int32) (gpr_time_to_millis (c->backoff_delta) * GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); + if (backoff_delta_millis > max_backoff_millis) + { + backoff_delta_millis = max_backoff_millis; + } + c->backoff_delta = gpr_time_from_millis (backoff_delta_millis, GPR_TIMESPAN); + c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta); jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis; - jitter = - (gpr_int32)((2 * generate_uniform_random_number(c) - 1) * jitter_range); - c->next_attempt = - gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); + jitter = (gpr_int32) ((2 * generate_uniform_random_number (c) - 1) * jitter_range); + c->next_attempt = gpr_time_add (c->next_attempt, gpr_time_from_millis (jitter, GPR_TIMESPAN)); } -static void on_alarm(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { +static void +on_alarm (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ grpc_subchannel *c = arg; - gpr_mu_lock(&c->mu); + gpr_mu_lock (&c->mu); c->have_alarm = 0; - if (c->disconnected) { - iomgr_success = 0; - } - connectivity_state_changed_locked(c, "alarm", closure_list); - gpr_mu_unlock(&c->mu); - if (iomgr_success) { - update_reconnect_parameters(c); - continue_connect(c, closure_list); - } else { - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list); - GRPC_SUBCHANNEL_UNREF(c, "connecting", closure_list); - } -} - -static void subchannel_connected(void *arg, int iomgr_success, - grpc_closure_list *closure_list) { + if (c->disconnected) + { + iomgr_success = 0; + } + connectivity_state_changed_locked (c, "alarm", closure_list); + gpr_mu_unlock (&c->mu); + if (iomgr_success) + { + update_reconnect_parameters (c); + continue_connect (c, closure_list); + } + else + { + GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list); + GRPC_SUBCHANNEL_UNREF (c, "connecting", closure_list); + } +} + +static void +subchannel_connected (void *arg, int iomgr_success, grpc_closure_list * closure_list) +{ grpc_subchannel *c = arg; - if (c->connecting_result.transport != NULL) { - publish_transport(c, closure_list); - } else { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_mu_lock(&c->mu); - GPR_ASSERT(!c->have_alarm); - c->have_alarm = 1; - connectivity_state_changed_locked(c, "connect_failed", closure_list); - grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now, closure_list); - gpr_mu_unlock(&c->mu); - } -} - -static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { - gpr_timespec current_deadline = - gpr_time_add(c->next_attempt, c->backoff_delta); - gpr_timespec min_deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, - GPR_TIMESPAN)); - return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline - : min_deadline; -} - -static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { - if (c->disconnected) { - return GRPC_CHANNEL_FATAL_FAILURE; - } - if (c->connecting) { - if (c->have_alarm) { - return GRPC_CHANNEL_TRANSIENT_FAILURE; + if (c->connecting_result.transport != NULL) + { + publish_transport (c, closure_list); + } + else + { + gpr_timespec now = gpr_now (GPR_CLOCK_MONOTONIC); + gpr_mu_lock (&c->mu); + GPR_ASSERT (!c->have_alarm); + c->have_alarm = 1; + connectivity_state_changed_locked (c, "connect_failed", closure_list); + grpc_alarm_init (&c->alarm, c->next_attempt, on_alarm, c, now, closure_list); + gpr_mu_unlock (&c->mu); + } +} + +static gpr_timespec +compute_connect_deadline (grpc_subchannel * c) +{ + gpr_timespec current_deadline = gpr_time_add (c->next_attempt, c->backoff_delta); + gpr_timespec min_deadline = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds (GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, + GPR_TIMESPAN)); + return gpr_time_cmp (current_deadline, min_deadline) > 0 ? current_deadline : min_deadline; +} + +static grpc_connectivity_state +compute_connectivity_locked (grpc_subchannel * c) +{ + if (c->disconnected) + { + return GRPC_CHANNEL_FATAL_FAILURE; + } + if (c->connecting) + { + if (c->have_alarm) + { + return GRPC_CHANNEL_TRANSIENT_FAILURE; + } + return GRPC_CHANNEL_CONNECTING; + } + if (c->active) + { + return GRPC_CHANNEL_READY; } - return GRPC_CHANNEL_CONNECTING; - } - if (c->active) { - return GRPC_CHANNEL_READY; - } return GRPC_CHANNEL_IDLE; } -static void connectivity_state_changed_locked(grpc_subchannel *c, - const char *reason, - grpc_closure_list *closure_list) { - grpc_connectivity_state current = compute_connectivity_locked(c); - grpc_connectivity_state_set(&c->state_tracker, current, reason, closure_list); +static void +connectivity_state_changed_locked (grpc_subchannel * c, const char *reason, grpc_closure_list * closure_list) +{ + grpc_connectivity_state current = compute_connectivity_locked (c); + grpc_connectivity_state_set (&c->state_tracker, current, reason, closure_list); } /* * grpc_subchannel_call implementation */ -void grpc_subchannel_call_ref( - grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_ref(&c->refs); -} - -void grpc_subchannel_call_unref(grpc_subchannel_call *c, - grpc_closure_list *closure_list - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - if (gpr_unref(&c->refs)) { - gpr_mu *mu = &c->connection->subchannel->mu; - grpc_subchannel *destroy; - grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), closure_list); - gpr_mu_lock(mu); - destroy = CONNECTION_UNREF_LOCKED(c->connection, "call", closure_list); - gpr_mu_unlock(mu); - gpr_free(c); - if (destroy != NULL) { - subchannel_destroy(destroy, closure_list); +void +grpc_subchannel_call_ref (grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + gpr_ref (&c->refs); +} + +void +grpc_subchannel_call_unref (grpc_subchannel_call * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) +{ + if (gpr_unref (&c->refs)) + { + gpr_mu *mu = &c->connection->subchannel->mu; + grpc_subchannel *destroy; + grpc_call_stack_destroy (SUBCHANNEL_CALL_TO_CALL_STACK (c), closure_list); + gpr_mu_lock (mu); + destroy = CONNECTION_UNREF_LOCKED (c->connection, "call", closure_list); + gpr_mu_unlock (mu); + gpr_free (c); + if (destroy != NULL) + { + subchannel_destroy (destroy, closure_list); + } } - } } -char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call, - grpc_closure_list *closure_list) { - grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); - grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); - return top_elem->filter->get_peer(top_elem, closure_list); +char * +grpc_subchannel_call_get_peer (grpc_subchannel_call * call, grpc_closure_list * closure_list) +{ + grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call); + grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0); + return top_elem->filter->get_peer (top_elem, closure_list); } -void grpc_subchannel_call_process_op(grpc_subchannel_call *call, - grpc_transport_stream_op *op, - grpc_closure_list *closure_list) { - grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); - grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); - top_elem->filter->start_transport_stream_op(top_elem, op, closure_list); +void +grpc_subchannel_call_process_op (grpc_subchannel_call * call, grpc_transport_stream_op * op, grpc_closure_list * closure_list) +{ + grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call); + grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0); + top_elem->filter->start_transport_stream_op (top_elem, op, closure_list); } -static grpc_subchannel_call *create_call(connection *con, - grpc_closure_list *closure_list) { - grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_subchannel_call *call = - gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); - grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); +static grpc_subchannel_call * +create_call (connection * con, grpc_closure_list * closure_list) +{ + grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION (con); + grpc_subchannel_call *call = gpr_malloc (sizeof (grpc_subchannel_call) + chanstk->call_stack_size); + grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK (call); call->connection = con; - gpr_ref_init(&call->refs, 1); - grpc_call_stack_init(chanstk, NULL, NULL, callstk, closure_list); + gpr_ref_init (&call->refs, 1); + grpc_call_stack_init (chanstk, NULL, NULL, callstk, closure_list); return call; } |