aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel/channel_stack.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel/channel_stack.c')
-rw-r--r--src/core/lib/channel/channel_stack.c43
1 files changed, 37 insertions, 6 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index ec973d4e7f..8f08b427fb 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -170,7 +170,7 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
- grpc_slice path, gpr_timespec start_time, gpr_timespec deadline,
+ grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
@@ -288,10 +288,41 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
sizeof(grpc_call_stack)));
}
-void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_error *error) {
- grpc_transport_stream_op *op = grpc_make_transport_stream_op(NULL);
- op->cancel_error = error;
+static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) {
+ gpr_free(op);
+}
+
+void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->cancel_error = GRPC_ERROR_CANCELLED;
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
+}
+
+void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_status_code status,
+ grpc_slice *optional_message) {
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
+ grpc_transport_stream_op_add_cancellation_with_message(exec_ctx, op, status,
+ optional_message);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
+}
+
+void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_status_code status,
+ grpc_slice *optional_message) {
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
+ grpc_transport_stream_op_add_close(exec_ctx, op, status, optional_message);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}