aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/subchannel.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-22 10:42:19 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-22 10:42:19 -0700
commit45724b35e411fef7c5da66a74c78428c11d56843 (patch)
tree9264034aca675c89444e02f72ef58e67d7043604 /src/core/client_config/subchannel.c
parent298751c1195523ef6228595043b583c3a6270e08 (diff)
indent pass to get logical source lines on one physical line
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r--src/core/client_config/subchannel.c874
1 files changed, 458 insertions, 416 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index d41bf8f566..778d1e8b50 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -50,7 +50,8 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-typedef struct {
+typedef struct
+{
/* all fields protected by subchannel->mu */
/** refcount */
int refs;
@@ -58,14 +59,16 @@ typedef struct {
grpc_subchannel *subchannel;
} connection;
-typedef struct {
+typedef struct
+{
grpc_closure closure;
size_t version;
grpc_subchannel *subchannel;
grpc_connectivity_state connectivity_state;
} state_watcher;
-typedef struct waiting_for_connect {
+typedef struct waiting_for_connect
+{
struct waiting_for_connect *next;
grpc_closure *notify;
grpc_pollset *pollset;
@@ -74,7 +77,8 @@ typedef struct waiting_for_connect {
grpc_closure continuation;
} waiting_for_connect;
-struct grpc_subchannel {
+struct grpc_subchannel
+{
grpc_connector *connector;
/** non-transport related channel filters */
@@ -135,7 +139,8 @@ struct grpc_subchannel {
gpr_uint32 random;
};
-struct grpc_subchannel_call {
+struct grpc_subchannel_call
+{
connection *connection;
gpr_refcount refs;
};
@@ -143,26 +148,19 @@ struct grpc_subchannel_call {
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
-static grpc_subchannel_call *create_call(connection *con,
- grpc_closure_list *closure_list);
-static void connectivity_state_changed_locked(grpc_subchannel *c,
- const char *reason,
- grpc_closure_list *closure_list);
-static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
-static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
-static void subchannel_connected(void *subchannel, int iomgr_success,
- grpc_closure_list *closure_list);
-
-static void subchannel_ref_locked(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-static int subchannel_unref_locked(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
-static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-static grpc_subchannel *connection_unref_locked(
- connection *c, grpc_closure_list *closure_list
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
-static void subchannel_destroy(grpc_subchannel *c,
- grpc_closure_list *closure_list);
+static grpc_subchannel_call *create_call (connection * con, grpc_closure_list * closure_list);
+static void connectivity_state_changed_locked (grpc_subchannel * c, const char *reason, grpc_closure_list * closure_list);
+static grpc_connectivity_state compute_connectivity_locked (grpc_subchannel * c);
+static gpr_timespec compute_connect_deadline (grpc_subchannel * c);
+static void subchannel_connected (void *subchannel, int iomgr_success, grpc_closure_list * closure_list);
+
+static void subchannel_ref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+static int
+subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
+ GRPC_MUST_USE_RESULT;
+ static void connection_ref_locked (connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+ static grpc_subchannel *connection_unref_locked (connection * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
+ static void subchannel_destroy (grpc_subchannel * c, grpc_closure_list * closure_list);
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) \
@@ -198,30 +196,34 @@ static void subchannel_destroy(grpc_subchannel *c,
* connection implementation
*/
-static void connection_destroy(connection *c, grpc_closure_list *closure_list) {
- GPR_ASSERT(c->refs == 0);
- grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c), closure_list);
- gpr_free(c);
+ static void connection_destroy (connection * c, grpc_closure_list * closure_list)
+{
+ GPR_ASSERT (c->refs == 0);
+ grpc_channel_stack_destroy (CHANNEL_STACK_FROM_CONNECTION (c), closure_list);
+ gpr_free (c);
}
-static void connection_ref_locked(
- connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- REF_LOG("CONNECTION", c);
- subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
+static void
+connection_ref_locked (connection * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
+{
+ REF_LOG ("CONNECTION", c);
+ subchannel_ref_locked (c->subchannel REF_PASS_ARGS);
++c->refs;
}
-static grpc_subchannel *connection_unref_locked(
- connection *c,
- grpc_closure_list *closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+static grpc_subchannel *
+connection_unref_locked (connection * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
+{
grpc_subchannel *destroy = NULL;
- UNREF_LOG("CONNECTION", c);
- if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
- destroy = c->subchannel;
- }
- if (--c->refs == 0 && c->subchannel->active != c) {
- connection_destroy(c, closure_list);
- }
+ UNREF_LOG ("CONNECTION", c);
+ if (subchannel_unref_locked (c->subchannel REF_PASS_ARGS))
+ {
+ destroy = c->subchannel;
+ }
+ if (--c->refs == 0 && c->subchannel->active != c)
+ {
+ connection_destroy (c, closure_list);
+ }
return destroy;
}
@@ -229,241 +231,261 @@ static grpc_subchannel *connection_unref_locked(
* grpc_subchannel implementation
*/
-static void subchannel_ref_locked(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- REF_LOG("SUBCHANNEL", c);
+static void
+subchannel_ref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
+{
+ REF_LOG ("SUBCHANNEL", c);
++c->refs;
}
-static int subchannel_unref_locked(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- UNREF_LOG("SUBCHANNEL", c);
+static int
+subchannel_unref_locked (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
+{
+ UNREF_LOG ("SUBCHANNEL", c);
return --c->refs == 0;
}
-void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- gpr_mu_lock(&c->mu);
- subchannel_ref_locked(c REF_PASS_ARGS);
- gpr_mu_unlock(&c->mu);
+void
+grpc_subchannel_ref (grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
+{
+ gpr_mu_lock (&c->mu);
+ subchannel_ref_locked (c REF_PASS_ARGS);
+ gpr_mu_unlock (&c->mu);
}
-void grpc_subchannel_unref(grpc_subchannel *c,
- grpc_closure_list *closure_list
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void
+grpc_subchannel_unref (grpc_subchannel * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
+{
int destroy;
- gpr_mu_lock(&c->mu);
- destroy = subchannel_unref_locked(c REF_PASS_ARGS);
- gpr_mu_unlock(&c->mu);
- if (destroy) subchannel_destroy(c, closure_list);
-}
-
-static void subchannel_destroy(grpc_subchannel *c,
- grpc_closure_list *closure_list) {
- if (c->active != NULL) {
- connection_destroy(c->active, closure_list);
- }
- gpr_free(c->filters);
- grpc_channel_args_destroy(c->args);
- gpr_free(c->addr);
- grpc_mdctx_unref(c->mdctx);
- grpc_connectivity_state_destroy(&c->state_tracker, closure_list);
- grpc_connector_unref(c->connector, closure_list);
- gpr_free(c);
+ gpr_mu_lock (&c->mu);
+ destroy = subchannel_unref_locked (c REF_PASS_ARGS);
+ gpr_mu_unlock (&c->mu);
+ if (destroy)
+ subchannel_destroy (c, closure_list);
+}
+
+static void
+subchannel_destroy (grpc_subchannel * c, grpc_closure_list * closure_list)
+{
+ if (c->active != NULL)
+ {
+ connection_destroy (c->active, closure_list);
+ }
+ gpr_free (c->filters);
+ grpc_channel_args_destroy (c->args);
+ gpr_free (c->addr);
+ grpc_mdctx_unref (c->mdctx);
+ grpc_connectivity_state_destroy (&c->state_tracker, closure_list);
+ grpc_connector_unref (c->connector, closure_list);
+ gpr_free (c);
}
-void grpc_subchannel_add_interested_party(grpc_subchannel *c,
- grpc_pollset *pollset,
- grpc_closure_list *closure_list) {
- grpc_pollset_set_add_pollset(c->pollset_set, pollset, closure_list);
+void
+grpc_subchannel_add_interested_party (grpc_subchannel * c, grpc_pollset * pollset, grpc_closure_list * closure_list)
+{
+ grpc_pollset_set_add_pollset (c->pollset_set, pollset, closure_list);
}
-void grpc_subchannel_del_interested_party(grpc_subchannel *c,
- grpc_pollset *pollset,
- grpc_closure_list *closure_list) {
- grpc_pollset_set_del_pollset(c->pollset_set, pollset, closure_list);
+void
+grpc_subchannel_del_interested_party (grpc_subchannel * c, grpc_pollset * pollset, grpc_closure_list * closure_list)
+{
+ grpc_pollset_set_del_pollset (c->pollset_set, pollset, closure_list);
}
-static gpr_uint32 random_seed() {
- return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
+static gpr_uint32
+random_seed ()
+{
+ return (gpr_uint32) (gpr_time_to_millis (gpr_now (GPR_CLOCK_MONOTONIC)));
}
-grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
- grpc_subchannel_args *args) {
- grpc_subchannel *c = gpr_malloc(sizeof(*c));
- grpc_channel_element *parent_elem = grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(args->master));
- memset(c, 0, sizeof(*c));
+grpc_subchannel *
+grpc_subchannel_create (grpc_connector * connector, grpc_subchannel_args * args)
+{
+ grpc_subchannel *c = gpr_malloc (sizeof (*c));
+ grpc_channel_element *parent_elem = grpc_channel_stack_last_element (grpc_channel_get_channel_stack (args->master));
+ memset (c, 0, sizeof (*c));
c->refs = 1;
c->connector = connector;
- grpc_connector_ref(c->connector);
+ grpc_connector_ref (c->connector);
c->num_filters = args->filter_count;
- c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
- memcpy(c->filters, args->filters,
- sizeof(grpc_channel_filter *) * c->num_filters);
- c->addr = gpr_malloc(args->addr_len);
- memcpy(c->addr, args->addr, args->addr_len);
+ c->filters = gpr_malloc (sizeof (grpc_channel_filter *) * c->num_filters);
+ memcpy (c->filters, args->filters, sizeof (grpc_channel_filter *) * c->num_filters);
+ c->addr = gpr_malloc (args->addr_len);
+ memcpy (c->addr, args->addr, args->addr_len);
c->addr_len = args->addr_len;
- c->args = grpc_channel_args_copy(args->args);
+ c->args = grpc_channel_args_copy (args->args);
c->mdctx = args->mdctx;
c->master = args->master;
- c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
- c->random = random_seed();
- grpc_mdctx_ref(c->mdctx);
- grpc_closure_init(&c->connected, subchannel_connected, c);
- grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
- "subchannel");
- gpr_mu_init(&c->mu);
+ c->pollset_set = grpc_client_channel_get_connecting_pollset_set (parent_elem);
+ c->random = random_seed ();
+ grpc_mdctx_ref (c->mdctx);
+ grpc_closure_init (&c->connected, subchannel_connected, c);
+ grpc_connectivity_state_init (&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel");
+ gpr_mu_init (&c->mu);
return c;
}
-static void continue_connect(grpc_subchannel *c,
- grpc_closure_list *closure_list) {
+static void
+continue_connect (grpc_subchannel * c, grpc_closure_list * closure_list)
+{
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
args.addr = c->addr;
args.addr_len = c->addr_len;
- args.deadline = compute_connect_deadline(c);
+ args.deadline = compute_connect_deadline (c);
args.channel_args = c->args;
- grpc_connector_connect(c->connector, &args, &c->connecting_result,
- &c->connected, closure_list);
+ grpc_connector_connect (c->connector, &args, &c->connecting_result, &c->connected, closure_list);
}
-static void start_connect(grpc_subchannel *c, grpc_closure_list *closure_list) {
- c->backoff_delta = gpr_time_from_seconds(
- GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
- c->next_attempt =
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
- continue_connect(c, closure_list);
+static void
+start_connect (grpc_subchannel * c, grpc_closure_list * closure_list)
+{
+ c->backoff_delta = gpr_time_from_seconds (GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
+ c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta);
+ continue_connect (c, closure_list);
}
-static void continue_creating_call(void *arg, int iomgr_success,
- grpc_closure_list *closure_list) {
+static void
+continue_creating_call (void *arg, int iomgr_success, grpc_closure_list * closure_list)
+{
waiting_for_connect *w4c = arg;
- grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset,
- closure_list);
- grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
- w4c->notify, closure_list);
- GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect", closure_list);
- gpr_free(w4c);
-}
-
-void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
- grpc_subchannel_call **target,
- grpc_closure *notify,
- grpc_closure_list *closure_list) {
- connection *con;
- gpr_mu_lock(&c->mu);
- if (c->active != NULL) {
- con = c->active;
- CONNECTION_REF_LOCKED(con, "call");
- gpr_mu_unlock(&c->mu);
-
- *target = create_call(con, closure_list);
- notify->cb(notify->cb_arg, 1, closure_list);
- } else {
- waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
- w4c->next = c->waiting;
- w4c->notify = notify;
- w4c->pollset = pollset;
- w4c->target = target;
- w4c->subchannel = c;
- /* released when clearing w4c */
- SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
- grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
- c->waiting = w4c;
- grpc_subchannel_add_interested_party(c, pollset, closure_list);
- if (!c->connecting) {
- c->connecting = 1;
- connectivity_state_changed_locked(c, "create_call", closure_list);
- /* released by connection */
- SUBCHANNEL_REF_LOCKED(c, "connecting");
- GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- gpr_mu_unlock(&c->mu);
+ grpc_subchannel_del_interested_party (w4c->subchannel, w4c->pollset, closure_list);
+ grpc_subchannel_create_call (w4c->subchannel, w4c->pollset, w4c->target, w4c->notify, closure_list);
+ GRPC_SUBCHANNEL_UNREF (w4c->subchannel, "waiting_for_connect", closure_list);
+ gpr_free (w4c);
+}
- start_connect(c, closure_list);
- } else {
- gpr_mu_unlock(&c->mu);
+void
+grpc_subchannel_create_call (grpc_subchannel * c, grpc_pollset * pollset, grpc_subchannel_call ** target, grpc_closure * notify, grpc_closure_list * closure_list)
+{
+ connection *con;
+ gpr_mu_lock (&c->mu);
+ if (c->active != NULL)
+ {
+ con = c->active;
+ CONNECTION_REF_LOCKED (con, "call");
+ gpr_mu_unlock (&c->mu);
+
+ *target = create_call (con, closure_list);
+ notify->cb (notify->cb_arg, 1, closure_list);
+ }
+ else
+ {
+ waiting_for_connect *w4c = gpr_malloc (sizeof (*w4c));
+ w4c->next = c->waiting;
+ w4c->notify = notify;
+ w4c->pollset = pollset;
+ w4c->target = target;
+ w4c->subchannel = c;
+ /* released when clearing w4c */
+ SUBCHANNEL_REF_LOCKED (c, "waiting_for_connect");
+ grpc_closure_init (&w4c->continuation, continue_creating_call, w4c);
+ c->waiting = w4c;
+ grpc_subchannel_add_interested_party (c, pollset, closure_list);
+ if (!c->connecting)
+ {
+ c->connecting = 1;
+ connectivity_state_changed_locked (c, "create_call", closure_list);
+ /* released by connection */
+ SUBCHANNEL_REF_LOCKED (c, "connecting");
+ GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting");
+ gpr_mu_unlock (&c->mu);
+
+ start_connect (c, closure_list);
+ }
+ else
+ {
+ gpr_mu_unlock (&c->mu);
+ }
}
- }
}
-grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
+grpc_connectivity_state
+grpc_subchannel_check_connectivity (grpc_subchannel * c)
+{
grpc_connectivity_state state;
- gpr_mu_lock(&c->mu);
- state = grpc_connectivity_state_check(&c->state_tracker);
- gpr_mu_unlock(&c->mu);
+ gpr_mu_lock (&c->mu);
+ state = grpc_connectivity_state_check (&c->state_tracker);
+ gpr_mu_unlock (&c->mu);
return state;
}
-void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
- grpc_connectivity_state *state,
- grpc_closure *notify,
- grpc_closure_list *closure_list) {
+void
+grpc_subchannel_notify_on_state_change (grpc_subchannel * c, grpc_connectivity_state * state, grpc_closure * notify, grpc_closure_list * closure_list)
+{
int do_connect = 0;
- gpr_mu_lock(&c->mu);
- if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
- notify, closure_list)) {
- do_connect = 1;
- c->connecting = 1;
- /* released by connection */
- SUBCHANNEL_REF_LOCKED(c, "connecting");
- GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- connectivity_state_changed_locked(c, "state_change", closure_list);
- }
- gpr_mu_unlock(&c->mu);
-
- if (do_connect) {
- start_connect(c, closure_list);
- }
-}
-
-void grpc_subchannel_process_transport_op(grpc_subchannel *c,
- grpc_transport_op *op,
- grpc_closure_list *closure_list) {
+ gpr_mu_lock (&c->mu);
+ if (grpc_connectivity_state_notify_on_state_change (&c->state_tracker, state, notify, closure_list))
+ {
+ do_connect = 1;
+ c->connecting = 1;
+ /* released by connection */
+ SUBCHANNEL_REF_LOCKED (c, "connecting");
+ GRPC_CHANNEL_INTERNAL_REF (c->master, "connecting");
+ connectivity_state_changed_locked (c, "state_change", closure_list);
+ }
+ gpr_mu_unlock (&c->mu);
+
+ if (do_connect)
+ {
+ start_connect (c, closure_list);
+ }
+}
+
+void
+grpc_subchannel_process_transport_op (grpc_subchannel * c, grpc_transport_op * op, grpc_closure_list * closure_list)
+{
connection *con = NULL;
grpc_subchannel *destroy;
int cancel_alarm = 0;
- gpr_mu_lock(&c->mu);
- if (c->active != NULL) {
- con = c->active;
- CONNECTION_REF_LOCKED(con, "transport-op");
- }
- if (op->disconnect) {
- c->disconnected = 1;
- connectivity_state_changed_locked(c, "disconnect", closure_list);
- if (c->have_alarm) {
- cancel_alarm = 1;
+ gpr_mu_lock (&c->mu);
+ if (c->active != NULL)
+ {
+ con = c->active;
+ CONNECTION_REF_LOCKED (con, "transport-op");
+ }
+ if (op->disconnect)
+ {
+ c->disconnected = 1;
+ connectivity_state_changed_locked (c, "disconnect", closure_list);
+ if (c->have_alarm)
+ {
+ cancel_alarm = 1;
+ }
}
- }
- gpr_mu_unlock(&c->mu);
-
- if (con != NULL) {
- grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_channel_element *top_elem =
- grpc_channel_stack_element(channel_stack, 0);
- top_elem->filter->start_transport_op(top_elem, op, closure_list);
-
- gpr_mu_lock(&c->mu);
- destroy = CONNECTION_UNREF_LOCKED(con, "transport-op", closure_list);
- gpr_mu_unlock(&c->mu);
- if (destroy) {
- subchannel_destroy(destroy, closure_list);
+ gpr_mu_unlock (&c->mu);
+
+ if (con != NULL)
+ {
+ grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION (con);
+ grpc_channel_element *top_elem = grpc_channel_stack_element (channel_stack, 0);
+ top_elem->filter->start_transport_op (top_elem, op, closure_list);
+
+ gpr_mu_lock (&c->mu);
+ destroy = CONNECTION_UNREF_LOCKED (con, "transport-op", closure_list);
+ gpr_mu_unlock (&c->mu);
+ if (destroy)
+ {
+ subchannel_destroy (destroy, closure_list);
+ }
}
- }
- if (cancel_alarm) {
- grpc_alarm_cancel(&c->alarm, closure_list);
- }
+ if (cancel_alarm)
+ {
+ grpc_alarm_cancel (&c->alarm, closure_list);
+ }
- if (op->disconnect) {
- grpc_connector_shutdown(c->connector, closure_list);
- }
+ if (op->disconnect)
+ {
+ grpc_connector_shutdown (c->connector, closure_list);
+ }
}
-static void on_state_changed(void *p, int iomgr_success,
- grpc_closure_list *closure_list) {
+static void
+on_state_changed (void *p, int iomgr_success, grpc_closure_list * closure_list)
+{
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
@@ -472,57 +494,59 @@ static void on_state_changed(void *p, int iomgr_success,
grpc_channel_element *elem;
connection *destroy_connection = NULL;
- gpr_mu_lock(mu);
+ gpr_mu_lock (mu);
/* if we failed or there is a version number mismatch, just leave
this closure */
- if (!iomgr_success || sw->subchannel->active_version != sw->version) {
- goto done;
- }
+ if (!iomgr_success || sw->subchannel->active_version != sw->version)
+ {
+ goto done;
+ }
- switch (sw->connectivity_state) {
+ switch (sw->connectivity_state)
+ {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_READY:
case GRPC_CHANNEL_IDLE:
/* all is still good: keep watching */
- memset(&op, 0, sizeof(op));
+ memset (&op, 0, sizeof (op));
op.connectivity_state = &sw->connectivity_state;
op.on_connectivity_state_change = &sw->closure;
- elem = grpc_channel_stack_element(
- CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
- elem->filter->start_transport_op(elem, &op, closure_list);
+ elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0);
+ elem->filter->start_transport_op (elem, &op, closure_list);
/* early out */
- gpr_mu_unlock(mu);
+ gpr_mu_unlock (mu);
return;
case GRPC_CHANNEL_FATAL_FAILURE:
case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* things have gone wrong, deactivate and enter idle */
- if (sw->subchannel->active->refs == 0) {
- destroy_connection = sw->subchannel->active;
- }
+ if (sw->subchannel->active->refs == 0)
+ {
+ destroy_connection = sw->subchannel->active;
+ }
sw->subchannel->active = NULL;
- grpc_connectivity_state_set(
- &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE
- : GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connection_failed", closure_list);
+ grpc_connectivity_state_set (&c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, "connection_failed", closure_list);
break;
- }
+ }
done:
- connectivity_state_changed_locked(c, "transport_state_changed", closure_list);
- destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
- gpr_free(sw);
- gpr_mu_unlock(mu);
- if (destroy) {
- subchannel_destroy(c, closure_list);
- }
- if (destroy_connection != NULL) {
- connection_destroy(destroy_connection, closure_list);
- }
-}
-
-static void publish_transport(grpc_subchannel *c,
- grpc_closure_list *closure_list) {
+ connectivity_state_changed_locked (c, "transport_state_changed", closure_list);
+ destroy = SUBCHANNEL_UNREF_LOCKED (c, "state_watcher");
+ gpr_free (sw);
+ gpr_mu_unlock (mu);
+ if (destroy)
+ {
+ subchannel_destroy (c, closure_list);
+ }
+ if (destroy_connection != NULL)
+ {
+ connection_destroy (destroy_connection, closure_list);
+ }
+}
+
+static void
+publish_transport (grpc_subchannel * c, grpc_closure_list * closure_list)
+{
size_t channel_stack_size;
connection *con;
grpc_channel_stack *stk;
@@ -536,46 +560,46 @@ static void publish_transport(grpc_subchannel *c,
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
- filters = gpr_malloc(sizeof(*filters) * num_filters);
- memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
- memcpy(filters + c->num_filters, c->connecting_result.filters,
- sizeof(*filters) * c->connecting_result.num_filters);
+ filters = gpr_malloc (sizeof (*filters) * num_filters);
+ memcpy (filters, c->filters, sizeof (*filters) * c->num_filters);
+ memcpy (filters + c->num_filters, c->connecting_result.filters, sizeof (*filters) * c->connecting_result.num_filters);
filters[num_filters - 1] = &grpc_connected_channel_filter;
/* construct channel stack */
- channel_stack_size = grpc_channel_stack_size(filters, num_filters);
- con = gpr_malloc(sizeof(connection) + channel_stack_size);
- stk = (grpc_channel_stack *)(con + 1);
+ channel_stack_size = grpc_channel_stack_size (filters, num_filters);
+ con = gpr_malloc (sizeof (connection) + channel_stack_size);
+ stk = (grpc_channel_stack *) (con + 1);
con->refs = 0;
con->subchannel = c;
- grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx,
- stk, closure_list);
- grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
- gpr_free(c->connecting_result.filters);
- memset(&c->connecting_result, 0, sizeof(c->connecting_result));
+ grpc_channel_stack_init (filters, num_filters, c->master, c->args, c->mdctx, stk, closure_list);
+ grpc_connected_channel_bind_transport (stk, c->connecting_result.transport);
+ gpr_free (c->connecting_result.filters);
+ memset (&c->connecting_result, 0, sizeof (c->connecting_result));
/* initialize state watcher */
- sw = gpr_malloc(sizeof(*sw));
- grpc_closure_init(&sw->closure, on_state_changed, sw);
+ sw = gpr_malloc (sizeof (*sw));
+ grpc_closure_init (&sw->closure, on_state_changed, sw);
sw->subchannel = c;
sw->connectivity_state = GRPC_CHANNEL_READY;
- gpr_mu_lock(&c->mu);
+ gpr_mu_lock (&c->mu);
- if (c->disconnected) {
- gpr_mu_unlock(&c->mu);
- gpr_free(sw);
- gpr_free(filters);
- grpc_channel_stack_destroy(stk, closure_list);
- GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list);
- GRPC_SUBCHANNEL_UNREF(c, "connecting", closure_list);
- return;
- }
+ if (c->disconnected)
+ {
+ gpr_mu_unlock (&c->mu);
+ gpr_free (sw);
+ gpr_free (filters);
+ grpc_channel_stack_destroy (stk, closure_list);
+ GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list);
+ GRPC_SUBCHANNEL_UNREF (c, "connecting", closure_list);
+ return;
+ }
/* publish */
- if (c->active != NULL && c->active->refs == 0) {
- destroy_connection = c->active;
- }
+ if (c->active != NULL && c->active->refs == 0)
+ {
+ destroy_connection = c->active;
+ }
c->active = con;
c->active_version++;
sw->version = c->active_version;
@@ -583,184 +607,202 @@ static void publish_transport(grpc_subchannel *c,
/* watch for changes; subchannel ref for connecting is donated
to the state watcher */
- memset(&op, 0, sizeof(op));
+ memset (&op, 0, sizeof (op));
op.connectivity_state = &sw->connectivity_state;
op.on_connectivity_state_change = &sw->closure;
op.bind_pollset_set = c->pollset_set;
- SUBCHANNEL_REF_LOCKED(c, "state_watcher");
- GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list);
- GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
- elem =
- grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
- elem->filter->start_transport_op(elem, &op, closure_list);
+ SUBCHANNEL_REF_LOCKED (c, "state_watcher");
+ GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list);
+ GPR_ASSERT (!SUBCHANNEL_UNREF_LOCKED (c, "connecting"));
+ elem = grpc_channel_stack_element (CHANNEL_STACK_FROM_CONNECTION (c->active), 0);
+ elem->filter->start_transport_op (elem, &op, closure_list);
/* signal completion */
- connectivity_state_changed_locked(c, "connected", closure_list);
+ connectivity_state_changed_locked (c, "connected", closure_list);
w4c = c->waiting;
c->waiting = NULL;
- gpr_mu_unlock(&c->mu);
+ gpr_mu_unlock (&c->mu);
- while (w4c != NULL) {
- waiting_for_connect *next = w4c->next;
- grpc_closure_list_add(closure_list, &w4c->continuation, 1);
- w4c = next;
- }
+ while (w4c != NULL)
+ {
+ waiting_for_connect *next = w4c->next;
+ grpc_closure_list_add (closure_list, &w4c->continuation, 1);
+ w4c = next;
+ }
- gpr_free(filters);
+ gpr_free (filters);
- if (destroy_connection != NULL) {
- connection_destroy(destroy_connection, closure_list);
- }
+ if (destroy_connection != NULL)
+ {
+ connection_destroy (destroy_connection, closure_list);
+ }
}
/* Generate a random number between 0 and 1. */
-static double generate_uniform_random_number(grpc_subchannel *c) {
- c->random = (1103515245 * c->random + 12345) % ((gpr_uint32)1 << 31);
- return c->random / (double)((gpr_uint32)1 << 31);
+static double
+generate_uniform_random_number (grpc_subchannel * c)
+{
+ c->random = (1103515245 * c->random + 12345) % ((gpr_uint32) 1 << 31);
+ return c->random / (double) ((gpr_uint32) 1 << 31);
}
/* Update backoff_delta and next_attempt in subchannel */
-static void update_reconnect_parameters(grpc_subchannel *c) {
+static void
+update_reconnect_parameters (grpc_subchannel * c)
+{
gpr_int32 backoff_delta_millis, jitter;
- gpr_int32 max_backoff_millis =
- GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+ gpr_int32 max_backoff_millis = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
double jitter_range;
- backoff_delta_millis =
- (gpr_int32)(gpr_time_to_millis(c->backoff_delta) *
- GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
- if (backoff_delta_millis > max_backoff_millis) {
- backoff_delta_millis = max_backoff_millis;
- }
- c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN);
- c->next_attempt =
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
+ backoff_delta_millis = (gpr_int32) (gpr_time_to_millis (c->backoff_delta) * GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
+ if (backoff_delta_millis > max_backoff_millis)
+ {
+ backoff_delta_millis = max_backoff_millis;
+ }
+ c->backoff_delta = gpr_time_from_millis (backoff_delta_millis, GPR_TIMESPAN);
+ c->next_attempt = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC), c->backoff_delta);
jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis;
- jitter =
- (gpr_int32)((2 * generate_uniform_random_number(c) - 1) * jitter_range);
- c->next_attempt =
- gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
+ jitter = (gpr_int32) ((2 * generate_uniform_random_number (c) - 1) * jitter_range);
+ c->next_attempt = gpr_time_add (c->next_attempt, gpr_time_from_millis (jitter, GPR_TIMESPAN));
}
-static void on_alarm(void *arg, int iomgr_success,
- grpc_closure_list *closure_list) {
+static void
+on_alarm (void *arg, int iomgr_success, grpc_closure_list * closure_list)
+{
grpc_subchannel *c = arg;
- gpr_mu_lock(&c->mu);
+ gpr_mu_lock (&c->mu);
c->have_alarm = 0;
- if (c->disconnected) {
- iomgr_success = 0;
- }
- connectivity_state_changed_locked(c, "alarm", closure_list);
- gpr_mu_unlock(&c->mu);
- if (iomgr_success) {
- update_reconnect_parameters(c);
- continue_connect(c, closure_list);
- } else {
- GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list);
- GRPC_SUBCHANNEL_UNREF(c, "connecting", closure_list);
- }
-}
-
-static void subchannel_connected(void *arg, int iomgr_success,
- grpc_closure_list *closure_list) {
+ if (c->disconnected)
+ {
+ iomgr_success = 0;
+ }
+ connectivity_state_changed_locked (c, "alarm", closure_list);
+ gpr_mu_unlock (&c->mu);
+ if (iomgr_success)
+ {
+ update_reconnect_parameters (c);
+ continue_connect (c, closure_list);
+ }
+ else
+ {
+ GRPC_CHANNEL_INTERNAL_UNREF (c->master, "connecting", closure_list);
+ GRPC_SUBCHANNEL_UNREF (c, "connecting", closure_list);
+ }
+}
+
+static void
+subchannel_connected (void *arg, int iomgr_success, grpc_closure_list * closure_list)
+{
grpc_subchannel *c = arg;
- if (c->connecting_result.transport != NULL) {
- publish_transport(c, closure_list);
- } else {
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- gpr_mu_lock(&c->mu);
- GPR_ASSERT(!c->have_alarm);
- c->have_alarm = 1;
- connectivity_state_changed_locked(c, "connect_failed", closure_list);
- grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now, closure_list);
- gpr_mu_unlock(&c->mu);
- }
-}
-
-static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
- gpr_timespec current_deadline =
- gpr_time_add(c->next_attempt, c->backoff_delta);
- gpr_timespec min_deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
- GPR_TIMESPAN));
- return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline
- : min_deadline;
-}
-
-static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
- if (c->disconnected) {
- return GRPC_CHANNEL_FATAL_FAILURE;
- }
- if (c->connecting) {
- if (c->have_alarm) {
- return GRPC_CHANNEL_TRANSIENT_FAILURE;
+ if (c->connecting_result.transport != NULL)
+ {
+ publish_transport (c, closure_list);
+ }
+ else
+ {
+ gpr_timespec now = gpr_now (GPR_CLOCK_MONOTONIC);
+ gpr_mu_lock (&c->mu);
+ GPR_ASSERT (!c->have_alarm);
+ c->have_alarm = 1;
+ connectivity_state_changed_locked (c, "connect_failed", closure_list);
+ grpc_alarm_init (&c->alarm, c->next_attempt, on_alarm, c, now, closure_list);
+ gpr_mu_unlock (&c->mu);
+ }
+}
+
+static gpr_timespec
+compute_connect_deadline (grpc_subchannel * c)
+{
+ gpr_timespec current_deadline = gpr_time_add (c->next_attempt, c->backoff_delta);
+ gpr_timespec min_deadline = gpr_time_add (gpr_now (GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds (GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
+ GPR_TIMESPAN));
+ return gpr_time_cmp (current_deadline, min_deadline) > 0 ? current_deadline : min_deadline;
+}
+
+static grpc_connectivity_state
+compute_connectivity_locked (grpc_subchannel * c)
+{
+ if (c->disconnected)
+ {
+ return GRPC_CHANNEL_FATAL_FAILURE;
+ }
+ if (c->connecting)
+ {
+ if (c->have_alarm)
+ {
+ return GRPC_CHANNEL_TRANSIENT_FAILURE;
+ }
+ return GRPC_CHANNEL_CONNECTING;
+ }
+ if (c->active)
+ {
+ return GRPC_CHANNEL_READY;
}
- return GRPC_CHANNEL_CONNECTING;
- }
- if (c->active) {
- return GRPC_CHANNEL_READY;
- }
return GRPC_CHANNEL_IDLE;
}
-static void connectivity_state_changed_locked(grpc_subchannel *c,
- const char *reason,
- grpc_closure_list *closure_list) {
- grpc_connectivity_state current = compute_connectivity_locked(c);
- grpc_connectivity_state_set(&c->state_tracker, current, reason, closure_list);
+static void
+connectivity_state_changed_locked (grpc_subchannel * c, const char *reason, grpc_closure_list * closure_list)
+{
+ grpc_connectivity_state current = compute_connectivity_locked (c);
+ grpc_connectivity_state_set (&c->state_tracker, current, reason, closure_list);
}
/*
* grpc_subchannel_call implementation
*/
-void grpc_subchannel_call_ref(
- grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- gpr_ref(&c->refs);
-}
-
-void grpc_subchannel_call_unref(grpc_subchannel_call *c,
- grpc_closure_list *closure_list
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- if (gpr_unref(&c->refs)) {
- gpr_mu *mu = &c->connection->subchannel->mu;
- grpc_subchannel *destroy;
- grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), closure_list);
- gpr_mu_lock(mu);
- destroy = CONNECTION_UNREF_LOCKED(c->connection, "call", closure_list);
- gpr_mu_unlock(mu);
- gpr_free(c);
- if (destroy != NULL) {
- subchannel_destroy(destroy, closure_list);
+void
+grpc_subchannel_call_ref (grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
+{
+ gpr_ref (&c->refs);
+}
+
+void
+grpc_subchannel_call_unref (grpc_subchannel_call * c, grpc_closure_list * closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
+{
+ if (gpr_unref (&c->refs))
+ {
+ gpr_mu *mu = &c->connection->subchannel->mu;
+ grpc_subchannel *destroy;
+ grpc_call_stack_destroy (SUBCHANNEL_CALL_TO_CALL_STACK (c), closure_list);
+ gpr_mu_lock (mu);
+ destroy = CONNECTION_UNREF_LOCKED (c->connection, "call", closure_list);
+ gpr_mu_unlock (mu);
+ gpr_free (c);
+ if (destroy != NULL)
+ {
+ subchannel_destroy (destroy, closure_list);
+ }
}
- }
}
-char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call,
- grpc_closure_list *closure_list) {
- grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
- grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
- return top_elem->filter->get_peer(top_elem, closure_list);
+char *
+grpc_subchannel_call_get_peer (grpc_subchannel_call * call, grpc_closure_list * closure_list)
+{
+ grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call);
+ grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0);
+ return top_elem->filter->get_peer (top_elem, closure_list);
}
-void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
- grpc_transport_stream_op *op,
- grpc_closure_list *closure_list) {
- grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
- grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
- top_elem->filter->start_transport_stream_op(top_elem, op, closure_list);
+void
+grpc_subchannel_call_process_op (grpc_subchannel_call * call, grpc_transport_stream_op * op, grpc_closure_list * closure_list)
+{
+ grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK (call);
+ grpc_call_element *top_elem = grpc_call_stack_element (call_stack, 0);
+ top_elem->filter->start_transport_stream_op (top_elem, op, closure_list);
}
-static grpc_subchannel_call *create_call(connection *con,
- grpc_closure_list *closure_list) {
- grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_subchannel_call *call =
- gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
- grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
+static grpc_subchannel_call *
+create_call (connection * con, grpc_closure_list * closure_list)
+{
+ grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION (con);
+ grpc_subchannel_call *call = gpr_malloc (sizeof (grpc_subchannel_call) + chanstk->call_stack_size);
+ grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK (call);
call->connection = con;
- gpr_ref_init(&call->refs, 1);
- grpc_call_stack_init(chanstk, NULL, NULL, callstk, closure_list);
+ gpr_ref_init (&call->refs, 1);
+ grpc_call_stack_init (chanstk, NULL, NULL, callstk, closure_list);
return call;
}