aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-28 11:41:09 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-28 11:41:09 -0700
commitd7b68e72f55e34cafd0042623b707814a6c2b64e (patch)
tree10c66aaba11704a67a9196aa0b6b8ca15c5153f2 /src
parentca3e9d3e57a664b63b46d31a22b79a3fe2e3738f (diff)
Simple request unsecure passes with new client_config code
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/client_channel.c22
-rw-r--r--src/core/client_config/client_config.c6
-rw-r--r--src/core/client_config/lb_policies/pick_first.c36
-rw-r--r--src/core/client_config/lb_policy.c27
-rw-r--r--src/core/client_config/lb_policy.h18
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c11
-rw-r--r--src/core/client_config/subchannel.c161
-rw-r--r--src/core/client_config/subchannel.h17
-rw-r--r--src/core/surface/channel_create.c1
9 files changed, 164 insertions, 135 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 19700a90a6..b33ef7842f 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -297,14 +297,14 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_
gpr_mu_lock(&chand->mu_config);
lb_policy = chand->lb_policy;
if (lb_policy) {
- grpc_lb_policy_ref(lb_policy);
+ GRPC_LB_POLICY_REF(lb_policy, "pick");
gpr_mu_unlock(&chand->mu_config);
calld->state = CALL_WAITING_FOR_PICK;
gpr_mu_unlock(&calld->mu_state);
pick_target(lb_policy, calld);
- grpc_lb_policy_unref(lb_policy);
+ GRPC_LB_POLICY_UNREF(lb_policy, "pick");
} else {
calld->state = CALL_WAITING_FOR_CONFIG;
add_to_lb_policy_wait_queue_locked_state_config(elem);
@@ -332,9 +332,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
grpc_resolver *old_resolver;
grpc_iomgr_closure *wakeup_closures = NULL;
- if (chand->incoming_configuration) {
+ if (chand->incoming_configuration != NULL) {
lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
- grpc_lb_policy_ref(lb_policy);
+ GRPC_LB_POLICY_REF(lb_policy, "channel");
grpc_client_config_unref(chand->incoming_configuration);
}
@@ -357,7 +357,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
if (old_lb_policy) {
- grpc_lb_policy_unref(old_lb_policy);
+ GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
}
if (iomgr_success) {
@@ -391,14 +391,14 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op
if (!is_empty(op, sizeof(*op))) {
lb_policy = chand->lb_policy;
if (lb_policy) {
- grpc_lb_policy_ref(lb_policy);
+ GRPC_LB_POLICY_REF(lb_policy, "broadcast");
}
}
gpr_mu_unlock(&chand->mu_config);
if (lb_policy) {
grpc_lb_policy_broadcast(lb_policy, op);
- grpc_lb_policy_unref(lb_policy);
+ GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
}
if (on_consumed) {
@@ -436,7 +436,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
case CALL_ACTIVE:
subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
- GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "channel");
+ grpc_subchannel_call_unref(subchannel_call);
break;
case CALL_CREATED:
case CALL_CANCELLED:
@@ -472,11 +472,11 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- if (chand->resolver) {
+ if (chand->resolver != NULL) {
grpc_resolver_unref(chand->resolver);
}
- if (chand->lb_policy) {
- grpc_lb_policy_unref(chand->lb_policy);
+ if (chand->lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
}
gpr_mu_destroy(&chand->mu_config);
}
diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c
index bc8dcec54e..4453824148 100644
--- a/src/core/client_config/client_config.c
+++ b/src/core/client_config/client_config.c
@@ -53,7 +53,7 @@ void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); }
void grpc_client_config_unref(grpc_client_config *c) {
if (gpr_unref(&c->refs)) {
- grpc_lb_policy_unref(c->lb_policy);
+ GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config");
gpr_free(c);
}
}
@@ -61,10 +61,10 @@ void grpc_client_config_unref(grpc_client_config *c) {
void grpc_client_config_set_lb_policy(grpc_client_config *c,
grpc_lb_policy *lb_policy) {
if (lb_policy) {
- grpc_lb_policy_ref(lb_policy);
+ GRPC_LB_POLICY_REF(lb_policy, "client_config");
}
if (c->lb_policy) {
- grpc_lb_policy_unref(c->lb_policy);
+ GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config");
}
c->lb_policy = lb_policy;
}
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index e6e743fa7a..9d6c264e37 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -48,8 +48,6 @@ typedef struct pending_pick {
typedef struct {
/** base policy: must be first */
grpc_lb_policy base;
- /** ref count */
- gpr_refcount refs;
/** all our subchannels */
grpc_subchannel **subchannels;
size_t num_subchannels;
@@ -75,18 +73,15 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
-void pf_ref(grpc_lb_policy *pol) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- gpr_ref(&p->refs);
-}
-
-void pf_unref(grpc_lb_policy *pol) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- if (gpr_unref(&p->refs)) {
- gpr_free(p->subchannels);
- gpr_mu_destroy(&p->mu);
- gpr_free(p);
+void pf_destroy(grpc_lb_policy *pol) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ size_t i;
+ for (i = 0; i < p->num_subchannels; i++) {
+ grpc_subchannel_unref(p->subchannels[i]);
}
+ gpr_free(p->subchannels);
+ gpr_mu_destroy(&p->mu);
+ gpr_free(p);
}
void pf_shutdown(grpc_lb_policy *pol) {
@@ -109,7 +104,7 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
- pf_ref(pol);
+ GRPC_LB_POLICY_REF(pol, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset);
@@ -173,7 +168,7 @@ loop:
p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]);
p->num_subchannels--;
- GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
+ grpc_subchannel_unref(p->subchannels[p->num_subchannels]);
add_interested_parties_locked(p);
if (p->num_subchannels == 0) {
abort();
@@ -184,7 +179,7 @@ loop:
gpr_mu_unlock(&p->mu);
if (unref) {
- pf_unref(&p->base);
+ GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
}
}
@@ -199,13 +194,13 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
subchannels = gpr_malloc(n * sizeof(*subchannels));
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
- GRPC_SUBCHANNEL_REF(subchannels[i], "broadcast");
+ grpc_subchannel_ref(subchannels[i]);
}
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
grpc_subchannel_process_transport_op(subchannels[i], op);
- GRPC_SUBCHANNEL_UNREF(subchannels[i], "broadcast");
+ grpc_subchannel_unref(subchannels[i]);
}
gpr_free(subchannels);
}
@@ -227,15 +222,14 @@ static void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_sta
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_ref, pf_unref, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
+ pf_destroy, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
size_t num_subchannels) {
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
GPR_ASSERT(num_subchannels);
memset(p, 0, sizeof(*p));
- p->base.vtable = &pick_first_lb_policy_vtable;
- gpr_ref_init(&p->refs, 1);
+ grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
p->num_subchannels = num_subchannels;
memcpy(p->subchannels, subchannels,
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 2daba14c2a..dfe21cf443 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -33,10 +33,33 @@
#include "src/core/client_config/lb_policy.h"
-void grpc_lb_policy_ref(grpc_lb_policy *policy) { policy->vtable->ref(policy); }
+void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) {
+ policy->vtable = vtable;
+ gpr_ref_init(&policy->refs, 1);
+}
+
+#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
+void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s",
+ policy, (int)policy->refs.count, (int)policy->refs.count + 1,
+ reason);
+#else
+void grpc_lb_policy_ref(grpc_lb_policy *policy) {
+#endif
+ gpr_ref(&policy->refs);
+}
+#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
+void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s",
+ policy, (int)policy->refs.count, (int)policy->refs.count - 1,
+ reason);
+#else
void grpc_lb_policy_unref(grpc_lb_policy *policy) {
- policy->vtable->unref(policy);
+#endif
+ if (gpr_unref(&policy->refs)) {
+ policy->vtable->destroy(policy);
+ }
}
void grpc_lb_policy_shutdown(grpc_lb_policy *policy) {
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index dcefa6e27e..890a89f5c9 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -46,11 +46,11 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
+ gpr_refcount refs;
};
struct grpc_lb_policy_vtable {
- void (*ref)(grpc_lb_policy *policy);
- void (*unref)(grpc_lb_policy *policy);
+ void (*destroy)(grpc_lb_policy *policy);
void (*shutdown)(grpc_lb_policy *policy);
@@ -70,8 +70,22 @@ struct grpc_lb_policy_vtable {
void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_iomgr_closure *closure);
};
+#define GRPC_LB_POLICY_REFCOUNT_DEBUG
+
+#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
+#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p), __FILE__, __LINE__, (r))
+void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason);
+void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, const char *reason);
+#else
+#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
+#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p))
void grpc_lb_policy_ref(grpc_lb_policy *policy);
void grpc_lb_policy_unref(grpc_lb_policy *policy);
+#endif
+
+/** called by concrete implementations to initialize the base struct */
+void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable);
/** Start shutting down (fail any pending picks) */
void grpc_lb_policy_shutdown(grpc_lb_policy *policy);
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index ba82675275..8693bcf5eb 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -145,6 +145,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
grpc_client_config *config = NULL;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
+ grpc_lb_policy *lb_policy;
size_t i;
if (addresses) {
config = grpc_client_config_create();
@@ -156,8 +157,9 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
subchannels[i] = grpc_subchannel_factory_create_subchannel(
r->subchannel_factory, &args);
}
- grpc_client_config_set_lb_policy(
- config, r->lb_policy_factory(subchannels, addresses->naddrs));
+ lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs);
+ grpc_client_config_set_lb_policy(config, lb_policy);
+ GRPC_LB_POLICY_UNREF(lb_policy, "construction");
}
gpr_mu_lock(&r->mu);
if (r->resolved_config) {
@@ -167,6 +169,8 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
r->resolved_version++;
dns_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
+
+ dns_unref(&r->base);
}
static void dns_start_resolving_locked(dns_resolver *r) {
@@ -188,6 +192,9 @@ static void dns_maybe_finish_next_locked(dns_resolver *r) {
static void dns_destroy(dns_resolver *r) {
gpr_mu_destroy(&r->mu);
+ if (r->resolved_config) {
+ grpc_client_config_unref(r->resolved_config);
+ }
grpc_subchannel_factory_unref(r->subchannel_factory);
gpr_free(r->name);
gpr_free(r->default_port);
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index e441befc0c..b4da9cda3f 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -42,7 +42,10 @@
#include "src/core/channel/connectivity_state.h"
typedef struct {
- gpr_refcount refs;
+ /* all fields protected by subchannel->mu */
+ /** refcount */
+ int refs;
+ /** parent subchannel */
grpc_subchannel *subchannel;
} connection;
@@ -54,7 +57,6 @@ typedef struct waiting_for_connect {
} waiting_for_connect;
struct grpc_subchannel {
- gpr_refcount refs;
grpc_connector *connector;
/** non-transport related channel filters */
@@ -83,6 +85,8 @@ struct grpc_subchannel {
/** active connection */
connection *active;
+ /** refcount */
+ int refs;
/** are we connecting */
int connecting;
/** things waiting for a connection */
@@ -105,76 +109,76 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success);
+static void subchannel_ref_locked(grpc_subchannel *c);
+static int subchannel_unref_locked(grpc_subchannel *c) GRPC_MUST_USE_RESULT;
+static void connection_ref_locked(connection *c);
+static grpc_subchannel *connection_unref_locked(connection *c) GRPC_MUST_USE_RESULT;
+static void subchannel_destroy(grpc_subchannel *c);
+
/*
* connection implementation
*/
-#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
-#define CONNECTION_REF(c, r) connection_ref((c), __FILE__, __LINE__, (r))
-#define CONNECTION_UNREF(c, r) connection_unref((c), __FILE__, __LINE__, (r))
-#else
-#define CONNECTION_REF(c, r) connection_ref((c))
-#define CONNECTION_UNREF(c, r) connection_unref((c))
-#endif
-
-#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
-static void connection_ref(connection *c, const char *file, int line, const char *reason) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p ref %d -> %d %s",
- c, (int)c->refs.count, (int)c->refs.count + 1,
- reason);
-#else
-static void connection_ref(connection *c) {
-#endif
- gpr_ref(&c->refs);
+static void connection_destroy(connection *c) {
+ GPR_ASSERT(c->refs == 0);
+ grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
+ gpr_free(c);
}
-#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
-static void connection_unref(connection *c, const char *file, int line, const char *reason) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p unref %d -> %d %s",
- c, (int)c->refs.count, (int)c->refs.count - 1,
- reason);
-#else
-static void connection_unref(connection *c) {
-#endif
- if (gpr_unref(&c->refs)) {
- GRPC_SUBCHANNEL_UNREF(c->subchannel, "connection");
- gpr_free(c);
+static void connection_ref_locked(connection *c) {
+ subchannel_ref_locked(c->subchannel);
+ ++c->refs;
+}
+
+static grpc_subchannel *connection_unref_locked(connection *c) {
+ grpc_subchannel *destroy = NULL;
+ if (subchannel_unref_locked(c->subchannel)) {
+ destroy = c->subchannel;
+ }
+ if (--c->refs == 0 && c->subchannel->active != c) {
+ connection_destroy(c);
}
+ return destroy;
}
+
/*
* grpc_subchannel implementation
*/
-#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
-void grpc_subchannel_ref(grpc_subchannel *c, const char *file, int line, const char *reason) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p ref %d -> %d %s",
- c, (int)c->refs.count, (int)c->refs.count + 1,
- reason);
-#else
-void grpc_subchannel_ref(grpc_subchannel *c) {
-#endif
- gpr_ref(&c->refs);
+static void subchannel_ref_locked(grpc_subchannel *c) {
+ ++c->refs;
+}
+
+static int subchannel_unref_locked(grpc_subchannel *c) {
+ return --c->refs == 0;
+}
+
+void grpc_subchannel_ref(grpc_subchannel *c) {
+ gpr_mu_lock(&c->mu);
+ subchannel_ref_locked(c);
+ gpr_mu_unlock(&c->mu);
}
-#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
-void grpc_subchannel_unref(grpc_subchannel *c, const char *file, int line, const char *reason) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p unref %d -> %d %s",
- c, (int)c->refs.count, (int)c->refs.count - 1,
- reason);
-#else
void grpc_subchannel_unref(grpc_subchannel *c) {
-#endif
- if (gpr_unref(&c->refs)) {
- if (c->active != NULL) CONNECTION_UNREF(c->active, "subchannel");
- gpr_free(c->filters);
- grpc_channel_args_destroy(c->args);
- gpr_free(c->addr);
- grpc_mdctx_unref(c->mdctx);
- grpc_pollset_set_destroy(&c->pollset_set);
- grpc_connectivity_state_destroy(&c->state_tracker);
- gpr_free(c);
- }
+ int destroy;
+ gpr_mu_lock(&c->mu);
+ destroy = subchannel_unref_locked(c);
+ gpr_mu_unlock(&c->mu);
+ if (destroy) subchannel_destroy(c);
+}
+
+static void subchannel_destroy(grpc_subchannel *c) {
+ if (c->active != NULL) {
+ connection_destroy(c->active);
+ }
+ gpr_free(c->filters);
+ grpc_channel_args_destroy(c->args);
+ gpr_free(c->addr);
+ grpc_mdctx_unref(c->mdctx);
+ grpc_pollset_set_destroy(&c->pollset_set);
+ grpc_connectivity_state_destroy(&c->state_tracker);
+ gpr_free(c);
}
void grpc_subchannel_add_interested_party(grpc_subchannel *c,
@@ -191,7 +195,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_subchannel_args *args) {
grpc_subchannel *c = gpr_malloc(sizeof(*c));
memset(c, 0, sizeof(*c));
- gpr_ref_init(&c->refs, 1);
+ c->refs = 1;
c->connector = connector;
grpc_connector_ref(c->connector);
c->num_filters = args->filter_count;
@@ -232,7 +236,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
con = c->active;
- CONNECTION_REF(con, "call");
+ connection_ref_locked(con);
gpr_mu_unlock(&c->mu);
*target = create_call(con, initial_op);
@@ -248,7 +252,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
if (!c->connecting) {
c->connecting = 1;
connectivity_state_changed_locked(c);
- GRPC_SUBCHANNEL_REF(c, "connection");
+ subchannel_ref_locked(c);
gpr_mu_unlock(&c->mu);
start_connect(c);
@@ -274,7 +278,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) {
do_connect = 1;
c->connecting = 1;
- GRPC_SUBCHANNEL_REF(c, "connection");
+ subchannel_ref_locked(c);
grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c));
}
gpr_mu_unlock(&c->mu);
@@ -294,6 +298,7 @@ static void publish_transport(grpc_subchannel *c) {
size_t num_filters;
const grpc_channel_filter **filters;
waiting_for_connect *w4c;
+ int destroy;
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc(sizeof(*filters) * num_filters);
@@ -305,7 +310,7 @@ static void publish_transport(grpc_subchannel *c) {
con = gpr_malloc(sizeof(connection) + channel_stack_size);
stk = (grpc_channel_stack *)(con + 1);
- gpr_ref_init(&con->refs, 1);
+ con->refs = 0;
con->subchannel = c;
grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
@@ -319,9 +324,14 @@ static void publish_transport(grpc_subchannel *c) {
while ((w4c = c->waiting)) {
abort(); /* not implemented */
}
+ destroy = subchannel_unref_locked(c);
gpr_mu_unlock(&c->mu);
gpr_free(filters);
+
+ if (destroy) {
+ subchannel_destroy(c);
+ }
}
static void subchannel_connected(void *arg, int iomgr_success) {
@@ -329,7 +339,11 @@ static void subchannel_connected(void *arg, int iomgr_success) {
if (c->connecting_result.transport) {
publish_transport(c);
} else {
- GRPC_SUBCHANNEL_UNREF(c, "connection");
+ int destroy;
+ gpr_mu_lock(&c->mu);
+ destroy = subchannel_unref_locked(c);
+ gpr_mu_unlock(&c->mu);
+ if (destroy) subchannel_destroy(c);
/* TODO(ctiller): retry after sleeping */
abort();
}
@@ -358,29 +372,22 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) {
* grpc_subchannel_call implementation
*/
-#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
-void grpc_subchannel_call_ref(grpc_subchannel_call *c, const char *file, int line, const char *reason) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p ref %d -> %d %s",
- c, (int)c->refs.count, (int)c->refs.count + 1,
- reason);
-#else
void grpc_subchannel_call_ref(grpc_subchannel_call *c) {
-#endif
gpr_ref(&c->refs);
}
-#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
-void grpc_subchannel_call_unref(grpc_subchannel_call *c, const char *file, int line, const char *reason) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p unref %d -> %d %s",
- c, (int)c->refs.count, (int)c->refs.count - 1,
- reason);
-#else
void grpc_subchannel_call_unref(grpc_subchannel_call *c) {
-#endif
if (gpr_unref(&c->refs)) {
+ gpr_mu *mu = &c->connection->subchannel->mu;
+ grpc_subchannel *destroy;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
- CONNECTION_UNREF(c->connection, "call");
+ gpr_mu_lock(mu);
+ destroy = connection_unref_locked(c->connection);
+ gpr_mu_unlock(mu);
gpr_free(c);
+ if (destroy) {
+ subchannel_destroy(destroy);
+ }
}
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 7cdcccce8f..8155aba14c 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -37,33 +37,16 @@
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/connector.h"
-#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG
-
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
-#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
-#define GRPC_SUBCHANNEL_REF(c, r) grpc_subchannel_ref((c), __FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_UNREF(c, r) grpc_subchannel_unref((c), __FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_CALL_REF(c, r) grpc_subchannel_call_ref((c), __FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_CALL_UNREF(c, r) grpc_subchannel_call_unref((c), __FILE__, __LINE__, (r))
-void grpc_subchannel_ref(grpc_subchannel *channel, const char *file, int line, const char *reason);
-void grpc_subchannel_unref(grpc_subchannel *channel, const char *file, int line, const char *reason);
-void grpc_subchannel_call_ref(grpc_subchannel_call *call, const char *file, int line, const char *reason);
-void grpc_subchannel_call_unref(grpc_subchannel_call *call, const char *file, int line, const char *reason);
-#else
-#define GRPC_SUBCHANNEL_REF(c, r) grpc_subchannel_ref((c))
-#define GRPC_SUBCHANNEL_UNREF(c, r) grpc_subchannel_unref((c))
-#define GRPC_SUBCHANNEL_CALL_REF(c, r) grpc_subchannel_call_ref((c))
-#define GRPC_SUBCHANNEL_CALL_UNREF(c, r) grpc_subchannel_call_unref((c))
void grpc_subchannel_ref(grpc_subchannel *channel);
void grpc_subchannel_unref(grpc_subchannel *channel);
void grpc_subchannel_call_ref(grpc_subchannel_call *call);
void grpc_subchannel_call_unref(grpc_subchannel_call *call);
-#endif
/** construct a call (possibly asynchronously) */
void grpc_subchannel_create_call(grpc_subchannel *subchannel,
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index a00c08a830..494a44725a 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -163,6 +163,7 @@ grpc_channel *grpc_channel_create(const char *target,
channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver);
+ grpc_resolver_unref(resolver);
return channel;
}