From b49736829a3e9bf4140b17465ff6c208462e783f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 30 Jun 2015 08:15:08 -0700 Subject: Fix pollset_set handling in tcp_client_posix --- src/core/iomgr/tcp_client_posix.c | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src/core/iomgr/tcp_client_posix.c') diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index bbf7711588..20f833e28b 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -63,6 +63,7 @@ typedef struct { grpc_alarm alarm; int refs; grpc_iomgr_closure write_closure; + grpc_pollset_set *interested_parties; } async_connect; static int prepare_socket(const struct sockaddr *addr, int fd) { @@ -152,6 +153,7 @@ static void on_writable(void *acp, int success) { goto finish; } } else { + grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); goto finish; } @@ -164,10 +166,13 @@ static void on_writable(void *acp, int success) { finish: gpr_mu_lock(&ac->mu); + gpr_log(GPR_DEBUG, "ep=%p", ep); if (!ep) { + grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan"); } done = (--ac->refs == 0); + gpr_log(GPR_DEBUG, "refs=%d", ac->refs); gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); @@ -240,6 +245,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->cb = cb; ac->cb_arg = arg; ac->fd = fdobj; + ac->interested_parties = interested_parties; gpr_mu_init(&ac->mu); ac->refs = 2; ac->write_closure.cb = on_writable; -- cgit v1.2.3 From b9a46ae5d7044c6b436a33341135ead3f6fd7779 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 1 Jul 2015 09:45:21 -0700 Subject: Fix a bug where cancelled calls can be stranded past disconnection --- src/core/channel/client_channel.c | 12 +++++++----- src/core/iomgr/tcp_client_posix.c | 2 -- src/core/surface/call.h | 2 ++ src/core/transport/transport_op_string.c | 7 +++++++ 4 files changed, 16 insertions(+), 7 deletions(-) (limited to 'src/core/iomgr/tcp_client_posix.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index a4de59efb1..5d9331d2c5 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -330,9 +330,6 @@ static void perform_transport_stream_op(grpc_call_element *elem, } else { consumed_op = merge_into_waiting_op(elem, op); gpr_mu_unlock(&calld->mu_state); - if (op->on_consumed != NULL) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0); - } } break; } @@ -362,11 +359,16 @@ static void perform_transport_stream_op(grpc_call_element *elem, pick_target(lb_policy, calld); GRPC_LB_POLICY_UNREF(lb_policy, "pick"); - } else { + } else if (chand->resolver != NULL) { calld->state = CALL_WAITING_FOR_CONFIG; add_to_lb_policy_wait_queue_locked_state_config(elem); gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&calld->mu_state); + } else { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&chand->mu_config); + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(elem, op); } } } @@ -402,7 +404,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { gpr_mu_lock(&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; - if (lb_policy != NULL) { + if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { wakeup_closures = chand->waiting_for_config_closures; chand->waiting_for_config_closures = NULL; } diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 20f833e28b..d981aaf028 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -166,13 +166,11 @@ static void on_writable(void *acp, int success) { finish: gpr_mu_lock(&ac->mu); - gpr_log(GPR_DEBUG, "ep=%p", ep); if (!ep) { grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan"); } done = (--ac->refs == 0); - gpr_log(GPR_DEBUG, "refs=%d", ac->refs); gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); diff --git a/src/core/surface/call.h b/src/core/surface/call.h index fb3662b50d..8fa411048b 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -94,6 +94,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); +#define GRPC_CALL_REF_COUNT_DEBUG + #ifdef GRPC_CALL_REF_COUNT_DEBUG void grpc_call_internal_ref(grpc_call *call, const char *reason); void grpc_call_internal_unref(grpc_call *call, const char *reason, diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 516aa9d4d8..1ffdb5be94 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -146,6 +146,13 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { gpr_strvec_add(&b, tmp); } + if (op->on_consumed != NULL) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_asprintf(&tmp, "ON_CONSUMED:%p", op->on_consumed); + gpr_strvec_add(&b, tmp); + } + out = gpr_strvec_flatten(&b, NULL); gpr_strvec_destroy(&b); -- cgit v1.2.3