aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/subchannel.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-11-21 08:11:04 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-11-21 08:11:04 -0800
commitab33b488c42c1238b1b2ebbf3bf4f0db61103497 (patch)
tree564e5d93201837fa31958aa68a51b012eea2eb60 /src/core/client_config/subchannel.c
parent4f7080c8e00fbc7c53a5f9febfeb5b776d1c4cc2 (diff)
clang-format, bugfix
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r--src/core/client_config/subchannel.c117
1 files changed, 66 insertions, 51 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 42b056c49e..07a74e250f 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -52,8 +52,9 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
- ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load(&(subchannel)->connected_subchannel)))
+#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
+ ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \
+ &(subchannel)->connected_subchannel)))
struct grpc_connected_subchannel {
/** refcount */
@@ -152,10 +153,10 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
#define REF_PASS_REASON , reason
#define REF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
- (name), (p), (p)->refs, (p)->refs + 1, reason)
+ (name), (p), (p)->refs.count, (p)->refs.count + 1, reason)
#define UNREF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
- (name), (p), (p)->refs, (p)->refs - 1, reason)
+ (name), (p), (p)->refs.count, (p)->refs.count - 1, reason)
#else
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
@@ -175,23 +176,26 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
* connection implementation
*/
-static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
+ int success) {
grpc_connected_subchannel *c = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
}
-void grpc_connected_subchannel_ref(grpc_connected_subchannel *c
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void grpc_connected_subchannel_ref(
+ grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
REF_LOG("CONNECTION", c);
gpr_ref(&c->refs);
}
-void grpc_connected_subchannel_unref(
- grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
UNREF_LOG("CONNECTION", c);
if (gpr_unref(&c->refs)) {
- grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), 1);
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c),
+ 1);
}
}
@@ -199,7 +203,8 @@ void grpc_connected_subchannel_unref(
* grpc_subchannel implementation
*/
-static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
+ int success) {
grpc_subchannel *c = arg;
grpc_connected_subchannel *con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
@@ -214,13 +219,16 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success)
}
void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ REF_LOG("SUBCHANNEL", c);
gpr_ref(&c->refs);
}
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ UNREF_LOG("SUBCHANNEL", c);
if (gpr_unref(&c->refs)) {
- grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), 1);
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
+ 1);
}
}
@@ -276,7 +284,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
args.deadline = compute_connect_deadline(c);
args.channel_args = c->args;
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_CONNECTING, "state_change");
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ GRPC_CHANNEL_CONNECTING, "state_change");
grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
&c->connected);
}
@@ -319,11 +328,11 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
}
void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_closure *subscribed_notify) {
+ grpc_subchannel *c,
+ grpc_closure *subscribed_notify) {
gpr_mu_lock(&c->mu);
- grpc_connectivity_state_change_unsubscribe(
- exec_ctx, &c->state_tracker, subscribed_notify);
+ grpc_connectivity_state_change_unsubscribe(exec_ctx, &c->state_tracker,
+ subscribed_notify);
gpr_mu_unlock(&c->mu);
}
@@ -339,7 +348,8 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
}
if (op->disconnect) {
c->disconnected = 1;
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
if (c->have_alarm) {
cancel_alarm = 1;
}
@@ -360,15 +370,16 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) {
+void grpc_connected_subchannel_process_transport_op(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_transport_op *op) {
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_channel_element *top_elem =
- grpc_channel_stack_element(channel_stack, 0);
+ grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
}
static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
- int iomgr_success) {
+ int iomgr_success) {
state_watcher *sw = p;
grpc_subchannel *c = sw->whom.subchannel;
gpr_mu *mu = &c->mu;
@@ -377,9 +388,12 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
/* if we failed just leave this closure */
if (iomgr_success) {
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker, sw->connectivity_state, "reflect_child");
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ sw->connectivity_state, "reflect_child");
if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
- grpc_connected_subchannel_notify_on_state_change(exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), &sw->connectivity_state, &sw->closure);
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier),
+ &sw->connectivity_state, &sw->closure);
GRPC_SUBCHANNEL_REF(c, "state_watcher");
sw = NULL;
}
@@ -390,7 +404,10 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
gpr_free(sw);
}
-static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) {
+static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *con,
+ grpc_connectivity_state *state,
+ grpc_closure *closure) {
grpc_transport_op op;
grpc_channel_element *elem;
memset(&op, 0, sizeof(op));
@@ -400,12 +417,16 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connecte
elem->filter->start_transport_op(exec_ctx, elem, &op);
}
-void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) {
+void grpc_connected_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_connectivity_state *state, grpc_closure *closure) {
GPR_ASSERT(state != NULL);
connected_subchannel_state_op(exec_ctx, con, state, closure);
}
-void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) {
+void grpc_connected_subchannel_state_change_unsubscribe(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_closure *closure) {
connected_subchannel_state_op(exec_ctx, con, NULL, closure);
}
@@ -429,7 +450,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
channel_stack_size = grpc_channel_stack_size(filters, num_filters);
con = gpr_malloc(sizeof(grpc_connected_subchannel) + channel_stack_size);
stk = (grpc_channel_stack *)(con + 1);
- gpr_ref_init(&c->refs, 1);
+ gpr_ref_init(&con->refs, 1);
grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args,
stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
@@ -440,7 +461,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
sw_subchannel = gpr_malloc(sizeof(*sw_subchannel));
sw_subchannel->whom.subchannel = c;
sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
- grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel);
+ grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed,
+ sw_subchannel);
gpr_mu_lock(&c->mu);
@@ -458,28 +480,18 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con));
c->connecting = 0;
- /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated
+ /* setup subchannel watching connected subchannel for changes; subchannel ref
+ for connecting is donated
to the state watcher */
GRPC_SUBCHANNEL_REF(c, "state_watcher");
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
- grpc_connected_subchannel_notify_on_state_change(exec_ctx, con, &sw_subchannel->connectivity_state, &sw_subchannel->closure);
-
-#if 0
- grpc_transport_op op;
- grpc_channel_element *elem;
-
- /* setup connected subchannel watching transport for changes */
- memset(&op, 0, sizeof(op));
- op.connectivity_state = &sw_connected_subchannel->connectivity_state;
- op.on_connectivity_state_change = &sw_connected_subchannel->closure;
- op.bind_pollset_set = c->pollset_set;
- elem =
- grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
-#endif
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, con, &sw_subchannel->connectivity_state,
+ &sw_subchannel->closure);
/* signal completion */
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, "connected");
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
+ "connected");
gpr_mu_unlock(&c->mu);
gpr_free((void *)filters);
@@ -559,7 +571,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connect_failed");
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "connect_failed");
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
gpr_mu_unlock(&c->mu);
}
@@ -623,13 +637,14 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
}
-grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *c) {
+grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
+ grpc_subchannel *c) {
return GET_CONNECTED_SUBCHANNEL(c, acq);
}
-grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_connected_subchannel *con,
- grpc_pollset *pollset) {
+grpc_subchannel_call *grpc_connected_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_pollset *pollset) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);