From b5585d4f723003e4c900f02c2f4ce25e6fb26d5e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 17 Nov 2015 07:18:31 -0800 Subject: Initial pass through to make subchannels single connect --- src/core/channel/client_channel.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 16d91d4277..5517507423 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -287,7 +287,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, typedef struct { grpc_metadata_batch *initial_metadata; - grpc_subchannel **subchannel; + grpc_connected_subchannel **connected_subchannel; grpc_closure *on_ready; grpc_call_element *elem; grpc_closure closure; @@ -295,17 +295,17 @@ typedef struct { static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) { continue_picking_args *cpa = arg; if (!success) { grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); - } else if (cpa->subchannel == NULL) { + } else if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->subchannel, cpa->on_ready)) { + cpa->connected_subchannel, cpa->on_ready)) { grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1); } gpr_free(cpa); @@ -313,7 +313,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) { static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready) { grpc_call_element *elem = elemp; channel_data *chand = elem->channel_data; @@ -321,18 +321,18 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, continue_picking_args *cpa; grpc_closure *closure; - GPR_ASSERT(subchannel); + GPR_ASSERT(connected_subchannel); gpr_mu_lock(&chand->mu_config); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, subchannel); + grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, connected_subchannel); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = grpc_closure_next(closure)) { cpa = closure->cb_arg; - if (cpa->subchannel == subchannel) { - cpa->subchannel = NULL; + if (cpa->connected_subchannel == connected_subchannel) { + cpa->connected_subchannel = NULL; grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); } } @@ -341,7 +341,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, } if (chand->lb_policy != NULL) { int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset, - initial_metadata, subchannel, on_ready); + initial_metadata, connected_subchannel, on_ready); gpr_mu_unlock(&chand->mu_config); return r; } @@ -354,7 +354,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, } cpa = gpr_malloc(sizeof(*cpa)); cpa->initial_metadata = initial_metadata; - cpa->subchannel = subchannel; + cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking, cpa); -- cgit v1.2.3 From ab33b488c42c1238b1b2ebbf3bf4f0db61103497 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sat, 21 Nov 2015 08:11:04 -0800 Subject: clang-format, bugfix --- src/core/census/grpc_filter.c | 4 +- src/core/channel/client_channel.c | 8 +- src/core/channel/client_uchannel.c | 24 ++--- src/core/channel/client_uchannel.h | 4 +- src/core/channel/compress_filter.c | 3 +- src/core/channel/http_server_filter.c | 3 +- src/core/channel/subchannel_call_holder.c | 19 ++-- src/core/client_config/lb_policies/pick_first.c | 28 +++--- src/core/client_config/lb_policies/round_robin.c | 10 +- src/core/client_config/lb_policy.c | 3 +- src/core/client_config/lb_policy.h | 3 +- src/core/client_config/subchannel.c | 117 +++++++++++++---------- src/core/client_config/subchannel.h | 43 +++++---- src/core/surface/lame_client.c | 3 +- test/core/end2end/fixtures/h2_uchannel.c | 9 +- 15 files changed, 156 insertions(+), 125 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index 61a95ec765..7a6ce30612 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -60,9 +60,7 @@ typedef struct call_data { grpc_closure finish_recv; } call_data; -typedef struct channel_data { - gpr_uint8 unused; -} channel_data; +typedef struct channel_data { gpr_uint8 unused; } channel_data; static void extract_and_annotate_method_tag(grpc_metadata_batch *md, call_data *calld, diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 731058c8ff..f026d32265 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -324,7 +324,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, gpr_mu_lock(&chand->mu_config); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, connected_subchannel); + grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, + connected_subchannel); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = grpc_closure_next(closure)) { @@ -338,8 +339,9 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, return 1; } if (chand->lb_policy != NULL) { - int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset, - initial_metadata, connected_subchannel, on_ready); + int r = + grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset, + initial_metadata, connected_subchannel, on_ready); gpr_mu_unlock(&chand->mu_config); return r; } diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index 1ab0faf65e..926bbde838 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -84,9 +84,9 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, chand->subchannel_connectivity, "uchannel_monitor_subchannel"); - grpc_connected_subchannel_notify_on_state_change(exec_ctx, chand->connected_subchannel, - &chand->subchannel_connectivity, - &chand->connectivity_cb); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, &chand->subchannel_connectivity, + &chand->connectivity_cb); } static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { @@ -168,8 +168,8 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - grpc_connected_subchannel_state_change_unsubscribe(exec_ctx, chand->connected_subchannel, - &chand->connectivity_cb); + grpc_connected_subchannel_state_change_unsubscribe( + exec_ctx, chand->connected_subchannel, &chand->connectivity_cb); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); gpr_mu_destroy(&chand->mu_state); } @@ -198,9 +198,9 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( GRPC_CHANNEL_CONNECTING, "uchannel_connecting_changed"); chand->subchannel_connectivity = out; - grpc_connected_subchannel_notify_on_state_change(exec_ctx, chand->connected_subchannel, - &chand->subchannel_connectivity, - &chand->connectivity_cb); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, &chand->subchannel_connectivity, + &chand->connectivity_cb); } gpr_mu_unlock(&chand->mu_state); return out; @@ -221,8 +221,8 @@ grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( channel_data *chand = elem->channel_data; grpc_channel_element *parent_elem; gpr_mu_lock(&chand->mu_state); - parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack( - chand->master)); + parent_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(chand->master)); gpr_mu_unlock(&chand->mu_state); return grpc_client_channel_get_connecting_pollset_set(parent_elem); } @@ -267,8 +267,8 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, return channel; } -void grpc_client_uchannel_set_connected_subchannel(grpc_channel *uchannel, - grpc_connected_subchannel *connected_subchannel) { +void grpc_client_uchannel_set_connected_subchannel( + grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) { grpc_channel_element *elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel)); channel_data *chand = elem->channel_data; diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h index 1acf9bfd69..120a3daf3d 100644 --- a/src/core/channel/client_uchannel.h +++ b/src/core/channel/client_uchannel.h @@ -64,7 +64,7 @@ void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args); -void grpc_client_uchannel_set_connected_subchannel(grpc_channel *uchannel, - grpc_connected_subchannel *connected_subchannel); +void grpc_client_uchannel_set_connected_subchannel( + grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index fc8b425e47..c997a074a7 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -288,8 +288,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { -} + grpc_channel_element *elem) {} const grpc_channel_filter grpc_compress_filter = { compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data), diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index c1645c2ba0..e7b8e42819 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -225,8 +225,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { -} + grpc_channel_element *elem) {} const grpc_channel_filter grpc_http_server_filter = { hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index d1a7f86348..2e3d49e806 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -137,19 +137,23 @@ retry: } /* if we don't have a subchannel, try to get one */ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - holder->connected_subchannel == NULL && op->send_initial_metadata != NULL) { + holder->connected_subchannel == NULL && + op->send_initial_metadata != NULL) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; grpc_closure_init(&holder->next_step, subchannel_ready, holder); - if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, - op->send_initial_metadata, &holder->connected_subchannel, - &holder->next_step)) { + if (holder->pick_subchannel( + exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata, + &holder->connected_subchannel, &holder->next_step)) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; } } /* if we've got a subchannel, then let's ask it to create a call */ if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && holder->connected_subchannel != NULL) { - gpr_atm_rel_store(&holder->subchannel_call, grpc_connected_subchannel_create_call(exec_ctx, holder->connected_subchannel, holder->pollset)); + gpr_atm_rel_store( + &holder->subchannel_call, + grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); retry_waiting_locked(exec_ctx, holder); goto retry; } @@ -171,7 +175,10 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; fail_locked(exec_ctx, holder); } else { - gpr_atm_rel_store(&holder->subchannel_call, grpc_connected_subchannel_create_call(exec_ctx, holder->connected_subchannel, holder->pollset)); + gpr_atm_rel_store( + &holder->subchannel_call, + grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); retry_waiting_locked(exec_ctx, holder); } gpr_mu_unlock(&holder->mu); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 6d9e6af4a6..e093c3e9a9 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -174,8 +174,8 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, - grpc_closure *on_complete) { + grpc_metadata_batch *initial_metadata, + grpc_connected_subchannel **target, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -218,7 +218,8 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { - if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != exclude_subchannel) { + if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != + exclude_subchannel) { memset(&op, 0, sizeof(op)); op.disconnect = 1; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); @@ -245,9 +246,9 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, - &p->checking_connectivity, - &p->connectivity_changed); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, p->selected, &p->checking_connectivity, + &p->connectivity_changed); } else { GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } @@ -258,7 +259,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); selected_subchannel = p->subchannels[p->checking_subchannel]; - p->selected = grpc_subchannel_get_connected_subchannel(selected_subchannel); + p->selected = + grpc_subchannel_get_connected_subchannel(selected_subchannel); GPR_ASSERT(p->selected); GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first"); /* drop the pick list: we are connected now */ @@ -274,9 +276,9 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, - &p->checking_connectivity, - &p->connectivity_changed); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, p->selected, &p->checking_connectivity, + &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, @@ -361,13 +363,15 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { - if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) continue; + if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) + continue; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); } if (p->selected) { grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected"); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, + "pf_broadcast_to_selected"); } gpr_free(subchannels); } diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 08592b79e1..ca0d6abd07 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -314,8 +314,8 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, - grpc_closure *on_complete) { + grpc_metadata_batch *initial_metadata, + grpc_connected_subchannel **target, grpc_closure *on_complete) { size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; @@ -325,7 +325,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, gpr_mu_unlock(&p->mu); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", + gpr_log(GPR_DEBUG, + "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } /* only advance the last picked pointer if the selection was used */ @@ -390,7 +391,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + *pp->target = + grpc_subchannel_get_connected_subchannel(selected->subchannel); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 6fa3c1b423..5605f788a5 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -71,7 +71,8 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_connected_subchannel **target, grpc_closure *on_complete) { + grpc_connected_subchannel **target, + grpc_closure *on_complete) { return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata, target, on_complete); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index b1fb64c06c..2889b8e55d 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -111,7 +111,8 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_connected_subchannel **target, grpc_closure *on_complete); + grpc_connected_subchannel **target, + grpc_closure *on_complete); void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 42b056c49e..07a74e250f 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -52,8 +52,9 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ - ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load(&(subchannel)->connected_subchannel))) +#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ + ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \ + &(subchannel)->connected_subchannel))) struct grpc_connected_subchannel { /** refcount */ @@ -152,10 +153,10 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define REF_PASS_REASON , reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ - (name), (p), (p)->refs, (p)->refs + 1, reason) + (name), (p), (p)->refs.count, (p)->refs.count + 1, reason) #define UNREF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \ - (name), (p), (p)->refs, (p)->refs - 1, reason) + (name), (p), (p)->refs.count, (p)->refs.count - 1, reason) #else #define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p)) #define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p)) @@ -175,23 +176,26 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, * connection implementation */ -static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, + int success) { grpc_connected_subchannel *c = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); } -void grpc_connected_subchannel_ref(grpc_connected_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_ref( + grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { REF_LOG("CONNECTION", c); gpr_ref(&c->refs); } -void grpc_connected_subchannel_unref( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { UNREF_LOG("CONNECTION", c); if (gpr_unref(&c->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), 1); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), + 1); } } @@ -199,7 +203,8 @@ void grpc_connected_subchannel_unref( * grpc_subchannel implementation */ -static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, + int success) { grpc_subchannel *c = arg; grpc_connected_subchannel *con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); if (con != NULL) { @@ -214,13 +219,16 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) } void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + REF_LOG("SUBCHANNEL", c); gpr_ref(&c->refs); } void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + UNREF_LOG("SUBCHANNEL", c); if (gpr_unref(&c->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), 1); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), + 1); } } @@ -276,7 +284,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { args.deadline = compute_connect_deadline(c); args.channel_args = c->args; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_CONNECTING, "state_change"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + GRPC_CHANNEL_CONNECTING, "state_change"); grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result, &c->connected); } @@ -319,11 +328,11 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, } void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_closure *subscribed_notify) { + grpc_subchannel *c, + grpc_closure *subscribed_notify) { gpr_mu_lock(&c->mu); - grpc_connectivity_state_change_unsubscribe( - exec_ctx, &c->state_tracker, subscribed_notify); + grpc_connectivity_state_change_unsubscribe(exec_ctx, &c->state_tracker, + subscribed_notify); gpr_mu_unlock(&c->mu); } @@ -339,7 +348,8 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, } if (op->disconnect) { c->disconnected = 1; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); if (c->have_alarm) { cancel_alarm = 1; } @@ -360,15 +370,16 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, } } -void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) { +void grpc_connected_subchannel_process_transport_op( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_transport_op *op) { grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element *top_elem = - grpc_channel_stack_element(channel_stack, 0); + grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0); top_elem->filter->start_transport_op(exec_ctx, top_elem, op); } static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, - int iomgr_success) { + int iomgr_success) { state_watcher *sw = p; grpc_subchannel *c = sw->whom.subchannel; gpr_mu *mu = &c->mu; @@ -377,9 +388,12 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, /* if we failed just leave this closure */ if (iomgr_success) { - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, sw->connectivity_state, "reflect_child"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + sw->connectivity_state, "reflect_child"); if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_connected_subchannel_notify_on_state_change(exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), &sw->connectivity_state, &sw->closure); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), + &sw->connectivity_state, &sw->closure); GRPC_SUBCHANNEL_REF(c, "state_watcher"); sw = NULL; } @@ -390,7 +404,10 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, gpr_free(sw); } -static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { +static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, + grpc_connected_subchannel *con, + grpc_connectivity_state *state, + grpc_closure *closure) { grpc_transport_op op; grpc_channel_element *elem; memset(&op, 0, sizeof(op)); @@ -400,12 +417,16 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connecte elem->filter->start_transport_op(exec_ctx, elem, &op); } -void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { +void grpc_connected_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_connectivity_state *state, grpc_closure *closure) { GPR_ASSERT(state != NULL); connected_subchannel_state_op(exec_ctx, con, state, closure); } -void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) { +void grpc_connected_subchannel_state_change_unsubscribe( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_closure *closure) { connected_subchannel_state_op(exec_ctx, con, NULL, closure); } @@ -429,7 +450,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { channel_stack_size = grpc_channel_stack_size(filters, num_filters); con = gpr_malloc(sizeof(grpc_connected_subchannel) + channel_stack_size); stk = (grpc_channel_stack *)(con + 1); - gpr_ref_init(&c->refs, 1); + gpr_ref_init(&con->refs, 1); grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); @@ -440,7 +461,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { sw_subchannel = gpr_malloc(sizeof(*sw_subchannel)); sw_subchannel->whom.subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; - grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel); + grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, + sw_subchannel); gpr_mu_lock(&c->mu); @@ -458,28 +480,18 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con)); c->connecting = 0; - /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated + /* setup subchannel watching connected subchannel for changes; subchannel ref + for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_REF(c, "state_watcher"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); - grpc_connected_subchannel_notify_on_state_change(exec_ctx, con, &sw_subchannel->connectivity_state, &sw_subchannel->closure); - -#if 0 - grpc_transport_op op; - grpc_channel_element *elem; - - /* setup connected subchannel watching transport for changes */ - memset(&op, 0, sizeof(op)); - op.connectivity_state = &sw_connected_subchannel->connectivity_state; - op.on_connectivity_state_change = &sw_connected_subchannel->closure; - op.bind_pollset_set = c->pollset_set; - elem = - grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); -#endif + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, con, &sw_subchannel->connectivity_state, + &sw_subchannel->closure); /* signal completion */ - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, "connected"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, + "connected"); gpr_mu_unlock(&c->mu); gpr_free((void *)filters); @@ -559,7 +571,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connect_failed"); + grpc_connectivity_state_set(exec_ctx, &c->state_tracker, + GRPC_CHANNEL_TRANSIENT_FAILURE, + "connect_failed"); grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock(&c->mu); } @@ -623,13 +637,14 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op); } -grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *c) { +grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( + grpc_subchannel *c) { return GET_CONNECTED_SUBCHANNEL(c, acq); } -grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *con, - grpc_pollset *pollset) { +grpc_subchannel_call *grpc_connected_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_pollset *pollset) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index f4fb47402c..14eb4baa1f 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -64,7 +64,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) #define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) -#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) grpc_connected_subchannel_unref((cl), (p)) +#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \ + grpc_connected_subchannel_unref((cl), (p)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) #define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \ grpc_subchannel_call_unref((cl), (p)) @@ -76,11 +77,11 @@ void grpc_subchannel_ref(grpc_subchannel *channel void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_connected_subchannel_ref( + grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + grpc_connected_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_ref(grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, @@ -88,17 +89,17 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a subchannel call */ -grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *connected_subchannel, - grpc_pollset *pollset); +grpc_subchannel_call *grpc_connected_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, + grpc_pollset *pollset); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_transport_op *op); -void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *subchannel, - grpc_transport_op *op); +void grpc_connected_subchannel_process_transport_op( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel, + grpc_transport_op *op); /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( @@ -110,19 +111,18 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, grpc_connectivity_state *state, grpc_closure *notify); -void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *channel, - grpc_connectivity_state *state, - grpc_closure *notify); +void grpc_connected_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, + grpc_connectivity_state *state, grpc_closure *notify); /** Remove \a subscribed_notify from the list of closures to be called on a * state change if present. */ void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_closure *subscribed_notify); -void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *channel, - grpc_closure *subscribed_notify); + grpc_subchannel *channel, + grpc_closure *subscribed_notify); +void grpc_connected_subchannel_state_change_unsubscribe( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, + grpc_closure *subscribed_notify); /** express interest in \a channel's activities through \a pollset. */ void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, @@ -135,7 +135,8 @@ void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ -grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *subchannel); +grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( + grpc_subchannel *subchannel); /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 0247116ebb..4a55544ac1 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -103,8 +103,7 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, } static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { -} + grpc_call_element_args *args) {} static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {} diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 43615d8836..3add8e8007 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -237,7 +237,8 @@ grpc_connectivity_state g_state = GRPC_CHANNEL_IDLE; static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) { if (g_state != GRPC_CHANNEL_READY) { - grpc_subchannel_notify_on_state_change(exec_ctx, arg, &g_state, grpc_closure_create(state_changed, arg)); + grpc_subchannel_notify_on_state_change( + exec_ctx, arg, &g_state, grpc_closure_create(state_changed, arg)); } } @@ -246,12 +247,14 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_init(&pollset); grpc_subchannel_add_interested_party(&exec_ctx, c, &pollset); - grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state, grpc_closure_create(state_changed, c)); + grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state, + grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); while (g_state != GRPC_CHANNEL_READY) { grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &pollset, &worker, gpr_now(GPR_CLOCK_REALTIME), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); + grpc_pollset_work(&exec_ctx, &pollset, &worker, gpr_now(GPR_CLOCK_REALTIME), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); -- cgit v1.2.3 From 906e3bcfb5a32a25af2f00dd4679052f8a608b3f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 Nov 2015 07:31:31 -0800 Subject: Stripping out master channel as a concept --- src/core/channel/channel_stack.c | 11 ++++-- src/core/channel/channel_stack.h | 36 +++++++++++------ src/core/channel/client_channel.c | 52 ++++++------------------- src/core/channel/client_channel.h | 12 +----- src/core/channel/client_uchannel.c | 35 ++--------------- src/core/channel/client_uchannel.h | 10 ----- src/core/channel/connected_channel.c | 2 +- src/core/channel/subchannel_call_holder.c | 5 +-- src/core/channel/subchannel_call_holder.h | 3 +- src/core/client_config/subchannel.c | 64 ++++++++----------------------- src/core/client_config/subchannel.h | 5 --- src/core/surface/channel.c | 1 - src/core/surface/secure_channel_create.c | 1 - test/core/end2end/fixtures/h2_uchannel.c | 1 - 14 files changed, 67 insertions(+), 171 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 7f7fbf420f..53433a0923 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -101,9 +101,10 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, return CALL_ELEMS_FROM_STACK(call_stack) + index; } -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, + size_t filter_count, const grpc_channel_args *channel_args, grpc_channel_stack *stack) { size_t call_size = @@ -115,6 +116,8 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, size_t i; stack->count = filter_count; + gpr_ref_init(&stack->refcount.refs, initial_refs); + grpc_closure_init(&stack->refcount.destroy, destroy, destroy_arg); elems = CHANNEL_ELEMS_FROM_STACK(stack); user_data = ((char *)elems) + @@ -122,7 +125,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, /* init per-filter data */ for (i = 0; i < filter_count; i++) { - args.master = master; + args.channel_stack = stack; args.channel_args = channel_args; args.is_first = i == 0; args.is_last = i == (filter_count - 1); @@ -174,7 +177,7 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, /* init per-filter data */ for (i = 0; i < count; i++) { - args.refcount = &call_stack->refcount; + args.call_stack = call_stack; args.server_transport_data = transport_server_data; args.context = context; call_elems[i].filter = channel_elems[i].filter; diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 1db12ead7e..593adcd7b5 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -51,15 +51,18 @@ typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; +typedef struct grpc_channel_stack grpc_channel_stack; +typedef struct grpc_call_stack grpc_call_stack; + typedef struct { - grpc_channel *master; + grpc_channel_stack *channel_stack; const grpc_channel_args *channel_args; int is_first; int is_last; } grpc_channel_element_args; typedef struct { - grpc_stream_refcount *refcount; + grpc_call_stack *call_stack; const void *server_transport_data; grpc_call_context_element *context; } grpc_call_element_args; @@ -144,23 +147,24 @@ struct grpc_call_element { /* A channel stack tracks a set of related filters for one channel, and guarantees they live within a single malloc() allocation */ -typedef struct { +struct grpc_channel_stack { + grpc_stream_refcount refcount; size_t count; /* Memory required for a call stack (computed at channel stack initialization) */ size_t call_stack_size; -} grpc_channel_stack; +}; /* A call stack tracks a set of related filters for one call, and guarantees they live within a single malloc() allocation */ -typedef struct { +struct grpc_call_stack { /* shared refcount for this channel stack. MUST be the first element: the underlying code calls destroy with the address of the refcount, but higher layers prefer to think about the address of the call stack itself. */ grpc_stream_refcount refcount; size_t count; -} grpc_call_stack; +}; /* Get a channel element given a channel stack and its index */ grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack, @@ -175,9 +179,10 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i); size_t grpc_channel_stack_size(const grpc_channel_filter **filters, size_t filter_count); /* Initialize a channel stack given some filters */ -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, - size_t filter_count, grpc_channel *master, + size_t filter_count, const grpc_channel_args *args, grpc_channel_stack *stack); /* Destroy a channel stack */ @@ -199,14 +204,21 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); #ifdef GRPC_STREAM_REFCOUNT_DEBUG -#define grpc_call_stack_ref(call_stack, reason) \ +#define GRPC_CALL_STACK_REF(call_stack, reason) \ grpc_stream_ref(&(call_stack)->refcount, reason) -#define grpc_call_stack_unref(exec_ctx, call_stack, reason) \ +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ + grpc_stream_ref(&(channel_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount, reason) #else -#define grpc_call_stack_ref(call_stack) grpc_stream_ref(&(call_stack)->refcount) -#define grpc_call_stack_unref(exec_ctx, call_stack) \ +#define GRPC_CALL_STACK_REF(call_stack, reason) grpc_stream_ref(&(call_stack)->refcount) +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ grpc_stream_unref(exec_ctx, &(call_stack)->refcount) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) grpc_stream_ref(&(channel_stack)->refcount) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount) #endif /* Destroy a call stack */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index f026d32265..fe43c50018 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -59,11 +59,6 @@ typedef struct client_channel_channel_data { grpc_resolver *resolver; /** have we started resolving this channel */ int started_resolving; - /** master channel - the grpc_channel instance that ultimately owns - this channel_data via its channel stack. - We occasionally use this to bump the refcount on the master channel - to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; /** mutex protecting client configuration, including all variables below in this data structure */ @@ -81,8 +76,8 @@ typedef struct client_channel_channel_data { grpc_connectivity_state_tracker state_tracker; /** when an lb_policy arrives, should we try to exit idle */ int exit_idle_when_lb_policy_arrives; - /** pollset_set of interested parties in a new connection */ - grpc_pollset_set pollset_set; + /** owning stack */ + grpc_channel_stack *owning_stack; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -103,9 +98,7 @@ typedef struct { } waiting_call; static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, - chand->master); + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); } static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, @@ -139,7 +132,7 @@ static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, on_lb_policy_state_changed_locked(exec_ctx, w); gpr_mu_unlock(&w->chand->mu_config); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy"); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); } @@ -147,7 +140,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); - GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); @@ -200,7 +193,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, watch_lb_policy(exec_ctx, chand, lb_policy, state); } gpr_mu_unlock(&chand->mu_config); - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next"); @@ -230,7 +223,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); } - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver"); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver"); } static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -347,7 +340,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, } if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); @@ -387,8 +380,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(elem->filter == &grpc_client_channel_filter); gpr_mu_init(&chand->mu_config); - chand->master = args->master; - grpc_pollset_set_init(&chand->pollset_set); grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, @@ -408,7 +399,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(&chand->pollset_set); gpr_mu_destroy(&chand->mu_config); } @@ -437,7 +427,7 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, if (!grpc_closure_list_empty(chand->waiting_for_config_closures) || chand->exit_idle_when_lb_policy_arrives) { chand->started_resolving = 1; - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, &chand->on_config_changed); } @@ -456,7 +446,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( } else { chand->exit_idle_when_lb_policy_arrives = 1; if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); chand->started_resolving = 1; grpc_resolver_next(exec_ctx, chand->resolver, &chand->incoming_configuration, @@ -469,7 +459,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( } void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu_config); @@ -477,23 +467,3 @@ void grpc_client_channel_watch_connectivity_state( exec_ctx, &chand->state_tracker, state, on_complete); gpr_mu_unlock(&chand->mu_config); } - -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - return &chand->pollset_set; -} - -void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - channel_data *chand = elem->channel_data; - grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset); -} - -void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - channel_data *chand = elem->channel_data; - grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset); -} diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h index 5103f07a43..d9bc4971f1 100644 --- a/src/core/channel/client_channel.h +++ b/src/core/channel/client_channel.h @@ -57,17 +57,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete); -grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( - grpc_channel_element *elem); - -void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); -void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); - #endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index 926bbde838..b30d030197 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -58,7 +58,7 @@ typedef struct client_uchannel_channel_data { this channel_data via its channel stack. We occasionally use this to bump the refcount on the master channel to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; + grpc_channel_stack *owning_stack; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; @@ -90,9 +90,7 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, } static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, - chand->master); + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); } static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, @@ -158,7 +156,7 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand); GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); - chand->master = args->master; + chand->owning_stack = args->channel_stack; grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_uchannel"); gpr_mu_init(&chand->mu_state); @@ -216,33 +214,6 @@ void grpc_client_uchannel_watch_connectivity_state( gpr_mu_unlock(&chand->mu_state); } -grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - grpc_channel_element *parent_elem; - gpr_mu_lock(&chand->mu_state); - parent_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(chand->master)); - gpr_mu_unlock(&chand->mu_state); - return grpc_client_channel_get_connecting_pollset_set(parent_elem); -} - -void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - grpc_pollset_set *master_pollset_set = - grpc_client_uchannel_get_connecting_pollset_set(elem); - grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset); -} - -void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - grpc_pollset_set *master_pollset_set = - grpc_client_uchannel_get_connecting_pollset_set(elem); - grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset); -} - grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args) { grpc_channel *channel = NULL; diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h index 120a3daf3d..a5cf271042 100644 --- a/src/core/channel/client_uchannel.h +++ b/src/core/channel/client_uchannel.h @@ -51,16 +51,6 @@ void grpc_client_uchannel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_connectivity_state *state, grpc_closure *on_complete); -grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( - grpc_channel_element *elem); - -void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); -void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *channel, - grpc_pollset *pollset); - grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args); diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 0e1efd965a..73a7dcc81f 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -91,7 +91,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); r = grpc_transport_init_stream(exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - args->refcount, args->server_transport_data); + &args->call_stack->refcount, args->server_transport_data); GPR_ASSERT(r == 0); } diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 2e3d49e806..63069baa84 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -244,13 +244,12 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, } char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder, - grpc_channel *master) { + grpc_subchannel_call_holder *holder) { grpc_subchannel_call *subchannel_call = GET_CALL(holder); if (subchannel_call) { return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); } else { - return grpc_channel_get_target(master); + return NULL; } } diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index 6328f35344..692c5e5316 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -91,7 +91,6 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder, grpc_transport_stream_op *op); char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder, - grpc_channel *master); + grpc_subchannel_call_holder *holder); #endif diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 07a74e250f..6cc2364eaa 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -56,11 +56,6 @@ ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \ &(subchannel)->connected_subchannel))) -struct grpc_connected_subchannel { - /** refcount */ - gpr_refcount refs; -}; - typedef struct { grpc_closure closure; union { @@ -84,11 +79,6 @@ struct grpc_subchannel { /** address to connect to */ struct sockaddr *addr; size_t addr_len; - /** master channel - the grpc_channel instance that ultimately owns - this channel_data via its channel stack. - We occasionally use this to bump the refcount on the master channel - to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; /** set during connection */ grpc_connect_out_args connecting_result; @@ -97,10 +87,8 @@ struct grpc_subchannel { grpc_closure connected; /** pollset_set tracking who's interested in a connection - being setup - owned by the master channel (in particular the - client_channel - filter there-in) */ - grpc_pollset_set *pollset_set; + being setup */ + grpc_pollset_set pollset_set; /** active connection, or null; of type grpc_connected_subchannel */ gpr_atm connected_subchannel; @@ -132,7 +120,7 @@ struct grpc_subchannel_call { }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) -#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) +#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)(con)) #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call *)(callstack)) - 1) @@ -151,6 +139,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, connection_unref_locked((cl), (p), __FILE__, __LINE__, (r)) #define REF_PASS_ARGS , file, line, reason #define REF_PASS_REASON , reason +#define REF_REASON reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ (name), (p), (p)->refs.count, (p)->refs.count + 1, reason) @@ -164,6 +153,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p)) #define REF_PASS_ARGS #define REF_PASS_REASON +#define REF_REASON "" #define REF_LOG(name, p) \ do { \ } while (0) @@ -185,18 +175,13 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, void grpc_connected_subchannel_ref( grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - REF_LOG("CONNECTION", c); - gpr_ref(&c->refs); + GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - UNREF_LOG("CONNECTION", c); - if (gpr_unref(&c->refs)) { - grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), - 1); - } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } /* @@ -215,6 +200,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c->addr); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); + grpc_pollset_set_destroy(&c->pollset_set); gpr_free(c); } @@ -235,13 +221,13 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset) { - grpc_pollset_set_add_pollset(exec_ctx, c->pollset_set, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &c->pollset_set, pollset); } void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset) { - grpc_pollset_set_del_pollset(exec_ctx, c->pollset_set, pollset); + grpc_pollset_set_del_pollset(exec_ctx, &c->pollset_set, pollset); } static gpr_uint32 random_seed() { @@ -251,8 +237,6 @@ static gpr_uint32 random_seed() { grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args) { grpc_subchannel *c = gpr_malloc(sizeof(*c)); - grpc_channel_element *parent_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(args->master)); memset(c, 0, sizeof(*c)); gpr_ref_init(&c->refs, 1); c->connector = connector; @@ -263,10 +247,9 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, sizeof(grpc_channel_filter *) * c->num_filters); c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); + grpc_pollset_set_init(&c->pollset_set); c->addr_len = args->addr_len; c->args = grpc_channel_args_copy(args->args); - c->master = args->master; - c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); c->random = random_seed(); grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, @@ -278,7 +261,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; - args.interested_parties = c->pollset_set; + args.interested_parties = &c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; args.deadline = compute_connect_deadline(c); @@ -318,7 +301,6 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, c->connecting = 1; /* released by connection */ GRPC_SUBCHANNEL_REF(c, "connecting"); - GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); } gpr_mu_unlock(&c->mu); @@ -448,10 +430,9 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { /* construct channel stack */ channel_stack_size = grpc_channel_stack_size(filters, num_filters); - con = gpr_malloc(sizeof(grpc_connected_subchannel) + channel_stack_size); - stk = (grpc_channel_stack *)(con + 1); - gpr_ref_init(&con->refs, 1); - grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args, + con = gpr_malloc(channel_stack_size); + stk = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters, num_filters, c->args, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free((void *)c->connecting_result.filters); @@ -471,7 +452,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_free(sw_subchannel); gpr_free((void *)filters); grpc_channel_stack_destroy(exec_ctx, stk); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); return; } @@ -495,7 +475,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_mu_unlock(&c->mu); gpr_free((void *)filters); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); } /* Generate a random number between 0 and 1. */ @@ -554,7 +533,6 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); } } @@ -605,21 +583,13 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { -#ifdef GRPC_STREAM_REFCOUNT_DEBUG - grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); -#else - grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c)); -#endif + GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { -#ifdef GRPC_STREAM_REFCOUNT_DEBUG - grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); -#else - grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); -#endif + GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 14eb4baa1f..b7db363866 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -161,15 +161,10 @@ struct grpc_subchannel_args { /** Address to connect to */ struct sockaddr *addr; size_t addr_len; - /** master channel */ - grpc_channel *master; }; /** create a subchannel given a connector */ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args); -/** Return the master channel associated with the subchannel */ -grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel); - #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 859197412b..1632b79d0e 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -63,7 +63,6 @@ typedef struct registered_call { struct grpc_channel { int is_client; - gpr_refcount refs; gpr_uint32 max_message_length; grpc_mdelem *default_authority; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 471b5a71e7..344b4d79f7 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -207,7 +207,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; - args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 3add8e8007..56d7d295fb 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -159,7 +159,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); args->args = final_args; - args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); -- cgit v1.2.3 From 11beb9a55c34fdac8398ce01381ca030c8d6223f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 Nov 2015 10:29:32 -0800 Subject: Fixes --- src/core/channel/client_channel.c | 4 +++- src/core/channel/client_uchannel.c | 2 +- src/core/channel/subchannel_call_holder.c | 8 ++++++-- src/core/channel/subchannel_call_holder.h | 4 +++- test/core/end2end/fixtures/h2_uchannel.c | 3 ++- 5 files changed, 15 insertions(+), 6 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index fe43c50018..955b390dbf 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -359,7 +359,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, /* Constructor for call_data */ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { - grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem); + grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem, + args->call_stack); } /* Destructor for call_data */ @@ -381,6 +382,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, gpr_mu_init(&chand->mu_config); grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); + chand->owning_stack = args->channel_stack; grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index ff50ebfb60..f8c621b8eb 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -138,7 +138,7 @@ static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel, - elem->channel_data); + elem->channel_data, args->call_stack); } /* Destructor for call_data */ diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 9875dd8080..1455cc0a24 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -57,7 +57,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, void grpc_subchannel_call_holder_init( grpc_subchannel_call_holder *holder, grpc_subchannel_call_holder_pick_subchannel pick_subchannel, - void *pick_subchannel_arg) { + void *pick_subchannel_arg, grpc_call_stack *owning_call) { gpr_atm_rel_store(&holder->subchannel_call, 0); holder->pick_subchannel = pick_subchannel; holder->pick_subchannel_arg = pick_subchannel_arg; @@ -67,6 +67,7 @@ void grpc_subchannel_call_holder_init( holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + holder->owning_call = owning_call; } void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, @@ -141,10 +142,12 @@ retry: op->send_initial_metadata != NULL) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; grpc_closure_init(&holder->next_step, subchannel_ready, holder); + GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel"); if (holder->pick_subchannel( exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata, &holder->connected_subchannel, &holder->next_step)) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); } } /* if we've got a subchannel, then let's ask it to create a call */ @@ -171,8 +174,8 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); call = GET_CALL(holder); GPR_ASSERT(call == NULL || call == CANCELLED_CALL); + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (holder->connected_subchannel == NULL) { - holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; fail_locked(exec_ctx, holder); } else { gpr_atm_rel_store( @@ -182,6 +185,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { retry_waiting_locked(exec_ctx, holder); } gpr_mu_unlock(&holder->mu); + GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); } typedef struct { diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index 692c5e5316..9cf72c6cf7 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -78,12 +78,14 @@ typedef struct grpc_subchannel_call_holder { size_t waiting_ops_capacity; grpc_closure next_step; + + grpc_call_stack *owning_call; } grpc_subchannel_call_holder; void grpc_subchannel_call_holder_init( grpc_subchannel_call_holder *holder, grpc_subchannel_call_holder_pick_subchannel pick_subchannel, - void *pick_subchannel_arg); + void *pick_subchannel_arg, grpc_call_stack *owning_call); void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder); diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 56d7d295fb..6c1638590f 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -252,7 +252,8 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); while (g_state != GRPC_CHANNEL_READY) { grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &pollset, &worker, gpr_now(GPR_CLOCK_REALTIME), + grpc_pollset_work(&exec_ctx, &pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); grpc_exec_ctx_flush(&exec_ctx); -- cgit v1.2.3 From cb2609f4756d0c28632c948bdcb3e7cc9d3a8124 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 Nov 2015 17:19:19 -0800 Subject: Signal back to resolver on error --- src/core/channel/client_channel.c | 7 ++++++- src/core/client_config/lb_policies/pick_first.c | 4 ++++ src/core/client_config/resolver.c | 5 ++--- src/core/client_config/resolver.h | 6 ++---- src/core/client_config/resolvers/dns_resolver.c | 6 ++---- src/core/client_config/resolvers/sockaddr_resolver.c | 6 ++---- 6 files changed, 18 insertions(+), 16 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 955b390dbf..7a4c16227d 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -114,10 +114,15 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, static void on_lb_policy_state_changed_locked( grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) { + grpc_connectivity_state publish_state = w->state; /* check if the notification is for a stale policy */ if (w->lb_policy != w->chand->lb_policy) return; - grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state, + if ((publish_state == GRPC_CHANNEL_FATAL_FAILURE || publish_state == GRPC_CHANNEL_TRANSIENT_FAILURE) && w->chand->resolver != NULL) { + publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); + } + grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state, "lb_changed"); if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 1c9652fc47..4ecfc11cdd 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -243,6 +243,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); return; } else if (p->selected != NULL) { + if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { + /* if the selected channel goes bad, we're done */ + p->checking_connectivity = GRPC_CHANNEL_FATAL_FAILURE; + } grpc_connectivity_state_set(exec_ctx, &p->state_tracker, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c index 3dbdbf0ded..eda01e72ba 100644 --- a/src/core/client_config/resolver.c +++ b/src/core/client_config/resolver.c @@ -71,9 +71,8 @@ void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { } void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver, - grpc_subchannel *subchannel) { - resolver->vtable->channel_saw_error(exec_ctx, resolver, subchannel); + grpc_resolver *resolver) { + resolver->vtable->channel_saw_error(exec_ctx, resolver); } void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h index 09691c0a87..e612eaf3b3 100644 --- a/src/core/client_config/resolver.h +++ b/src/core/client_config/resolver.h @@ -51,8 +51,7 @@ struct grpc_resolver { struct grpc_resolver_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_subchannel *subchannel); + void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, grpc_client_config **target_config, grpc_closure *on_complete); }; @@ -80,8 +79,7 @@ void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); /** Notification that the channel has seen an error on some address. Can be used as a hint that re-resolution is desirable soon. */ void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver, - grpc_subchannel *subchannel); + grpc_resolver *resolver); /** Get the next client config. Called by the channel to fetch a new configuration. Expected to set *target_config with a new configuration, diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index f02d597363..a467340ba2 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -81,8 +81,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r); static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_subchannel *subchannel); +static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r); static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, grpc_client_config **target_config, grpc_closure *on_complete); @@ -102,8 +101,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { } static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver, - grpc_subchannel *subchannel) { + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; gpr_mu_lock(&r->mu); if (!r->resolving) { diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 1091fa0c25..fd0212a1e7 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -83,8 +83,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *r, - grpc_subchannel *subchannel); + grpc_resolver *r); static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, grpc_client_config **target_config, grpc_closure *on_complete); @@ -106,8 +105,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, } static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver, - grpc_subchannel *subchannel) { + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; gpr_mu_lock(&r->mu); r->published = 0; -- cgit v1.2.3 From ee7531cd7c0ef681d104ab0ec83a8fb151ebb62b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 25 Nov 2015 09:48:15 -0800 Subject: Refine condition to fix some tests --- src/core/channel/client_channel.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 7a4c16227d..ae1f3cf4c2 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -118,7 +118,7 @@ static void on_lb_policy_state_changed_locked( /* check if the notification is for a stale policy */ if (w->lb_policy != w->chand->lb_policy) return; - if ((publish_state == GRPC_CHANNEL_FATAL_FAILURE || publish_state == GRPC_CHANNEL_TRANSIENT_FAILURE) && w->chand->resolver != NULL) { + if (publish_state == GRPC_CHANNEL_FATAL_FAILURE && w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); } -- cgit v1.2.3 From 86c99580a0891697f3c5227ae2fd2911734098fc Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 25 Nov 2015 15:22:26 -0800 Subject: Load balancing interest management fixes --- src/core/channel/client_channel.c | 36 ++++++++++++++++- src/core/client_config/lb_policies/pick_first.c | 40 +++++-------------- src/core/client_config/lb_policies/round_robin.c | 49 +++++++----------------- src/core/client_config/lb_policy.c | 1 + src/core/client_config/lb_policy.h | 1 + src/core/client_config/subchannel.c | 38 +++++++++++------- src/core/client_config/subchannel.h | 10 +---- src/core/iomgr/pollset_set.h | 22 +++++++---- src/core/iomgr/pollset_set_posix.c | 44 +++++++++++++++++++++ src/core/iomgr/pollset_set_posix.h | 4 ++ src/core/iomgr/pollset_set_windows.c | 8 ++++ src/core/transport/transport.h | 2 +- test/core/end2end/fixtures/h2_uchannel.c | 6 ++- 13 files changed, 161 insertions(+), 100 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index ae1f3cf4c2..5fec87c67c 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -78,6 +78,8 @@ typedef struct client_channel_channel_data { int exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack *owning_stack; + /** interested parties */ + grpc_pollset_set interested_parties; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -177,6 +179,10 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; + if (lb_policy != NULL) { + grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, &chand->interested_parties); + } + gpr_mu_lock(&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; @@ -220,6 +226,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties); grpc_lb_policy_shutdown(exec_ctx, old_lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -263,6 +270,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; @@ -391,6 +399,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); + grpc_pollset_set_init(&chand->interested_parties); } /* Destructor for channel_data */ @@ -403,9 +412,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); + grpc_pollset_set_destroy(&chand->interested_parties); gpr_mu_destroy(&chand->mu_config); } @@ -465,12 +476,35 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( return out; } +typedef struct { + channel_data *chand; + grpc_pollset *pollset; + grpc_closure *on_complete; + grpc_closure my_closure; +} external_connectivity_watcher; + +static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { + external_connectivity_watcher *w = arg; + grpc_closure *follow_up = w->on_complete; + grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); + gpr_free(w); + follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success); +} + void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; + external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + w->chand = chand; + w->pollset = pollset; + w->on_complete = on_complete; + grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w); + GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); gpr_mu_lock(&chand->mu_config); grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, state, on_complete); + exec_ctx, &chand->state_tracker, state, &w->my_closure); gpr_mu_unlock(&chand->mu_config); } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 4ecfc11cdd..c0f1d3fd94 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -76,24 +76,6 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); - } -} - -static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_add_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); - } -} - void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; @@ -114,7 +96,6 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); - del_interested_parties_locked(exec_ctx, p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; @@ -124,6 +105,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); pp = next; @@ -140,8 +122,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -161,6 +142,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } @@ -187,8 +169,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_subchannel_add_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -275,8 +256,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel, - pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } @@ -288,15 +268,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure"); - del_interested_parties_locked(exec_ctx, p); p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(exec_ctx, p); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { goto loop; @@ -309,13 +288,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(exec_ctx, p); - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels - 1]); p->num_subchannels--; + GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], + p->subchannels[p->num_subchannels]); GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], "pick_first"); if (p->num_subchannels == 0) { @@ -336,7 +315,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(exec_ctx, p); goto loop; } } diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index ca0d6abd07..10688b3fa5 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -200,23 +200,10 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } -static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, - round_robin_lb_policy *p, - const size_t subchannel_idx) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[subchannel_idx], pp->pollset); - } -} - void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; size_t i; ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(exec_ctx, p, i); - } for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin"); } @@ -243,15 +230,10 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(exec_ctx, p, i); - } - p->shutdown = 1; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -268,17 +250,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - size_t i; gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], - pp->pollset); - } + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -298,6 +276,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { for (i = 0; i < p->num_subchannels; i++) { p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i], + &p->base.interested_parties, &p->subchannel_connectivity[i], &p->connectivity_changed_cbs[i]); GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); @@ -316,7 +295,6 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, grpc_closure *on_complete) { - size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; ready_list *selected; @@ -336,10 +314,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], - pollset); - } + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -398,13 +373,15 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel, - pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, + p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_CONNECTING: @@ -412,14 +389,17 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, *this_connectivity, "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: - del_interested_parties_locked(exec_ctx, p, this_idx); /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); /* remove from ready list if still present */ @@ -433,7 +413,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_transient_failure"); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(exec_ctx, p, this_idx); if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { remove_disconnected_sc_locked( p, p->subchannel_index_to_readylist_node[this_idx]); diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 5605f788a5..8d9287c36a 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -37,6 +37,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; gpr_ref_init(&policy->refs, 1); + grpc_pollset_set_init(&policy->interested_parties); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 2889b8e55d..985c96630f 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -48,6 +48,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_refcount refs; + grpc_pollset_set interested_parties; }; struct grpc_lb_policy_vtable { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 16f9346a35..3872cacfa0 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -224,18 +224,6 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, } } -void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset *pollset) { - grpc_pollset_set_add_pollset(exec_ctx, &c->pollset_set, pollset); -} - -void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset *pollset) { - grpc_pollset_set_del_pollset(exec_ctx, &c->pollset_set, pollset); -} - static gpr_uint32 random_seed() { return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); } @@ -298,14 +286,38 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } +typedef struct { + grpc_subchannel *subchannel; + grpc_pollset_set *pollset_set; + grpc_closure *notify; + grpc_closure closure; +} external_state_watcher; + +static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { + external_state_watcher *w = arg; + grpc_closure *follow_up = w->notify; + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + GRPC_SUBCHANNEL_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); + gpr_free(w); + follow_up->cb(exec_ctx, follow_up->cb_arg, success); +} + void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify) { int do_connect = 0; + external_state_watcher *w = gpr_malloc(sizeof(*w)); + w->subchannel = c; + w->pollset_set = interested_parties; + w->notify = notify; + grpc_closure_init(&w->closure, on_external_state_watcher_done, w); + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); + GRPC_SUBCHANNEL_REF(c, "external_state_watcher"); gpr_mu_lock(&c->mu); if (grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, state, notify)) { + exec_ctx, &c->state_tracker, state, &w->closure)) { do_connect = 1; c->connecting = 1; /* released by connection */ diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b50d1e8ecc..20d74e9f10 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -109,6 +109,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( Updates *state with the new state of the channel */ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify); void grpc_connected_subchannel_notify_on_state_change( @@ -124,15 +125,6 @@ void grpc_connected_subchannel_state_change_unsubscribe( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_closure *subscribed_notify); -/** express interest in \a channel's activities through \a pollset. */ -void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset *pollset); -/** stop following \a channel's activity through \a pollset. */ -void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset *pollset); - /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 0fdcba01a4..e93a3dbb56 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -49,13 +49,19 @@ #include "src/core/iomgr/pollset_set_windows.h" #endif -void grpc_pollset_set_init(grpc_pollset_set* pollset_set); -void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set); -void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, - grpc_pollset* pollset); -void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, - grpc_pollset* pollset); +void grpc_pollset_set_init(grpc_pollset_set *pollset_set); +void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); +void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset); +void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset); +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item); +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index c86ed3d5da..f29ef7cdcf 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -55,6 +55,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); } gpr_free(pollset_set->pollsets); + gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); } @@ -99,6 +100,43 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&pollset_set->mu); } +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i, j; + gpr_mu_lock(&bag->mu); + if (bag->pollset_set_count == bag->pollset_set_capacity) { + bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); + bag->pollset_sets = gpr_realloc(bag->pollset_sets, bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + } + bag->pollset_sets[bag->pollset_set_count++] = item; + for (i = 0, j = 0; i < bag->fd_count; i++) { + if (grpc_fd_is_orphaned(bag->fds[i])) { + GRPC_FD_UNREF(bag->fds[i], "pollset"); + } else { + grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]); + bag->fds[j++] = bag->fds[i]; + } + } + bag->fd_count = j; + gpr_mu_unlock(&bag->mu); +} + +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i; + gpr_mu_lock(&bag->mu); + for (i = 0; i < bag->pollset_set_count; i++) { + if (bag->pollset_sets[i] == item) { + bag->pollset_set_count--; + GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], bag->pollset_sets[bag->pollset_set_count]); + break; + } + } + gpr_mu_unlock(&bag->mu); +} + void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd) { size_t i; @@ -113,6 +151,9 @@ void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, for (i = 0; i < pollset_set->pollset_count; i++) { grpc_pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + grpc_pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } gpr_mu_unlock(&pollset_set->mu); } @@ -129,6 +170,9 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, break; } } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + grpc_pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } gpr_mu_unlock(&pollset_set->mu); } diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 05234fb642..4820a61e4b 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -44,6 +44,10 @@ typedef struct grpc_pollset_set { size_t pollset_capacity; grpc_pollset **pollsets; + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + size_t fd_count; size_t fd_capacity; grpc_fd **fds; diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 53d5d3dcd4..04d88839cb 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -49,4 +49,12 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, grpc_pollset* pollset) {} +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) {} + +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) {} + #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f94f0ae76e..08f34ff0aa 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,7 +50,7 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ +#define GRPC_STREAM_REFCOUNT_DEBUG typedef struct grpc_stream_refcount { gpr_refcount refs; diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index a1b64573b8..25540256c4 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -247,9 +247,11 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, int success) { static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_pollset pollset; + grpc_pollset_set interested_parties; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_init(&pollset); - grpc_subchannel_add_interested_party(&exec_ctx, c, &pollset); + grpc_pollset_set_add_pollset(&exec_ctx, &interested_parties, &pollset); + grpc_subchannel_add_interested_parties(&exec_ctx, c, &interested_parties); grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state, grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); @@ -266,7 +268,7 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_pollset_shutdown(&exec_ctx, &pollset, grpc_closure_create(destroy_pollset, &pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); - grpc_subchannel_del_interested_party(&exec_ctx, c, &pollset); + grpc_subchannel_del_interested_parties(&exec_ctx, c, &interested_parties); grpc_exec_ctx_finish(&exec_ctx); return grpc_subchannel_get_connected_subchannel(c); } -- cgit v1.2.3 From 50ec2670a45799b95f2910f26a5a9f79ab2e8404 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 27 Nov 2015 21:45:11 -0800 Subject: Most of the way to auto-cleanup subchannels --- src/core/channel/channel_stack.c | 3 +- src/core/channel/channel_stack.h | 1 + src/core/channel/client_channel.c | 9 ----- src/core/client_config/lb_policies/pick_first.c | 44 +-------------------- src/core/client_config/lb_policies/round_robin.c | 25 +----------- src/core/client_config/lb_policy.c | 6 +-- src/core/client_config/lb_policy.h | 7 ---- src/core/client_config/subchannel.c | 50 +++++------------------- src/core/client_config/subchannel.h | 3 -- src/core/iomgr/fd_posix.c | 6 ++- src/core/iomgr/pollset_set_posix.c | 13 +++--- src/core/surface/channel.c | 2 +- src/core/transport/transport.h | 2 - test/core/channel/channel_stack_test.c | 2 +- test/core/end2end/fixtures/h2_uchannel.c | 12 +++--- 15 files changed, 36 insertions(+), 149 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 559ad0a32c..d2f6a90ca8 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -106,6 +106,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *channel_args, + const char *name, grpc_channel_stack *stack) { size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + @@ -117,7 +118,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, stack->count = filter_count; GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg, - "CHANNEL_STACK"); + name); elems = CHANNEL_ELEMS_FROM_STACK(stack); user_data = ((char *)elems) + diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 766f543404..bb7081b2a2 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -183,6 +183,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args, + const char *name, grpc_channel_stack *stack); /* Destroy a channel stack */ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 5fec87c67c..5ad2e075c3 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -260,10 +260,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } lb_policy = chand->lb_policy; - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "broadcast"); - } - if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); @@ -282,11 +278,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, destroy_resolver); GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel"); } - - if (lb_policy) { - grpc_lb_policy_broadcast(exec_ctx, lb_policy, op); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast"); - } } typedef struct { diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index c0f1d3fd94..d83f3718c2 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -185,7 +185,6 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; size_t i; - grpc_transport_op op; size_t num_subchannels = p->num_subchannels; grpc_subchannel **subchannels; grpc_connected_subchannel *exclude_subchannel; @@ -199,12 +198,6 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { - if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != - exclude_subchannel) { - memset(&op, 0, sizeof(op)); - op.disconnect = 1; - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); - } GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); } @@ -323,41 +316,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&p->mu); } -static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_transport_op *op) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - size_t i; - size_t n; - grpc_subchannel **subchannels; - grpc_connected_subchannel *selected; - - gpr_mu_lock(&p->mu); - n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - selected = p->selected; - if (selected) { - GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected"); - } - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast"); - } - gpr_mu_unlock(&p->mu); - - for (i = 0; i < n; i++) { - if (selected != grpc_subchannel_get_connected_subchannel(subchannels[i])) { - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); - } - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); - } - if (p->selected) { - grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, - "pf_broadcast_to_selected"); - } - gpr_free(subchannels); -} - static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; @@ -380,7 +338,7 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle, - pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; + pf_check_connectivity, pf_notify_on_state_change}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 10688b3fa5..16afd8c10e 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -451,29 +451,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } } -static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_transport_op *op) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - size_t i; - size_t n; - grpc_subchannel **subchannels; - - gpr_mu_lock(&p->mu); - n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); - } - gpr_mu_unlock(&p->mu); - - for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast"); - } - gpr_free(subchannels); -} - static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; @@ -497,7 +474,7 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle, - rr_broadcast, rr_check_connectivity, rr_notify_on_state_change}; + rr_check_connectivity, rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 8d9287c36a..2b874daef6 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -61,6 +61,7 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy, void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { #endif if (gpr_unref(&policy->refs)) { + grpc_pollset_set_destroy(&policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } @@ -83,11 +84,6 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, policy->vtable->cancel_pick(exec_ctx, policy, target); } -void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_transport_op *op) { - policy->vtable->broadcast(exec_ctx, policy, op); -} - void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { policy->vtable->exit_idle(exec_ctx, policy); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 985c96630f..96b2bdf4ca 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -66,10 +66,6 @@ struct grpc_lb_policy_vtable { /** try to enter a READY connectivity state */ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - /** broadcast a transport op to all subchannels */ - void (*broadcast)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_transport_op *op); - /** check the current connectivity of the lb_policy */ grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); @@ -118,9 +114,6 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target); -void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_transport_op *op); - void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 1c66a73146..434a37cf6b 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -238,7 +238,7 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); - if ((old_refs & STRONG_REF_MASK) == 0) { + if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { disconnect(exec_ctx, c); } GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref"); @@ -351,7 +351,7 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, do_connect = 1; c->connecting = 1; /* released by connection */ - GRPC_SUBCHANNEL_REF(c, "connecting"); + GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); } gpr_mu_unlock(&c->mu); @@ -369,40 +369,6 @@ void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&c->mu); } -void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_transport_op *op) { - grpc_connected_subchannel *con; - int cancel_alarm = 0; - gpr_mu_lock(&c->mu); - con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); - if (con != NULL) { - GRPC_CONNECTED_SUBCHANNEL_REF(con, "transport-op"); - } - if (op->disconnect) { - c->disconnected = 1; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); - if (c->have_alarm) { - cancel_alarm = 1; - } - } - gpr_mu_unlock(&c->mu); - - if (con != NULL) { - grpc_connected_subchannel_process_transport_op(exec_ctx, con, op); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "transport-op"); - } - - if (cancel_alarm) { - grpc_timer_cancel(exec_ctx, &c->alarm); - } - - if (op->disconnect) { - grpc_connector_shutdown(exec_ctx, c->connector); - } -} - void grpc_connected_subchannel_process_transport_op( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) { @@ -488,7 +454,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { con = gpr_malloc(channel_stack_size); stk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters, - num_filters, c->args, stk); + num_filters, c->args, "CONNECTED_SUBCHANNEL", stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free((void *)c->connecting_result.filters); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); @@ -507,7 +473,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_free(sw_subchannel); gpr_free((void *)filters); grpc_channel_stack_destroy(exec_ctx, stk); - GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + gpr_free(con); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); return; } @@ -519,7 +486,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( exec_ctx, con, &sw_subchannel->connectivity_state, &sw_subchannel->closure); @@ -588,17 +555,18 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } } static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { grpc_subchannel *c = arg; + if (c->connecting_result.transport != NULL) { publish_transport(exec_ctx, c); } else if (c->disconnected) { - /* do nothing */ + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_mu_lock(&c->mu); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b64a26561b..66c13990e9 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -105,9 +105,6 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call( grpc_pollset *pollset); /** process a transport level op */ -void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - grpc_transport_op *op); void grpc_connected_subchannel_process_transport_op( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel, grpc_transport_op *op); diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 7ff80e6cf8..81c19ca797 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -43,6 +43,7 @@ #include #include +#include #include #define CLOSURE_NOT_READY ((grpc_closure *)0) @@ -158,7 +159,10 @@ void grpc_fd_global_shutdown(void) { grpc_fd *grpc_fd_create(int fd, const char *name) { grpc_fd *r = alloc_fd(fd); - grpc_iomgr_register_object(&r->iomgr_object, name); + char *name2; + gpr_asprintf(&name2, "%s fd=%d", name, fd); + grpc_iomgr_register_object(&r->iomgr_object, name2); + gpr_free(name2); #ifdef GRPC_FD_REF_COUNT_DEBUG gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name); #endif diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index f29ef7cdcf..7f0b34c36b 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -52,7 +52,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { size_t i; gpr_mu_destroy(&pollset_set->mu); for (i = 0; i < pollset_set->fd_count; i++) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } gpr_free(pollset_set->pollsets); gpr_free(pollset_set->pollset_sets); @@ -74,7 +74,7 @@ void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, pollset_set->pollsets[pollset_set->pollset_count++] = pollset; for (i = 0, j = 0; i < pollset_set->fd_count; i++) { if (grpc_fd_is_orphaned(pollset_set->fds[i])) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } else { grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); pollset_set->fds[j++] = pollset_set->fds[i]; @@ -107,12 +107,13 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&bag->mu); if (bag->pollset_set_count == bag->pollset_set_capacity) { bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); - bag->pollset_sets = gpr_realloc(bag->pollset_sets, bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + bag->pollset_sets = gpr_realloc(bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); } bag->pollset_sets[bag->pollset_set_count++] = item; for (i = 0, j = 0; i < bag->fd_count; i++) { if (grpc_fd_is_orphaned(bag->fds[i])) { - GRPC_FD_UNREF(bag->fds[i], "pollset"); + GRPC_FD_UNREF(bag->fds[i], "pollset_set"); } else { grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]); bag->fds[j++] = bag->fds[i]; @@ -130,7 +131,9 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, for (i = 0; i < bag->pollset_set_count; i++) { if (bag->pollset_sets[i] == item) { bag->pollset_set_count--; - GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], bag->pollset_sets[bag->pollset_set_count]); + GPR_SWAP(grpc_pollset_set *, + bag->pollset_sets[i], + bag->pollset_sets[bag->pollset_set_count]); break; } } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a78fd0aae2..92fd3dadd7 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -152,7 +152,7 @@ grpc_channel *grpc_channel_create_from_filters( } grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters, - num_filters, args, + num_filters, args, is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL", CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 08f34ff0aa..4de72d7d75 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,8 +50,6 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -#define GRPC_STREAM_REFCOUNT_DEBUG - typedef struct grpc_stream_refcount { gpr_refcount refs; grpc_closure destroy; diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index cab31bc69d..f1bb37c0bf 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -116,7 +116,7 @@ static void test_create_channel_stack(void) { channel_stack = gpr_malloc(grpc_channel_stack_size(&filters, 1)); grpc_channel_stack_init(&exec_ctx, 1, free_channel, channel_stack, &filters, - 1, &chan_args, channel_stack); + 1, &chan_args, "test", channel_stack); GPR_ASSERT(channel_stack->count == 1); channel_elem = grpc_channel_stack_element(channel_stack, 0); channel_data = (int *)channel_elem->channel_data; diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 25540256c4..0c062d798a 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -233,11 +233,12 @@ static grpc_end2end_test_fixture chttp2_create_fixture_micro_fullstack( } grpc_connectivity_state g_state = GRPC_CHANNEL_IDLE; +grpc_pollset_set g_interested_parties; static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) { if (g_state != GRPC_CHANNEL_READY) { grpc_subchannel_notify_on_state_change( - exec_ctx, arg, &g_state, grpc_closure_create(state_changed, arg)); + exec_ctx, arg, &g_interested_parties, &g_state, grpc_closure_create(state_changed, arg)); } } @@ -247,12 +248,11 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, int success) { static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_pollset pollset; - grpc_pollset_set interested_parties; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_init(&pollset); - grpc_pollset_set_add_pollset(&exec_ctx, &interested_parties, &pollset); - grpc_subchannel_add_interested_parties(&exec_ctx, c, &interested_parties); - grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state, + grpc_pollset_set_init(&g_interested_parties); + grpc_pollset_set_add_pollset(&exec_ctx, &g_interested_parties, &pollset); + grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, &g_state, grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); @@ -267,8 +267,8 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { } grpc_pollset_shutdown(&exec_ctx, &pollset, grpc_closure_create(destroy_pollset, &pollset)); + grpc_pollset_set_destroy(&g_interested_parties); gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); - grpc_subchannel_del_interested_parties(&exec_ctx, c, &interested_parties); grpc_exec_ctx_finish(&exec_ctx); return grpc_subchannel_get_connected_subchannel(c); } -- cgit v1.2.3 From 486130455f16d30b3c1f792bf302f0858b334d3d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sun, 29 Nov 2015 14:45:11 -0800 Subject: Sanitize unsubscription to be callback preserving --- src/core/channel/client_channel.c | 2 - src/core/channel/client_uchannel.c | 5 +- src/core/client_config/lb_policies/pick_first.c | 20 +++-- src/core/client_config/lb_policy.c | 58 ++++++++----- src/core/client_config/lb_policy.h | 18 ++++- src/core/client_config/subchannel.c | 103 +++++++++++++----------- src/core/client_config/subchannel.h | 9 --- src/core/iomgr/fd_posix.h | 1 + src/core/iomgr/iomgr.c | 1 + src/core/transport/chttp2_transport.c | 12 +-- src/core/transport/connectivity_state.c | 69 ++++++++-------- src/core/transport/connectivity_state.h | 11 +-- src/core/transport/transport.h | 2 + 13 files changed, 173 insertions(+), 138 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 5ad2e075c3..9d3690dc7d 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -227,7 +227,6 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, if (old_lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties); - grpc_lb_policy_shutdown(exec_ctx, old_lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -267,7 +266,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); - grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index f8c621b8eb..f0682d5946 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -166,8 +166,9 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - grpc_connected_subchannel_state_change_unsubscribe( - exec_ctx, chand->connected_subchannel, &chand->connectivity_cb); + /* cancel subscription */ + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, NULL, &chand->connectivity_cb); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); gpr_mu_destroy(&chand->mu_state); } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index d83f3718c2..a622d98317 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -96,11 +96,17 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); + gpr_log(GPR_DEBUG, "LB_POLICY: pf_shutdown: %p", p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); + if (p->selected != NULL) { + grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, NULL, &p->connectivity_changed); + } else { + grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[p->checking_connectivity], NULL, NULL, &p->connectivity_changed); + } gpr_mu_unlock(&p->mu); while (pp != NULL) { pending_pick *next = pp->next; @@ -139,7 +145,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], &p->base.interested_parties, @@ -195,7 +201,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, p->subchannels = NULL; exclude_subchannel = p->selected; gpr_mu_unlock(&p->mu); - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); @@ -212,9 +218,11 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&p->mu); + gpr_log(GPR_DEBUG, "LB_POLICY: pf_connectivity_changed: %p success=%d shutdown=%d", p, iomgr_success, p->shutdown); + if (p->shutdown) { gpr_mu_unlock(&p->mu); - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); return; } else if (p->selected != NULL) { if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { @@ -228,7 +236,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); } else { - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } } else { loop: @@ -242,7 +250,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(p->selected); GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first"); /* drop the pick list: we are connected now */ - GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1); /* update any calls that were waiting for a pick */ @@ -300,7 +308,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } else { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 2b874daef6..4208c15b3f 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -33,41 +33,59 @@ #include "src/core/client_config/lb_policy.h" +#define WEAK_REF_BITS 16 + void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; - gpr_ref_init(&policy->refs, 1); + gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); grpc_pollset_set_init(&policy->interested_parties); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, - const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", - policy, (int)policy->refs.count, (int)policy->refs.count + 1, reason); +#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason +#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char *purpose +#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason +#define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose #else -void grpc_lb_policy_ref(grpc_lb_policy *policy) { +#define REF_FUNC_EXTRA_ARGS +#define REF_MUTATE_EXTRA_ARGS +#define REF_FUNC_PASS_ARGS(new_reason) +#define REF_MUTATE_PASS_ARGS(x) #endif - gpr_ref(&policy->refs); -} +static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_unref(grpc_lb_policy *policy, - grpc_closure_list *closure_list, const char *file, - int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", - policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason); -#else -void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); #endif - if (gpr_unref(&policy->refs)) { - grpc_pollset_set_destroy(&policy->interested_parties); - policy->vtable->destroy(exec_ctx, policy); + return old_val; +} + +void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); +} + +void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = ref_mutate(policy, (gpr_atm)1-(gpr_atm)(1 << WEAK_REF_BITS), 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); + gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); + gpr_atm check = 1 << WEAK_REF_BITS; + if ((old_val & mask) == check) { + policy->vtable->shutdown(exec_ctx, policy); } + grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref")); +} + +void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF")); } -void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { - policy->vtable->shutdown(exec_ctx, policy); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); + if (old_val == 1) { + grpc_pollset_set_destroy(&policy->interested_parties); + policy->vtable->destroy(exec_ctx, policy); + } } int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 96b2bdf4ca..988789c934 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -47,7 +47,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; - gpr_refcount refs; + gpr_atm ref_pair; grpc_pollset_set interested_parties; }; @@ -78,29 +78,39 @@ struct grpc_lb_policy_vtable { grpc_closure *closure); }; +#define GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \ grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) +#define GRPC_LB_POLICY_WEAK_REF(p, r) \ + grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \ + grpc_lb_policy_weak_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason); void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const char *file, int line, const char *reason); +void grpc_lb_policy_weak_ref(grpc_lb_policy *policy, const char *file, int line, + const char *reason); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const char *file, int line, const char *reason); #else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p)) +#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p)) +#define GRPC_LB_POLICY_WEAK_UNREF(cl, p, r) grpc_lb_policy_weak_unref((cl), (p)) void grpc_lb_policy_ref(grpc_lb_policy *policy); void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); +void grpc_lb_policy_weak_ref(grpc_lb_policy *policy); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); #endif /** called by concrete implementations to initialize the base struct */ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); -/** Start shutting down (fail any pending picks) */ -void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting \a target. diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 434a37cf6b..9f802f1cc3 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -62,13 +62,19 @@ typedef struct { grpc_closure closure; - union { - grpc_subchannel *subchannel; - grpc_connected_subchannel *connected_subchannel; - } whom; + grpc_subchannel *subchannel; grpc_connectivity_state connectivity_state; } state_watcher; +typedef struct external_state_watcher { + grpc_subchannel *subchannel; + grpc_pollset_set *pollset_set; + grpc_closure *notify; + grpc_closure closure; + struct external_state_watcher *next; + struct external_state_watcher *prev; +} external_state_watcher; + struct grpc_subchannel { grpc_connector *connector; @@ -114,6 +120,8 @@ struct grpc_subchannel { /** connectivity state tracking */ grpc_connectivity_state_tracker state_tracker; + external_state_watcher root_external_state_watcher; + /** next connect attempt time */ gpr_timespec next_attempt; /** amount to backoff each failure */ @@ -201,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { +static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_STREAM_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); @@ -277,6 +285,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, &c->initial_connect_string); c->args = grpc_channel_args_copy(args->args); c->random = random_seed(); + c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -316,17 +325,16 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } -typedef struct { - grpc_subchannel *subchannel; - grpc_pollset_set *pollset_set; - grpc_closure *notify; - grpc_closure closure; -} external_state_watcher; - static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; - grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + if (w->pollset_set != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + } + gpr_mu_lock(&w->subchannel->mu); + w->next->prev = w->prev; + w->prev->next = w->next; + gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); gpr_free(w); follow_up->cb(exec_ctx, follow_up->cb_arg, success); @@ -338,37 +346,47 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_connectivity_state *state, grpc_closure *notify) { int do_connect = 0; - external_state_watcher *w = gpr_malloc(sizeof(*w)); - w->subchannel = c; - w->pollset_set = interested_parties; - w->notify = notify; - grpc_closure_init(&w->closure, on_external_state_watcher_done, w); - grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); - GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); - gpr_mu_lock(&c->mu); - if (grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, state, &w->closure)) { - do_connect = 1; - c->connecting = 1; - /* released by connection */ - GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + external_state_watcher *w; + + if (state == NULL) { + gpr_mu_lock(&c->mu); + for (w = c->root_external_state_watcher.next; + w != &c->root_external_state_watcher; + w = w->next) { + if (w->notify == notify) { + grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, NULL, &w->closure); + } + } + gpr_mu_unlock(&c->mu); + } else { + w = gpr_malloc(sizeof(*w)); + w->subchannel = c; + w->pollset_set = interested_parties; + w->notify = notify; + grpc_closure_init(&w->closure, on_external_state_watcher_done, w); + if (interested_parties != NULL) { + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); + } + GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); + gpr_mu_lock(&c->mu); + w->next = &c->root_external_state_watcher; + w->prev = w->next->prev; + w->next->prev = w->prev->next = w; + if (grpc_connectivity_state_notify_on_state_change( + exec_ctx, &c->state_tracker, state, &w->closure)) { + do_connect = 1; + c->connecting = 1; + /* released by connection */ + GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + } + gpr_mu_unlock(&c->mu); } - gpr_mu_unlock(&c->mu); if (do_connect) { start_connect(exec_ctx, c); } } -void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_closure *subscribed_notify) { - gpr_mu_lock(&c->mu); - grpc_connectivity_state_change_unsubscribe(exec_ctx, &c->state_tracker, - subscribed_notify); - gpr_mu_unlock(&c->mu); -} - void grpc_connected_subchannel_process_transport_op( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) { @@ -380,7 +398,7 @@ void grpc_connected_subchannel_process_transport_op( static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, int iomgr_success) { state_watcher *sw = p; - grpc_subchannel *c = sw->whom.subchannel; + grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; gpr_mu_lock(mu); @@ -423,16 +441,9 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { - GPR_ASSERT(state != NULL); connected_subchannel_state_op(exec_ctx, con, state, closure); } -void grpc_connected_subchannel_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_closure *closure) { - connected_subchannel_state_op(exec_ctx, con, NULL, closure); -} - static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { size_t channel_stack_size; grpc_connected_subchannel *con; @@ -461,7 +472,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { /* initialize state watcher */ sw_subchannel = gpr_malloc(sizeof(*sw_subchannel)); - sw_subchannel->whom.subchannel = c; + sw_subchannel->subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 66c13990e9..b735d2ccab 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -124,15 +124,6 @@ void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_connectivity_state *state, grpc_closure *notify); -/** Remove \a subscribed_notify from the list of closures to be called on a - * state change if present. */ -void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_closure *subscribed_notify); -void grpc_connected_subchannel_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, - grpc_closure *subscribed_notify); - /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index dc917ebbc0..8717aa5103 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -168,6 +168,7 @@ void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); /* Reference counting for fds */ +#define GRPC_FD_REF_COUNT_DEBUG #ifdef GRPC_FD_REF_COUNT_DEBUG void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 212ce5534d..5bba40bc9b 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -116,6 +116,7 @@ void grpc_iomgr_shutdown(void) { "memory leaks are likely", count_objects()); dump_objects("LEAKED"); + abort(); } break; } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index ff0c616386..91c50dd2cb 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -911,15 +911,9 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); if (op->on_connectivity_state_change != NULL) { - if (op->connectivity_state != NULL) { - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change); - } else { - grpc_connectivity_state_change_unsubscribe( - exec_ctx, &t->channel_callback.state_tracker, - op->on_connectivity_state_change); - } + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change); } if (op->send_goaway) { diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index a435ba49f2..569d3c30b2 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -98,42 +98,47 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", - tracker, tracker->name, grpc_connectivity_state_name(*current), - grpc_connectivity_state_name(tracker->current_state), notify); + if (current == NULL) { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", + tracker, tracker->name, notify); + } else { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", + tracker, tracker->name, grpc_connectivity_state_name(*current), + grpc_connectivity_state_name(tracker->current_state), notify); + } } - if (tracker->current_state != *current) { - *current = tracker->current_state; - grpc_exec_ctx_enqueue(exec_ctx, notify, 1); + if (current == NULL) { + grpc_connectivity_state_watcher *w = tracker->watchers; + if (w != NULL && w->notify == notify) { + grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + tracker->watchers = w->next; + gpr_free(w); + return 0; + } + while (w != NULL) { + grpc_connectivity_state_watcher *rm_candidate = w->next; + if (rm_candidate != NULL && rm_candidate->notify == notify) { + grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + w->next = w->next->next; + gpr_free(rm_candidate); + return 0; + } + w = w->next; + } + return 0; } else { - grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); - w->current = current; - w->notify = notify; - w->next = tracker->watchers; - tracker->watchers = w; - } - return tracker->current_state == GRPC_CHANNEL_IDLE; -} - -int grpc_connectivity_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, - grpc_closure *subscribed_notify) { - grpc_connectivity_state_watcher *w = tracker->watchers; - if (w != NULL && w->notify == subscribed_notify) { - tracker->watchers = w->next; - gpr_free(w); - return 1; - } - while (w != NULL) { - grpc_connectivity_state_watcher *rm_candidate = w->next; - if (rm_candidate != NULL && rm_candidate->notify == subscribed_notify) { - w->next = w->next->next; - gpr_free(rm_candidate); - return 1; + if (tracker->current_state != *current) { + *current = tracker->current_state; + grpc_exec_ctx_enqueue(exec_ctx, notify, 1); + } else { + grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); + w->current = current; + w->notify = notify; + w->next = tracker->watchers; + tracker->watchers = w; } - w = w->next; + return tracker->current_state == GRPC_CHANNEL_IDLE; } - return 0; } void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index 119b1c1554..d8b5b38da0 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -73,16 +73,11 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); -/** Return 1 if the channel should start connecting, 0 otherwise */ +/** Return 1 if the channel should start connecting, 0 otherwise. + If current==NULL cancel notify if it is already queued (success==0 in that + case) */ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify); -/** Remove \a subscribed_notify from the list of closures to be called on a - * state change if present, returning 1. Otherwise, nothing is done and return - * 0. */ -int grpc_connectivity_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, - grpc_closure *subscribed_notify); - #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 4de72d7d75..08f34ff0aa 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,6 +50,8 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; +#define GRPC_STREAM_REFCOUNT_DEBUG + typedef struct grpc_stream_refcount { gpr_refcount refs; grpc_closure destroy; -- cgit v1.2.3 From 1d881fbed6601f34ad0919cbb10d7367fe431148 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 1 Dec 2015 07:39:04 -0800 Subject: clang-format --- src/core/channel/channel_stack.c | 3 +- src/core/channel/channel_stack.h | 3 +- src/core/channel/client_channel.c | 30 +++++++---- src/core/client_config/lb_policies/pick_first.c | 34 +++++++----- src/core/client_config/lb_policies/round_robin.c | 61 +++++++++------------- src/core/client_config/lb_policy.c | 26 ++++++--- src/core/client_config/lb_policy.h | 4 +- src/core/client_config/subchannel.c | 60 ++++++++++++--------- src/core/client_config/subchannel.h | 18 +++---- src/core/iomgr/pollset_set.h | 8 +-- src/core/iomgr/pollset_set_posix.c | 10 ++-- src/core/iomgr/pollset_set_windows.c | 12 ++--- src/core/surface/channel.c | 3 +- src/core/transport/connectivity_state.c | 8 +-- src/core/transport/connectivity_state.h | 2 +- test/core/client_config/lb_policies_test.c | 13 +++-- .../set_initial_connect_string_test.c | 8 +-- test/core/end2end/fixtures/h2_uchannel.c | 12 +++-- test/core/end2end/tests/hpack_size.c | 6 +-- 19 files changed, 177 insertions(+), 144 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index d2f6a90ca8..5e09a050ee 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -106,8 +106,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *channel_args, - const char *name, - grpc_channel_stack *stack) { + const char *name, grpc_channel_stack *stack) { size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index bb7081b2a2..c01050e717 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -183,8 +183,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args, - const char *name, - grpc_channel_stack *stack); + const char *name, grpc_channel_stack *stack); /* Destroy a channel stack */ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_channel_stack *stack); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9d3690dc7d..60f2acb9ef 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -120,7 +120,8 @@ static void on_lb_policy_state_changed_locked( /* check if the notification is for a stale policy */ if (w->lb_policy != w->chand->lb_policy) return; - if (publish_state == GRPC_CHANNEL_FATAL_FAILURE && w->chand->resolver != NULL) { + if (publish_state == GRPC_CHANNEL_FATAL_FAILURE && + w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); } @@ -180,7 +181,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; if (lb_policy != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, + &chand->interested_parties); } gpr_mu_lock(&chand->mu_config); @@ -226,7 +228,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_del_pollset_set(exec_ctx, + &old_lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -265,7 +269,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_del_pollset_set(exec_ctx, + &chand->lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } @@ -401,7 +407,9 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } if (chand->lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_del_pollset_set(exec_ctx, + &chand->lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); @@ -472,11 +480,14 @@ typedef struct { grpc_closure my_closure; } external_connectivity_watcher; -static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { +static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, w->pollset); - GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); + grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, + w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + "external_connectivity_watcher"); gpr_free(w); follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success); } @@ -491,7 +502,8 @@ void grpc_client_channel_watch_connectivity_state( w->on_complete = on_complete; grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); grpc_closure_init(&w->my_closure, on_external_watch_complete, w); - GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); + GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, + "external_connectivity_watcher"); gpr_mu_lock(&chand->mu_config); grpc_connectivity_state_notify_on_state_change( exec_ctx, &chand->state_tracker, state, &w->my_closure); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index acaf356df4..3bfa7a86fb 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -102,15 +102,19 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); if (p->selected != NULL) { - grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, NULL, &p->connectivity_changed); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, p->selected, NULL, &p->connectivity_changed); } else { - grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, + &p->connectivity_changed); } gpr_mu_unlock(&p->mu); while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); pp = next; @@ -127,7 +131,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -147,8 +152,8 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + &p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); } void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { @@ -174,7 +179,8 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, + pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -254,7 +260,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } @@ -273,8 +280,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + &p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); } else { goto loop; } @@ -286,8 +293,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + &p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: p->num_subchannels--; @@ -305,7 +312,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, + "pick_first_connectivity"); } else { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index f0b8ebe6fe..b86dba20ee 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -244,9 +244,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, - NULL, - NULL, + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); } gpr_mu_unlock(&p->mu); @@ -262,7 +260,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -279,15 +278,15 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { size_t i; p->started_picking = 1; - gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, p->num_subchannels); + gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, + p->num_subchannels); for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; sd->connectivity_state = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); } } @@ -323,7 +322,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, + pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -355,8 +355,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_READY, "connecting_ready"); /* add the newly connected subchannel to the list of connected ones. * Note that it goes to the "end of the line". */ - sd->ready_list_node = - add_connected_sc_locked(p, sd->subchannel); + sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel); /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ @@ -375,34 +374,29 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, - sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - sd->connectivity_state, "connecting_changed"); + sd->connectivity_state, + "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); /* remove from ready list if still present */ if (sd->ready_list_node != NULL) { @@ -415,16 +409,14 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, break; case GRPC_CHANNEL_FATAL_FAILURE: if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked( - p, sd->ready_list_node); + remove_disconnected_sc_locked(p, sd->ready_list_node); sd->ready_list_node = NULL; } p->num_subchannels--; GPR_SWAP(subchannel_data *, p->subchannels[sd->index], p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, - "round_robin"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); p->subchannels[sd->index]->index = sd->index; gpr_free(sd); @@ -491,8 +483,7 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, memset(p, 0, sizeof(*p)); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); p->num_subchannels = args->num_subchannels; - p->subchannels = - gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels); + p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels); memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); @@ -505,8 +496,8 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, sd->policy = p; sd->index = i; sd->subchannel = args->subchannels[i]; - grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, - sd); + grpc_closure_init(&sd->connectivity_changed_closure, + rr_connectivity_changed, sd); } /* The (dummy node) root of the ready list */ diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 4208c15b3f..d254161546 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -54,10 +54,14 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, #define REF_MUTATE_PASS_ARGS(x) #endif -static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { - gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); +static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, + int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) + : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, + old_val + delta, reason); #endif return old_val; } @@ -66,22 +70,28 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); } -void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { - gpr_atm old_val = ref_mutate(policy, (gpr_atm)1-(gpr_atm)(1 << WEAK_REF_BITS), 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); +void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = + ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS), + 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); gpr_atm check = 1 << WEAK_REF_BITS; if ((old_val & mask) == check) { policy->vtable->shutdown(exec_ctx, policy); } - grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref")); + grpc_lb_policy_weak_unref(exec_ctx, + policy REF_FUNC_PASS_ARGS("strong-unref")); } void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF")); } -void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { - gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = + ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { grpc_pollset_set_destroy(&policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 6754661745..2f8d655558 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -93,9 +93,9 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const char *file, int line, const char *reason); void grpc_lb_policy_weak_ref(grpc_lb_policy *policy, const char *file, int line, - const char *reason); + const char *reason); void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - const char *file, int line, const char *reason); + const char *file, int line, const char *reason); #else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p)) diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a0e51d57ec..d89a5e9be1 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -78,9 +78,9 @@ typedef struct external_state_watcher { struct grpc_subchannel { grpc_connector *connector; - /** refcount - - lower INTERNAL_REF_BITS bits are for internal references: - these do not keep the subchannel open. + /** refcount + - lower INTERNAL_REF_BITS bits are for internal references: + these do not keep the subchannel open. - upper remaining bits are for public references: these do keep the subchannel open */ gpr_atm ref_pair; @@ -155,7 +155,8 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define UNREF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \ (name), (p), (p)->refs.count, (p)->refs.count - 1, reason) -#define REF_MUTATE_EXTRA_ARGS GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose +#define REF_MUTATE_EXTRA_ARGS \ + GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose #define REF_MUTATE_PURPOSE(x) , file, line, reason, x #else #define REF_REASON "" @@ -209,21 +210,27 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { - gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); +static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, + int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) + : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_STREAM_REFCOUNT_DEBUG - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, + old_val + delta, reason); #endif return old_val; } void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; - old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); + old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), + 0 REF_MUTATE_PURPOSE("STRONG_REF")); GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); } -void grpc_subchannel_weak_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_weak_ref(grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); @@ -246,7 +253,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; - old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); + old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), + 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { disconnect(exec_ctx, c); } @@ -254,7 +262,8 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, } void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 0) { @@ -286,7 +295,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, &c->initial_connect_string); c->args = grpc_channel_args_copy(args->args); c->random = random_seed(); - c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; + c->root_external_state_watcher.next = c->root_external_state_watcher.prev = + &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -326,11 +336,13 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } -static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, + int success) { external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, + w->pollset_set); } gpr_mu_lock(&w->subchannel->mu); w->next->prev = w->prev; @@ -341,21 +353,20 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, i follow_up->cb(exec_ctx, follow_up->cb_arg, success); } -void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset_set *interested_parties, - grpc_connectivity_state *state, - grpc_closure *notify) { +void grpc_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, + grpc_closure *notify) { int do_connect = 0; external_state_watcher *w; if (state == NULL) { gpr_mu_lock(&c->mu); - for (w = c->root_external_state_watcher.next; - w != &c->root_external_state_watcher; - w = w->next) { + for (w = c->root_external_state_watcher.next; + w != &c->root_external_state_watcher; w = w->next) { if (w->notify == notify) { - grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, NULL, &w->closure); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &c->state_tracker, NULL, &w->closure); } } gpr_mu_unlock(&c->mu); @@ -366,7 +377,8 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, w->notify = notify; grpc_closure_init(&w->closure, on_external_state_watcher_done, w); if (interested_parties != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, + interested_parties); } GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); gpr_mu_lock(&c->mu); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b735d2ccab..e45708f91d 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -68,7 +68,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) -#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) grpc_subchannel_weak_unref((cl), (p)) +#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \ + grpc_subchannel_weak_unref((cl), (p)) #define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) #define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \ grpc_connected_subchannel_unref((cl), (p)) @@ -84,10 +85,10 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_ref(grpc_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, @@ -115,11 +116,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the channel */ -void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset_set *interested_parties, - grpc_connectivity_state *state, - grpc_closure *notify); +void grpc_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, + grpc_closure *notify); void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_connectivity_state *state, grpc_closure *notify); diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index e93a3dbb56..09c04438f7 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -58,10 +58,10 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_pollset *pollset); void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item); + grpc_pollset_set *bag, + grpc_pollset_set *item); void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item); + grpc_pollset_set *bag, + grpc_pollset_set *item); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 7f0b34c36b..4ec92202e3 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -107,8 +107,9 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&bag->mu); if (bag->pollset_set_count == bag->pollset_set_capacity) { bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); - bag->pollset_sets = gpr_realloc(bag->pollset_sets, - bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + bag->pollset_sets = + gpr_realloc(bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); } bag->pollset_sets[bag->pollset_set_count++] = item; for (i = 0, j = 0; i < bag->fd_count; i++) { @@ -131,9 +132,8 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, for (i = 0; i < bag->pollset_set_count; i++) { if (bag->pollset_sets[i] == item) { bag->pollset_set_count--; - GPR_SWAP(grpc_pollset_set *, - bag->pollset_sets[i], - bag->pollset_sets[bag->pollset_set_count]); + GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], + bag->pollset_sets[bag->pollset_set_count]); break; } } diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 04d88839cb..157b46ec32 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -49,12 +49,12 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, grpc_pollset* pollset) {} -void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} -void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 92fd3dadd7..b4959069c6 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -152,7 +152,8 @@ grpc_channel *grpc_channel_create_from_filters( } grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters, - num_filters, args, is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL", + num_filters, args, + is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL", CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 569d3c30b2..c409d983da 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -99,11 +99,11 @@ int grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { if (current == NULL) { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", - tracker, tracker->name, notify); + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, + tracker->name, notify); } else { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", - tracker, tracker->name, grpc_connectivity_state_name(*current), + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, + tracker->name, grpc_connectivity_state_name(*current), grpc_connectivity_state_name(tracker->current_state), notify); } } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index d8b5b38da0..1f3a130e90 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -74,7 +74,7 @@ grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); /** Return 1 if the channel should start connecting, 0 otherwise. - If current==NULL cancel notify if it is already queued (success==0 in that + If current==NULL cancel notify if it is already queued (success==0 in that case) */ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, diff --git a/test/core/client_config/lb_policies_test.c b/test/core/client_config/lb_policies_test.c index 5aa8140e08..6f218e7f08 100644 --- a/test/core/client_config/lb_policies_test.c +++ b/test/core/client_config/lb_policies_test.c @@ -135,9 +135,8 @@ static void kill_server(const servers_fixture *f, size_t i) { gpr_log(GPR_INFO, "KILLING SERVER %d", i); GPR_ASSERT(f->servers[i] != NULL); grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); - GPR_ASSERT( - grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), NULL) - .type == GRPC_OP_COMPLETE); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), + NULL).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->servers[i]); f->servers[i] = NULL; } @@ -203,8 +202,8 @@ static void teardown_servers(servers_fixture *f) { if (f->servers[i] == NULL) continue; grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), - n_millis_time(5000), NULL) - .type == GRPC_OP_COMPLETE); + n_millis_time(5000), + NULL).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->servers[i]); } grpc_completion_queue_shutdown(f->cq); @@ -304,8 +303,8 @@ int *perform_request(servers_fixture *f, grpc_channel *client, s_idx = -1; while ((ev = grpc_completion_queue_next( - f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL)) - .type != GRPC_QUEUE_TIMEOUT) { + f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL)).type != + GRPC_QUEUE_TIMEOUT) { GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); read_tag = ((int)(gpr_intptr)ev.tag); gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d", diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c index 13517b7c1f..ceca56c833 100644 --- a/test/core/client_config/set_initial_connect_string_test.c +++ b/test/core/client_config/set_initial_connect_string_test.c @@ -66,15 +66,15 @@ static grpc_closure on_read; static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { GPR_ASSERT(success); - gpr_slice_buffer_move_into( - &state.temp_incoming_buffer, &state.incoming_buffer); + gpr_slice_buffer_move_into(&state.temp_incoming_buffer, + &state.incoming_buffer); if (state.incoming_buffer.length > strlen(magic_connect_string)) { state.done = 1; grpc_endpoint_shutdown(exec_ctx, state.tcp); grpc_endpoint_destroy(exec_ctx, state.tcp); } else { - grpc_endpoint_read( - exec_ctx, state.tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, + &on_read); } } diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 6123bf061e..dd9a625ac2 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -239,7 +239,8 @@ grpc_pollset_set g_interested_parties; static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) { if (g_state != GRPC_CHANNEL_READY) { grpc_subchannel_notify_on_state_change( - exec_ctx, arg, &g_interested_parties, &g_state, grpc_closure_create(state_changed, arg)); + exec_ctx, arg, &g_interested_parties, &g_state, + grpc_closure_create(state_changed, arg)); } } @@ -253,7 +254,8 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_pollset_init(&pollset); grpc_pollset_set_init(&g_interested_parties); grpc_pollset_set_add_pollset(&exec_ctx, &g_interested_parties, &pollset); - grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, &g_state, + grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, + &g_state, grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); @@ -330,9 +332,9 @@ static void chttp2_tear_down_micro_fullstack(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/micro_fullstack", 0, - chttp2_create_fixture_micro_fullstack, chttp2_init_client_micro_fullstack, - chttp2_init_server_micro_fullstack, chttp2_tear_down_micro_fullstack}, + {"chttp2/micro_fullstack", 0, chttp2_create_fixture_micro_fullstack, + chttp2_init_client_micro_fullstack, chttp2_init_server_micro_fullstack, + chttp2_tear_down_micro_fullstack}, }; int main(int argc, char **argv) { diff --git a/test/core/end2end/tests/hpack_size.c b/test/core/end2end/tests/hpack_size.c index 0b1a9faad8..f16883ecfd 100644 --- a/test/core/end2end/tests/hpack_size.c +++ b/test/core/end2end/tests/hpack_size.c @@ -262,9 +262,9 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) - .type == GRPC_OP_COMPLETE); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), + NULL).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; } -- cgit v1.2.3 From f036a64303e04324d791262188c69c1b4de9a5dd Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 1 Dec 2015 17:00:40 -0800 Subject: Memory fixes --- src/core/channel/client_channel.c | 2 -- src/core/channel/client_uchannel.c | 17 +++++------------ src/core/client_config/lb_policies/pick_first.c | 9 ++++----- src/core/client_config/subchannel.c | 12 +++++++----- src/core/client_config/subchannel.h | 2 +- test/core/end2end/fixtures/h2_uchannel.c | 4 ++++ test/core/end2end/tests/channel_connectivity.c | 3 ++- 7 files changed, 23 insertions(+), 26 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 60f2acb9ef..da0fdba643 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -244,7 +244,6 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op) { - grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; @@ -262,7 +261,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, op->connectivity_state = NULL; } - lb_policy = chand->lb_policy; if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index f0682d5946..ffd7a1993e 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -85,7 +85,7 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, chand->subchannel_connectivity, "uchannel_monitor_subchannel"); grpc_connected_subchannel_notify_on_state_change( - exec_ctx, chand->connected_subchannel, &chand->subchannel_connectivity, + exec_ctx, chand->connected_subchannel, NULL, &chand->subchannel_connectivity, &chand->connectivity_cb); } @@ -168,9 +168,10 @@ static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; /* cancel subscription */ grpc_connected_subchannel_notify_on_state_change( - exec_ctx, chand->connected_subchannel, NULL, &chand->connectivity_cb); + exec_ctx, chand->connected_subchannel, NULL, NULL, &chand->connectivity_cb); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); gpr_mu_destroy(&chand->mu_state); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, chand->connected_subchannel, "uchannel"); } static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, @@ -190,17 +191,8 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; - out = grpc_connectivity_state_check(&chand->state_tracker); gpr_mu_lock(&chand->mu_state); - if (out == GRPC_CHANNEL_IDLE && try_to_connect) { - grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, - GRPC_CHANNEL_CONNECTING, - "uchannel_connecting_changed"); - chand->subchannel_connectivity = out; - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, chand->connected_subchannel, &chand->subchannel_connectivity, - &chand->connectivity_cb); - } + out = grpc_connectivity_state_check(&chand->state_tracker); gpr_mu_unlock(&chand->mu_state); return out; } @@ -244,5 +236,6 @@ void grpc_client_uchannel_set_connected_subchannel( GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); gpr_mu_lock(&chand->mu_state); chand->connected_subchannel = connected_subchannel; + GRPC_CONNECTED_SUBCHANNEL_REF(connected_subchannel, "uchannel"); gpr_mu_unlock(&chand->mu_state); } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 3bfa7a86fb..353a54192c 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -101,9 +101,10 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { p->pending_picks = NULL; grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); + /* cancel subscription */ if (p->selected != NULL) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, NULL, &p->connectivity_changed); + exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); } else { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, @@ -198,13 +199,11 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, size_t i; size_t num_subchannels = p->num_subchannels; grpc_subchannel **subchannels; - grpc_connected_subchannel *exclude_subchannel; gpr_mu_lock(&p->mu); subchannels = p->subchannels; p->num_subchannels = 0; p->subchannels = NULL; - exclude_subchannel = p->selected; gpr_mu_unlock(&p->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); @@ -236,7 +235,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, &p->checking_connectivity, + exec_ctx, p->selected, &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); @@ -266,7 +265,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( - exec_ctx, p->selected, &p->checking_connectivity, + exec_ctx, p->selected, &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index d89a5e9be1..5d9db29f5e 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -266,7 +266,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); - if (old_refs == 0) { + if (old_refs == 1) { grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), 1); } @@ -426,7 +426,7 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, sw->connectivity_state, "reflect_child"); if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), + exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL, &sw->connectivity_state, &sw->closure); GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); sw = NULL; @@ -440,6 +440,7 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *closure) { grpc_transport_op op; @@ -447,14 +448,15 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, memset(&op, 0, sizeof(op)); op.connectivity_state = state; op.on_connectivity_state_change = closure; + op.bind_pollset_set = interested_parties; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); elem->filter->start_transport_op(exec_ctx, elem, &op); } void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_connectivity_state *state, grpc_closure *closure) { - connected_subchannel_state_op(exec_ctx, con, state, closure); + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *closure) { + connected_subchannel_state_op(exec_ctx, con, interested_parties, state, closure); } static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { @@ -512,7 +514,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( - exec_ctx, con, &sw_subchannel->connectivity_state, + exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state, &sw_subchannel->closure); /* signal completion */ diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index e45708f91d..1050776815 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -122,7 +122,7 @@ void grpc_subchannel_notify_on_state_change( grpc_closure *notify); void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, - grpc_connectivity_state *state, grpc_closure *notify); + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify); /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index dd9a625ac2..ea630c3275 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -162,6 +162,9 @@ static grpc_subchannel *subchannel_factory_create_subchannel( s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); + if (*f->sniffed_subchannel) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, *f->sniffed_subchannel, "sniffed"); + } *f->sniffed_subchannel = s; GRPC_SUBCHANNEL_REF(s, "sniffed"); return s; @@ -224,6 +227,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_micro_fullstack( micro_fullstack_fixture_data *ffd = gpr_malloc(sizeof(micro_fullstack_fixture_data)); memset(&f, 0, sizeof(f)); + memset(ffd, 0, sizeof(*ffd)); gpr_join_host_port(&ffd->localaddr, "127.0.0.1", port); diff --git a/test/core/end2end/tests/channel_connectivity.c b/test/core/end2end/tests/channel_connectivity.c index 46085bbdaf..e204185251 100644 --- a/test/core/end2end/tests/channel_connectivity.c +++ b/test/core/end2end/tests/channel_connectivity.c @@ -153,7 +153,8 @@ static void test_connectivity(grpc_end2end_test_config config) { cq_verify(cqv); state = grpc_channel_check_connectivity_state(f.client, 0); GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE || - state == GRPC_CHANNEL_CONNECTING); + state == GRPC_CHANNEL_CONNECTING || + state == GRPC_CHANNEL_IDLE); /* cleanup server */ grpc_server_destroy(f.server); -- cgit v1.2.3 From f62c4d5a988f37d812373580137ff69a77305102 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 4 Dec 2015 07:43:07 -0800 Subject: Test repeatability fixes --- src/core/channel/client_channel.c | 2 ++ src/core/security/credentials.c | 14 ++++++-------- test/core/end2end/tests/high_initial_seqno.c | 10 ++++++++-- 3 files changed, 16 insertions(+), 10 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index da0fdba643..9f993b39d6 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -124,6 +124,8 @@ static void on_lb_policy_state_changed_locked( w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); + GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); + w->chand->lb_policy = NULL; } grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state, "lb_changed"); diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 543c75044b..38a6710cd4 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -39,7 +39,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/http_client_filter.h" #include "src/core/httpcli/httpcli.h" -#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/executor.h" #include "src/core/json/json.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" @@ -48,7 +48,6 @@ #include #include #include -#include #include /* -- Common. -- */ @@ -792,15 +791,14 @@ static void md_only_test_destruct(grpc_call_credentials *creds) { grpc_credentials_md_store_unref(c->md_store); } -static void on_simulated_token_fetch_done(void *user_data) { +static void on_simulated_token_fetch_done(grpc_exec_ctx *exec_ctx, + void *user_data, int success) { grpc_credentials_metadata_request *r = (grpc_credentials_metadata_request *)user_data; grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - r->cb(&exec_ctx, r->user_data, c->md_store->entries, c->md_store->num_entries, + r->cb(exec_ctx, r->user_data, c->md_store->entries, c->md_store->num_entries, GRPC_CREDENTIALS_OK); grpc_credentials_metadata_request_destroy(r); - grpc_exec_ctx_finish(&exec_ctx); } static void md_only_test_get_request_metadata( @@ -810,10 +808,10 @@ static void md_only_test_get_request_metadata( grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds; if (c->is_async) { - gpr_thd_id thd_id; grpc_credentials_metadata_request *cb_arg = grpc_credentials_metadata_request_create(creds, cb, user_data); - gpr_thd_new(&thd_id, on_simulated_token_fetch_done, cb_arg, NULL); + grpc_executor_enqueue( + grpc_closure_create(on_simulated_token_fetch_done, cb_arg), 1); } else { cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK); } diff --git a/test/core/end2end/tests/high_initial_seqno.c b/test/core/end2end/tests/high_initial_seqno.c index 578fdf7b35..399b6e2183 100644 --- a/test/core/end2end/tests/high_initial_seqno.c +++ b/test/core/end2end/tests/high_initial_seqno.c @@ -36,13 +36,15 @@ #include #include -#include "src/core/support/string.h" #include #include #include #include +#include #include #include + +#include "src/core/support/string.h" #include "test/core/end2end/cq_verifier.h" enum { TIMEOUT = 200000 }; @@ -208,6 +210,7 @@ static void test_invoke_10_simple_requests(grpc_end2end_test_config config, grpc_end2end_test_fixture f; grpc_arg client_arg; grpc_channel_args client_args; + char *name; client_arg.type = GRPC_ARG_INTEGER; client_arg.key = GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER; @@ -216,13 +219,16 @@ static void test_invoke_10_simple_requests(grpc_end2end_test_config config, client_args.num_args = 1; client_args.args = &client_arg; - f = begin_test(config, "test_invoke_10_simple_requests", &client_args, NULL); + gpr_asprintf(&name, "test_invoke_requests first_seqno=%d", + initial_sequence_number); + f = begin_test(config, name, &client_args, NULL); for (i = 0; i < 10; i++) { simple_request_body(f); gpr_log(GPR_INFO, "Passed simple request %d", i); } end_test(&f); config.tear_down_data(&f); + gpr_free(name); } void grpc_end2end_tests(grpc_end2end_test_config config) { -- cgit v1.2.3