diff options
author | 2015-11-21 08:11:04 -0800 | |
---|---|---|
committer | 2015-11-21 08:11:04 -0800 | |
commit | ab33b488c42c1238b1b2ebbf3bf4f0db61103497 (patch) | |
tree | 564e5d93201837fa31958aa68a51b012eea2eb60 /src/core/client_config/subchannel.c | |
parent | 4f7080c8e00fbc7c53a5f9febfeb5b776d1c4cc2 (diff) |
clang-format, bugfix
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r-- | src/core/client_config/subchannel.c | 117 |
1 files changed, 66 insertions, 51 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 42b056c49e..07a74e250f 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -52,8 +52,9 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ - ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load(&(subchannel)->connected_subchannel))) +#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ + ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \ + &(subchannel)->connected_subchannel))) struct grpc_connected_subchannel { /** refcount */ @@ -152,10 +153,10 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define REF_PASS_REASON , reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ - (name), (p), (p)->refs, (p)->refs + 1, reason) + (name), (p), (p)->refs.count, (p)->refs.count + 1, reason) #define UNREF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \ - (name), (p), (p)->refs, (p)->refs - 1, reason) + (name), (p), (p)->refs.count, (p)->refs.count - 1, reason) #else #define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p)) #define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p)) @@ -175,23 +176,26 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, * connection implementation */ -static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, + int success) { grpc_connected_subchannel *c = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); } -void grpc_connected_subchannel_ref(grpc_connected_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_ref( + grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { REF_LOG("CONNECTION", c); gpr_ref(&c->refs); } -void grpc_connected_subchannel_unref( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { UNREF_LOG("CONNECTION", c); if (gpr_unref(&c->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), 1); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), + 1); } } @@ -199,7 +203,8 @@ void grpc_connected_subchannel_unref( * grpc_subchannel implementation */ -static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, + int success) { grpc_subchannel *c = arg; grpc_connected_subchannel *con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { @@ -214,13 +219,16 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) } void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + REF_LOG("SUBCHANNEL", c); gpr_ref(&c->refs); } void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + UNREF_LOG("SUBCHANNEL", c); if (gpr_unref(&c->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), 1); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), + 1); } } @@ -276,7 +284,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { args.deadline = compute_connect_deadline(c); args.channel_args = c->args; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_CONNECTING, "state_change"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + GRPC_CHANNEL_CONNECTING, "state_change"); grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result, &c->connected); } @@ -319,11 +328,11 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, } void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_closure *subscribed_notify) { + grpc_subchannel *c, + grpc_closure *subscribed_notify) { gpr_mu_lock(&c->mu); - grpc_connectivity_state_change_unsubscribe( - exec_ctx, &c->state_tracker, subscribed_notify); + grpc_connectivity_state_change_unsubscribe(exec_ctx, &c->state_tracker, + subscribed_notify); gpr_mu_unlock(&c->mu); } @@ -339,7 +348,8 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, } if (op->disconnect) { c->disconnected = 1; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); if (c->have_alarm) { cancel_alarm = 1; } @@ -360,15 +370,16 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, } } -void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) { +void grpc_connected_subchannel_process_transport_op( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_transport_op *op) { grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element *top_elem = - grpc_channel_stack_element(channel_stack, 0); + grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0); top_elem->filter->start_transport_op(exec_ctx, top_elem, op); } static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, - int iomgr_success) { + int iomgr_success) { state_watcher *sw = p; grpc_subchannel *c = sw->whom.subchannel; gpr_mu *mu = &c->mu; @@ -377,9 +388,12 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, /* if we failed just leave this closure */ if (iomgr_success) { - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, sw->connectivity_state, "reflect_child"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + sw->connectivity_state, "reflect_child"); if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_connected_subchannel_notify_on_state_change(exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), &sw->connectivity_state, &sw->closure); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), + &sw->connectivity_state, &sw->closure); GRPC_SUBCHANNEL_REF(c, "state_watcher"); sw = NULL; } @@ -390,7 +404,10 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, gpr_free(sw); } -static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { +static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *con, + grpc_connectivity_state *state, + grpc_closure *closure) { grpc_transport_op op; grpc_channel_element *elem; memset(&op, 0, sizeof(op)); @@ -400,12 +417,16 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connecte elem->filter->start_transport_op(exec_ctx, elem, &op); } -void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { +void grpc_connected_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_connectivity_state *state, grpc_closure *closure) { GPR_ASSERT(state != NULL); connected_subchannel_state_op(exec_ctx, con, state, closure); } -void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) { +void grpc_connected_subchannel_state_change_unsubscribe( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_closure *closure) { connected_subchannel_state_op(exec_ctx, con, NULL, closure); } @@ -429,7 +450,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { channel_stack_size = grpc_channel_stack_size(filters, num_filters); con = gpr_malloc(sizeof(grpc_connected_subchannel) + channel_stack_size); stk = (grpc_channel_stack *)(con + 1); - gpr_ref_init(&c->refs, 1); + gpr_ref_init(&con->refs, 1); grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); @@ -440,7 +461,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { sw_subchannel = gpr_malloc(sizeof(*sw_subchannel)); sw_subchannel->whom.subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; - grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel); + grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, + sw_subchannel); gpr_mu_lock(&c->mu); @@ -458,28 +480,18 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con)); c->connecting = 0; - /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated + /* setup subchannel watching connected subchannel for changes; subchannel ref + for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_REF(c, "state_watcher"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); - grpc_connected_subchannel_notify_on_state_change(exec_ctx, con, &sw_subchannel->connectivity_state, &sw_subchannel->closure); - -#if 0 - grpc_transport_op op; - grpc_channel_element *elem; - - /* setup connected subchannel watching transport for changes */ - memset(&op, 0, sizeof(op)); - op.connectivity_state = &sw_connected_subchannel->connectivity_state; - op.on_connectivity_state_change = &sw_connected_subchannel->closure; - op.bind_pollset_set = c->pollset_set; - elem = - grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); -#endif + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, con, &sw_subchannel->connectivity_state, + &sw_subchannel->closure); /* signal completion */ - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, "connected"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, + "connected"); gpr_mu_unlock(&c->mu); gpr_free((void *)filters); @@ -559,7 +571,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connect_failed"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connect_failed"); grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock(&c->mu); } @@ -623,13 +637,14 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op); } -grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *c) { +grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( + grpc_subchannel *c) { return GET_CONNECTED_SUBCHANNEL(c, acq); } -grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *con, - grpc_pollset *pollset) { +grpc_subchannel_call *grpc_connected_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_pollset *pollset) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); |