diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 22 | ||||
-rw-r--r-- | src/core/client_config/client_config.c | 6 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 36 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 27 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 18 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 11 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 161 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 17 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 1 |
9 files changed, 164 insertions, 135 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 19700a90a6..b33ef7842f 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -297,14 +297,14 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_ gpr_mu_lock(&chand->mu_config); lb_policy = chand->lb_policy; if (lb_policy) { - grpc_lb_policy_ref(lb_policy); + GRPC_LB_POLICY_REF(lb_policy, "pick"); gpr_mu_unlock(&chand->mu_config); calld->state = CALL_WAITING_FOR_PICK; gpr_mu_unlock(&calld->mu_state); pick_target(lb_policy, calld); - grpc_lb_policy_unref(lb_policy); + GRPC_LB_POLICY_UNREF(lb_policy, "pick"); } else { calld->state = CALL_WAITING_FOR_CONFIG; add_to_lb_policy_wait_queue_locked_state_config(elem); @@ -332,9 +332,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { grpc_resolver *old_resolver; grpc_iomgr_closure *wakeup_closures = NULL; - if (chand->incoming_configuration) { + if (chand->incoming_configuration != NULL) { lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); - grpc_lb_policy_ref(lb_policy); + GRPC_LB_POLICY_REF(lb_policy, "channel"); grpc_client_config_unref(chand->incoming_configuration); } @@ -357,7 +357,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { } if (old_lb_policy) { - grpc_lb_policy_unref(old_lb_policy); + GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); } if (iomgr_success) { @@ -391,14 +391,14 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op if (!is_empty(op, sizeof(*op))) { lb_policy = chand->lb_policy; if (lb_policy) { - grpc_lb_policy_ref(lb_policy); + GRPC_LB_POLICY_REF(lb_policy, "broadcast"); } } gpr_mu_unlock(&chand->mu_config); if (lb_policy) { grpc_lb_policy_broadcast(lb_policy, op); - grpc_lb_policy_unref(lb_policy); + GRPC_LB_POLICY_UNREF(lb_policy, "broadcast"); } if (on_consumed) { @@ -436,7 +436,7 @@ static void destroy_call_elem(grpc_call_element *elem) { case CALL_ACTIVE: subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); - GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "channel"); + grpc_subchannel_call_unref(subchannel_call); break; case CALL_CREATED: case CALL_CANCELLED: @@ -472,11 +472,11 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - if (chand->resolver) { + if (chand->resolver != NULL) { grpc_resolver_unref(chand->resolver); } - if (chand->lb_policy) { - grpc_lb_policy_unref(chand->lb_policy); + if (chand->lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } gpr_mu_destroy(&chand->mu_config); } diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c index bc8dcec54e..4453824148 100644 --- a/src/core/client_config/client_config.c +++ b/src/core/client_config/client_config.c @@ -53,7 +53,7 @@ void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); } void grpc_client_config_unref(grpc_client_config *c) { if (gpr_unref(&c->refs)) { - grpc_lb_policy_unref(c->lb_policy); + GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config"); gpr_free(c); } } @@ -61,10 +61,10 @@ void grpc_client_config_unref(grpc_client_config *c) { void grpc_client_config_set_lb_policy(grpc_client_config *c, grpc_lb_policy *lb_policy) { if (lb_policy) { - grpc_lb_policy_ref(lb_policy); + GRPC_LB_POLICY_REF(lb_policy, "client_config"); } if (c->lb_policy) { - grpc_lb_policy_unref(c->lb_policy); + GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config"); } c->lb_policy = lb_policy; } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index e6e743fa7a..9d6c264e37 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -48,8 +48,6 @@ typedef struct pending_pick { typedef struct { /** base policy: must be first */ grpc_lb_policy base; - /** ref count */ - gpr_refcount refs; /** all our subchannels */ grpc_subchannel **subchannels; size_t num_subchannels; @@ -75,18 +73,15 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -void pf_ref(grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_ref(&p->refs); -} - -void pf_unref(grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - if (gpr_unref(&p->refs)) { - gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); - gpr_free(p); +void pf_destroy(grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + size_t i; + for (i = 0; i < p->num_subchannels; i++) { + grpc_subchannel_unref(p->subchannels[i]); } + gpr_free(p->subchannels); + gpr_mu_destroy(&p->mu); + gpr_free(p); } void pf_shutdown(grpc_lb_policy *pol) { @@ -109,7 +104,7 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; - pf_ref(pol); + GRPC_LB_POLICY_REF(pol, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed); } grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset); @@ -173,7 +168,7 @@ loop: p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]); p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); + grpc_subchannel_unref(p->subchannels[p->num_subchannels]); add_interested_parties_locked(p); if (p->num_subchannels == 0) { abort(); @@ -184,7 +179,7 @@ loop: gpr_mu_unlock(&p->mu); if (unref) { - pf_unref(&p->base); + GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity"); } } @@ -199,13 +194,13 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { subchannels = gpr_malloc(n * sizeof(*subchannels)); for (i = 0; i < n; i++) { subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "broadcast"); + grpc_subchannel_ref(subchannels[i]); } gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { grpc_subchannel_process_transport_op(subchannels[i], op); - GRPC_SUBCHANNEL_UNREF(subchannels[i], "broadcast"); + grpc_subchannel_unref(subchannels[i]); } gpr_free(subchannels); } @@ -227,15 +222,14 @@ static void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_sta } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_ref, pf_unref, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; + pf_destroy, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, size_t num_subchannels) { pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); GPR_ASSERT(num_subchannels); memset(p, 0, sizeof(*p)); - p->base.vtable = &pick_first_lb_policy_vtable; - gpr_ref_init(&p->refs, 1); + grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels); p->num_subchannels = num_subchannels; memcpy(p->subchannels, subchannels, diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 2daba14c2a..dfe21cf443 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -33,10 +33,33 @@ #include "src/core/client_config/lb_policy.h" -void grpc_lb_policy_ref(grpc_lb_policy *policy) { policy->vtable->ref(policy); } +void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { + policy->vtable = vtable; + gpr_ref_init(&policy->refs, 1); +} + +#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG +void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", + policy, (int)policy->refs.count, (int)policy->refs.count + 1, + reason); +#else +void grpc_lb_policy_ref(grpc_lb_policy *policy) { +#endif + gpr_ref(&policy->refs); +} +#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG +void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", + policy, (int)policy->refs.count, (int)policy->refs.count - 1, + reason); +#else void grpc_lb_policy_unref(grpc_lb_policy *policy) { - policy->vtable->unref(policy); +#endif + if (gpr_unref(&policy->refs)) { + policy->vtable->destroy(policy); + } } void grpc_lb_policy_shutdown(grpc_lb_policy *policy) { diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index dcefa6e27e..890a89f5c9 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -46,11 +46,11 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; + gpr_refcount refs; }; struct grpc_lb_policy_vtable { - void (*ref)(grpc_lb_policy *policy); - void (*unref)(grpc_lb_policy *policy); + void (*destroy)(grpc_lb_policy *policy); void (*shutdown)(grpc_lb_policy *policy); @@ -70,8 +70,22 @@ struct grpc_lb_policy_vtable { void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_iomgr_closure *closure); }; +#define GRPC_LB_POLICY_REFCOUNT_DEBUG + +#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG +#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p), __FILE__, __LINE__, (r)) +void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason); +void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, const char *reason); +#else +#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) +#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p)) void grpc_lb_policy_ref(grpc_lb_policy *policy); void grpc_lb_policy_unref(grpc_lb_policy *policy); +#endif + +/** called by concrete implementations to initialize the base struct */ +void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); /** Start shutting down (fail any pending picks) */ void grpc_lb_policy_shutdown(grpc_lb_policy *policy); diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index ba82675275..8693bcf5eb 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -145,6 +145,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { grpc_client_config *config = NULL; grpc_subchannel **subchannels; grpc_subchannel_args args; + grpc_lb_policy *lb_policy; size_t i; if (addresses) { config = grpc_client_config_create(); @@ -156,8 +157,9 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { subchannels[i] = grpc_subchannel_factory_create_subchannel( r->subchannel_factory, &args); } - grpc_client_config_set_lb_policy( - config, r->lb_policy_factory(subchannels, addresses->naddrs)); + lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs); + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(lb_policy, "construction"); } gpr_mu_lock(&r->mu); if (r->resolved_config) { @@ -167,6 +169,8 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { r->resolved_version++; dns_maybe_finish_next_locked(r); gpr_mu_unlock(&r->mu); + + dns_unref(&r->base); } static void dns_start_resolving_locked(dns_resolver *r) { @@ -188,6 +192,9 @@ static void dns_maybe_finish_next_locked(dns_resolver *r) { static void dns_destroy(dns_resolver *r) { gpr_mu_destroy(&r->mu); + if (r->resolved_config) { + grpc_client_config_unref(r->resolved_config); + } grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r->name); gpr_free(r->default_port); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index e441befc0c..b4da9cda3f 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -42,7 +42,10 @@ #include "src/core/channel/connectivity_state.h" typedef struct { - gpr_refcount refs; + /* all fields protected by subchannel->mu */ + /** refcount */ + int refs; + /** parent subchannel */ grpc_subchannel *subchannel; } connection; @@ -54,7 +57,6 @@ typedef struct waiting_for_connect { } waiting_for_connect; struct grpc_subchannel { - gpr_refcount refs; grpc_connector *connector; /** non-transport related channel filters */ @@ -83,6 +85,8 @@ struct grpc_subchannel { /** active connection */ connection *active; + /** refcount */ + int refs; /** are we connecting */ int connecting; /** things waiting for a connection */ @@ -105,76 +109,76 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success); +static void subchannel_ref_locked(grpc_subchannel *c); +static int subchannel_unref_locked(grpc_subchannel *c) GRPC_MUST_USE_RESULT; +static void connection_ref_locked(connection *c); +static grpc_subchannel *connection_unref_locked(connection *c) GRPC_MUST_USE_RESULT; +static void subchannel_destroy(grpc_subchannel *c); + /* * connection implementation */ -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG -#define CONNECTION_REF(c, r) connection_ref((c), __FILE__, __LINE__, (r)) -#define CONNECTION_UNREF(c, r) connection_unref((c), __FILE__, __LINE__, (r)) -#else -#define CONNECTION_REF(c, r) connection_ref((c)) -#define CONNECTION_UNREF(c, r) connection_unref((c)) -#endif - -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG -static void connection_ref(connection *c, const char *file, int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p ref %d -> %d %s", - c, (int)c->refs.count, (int)c->refs.count + 1, - reason); -#else -static void connection_ref(connection *c) { -#endif - gpr_ref(&c->refs); +static void connection_destroy(connection *c) { + GPR_ASSERT(c->refs == 0); + grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); + gpr_free(c); } -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG -static void connection_unref(connection *c, const char *file, int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p unref %d -> %d %s", - c, (int)c->refs.count, (int)c->refs.count - 1, - reason); -#else -static void connection_unref(connection *c) { -#endif - if (gpr_unref(&c->refs)) { - GRPC_SUBCHANNEL_UNREF(c->subchannel, "connection"); - gpr_free(c); +static void connection_ref_locked(connection *c) { + subchannel_ref_locked(c->subchannel); + ++c->refs; +} + +static grpc_subchannel *connection_unref_locked(connection *c) { + grpc_subchannel *destroy = NULL; + if (subchannel_unref_locked(c->subchannel)) { + destroy = c->subchannel; + } + if (--c->refs == 0 && c->subchannel->active != c) { + connection_destroy(c); } + return destroy; } + /* * grpc_subchannel implementation */ -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG -void grpc_subchannel_ref(grpc_subchannel *c, const char *file, int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p ref %d -> %d %s", - c, (int)c->refs.count, (int)c->refs.count + 1, - reason); -#else -void grpc_subchannel_ref(grpc_subchannel *c) { -#endif - gpr_ref(&c->refs); +static void subchannel_ref_locked(grpc_subchannel *c) { + ++c->refs; +} + +static int subchannel_unref_locked(grpc_subchannel *c) { + return --c->refs == 0; +} + +void grpc_subchannel_ref(grpc_subchannel *c) { + gpr_mu_lock(&c->mu); + subchannel_ref_locked(c); + gpr_mu_unlock(&c->mu); } -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG -void grpc_subchannel_unref(grpc_subchannel *c, const char *file, int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p unref %d -> %d %s", - c, (int)c->refs.count, (int)c->refs.count - 1, - reason); -#else void grpc_subchannel_unref(grpc_subchannel *c) { -#endif - if (gpr_unref(&c->refs)) { - if (c->active != NULL) CONNECTION_UNREF(c->active, "subchannel"); - gpr_free(c->filters); - grpc_channel_args_destroy(c->args); - gpr_free(c->addr); - grpc_mdctx_unref(c->mdctx); - grpc_pollset_set_destroy(&c->pollset_set); - grpc_connectivity_state_destroy(&c->state_tracker); - gpr_free(c); - } + int destroy; + gpr_mu_lock(&c->mu); + destroy = subchannel_unref_locked(c); + gpr_mu_unlock(&c->mu); + if (destroy) subchannel_destroy(c); +} + +static void subchannel_destroy(grpc_subchannel *c) { + if (c->active != NULL) { + connection_destroy(c->active); + } + gpr_free(c->filters); + grpc_channel_args_destroy(c->args); + gpr_free(c->addr); + grpc_mdctx_unref(c->mdctx); + grpc_pollset_set_destroy(&c->pollset_set); + grpc_connectivity_state_destroy(&c->state_tracker); + gpr_free(c); } void grpc_subchannel_add_interested_party(grpc_subchannel *c, @@ -191,7 +195,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_subchannel_args *args) { grpc_subchannel *c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); - gpr_ref_init(&c->refs, 1); + c->refs = 1; c->connector = connector; grpc_connector_ref(c->connector); c->num_filters = args->filter_count; @@ -232,7 +236,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; - CONNECTION_REF(con, "call"); + connection_ref_locked(con); gpr_mu_unlock(&c->mu); *target = create_call(con, initial_op); @@ -248,7 +252,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, if (!c->connecting) { c->connecting = 1; connectivity_state_changed_locked(c); - GRPC_SUBCHANNEL_REF(c, "connection"); + subchannel_ref_locked(c); gpr_mu_unlock(&c->mu); start_connect(c); @@ -274,7 +278,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) { do_connect = 1; c->connecting = 1; - GRPC_SUBCHANNEL_REF(c, "connection"); + subchannel_ref_locked(c); grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); } gpr_mu_unlock(&c->mu); @@ -294,6 +298,7 @@ static void publish_transport(grpc_subchannel *c) { size_t num_filters; const grpc_channel_filter **filters; waiting_for_connect *w4c; + int destroy; num_filters = c->num_filters + c->connecting_result.num_filters + 1; filters = gpr_malloc(sizeof(*filters) * num_filters); @@ -305,7 +310,7 @@ static void publish_transport(grpc_subchannel *c) { con = gpr_malloc(sizeof(connection) + channel_stack_size); stk = (grpc_channel_stack *)(con + 1); - gpr_ref_init(&con->refs, 1); + con->refs = 0; con->subchannel = c; grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); @@ -319,9 +324,14 @@ static void publish_transport(grpc_subchannel *c) { while ((w4c = c->waiting)) { abort(); /* not implemented */ } + destroy = subchannel_unref_locked(c); gpr_mu_unlock(&c->mu); gpr_free(filters); + + if (destroy) { + subchannel_destroy(c); + } } static void subchannel_connected(void *arg, int iomgr_success) { @@ -329,7 +339,11 @@ static void subchannel_connected(void *arg, int iomgr_success) { if (c->connecting_result.transport) { publish_transport(c); } else { - GRPC_SUBCHANNEL_UNREF(c, "connection"); + int destroy; + gpr_mu_lock(&c->mu); + destroy = subchannel_unref_locked(c); + gpr_mu_unlock(&c->mu); + if (destroy) subchannel_destroy(c); /* TODO(ctiller): retry after sleeping */ abort(); } @@ -358,29 +372,22 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) { * grpc_subchannel_call implementation */ -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG -void grpc_subchannel_call_ref(grpc_subchannel_call *c, const char *file, int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p ref %d -> %d %s", - c, (int)c->refs.count, (int)c->refs.count + 1, - reason); -#else void grpc_subchannel_call_ref(grpc_subchannel_call *c) { -#endif gpr_ref(&c->refs); } -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG -void grpc_subchannel_call_unref(grpc_subchannel_call *c, const char *file, int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p unref %d -> %d %s", - c, (int)c->refs.count, (int)c->refs.count - 1, - reason); -#else void grpc_subchannel_call_unref(grpc_subchannel_call *c) { -#endif if (gpr_unref(&c->refs)) { + gpr_mu *mu = &c->connection->subchannel->mu; + grpc_subchannel *destroy; grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c)); - CONNECTION_UNREF(c->connection, "call"); + gpr_mu_lock(mu); + destroy = connection_unref_locked(c->connection); + gpr_mu_unlock(mu); gpr_free(c); + if (destroy) { + subchannel_destroy(destroy); + } } } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 7cdcccce8f..8155aba14c 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -37,33 +37,16 @@ #include "src/core/channel/channel_stack.h" #include "src/core/client_config/connector.h" -#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG - /** A (sub-)channel that knows how to connect to exactly one target address. Provides a target for load balancing. */ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG -#define GRPC_SUBCHANNEL_REF(c, r) grpc_subchannel_ref((c), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_UNREF(c, r) grpc_subchannel_unref((c), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_CALL_REF(c, r) grpc_subchannel_call_ref((c), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_CALL_UNREF(c, r) grpc_subchannel_call_unref((c), __FILE__, __LINE__, (r)) -void grpc_subchannel_ref(grpc_subchannel *channel, const char *file, int line, const char *reason); -void grpc_subchannel_unref(grpc_subchannel *channel, const char *file, int line, const char *reason); -void grpc_subchannel_call_ref(grpc_subchannel_call *call, const char *file, int line, const char *reason); -void grpc_subchannel_call_unref(grpc_subchannel_call *call, const char *file, int line, const char *reason); -#else -#define GRPC_SUBCHANNEL_REF(c, r) grpc_subchannel_ref((c)) -#define GRPC_SUBCHANNEL_UNREF(c, r) grpc_subchannel_unref((c)) -#define GRPC_SUBCHANNEL_CALL_REF(c, r) grpc_subchannel_call_ref((c)) -#define GRPC_SUBCHANNEL_CALL_UNREF(c, r) grpc_subchannel_call_unref((c)) void grpc_subchannel_ref(grpc_subchannel *channel); void grpc_subchannel_unref(grpc_subchannel *channel); void grpc_subchannel_call_ref(grpc_subchannel_call *call); void grpc_subchannel_call_unref(grpc_subchannel_call *call); -#endif /** construct a call (possibly asynchronously) */ void grpc_subchannel_create_call(grpc_subchannel *subchannel, diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index a00c08a830..494a44725a 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -163,6 +163,7 @@ grpc_channel *grpc_channel_create(const char *target, channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); + grpc_resolver_unref(resolver); return channel; } |