aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-29 15:25:49 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-29 15:25:49 -0700
commitb6fbf1d986331c8959e60a172fe922d9f864b03f (patch)
treef74666e2723dec8f70d2beaa82c510a7dda13e89
parentc396753a367677a4f2a111a850c2290dd27e2047 (diff)
Fix refcounting
-rw-r--r--src/core/channel/client_channel.c1
-rw-r--r--src/core/client_config/subchannel.c50
2 files changed, 45 insertions, 6 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index d3d2f42ec1..6b60dc07cf 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -402,7 +402,6 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op
grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE);
destroy_resolver = chand->resolver;
chand->resolver = NULL;
- op->disconnect = 0;
}
if (!is_empty(op, sizeof(*op))) {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 3d065761ab..eadeb0ef55 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -80,6 +80,8 @@ struct grpc_subchannel {
grpc_mdctx *mdctx;
/** master channel */
grpc_channel *master;
+ /** have we seen a disconnection? */
+ int disconnected;
/** set during connection */
grpc_connect_out_args connecting_result;
@@ -152,6 +154,7 @@ static void subchannel_destroy(grpc_subchannel *c);
static void connection_destroy(connection *c) {
GPR_ASSERT(c->refs == 0);
+ gpr_log(GPR_DEBUG, "CONNECTION_DESTROY %p", c);
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
}
@@ -342,8 +345,32 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_transport_op *op) {
- gpr_log(GPR_ERROR, "grpc_subchannel_process_transport_op not implemented");
- abort();
+ connection *con = NULL;
+ grpc_subchannel *destroy;
+ gpr_mu_lock(&c->mu);
+ if (op->disconnect) {
+ c->disconnected = 1;
+ grpc_connectivity_state_set(&c->state_tracker,
+ compute_connectivity_locked(c));
+ }
+ if (c->active != NULL) {
+ con = c->active;
+ CONNECTION_REF_LOCKED(con, "transport-op");
+ }
+ gpr_mu_unlock(&c->mu);
+
+ if (con != NULL) {
+ grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
+ grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
+ top_elem->filter->start_transport_op(top_elem, op);
+
+ gpr_mu_lock(&c->mu);
+ destroy = CONNECTION_UNREF_LOCKED(con, "transport-op");
+ gpr_mu_unlock(&c->mu);
+ if (destroy) {
+ subchannel_destroy(destroy);
+ }
+ }
}
static void on_state_changed(void *p, int iomgr_success) {
@@ -388,7 +415,7 @@ static void on_state_changed(void *p, int iomgr_success) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* things are starting to go wrong, reconnect but don't deactivate */
/* released by connection */
- SUBCHANNEL_REF_LOCKED(c, "connection");
+ SUBCHANNEL_REF_LOCKED(c, "connecting");
do_connect = 1;
c->connecting = 1;
break;
@@ -397,7 +424,7 @@ static void on_state_changed(void *p, int iomgr_success) {
done:
grpc_connectivity_state_set(&c->state_tracker,
compute_connectivity_locked(c));
- destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection");
+ destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
gpr_free(sw);
gpr_mu_unlock(mu);
if (do_connect) {
@@ -450,6 +477,14 @@ static void publish_transport(grpc_subchannel *c) {
gpr_mu_lock(&c->mu);
+ if (c->disconnected) {
+ gpr_mu_unlock(&c->mu);
+ gpr_free(sw);
+ gpr_free(filters);
+ grpc_channel_stack_destroy(stk);
+ return;
+ }
+
/* publish */
if (c->active != NULL && c->active->refs == 0) {
destroy_connection = c->active;
@@ -464,6 +499,8 @@ static void publish_transport(grpc_subchannel *c) {
memset(&op, 0, sizeof(op));
op.connectivity_state = &sw->connectivity_state;
op.on_connectivity_state_change = &sw->closure;
+ SUBCHANNEL_REF_LOCKED(c, "state_watcher");
+ GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
elem->filter->start_transport_op(elem, &op);
@@ -491,7 +528,7 @@ static void subchannel_connected(void *arg, int iomgr_success) {
} else {
int destroy;
gpr_mu_lock(&c->mu);
- destroy = SUBCHANNEL_UNREF_LOCKED(c, "connection");
+ destroy = SUBCHANNEL_UNREF_LOCKED(c, "connecting");
gpr_mu_unlock(&c->mu);
if (destroy) subchannel_destroy(c);
/* TODO(ctiller): retry after sleeping */
@@ -504,6 +541,9 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
}
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
+ if (c->disconnected) {
+ return GRPC_CHANNEL_FATAL_FAILURE;
+ }
if (c->connecting) {
return GRPC_CHANNEL_CONNECTING;
}