aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/channel_stack.c9
-rw-r--r--src/core/lib/channel/compress_filter.c6
-rw-r--r--src/core/lib/channel/deadline_filter.c11
-rw-r--r--src/core/lib/channel/handshaker.c8
-rw-r--r--src/core/lib/channel/http_client_filter.c15
-rw-r--r--src/core/lib/channel/http_server_filter.c9
-rw-r--r--src/core/lib/channel/message_size_filter.c5
7 files changed, 41 insertions, 22 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 1d0b7d4f31..cddd84fcca 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -297,7 +297,8 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
op->cancel_error = GRPC_ERROR_CANCELLED;
- op->on_complete = grpc_closure_create(destroy_op, op);
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
@@ -307,7 +308,8 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
- op->on_complete = grpc_closure_create(destroy_op, op);
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
grpc_transport_stream_op_add_cancellation_with_message(op, status,
optional_message);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
@@ -319,7 +321,8 @@ void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
grpc_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
- op->on_complete = grpc_closure_create(destroy_op, op);
+ op->on_complete =
+ grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx);
grpc_transport_stream_op_add_close(op, status, optional_message);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 0e336dc330..35455d4908 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -269,8 +269,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* initialize members */
grpc_slice_buffer_init(&calld->slices);
calld->has_compression_algorithm = 0;
- grpc_closure_init(&calld->got_slice, got_slice, elem);
- grpc_closure_init(&calld->send_done, send_done, elem);
+ grpc_closure_init(&calld->got_slice, got_slice, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->send_done, send_done, elem,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index 470ccfea57..902ebf1955 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -123,7 +123,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
grpc_transport_stream_op* op) {
deadline_state->next_on_complete = op->on_complete;
- grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
+ grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state,
+ grpc_schedule_on_exec_ctx);
op->on_complete = &deadline_state->on_complete;
}
@@ -172,8 +173,9 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
state->elem = elem;
state->deadline = deadline;
- grpc_closure_init(&state->closure, start_timer_after_init, state);
- grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_init(&state->closure, start_timer_after_init, state,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE);
}
}
@@ -290,7 +292,8 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
calld->recv_initial_metadata = op->recv_initial_metadata;
grpc_closure_init(&calld->recv_initial_metadata_ready,
- recv_initial_metadata_ready, elem);
+ recv_initial_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
}
// Make sure we know when the call is complete, so that we can cancel
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index 23edc826ca..ff827527b3 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -165,7 +165,7 @@ static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx,
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
grpc_timer_cancel(exec_ctx, &mgr->deadline_timer);
- grpc_exec_ctx_sched(exec_ctx, &mgr->on_handshake_done, error, NULL);
+ grpc_closure_sched(exec_ctx, &mgr->on_handshake_done, error);
mgr->shutdown = true;
} else {
grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index],
@@ -218,8 +218,10 @@ void grpc_handshake_manager_do_handshake(
grpc_slice_buffer_init(mgr->args.read_buffer);
// Initialize state needed for calling handshakers.
mgr->acceptor = acceptor;
- grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr);
- grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args);
+ grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args,
+ grpc_schedule_on_exec_ctx);
// Start deadline timer, which owns a ref.
gpr_ref(&mgr->refs);
grpc_timer_init(exec_ctx, &mgr->deadline_timer,
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 1a2d08dda5..c37cab3939 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -352,12 +352,17 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
calld->send_message_blocked = false;
grpc_slice_buffer_init(&calld->slices);
grpc_closure_init(&calld->hc_on_recv_initial_metadata,
- hc_on_recv_initial_metadata, elem);
+ hc_on_recv_initial_metadata, elem,
+ grpc_schedule_on_exec_ctx);
grpc_closure_init(&calld->hc_on_recv_trailing_metadata,
- hc_on_recv_trailing_metadata, elem);
- grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem);
- grpc_closure_init(&calld->got_slice, got_slice, elem);
- grpc_closure_init(&calld->send_done, send_done, elem);
+ hc_on_recv_trailing_metadata, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->got_slice, got_slice, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->send_done, send_done, elem,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index a5134ee21b..a6d720530a 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -334,9 +334,12 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
- grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
- grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem);
- grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem);
+ grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem,
+ grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&calld->read_slice_buffer);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index f05c789010..b5e882de52 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -124,7 +124,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
gpr_free(message_string);
}
// Invoke the next callback.
- grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL);
+ grpc_closure_sched(exec_ctx, calld->next_recv_message_ready, error);
}
// Start transport stream op.
@@ -160,7 +160,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
channel_data* chand = elem->channel_data;
call_data* calld = elem->call_data;
calld->next_recv_message_ready = NULL;
- grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem);
+ grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem,
+ grpc_schedule_on_exec_ctx);
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response