diff options
-rw-r--r-- | src/core/census/grpc_filter.c | 4 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 8 | ||||
-rw-r--r-- | src/core/channel/client_uchannel.c | 24 | ||||
-rw-r--r-- | src/core/channel/client_uchannel.h | 4 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 3 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 3 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.c | 19 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 28 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/round_robin.c | 10 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 3 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 3 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 117 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 43 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 3 | ||||
-rw-r--r-- | test/core/end2end/fixtures/h2_uchannel.c | 9 |
15 files changed, 156 insertions, 125 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index 61a95ec765..7a6ce30612 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -60,9 +60,7 @@ typedef struct call_data { grpc_closure finish_recv; } call_data; -typedef struct channel_data { - gpr_uint8 unused; -} channel_data; +typedef struct channel_data { gpr_uint8 unused; } channel_data; static void extract_and_annotate_method_tag(grpc_metadata_batch *md, call_data *calld, diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 731058c8ff..f026d32265 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -324,7 +324,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, gpr_mu_lock(&chand->mu_config); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, connected_subchannel); + grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, + connected_subchannel); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = grpc_closure_next(closure)) { @@ -338,8 +339,9 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, return 1; } if (chand->lb_policy != NULL) { - int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset, - initial_metadata, connected_subchannel, on_ready); + int r = + grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset, + initial_metadata, connected_subchannel, on_ready); gpr_mu_unlock(&chand->mu_config); return r; } diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index 1ab0faf65e..926bbde838 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -84,9 +84,9 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, chand->subchannel_connectivity, "uchannel_monitor_subchannel"); - grpc_connected_subchannel_notify_on_state_change(exec_ctx, chand->connected_subchannel, - &chand->subchannel_connectivity, - &chand->connectivity_cb); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, &chand->subchannel_connectivity, + &chand->connectivity_cb); } static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { @@ -168,8 +168,8 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - grpc_connected_subchannel_state_change_unsubscribe(exec_ctx, chand->connected_subchannel, - &chand->connectivity_cb); + grpc_connected_subchannel_state_change_unsubscribe( + exec_ctx, chand->connected_subchannel, &chand->connectivity_cb); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); gpr_mu_destroy(&chand->mu_state); } @@ -198,9 +198,9 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( GRPC_CHANNEL_CONNECTING, "uchannel_connecting_changed"); chand->subchannel_connectivity = out; - grpc_connected_subchannel_notify_on_state_change(exec_ctx, chand->connected_subchannel, - &chand->subchannel_connectivity, - &chand->connectivity_cb); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, &chand->subchannel_connectivity, + &chand->connectivity_cb); } gpr_mu_unlock(&chand->mu_state); return out; @@ -221,8 +221,8 @@ grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( channel_data *chand = elem->channel_data; grpc_channel_element *parent_elem; gpr_mu_lock(&chand->mu_state); - parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack( - chand->master)); + parent_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(chand->master)); gpr_mu_unlock(&chand->mu_state); return grpc_client_channel_get_connecting_pollset_set(parent_elem); } @@ -267,8 +267,8 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, return channel; } -void grpc_client_uchannel_set_connected_subchannel(grpc_channel *uchannel, - grpc_connected_subchannel *connected_subchannel) { +void grpc_client_uchannel_set_connected_subchannel( + grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) { grpc_channel_element *elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel)); channel_data *chand = elem->channel_data; diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h index 1acf9bfd69..120a3daf3d 100644 --- a/src/core/channel/client_uchannel.h +++ b/src/core/channel/client_uchannel.h @@ -64,7 +64,7 @@ void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args); -void grpc_client_uchannel_set_connected_subchannel(grpc_channel *uchannel, - grpc_connected_subchannel *connected_subchannel); +void grpc_client_uchannel_set_connected_subchannel( + grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index fc8b425e47..c997a074a7 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -288,8 +288,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { -} + grpc_channel_element *elem) {} const grpc_channel_filter grpc_compress_filter = { compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data), diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index c1645c2ba0..e7b8e42819 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -225,8 +225,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { -} + grpc_channel_element *elem) {} const grpc_channel_filter grpc_http_server_filter = { hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index d1a7f86348..2e3d49e806 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -137,19 +137,23 @@ retry: } /* if we don't have a subchannel, try to get one */ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - holder->connected_subchannel == NULL && op->send_initial_metadata != NULL) { + holder->connected_subchannel == NULL && + op->send_initial_metadata != NULL) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; grpc_closure_init(&holder->next_step, subchannel_ready, holder); - if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, - op->send_initial_metadata, &holder->connected_subchannel, - &holder->next_step)) { + if (holder->pick_subchannel( + exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata, + &holder->connected_subchannel, &holder->next_step)) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; } } /* if we've got a subchannel, then let's ask it to create a call */ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && holder->connected_subchannel != NULL) { - gpr_atm_rel_store(&holder->subchannel_call, grpc_connected_subchannel_create_call(exec_ctx, holder->connected_subchannel, holder->pollset)); + gpr_atm_rel_store( + &holder->subchannel_call, + grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); retry_waiting_locked(exec_ctx, holder); goto retry; } @@ -171,7 +175,10 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; fail_locked(exec_ctx, holder); } else { - gpr_atm_rel_store(&holder->subchannel_call, grpc_connected_subchannel_create_call(exec_ctx, holder->connected_subchannel, holder->pollset)); + gpr_atm_rel_store( + &holder->subchannel_call, + grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); retry_waiting_locked(exec_ctx, holder); } gpr_mu_unlock(&holder->mu); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 6d9e6af4a6..e093c3e9a9 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -174,8 +174,8 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, - grpc_closure *on_complete) { + grpc_metadata_batch *initial_metadata, + grpc_connected_subchannel **target, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -218,7 +218,8 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { - if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != exclude_subchannel) { + if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != + exclude_subchannel) { memset(&op, 0, sizeof(op)); op.disconnect = 1; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); @@ -245,9 +246,9 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, - &p->checking_connectivity, - &p->connectivity_changed); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, p->selected, &p->checking_connectivity, + &p->connectivity_changed); } else { GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } @@ -258,7 +259,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); selected_subchannel = p->subchannels[p->checking_subchannel]; - p->selected = grpc_subchannel_get_connected_subchannel(selected_subchannel); + p->selected = + grpc_subchannel_get_connected_subchannel(selected_subchannel); GPR_ASSERT(p->selected); GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first"); /* drop the pick list: we are connected now */ @@ -274,9 +276,9 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, - &p->checking_connectivity, - &p->connectivity_changed); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, p->selected, &p->checking_connectivity, + &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, @@ -361,13 +363,15 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { - if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) continue; + if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) + continue; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); } if (p->selected) { grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, + "pf_broadcast_to_selected"); } gpr_free(subchannels); } diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 08592b79e1..ca0d6abd07 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -314,8 +314,8 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, - grpc_closure *on_complete) { + grpc_metadata_batch *initial_metadata, + grpc_connected_subchannel **target, grpc_closure *on_complete) { size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; @@ -325,7 +325,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, gpr_mu_unlock(&p->mu); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", + gpr_log(GPR_DEBUG, + "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } /* only advance the last picked pointer if the selection was used */ @@ -390,7 +391,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + *pp->target = + grpc_subchannel_get_connected_subchannel(selected->subchannel); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 6fa3c1b423..5605f788a5 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -71,7 +71,8 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_connected_subchannel **target, grpc_closure *on_complete) { + grpc_connected_subchannel **target, + grpc_closure *on_complete) { return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata, target, on_complete); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index b1fb64c06c..2889b8e55d 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -111,7 +111,8 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_connected_subchannel **target, grpc_closure *on_complete); + grpc_connected_subchannel **target, + grpc_closure *on_complete); void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 42b056c49e..07a74e250f 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -52,8 +52,9 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ - ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load(&(subchannel)->connected_subchannel))) +#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ + ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \ + &(subchannel)->connected_subchannel))) struct grpc_connected_subchannel { /** refcount */ @@ -152,10 +153,10 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define REF_PASS_REASON , reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ - (name), (p), (p)->refs, (p)->refs + 1, reason) + (name), (p), (p)->refs.count, (p)->refs.count + 1, reason) #define UNREF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \ - (name), (p), (p)->refs, (p)->refs - 1, reason) + (name), (p), (p)->refs.count, (p)->refs.count - 1, reason) #else #define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p)) #define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p)) @@ -175,23 +176,26 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, * connection implementation */ -static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, + int success) { grpc_connected_subchannel *c = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); } -void grpc_connected_subchannel_ref(grpc_connected_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_ref( + grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { REF_LOG("CONNECTION", c); gpr_ref(&c->refs); } -void grpc_connected_subchannel_unref( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { UNREF_LOG("CONNECTION", c); if (gpr_unref(&c->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), 1); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), + 1); } } @@ -199,7 +203,8 @@ void grpc_connected_subchannel_unref( * grpc_subchannel implementation */ -static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, + int success) { grpc_subchannel *c = arg; grpc_connected_subchannel *con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { @@ -214,13 +219,16 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) } void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + REF_LOG("SUBCHANNEL", c); gpr_ref(&c->refs); } void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + UNREF_LOG("SUBCHANNEL", c); if (gpr_unref(&c->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), 1); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), + 1); } } @@ -276,7 +284,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { args.deadline = compute_connect_deadline(c); args.channel_args = c->args; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_CONNECTING, "state_change"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + GRPC_CHANNEL_CONNECTING, "state_change"); grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result, &c->connected); } @@ -319,11 +328,11 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, } void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_closure *subscribed_notify) { + grpc_subchannel *c, + grpc_closure *subscribed_notify) { gpr_mu_lock(&c->mu); - grpc_connectivity_state_change_unsubscribe( - exec_ctx, &c->state_tracker, subscribed_notify); + grpc_connectivity_state_change_unsubscribe(exec_ctx, &c->state_tracker, + subscribed_notify); gpr_mu_unlock(&c->mu); } @@ -339,7 +348,8 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, } if (op->disconnect) { c->disconnected = 1; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); if (c->have_alarm) { cancel_alarm = 1; } @@ -360,15 +370,16 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, } } -void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) { +void grpc_connected_subchannel_process_transport_op( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_transport_op *op) { grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element *top_elem = - grpc_channel_stack_element(channel_stack, 0); + grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0); top_elem->filter->start_transport_op(exec_ctx, top_elem, op); } static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, - int iomgr_success) { + int iomgr_success) { state_watcher *sw = p; grpc_subchannel *c = sw->whom.subchannel; gpr_mu *mu = &c->mu; @@ -377,9 +388,12 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, /* if we failed just leave this closure */ if (iomgr_success) { - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, sw->connectivity_state, "reflect_child"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + sw->connectivity_state, "reflect_child"); if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_connected_subchannel_notify_on_state_change(exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), &sw->connectivity_state, &sw->closure); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), + &sw->connectivity_state, &sw->closure); GRPC_SUBCHANNEL_REF(c, "state_watcher"); sw = NULL; } @@ -390,7 +404,10 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, gpr_free(sw); } -static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { +static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *con, + grpc_connectivity_state *state, + grpc_closure *closure) { grpc_transport_op op; grpc_channel_element *elem; memset(&op, 0, sizeof(op)); @@ -400,12 +417,16 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connecte elem->filter->start_transport_op(exec_ctx, elem, &op); } -void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { +void grpc_connected_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_connectivity_state *state, grpc_closure *closure) { GPR_ASSERT(state != NULL); connected_subchannel_state_op(exec_ctx, con, state, closure); } -void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) { +void grpc_connected_subchannel_state_change_unsubscribe( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_closure *closure) { connected_subchannel_state_op(exec_ctx, con, NULL, closure); } @@ -429,7 +450,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { channel_stack_size = grpc_channel_stack_size(filters, num_filters); con = gpr_malloc(sizeof(grpc_connected_subchannel) + channel_stack_size); stk = (grpc_channel_stack *)(con + 1); - gpr_ref_init(&c->refs, 1); + gpr_ref_init(&con->refs, 1); grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); @@ -440,7 +461,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { sw_subchannel = gpr_malloc(sizeof(*sw_subchannel)); sw_subchannel->whom.subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; - grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel); + grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, + sw_subchannel); gpr_mu_lock(&c->mu); @@ -458,28 +480,18 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con)); c->connecting = 0; - /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated + /* setup subchannel watching connected subchannel for changes; subchannel ref + for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_REF(c, "state_watcher"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); - grpc_connected_subchannel_notify_on_state_change(exec_ctx, con, &sw_subchannel->connectivity_state, &sw_subchannel->closure); - -#if 0 - grpc_transport_op op; - grpc_channel_element *elem; - - /* setup connected subchannel watching transport for changes */ - memset(&op, 0, sizeof(op)); - op.connectivity_state = &sw_connected_subchannel->connectivity_state; - op.on_connectivity_state_change = &sw_connected_subchannel->closure; - op.bind_pollset_set = c->pollset_set; - elem = - grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); -#endif + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, con, &sw_subchannel->connectivity_state, + &sw_subchannel->closure); /* signal completion */ - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, "connected"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, + "connected"); gpr_mu_unlock(&c->mu); gpr_free((void *)filters); @@ -559,7 +571,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connect_failed"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connect_failed"); grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock(&c->mu); } @@ -623,13 +637,14 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op); } -grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *c) { +grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( + grpc_subchannel *c) { return GET_CONNECTED_SUBCHANNEL(c, acq); } -grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *con, - grpc_pollset *pollset) { +grpc_subchannel_call *grpc_connected_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_pollset *pollset) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index f4fb47402c..14eb4baa1f 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -64,7 +64,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) #define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) -#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) grpc_connected_subchannel_unref((cl), (p)) +#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \ + grpc_connected_subchannel_unref((cl), (p)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) #define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \ grpc_subchannel_call_unref((cl), (p)) @@ -76,11 +77,11 @@ void grpc_subchannel_ref(grpc_subchannel *channel void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_connected_subchannel_ref( + grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + grpc_connected_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_ref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, @@ -88,17 +89,17 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a subchannel call */ -grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *connected_subchannel, - grpc_pollset *pollset); +grpc_subchannel_call *grpc_connected_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, + grpc_pollset *pollset); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_transport_op *op); -void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *subchannel, - grpc_transport_op *op); +void grpc_connected_subchannel_process_transport_op( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel, + grpc_transport_op *op); /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( @@ -110,19 +111,18 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, grpc_connectivity_state *state, grpc_closure *notify); -void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *channel, - grpc_connectivity_state *state, - grpc_closure *notify); +void grpc_connected_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, + grpc_connectivity_state *state, grpc_closure *notify); /** Remove \a subscribed_notify from the list of closures to be called on a * state change if present. */ void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_closure *subscribed_notify); -void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *channel, - grpc_closure *subscribed_notify); + grpc_subchannel *channel, + grpc_closure *subscribed_notify); +void grpc_connected_subchannel_state_change_unsubscribe( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, + grpc_closure *subscribed_notify); /** express interest in \a channel's activities through \a pollset. */ void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, @@ -135,7 +135,8 @@ void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ -grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *subchannel); +grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( + grpc_subchannel *subchannel); /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 0247116ebb..4a55544ac1 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -103,8 +103,7 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, } static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { -} + grpc_call_element_args *args) {} static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {} diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 43615d8836..3add8e8007 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -237,7 +237,8 @@ grpc_connectivity_state g_state = GRPC_CHANNEL_IDLE; static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) { if (g_state != GRPC_CHANNEL_READY) { - grpc_subchannel_notify_on_state_change(exec_ctx, arg, &g_state, grpc_closure_create(state_changed, arg)); + grpc_subchannel_notify_on_state_change( + exec_ctx, arg, &g_state, grpc_closure_create(state_changed, arg)); } } @@ -246,12 +247,14 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_init(&pollset); grpc_subchannel_add_interested_party(&exec_ctx, c, &pollset); - grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state, grpc_closure_create(state_changed, c)); + grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state, + grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); while (g_state != GRPC_CHANNEL_READY) { grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &pollset, &worker, gpr_now(GPR_CLOCK_REALTIME), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); + grpc_pollset_work(&exec_ctx, &pollset, &worker, gpr_now(GPR_CLOCK_REALTIME), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); |