diff options
author | 2017-01-04 19:57:00 -0800 | |
---|---|---|
committer | 2017-01-04 23:45:53 -0800 | |
commit | 1da51ce3f6f9f9f11f617f7a7acb379df4086a06 (patch) | |
tree | 3572480ef757dc1dac7b0ef4e877482214361ebd /src/core/ext/transport | |
parent | 8803ae8c3a643f35d15c9269872d3c1fd2cc8409 (diff) | |
parent | 661c2657835246729cb784dd1f0226b262e675b3 (diff) |
Merge branch 'master' into packet-coalescing-core
Diffstat (limited to 'src/core/ext/transport')
6 files changed, 110 insertions, 149 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 114bb07222..be0d42605f 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -47,7 +47,6 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/tcp_client.h" -#include "src/core/lib/security/transport/security_connector.h" typedef struct { grpc_connector base; @@ -141,7 +140,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } grpc_closure *notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + grpc_closure_sched(exec_ctx, notify, error); grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); c->handshake_mgr = NULL; gpr_mu_unlock(&c->mu); @@ -180,7 +179,7 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, memset(c->result, 0, sizeof(*c->result)); grpc_closure *notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + grpc_closure_sched(exec_ctx, notify, error); gpr_mu_unlock(&c->mu); chttp2_connector_unref(exec_ctx, arg); } else { @@ -203,7 +202,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { memset(c->result, 0, sizeof(*c->result)); grpc_closure *notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + grpc_closure_sched(exec_ctx, notify, error); if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); gpr_mu_unlock(&c->mu); chttp2_connector_unref(exec_ctx, arg); @@ -211,7 +210,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GPR_ASSERT(c->endpoint != NULL); if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, - c); + c, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&c->initial_string_buffer); grpc_slice_buffer_add(&c->initial_string_buffer, c->args.initial_connect_string); @@ -237,7 +236,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, c->result = result; GPR_ASSERT(c->endpoint == NULL); chttp2_connector_ref(con); // Ref taken for callback. - grpc_closure_init(&c->connected, connected, c); + grpc_closure_init(&c->connected, connected, c, grpc_schedule_on_exec_ctx); GPR_ASSERT(!c->connecting); c->connecting = true; grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint, diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index f0857714fc..9af62d372e 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -281,7 +281,8 @@ grpc_error *grpc_chttp2_server_add_port( state = gpr_malloc(sizeof(*state)); memset(state, 0, sizeof(*state)); grpc_closure_init(&state->tcp_server_shutdown_complete, - tcp_server_shutdown_complete, state); + tcp_server_shutdown_complete, state, + grpc_schedule_on_exec_ctx); err = grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete, args, &tcp_server); if (err != GRPC_ERROR_NONE) { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 6bc054866b..488c3b93cc 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -73,20 +73,14 @@ static const grpc_transport_vtable vtable; static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void write_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void write_action_end(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void read_action_begin(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, grpc_error *error); -static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs, - grpc_error *error); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value); @@ -112,12 +106,8 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored); -static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); @@ -166,8 +156,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, and maybe they hold resources that need to be freed */ while (t->pings.next != &t->pings) { grpc_chttp2_outstanding_ping *ping = t->pings.next; - grpc_exec_ctx_sched(exec_ctx, ping->on_recv, - GRPC_ERROR_CREATE("Transport closed"), NULL); + grpc_closure_sched(exec_ctx, ping->on_recv, + GRPC_ERROR_CREATE("Transport closed")); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -246,18 +236,15 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_slice_buffer_init(&t->outbuf); grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); - grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked, - t); - grpc_closure_init(&t->write_action, write_action, t); - grpc_closure_init(&t->write_action_end, write_action_end, t); - grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t); - grpc_closure_init(&t->read_action_begin, read_action_begin, t); - grpc_closure_init(&t->read_action_locked, read_action_locked, t); - grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t); - grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t); - grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t); + grpc_closure_init(&t->write_action, write_action, t, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&t->read_action_locked, read_action_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, + grpc_combiner_scheduler(t->combiner, false)); grpc_closure_init(&t->destructive_reclaimer_locked, - destructive_reclaimer_locked, t); + destructive_reclaimer_locked, t, + grpc_combiner_scheduler(t->combiner, false)); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(&t->hpack_parser); @@ -395,9 +382,10 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_combiner_execute(exec_ctx, t->combiner, - grpc_closure_create(destroy_transport_locked, t), - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, grpc_closure_create( + destroy_transport_locked, t, + grpc_combiner_scheduler(t->combiner, false)), + GRPC_ERROR_NONE); } static void close_transport_locked(grpc_exec_ctx *exec_ctx, @@ -471,8 +459,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); - grpc_closure_init(&s->complete_fetch, complete_fetch, s); - grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s); + grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s, + grpc_schedule_on_exec_ctx); GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); @@ -547,9 +535,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; s->destroy_stream_arg = and_free_memory; - grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s); - grpc_combiner_execute(exec_ctx, t->combiner, &s->destroy_stream, - GRPC_ERROR_NONE, false); + grpc_closure_sched( + exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, + grpc_combiner_scheduler(t->combiner, false)), + GRPC_ERROR_NONE); GPR_TIMER_END("destroy_stream", 0); } @@ -600,7 +589,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, write_state_name(st), reason)); t->write_state = st; if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { - grpc_exec_ctx_enqueue_list(exec_ctx, &t->run_after_write, NULL); + grpc_closure_list_sched(exec_ctx, &t->run_after_write); if (t->close_transport_on_writes_finished != NULL) { grpc_error *err = t->close_transport_on_writes_finished; t->close_transport_on_writes_finished = NULL; @@ -618,9 +607,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, case GRPC_CHTTP2_WRITE_STATE_IDLE: set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - grpc_combiner_execute_finally(exec_ctx, t->combiner, - &t->write_action_begin_locked, - GRPC_ERROR_NONE, covered_by_poller); + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &t->write_action_begin_locked, write_action_begin_locked, t, + grpc_combiner_finally_scheduler(t->combiner, covered_by_poller)), + GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: set_write_state( @@ -662,7 +654,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) { set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, "begin writing"); - grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE); } else { set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); @@ -674,19 +666,13 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { grpc_chttp2_transport *t = gt; GPR_TIMER_BEGIN("write_action", 0); - grpc_endpoint_write(exec_ctx, t->ep, &t->outbuf, &t->write_action_end); + grpc_endpoint_write( + exec_ctx, t->ep, &t->outbuf, + grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t, + grpc_combiner_scheduler(t->combiner, false))); GPR_TIMER_END("write_action", 0); } -static void write_action_end(grpc_exec_ctx *exec_ctx, void *gt, - grpc_error *error) { - grpc_chttp2_transport *t = gt; - GPR_TIMER_BEGIN("write_action_end", 0); - grpc_combiner_execute(exec_ctx, t->combiner, &t->write_action_end_locked, - GRPC_ERROR_REF(error), false); - GPR_TIMER_END("write_action_end", 0); -} - static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { GPR_TIMER_BEGIN("terminate_writing_with_lock", 0); @@ -716,18 +702,24 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing [!covered]"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - grpc_combiner_execute_finally(exec_ctx, t->combiner, - &t->write_action_begin_locked, - GRPC_ERROR_NONE, false); + grpc_closure_run( + exec_ctx, + grpc_closure_init( + &t->write_action_begin_locked, write_action_begin_locked, t, + grpc_combiner_finally_scheduler(t->combiner, false)), + GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER: GPR_TIMER_MARK("state=writing_stale_with_poller", 0); set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing [covered]"); GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); - grpc_combiner_execute_finally(exec_ctx, t->combiner, - &t->write_action_begin_locked, - GRPC_ERROR_NONE, true); + grpc_closure_run( + exec_ctx, + grpc_closure_init(&t->write_action_begin_locked, + write_action_begin_locked, t, + grpc_combiner_finally_scheduler(t->combiner, true)), + GRPC_ERROR_NONE); break; } @@ -965,15 +957,6 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, } } -static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs, - grpc_error *error) { - grpc_chttp2_stream *s = gs; - grpc_chttp2_transport *t = s->t; - grpc_combiner_execute(exec_ctx, t->combiner, &s->complete_fetch_locked, - GRPC_ERROR_REF(error), - s->complete_fetch_covered_by_poller); -} - static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, @@ -1009,7 +992,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_closure *on_complete = op->on_complete; if (on_complete == NULL) { - on_complete = grpc_closure_create(do_nothing, NULL); + on_complete = + grpc_closure_create(do_nothing, NULL, grpc_schedule_on_exec_ctx); } /* use final_data as a barrier until enqueue time; the inital counter is @@ -1212,13 +1196,15 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(str); } - grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked, - op); op->transport_private.args[0] = gt; op->transport_private.args[1] = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); - grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure, - GRPC_ERROR_NONE, op->covered_by_poller); + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &op->transport_private.closure, perform_stream_op_locked, op, + grpc_combiner_scheduler(t->combiner, op->covered_by_poller)), + GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); } @@ -1247,7 +1233,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_outstanding_ping *ping; for (ping = t->pings.next; ping != &t->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -1321,11 +1307,12 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, char *msg = grpc_transport_op_string(op); gpr_free(msg); op->transport_private.args[0] = gt; - grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked, - op); GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); - grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure, - GRPC_ERROR_NONE, false); + grpc_closure_sched( + exec_ctx, grpc_closure_init(&op->transport_private.closure, + perform_transport_op_locked, op, + grpc_combiner_scheduler(t->combiner, false)), + GRPC_ERROR_NONE); } /******************************************************************************* @@ -1801,19 +1788,6 @@ static void update_global_window(void *args, uint32_t id, void *stream) { * INPUT PROCESSING - PARSING */ -static void read_action_begin(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { - /* Control flow: - reading_action_locked -> - (parse_unlocked -> post_parse_locked)? -> - post_reading_action_locked */ - GPR_TIMER_BEGIN("reading_action", 0); - grpc_chttp2_transport *t = tp; - grpc_combiner_execute(exec_ctx, t->combiner, &t->read_action_locked, - GRPC_ERROR_REF(error), false); - GPR_TIMER_END("reading_action", 0); -} - static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_http_parser parser; @@ -1913,7 +1887,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_slice_buffer_reset_and_unref(&t->read_buffer); if (keep_reading) { - grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_begin); + grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, + &t->read_action_locked); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); @@ -2050,10 +2025,12 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, bs->next_action.slice = slice; bs->next_action.max_size_hint = max_size_hint; bs->next_action.on_complete = on_complete; - grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked, - bs); - grpc_combiner_execute(exec_ctx, bs->transport->combiner, - &bs->next_action.closure, GRPC_ERROR_NONE, false); + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &bs->next_action.closure, incoming_byte_stream_next_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner, false)), + GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_next", 0); return 0; } @@ -2075,10 +2052,12 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; - grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked, - bs); - grpc_combiner_execute(exec_ctx, bs->transport->combiner, &bs->destroy_action, - GRPC_ERROR_NONE, false); + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &bs->destroy_action, incoming_byte_stream_destroy_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner, false)), + GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_destroy", 0); } @@ -2086,7 +2065,7 @@ static void incoming_byte_stream_publish_error( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error) { GPR_ASSERT(error != GRPC_ERROR_NONE); - grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); + grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error)); bs->on_next = NULL; GRPC_ERROR_UNREF(bs->error); bs->error = error; @@ -2103,7 +2082,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); if (bs->on_next != NULL) { *bs->next = slice; - grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE); bs->on_next = NULL; } else { grpc_slice_buffer_add(&bs->slices, slice); @@ -2171,7 +2150,7 @@ static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); grpc_resource_user_post_reclaimer(exec_ctx, grpc_endpoint_get_resource_user(t->ep), - false, &t->benign_reclaimer); + false, &t->benign_reclaimer_locked); } } @@ -2182,24 +2161,10 @@ static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); grpc_resource_user_post_reclaimer(exec_ctx, grpc_endpoint_get_resource_user(t->ep), - true, &t->destructive_reclaimer); + true, &t->destructive_reclaimer_locked); } } -static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = arg; - grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked, - GRPC_ERROR_REF(error), false); -} - -static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_transport *t = arg; - grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked, - GRPC_ERROR_REF(error), false); -} - static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = arg; @@ -2380,5 +2345,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_free(read_buffer); } - read_action_begin(exec_ctx, t, GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 6a9200b8db..64ce07b2f7 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1634,10 +1634,11 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, however -- it might be that we receive a RST_STREAM following this and can avoid the extra write */ GRPC_CHTTP2_STREAM_REF(s, "final_rst"); - grpc_combiner_execute_finally( - exec_ctx, t->combiner, - grpc_closure_create(force_client_rst_stream, s), GRPC_ERROR_NONE, - false); + grpc_closure_sched( + exec_ctx, grpc_closure_create(force_client_rst_stream, s, + grpc_combiner_finally_scheduler( + t->combiner, false)), + GRPC_ERROR_NONE); } grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, GRPC_ERROR_NONE); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index b727965d43..a52acbacdb 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -212,10 +212,8 @@ struct grpc_chttp2_transport { grpc_closure write_action_begin_locked; grpc_closure write_action; - grpc_closure write_action_end; grpc_closure write_action_end_locked; - grpc_closure read_action_begin; grpc_closure read_action_locked; /** incoming read bytes */ @@ -336,10 +334,8 @@ struct grpc_chttp2_transport { /** have we scheduled a destructive cleanup? */ bool destructive_reclaimer_registered; /** benign cleanup closure */ - grpc_closure benign_reclaimer; grpc_closure benign_reclaimer_locked; /** destructive cleanup closure */ - grpc_closure destructive_reclaimer; grpc_closure destructive_reclaimer_locked; }; diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 4063dcaefc..48370654ca 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -955,17 +955,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, stream_op->recv_initial_metadata_ready, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); } else { grpc_chttp2_incoming_metadata_buffer_publish( &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); } stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -975,22 +975,22 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_CANCELLED, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_CANCELLED); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, stream_op->recv_message_ready, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -1023,8 +1023,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, &stream_state->rs.read_slice_buffer, 0); *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1068,8 +1068,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, &stream_state->rs.read_slice_buffer, 0); *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; /* Do an extra read to trigger on_succeeded() callback in case connection @@ -1115,18 +1115,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, - GRPC_ERROR_REF(stream_state->cancel_error), NULL); + grpc_closure_sched(exec_ctx, stream_op->on_complete, + GRPC_ERROR_REF(stream_state->cancel_error)); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_exec_ctx_sched( + grpc_closure_sched( exec_ctx, stream_op->on_complete, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); } else { /* All actions in this stream_op are complete. Call the on_complete * callback */ - grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, - NULL); + grpc_closure_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE); } oas->state.state_op_done[OP_ON_COMPLETE] = true; oas->done = true; |