aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-17 14:57:44 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-17 14:57:44 -0700
commit8b282cbd0b1df8ae07b563b1a11eecb8e3bbe0a6 (patch)
tree4d4c653b8f351a0ec3bb332403e6c68b2451a6b8 /src/core/channel/client_channel.c
parent9c1043e757a050b789c21498c9c6984133a04d8e (diff)
Got rid of GRPC_SEND_START
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c61
1 files changed, 14 insertions, 47 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 018228b4e6..fb984f2878 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -82,21 +82,16 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
+ gpr_uint8 got_first_send;
+
call_state state;
- grpc_metadata_batch pending_metadata;
- gpr_uint32 pending_metadata_flags;
gpr_timespec deadline;
union {
struct {
/* our child call stack */
grpc_child_call *child_call;
} active;
- struct {
- void (*on_complete)(void *user_data, grpc_op_error error);
- void *on_complete_user_data;
- gpr_uint32 start_flags;
- grpc_pollset *pollset;
- } waiting;
+ grpc_call_op waiting_op;
} s;
};
@@ -121,19 +116,9 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
- grpc_call_op mop;
GPR_ASSERT(calld->state == CALL_ACTIVE);
- /* sending buffered metadata down the stack before the start call */
- mop.type = GRPC_SEND_METADATA;
- mop.dir = GRPC_CALL_DOWN;
- mop.flags = calld->pending_metadata_flags;
- mop.data.metadata = calld->pending_metadata;
- mop.done_cb = do_nothing;
- mop.user_data = NULL;
- child_elem->filter->call_op(child_elem, elem, &mop);
-
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
child_elem->filter->call_op(child_elem, elem, op);
@@ -177,10 +162,7 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
gpr_realloc(chand->waiting_children,
chand->waiting_child_capacity * sizeof(call_data *));
}
- calld->s.waiting.on_complete = op->done_cb;
- calld->s.waiting.on_complete_user_data = op->user_data;
- calld->s.waiting.start_flags = op->flags;
- calld->s.waiting.pollset = op->data.start.pollset;
+ calld->s.waiting_op = *op;
chand->waiting_children[chand->waiting_child_count++] = calld;
gpr_mu_unlock(&chand->mu);
@@ -233,7 +215,7 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
send_up_cancelled_ops(elem);
- calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data,
+ calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data,
GRPC_OP_ERROR);
return; /* early out */
case CALL_CREATED:
@@ -257,12 +239,13 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
switch (op->type) {
case GRPC_SEND_METADATA:
- grpc_metadata_batch_merge(&calld->pending_metadata, &op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_SEND_START:
- /* filter out the start event to find which child to send on */
- start_rpc(elem, op);
+ if (!calld->got_first_send) {
+ /* filter out the start event to find which child to send on */
+ calld->got_first_send = 1;
+ start_rpc(elem, op);
+ } else {
+ grpc_call_next_op(elem, op);
+ }
break;
case GRPC_CANCEL_OP:
cancel_rpc(elem, op);
@@ -365,12 +348,6 @@ static void channel_op(grpc_channel_element *elem,
}
}
-static void error_bad_on_complete(void *arg, grpc_op_error error) {
- gpr_log(GPR_ERROR,
- "Waiting finished but not started? Bad on_complete callback");
- abort();
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data) {
@@ -381,17 +358,13 @@ static void init_call_elem(grpc_call_element *elem,
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
- calld->s.waiting.on_complete = error_bad_on_complete;
- calld->s.waiting.on_complete_user_data = NULL;
- grpc_metadata_batch_init(&calld->pending_metadata);
+ calld->got_first_send = 0;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- /* if the metadata buffer is not flushed, destroy it here. */
- grpc_metadata_batch_destroy(&calld->pending_metadata);
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
@@ -498,13 +471,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
- call_ops[i].type = GRPC_SEND_START;
- call_ops[i].dir = GRPC_CALL_DOWN;
- call_ops[i].flags = waiting_children[i]->s.waiting.start_flags;
- call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
- call_ops[i].user_data =
- waiting_children[i]->s.waiting.on_complete_user_data;
- call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset;
+ call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);