From e70413c91607378857e739f496fac034b368ab85 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 24 Apr 2015 10:12:34 -0700 Subject: Handle reading after cancellation --- src/core/channel/client_channel.c | 46 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 3 deletions(-) (limited to 'src/core/channel/client_channel.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 642822a267..0ad108ad6b 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -58,6 +58,7 @@ typedef struct { /* the sending child (may be null) */ grpc_child_channel *active_child; + grpc_mdctx *mdctx; /* calls waiting for a channel to be ready */ call_data **waiting_children; @@ -92,6 +93,10 @@ struct call_data { grpc_child_call *child_call; } active; grpc_transport_op waiting_op; + struct { + grpc_linked_mdelem status; + grpc_linked_mdelem details; + } cancelled; } s; }; @@ -185,12 +190,38 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { chand->waiting_child_count = new_count; } -static void send_up_cancelled_ops(grpc_call_element *elem) { abort(); } +static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + if (op->send_ops) { + op->on_done_send(op->send_user_data, 0); + } + if (op->recv_ops) { + char status[GPR_LTOA_MIN_BUFSIZE]; + grpc_metadata_batch mdb; + gpr_ltoa(GRPC_STATUS_CANCELLED, status); + calld->s.cancelled.status.md = grpc_mdelem_from_strings(chand->mdctx, + "grpc-status", status); + calld->s.cancelled.details.md = grpc_mdelem_from_strings(chand->mdctx, + "grpc-message", "Cancelled"); + calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL; + calld->s.cancelled.status.next = &calld->s.cancelled.details; + calld->s.cancelled.details.prev = &calld->s.cancelled.status; + mdb.list.head = &calld->s.cancelled.status; + mdb.list.tail = &calld->s.cancelled.details; + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = gpr_inf_future; + grpc_sopb_add_metadata(op->recv_ops, mdb); + *op->recv_state = GRPC_STREAM_CLOSED; + op->on_done_recv(op->recv_user_data, 1); + } +} static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_call_element *child_elem; + grpc_transport_op waiting_op; gpr_mu_lock(&chand->mu); switch (calld->state) { @@ -200,18 +231,21 @@ static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) { child_elem->filter->start_transport_op(child_elem, op); return; /* early out */ case CALL_WAITING: + waiting_op = calld->s.waiting_op; remove_waiting_child(chand, calld); calld->state = CALL_CANCELLED; gpr_mu_unlock(&chand->mu); - send_up_cancelled_ops(elem); + handle_op_after_cancellation(elem, &waiting_op); + handle_op_after_cancellation(elem, op); return; /* early out */ case CALL_CREATED: calld->state = CALL_CANCELLED; gpr_mu_unlock(&chand->mu); - send_up_cancelled_ops(elem); + handle_op_after_cancellation(elem, op); return; /* early out */ case CALL_CANCELLED: gpr_mu_unlock(&chand->mu); + handle_op_after_cancellation(elem, op); return; /* early out */ } gpr_log(GPR_ERROR, "should never reach here"); @@ -232,6 +266,11 @@ static void cc_start_transport_op(grpc_call_element *elem, return; } + if (calld->state == CALL_CANCELLED) { + handle_op_after_cancellation(elem, op); + return; + } + if (!calld->got_first_op) { calld->got_first_op = 1; start_rpc(elem, op); @@ -371,6 +410,7 @@ static void init_channel_elem(grpc_channel_element *elem, chand->transport_setup = NULL; chand->transport_setup_initiated = 0; chand->args = grpc_channel_args_copy(args); + chand->mdctx = metadata_context; } /* Destructor for channel_data */ -- cgit v1.2.3