aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel/channel_stack.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel/channel_stack.c')
-rw-r--r--src/core/lib/channel/channel_stack.c32
1 files changed, 21 insertions, 11 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 0655b9353f..57d34d9e9a 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -158,13 +158,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
}
-grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- int initial_refs, grpc_iomgr_cb_func destroy,
- void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack) {
+grpc_error *grpc_call_stack_init(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context, const void *transport_server_data,
+ gpr_timespec deadline, grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -185,6 +183,7 @@ grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
args.context = context;
+ args.deadline = deadline;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
@@ -276,16 +275,16 @@ static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) {
}
void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem) {
+ grpc_call_element *elem) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
op->cancel_error = GRPC_ERROR_CANCELLED;
op->on_complete = grpc_closure_create(destroy_op, op);
- grpc_call_next_op(exec_ctx, cur_elem, op);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem,
+ grpc_call_element *elem,
grpc_status_code status,
gpr_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
@@ -293,5 +292,16 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
op->on_complete = grpc_closure_create(destroy_op, op);
grpc_transport_stream_op_add_cancellation_with_message(op, status,
optional_message);
- grpc_call_next_op(exec_ctx, cur_elem, op);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
+}
+
+void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_status_code status,
+ gpr_slice *optional_message) {
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->on_complete = grpc_closure_create(destroy_op, op);
+ grpc_transport_stream_op_add_close(op, status, optional_message);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}