diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.cc')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 1098 |
1 files changed, 475 insertions, 623 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 9462d1085e..6f8493b58c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -100,105 +100,77 @@ grpc_tracer_flag grpc_trace_chttp2_refcount = #endif /* forward declarations of various callbacks that we'll build closures around */ -static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); -static void write_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); +static void write_action_begin_locked(void *t, grpc_error *error); +static void write_action(void *t, grpc_error *error); +static void write_action_end_locked(void *t, grpc_error *error); -static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); +static void read_action_locked(void *t, grpc_error *error); -static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, - grpc_error *error); +static void complete_fetch_locked(void *gs, grpc_error *error); /** Set a transport level setting, and push it to our peer */ -static void queue_setting_update(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void queue_setting_update(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value); -static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, grpc_error *error); +static void close_from_api(grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_error *error); /** Start new streams that have been created if we can */ -static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); +static void maybe_start_some_streams(grpc_chttp2_transport *t); -static void connectivity_state_set(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void connectivity_state_set(grpc_chttp2_transport *t, grpc_connectivity_state state, grpc_error *error, const char *reason); -static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, - void *byte_stream, +static void incoming_byte_stream_destroy_locked(void *byte_stream, grpc_error *error_ignored); static void incoming_byte_stream_publish_error( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error); -static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs); - -static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); -static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); - -static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); -static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); - -static void close_transport_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_error *error); -static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error); - -static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); -static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error); -static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error); -static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, - void *tp, grpc_error *error); - -static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error); -static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_incoming_byte_stream *bs, grpc_error *error); +static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs); + +static void benign_reclaimer_locked(void *t, grpc_error *error); +static void destructive_reclaimer_locked(void *t, grpc_error *error); + +static void post_benign_reclaimer(grpc_chttp2_transport *t); +static void post_destructive_reclaimer(grpc_chttp2_transport *t); + +static void close_transport_locked(grpc_chttp2_transport *t, grpc_error *error); +static void end_all_the_calls(grpc_chttp2_transport *t, grpc_error *error); + +static void schedule_bdp_ping_locked(grpc_chttp2_transport *t); +static void start_bdp_ping_locked(void *tp, grpc_error *error); +static void finish_bdp_ping_locked(void *tp, grpc_error *error); +static void next_bdp_ping_timer_expired_locked(void *tp, grpc_error *error); + +static void cancel_pings(grpc_chttp2_transport *t, grpc_error *error); +static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_initiate, grpc_closure *on_complete); -static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error); +static void retry_initiate_ping_locked(void *tp, grpc_error *error); /** keepalive-relevant functions */ -static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); - -static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); +static void init_keepalive_ping_locked(void *arg, grpc_error *error); +static void start_keepalive_ping_locked(void *arg, grpc_error *error); +static void finish_keepalive_ping_locked(void *arg, grpc_error *error); +static void keepalive_watchdog_fired_locked(void *arg, grpc_error *error); + +static void reset_byte_stream(void *arg, grpc_error *error); /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ -static void destruct_transport(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static void destruct_transport(grpc_chttp2_transport *t) { size_t i; - grpc_endpoint_destroy(exec_ctx, t->ep); + grpc_endpoint_destroy(t->ep); - grpc_slice_buffer_destroy_internal(exec_ctx, &t->qbuf); + grpc_slice_buffer_destroy_internal(&t->qbuf); - grpc_slice_buffer_destroy_internal(exec_ctx, &t->outbuf); - grpc_chttp2_hpack_compressor_destroy(exec_ctx, &t->hpack_compressor); + grpc_slice_buffer_destroy_internal(&t->outbuf); + grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor); - grpc_slice_buffer_destroy_internal(exec_ctx, &t->read_buffer); - grpc_chttp2_hpack_parser_destroy(exec_ctx, &t->hpack_parser); + grpc_slice_buffer_destroy_internal(&t->read_buffer); + grpc_chttp2_hpack_parser_destroy(&t->hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->goaway_parser); for (i = 0; i < STREAM_LIST_COUNT; i++) { @@ -209,12 +181,11 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0); grpc_chttp2_stream_map_destroy(&t->stream_map); - grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); + grpc_connectivity_state_destroy(&t->channel_callback.state_tracker); - GRPC_COMBINER_UNREF(exec_ctx, t->combiner, "chttp2_transport"); + GRPC_COMBINER_UNREF(t->combiner, "chttp2_transport"); - cancel_pings(exec_ctx, t, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); + cancel_pings(t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed")); while (t->write_cb_pool) { grpc_chttp2_write_cb *next = t->write_cb_pool->next; @@ -231,8 +202,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, } #ifndef NDEBUG -void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, const char *reason, +void grpc_chttp2_unref_transport(grpc_chttp2_transport *t, const char *reason, const char *file, int line) { if (GRPC_TRACER_ON(grpc_trace_chttp2_refcount)) { gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count); @@ -240,7 +210,7 @@ void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, t, val, val - 1, reason, file, line); } if (!gpr_unref(&t->refs)) return; - destruct_transport(exec_ctx, t); + destruct_transport(t); } void grpc_chttp2_ref_transport(grpc_chttp2_transport *t, const char *reason, @@ -253,10 +223,9 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t, const char *reason, gpr_ref(&t->refs); } #else -void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +void grpc_chttp2_unref_transport(grpc_chttp2_transport *t) { if (!gpr_unref(&t->refs)) return; - destruct_transport(exec_ctx, t); + destruct_transport(t); } void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } @@ -264,7 +233,7 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } static const grpc_transport_vtable *get_vtable(void); -static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, +static void init_transport(grpc_chttp2_transport *t, const grpc_channel_args *channel_args, grpc_endpoint *ep, bool is_client) { size_t i; @@ -328,7 +297,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->flow_control.bdp_estimator.Init(t->peer_string); grpc_chttp2_goaway_parser_init(&t->goaway_parser); - grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); + grpc_chttp2_hpack_parser_init(&t->hpack_parser); grpc_slice_buffer_init(&t->read_buffer); @@ -360,14 +329,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* configure http2 the way we like it */ if (is_client) { - queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); - queue_setting_update(exec_ctx, t, - GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); + queue_setting_update(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); + queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } - queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); - queue_setting_update(exec_ctx, t, - GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); + queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, + 1); t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; t->ping_policy.min_sent_ping_interval_without_data = @@ -541,7 +509,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, int value = grpc_channel_arg_get_integer( &channel_args->args[i], settings_map[j].integer_options); if (value >= 0) { - queue_setting_update(exec_ctx, t, settings_map[j].setting_id, + queue_setting_update(t, settings_map[j].setting_id, (uint32_t)value); } } @@ -563,8 +531,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, - grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + grpc_timer_init(&t->keepalive_ping_timer, + grpc_exec_ctx_now() + t->keepalive_time, &t->init_keepalive_ping_locked); } else { /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no @@ -574,44 +542,37 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (t->flow_control.enable_bdp_probe) { GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - schedule_bdp_ping_locked(exec_ctx, t); + schedule_bdp_ping_locked(t); } grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, - NULL); + grpc_chttp2_flowctl_get_action(&t->flow_control, NULL), t, NULL); - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); - post_benign_reclaimer(exec_ctx, t); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); + post_benign_reclaimer(t); } -static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { +static void destroy_transport_locked(void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; t->destroying = 1; close_transport_locked( - exec_ctx, t, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), - GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destroy"); + t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"), + GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state)); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy"); } -static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { +static void destroy_transport(grpc_transport *gt) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - GRPC_CLOSURE_SCHED(exec_ctx, - GRPC_CLOSURE_CREATE(destroy_transport_locked, t, + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); } -static void close_transport_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void close_transport_locked(grpc_chttp2_transport *t, grpc_error *error) { - end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); - cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); + end_all_the_calls(t, GRPC_ERROR_REF(error)); + cancel_pings(t, GRPC_ERROR_REF(error)); if (t->closed_with_error == GRPC_ERROR_NONE) { if (!grpc_error_has_clear_grpc_status(error)) { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, @@ -629,21 +590,22 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, } GPR_ASSERT(error != GRPC_ERROR_NONE); t->closed_with_error = GRPC_ERROR_REF(error); - connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_REF(error), "close_transport"); + + connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), + "close_transport"); if (t->ping_state.is_delayed_ping_timer_set) { - grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer); + grpc_timer_cancel(&t->ping_state.delayed_ping_timer); } if (t->have_next_bdp_ping_timer) { - grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer); + grpc_timer_cancel(&t->next_bdp_ping_timer); } switch (t->keepalive_state) { case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: - grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + grpc_timer_cancel(&t->keepalive_ping_timer); break; case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: - grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); - grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + grpc_timer_cancel(&t->keepalive_ping_timer); + grpc_timer_cancel(&t->keepalive_watchdog_timer); break; case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED: @@ -654,10 +616,11 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_writable_stream(t, &s)) { - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); + GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close"); } + GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE); - grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); + grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } @@ -666,22 +629,21 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, void grpc_chttp2_stream_ref(grpc_chttp2_stream *s, const char *reason) { grpc_stream_ref(s->refcount, reason); } -void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s, - const char *reason) { - grpc_stream_unref(exec_ctx, s->refcount, reason); +void grpc_chttp2_stream_unref(grpc_chttp2_stream *s, const char *reason) { + grpc_stream_unref(s->refcount, reason); } #else void grpc_chttp2_stream_ref(grpc_chttp2_stream *s) { grpc_stream_ref(s->refcount); } -void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { - grpc_stream_unref(exec_ctx, s->refcount); +void grpc_chttp2_stream_unref(grpc_chttp2_stream *s) { + grpc_stream_unref(s->refcount); } #endif -static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data, gpr_arena *arena) { +static int init_stream(grpc_transport *gt, grpc_stream *gs, + grpc_stream_refcount *refcount, const void *server_data, + gpr_arena *arena) { GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; @@ -715,7 +677,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->id = (uint32_t)(uintptr_t)server_data; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); - post_destructive_reclaimer(exec_ctx, t); + post_destructive_reclaimer(t); } s->flow_control.s = s; @@ -724,8 +686,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, return 0; } -static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, - grpc_error *error) { +static void destroy_stream_locked(void *sp, grpc_error *error) { grpc_chttp2_stream *s = (grpc_chttp2_stream *)sp; grpc_chttp2_transport *t = s->t; @@ -736,11 +697,10 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL); } - grpc_slice_buffer_destroy_internal(exec_ctx, - &s->unprocessed_incoming_frames_buffer); - grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); - grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer); - grpc_slice_buffer_destroy_internal(exec_ctx, &s->decompressed_data_buffer); + grpc_slice_buffer_destroy_internal(&s->unprocessed_incoming_frames_buffer); + grpc_slice_buffer_destroy_internal(&s->frame_storage); + grpc_slice_buffer_destroy_internal(&s->compressed_data_buffer); + grpc_slice_buffer_destroy_internal(&s->decompressed_data_buffer); grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -759,27 +719,24 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_ASSERT(s->recv_initial_metadata_ready == NULL); GPR_ASSERT(s->recv_message_ready == NULL); GPR_ASSERT(s->recv_trailing_metadata_finished == NULL); - grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser); - grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx, - &s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx, - &s->metadata_buffer[1]); - grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer); + grpc_chttp2_data_parser_destroy(&s->data_parser); + grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]); + grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]); + grpc_slice_buffer_destroy_internal(&s->flow_controlled_buffer); GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->byte_stream_error); grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control); - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); GPR_TIMER_END("destroy_stream", 0); - GRPC_CLOSURE_SCHED(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE); } -static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, +static void destroy_stream(grpc_transport *gt, grpc_stream *gs, grpc_closure *then_schedule_closure) { GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; @@ -796,8 +753,8 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->destroy_stream_arg = then_schedule_closure; GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, - grpc_combiner_scheduler(t->combiner)), + GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, + grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("destroy_stream", 0); } @@ -807,8 +764,7 @@ grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t, return (grpc_chttp2_stream *)grpc_chttp2_stream_map_find(&t->stream_map, id); } -grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport *t, uint32_t id) { if (t->channel_callback.accept_stream == NULL) { return NULL; @@ -816,8 +772,7 @@ grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *accepting; GPR_ASSERT(t->accepting_stream == NULL); t->accepting_stream = &accepting; - t->channel_callback.accept_stream(exec_ctx, - t->channel_callback.accept_stream_user_data, + t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data, &t->base, (void *)(uintptr_t)id); t->accepting_stream = NULL; return accepting; @@ -839,7 +794,7 @@ static const char *write_state_name(grpc_chttp2_write_state st) { GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, +static void set_write_state(grpc_chttp2_transport *t, grpc_chttp2_write_state st, const char *reason) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s [%s]", t, t->is_client ? "CLIENT" : "SERVER", @@ -847,108 +802,100 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, write_state_name(st), reason)); t->write_state = st; if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &t->run_after_write); + GRPC_CLOSURE_LIST_SCHED(&t->run_after_write); if (t->close_transport_on_writes_finished != NULL) { grpc_error *err = t->close_transport_on_writes_finished; t->close_transport_on_writes_finished = NULL; - close_transport_locked(exec_ctx, t, err); + close_transport_locked(t, err); } } } static void inc_initiate_write_reason( - grpc_exec_ctx *exec_ctx, grpc_chttp2_initiate_write_reason reason) { + grpc_chttp2_initiate_write_reason reason) { switch (reason) { case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(); break; case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(); break; case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(); break; case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA( - exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA(); break; case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA( - exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA(); break; case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(); break; case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(); break; case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(); break; case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(); break; case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(); break; case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL(); break; case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL( - exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL(); break; case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(); break; case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( - exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING(); break; case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE( - exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE(); break; case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(); break; case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(); break; case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED( - exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED(); break; case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(); break; case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx); + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(); break; } } -void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_initiate_write(grpc_chttp2_transport *t, grpc_chttp2_initiate_write_reason reason) { GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0); switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: - inc_initiate_write_reason(exec_ctx, reason); - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, + inc_initiate_write_reason(reason); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, grpc_chttp2_initiate_write_reason_string(reason)); t->is_first_write_in_batch = true; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&t->write_action_begin_locked, write_action_begin_locked, t, grpc_combiner_finally_scheduler(t->combiner)), GRPC_ERROR_NONE); break; case GRPC_CHTTP2_WRITE_STATE_WRITING: - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, grpc_chttp2_initiate_write_reason_string(reason)); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: @@ -957,8 +904,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } -void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { if (t->closed_with_error == GRPC_ERROR_NONE && grpc_chttp2_list_add_writable_stream(t, s)) { @@ -1008,8 +954,7 @@ static const char *begin_writing_desc(bool partial, bool inlined) { GPR_UNREACHABLE_CODE(return "bad state tuple"); } -static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, - grpc_error *error_ignored) { +static void write_action_begin_locked(void *gt, grpc_error *error_ignored) { GPR_TIMER_BEGIN("write_action_begin_locked", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); @@ -1017,60 +962,58 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, if (t->closed_with_error != GRPC_ERROR_NONE) { r.writing = false; } else { - r = grpc_chttp2_begin_write(exec_ctx, t); + r = grpc_chttp2_begin_write(t); } if (r.writing) { if (r.partial) { - GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx); + GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(); } if (!t->is_first_write_in_batch) { - GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(exec_ctx); + GRPC_STATS_INC_HTTP2_WRITES_CONTINUED(); } grpc_closure_scheduler *scheduler = write_scheduler(t, r.early_results_scheduled, r.partial); if (scheduler != grpc_schedule_on_exec_ctx) { - GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(exec_ctx); + GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED(); } set_write_state( - exec_ctx, t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE - : GRPC_CHTTP2_WRITE_STATE_WRITING, + t, r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE + : GRPC_CHTTP2_WRITE_STATE_WRITING, begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx)); - GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_INIT(&t->write_action, - write_action, t, scheduler), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler), + GRPC_ERROR_NONE); } else { - GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(exec_ctx); - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, - "begin writing nothing"); - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); + GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); } GPR_TIMER_END("write_action_begin_locked", 0); } -static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) { +static void write_action(void *gt, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; GPR_TIMER_BEGIN("write_action", 0); grpc_endpoint_write( - exec_ctx, t->ep, &t->outbuf, + t->ep, &t->outbuf, GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, grpc_combiner_scheduler(t->combiner))); GPR_TIMER_END("write_action", 0); } -static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { +static void write_action_end_locked(void *tp, grpc_error *error) { GPR_TIMER_BEGIN("terminate_writing_with_lock", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; if (error != GRPC_ERROR_NONE) { - close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); + close_transport_locked(t, GRPC_ERROR_REF(error)); } if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { close_transport_locked( - exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); + t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent")); } } @@ -1079,17 +1022,14 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_UNREACHABLE_CODE(break); case GRPC_CHTTP2_WRITE_STATE_WRITING: GPR_TIMER_MARK("state=writing", 0); - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, - "finish writing"); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing"); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: GPR_TIMER_MARK("state=writing_stale_no_poller", 0); - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, - "continue writing"); + set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing"); t->is_first_write_in_batch = false; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CLOSURE_RUN( - exec_ctx, GRPC_CLOSURE_INIT(&t->write_action_begin_locked, write_action_begin_locked, t, grpc_combiner_finally_scheduler(t->combiner)), @@ -1097,16 +1037,15 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp, break; } - grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error)); + grpc_chttp2_end_write(t, GRPC_ERROR_REF(error)); - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); GPR_TIMER_END("terminate_writing_with_lock", 0); } // Dirties an HTTP2 setting to be sent out next time a writing path occurs. // If the change needs to occur immediately, manually initiate a write. -static void queue_setting_update(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void queue_setting_update(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value) { const grpc_chttp2_setting_parameters *sp = &grpc_chttp2_settings_parameters[id]; @@ -1121,8 +1060,7 @@ static void queue_setting_update(grpc_exec_ctx *exec_ctx, } } -void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport *t, uint32_t goaway_error, grpc_slice goaway_text) { // GRPC_CHTTP2_IF_TRACING( @@ -1149,7 +1087,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, /* lie: use transient failure from the transport to indicate goaway has been * received */ connectivity_state_set( - exec_ctx, t, GRPC_CHANNEL_TRANSIENT_FAILURE, + t, GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_set_str( grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"), @@ -1158,8 +1096,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, "got_goaway"); } -static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static void maybe_start_some_streams(grpc_chttp2_transport *t) { grpc_chttp2_stream *s; /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ @@ -1179,25 +1116,23 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) { connectivity_state_set( - exec_ctx, t, GRPC_CHANNEL_TRANSIENT_FAILURE, + t, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"), "no_more_stream_ids"); } grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); - post_destructive_reclaimer(exec_ctx, t); - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM); + post_destructive_reclaimer(t); + grpc_chttp2_mark_stream_writable(t, s); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM); } /* cancel out streams that will never be started */ while (t->next_stream_id >= MAX_CLIENT_STREAM_ID && grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) { grpc_chttp2_cancel_stream( - exec_ctx, t, s, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + t, s, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } @@ -1216,15 +1151,13 @@ static grpc_closure *add_closure_barrier(grpc_closure *closure) { return closure; } -static void null_then_run_closure(grpc_exec_ctx *exec_ctx, - grpc_closure **closure, grpc_error *error) { +static void null_then_run_closure(grpc_closure **closure, grpc_error *error) { grpc_closure *c = *closure; *closure = NULL; - GRPC_CLOSURE_RUN(exec_ctx, c, error); + GRPC_CLOSURE_RUN(c, error); } -void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_complete_closure_step(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_closure **pclosure, grpc_error *error, const char *desc) { @@ -1264,7 +1197,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, } if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) || !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) { - GRPC_CLOSURE_RUN(exec_ctx, closure, closure->error_data.error); + GRPC_CLOSURE_RUN(closure, closure->error_data.error); } else { grpc_closure_list_append(&t->run_after_write, closure, closure->error_data.error); @@ -1280,28 +1213,24 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) { return false; } -static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > t->write_buffer_size)) { - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE); + grpc_chttp2_mark_stream_writable(t, s); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE); } } -static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void add_fetched_slice_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { s->fetched_send_message_length += (uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice); grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); - maybe_become_writable_due_to_send_msg(exec_ctx, t, s); + maybe_become_writable_due_to_send_msg(t, s); } -static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void continue_fetching_send_locked(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { for (;;) { if (s->fetching_send_message == NULL) { @@ -1310,11 +1239,11 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } if (s->fetched_send_message_length == s->fetching_send_message->length) { - grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_byte_stream_destroy(s->fetching_send_message); int64_t notify_offset = s->next_message_end_offset; if (notify_offset <= s->flow_controlled_bytes_written) { grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, + t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, "fetching_send_message_finished"); } else { grpc_chttp2_write_cb *cb = t->write_cb_pool; @@ -1335,39 +1264,37 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, } s->fetching_send_message = NULL; return; /* early out */ - } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, - UINT32_MAX, &s->complete_fetch_locked)) { - grpc_error *error = grpc_byte_stream_pull( - exec_ctx, s->fetching_send_message, &s->fetching_slice); + } else if (grpc_byte_stream_next(s->fetching_send_message, UINT32_MAX, + &s->complete_fetch_locked)) { + grpc_error *error = + grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice); if (error != GRPC_ERROR_NONE) { - grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); - grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_byte_stream_destroy(s->fetching_send_message); + grpc_chttp2_cancel_stream(t, s, error); } else { - add_fetched_slice_locked(exec_ctx, t, s); + add_fetched_slice_locked(t, s); } } } } -static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, - grpc_error *error) { +static void complete_fetch_locked(void *gs, grpc_error *error) { grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; grpc_chttp2_transport *t = s->t; if (error == GRPC_ERROR_NONE) { - error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, - &s->fetching_slice); + error = grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice); if (error == GRPC_ERROR_NONE) { - add_fetched_slice_locked(exec_ctx, t, s); - continue_fetching_send_locked(exec_ctx, t, s); + add_fetched_slice_locked(t, s); + continue_fetching_send_locked(t, s); } } if (error != GRPC_ERROR_NONE) { - grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); - grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_byte_stream_destroy(s->fetching_send_message); + grpc_chttp2_cancel_stream(t, s, error); } } -static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +static void do_nothing(void *arg, grpc_error *error) {} static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, bool is_client, bool is_initial) { @@ -1382,7 +1309,7 @@ static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, } } -static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, +static void perform_stream_op_locked(void *stream_op, grpc_error *error_ignored) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); @@ -1392,7 +1319,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_transport_stream_op_batch_payload *op_payload = op->payload; grpc_chttp2_transport *t = s->t; - GRPC_STATS_INC_HTTP2_OP_BATCHES(exec_ctx); + GRPC_STATS_INC_HTTP2_OP_BATCHES(); if (GRPC_TRACER_ON(grpc_http_trace)) { char *str = grpc_transport_stream_op_batch_string(op); @@ -1427,13 +1354,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->cancel_stream) { - GRPC_STATS_INC_HTTP2_OP_CANCEL(exec_ctx); - grpc_chttp2_cancel_stream(exec_ctx, t, s, - op_payload->cancel_stream.cancel_error); + GRPC_STATS_INC_HTTP2_OP_CANCEL(); + grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error); } if (op->send_initial_metadata) { - GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(exec_ctx); + GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(); GPR_ASSERT(s->send_initial_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1461,7 +1387,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (metadata_size > metadata_peer_limit) { grpc_chttp2_cancel_stream( - exec_ctx, t, s, + t, s, grpc_error_set_int( grpc_error_set_int( grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -1480,29 +1406,28 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_ASSERT(s->id == 0); grpc_chttp2_list_add_waiting_for_concurrency(t, s); - maybe_start_some_streams(exec_ctx, t); + maybe_start_some_streams(t); } else { grpc_chttp2_cancel_stream( - exec_ctx, t, s, - grpc_error_set_int( - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Transport closed", &t->closed_with_error, 1), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + t, s, grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Transport closed", &t->closed_with_error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } else { GPR_ASSERT(s->id != 0); - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_mark_stream_writable(t, s); if (!(op->send_message && (op->payload->send_message.send_message->flags & GRPC_WRITE_BUFFER_HINT))) { grpc_chttp2_initiate_write( - exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); + t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); } } } else { s->send_initial_metadata = NULL; grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->send_initial_metadata_finished, + t, s, &s->send_initial_metadata_finished, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Attempt to send initial metadata after stream was closed", &s->write_closed_error, 1), @@ -1516,9 +1441,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (op->send_message) { - GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(exec_ctx); + GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(); GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE( - exec_ctx, op->payload->send_message.send_message->length); + op->payload->send_message.send_message->length); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { @@ -1528,7 +1453,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, // recv_message failure, breaking out of its loop, and then // starting recv_trailing_metadata. grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->fetching_send_message_finished, + t, s, &s->fetching_send_message_finished, t->is_client && s->received_trailing_metadata ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -1557,13 +1482,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else { s->write_buffering = false; } - continue_fetching_send_locked(exec_ctx, t, s); - maybe_become_writable_due_to_send_msg(exec_ctx, t, s); + continue_fetching_send_locked(t, s); + maybe_become_writable_due_to_send_msg(t, s); } } if (op->send_trailing_metadata) { - GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA(exec_ctx); + GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA(); GPR_ASSERT(s->send_trailing_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->send_trailing_metadata_finished = add_closure_barrier(on_complete); @@ -1577,7 +1502,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; if (metadata_size > metadata_peer_limit) { grpc_chttp2_cancel_stream( - exec_ctx, t, s, + t, s, grpc_error_set_int( grpc_error_set_int( grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -1594,7 +1519,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (s->write_closed) { s->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->send_trailing_metadata_finished, + t, s, &s->send_trailing_metadata_finished, grpc_metadata_batch_is_empty( op->payload->send_trailing_metadata.send_trailing_metadata) ? GRPC_ERROR_NONE @@ -1605,15 +1530,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else if (s->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_mark_stream_writable(t, s); grpc_chttp2_initiate_write( - exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA); + t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA); } } } if (op->recv_initial_metadata) { - GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA(exec_ctx); + GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA(); GPR_ASSERT(s->recv_initial_metadata_ready == NULL); s->recv_initial_metadata_ready = op_payload->recv_initial_metadata.recv_initial_metadata_ready; @@ -1625,11 +1550,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string, (gpr_atm)gpr_strdup(t->peer_string)); } - grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_initial_metadata(t, s); } if (op->recv_message) { - GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(exec_ctx); + GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE(); size_t already_received; GPR_ASSERT(s->recv_message_ready == NULL); GPR_ASSERT(!s->pending_byte_stream); @@ -1642,33 +1567,31 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES, already_received); grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, - &s->flow_control), + grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), t, s); } } - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_message(t, s); } if (op->recv_trailing_metadata) { - GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(exec_ctx); + GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(); GPR_ASSERT(s->recv_trailing_metadata_finished == NULL); s->recv_trailing_metadata_finished = add_closure_barrier(on_complete); s->recv_trailing_metadata = op_payload->recv_trailing_metadata.recv_trailing_metadata; s->final_metadata_requested = true; - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); } - grpc_chttp2_complete_closure_step(exec_ctx, t, s, &on_complete, - GRPC_ERROR_NONE, "op->on_complete"); + grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE, + "op->on_complete"); GPR_TIMER_END("perform_stream_op_locked", 0); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "perform_stream_op"); + GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op"); } -static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, +static void perform_stream_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op_batch *op) { GPR_TIMER_BEGIN("perform_stream_op", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; @@ -1696,32 +1619,30 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, op->handler_private.extra_arg = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, op, grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); } -static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error) { +static void cancel_pings(grpc_chttp2_transport *t, grpc_error *error) { /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ grpc_chttp2_ping_queue *pq = &t->ping_queue; GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]); + GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]); } GRPC_ERROR_UNREF(error); } -static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, +static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_initiate, grpc_closure *on_ack) { if (t->closed_with_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(exec_ctx, on_initiate, - GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error)); + return; } grpc_chttp2_ping_queue *pq = &t->ping_queue; @@ -1731,18 +1652,15 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_NONE); } -static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { +static void retry_initiate_ping_locked(void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; t->ping_state.is_delayed_ping_timer_set = false; if (error == GRPC_ERROR_NONE) { - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); } } -void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - uint64_t id) { +void grpc_chttp2_ack_ping(grpc_chttp2_transport *t, uint64_t id) { grpc_chttp2_ping_queue *pq = &t->ping_queue; if (pq->inflight_id != id) { char *from = grpc_endpoint_get_peer(t->ep); @@ -1750,46 +1668,41 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_free(from); return; } - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); + GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); } } -static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error) { +static void send_goaway(grpc_chttp2_transport *t, grpc_error *error) { t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED; grpc_http2_error_code http_error; grpc_slice slice; - grpc_error_get_status(exec_ctx, error, GRPC_MILLIS_INF_FUTURE, NULL, &slice, + grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, NULL, &slice, &http_error); grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error, grpc_slice_ref_internal(slice), &t->qbuf); - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); GRPC_ERROR_UNREF(error); } -void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +void grpc_chttp2_add_ping_strike(grpc_chttp2_transport *t) { t->ping_recv_state.ping_strikes++; if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes && t->ping_policy.max_ping_strikes != 0) { - send_goaway(exec_ctx, t, + send_goaway(t, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"), GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); /*The transport will be closed after the write is done */ close_transport_locked( - exec_ctx, t, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } -static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, - void *stream_op, +static void perform_transport_op_locked(void *stream_op, grpc_error *error_ignored) { grpc_transport_op *op = (grpc_transport_op *)stream_op; grpc_chttp2_transport *t = @@ -1797,7 +1710,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, grpc_error *close_transport = op->disconnect_with_error; if (op->goaway_error) { - send_goaway(exec_ctx, t, op->goaway_error); + send_goaway(t, op->goaway_error); } if (op->set_accept_stream) { @@ -1807,43 +1720,40 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->bind_pollset) { - grpc_endpoint_add_to_pollset(exec_ctx, t->ep, op->bind_pollset); + grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset); } if (op->bind_pollset_set) { - grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set); + grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set); } if (op->send_ping) { - send_ping_locked(exec_ctx, t, NULL, op->send_ping); - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); + send_ping_locked(t, NULL, op->send_ping); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( - exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, + &t->channel_callback.state_tracker, op->connectivity_state, op->on_connectivity_state_change); } if (close_transport != GRPC_ERROR_NONE) { - close_transport_locked(exec_ctx, t, close_transport); + close_transport_locked(t, close_transport); } - GRPC_CLOSURE_RUN(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE); - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "transport_op"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); } -static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_transport_op *op) { +static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; char *msg = grpc_transport_op_string(op); gpr_free(msg); op->handler_private.extra_arg = gt; GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); - GRPC_CLOSURE_SCHED(exec_ctx, - GRPC_CLOSURE_INIT(&op->handler_private.closure, + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_transport_op_locked, op, grpc_combiner_scheduler(t->combiner)), GRPC_ERROR_NONE); @@ -1853,36 +1763,33 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * INPUT PROCESSING - GENERAL */ -void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { if (s->recv_initial_metadata_ready != NULL && s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) { if (s->seen_error) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); if (!s->pending_byte_stream) { grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); + &s->unprocessed_incoming_frames_buffer); } } - grpc_chttp2_incoming_metadata_buffer_publish( - exec_ctx, &s->metadata_buffer[0], s->recv_initial_metadata); - null_then_run_closure(exec_ctx, &s->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0], + s->recv_initial_metadata); + null_then_run_closure(&s->recv_initial_metadata_ready, GRPC_ERROR_NONE); } } -void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { grpc_error *error = GRPC_ERROR_NONE; if (s->recv_message_ready != NULL) { *s->recv_message = NULL; if (s->final_metadata_requested && s->seen_error) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); if (!s->pending_byte_stream) { grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); + &s->unprocessed_incoming_frames_buffer); } } if (!s->pending_byte_stream) { @@ -1909,10 +1816,9 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, &s->decompressed_data_buffer, NULL, GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes, &end_of_context)) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, - &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); + &s->unprocessed_incoming_frames_buffer); error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Stream decompression error."); } else { @@ -1921,8 +1827,8 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, s->decompressed_header_bytes = 0; } error = grpc_deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, &s->decompressed_data_buffer, - NULL, s->recv_message); + &s->data_parser, s, &s->decompressed_data_buffer, NULL, + s->recv_message); if (end_of_context) { grpc_stream_compression_context_destroy( s->stream_decompression_ctx); @@ -1931,15 +1837,14 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, } } else { error = grpc_deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, - &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); + &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, NULL, + s->recv_message); } if (error != GRPC_ERROR_NONE) { s->seen_error = true; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, - &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); + &s->unprocessed_incoming_frames_buffer); break; } else if (*s->recv_message != NULL) { break; @@ -1947,26 +1852,25 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, } } if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) { - null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); + null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE); } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { *s->recv_message = NULL; - null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); + null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE); } GRPC_ERROR_UNREF(error); } } -void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_message(t, s); if (s->recv_trailing_metadata_finished != NULL && s->read_closed && s->write_closed) { if (s->seen_error) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); if (!s->pending_byte_stream) { grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); + &s->unprocessed_incoming_frames_buffer); } } bool pending_data = s->pending_byte_stream || @@ -1984,9 +1888,9 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, &s->frame_storage, &s->unprocessed_incoming_frames_buffer, NULL, GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); + grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage); grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); + &s->unprocessed_incoming_frames_buffer); s->seen_error = true; } else { if (s->unprocessed_incoming_frames_buffer.length > 0) { @@ -2001,23 +1905,23 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } if (s->read_closed && s->frame_storage.length == 0 && !pending_data && s->recv_trailing_metadata_finished != NULL) { - grpc_chttp2_incoming_metadata_buffer_publish( - exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata); + grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1], + s->recv_trailing_metadata); grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE, + t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE, "recv_trailing_metadata_finished"); } } } -static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - uint32_t id, grpc_error *error) { +static void remove_stream(grpc_chttp2_transport *t, uint32_t id, + grpc_error *error) { grpc_chttp2_stream *s = (grpc_chttp2_stream *)grpc_chttp2_stream_map_delete(&t->stream_map, id); GPR_ASSERT(s); if (t->incoming_stream == s) { t->incoming_stream = NULL; - grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + grpc_chttp2_parsing_become_skip_parser(t); } if (s->pending_byte_stream) { if (s->on_next != NULL) { @@ -2025,8 +1929,8 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } - incoming_byte_stream_publish_error(exec_ctx, bs, error); - incoming_byte_stream_unref(exec_ctx, bs); + incoming_byte_stream_publish_error(bs, error); + incoming_byte_stream_unref(bs); s->data_parser.parsing_frame = NULL; } else { GRPC_ERROR_UNREF(s->byte_stream_error); @@ -2035,55 +1939,51 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - post_benign_reclaimer(exec_ctx, t); + post_benign_reclaimer(t); if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) { close_transport_locked( - exec_ctx, t, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Last stream closed after sending GOAWAY", &error, 1)); + t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Last stream closed after sending GOAWAY", &error, 1)); } } if (grpc_chttp2_list_remove_writable_stream(t, s)) { - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream"); + GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream"); } GRPC_ERROR_UNREF(error); - maybe_start_some_streams(exec_ctx, t); + maybe_start_some_streams(t); } -void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, +void grpc_chttp2_cancel_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *due_to_error) { if (!t->is_client && !s->sent_trailing_metadata && grpc_error_has_clear_grpc_status(due_to_error)) { - close_from_api(exec_ctx, t, s, due_to_error); + close_from_api(t, s, due_to_error); return; } if (!s->read_closed || !s->write_closed) { if (s->id != 0) { grpc_http2_error_code http_error; - grpc_error_get_status(exec_ctx, due_to_error, s->deadline, NULL, NULL, - &http_error); + grpc_error_get_status(due_to_error, s->deadline, NULL, NULL, &http_error); grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); } } if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) { s->seen_error = true; } - grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, due_to_error); + grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error); } -void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, grpc_error *error) { +void grpc_chttp2_fake_status(grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_error *error) { grpc_status_code status; grpc_slice slice; - grpc_error_get_status(exec_ctx, error, s->deadline, &status, &slice, NULL); + grpc_error_get_status(error, s->deadline, &status, &slice, NULL); if (status != GRPC_STATUS_OK) { s->seen_error = true; @@ -2100,20 +2000,20 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_ltoa(status, status_string); GRPC_LOG_IF_ERROR("add_status", grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], + &s->metadata_buffer[1], grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_GRPC_STATUS, + GRPC_MDSTR_GRPC_STATUS, grpc_slice_from_copied_string(status_string)))); if (!GRPC_SLICE_IS_EMPTY(slice)) { GRPC_LOG_IF_ERROR( "add_status_message", grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + &s->metadata_buffer[1], + grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_MESSAGE, grpc_slice_ref_internal(slice)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); } GRPC_ERROR_UNREF(error); @@ -2146,14 +2046,12 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s, return error; } -static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, grpc_chttp2_write_cb **list, - grpc_error *error) { +static void flush_write_list(grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_chttp2_write_cb **list, grpc_error *error) { while (*list) { grpc_chttp2_write_cb *cb = *list; *list = cb->next; - grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, - GRPC_ERROR_REF(error), + grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error), "on_write_finished_cb"); cb->next = t->write_cb_pool; t->write_cb_pool = cb; @@ -2161,37 +2059,34 @@ static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { error = removal_error(error, s, "Pending writes failed due to stream closure"); s->send_initial_metadata = NULL; - grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error), - "send_initial_metadata_finished"); + grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished, + GRPC_ERROR_REF(error), + "send_initial_metadata_finished"); s->send_trailing_metadata = NULL; - grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->send_trailing_metadata_finished, - GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); + grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished, + GRPC_ERROR_REF(error), + "send_trailing_metadata_finished"); s->fetching_send_message = NULL; - grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), - "fetching_send_message_finished"); - flush_write_list(exec_ctx, t, s, &s->on_write_finished_cbs, - GRPC_ERROR_REF(error)); - flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error); + grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished, + GRPC_ERROR_REF(error), + "fetching_send_message_finished"); + flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error)); + flush_write_list(t, s, &s->on_flow_controlled_cbs, error); } -void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport *t, grpc_chttp2_stream *s, int close_reads, int close_writes, grpc_error *error) { if (s->read_closed && s->write_closed) { /* already closed */ - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); GRPC_ERROR_UNREF(error); return; } @@ -2205,20 +2100,20 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, if (close_writes && !s->write_closed) { s->write_closed_error = GRPC_ERROR_REF(error); s->write_closed = true; - grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error)); + grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error)); } if (s->read_closed && s->write_closed) { became_closed = true; grpc_error *overall_error = removal_error(GRPC_ERROR_REF(error), s, "Stream removed"); if (s->id != 0) { - remove_stream(exec_ctx, t, s->id, GRPC_ERROR_REF(overall_error)); + remove_stream(t, s->id, GRPC_ERROR_REF(overall_error)); } else { /* Purge streams waiting on concurrency still waiting for id assignment */ grpc_chttp2_list_remove_waiting_for_concurrency(t, s); } if (overall_error != GRPC_ERROR_NONE) { - grpc_chttp2_fake_status(exec_ctx, t, s, overall_error); + grpc_chttp2_fake_status(t, s, overall_error); } } if (closed_read) { @@ -2227,18 +2122,18 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE; } } - grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_initial_metadata(t, s); + grpc_chttp2_maybe_complete_recv_message(t, s); } if (became_closed) { - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2"); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); + GRPC_CHTTP2_STREAM_UNREF(s, "chttp2"); } GRPC_ERROR_UNREF(error); } -static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, grpc_error *error) { +static void close_from_api(grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_error *error) { grpc_slice hdr; grpc_slice status_hdr; grpc_slice http_status_hdr; @@ -2248,8 +2143,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t len = 0; grpc_status_code grpc_status; grpc_slice slice; - grpc_error_get_status(exec_ctx, error, s->deadline, &grpc_status, &slice, - NULL); + grpc_error_get_status(error, s->deadline, &grpc_status, &slice, NULL); GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100); @@ -2391,13 +2285,12 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); - grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error); - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API); + grpc_chttp2_mark_stream_closed(t, s, 1, 1, error); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API); } typedef struct { - grpc_exec_ctx *exec_ctx; + ExecCtx *exec_ctx; grpc_error *error; grpc_chttp2_transport *t; } cancel_stream_cb_args; @@ -2405,12 +2298,10 @@ typedef struct { static void cancel_stream_cb(void *user_data, uint32_t key, void *stream) { cancel_stream_cb_args *args = (cancel_stream_cb_args *)user_data; grpc_chttp2_stream *s = (grpc_chttp2_stream *)stream; - grpc_chttp2_cancel_stream(args->exec_ctx, args->t, s, - GRPC_ERROR_REF(args->error)); + grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error)); } -static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_error *error) { +static void end_all_the_calls(grpc_chttp2_transport *t, grpc_error *error) { cancel_stream_cb_args args = {exec_ctx, error, t}; grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args); GRPC_ERROR_UNREF(error); @@ -2420,20 +2311,19 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, * INPUT PROCESSING - PARSING */ -void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, - grpc_chttp2_flowctl_action action, +void grpc_chttp2_act_on_flowctl_action(grpc_chttp2_flowctl_action action, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { switch (action.send_stream_update) { case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: break; case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_mark_stream_writable(t, s); grpc_chttp2_initiate_write( - exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL); + t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL); break; case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_mark_stream_writable(t, s); break; } switch (action.send_transport_update) { @@ -2441,7 +2331,7 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, break; case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: grpc_chttp2_initiate_write( - exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL); + t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL); break; // this is the same as no action b/c every time the transport enters the // writing path it will maybe do an update @@ -2450,23 +2340,20 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, } if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { if (action.initial_window_size > 0) { - queue_setting_update(exec_ctx, t, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + queue_setting_update(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, (uint32_t)action.initial_window_size); } if (action.max_frame_size > 0) { - queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, (uint32_t)action.max_frame_size); } if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) { - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); } } } -static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static grpc_error *try_http_parsing(grpc_chttp2_transport *t) { grpc_http_parser parser; size_t i = 0; grpc_error *error = GRPC_ERROR_NONE; @@ -2495,8 +2382,7 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, return error; } -static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { +static void read_action_locked(void *tp, grpc_error *error) { GPR_TIMER_BEGIN("reading_action_locked", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; @@ -2520,11 +2406,10 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { t->flow_control.bdp_estimator->AddIncomingBytes( (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); - errors[1] = - grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); + errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]); } if (errors[1] != GRPC_ERROR_NONE) { - errors[2] = try_http_parsing(exec_ctx, t); + errors[2] = try_http_parsing(t); GRPC_ERROR_UNREF(error); error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors)); @@ -2539,10 +2424,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (t->flow_control.initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_mark_stream_writable(t, s); grpc_chttp2_initiate_write( - exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); + t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); } } t->flow_control.initial_window_update = 0; @@ -2557,24 +2441,21 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, "Transport closed", &t->closed_with_error, 1); } if (error != GRPC_ERROR_NONE) { - close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); + close_transport_locked(t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; } else if (t->closed_with_error == GRPC_ERROR_NONE) { keep_reading = true; GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); } - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->read_buffer); + grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer); if (keep_reading) { - grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, - &t->read_action_locked); + grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked); grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, - NULL); - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); + grpc_chttp2_flowctl_get_action(&t->flow_control, NULL), t, NULL); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "keep_reading"); } else { - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); } GPR_TIMER_END("post_reading_action_locked", 0); @@ -2586,15 +2467,12 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, // t is reffed prior to calling the first time, and once the callback chain // that kicks off finishes, it's unreffed -static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static void schedule_bdp_ping_locked(grpc_chttp2_transport *t) { t->flow_control.bdp_estimator->SchedulePing(); - send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, - &t->finish_bdp_ping_locked); + send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); } -static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { +static void start_bdp_ping_locked(void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string, @@ -2602,39 +2480,37 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, } /* Reset the keepalive ping timer */ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { - grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + grpc_timer_cancel(&t->keepalive_ping_timer); } t->flow_control.bdp_estimator->StartPing(); } -static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, - grpc_error *error) { +static void finish_bdp_ping_locked(void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string, grpc_error_string(error)); } if (error != GRPC_ERROR_NONE) { - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } - grpc_millis next_ping = t->flow_control.bdp_estimator->CompletePing(exec_ctx); + grpc_millis next_ping = t->flow_control.bdp_estimator->CompletePing(); GPR_ASSERT(!t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = true; - grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping, + grpc_timer_init(&t->next_bdp_ping_timer, next_ping, &t->next_bdp_ping_timer_expired_locked); } -static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, - void *tp, grpc_error *error) { +static void next_bdp_ping_timer_expired_locked(void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; GPR_ASSERT(t->have_next_bdp_ping_timer); t->have_next_bdp_ping_timer = false; if (error != GRPC_ERROR_NONE) { - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); return; } - schedule_bdp_ping_locked(exec_ctx, t); + schedule_bdp_ping_locked(t); } void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, @@ -2695,8 +2571,7 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, } } -static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void init_keepalive_ping_locked(void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { @@ -2706,59 +2581,60 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_stream_map_size(&t->stream_map) > 0) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); - send_ping_locked(exec_ctx, t, &t->start_keepalive_ping_locked, + send_ping_locked(t, &t->start_keepalive_ping_locked, &t->finish_keepalive_ping_locked); - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, - grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + grpc_timer_init(&t->keepalive_ping_timer, + grpc_exec_ctx_now() + t->keepalive_time, &t->init_keepalive_ping_locked); } } else if (error == GRPC_ERROR_CANCELLED) { /* The keepalive ping timer may be cancelled by bdp */ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, - grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + grpc_timer_init(&t->keepalive_ping_timer, + grpc_exec_ctx_now() + t->keepalive_time, &t->init_keepalive_ping_locked); } - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); } -static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void start_keepalive_ping_locked(void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); - grpc_timer_init(exec_ctx, &t->keepalive_watchdog_timer, - grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + grpc_timer_init(&t->keepalive_watchdog_timer, + grpc_exec_ctx_now() + t->keepalive_time, &t->keepalive_watchdog_fired_locked); } -static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void finish_keepalive_ping_locked(void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; - grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + grpc_timer_cancel(&t->keepalive_watchdog_timer); GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, - grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + grpc_timer_init(&t->keepalive_ping_timer, + grpc_exec_ctx_now() + t->keepalive_time, &t->init_keepalive_ping_locked); } } - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); } -static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void keepalive_watchdog_fired_locked(void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; close_transport_locked( - exec_ctx, t, + t, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "keepalive watchdog timeout"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL)); + close_transport_locked( + t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "keepalive watchdog timeout"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL)); @@ -2771,71 +2647,67 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); } } - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); } /******************************************************************************* * CALLBACK LOOP */ -static void connectivity_state_set(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, +static void connectivity_state_set(grpc_chttp2_transport *t, grpc_connectivity_state state, grpc_error *error, const char *reason) { GRPC_CHTTP2_IF_TRACING( gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); - grpc_connectivity_state_set(exec_ctx, &t->channel_callback.state_tracker, - state, error, reason); + grpc_connectivity_state_set(&t->channel_callback.state_tracker, state, error, + reason); } /******************************************************************************* * POLLSET STUFF */ -static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset *pollset) { +static void set_pollset(grpc_transport *gt, grpc_stream *gs, + grpc_pollset *pollset) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset); + grpc_endpoint_add_to_pollset(t->ep, pollset); } -static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset_set *pollset_set) { +static void set_pollset_set(grpc_transport *gt, grpc_stream *gs, + grpc_pollset_set *pollset_set) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set); + grpc_endpoint_add_to_pollset_set(t->ep, pollset_set); } /******************************************************************************* * BYTE STREAM */ -static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void reset_byte_stream(void *arg, grpc_error *error) { grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg; s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { - grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s); + grpc_chttp2_maybe_complete_recv_message(s->t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s); } else { GPR_ASSERT(error != GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error)); s->on_next = NULL; GRPC_ERROR_UNREF(s->byte_stream_error); s->byte_stream_error = GRPC_ERROR_NONE; - grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error)); + grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error)); s->byte_stream_error = GRPC_ERROR_REF(error); } } -static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs) { +static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) { if (gpr_unref(&bs->refs)) { gpr_free(bs); } } -static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, - void *argp, +static void incoming_byte_stream_next_locked(void *argp, grpc_error *error_ignored) { grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)argp; @@ -2848,31 +2720,30 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, bs->next_action.max_size_hint, cur_length); grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, - &s->flow_control), - t, s); + grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), t, + s); } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); if (s->frame_storage.length > 0) { grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer); s->unprocessed_incoming_frames_decompressed = false; - GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_NONE); } else if (s->byte_stream_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, + GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != NULL) { - incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); + incoming_byte_stream_unref(s->data_parser.parsing_frame); s->data_parser.parsing_frame = NULL; } } else if (s->read_closed) { if (bs->remaining_bytes != 0) { s->byte_stream_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(exec_ctx, bs->next_action.on_complete, + GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != NULL) { - incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); + incoming_byte_stream_unref(s->data_parser.parsing_frame); s->data_parser.parsing_frame = NULL; } } else { @@ -2882,11 +2753,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, } else { s->on_next = bs->next_action.on_complete; } - incoming_byte_stream_unref(exec_ctx, bs); + incoming_byte_stream_unref(bs); } -static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, +static bool incoming_byte_stream_next(grpc_byte_stream *byte_stream, size_t max_size_hint, grpc_closure *on_complete) { GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); @@ -2901,7 +2771,6 @@ static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, bs->next_action.max_size_hint = max_size_hint; bs->next_action.on_complete = on_complete; GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&bs->next_action.closure, incoming_byte_stream_next_locked, bs, grpc_combiner_scheduler(bs->transport->combiner)), @@ -2911,8 +2780,7 @@ static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, } } -static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, +static grpc_error *incoming_byte_stream_pull(grpc_byte_stream *byte_stream, grpc_slice *slice) { GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0); grpc_chttp2_incoming_byte_stream *bs = @@ -2948,62 +2816,58 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, } } error = grpc_deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, - slice, NULL); + &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice, + NULL); if (error != GRPC_ERROR_NONE) { return error; } } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); return error; } GPR_TIMER_END("incoming_byte_stream_pull", 0); return GRPC_ERROR_NONE; } -static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, - void *byte_stream, +static void incoming_byte_stream_destroy_locked(void *byte_stream, grpc_error *error_ignored); -static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream) { +static void incoming_byte_stream_destroy(grpc_byte_stream *byte_stream) { GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT( - &bs->destroy_action, incoming_byte_stream_destroy_locked, - bs, grpc_combiner_scheduler(bs->transport->combiner)), + GRPC_CLOSURE_INIT(&bs->destroy_action, + incoming_byte_stream_destroy_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("incoming_byte_stream_destroy", 0); } static void incoming_byte_stream_publish_error( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error) { + grpc_chttp2_incoming_byte_stream *bs, grpc_error *error) { grpc_chttp2_stream *s = bs->stream; GPR_ASSERT(error != GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error)); s->on_next = NULL; GRPC_ERROR_UNREF(s->byte_stream_error); s->byte_stream_error = GRPC_ERROR_REF(error); - grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream, - GRPC_ERROR_REF(error)); + grpc_chttp2_cancel_stream(bs->transport, bs->stream, GRPC_ERROR_REF(error)); } grpc_error *grpc_chttp2_incoming_byte_stream_push( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_slice slice, grpc_slice *slice_out) { + grpc_chttp2_incoming_byte_stream *bs, grpc_slice slice, + grpc_slice *slice_out) { grpc_chttp2_stream *s = bs->stream; if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); - GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); - grpc_slice_unref_internal(exec_ctx, slice); + GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + grpc_slice_unref_internal(slice); return error; } else { bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); @@ -3015,8 +2879,8 @@ grpc_error *grpc_chttp2_incoming_byte_stream_push( } grpc_error *grpc_chttp2_incoming_byte_stream_finished( - grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error, bool reset_on_error) { + grpc_chttp2_incoming_byte_stream *bs, grpc_error *error, + bool reset_on_error) { grpc_chttp2_stream *s = bs->stream; if (error == GRPC_ERROR_NONE) { @@ -3025,27 +2889,25 @@ grpc_error *grpc_chttp2_incoming_byte_stream_finished( } } if (error != GRPC_ERROR_NONE && reset_on_error) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); } - incoming_byte_stream_unref(exec_ctx, bs); + incoming_byte_stream_unref(bs); return error; } -static void incoming_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, +static void incoming_byte_stream_shutdown(grpc_byte_stream *byte_stream, grpc_error *error) { grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( - exec_ctx, bs, error, true /* reset_on_error */)); + bs, error, true /* reset_on_error */)); } static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { incoming_byte_stream_next, incoming_byte_stream_pull, incoming_byte_stream_shutdown, incoming_byte_stream_destroy}; -static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, - void *byte_stream, +static void incoming_byte_stream_destroy_locked(void *byte_stream, grpc_error *error_ignored) { grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; @@ -3053,15 +2915,15 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t = s->t; GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable); - incoming_byte_stream_unref(exec_ctx, bs); + incoming_byte_stream_unref(bs); s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_message(t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - uint32_t frame_size, uint32_t flags) { + grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, + uint32_t flags) { grpc_chttp2_incoming_byte_stream *incoming_byte_stream = (grpc_chttp2_incoming_byte_stream *)gpr_malloc( sizeof(*incoming_byte_stream)); @@ -3081,30 +2943,25 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( * RESOURCE QUOTAS */ -static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static void post_benign_reclaimer(grpc_chttp2_transport *t) { if (!t->benign_reclaimer_registered) { t->benign_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); - grpc_resource_user_post_reclaimer(exec_ctx, - grpc_endpoint_get_resource_user(t->ep), + grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), false, &t->benign_reclaimer_locked); } } -static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +static void post_destructive_reclaimer(grpc_chttp2_transport *t) { if (!t->destructive_reclaimer_registered) { t->destructive_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); - grpc_resource_user_post_reclaimer(exec_ctx, - grpc_endpoint_get_resource_user(t->ep), + grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), true, &t->destructive_reclaimer_locked); } } -static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void benign_reclaimer_locked(void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; if (error == GRPC_ERROR_NONE && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { @@ -3114,7 +2971,7 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory", t->peer_string); } - send_goaway(exec_ctx, t, + send_goaway(t, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); @@ -3128,13 +2985,12 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, t->benign_reclaimer_registered = false; if (error != GRPC_ERROR_CANCELLED) { grpc_resource_user_finish_reclamation( - exec_ctx, grpc_endpoint_get_resource_user(t->ep)); + grpc_endpoint_get_resource_user(t->ep)); } - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "benign_reclaimer"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); } -static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void destructive_reclaimer_locked(void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; size_t n = grpc_chttp2_stream_map_size(&t->stream_map); t->destructive_reclaimer_registered = false; @@ -3146,23 +3002,22 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, s->id); } grpc_chttp2_cancel_stream( - exec_ctx, t, s, - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), - GRPC_ERROR_INT_HTTP2_ERROR, - GRPC_HTTP2_ENHANCE_YOUR_CALM)); + t, s, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), + GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); if (n > 1) { /* Since we cancel one stream per destructive reclamation, if there are more streams left, we can immediately post a new reclaimer in case the resource quota needs to free more memory */ - post_destructive_reclaimer(exec_ctx, t); + post_destructive_reclaimer(t); } } if (error != GRPC_ERROR_CANCELLED) { grpc_resource_user_finish_reclamation( - exec_ctx, grpc_endpoint_get_resource_user(t->ep)); + grpc_endpoint_get_resource_user(t->ep)); } - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destructive_reclaimer"); + GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); } /******************************************************************************* @@ -3216,8 +3071,7 @@ const char *grpc_chttp2_initiate_write_reason_string( GPR_UNREACHABLE_CODE(return "unknown"); } -static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx, - grpc_transport *t) { +static grpc_endpoint *chttp2_get_endpoint(grpc_transport *t) { return ((grpc_chttp2_transport *)t)->ep; } @@ -3235,16 +3089,14 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), static const grpc_transport_vtable *get_vtable(void) { return &vtable; } grpc_transport *grpc_create_chttp2_transport( - grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, - grpc_endpoint *ep, int is_client) { + const grpc_channel_args *channel_args, grpc_endpoint *ep, int is_client) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gpr_zalloc(sizeof(grpc_chttp2_transport)); - init_transport(exec_ctx, t, channel_args, ep, is_client != 0); + init_transport(t, channel_args, ep, is_client != 0); return &t->base; } -void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, - grpc_transport *transport, +void grpc_chttp2_transport_start_reading(grpc_transport *transport, grpc_slice_buffer *read_buffer) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; GRPC_CHTTP2_REF_TRANSPORT( @@ -3253,5 +3105,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_move_into(read_buffer, &t->read_buffer); gpr_free(read_buffer); } - GRPC_CLOSURE_SCHED(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE); } |