aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-01 09:45:21 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-01 09:45:21 -0700
commitb9a46ae5d7044c6b436a33341135ead3f6fd7779 (patch)
tree3d6eb82b86855c8bcf7e37a69e7d1528d1ba0b54
parent06a1963435bdfa4745a503ce4a6c0422302841f1 (diff)
Fix a bug where cancelled calls can be stranded past disconnection
-rw-r--r--src/core/channel/client_channel.c12
-rw-r--r--src/core/iomgr/tcp_client_posix.c2
-rw-r--r--src/core/surface/call.h2
-rw-r--r--src/core/transport/transport_op_string.c7
4 files changed, 16 insertions, 7 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index a4de59efb1..5d9331d2c5 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -330,9 +330,6 @@ static void perform_transport_stream_op(grpc_call_element *elem,
} else {
consumed_op = merge_into_waiting_op(elem, op);
gpr_mu_unlock(&calld->mu_state);
- if (op->on_consumed != NULL) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
- }
}
break;
}
@@ -362,11 +359,16 @@ static void perform_transport_stream_op(grpc_call_element *elem,
pick_target(lb_policy, calld);
GRPC_LB_POLICY_UNREF(lb_policy, "pick");
- } else {
+ } else if (chand->resolver != NULL) {
calld->state = CALL_WAITING_FOR_CONFIG;
add_to_lb_policy_wait_queue_locked_state_config(elem);
gpr_mu_unlock(&chand->mu_config);
gpr_mu_unlock(&calld->mu_state);
+ } else {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu_config);
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(elem, op);
}
}
}
@@ -402,7 +404,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
gpr_mu_lock(&chand->mu_config);
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
- if (lb_policy != NULL) {
+ if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
wakeup_closures = chand->waiting_for_config_closures;
chand->waiting_for_config_closures = NULL;
}
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 20f833e28b..d981aaf028 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -166,13 +166,11 @@ static void on_writable(void *acp, int success) {
finish:
gpr_mu_lock(&ac->mu);
- gpr_log(GPR_DEBUG, "ep=%p", ep);
if (!ep) {
grpc_pollset_set_del_fd(ac->interested_parties, ac->fd);
grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan");
}
done = (--ac->refs == 0);
- gpr_log(GPR_DEBUG, "refs=%d", ac->refs);
gpr_mu_unlock(&ac->mu);
if (done) {
gpr_mu_destroy(&ac->mu);
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index fb3662b50d..8fa411048b 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -94,6 +94,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
+#define GRPC_CALL_REF_COUNT_DEBUG
+
#ifdef GRPC_CALL_REF_COUNT_DEBUG
void grpc_call_internal_ref(grpc_call *call, const char *reason);
void grpc_call_internal_unref(grpc_call *call, const char *reason,
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 516aa9d4d8..1ffdb5be94 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -146,6 +146,13 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
gpr_strvec_add(&b, tmp);
}
+ if (op->on_consumed != NULL) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_asprintf(&tmp, "ON_CONSUMED:%p", op->on_consumed);
+ gpr_strvec_add(&b, tmp);
+ }
+
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);