aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c45
1 files changed, 39 insertions, 6 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 42e242ae81..726196e996 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -39,6 +39,7 @@
#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -101,10 +102,17 @@ struct call_data {
static int prepare_activate(grpc_call_element *elem,
grpc_child_channel *on_child) {
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
if (calld->state == CALL_CANCELLED) return 0;
/* no more access to calld->s.waiting allowed */
GPR_ASSERT(calld->state == CALL_WAITING);
+
+ if (calld->s.waiting_op.bind_pollset) {
+ grpc_transport_setup_del_interested_party(chand->transport_setup,
+ calld->s.waiting_op.bind_pollset);
+ }
+
calld->state = CALL_ACTIVE;
/* create a child call */
@@ -131,7 +139,11 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
size_t new_count;
size_t i;
for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
- if (chand->waiting_children[i] == calld) continue;
+ if (chand->waiting_children[i] == calld) {
+ grpc_transport_setup_del_interested_party(
+ chand->transport_setup, calld->s.waiting_op.bind_pollset);
+ continue;
+ }
chand->waiting_children[new_count++] = chand->waiting_children[i];
}
GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
@@ -166,6 +178,9 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
*op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv(op->recv_user_data, 1);
}
+ if (op->on_consumed) {
+ op->on_consumed(op->on_consumed_user_data, 0);
+ }
}
static void cc_start_transport_op(grpc_call_element *elem,
@@ -191,6 +206,7 @@ static void cc_start_transport_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, op);
} else {
calld->state = CALL_WAITING;
+ calld->s.waiting_op.bind_pollset = NULL;
if (chand->active_child) {
/* channel is connected - use the connected stack */
if (prepare_activate(elem, chand->active_child)) {
@@ -222,6 +238,8 @@ static void cc_start_transport_op(grpc_call_element *elem,
}
calld->s.waiting_op = *op;
chand->waiting_children[chand->waiting_child_count++] = calld;
+ grpc_transport_setup_add_interested_party(chand->transport_setup,
+ op->bind_pollset);
gpr_mu_unlock(&chand->mu);
/* finally initiate transport setup if needed */
@@ -257,6 +275,9 @@ static void cc_start_transport_op(grpc_call_element *elem,
calld->s.waiting_op.recv_user_data = op->recv_user_data;
}
gpr_mu_unlock(&chand->mu);
+ if (op->on_consumed) {
+ op->on_consumed(op->on_consumed_user_data, 0);
+ }
}
break;
case CALL_CANCELLED:
@@ -365,12 +386,24 @@ static void init_call_elem(grpc_call_element *elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
- if (calld->state == CALL_ACTIVE) {
- grpc_child_call_destroy(calld->s.active.child_call);
+ gpr_mu_lock(&chand->mu);
+ switch (calld->state) {
+ case CALL_ACTIVE:
+ gpr_mu_unlock(&chand->mu);
+ grpc_child_call_destroy(calld->s.active.child_call);
+ break;
+ case CALL_WAITING:
+ remove_waiting_child(chand, calld);
+ gpr_mu_unlock(&chand->mu);
+ break;
+ default:
+ gpr_mu_unlock(&chand->mu);
+ break;
}
GPR_ASSERT(calld->state != CALL_WAITING);
}
@@ -416,9 +449,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
- cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "client-channel",
+ cc_start_transport_op, channel_op, sizeof(call_data),
+ init_call_elem, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "client-channel",
};
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(