aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-24 10:12:34 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-24 10:12:34 -0700
commite70413c91607378857e739f496fac034b368ab85 (patch)
tree2d7cb37f9dcb170cdc1061673ae6c2de7eb1c45b /src/core/channel/client_channel.c
parent4e87e0076716338039b747a15a1df37f721b188a (diff)
Handle reading after cancellation
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c46
1 files changed, 43 insertions, 3 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 642822a267..0ad108ad6b 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -58,6 +58,7 @@ typedef struct {
/* the sending child (may be null) */
grpc_child_channel *active_child;
+ grpc_mdctx *mdctx;
/* calls waiting for a channel to be ready */
call_data **waiting_children;
@@ -92,6 +93,10 @@ struct call_data {
grpc_child_call *child_call;
} active;
grpc_transport_op waiting_op;
+ struct {
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
+ } cancelled;
} s;
};
@@ -185,12 +190,38 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
chand->waiting_child_count = new_count;
}
-static void send_up_cancelled_ops(grpc_call_element *elem) { abort(); }
+static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ if (op->send_ops) {
+ op->on_done_send(op->send_user_data, 0);
+ }
+ if (op->recv_ops) {
+ char status[GPR_LTOA_MIN_BUFSIZE];
+ grpc_metadata_batch mdb;
+ gpr_ltoa(GRPC_STATUS_CANCELLED, status);
+ calld->s.cancelled.status.md = grpc_mdelem_from_strings(chand->mdctx,
+ "grpc-status", status);
+ calld->s.cancelled.details.md = grpc_mdelem_from_strings(chand->mdctx,
+ "grpc-message", "Cancelled");
+ calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
+ calld->s.cancelled.status.next = &calld->s.cancelled.details;
+ calld->s.cancelled.details.prev = &calld->s.cancelled.status;
+ mdb.list.head = &calld->s.cancelled.status;
+ mdb.list.tail = &calld->s.cancelled.details;
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future;
+ grpc_sopb_add_metadata(op->recv_ops, mdb);
+ *op->recv_state = GRPC_STREAM_CLOSED;
+ op->on_done_recv(op->recv_user_data, 1);
+ }
+}
static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
+ grpc_transport_op waiting_op;
gpr_mu_lock(&chand->mu);
switch (calld->state) {
@@ -200,18 +231,21 @@ static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) {
child_elem->filter->start_transport_op(child_elem, op);
return; /* early out */
case CALL_WAITING:
+ waiting_op = calld->s.waiting_op;
remove_waiting_child(chand, calld);
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
- send_up_cancelled_ops(elem);
+ handle_op_after_cancellation(elem, &waiting_op);
+ handle_op_after_cancellation(elem, op);
return; /* early out */
case CALL_CREATED:
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
- send_up_cancelled_ops(elem);
+ handle_op_after_cancellation(elem, op);
return; /* early out */
case CALL_CANCELLED:
gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, op);
return; /* early out */
}
gpr_log(GPR_ERROR, "should never reach here");
@@ -232,6 +266,11 @@ static void cc_start_transport_op(grpc_call_element *elem,
return;
}
+ if (calld->state == CALL_CANCELLED) {
+ handle_op_after_cancellation(elem, op);
+ return;
+ }
+
if (!calld->got_first_op) {
calld->got_first_op = 1;
start_rpc(elem, op);
@@ -371,6 +410,7 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->transport_setup = NULL;
chand->transport_setup_initiated = 0;
chand->args = grpc_channel_args_copy(args);
+ chand->mdctx = metadata_context;
}
/* Destructor for channel_data */