diff options
Diffstat (limited to 'src/core/lib/channel/channel_stack.c')
-rw-r--r-- | src/core/lib/channel/channel_stack.c | 43 |
1 files changed, 37 insertions, 6 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index ec973d4e7f..8f08b427fb 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -170,7 +170,7 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - grpc_slice path, gpr_timespec start_time, gpr_timespec deadline, + grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline, grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); grpc_call_element_args args; @@ -288,10 +288,41 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { sizeof(grpc_call_stack))); } -void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_error *error) { - grpc_transport_stream_op *op = grpc_make_transport_stream_op(NULL); - op->cancel_error = error; +static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) { + gpr_free(op); +} + +void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); + memset(op, 0, sizeof(*op)); + op->cancel_error = GRPC_ERROR_CANCELLED; + op->on_complete = + grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); + elem->filter->start_transport_stream_op(exec_ctx, elem, op); +} + +void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_status_code status, + grpc_slice *optional_message) { + grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); + memset(op, 0, sizeof(*op)); + op->on_complete = + grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); + grpc_transport_stream_op_add_cancellation_with_message(exec_ctx, op, status, + optional_message); + elem->filter->start_transport_stream_op(exec_ctx, elem, op); +} + +void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_status_code status, + grpc_slice *optional_message) { + grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); + memset(op, 0, sizeof(*op)); + op->on_complete = + grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); + grpc_transport_stream_op_add_close(exec_ctx, op, status, optional_message); elem->filter->start_transport_stream_op(exec_ctx, elem, op); } |