aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-21 16:02:05 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-21 16:02:05 -0700
commit83f88d90b9c61c94994ed00d03a6fa469359d559 (patch)
treef1320aa3cda8b54961de246437c1ea980459750c /src/core/channel/client_channel.c
parent65582323ad6b934b4cbb3a4632bb4c985acc1bdc (diff)
stuff
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c93
1 files changed, 32 insertions, 61 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index bc481e59ca..6ad50cb944 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -82,7 +82,7 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
- gpr_uint8 got_first_send;
+ gpr_uint8 got_first_op;
call_state state;
gpr_timespec deadline;
@@ -91,7 +91,7 @@ struct call_data {
/* our child call stack */
grpc_child_call *child_call;
} active;
- grpc_call_op waiting_op;
+ grpc_transport_op waiting_op;
} s;
};
@@ -110,9 +110,7 @@ static int prepare_activate(grpc_call_element *elem,
return 1;
}
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
-static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
@@ -121,17 +119,16 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
- child_elem->filter->call_op(child_elem, elem, op);
+ child_elem->filter->start_transport_op(child_elem, op);
}
-static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void start_rpc(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
if (calld->state == CALL_CANCELLED) {
gpr_mu_unlock(&chand->mu);
- grpc_metadata_batch_destroy(&op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(op);
return;
}
GPR_ASSERT(calld->state == CALL_CREATED);
@@ -187,19 +184,10 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
}
static void send_up_cancelled_ops(grpc_call_element *elem) {
- grpc_call_op finish_op;
- /* send up a synthesized status */
- grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
- /* send up a finish */
- finish_op.type = GRPC_RECV_FINISH;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
+ abort();
}
-static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
@@ -209,15 +197,13 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
case CALL_ACTIVE:
child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
- child_elem->filter->call_op(child_elem, elem, op);
+ child_elem->filter->start_transport_op(child_elem, op);
return; /* early out */
case CALL_WAITING:
- grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
remove_waiting_child(chand, calld);
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
send_up_cancelled_ops(elem);
- calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR);
return; /* early out */
case CALL_CREATED:
calld->state = CALL_CANCELLED;
@@ -232,40 +218,27 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
abort();
}
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void cc_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
call_data *calld = elem->call_data;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_SEND_METADATA:
- if (!calld->got_first_send) {
- /* filter out the start event to find which child to send on */
- calld->got_first_send = 1;
- start_rpc(elem, op);
- } else {
- grpc_call_next_op(elem, op);
- }
- break;
- case GRPC_CANCEL_OP:
- cancel_rpc(elem, op);
- break;
- case GRPC_SEND_MESSAGE:
- case GRPC_SEND_FINISH:
- case GRPC_REQUEST_DATA:
- if (calld->state == CALL_ACTIVE) {
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
- child_elem->filter->call_op(child_elem, elem, op);
- } else {
- op->done_cb(op->user_data, GRPC_OP_ERROR);
- }
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
- break;
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ GPR_ASSERT(op->send_ops == NULL);
+ GPR_ASSERT(op->recv_ops == NULL);
+
+ cancel_rpc(elem, op);
+ return;
+ }
+
+ if (!calld->got_first_op) {
+ calld->got_first_op = 1;
+ start_rpc(elem, op);
+ } else {
+ grpc_call_element *child_elem =
+ grpc_child_call_get_top_element(calld->s.active.child_call);
+ child_elem->filter->start_transport_op(child_elem, op);
}
}
@@ -359,7 +332,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
- calld->got_first_send = 0;
+ calld->got_first_op = 0;
}
/* Destructor for call_data */
@@ -372,9 +345,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
if (calld->state == CALL_ACTIVE) {
grpc_child_call_destroy(calld->s.active.child_call);
}
- if (calld->state == CALL_WAITING) {
- grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
- }
+ GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
@@ -417,7 +388,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"client-channel",
};
@@ -436,7 +407,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
- grpc_call_op *call_ops;
+ grpc_transport_op *call_ops;
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
@@ -472,13 +443,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
- call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
+ call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
- call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(&call_ops[i]);
}
}